TL;DR

In this post we will go through, in a practical way, everything I have learned over the past few months about CodeQL for Python. I hope you like it as much as I do! :)

Learning resources

Environment SetUp

To let you try out the examples in this post, this section explains what LGTM is and how to set up a working CodeQL environment so you can run the queries on your end.

Remote queries

LGTM.com is a website holding github/codeql’s lgtm.com branch with an online codeql editor that lets you run any codeql snippet using the core codeql libraries.

Run an example!

As you can see, it lets you select several projects to run the query on (and create custom lists of them), and it shows the results in a readable way. The previous example returns just a string, but using @kind path-problem (query metadata) and DataFlow::PathGraph gives a much nicer view:

Run an example!

This post will refer you to LGTM each time there’s a codeql snippet whose behaviour may be shown.

Automation

The existence of a cloud-based CodeQL “instance” opens a wide range of ideas regarding automation. An aggressive automation clearly goes against LGTM ToS, so use this information at your own risk.

gagliardetto/lgtm-cli and JLLeitschuh/lgtm_hack_scripts let you follow repos (for them to be built by LGTM) based on GitHub API search or dependency network, create custom lists, and query already-built projects.

This automation helps measure the impact and precision of the query, and lets you provide results for bounty submissions, if any (see #submission).

Local queries

This is the way I’d recommend to run queries and play with them. Let’s start!

  • Clone jorgectf/codeql inside an empty folder.
  • Open the empty folder with VSCode.
  • Install the CodeQL extension.
  • Checkout Practical-CodeQL-Introduction branch:
    • Open a terminal Terminal > New Terminal and run (cd codeql/ && git checkout Practical-CodeQL-Introduction).
    • OR
    • Go to Source Control pane, click main and choose Practical-CodeQL-Introduction.
  • Go to Testing pane, expand codeql > python / ql / test > experimental > query-tests > Security > Practical-CodeQL-Introduction and click the “play”/“run” button.
  • Once the tests have finished (they will intentionally fail because the results don’t match those in the .expected file), a CodeQL database should have been created.
  • Go to the CodeQL pane, click Add a CodeQL database: From a Folder and choose codeql/python/ql/test/experimental/query-tests/Security/Practical-CodeQL-Introduction/Practical-CodeQL-Introduction.testproj.
  • Find a file called query.ql inside codeql/python/ql/src/experimental/Security/Practical-CodeQL-Introduction/.
  • You are ready to go! Feel free to run any query inside query.ql by writing the desired code and running it (Right Click > CodeQL: Run Query). You may also run a specific snippet by selecting it and choosing right-click > CodeQL: Quick Evaluation.

In case CodeQL CLI doesn’t get installed (a binary capable of running everything related to codeql) head to Extensions > CodeQL > Extension Settings, find Code QL › Cli: Executable Path, add a random string like “a” inside the input form, click outside the form (for vscode to update the value) and remove the written input. You may see a vscode notification showing that the CodeQL CLI is being installed now.

Concepts

In order to fully understand the upcoming points regarding query development, we need to look at a few concepts (some of which you may already know, but here focused on CodeQL).

Source

We may understand a “source” as the very first appearance of the data whose flow we want to follow. For example, a source could be user input or a hardcoded string (matching the form of a specific string), and we will sometimes refer to it as “tainted” data (the term comes from TaintTracking::Configuration, a class that lets us specify and customize the source, the sink and several other parts of a flow configuration).

RemoteFlowSource

Since most security-related queries check whether user input flows into a specific part of the code (e.g., a function’s argument), CodeQL introduced a structure (see #concepts-again) that gathers every user input so that query developers don’t have to worry about it. (Since CodeQL is under development, some frameworks may not be modeled yet, but the objective of this structure is to hold as many user-input-providing functions as possible.)

import python
import semmle.python.dataflow.new.RemoteFlowSources

from RemoteFlowSource rfs // create a 'rfs' variable of type RemoteFlowSource
select rfs // return all of its appearances

Run it!

You may see some structures used inside an any() function. This is because the from clause can be avoided like this:

import python
import semmle.python.dataflow.new.RemoteFlowSources

select any(RemoteFlowSource rfs) // select any RemoteFlowSource appearance

Run it!

Source in Regular Expression Injection query

Given the following snippet:

@app.route("/direct")
def direct():

    unsafe_pattern = request.args["pattern"]
    re.search(unsafe_pattern, "foo")

Since the vulnerability we are looking for happens when user input flows into the first argument of a regular expression operation (regular expression injection), the source here would be request.args["pattern"]. Even though there are other ways to model this vulnerability (as seen below), the source of the flow will stay the same because request.args["pattern"] is the very first appearance of user input (the exact thing whose flow we want to track).

@app.route("/compiled")
def compiled():

    unsafe_pattern = request.args["pattern"]
    compiled_pattern = re.compile(unsafe_pattern)
    compiled_pattern.search("foo")

@app.route("/inline")
def inline():

    unsafe_pattern = request.args["pattern"]
    re.compile(unsafe_pattern).search("foo")

(see #advanced-modeling)
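To see why the three routes share one source, here is a small Python sketch (not part of the original test files; the helper names and the sample value are made up for illustration). It only shows that the same user-controlled pattern ends up being executed by the regex engine, whichever of the three forms is used.

```python
import re

# Hypothetical stand-in for request.args["pattern"]; in the routes above
# this value would come from the HTTP request.
unsafe_pattern = "fo+"


def direct(pattern, text):
    # Same shape as the /direct route.
    return re.search(pattern, text)


def compiled(pattern, text):
    # Same shape as the /compiled route.
    compiled_pattern = re.compile(pattern)
    return compiled_pattern.search(text)


def inline(pattern, text):
    # Same shape as the /inline route.
    return re.compile(pattern).search(text)


results = [f(unsafe_pattern, "foo") for f in (direct, compiled, inline)]

# Every form runs the user-chosen pattern, so all three report the same match.
print([m.group(0) for m in results])
```

Whatever shape the sink takes, the value we want to track from is always the one read from the request.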

Source in LDAP Injection query

Given the following snippets (python2 and python3 examples):

@app.route("/normal2")
def normal2():

    unsafe_dc = request.args['dc']
    unsafe_filter = request.args['username']

    dn = "dc={}".format(unsafe_dc)
    search_filter = "(foo={})".format(unsafe_filter)

    ldap_connection = ldap.initialize("ldap://127.0.0.1")
    user = ldap_connection.search_s(
        dn, ldap.SCOPE_SUBTREE, search_filter)

@app.route("/normal3")
def normal3():

    unsafe_dc = request.args['dc']
    unsafe_filter = request.args['username']

    dn = "dc={}".format(unsafe_dc)
    search_filter = "(user={})".format(unsafe_filter)

    srv = ldap3.Server('ldap://127.0.0.1')
    conn = ldap3.Connection(srv, user=dn, auto_bind=True)
    conn.search(dn, search_filter)

In both cases (LDAP injection), the source is still user input such as request.args['dc'] and request.args['username'].

(see #advanced-modeling)
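As a quick illustration of why these two values matter, the following Python sketch reuses the string formatting from the routes above with made-up attacker values (the payloads are assumptions for the example; no LDAP library is involved). It shows how the user input ends up changing the structure of the DN and of the filter.

```python
# Hypothetical attacker-controlled values standing in for
# request.args['dc'] and request.args['username'].
unsafe_dc = "example,dc=evil"
unsafe_filter = "*)(uid=*"

# Same string building as in the routes above.
dn = "dc={}".format(unsafe_dc)
search_filter = "(user={})".format(unsafe_filter)

# The injected characters become part of the DN / filter syntax itself.
print(dn)             # dc=example,dc=evil
print(search_filter)  # (user=*)(uid=*)
```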

Source in XXE query

@app.route("/lxml.etree.fromstring")
def lxml_fromstring():

    xml_content = request.args['xml_content']
    return lxml.etree.fromstring(xml_content).text


@app.route("/lxml.etree.XML")
def lxml_XML():

    xml_content = request.args['xml_content']
    return lxml.etree.XML(xml_content).text


@app.route("/lxml.etree.parse")
def lxml_parse():

    xml_content = request.args['xml_content']
    return lxml.etree.parse(StringIO(xml_content)).text

Yet again (XXE), the very first appearance of user input is request.args['xml_content'], so that is our source.

(see #advanced-modeling)

Sink

As the opposite of a source, the “sink” is the last place our source has to reach for the code to be vulnerable.

Given this simple snippet:

@app.route("/demo")
def demo():

    cmd = request.args["pattern"]
    result = os.popen(cmd).read() # [1]
    return f"{cmd} has returned {result}" # [2]

Strictly speaking, the last place request.args["pattern"] (our source) flows to (i.e., where our source sinks) is [2] (through {cmd} and {result} respectively), but according to the previous definition, the actual sink in this query (the last place it has to reach to be vulnerable) is [1] (os.popen(cmd).read()).

Sink in Regular Expression Injection query

Given the following snippet:

@app.route("/direct")
def direct():

    unsafe_pattern = request.args["pattern"]
    re.search(unsafe_pattern, "foo")

The sink in this example would be re.search’s first argument (i.e., the first argument of re’s search method call) unsafe_pattern.

@app.route("/inline")
def inline():

    unsafe_pattern = request.args["pattern"]
    re.compile(unsafe_pattern).search("foo")

This example’s vulnerable call is a bit trickier, because one method call is chained onto another. The sink would be re.compile’s first argument (i.e., the first argument of re’s compile method call), where the result of that call is then the object of the search method call.

This snippet may help to understand the approach:

import python
import semmle.python.dataflow.new.DataFlow

from DataFlow::MethodCallNode mc, DataFlow::Node mco
where
    mc.getLocation().getFile().getBaseName().matches("Regex%") and // just restricting the file to be queried
    mco = mc.getObject() and
    mco instanceof DataFlow::MethodCallNode
select mco, mc

Run it!

@app.route("/compile")
def compile():

    unsafe_pattern = request.args["pattern"]
    compiled_pattern = re.compile(unsafe_pattern)
    compiled_pattern.search("")

In this example the approach is the same, but we need to introduce getALocalSource(), a predicate in charge of finding where the variable comes from (i.e., where its value was created).

import python
import semmle.python.dataflow.new.DataFlow

from DataFlow::MethodCallNode mc, DataFlow::Node mco
where
    mc.getLocation().getFile().getBaseName().matches("Regex%") and // just restricting the file to be queried
    mco = mc.getObject().getALocalSource() and
    mco instanceof DataFlow::MethodCallNode
select mco, mc

Run it!

However, we are facing some false positives (that actually make sense). This happens because we are not restricting the MethodCallNodes to the re library ones, we are just looking for specific structures.

(see #advanced-modeling)
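If the idea behind getALocalSource() still feels abstract, this toy Python sketch may help. It uses Python's own ast module to follow a variable back to the expression assigned to it, and then lists method calls whose object is itself a call. It is only a rough analogy under simplifying assumptions (one plain assignment per name, no scopes, Python 3.9+ for ast.unparse); it is not how CodeQL implements the predicate, and local_source is a made-up helper name.

```python
import ast

SOURCE = '''
import re
compiled_pattern = re.compile(unsafe_pattern)
compiled_pattern.search("foo")
re.compile(unsafe_pattern).search("foo")
'''


def local_source(expr, assignments):
    """Follow a plain variable name back to the expression assigned to it."""
    seen = set()
    while (
        isinstance(expr, ast.Name)
        and expr.id in assignments
        and expr.id not in seen  # guard against `x = x` style loops
    ):
        seen.add(expr.id)
        expr = assignments[expr.id]
    return expr


tree = ast.parse(SOURCE)

# Map each simple `name = value` assignment to its value expression.
assignments = {}
for node in ast.walk(tree):
    if (
        isinstance(node, ast.Assign)
        and len(node.targets) == 1
        and isinstance(node.targets[0], ast.Name)
    ):
        assignments[node.targets[0].id] = node.value

# Collect method calls whose object (after following assignments) is a call.
found = []
for node in ast.walk(tree):
    if isinstance(node, ast.Call) and isinstance(node.func, ast.Attribute):
        obj = local_source(node.func.value, assignments)
        if isinstance(obj, ast.Call):
            found.append((ast.unparse(obj), ast.unparse(node)))

for obj, call in sorted(found):
    print(obj, "<-", call)
```

Both the inline and the variable-based call are reported. Like the query above, the sketch never checks that the inner call belongs to re, so it would report unrelated chained calls too.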

Sink in LDAP Injection query

Given the following snippet:

@app.route("/normal2")
def normal2():

    unsafe_dc = request.args['dc']
    unsafe_filter = request.args['username']

    dn = "dc={}".format(unsafe_dc)
    search_filter = "(user={})".format(unsafe_filter)

    ldap_connection = ldap.initialize("ldap://127.0.0.1")
    user = ldap_connection.search_s(
        dn, ldap.SCOPE_SUBTREE, search_filter)

We can use MethodCallNode and getALocalSource() again to find the first/third argument of ldap_connection.search_s (ldap_connection being a variable holding the result of ldap.initialize); in other words, we have to find the first/third argument of a method call whose object’s local source is a call to ldap’s initialize method.

@app.route("/normal3")
def normal3():

    unsafe_dc = request.args['dc']
    unsafe_filter = request.args['username']

    dn = "dc={}".format(unsafe_dc)
    search_filter = "(user={})".format(unsafe_filter)

    srv = ldap3.Server('ldap://127.0.0.1')
    conn = ldap3.Connection(srv, user=dn, auto_bind=True)
    conn.search(dn, search_filter)

In this python3 example, the modeling has to look for the first/second argument of the conn.search call, conn being a variable holding an ldap3.Connection whose first argument’s local source is ldap3.Server.

(see #advanced-modeling)

Sink in XXE query

Given the following snippet:

@app.route("/lxml.etree.fromstring")
def lxml_fromstring():

    xml_content = request.args['xml_content']
    return lxml.etree.fromstring(xml_content).text

The sink would be the first argument of lxml.etree.fromstring call.

@app.route("/lxml.etree.parse")
def lxml_parse():

    xml_content = request.args['xml_content']
    return lxml.etree.parse(StringIO(xml_content)).text

In this example, the sink is again the first arg… wait… the first argument of lxml.etree.parse is actually StringIO(xml_content)! Does that mean that we should be looking for the first argument of StringIO call, being it the first argument of lxml.etree.parse? That would imply modeling all of these related functions just for the sink (like BytesIO).

That would be bad practice in terms of effectiveness, since that modeling idea would not cover the following example, and we would be losing the power of taint flow analysis (taint tracking).

@app.route("/lxml.etree.parse")
def lxml_parse():

    xml_content = request.args['xml_content']
    xml_content = StringIO(xml_content)

    return lxml.etree.parse(xml_content).text

Because of that, we should be using some taint tracking configuration predicates like isAdditionalTaintStep and Sanitizers.

Taint tracking configuration predicates

These predicates are like “extras” that let us specify some details for our taint tracking configuration.

Additional taint steps

Additional taint steps let us specify additional “jumps” that the flow may make in order to “bypass” known functions. If specified, whenever the flow would otherwise end (the specified source doesn’t flow any further), CodeQL can apply the specified steps and continue looking for flow from there.

For example:

@app.route("/lxml.etree.parse")
def lxml_parse():

    xml_content = request.args['xml_content']
    xml_content = StringIO(xml_content)

    return lxml.etree.parse(xml_content).text

CodeQL taint tracking will see that request.args['xml_content'] flows to StringIO(here), and would stop there, since the next step would be lxml.etree.parse(here), but this time “here” would be StringIO(request.args['xml_content']) instead of just request.args['xml_content']. In other words, the first argument of lxml.etree.parse is seen as the result of StringIO (even though the code is vulnerable). This happens because CodeQL may stop taint flow analysis if the tainted data flows into a function that changes its content. In this case, StringIO returns an in-memory file-like object whose content is the provided argument.
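A quick Python check of what StringIO (and BytesIO) actually do with their argument may make this clearer. The payload below is made up for the example. The wrapper is an in-memory file-like object, and reading it gives the original content back, which is why the data should still be considered tainted after the call.

```python
from io import BytesIO, StringIO

# Hypothetical payload standing in for request.args['xml_content'].
xml_content = "<root>hello</root>"

wrapped = StringIO(xml_content)

# Reading the wrapper gives back exactly what was passed in.
print(wrapped.read() == xml_content)

# BytesIO behaves the same way for bytes.
print(BytesIO(xml_content.encode()).read() == xml_content.encode())
```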

To make it work, we should specify an additional taint step: StringIO’s first argument being the nodeFrom and StringIO’s entire call being the nodeTo.

(isAdditionalTaintStep predicate override inside a taint tracking configuration)

override predicate isAdditionalTaintStep(DataFlow::Node nodeFrom, DataFlow::Node nodeTo) {
  exists(DataFlow::CallCfgNode ioCalls |
    ioCalls = API::moduleImport("io").getMember(["StringIO", "BytesIO"]).getACall() and
    nodeFrom = ioCalls.getArg(0) and
    nodeTo = ioCalls
  )
}

Example

import python
import semmle.python.dataflow.new.DataFlow
import semmle.python.ApiGraphs

from DataFlow::CallCfgNode ioCalls, DataFlow::Node nodeFrom, DataFlow::Node nodeTo
where 
    ioCalls.getLocation().getFile().getBaseName().matches("XXE%") and // just restricting the file to be queried
    ioCalls = API::moduleImport("io").getMember(["StringIO", "BytesIO"]).getACall() and
    nodeFrom = ioCalls.getArg(0) and
    nodeTo = ioCalls
select ioCalls, nodeFrom, nodeTo

Run it!
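To picture what the additional taint step changes, here is a deliberately tiny Python model in which flow is just reachability over a graph of labelled nodes. It is a sketch under the assumption made in the text (that the default steps stop at the StringIO call); the node labels, the two tables and the reachable helper are all made up, and none of this reflects how CodeQL is implemented internally.

```python
# Toy model only: nodes are labels, edges mean "value flows from A to B".
DEFAULT_STEPS = {
    "request.args['xml_content']": ["StringIO arg 0"],
    "StringIO(...) result": ["lxml.etree.parse arg 0"],
}

# The extra edge contributed by isAdditionalTaintStep: argument -> whole call.
ADDITIONAL_STEPS = {
    "StringIO arg 0": ["StringIO(...) result"],
}


def reachable(source, *step_tables):
    """Return every node reachable from `source` using the given edges."""
    seen, todo = {source}, [source]
    while todo:
        node = todo.pop()
        for table in step_tables:
            for nxt in table.get(node, []):
                if nxt not in seen:
                    seen.add(nxt)
                    todo.append(nxt)
    return seen


source = "request.args['xml_content']"
sink = "lxml.etree.parse arg 0"

print(sink in reachable(source, DEFAULT_STEPS))                    # False
print(sink in reachable(source, DEFAULT_STEPS, ADDITIONAL_STEPS))  # True
```

The single extra edge from the argument to the call result is enough to connect the source to the sink, which is what nodeFrom and nodeTo express in the predicate.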

Sanitizers

Sanitizers, as the opposite of additional taint steps, let us specify functions or behaviours we don’t want the CodeQL flow to follow. If specified, each time the flow makes a step, CodeQL checks that this specific step/behaviour isn’t declared as a sanitizer (if it is, the flow stops).

For example, given the following snippet:

@app.route("/direct")
def direct():

    unsafe_pattern = request.args['pattern']
    safe_pattern = re.escape(unsafe_pattern)
    re.search(safe_pattern, "")

In case CodeQL saw re.escape as a function that doesn’t decontaminate the source (the data stays tainted, so the flow wouldn’t stop), we should declare it as a sanitizer.

By specifying re.escape’s first argument (i.e., the first argument of re’s escape method call) as the node argument of isSanitizer, the flow will stop whenever it reaches that position.

override predicate isSanitizer(DataFlow::Node sanitizer) {
    sanitizer = API::moduleImport("re").getMember("escape").getACall().getArg(0)
}

Example
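To see why re.escape deserves to be treated as a sanitizer for this query, here is a short Python check (the input value is made up for the example). Before escaping, the input behaves as a pattern; after escaping, it can only match its own literal characters.

```python
import re

# Hypothetical attacker input standing in for request.args['pattern'].
unsafe_pattern = ".*"

safe_pattern = re.escape(unsafe_pattern)

# Unescaped, the input acts as a pattern and matches any text.
print(re.search(unsafe_pattern, "anything") is not None)

# Escaped, it only matches the literal characters ".*".
print(re.search(safe_pattern, "anything") is None)
print(re.search(safe_pattern, "literal .* here") is not None)
```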

Furthermore, we may use isSanitizerGuard to specify another situation that we want the flow to stop in. For example, StringConstCompare:

(According to its qldoc: A validation of unknown node by comparing with a constant string value.)

override predicate isSanitizerGuard(DataFlow::BarrierGuard guard) {
    guard instanceof StringConstCompare
}

Example
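The reasoning behind a guard such as a comparison with a constant string can be shown with a few lines of Python. This is only a sketch: sink is a harmless stand-in that records its input instead of evaluating it, and handler and the "status" constant are made up. Inside the guarded branch the value can only be the known constant, so there is nothing attacker-controlled left to track.

```python
executed = []


def sink(value):
    # Stand-in for a dangerous call such as eval(); it only records its input.
    executed.append(value)


def handler(user_value):
    # The comparison against a constant is the "guard": inside the branch,
    # user_value can only be the known string.
    if user_value == "status":
        sink(user_value)


handler("status")
handler("__import__('os').system('id')")

print(executed)  # ['status']
```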

Concepts ¿again?

A CodeQL “Concept” is a structure that gathers many different modelings under the same name; each modeling joins the Concept by extending its Range.

For example, RemoteFlowSource’s modeling (although it is not inside Concepts.qll) works in much the same way. This is how flask’s request is developed to extend RemoteFlowSource. Because of that, every time RemoteFlowSource is used with python/ql/lib/semmle/python/frameworks/Flask.qll imported, it also refers to the extended structure.

As an example of an actual Concept inside Concepts.qll, we may see LDAPQuery from the LDAP Injection query:

  • Concept declaration inside Concepts.qll.
  • Extending the concept with python2’s LDAP code modeling.
  • Extending the concept with python3’s LDAP code modeling.
  • Using the concept to match all the extended APIs.

Concepts' predicates are created for Concepts to be fully customizable. (Notice that RemoteFlowSource does not have any useful predicate while LDAPQuery has a getQuery one, for LDAPQuery to be used as a call to a search call modeling and its getQuery as the compilation of all inputs belonging to a LDAP query).

Query development

Basic approaches

  • Finding calls of library methods:
import semmle.python.ApiGraphs

select API::moduleImport("re").getAMember().getACall()

Run it!

This lets us get a general idea of how the library is being used across the codebase. You can specify the method name using getMember(name).

  • Getting arguments from method calls:
import semmle.python.ApiGraphs

from DataFlow::CallCfgNode call, DataFlow::Node arg
where
  call = API::moduleImport(_).getAMember().getACall() and
  arg in [call.getArgByName("auto_bind"), call.getArg(2)]
select arg

Run it!

We are using _ in API::moduleImport(_) to get all the module imports as if we used getMember(_) instead of getAMember(). The query finds any call for a library.method and gets the keyword argument auto_bind and the third positional argument.

  • Finding calls of any-level library methods:
import semmle.python.ApiGraphs

select API::moduleImport("re").getAMember*().getACall()

Run it!

  • Finding a string that flows to an argument:
import python
import semmle.python.ApiGraphs

from StrConst str
where
  DataFlow::exprNode(str)
      .(DataFlow::LocalSourceNode)
      .flowsTo(API::builtin("eval").getACall().getArg(0))
select str.getText()

Run it!

  • Finding a specific string that flows to an argument:
import python
import semmle.python.ApiGraphs

from StrConst str
where
  str.getText().matches("second%") and
  DataFlow::exprNode(str)
      .(DataFlow::LocalSourceNode)
      .flowsTo(API::builtin("eval").getACall().getArg(0))
select str.getText()

Run it!

See this documentation page for the rest of the matches()-like functions.

More examples.

  • Void predicates:
predicate doSomething() { any() }

where doSomething()
select "Hello predicate!"

Run it!

predicate doSomething() { none() }

where doSomething()
select "Hello predicate!"

Run it!

any()/none() can also be a condition.

import semmle.python.ApiGraphs

predicate doSomething() { 
  exists(API::moduleImport("re").getMember("match").getACall()) 
}

where doSomething()
select "Hello predicate!"

Run it!

import semmle.python.ApiGraphs

predicate doSomething() { 
  exists(API::moduleImport("re").getMember("motch").getACall()) 
}

where doSomething()
select "Hello predicate!"

Run it!

doSomething() will succeed and continue the execution if calls to re.m(a|o)tch exist.

  • Creating a custom class and querying it:
import python

class CustomClass extends StrConst {
  CustomClass() { this.getText().matches("this%") }

  predicate doSomething() { this.getText().matches("%demo%") }
}

from CustomClass a
where a.doSomething()
select a.getText()

Run it!

This would be the same as using StrConst directly in the from clause and setting the conditionals in the where clause.
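If you come from Python, the equivalence can be pictured with plain list filtering. The sketch below is only an analogy: ql_matches is a made-up helper that approximates matches() (% as any sequence of characters, _ as any single character, ignoring escape handling), and the sample strings are invented.

```python
import re


def ql_matches(text, pattern):
    """Approximate matches(): % is any sequence, _ is any one character."""
    regex = "".join(
        ".*" if c == "%" else "." if c == "_" else re.escape(c)
        for c in pattern
    )
    return re.fullmatch(regex, text, flags=re.DOTALL) is not None


strings = ["this is a demo", "this is not", "that demo", "demo of this"]

# "Class" style: the characteristic predicate narrows the set first,
# then the member predicate is applied to what is left.
custom_class = [s for s in strings if ql_matches(s, "this%")]
via_class = [s for s in custom_class if ql_matches(s, "%demo%")]

# "where" style: both conditions applied directly to every string.
via_where = [
    s for s in strings
    if ql_matches(s, "this%") and ql_matches(s, "%demo%")
]

print(via_class == via_where, via_class)
```

Both routes select the same strings; the class simply gives the first condition a reusable name.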

Codebase distribution

Just before digging into pure query development, let’s see how the code is distributed along the codebase.

Placed inside codeql/python/ql/src/experimental/Security/CWE-XXX should be the main query (LDAPInjection.ql), the .qhelp file (see #documentation) and simple examples of what the query covers. Example

Placed inside codeql/python/ql/test/experimental/query-tests/Security/CWE-XXX should be all the tests for the query, a .qlref pointing to the previous “main” query and a .expected (see #tests).

Placed inside codeql/python/ql/src/experimental/semmle/python should be the rest of the modeling:

  • Concepts
    • ...experimental/semmle/python/Concepts.qll
  • Frameworks and libraries' modeling:
    • ...experimental/semmle/python/frameworks
    • ...experimental/semmle/python/libraries
    • ...experimental/semmle/python/templates
  • TaintTracking configurations and query-specific modeling:
    • ...experimental/semmle/python/security

Modeling

Concepts

The first thing we should be doing while developing a query is thinking about a proper way of making its Concepts.

RegexExecution and RegexEscape modeling

LDAPQuery and LDAPEscape modeling

XMLParsing and XMLParser modeling

JWTEncoding and JWTDecoding modeling

As you may have noticed, the above concepts cover the main point of the query (without taking into account the vulnerability itself).

Frameworks/Libraries

In this stage, we should be modeling the libraries or frameworks related to the vulnerability extending the Concepts:

re (Regex Injection) modeling

LDAP (LDAP Injection) modeling

XML (XXE) modeling

JWT libraries modeling:

Taint tracking configuration and query-specific modeling

Finally, it’s time to model the taint tracking config (source, sink, sanitizers and/or additional taint steps):

class QUERYFlowConfig extends TaintTracking::Configuration {
  QUERYFlowConfig() { this = "QUERYFlowConfig" }

  override predicate isSource(DataFlow::Node source) { 
    source instanceof SOURCE // [1]
  }

  override predicate isSink(DataFlow::Node sink) { 
    sink instanceof SINK // [2]
  }

  override predicate isSanitizerGuard(DataFlow::BarrierGuard guard) { 
    guard instanceof StringConstCompare // [3]
  } 

  override predicate isAdditionalTaintStep(DataFlow::Node nodeFrom, DataFlow::Node nodeTo) { // [4]
      nodeFrom instanceof PREV_STEP and
      nodeTo instanceof NEXT_STEP
  }
}

This is the usual syntax of a taint tracking configuration. You may add or remove any predicate as you wish (except isSource and isSink).

[1] is where the source is declared. When it comes to any injection, it tends to be related to user input flowing into a specific function, for that we would be using RemoteFlowSource like source instanceof RemoteFlowSource (from semmle.python.dataflow.new.RemoteFlowSources) (see #remoteflowsource)

[2] is where the sink is declared. For this we may have a concept like LDAPQuery with a getQuery predicate. To use it we may write sink = any(LDAPQuery foo).getQuery(). You may use any() to be easily readable, but you could also use an exists() clause declaring a variable of type LDAPQuery and setting the sink to that variable’s .getQuery():

exists(LDAPQuery lq |
    sink = lq.getQuery()
)

[3] is where the sanitizer guard is declared (it could also be a Sanitizer with isSanitizer()). (see #sanitizers)

[4] is where the additional taint step is declared. (see #additional-taint-steps)

When it comes to query-specific modeling, it is everything related to the objective of the query rather than the library modeling involved.

For example, here the query-specific modeling is using LDAPBind (a modeling based on the ldap package) to get those who hold a None, empty or unset password.

Basic example of a taint tracking configuration

/**
 * A taint-tracking configuration for detecting code injections.
 */
class CodeInjectionFlowConfig extends TaintTracking::Configuration {
  CodeInjectionFlowConfig() { this = "CodeInjectionFlowConfig" }

  override predicate isSource(DataFlow::Node source) { 
    source instanceof RemoteFlowSource 
  }

  override predicate isSink(DataFlow::Node sink) {
    sink = API::builtin("eval").getACall().getArg(0)
  }
}

This taint tracking configuration will detect all RemoteFlowSources flowing to the first argument of any eval call.

Let’s give it a try against this following snippet:

from flask import Flask, request

app = Flask(__name__)

@app.route("/flow1")
def flow1():
    code = request.args["code"]
    eval(code)


@app.route("/flow2")
def flow2():
    email = request.args["email"]
    eval("./send_email {email}".format(email=email))


def flow3_extra(text):
    return text.split("\n")

@app.route("/flow3")
def flow3():
    text = request.args["text"]
    eval(flow3_extra(text))


@app.route("/flow4")
def flow4():
    text = request.args["text"]
    tixt = text
    toxt = flow3_extra(tixt)
    tuxt = toxt
    eval(tuxt)


@app.route("/flow1_good")
def flow1_good():
    code = request.args["code"]
    if code == "print('Hello, Wo... CodeQL!')":
        eval(code)

In this snippet we are testing:

  • A simple flow with flow1 in which the GET parameter code gets assigned to a variable, and then that variable is used as the first argument of an eval call.
  • A flow flow2 in which the GET parameter email gets assigned to a variable, and then that variable used as an argument of the formatting of the string being used as the first argument of an eval call.
  • A tricky flow involving a function, flow3, in which the GET parameter text gets assigned to a variable, and then that variable is used as the first argument to flow3_extra, which returns the text split by \n (LF); the result is used as the first argument of an eval call.
  • A longer flow, flow4, in which the GET parameter text gets assigned to a variable, which is then assigned to another one, then used as the first argument of flow3_extra, which splits the argument by \n and returns it; the result is assigned to another variable, which is then assigned to yet another one and finally used as the first argument of an eval call.
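As a side note on why the analysis keeps following the value through the formatting in flow2 and the split in flow3/flow4, this small Python sketch (with a made-up payload, and without calling eval at all) checks that the user-provided text is still present after each operation.

```python
# Hypothetical value standing in for the GET parameter.
payload = "__import__('os').getcwd()"


def flow3_extra(text):
    # Same helper as in the snippet above.
    return text.split("\n")


# flow2: the value is embedded in the formatted string.
formatted = "./send_email {email}".format(email=payload)

# flow3 / flow4: the value survives as an element of the split result.
parts = flow3_extra(payload)

print(payload in formatted)
print(payload in parts)
```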

Our query would be something like this:

/**
 * @kind path-problem
 */
import python
import semmle.python.dataflow.new.TaintTracking
import semmle.python.dataflow.new.RemoteFlowSources
import semmle.python.ApiGraphs
import DataFlow::PathGraph

/**
 * A taint-tracking configuration for detecting code injections.
 */
class CodeInjectionFlowConfig extends TaintTracking::Configuration {
  CodeInjectionFlowConfig() { this = "CodeInjectionFlowConfig" }

  override predicate isSource(DataFlow::Node source) { 
    source instanceof RemoteFlowSource 
  }

  override predicate isSink(DataFlow::Node sink) {
    sink = API::builtin("eval").getACall().getArg(0)
  }
}

from CodeInjectionFlowConfig config, DataFlow::PathNode source, DataFlow::PathNode sink
where
    config.hasFlowPath(source, sink)
select sink.getNode(), source, sink, "$@ eval argument comes from a $@",
    sink.getNode(), "This", source.getNode(), "user-provided value"

Run it!

Basically, we are asking CodeQL for every source and sink pair for which the configuration finds a flow path, where the source is a RemoteFlowSource and the sink is the first argument of an eval call. Since we are using DataFlow::PathNodes and @kind path-problem, the results are displayed in a way that makes the flow easy to follow (i.e., each step/jump is shown).

As you may have seen, every function except flow1_good is vulnerable, yet this query flags them all. As shown in #sanitizers, we can add a sanitizer guard like StringConstCompare to stop CodeQL from following the flow past the == comparison.

/**
 * @kind path-problem
 */
import python
import semmle.python.dataflow.new.TaintTracking
import semmle.python.dataflow.new.RemoteFlowSources
import semmle.python.ApiGraphs
import semmle.python.dataflow.new.BarrierGuards
import DataFlow::PathGraph

/**
 * A taint-tracking configuration for detecting code injections.
 */
class CodeInjectionFlowConfig extends TaintTracking::Configuration {
  CodeInjectionFlowConfig() { this = "CodeInjectionFlowConfig" }

  override predicate isSource(DataFlow::Node source) { 
    source instanceof RemoteFlowSource 
  }

  override predicate isSink(DataFlow::Node sink) {
    sink = API::builtin("eval").getACall().getArg(0)
  }

  override predicate isSanitizerGuard(DataFlow::BarrierGuard guard) {
    guard instanceof StringConstCompare
  }
}

from CodeInjectionFlowConfig config, DataFlow::PathNode source, DataFlow::PathNode sink
where
    config.hasFlowPath(source, sink)
select sink.getNode(), source, sink, "$@ eval argument comes from a $@",
    sink.getNode(), "This", source.getNode(), "user-provided value"

Run it!

Tests

Basic tests

These are placed inside codeql/python/ql/src/experimental/Security/CWE-XXX, perhaps inside a subfolder if there are several. These tests are in charge of showing a basic pattern the query matches. Example

Advanced tests

These are placed inside codeql/python/ql/test/experimental/query-tests/Security/CWE-XXX and should cover every different situation of vulnerable and non-vulnerable (with sanitizers) code patterns. Example

The .qlref file tells the test runner where the query using those tests is. The .expected file starts out empty and can then be filled with the results of the tests, so that the CodeQL engineers (or reviewers) can check that the query is producing its expected results.

Documentation (qhelp and qldocs)

The .qhelp file is an explanation of the query providing an overview of it, the recommendations to fix the vulnerability, an example of what the query looks for (basic tests) and references.

qldocs is the term to refer to the documentation inside the code. Since CodeQL can be a bit tricky while modeling large patterns, the documentation helps everyone to understand the approach taken:

  • XXE qldocs example.
  • Email injection advanced modeling’s qldocs example.

Submission

Once the query is finished and has found some results (at least one new or already discovered CVE), you may make a Pull Request to github/codeql (the latter prerequisite applies only to CodeQL’s bug bounty program, see its hackerone hacktivity) and open an issue in github/securitylab.

Advanced query modeling

In this section we will be covering each part of the following queries to see specific examples.

Regular expression injection

Concepts

The main “structures” we will need in the taint tracking configuration are:

  • Regular expression executions (calls to a regex operation) (link) with:
    • A predicate to get the input argument holding the regular expression.
    • A predicate to get the method being used.
  • Regular expression escaping functions with a predicate to get the input being escaped. (link)

Let’s populate them!

re library modeling

re.qll

To begin with, we should be compiling all re methods executing a provided regex:

class RegexExecutionMethods extends string {
  RegexExecutionMethods() {
    this in ["match", "fullmatch", "search", "split", "findall", "finditer", "sub", "subn"]
  }
}

Then it is easy to check whether a given method is among the ones we are looking for (in the actual modeling, the method name compared below is not hardcoded; it is statically obtained):

class RegexExecutionMethods extends string {
  RegexExecutionMethods() {
    this in ["match", "fullmatch", "search", "split", "findall", "finditer", "sub", "subn"]
  }
}

from string method
where 
    method = "search" and
    method instanceof RegexExecutionMethods
select method

Run it!
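For readers more at home in Python, the class above behaves much like a set of names with a membership test. The sketch below (the constant name is made up) also checks that each of the eight names exists both as a function of the re module and as a method of a compiled pattern, which is why the same list serves for re.search(...) and re.compile(...).search(...).

```python
import re

# Python counterpart of the RegexExecutionMethods class: a set of names.
REGEX_EXECUTION_METHODS = {
    "match", "fullmatch", "search", "split",
    "findall", "finditer", "sub", "subn",
}

# Membership plays the role of `method instanceof RegexExecutionMethods`.
print("search" in REGEX_EXECUTION_METHODS)
print("escape" in REGEX_EXECUTION_METHODS)

# Every name is available on the module and on compiled patterns.
compiled = re.compile("x")
print(all(
    callable(getattr(re, name)) and callable(getattr(compiled, name))
    for name in REGEX_EXECUTION_METHODS
))
```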

Let’s model the execution methods:

import re

from flask import Flask, request

app = Flask(__name__)


@app.route("/direct")
def direct():

    unsafe_pattern = request.args['pattern']
    safe_pattern = re.escape(unsafe_pattern)
    re.search(safe_pattern, "")


@app.route("/compile")
def compile():

    unsafe_pattern = request.args['pattern']
    safe_pattern = re.escape(unsafe_pattern)
    compiled_pattern = re.compile(safe_pattern)
    compiled_pattern.search("")
  • Finding re.(RegexExecutionMethods):
import semmle.python.ApiGraphs

class RegexExecutionMethods extends string {
  RegexExecutionMethods() {
    this in ["match", "fullmatch", "search", "split", "findall", "finditer", "sub", "subn"]
  }
}

from DataFlow::CallCfgNode c
where
    c.getLocation().getFile().getBaseName().matches("Regex%") and // just restricting the file to be queried
    c = API::moduleImport("re").getMember(any(RegexExecutionMethods r)).getACall()
select c

Run it!

  • Finding a MethodCallNode whose object is re.compile:
import semmle.python.ApiGraphs

from DataFlow::MethodCallNode mcn
where
    mcn.getLocation().getFile().getBaseName().matches("Regex%") and // just restricting the file to be queried
    mcn.getObject() = API::moduleImport("re").getMember("compile").getACall()
select mcn

Run it!

However, this misses the case in which the result of re.compile() is first assigned to a variable. To cover it we are going to use getALocalSource(). Since the local source node of a re.compile() call is the call itself, we keep the direct case and also match variable.search() where variable = re.compile().

import semmle.python.ApiGraphs

from DataFlow::MethodCallNode mcn
where
    mcn.getLocation().getFile().getBaseName().matches("Regex%") and // just restricting the file to be queried
    mcn.getObject().getALocalSource() = API::moduleImport("re").getMember("compile").getACall()
select mcn

Run it!
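To make the two code shapes concrete, here is a deliberately naive sketch using Python’s ast module. It is only an illustration of the idea and not how CodeQL implements getALocalSource(): it treats any name assigned directly from re.compile(...) as "compiled" and ignores scopes, reassignments and aliases. SOURCE, is_re_compile_call and find_compiled_method_calls are hypothetical names.

```python
import ast

# Line 1 of SOURCE is empty, so `import re` sits on line 2.
SOURCE = '''
import re
compiled_pattern = re.compile(pattern)
compiled_pattern.search("")
re.compile(pattern).search("")
other.search("")
'''


def is_re_compile_call(node):
    """True if node is syntactically a call of the form re.compile(...)."""
    return (
        isinstance(node, ast.Call)
        and isinstance(node.func, ast.Attribute)
        and node.func.attr == "compile"
        and isinstance(node.func.value, ast.Name)
        and node.func.value.id == "re"
    )


def find_compiled_method_calls(source):
    """Return (line, method) for method calls on a re.compile(...) result."""
    tree = ast.parse(source)

    # Crude stand-in for local sources: names assigned directly from re.compile(...).
    compiled_names = set()
    for node in ast.walk(tree):
        if isinstance(node, ast.Assign) and is_re_compile_call(node.value):
            for target in node.targets:
                if isinstance(target, ast.Name):
                    compiled_names.add(target.id)

    hits = []
    for node in ast.walk(tree):
        if isinstance(node, ast.Call) and isinstance(node.func, ast.Attribute):
            obj = node.func.value
            direct = is_re_compile_call(obj)  # re.compile(p).search(s)
            via_variable = isinstance(obj, ast.Name) and obj.id in compiled_names
            if direct or via_variable:
                hits.append((node.lineno, node.func.attr))
    return sorted(hits)
```

Both the direct call and the call through compiled_pattern are reported, while other.search("") is not, which is the behaviour the getALocalSource() version of the query is after.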

Moreover, you may have noticed that the query above uses a MethodCallNode as the main variable. Since we will have to make a concept whose getRegex holds the argument to re.compile, we could have used the re.compile CallCfgNode as the main variable (instead of a MethodCallNode) to make it more visual. However, it turns out RegexExecution will also hold calls to re.(RegexExecutionMethods), so this way the predicate will be easier to deal with. Otherwise, we would have to create a class variable to correlate the MethodCallNode with the class’ CallCfgNode.

  • Restricting MethodCallNode method to RegexExecutionMethods:
import semmle.python.ApiGraphs

class RegexExecutionMethods extends string {
  RegexExecutionMethods() {
    this in ["match", "fullmatch", "search", "split", "findall", "finditer", "sub", "subn"]
  }
}

from DataFlow::MethodCallNode mcn
where
    mcn.getLocation().getFile().getBaseName().matches("Regex%") and // just restricting the file to be queried
    mcn.getObject().getALocalSource() = API::moduleImport("re").getMember("compile").getACall() and
    mcn.getMethodName() instanceof RegexExecutionMethods
select mcn

Run it!

  • Getting the arguments:
import semmle.python.ApiGraphs

class RegexExecutionMethods extends string {
  RegexExecutionMethods() {
    this in ["match", "fullmatch", "search", "split", "findall", "finditer", "sub", "subn"]
  }
}

from DataFlow::CallCfgNode c
where
    c.getLocation().getFile().getBaseName().matches("Regex%") and // just restricting the file to be queried
    c = API::moduleImport("re").getMember(any(RegexExecutionMethods r)).getACall()
select c, c.getArg(0)

Run it!

In this example, getting the re method in use directly from c would be a bit tricky: we would have to read the attribute of the call node’s function with something like .getFunction().asExpr().(Attribute), which only works under certain conditions. To avoid this complexity (it is handled here in a query-specific modeling), we can declare a string variable that is an instance of RegexExecutionMethods, pass it to getMember, and then reuse it in the select clause:

import semmle.python.ApiGraphs

class RegexExecutionMethods extends string {
  RegexExecutionMethods() {
    this in ["match", "fullmatch", "search", "split", "findall", "finditer", "sub", "subn"]
  }
}

from DataFlow::CallCfgNode c, string method
where
    c.getLocation().getFile().getBaseName().matches("Regex%") and // just restricting the file to be queried
    method instanceof RegexExecutionMethods and
    c = API::moduleImport("re").getMember(method).getACall()
select c, c.getArg(0), method

Run it!

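To accomplish this later in the class modeling, we will create a class variable so that the method name can be shared between predicates. For compiled patterns, the MethodCallNode already exposes the method name through getMethodName():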

import semmle.python.ApiGraphs

class RegexExecutionMethods extends string {
  RegexExecutionMethods() {
    this in ["match", "fullmatch", "search", "split", "findall", "finditer", "sub", "subn"]
  }
}

from DataFlow::MethodCallNode mcn
where
    mcn.getLocation().getFile().getBaseName().matches("Regex%") and // just restricting the file to be queried
    mcn.getObject().getALocalSource() = API::moduleImport("re").getMember("compile").getACall() and
    mcn.getMethodName() instanceof RegexExecutionMethods
select mcn, mcn.getArg(0), mcn.getMethodName()

Run it!

  • Making DirectRegex and CompiledRegex classes:
import semmle.python.ApiGraphs

class RegexExecutionMethods extends string {
  RegexExecutionMethods() {
    this in ["match", "fullmatch", "search", "split", "findall", "finditer", "sub", "subn"]
  }
}

class DirectRegex extends DataFlow::CallCfgNode {
  string reMethod;

  DirectRegex() {
    this = API::moduleImport("re").getMember(reMethod).getACall() and
    reMethod instanceof RegexExecutionMethods
  }

  DataFlow::Node getRegex() { result = this.getArg(0) }

  DataFlow::Node getReMethod() { result = this }

  string getReMethodName() { result = "re." + reMethod }
}

from DirectRegex re
select re, re.getRegex(), re.getReMethodName()

Run it!

import semmle.python.ApiGraphs

class RegexExecutionMethods extends string {
  RegexExecutionMethods() {
    this in ["match", "fullmatch", "search", "split", "findall", "finditer", "sub", "subn"]
  }
}

class CompiledRegex extends DataFlow::MethodCallNode {
  DataFlow::CallCfgNode compileCall;

  CompiledRegex() {
    compileCall = API::moduleImport("re").getMember("compile").getACall() and
    this.getObject().getALocalSource() = compileCall and
    this.getMethodName() instanceof RegexExecutionMethods
  }

  DataFlow::Node getRegex() { result = compileCall.getArg(0) }

  DataFlow::Node getReMethod() { result = compileCall }

  string getReMethodName() { result = "re.compile" }
}

from CompiledRegex re
select re, re.getRegex(), re.getReMethodName()

Run it!

  • Merging and extending RegexExecution::Range:
import semmle.python.ApiGraphs

module RegexExecution {
  abstract class Range extends DataFlow::Node {
    abstract DataFlow::Node getRegex();

    abstract DataFlow::Node getReMethod();

    abstract string getReMethodName();
  }
}

class RegexExecution extends DataFlow::Node {
  RegexExecution::Range range;

  RegexExecution() { this = range }

  DataFlow::Node getRegex() { result = range.getRegex() }

  DataFlow::Node getReMethod() { result = range.getReMethod() }

  string getReMethodName() { result = range.getReMethodName() }
}

class RegexExecutionMethods extends string {
  RegexExecutionMethods() {
    this in ["match", "fullmatch", "search", "split", "findall", "finditer", "sub", "subn"]
  }
}

class DirectRegex extends DataFlow::CallCfgNode, RegexExecution::Range {
  string reMethod;

  DirectRegex() {
    this = API::moduleImport("re").getMember(reMethod).getACall() and
    reMethod instanceof RegexExecutionMethods
  }

  override DataFlow::Node getRegex() { result = this.getArg(0) }

  override DataFlow::Node getReMethod() { result = this }

  override string getReMethodName() { result = "re." + reMethod }
}

class CompiledRegex extends DataFlow::MethodCallNode, RegexExecution::Range {
  DataFlow::CallCfgNode compileCall;

  CompiledRegex() {
    compileCall = API::moduleImport("re").getMember("compile").getACall() and
    this.getObject().getALocalSource() = compileCall and
    this.getMethodName() instanceof RegexExecutionMethods
  }

  override DataFlow::Node getRegex() { result = compileCall.getArg(0) }

  override DataFlow::Node getReMethod() { result = compileCall }

  override string getReMethodName() { result = "re.compile" }
}


from RegexExecution re
select re, re.getRegex(), re.getReMethodName()

Run it!
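If the Range pattern feels abstract, a loose Python analogy with abstract base classes may help. This is only an analogy: in QL, class membership is decided by the characteristic predicate rather than by constructing objects, and every name below (RegexExecutionRange, describe, the constructor parameters) is made up for the sketch.

```python
from abc import ABC, abstractmethod


class RegexExecutionRange(ABC):
    """Plays the role of RegexExecution::Range: the extension point."""

    @abstractmethod
    def get_regex(self):
        """Return the pattern being executed."""

    @abstractmethod
    def get_re_method_name(self):
        """Return a printable name for the executing method."""


class DirectRegex(RegexExecutionRange):
    """Stands for calls such as re.search(pattern, text)."""

    def __init__(self, method, pattern):
        self.method = method
        self.pattern = pattern

    def get_regex(self):
        return self.pattern

    def get_re_method_name(self):
        return "re." + self.method


class CompiledRegex(RegexExecutionRange):
    """Stands for calls such as re.compile(pattern).search(text)."""

    def __init__(self, pattern):
        self.pattern = pattern

    def get_regex(self):
        return self.pattern

    def get_re_method_name(self):
        return "re.compile"


def describe(executions):
    """Consumer code only relies on the shared interface, like `from RegexExecution re`."""
    return [(e.get_regex(), e.get_re_method_name()) for e in executions]
```

The point of the design is the same in both worlds: describe (or a query selecting RegexExecution) never needs to change when a new subclass of the Range is added.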

Let’s model the sanitizer!

  • Finding the call:
import semmle.python.ApiGraphs

from DataFlow::CallCfgNode reEscapeCall
where reEscapeCall = API::moduleImport("re").getMember("escape").getACall()
select reEscapeCall

Run it!

  • Getting the argument:
import semmle.python.ApiGraphs

from DataFlow::CallCfgNode reEscapeCall, DataFlow::Node reEscaped
where
  reEscapeCall = API::moduleImport("re").getMember("escape").getACall() and
  reEscaped = reEscapeCall.getArg(0)
select reEscapeCall, reEscaped

Run it!

Since there’s only one escape call related to regular expressions in re that we know of, we will skip the escape concept. Defining one is good practice though: it makes it very easy to extend the concept with a new escape call, and all the queries depending on these regex escape calls would automatically use it.

Taint tracking configuration

class RegexInjectionFlowConfig extends TaintTracking::Configuration {
  RegexInjectionFlowConfig() { this = "RegexInjectionFlowConfig" }

  override predicate isSource(DataFlow::Node source) { source instanceof RemoteFlowSource }

  override predicate isSink(DataFlow::Node sink) { sink = any(RegexExecution re).getRegex() }

  override predicate isSanitizer(DataFlow::Node sanitizer) {
    sanitizer = API::moduleImport("re").getMember("escape").getACall().getArg(0)
  }
}

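With this configuration, CodeQL will try to find every user input that flows into the regex argument of any regex-executing call (the first argument of re.(RegexExecutionMethods), or of the re.compile call behind a compiled pattern) without passing through the first argument of a re.escape call.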
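To see why re.escape is treated as the sanitizer, here is a small plain-Python sketch of the behaviour the query distinguishes. The function names search_unsafe and search_escaped are hypothetical; only re.search and re.escape come from the standard library.

```python
import re


def search_unsafe(user_pattern, text):
    """The user input is interpreted as a regular expression."""
    return re.search(user_pattern, text)


def search_escaped(user_pattern, text):
    """The user input is escaped first, so it only matches itself literally."""
    return re.search(re.escape(user_pattern), text)
```

With the pattern "a.c", search_unsafe matches "abc" because the dot is a metacharacter, while search_escaped only matches text that contains the literal string "a.c".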

Complete query

/**
 * @name Regular expression injection
 * @description User input should not be used in regular expressions without first being escaped,
 *              otherwise a malicious user may be able to inject an expression that could require
 *              exponential time on certain inputs.
 * @kind path-problem
 * @problem.severity error
 * @id py/regex-injection
 * @tags security
 *       external/cwe/cwe-730
 *       external/cwe/cwe-400
 */

import python
import semmle.python.dataflow.new.TaintTracking
import semmle.python.dataflow.new.RemoteFlowSources
import semmle.python.ApiGraphs
import DataFlow::PathGraph

module RegexExecution {
  abstract class Range extends DataFlow::Node {
    abstract DataFlow::Node getRegex();

    abstract DataFlow::Node getReMethod();

    abstract string getReMethodName();
  }
}

class RegexExecution extends DataFlow::Node {
  RegexExecution::Range range;

  RegexExecution() { this = range }

  DataFlow::Node getRegex() { result = range.getRegex() }

  DataFlow::Node getReMethod() { result = range.getReMethod() }

  string getReMethodName() { result = range.getReMethodName() }
}

class RegexExecutionMethods extends string {
  RegexExecutionMethods() {
    this in ["match", "fullmatch", "search", "split", "findall", "finditer", "sub", "subn"]
  }
}

class DirectRegex extends DataFlow::CallCfgNode, RegexExecution::Range {
  string reMethod;

  DirectRegex() {
    this = API::moduleImport("re").getMember(reMethod).getACall() and
    reMethod instanceof RegexExecutionMethods
  }

  override DataFlow::Node getRegex() { result = this.getArg(0) }

  override DataFlow::Node getReMethod() { result = this }

  override string getReMethodName() { result = "re." + reMethod }
}

class CompiledRegex extends DataFlow::MethodCallNode, RegexExecution::Range {
  DataFlow::CallCfgNode compileCall;

  CompiledRegex() {
    compileCall = API::moduleImport("re").getMember("compile").getACall() and
    this.getObject().getALocalSource() = compileCall and
    this.getMethodName() instanceof RegexExecutionMethods
  }

  override DataFlow::Node getRegex() { result = compileCall.getArg(0) }

  override DataFlow::Node getReMethod() { result = compileCall }

  override string getReMethodName() { result = "re.compile" }
}

class RegexInjectionFlowConfig extends TaintTracking::Configuration {
  RegexInjectionFlowConfig() { this = "RegexInjectionFlowConfig" }

  override predicate isSource(DataFlow::Node source) { source instanceof RemoteFlowSource }

  override predicate isSink(DataFlow::Node sink) { sink = any(RegexExecution re).getRegex() }

  override predicate isSanitizer(DataFlow::Node sanitizer) {
    sanitizer = API::moduleImport("re").getMember("escape").getACall().getArg(0)
  }
}

from RegexInjectionFlowConfig config, DataFlow::PathNode source, DataFlow::PathNode sink
where config.hasFlowPath(source, sink)
select sink.getNode(), source, sink, "$@ regular expression is constructed from a $@.",
  sink.getNode(), "This", source.getNode(), "user-provided value"

Run it!

In order to display getReMethod and getReMethodName in the select clause, we have to create a query-specific class that acts as the sink. Casting the sink node to that class then gives access to its predicates.

/**
 * @name Regular expression injection
 * @description User input should not be used in regular expressions without first being escaped,
 *              otherwise a malicious user may be able to inject an expression that could require
 *              exponential time on certain inputs.
 * @kind path-problem
 * @problem.severity error
 * @id py/regex-injection
 * @tags security
 *       external/cwe/cwe-730
 *       external/cwe/cwe-400
 */

import python
import semmle.python.dataflow.new.TaintTracking
import semmle.python.dataflow.new.RemoteFlowSources
import semmle.python.ApiGraphs
import DataFlow::PathGraph

module RegexExecution {
  abstract class Range extends DataFlow::Node {
    abstract DataFlow::Node getRegex();

    abstract DataFlow::Node getReMethod();

    abstract string getReMethodName();
  }
}

class RegexExecution extends DataFlow::Node {
  RegexExecution::Range range;

  RegexExecution() { this = range }

  DataFlow::Node getRegex() { result = range.getRegex() }

  DataFlow::Node getReMethod() { result = range.getReMethod() }

  string getReMethodName() { result = range.getReMethodName() }
}

class RegexExecutionMethods extends string {
  RegexExecutionMethods() {
    this in ["match", "fullmatch", "search", "split", "findall", "finditer", "sub", "subn"]
  }
}

class DirectRegex extends DataFlow::CallCfgNode, RegexExecution::Range {
  string reMethod;

  DirectRegex() {
    this = API::moduleImport("re").getMember(reMethod).getACall() and
    reMethod instanceof RegexExecutionMethods
  }

  override DataFlow::Node getRegex() { result = this.getArg(0) }

  override DataFlow::Node getReMethod() { result = this }

  override string getReMethodName() { result = "re." + reMethod }
}

class CompiledRegex extends DataFlow::MethodCallNode, RegexExecution::Range {
  DataFlow::CallCfgNode compileCall;

  CompiledRegex() {
    compileCall = API::moduleImport("re").getMember("compile").getACall() and
    this.getObject().getALocalSource() = compileCall and
    this.getMethodName() instanceof RegexExecutionMethods
  }

  override DataFlow::Node getRegex() { result = compileCall.getArg(0) }

  override DataFlow::Node getReMethod() { result = compileCall }

  override string getReMethodName() { result = "re.compile" }
}

class RegexInjectionSink extends DataFlow::Node {
  string reMethodName;
  DataFlow::Node reMethod;

  RegexInjectionSink() {
    exists(RegexExecution reExec |
      this = reExec.getRegex() and
      reMethod = reExec.getReMethod() and
      reMethodName = reExec.getReMethodName()
    )
  }

  DataFlow::Node getReMethod() { result = reMethod }

  string getReMethodName() { result = reMethodName }
}

class RegexInjectionFlowConfig extends TaintTracking::Configuration {
  RegexInjectionFlowConfig() { this = "RegexInjectionFlowConfig" }

  override predicate isSource(DataFlow::Node source) { source instanceof RemoteFlowSource }

  override predicate isSink(DataFlow::Node sink) { sink instanceof RegexInjectionSink }

  override predicate isSanitizer(DataFlow::Node sanitizer) {
    sanitizer = API::moduleImport("re").getMember("escape").getACall().getArg(0)
  }
}

from RegexInjectionFlowConfig config, DataFlow::PathNode source, DataFlow::PathNode sink
where config.hasFlowPath(source, sink)
select sink.getNode(), source, sink,
  "$@ regular expression is constructed from a $@ and executed by $@.", sink.getNode(), "This",
  source.getNode(), "user-provided value", sink.getNode().(RegexInjectionSink).getReMethod(),
  sink.getNode().(RegexInjectionSink).getReMethodName()

Run it!

LDAP Injection

Concepts

The main “structures” we will need in the taint tracking configuration are:

  • LDAP queries with a predicate to get the argument holding the query to be executed. (Link)
  • LDAP escape functions with a predicate to get the argument holding the input being escaped. (Link)

Let’s populate them!

LDAP library modeling

LDAP.qll

LDAP 2

The first step is to create a class holding all the methods in charge of executing an LDAP query. Let’s begin with python2’s ldap:

private class LDAP2QueryMethods extends string {
    LDAP2QueryMethods() {
        this in ["search", "search_s", "search_st", "search_ext", "search_ext_s"]
    }
}

Then it is easy to check whether a given method is among the ones we are looking for (in the actual modeling, the method is not hardcoded):

class LDAP2QueryMethods extends string {
    LDAP2QueryMethods() {
        this in ["search", "search_s", "search_st", "search_ext", "search_ext_s"]
    }
}

from string method
where 
    method = "search_s" and
    method instanceof LDAP2QueryMethods
select method

Run it!

Let’s model the query methods:

import ldap

from flask import Flask, request

app = Flask(__name__)


@app.route("/normal2")
def normal2():

    unsafe_dc = request.args['dc']
    unsafe_filter = request.args['username']

    dn = "dc={}".format(unsafe_dc)
    search_filter = "(foo={})".format(unsafe_filter)

    ldap_connection = ldap.initialize("ldap://127.0.0.1")
    user = ldap_connection.search_s(
        dn, ldap.SCOPE_SUBTREE, search_filter)
  • Finding ldap.initialize():
import semmle.python.ApiGraphs

from DataFlow::CallCfgNode c
where
    c.getLocation().getFile().getBaseName().matches("LDAP_%") and // just restricting the file to be queried
    c = API::moduleImport("ldap").getMember("initialize").getACall()
select c

Run it!

API::moduleImport("ldap") will return a reference to the ldap library, getMember("initialize") will return a reference to ldap’s initialize function, and getACall() will get a call like ldap.initialize("ldap://127.0.0.1").

  • Finding a MethodCallNode whose object local source is the call found previously:
import semmle.python.ApiGraphs

from DataFlow::MethodCallNode searchMethod
where
    searchMethod.getLocation().getFile().getBaseName().matches("LDAP_%") and // just restricting the file to be queried
    searchMethod.getObject().getALocalSource() =
      API::moduleImport("ldap").getMember("initialize").getACall()
select searchMethod

Run it!

You may have noticed that the main modeling uses DataFlow::AttrRead, a structure meant for an object’s attribute read. I can’t remember why I used it, perhaps because MethodCallNode wasn’t introduced yet, but anyway, we will be using MethodCallNode.

  • Restricting searchMethod to LDAP2QueryMethods:
import semmle.python.ApiGraphs

class LDAP2QueryMethods extends string {
  LDAP2QueryMethods() { this in ["search", "search_s", "search_st", "search_ext", "search_ext_s"] }
}

from DataFlow::MethodCallNode searchMethod
where
    searchMethod.getLocation().getFile().getBaseName().matches("LDAP_%") and // just restricting the file to be queried
    searchMethod.getMethodName() instanceof LDAP2QueryMethods and
    searchMethod.getObject().getALocalSource() =
      API::moduleImport("ldap").getMember("initialize").getACall()
select searchMethod

Run it!

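The cheap method-name restriction is written before the check that the object’s local source is a call to ldap.initialize, so that the more expensive check only matters for calls to an LDAP2QueryMethods method. Keep in mind that QL is declarative and the evaluator is free to choose its own evaluation order, so the ordering here mostly documents the intent.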

  • Getting the arguments:
import semmle.python.ApiGraphs

class LDAP2QueryMethods extends string {
  LDAP2QueryMethods() { this in ["search", "search_s", "search_st", "search_ext", "search_ext_s"] }
}

from DataFlow::MethodCallNode searchMethod, DataFlow::Node arg
where
    searchMethod.getLocation().getFile().getBaseName().matches("LDAP_%") and // just restricting the file to be queried
    searchMethod.getMethodName() instanceof LDAP2QueryMethods and
    searchMethod.getObject().getALocalSource() =
      API::moduleImport("ldap").getMember("initialize").getACall() and
    arg in [searchMethod.getArg(0), searchMethod.getArg(2), searchMethod.getArgByName("filterstr")]
select searchMethod, arg

Run it!

arg in [...] is a set-membership check: the condition holds for every value of arg that equals one of the expressions inside the list, so the query yields one result per matching argument.
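The three list entries exist because the filter can be passed either positionally or by keyword. The sketch below mimics that selection in plain Python for a call shaped like search_s(base, scope, filterstr) as in the snippet above. query_arguments is a hypothetical helper, and the literal 2 merely stands in for a scope constant.

```python
def query_arguments(args, kwargs):
    """Collect what the QL list selects: positional 0, positional 2, keyword filterstr."""
    candidates = []
    if len(args) > 0:
        candidates.append(args[0])  # the base DN, like getArg(0)
    if len(args) > 2:
        candidates.append(args[2])  # the filter passed positionally, like getArg(2)
    if "filterstr" in kwargs:
        candidates.append(kwargs["filterstr"])  # like getArgByName("filterstr")
    return candidates
```

Whichever way the filter is passed, both the DN and the filter end up among the candidates, which is why the query reports one row for each of them.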

  • Making a LDAP2Query class:
import semmle.python.ApiGraphs

class LDAP2QueryMethods extends string {
  LDAP2QueryMethods() { this in ["search", "search_s", "search_st", "search_ext", "search_ext_s"] }
}

class LDAP2Query extends DataFlow::CallCfgNode {
  DataFlow::MethodCallNode searchMethod;

  LDAP2Query() {
    searchMethod.getMethodName() instanceof LDAP2QueryMethods and
    searchMethod.getObject().getALocalSource() =
      API::moduleImport("ldap").getMember("initialize").getACall() and
    this = searchMethod.(DataFlow::CallCfgNode)
  }

  DataFlow::Node getQuery() {
    result in [
        searchMethod.getArg(0), searchMethod.getArg(2), searchMethod.getArgByName("filterstr")
      ]
  }
}

from LDAP2Query lq
select lq, lq.getQuery()

Run it!

Notice that, in order to use searchMethod in a class predicate (getQuery), we need to declare a class variable DataFlow::MethodCallNode searchMethod;. Otherwise, the only thing correlating getQuery with its own LDAP2Query would be this, and we would need more type casts than we do this way.

  • Extending LDAPQuery::Range:
import semmle.python.ApiGraphs

module LDAPQuery {
  abstract class Range extends DataFlow::Node {
    abstract DataFlow::Node getQuery();
  }
}

class LDAPQuery extends DataFlow::Node {
  LDAPQuery::Range range;

  LDAPQuery() { this = range }

  DataFlow::Node getQuery() { result = range.getQuery() }
}

class LDAP2QueryMethods extends string {
  LDAP2QueryMethods() { this in ["search", "search_s", "search_st", "search_ext", "search_ext_s"] }
}

class LDAP2Query extends DataFlow::CallCfgNode, LDAPQuery::Range {
  DataFlow::MethodCallNode searchMethod;

  LDAP2Query() {
    searchMethod.getMethodName() instanceof LDAP2QueryMethods and
    searchMethod.getObject().getALocalSource() =
      API::moduleImport("ldap").getMember("initialize").getACall() and
    this = searchMethod.(DataFlow::CallCfgNode) // [1]
  }

  override DataFlow::Node getQuery() {
    result in [
        searchMethod.getArg(0), searchMethod.getArg(2), searchMethod.getArgByName("filterstr")
      ]
  }
}

from LDAPQuery lq // [2]
select lq, lq.getQuery() // [3]

Run it!

Notice the use of this inside LDAP2Query [1]: selecting LDAPQuery returns every query call, so this ties LDAP2Query to the call to an LDAP2QueryMethods method. Notice also the use of LDAPQuery in the from clause [2], along with LDAPQuery’s Concept predicate getQuery() in the select clause [3].

Now that python2’s LDAPQuery is modeled, we can move on to modeling its escape methods:

  • ldap.dn.escape_dn_chars (source)

  • ldap.filter.escape_filter_chars (documentation)

  • Finding the calls:

import semmle.python.ApiGraphs

from DataFlow::CallCfgNode ldap2EscapeCall
where
  ldap2EscapeCall in [
      API::moduleImport("ldap").getMember("dn").getMember("escape_dn_chars").getACall(),
      API::moduleImport("ldap").getMember("filter").getMember("escape_filter_chars").getACall()
    ]
select ldap2EscapeCall, ldap2EscapeCall.getArg(0)

Run it!
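The filter escape call matters because of how the query string is built. As a minimal sketch (plain Python, no LDAP server or library involved), the hypothetical helper escape_filter_sketch below applies the escaping that RFC 4515 describes for filter values. It illustrates the idea and is not python-ldap’s implementation of escape_filter_chars; build_filter is also a made-up name.

```python
def escape_filter_sketch(value):
    """Hypothetical helper: escape the RFC 4515 special characters in a filter value."""
    replacements = {
        "\\": r"\5c",
        "*": r"\2a",
        "(": r"\28",
        ")": r"\29",
        "\x00": r"\00",
    }
    # Each character is replaced independently, so the inserted backslashes
    # are never escaped a second time.
    return "".join(replacements.get(ch, ch) for ch in value)


def build_filter(username, escape=False):
    """Build the same "(foo={})" filter as the vulnerable snippet, optionally escaped."""
    if escape:
        username = escape_filter_sketch(username)
    return "(foo={})".format(username)
```

Without escaping, an input such as *)(uid=* changes the structure of the filter. With escaping, the same input stays a single literal value inside (foo=...).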

  • Wrapping them in their own classes:
import semmle.python.ApiGraphs

class LDAP2EscapeDNCall extends DataFlow::CallCfgNode {
  LDAP2EscapeDNCall() {
    this = API::moduleImport("ldap").getMember("dn").getMember("escape_dn_chars").getACall()
  }

  DataFlow::Node getAnInput() { result = this.getArg(0) }
}

class LDAP2EscapeFilterCall extends DataFlow::CallCfgNode {
  LDAP2EscapeFilterCall() {
    this = API::moduleImport("ldap").getMember("filter").getMember("escape_filter_chars").getACall()
  }

  DataFlow::Node getAnInput() { result = this.getArg(0) }
}

from DataFlow::CallCfgNode ldap2EscapeCall
where
    ldap2EscapeCall instanceof LDAP2EscapeDNCall or
    ldap2EscapeCall instanceof LDAP2EscapeFilterCall
select ldap2EscapeCall

Run it!

As you may have noticed, we can’t use ldap2EscapeCall.getAnInput() in the select statement. This happens because we are dealing with a variable whose type is CallCfgNode, and this specific type does not have any getAnInput predicate. We could do something like ldap2EscapeCall.(LDAP2EscapeDNCall).getAnInput(), which would work for the calls where the cast succeeds, but using Concepts is easier.

  • Extending LDAPEscape::Range:
import semmle.python.ApiGraphs

module LDAPEscape {
  abstract class Range extends DataFlow::Node {
    abstract DataFlow::Node getAnInput();
  }
}

class LDAPEscape extends DataFlow::Node {
  LDAPEscape::Range range;

  LDAPEscape() { this = range }

  DataFlow::Node getAnInput() { result = range.getAnInput() }
}

class LDAP2EscapeDNCall extends DataFlow::CallCfgNode, LDAPEscape::Range {
  LDAP2EscapeDNCall() {
    this = API::moduleImport("ldap").getMember("dn").getMember("escape_dn_chars").getACall()
  }

  override DataFlow::Node getAnInput() { result = this.getArg(0) }
}

class LDAP2EscapeFilterCall extends DataFlow::CallCfgNode, LDAPEscape::Range {
  LDAP2EscapeFilterCall() {
    this = API::moduleImport("ldap").getMember("filter").getMember("escape_filter_chars").getACall()
  }

  override DataFlow::Node getAnInput() { result = this.getArg(0) }
}

from LDAPEscape ldapEscapeCall
select ldapEscapeCall, ldapEscapeCall.getAnInput()

Run it!

Since we are done with python2’s ldap, we are ready to get python3’s ldap3 modeling done!

LDAP 3

Given the following vulnerable snippet:

import ldap3

from flask import Flask, request

app = Flask(__name__)


@app.route("/normal")
def normal():

    unsafe_dc = request.args['dc']
    unsafe_filter = request.args['username']

    dn = "dc={}".format(unsafe_dc)
    search_filter = "(user={})".format(unsafe_filter)

    srv = ldap3.Server('ldap://127.0.0.1')
    conn = ldap3.Connection(srv, user=dn, auto_bind=True)
    conn.search(dn, search_filter)
  • Finding ldap3.Connection():
import semmle.python.ApiGraphs

from DataFlow::CallCfgNode c
where
    c.getLocation().getFile().getBaseName().matches("LDAP3%") and // just restricting the file to be queried
    c = API::moduleImport("ldap3").getMember("Connection").getACall()
select c

Run it!

API::moduleImport("ldap3") will return a reference to the ldap3 library, getMember("Connection") will return a reference to ldap3’s Connection class, and getACall() will get a call like ldap3.Connection(srv, user=dn, auto_bind=True).

  • Finding a MethodCallNode whose object local source is the call found previously:
import semmle.python.ApiGraphs

from DataFlow::MethodCallNode searchMethod
where
    searchMethod.getLocation().getFile().getBaseName().matches("LDAP3%") and // just restricting the file to be queried
    searchMethod.getObject().getALocalSource() =
      API::moduleImport("ldap3").getMember("Connection").getACall()
select searchMethod

Run it!

As in python2’s ldap modeling, you may have noticed that the main modeling uses DataFlow::AttrRead, a structure meant for an object’s attribute read. I can’t remember why I used it, perhaps because MethodCallNode wasn’t introduced yet, but anyway, we will be using MethodCallNode.

  • Restricting searchMethod method to search:
import semmle.python.ApiGraphs

from DataFlow::MethodCallNode searchMethod
where
    searchMethod.getLocation().getFile().getBaseName().matches("LDAP3%") and // just restricting the file to be queried
    searchMethod.getMethodName() = "search" and
    searchMethod.getObject().getALocalSource() =
      API::moduleImport("ldap3").getMember("Connection").getACall()
select searchMethod

Run it!

  • Bonus: correlating ldap3.Connection()’s first argument local source to ldap3.Server().

Since the appearance of ldap3.Connection means there will be a call to ldap3.Server (otherwise ldap3.Connection wouldn’t work), applying the correlation is a great way to practice.

import semmle.python.ApiGraphs

from DataFlow::MethodCallNode searchMethod, DataFlow::CallCfgNode connectionCall
where
    searchMethod.getLocation().getFile().getBaseName().matches("LDAP3%") and // just restricting the file to be queried
    connectionCall = API::moduleImport("ldap3").getMember("Connection").getACall() and
    searchMethod.getMethodName() = "search" and
    searchMethod.getObject().getALocalSource() = connectionCall and
    connectionCall.getArg(0).getALocalSource() =
      API::moduleImport("ldap3").getMember("Server").getACall()
select searchMethod

Run it!

  • Getting the arguments:
import semmle.python.ApiGraphs

from DataFlow::MethodCallNode searchMethod, DataFlow::CallCfgNode connectionCall, DataFlow::Node arg
where
    searchMethod.getLocation().getFile().getBaseName().matches("LDAP3%") and // just restricting the file to be queried
    connectionCall = API::moduleImport("ldap3").getMember("Connection").getACall() and
    searchMethod.getMethodName() = "search" and
    searchMethod.getObject().getALocalSource() = connectionCall and
    connectionCall.getArg(0).getALocalSource() =
      API::moduleImport("ldap3").getMember("Server").getACall() and
    arg = searchMethod.getArg([0, 1])
select searchMethod, arg

Run it!

  • Making an LDAP3Query class:
import semmle.python.ApiGraphs

class LDAP3Query extends DataFlow::CallCfgNode {
  DataFlow::MethodCallNode searchMethod;

  LDAP3Query() {
    exists(DataFlow::CallCfgNode connectionCall |
      connectionCall = API::moduleImport("ldap3").getMember("Connection").getACall() and
      searchMethod.getMethodName() = "search" and
      searchMethod.getObject().getALocalSource() = connectionCall and
      connectionCall.getArg(0).getALocalSource() =
        API::moduleImport("ldap3").getMember("Server").getACall() and
      this = searchMethod.(DataFlow::CallCfgNode)
    )
  }

  DataFlow::Node getQuery() { result = searchMethod.getArg([0, 1]) }
}

from LDAP3Query lq
select lq, lq.getQuery()

Run it!

  • Extending LDAPQuery::Range:
import semmle.python.ApiGraphs

module LDAPQuery {
  abstract class Range extends DataFlow::Node {
    abstract DataFlow::Node getQuery();
  }
}

class LDAPQuery extends DataFlow::Node {
  LDAPQuery::Range range;

  LDAPQuery() { this = range }

  DataFlow::Node getQuery() { result = range.getQuery() }
}

class LDAP3Query extends DataFlow::CallCfgNode, LDAPQuery::Range {
  DataFlow::MethodCallNode searchMethod;

  LDAP3Query() {
    exists(DataFlow::CallCfgNode connectionCall |
      connectionCall = API::moduleImport("ldap3").getMember("Connection").getACall() and
      searchMethod.getMethodName() = "search" and
      searchMethod.getObject().getALocalSource() = connectionCall and
      connectionCall.getArg(0).getALocalSource() =
        API::moduleImport("ldap3").getMember("Server").getACall() and
      this = searchMethod.(DataFlow::CallCfgNode)
    )
  }

  override DataFlow::Node getQuery() { result = searchMethod.getArg([0, 1]) }
}

from LDAPQuery lq
select lq, lq.getQuery()

Run it!

Now that python3’s LDAPQuery is modeled, we can move on to modeling its escape methods:

  • ldap3.utils.dn.escape_rdn (source)

  • ldap3.utils.conv.escape_filter_chars (source)

  • Finding the calls:

import semmle.python.ApiGraphs

from DataFlow::CallCfgNode ldap3EscapeCall
where
  ldap3EscapeCall in [
      API::moduleImport("ldap3").getMember("utils").getMember("dn").getMember("escape_rdn").getACall(),
      API::moduleImport("ldap3").getMember("utils").getMember("conv").getMember("escape_filter_chars").getACall()
    ]
select ldap3EscapeCall, ldap3EscapeCall.getArg(0)

Run it!

  • Wrapping them in their own classes:
import semmle.python.ApiGraphs

class LDAP3EscapeDNCall extends DataFlow::CallCfgNode {
  LDAP3EscapeDNCall() {
    this =
      API::moduleImport("ldap3")
          .getMember("utils")
          .getMember("dn")
          .getMember("escape_rdn")
          .getACall()
  }

  DataFlow::Node getAnInput() { result = this.getArg(0) }
}

class LDAP3EscapeFilterCall extends DataFlow::CallCfgNode {
  LDAP3EscapeFilterCall() {
    this =
      API::moduleImport("ldap3")
          .getMember("utils")
          .getMember("conv")
          .getMember("escape_filter_chars")
          .getACall()
  }

  DataFlow::Node getAnInput() { result = this.getArg(0) }
}

from DataFlow::CallCfgNode ldap3EscapeCall
where
    ldap3EscapeCall instanceof LDAP3EscapeDNCall or
    ldap3EscapeCall instanceof LDAP3EscapeFilterCall
select ldap3EscapeCall

Run it!

  • Extending LDAPEscape::Range:
import semmle.python.ApiGraphs

module LDAPEscape {
  abstract class Range extends DataFlow::Node {
    abstract DataFlow::Node getAnInput();
  }
}

class LDAPEscape extends DataFlow::Node {
  LDAPEscape::Range range;

  LDAPEscape() { this = range }

  DataFlow::Node getAnInput() { result = range.getAnInput() }
}

class LDAP3EscapeDNCall extends DataFlow::CallCfgNode, LDAPEscape::Range {
  LDAP3EscapeDNCall() {
    this =
      API::moduleImport("ldap3")
          .getMember("utils")
          .getMember("dn")
          .getMember("escape_rdn")
          .getACall()
  }

  override DataFlow::Node getAnInput() { result = this.getArg(0) }
}

class LDAP3EscapeFilterCall extends DataFlow::CallCfgNode, LDAPEscape::Range {
  LDAP3EscapeFilterCall() {
    this =
      API::moduleImport("ldap3")
          .getMember("utils")
          .getMember("conv")
          .getMember("escape_filter_chars")
          .getACall()
  }

  override DataFlow::Node getAnInput() { result = this.getArg(0) }
}

from LDAPEscape ldapEscapeCall
select ldapEscapeCall, ldapEscapeCall.getAnInput()

Run it!
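To make concrete what these escape calls are protecting, here is a minimal Python sketch of the two kinds of escaping involved: filter values (in the spirit of RFC 4515) and DN attribute values (in the spirit of RFC 4514). The helper names `escape_filter_value` and `escape_dn_value` are hypothetical and written only for illustration; they are not the implementations shipped with ldap3 or python-ldap, which may differ in details.

```python
# Hypothetical helpers written for illustration; they are NOT the
# implementations shipped with ldap3 or python-ldap.

# Filter metacharacters become a backslash followed by two hex digits.
_FILTER_ESCAPES = {"\\": r"\5c", "*": r"\2a", "(": r"\28", ")": r"\29", "\x00": r"\00"}

# Characters escaped with a leading backslash inside a DN attribute value.
_DN_SPECIALS = frozenset('\\,+"<>;=')


def escape_filter_value(value: str) -> str:
    """Escape one value that will be placed inside a search filter."""
    return "".join(_FILTER_ESCAPES.get(ch, ch) for ch in value)


def escape_dn_value(value: str) -> str:
    """Escape one attribute value that will be placed inside a DN."""
    last = len(value) - 1
    out = []
    for index, ch in enumerate(value):
        if ch == "\x00":
            out.append(r"\00")
        elif ch in _DN_SPECIALS or (index == 0 and ch in "# ") or (index == last and ch == " "):
            # Special characters, a leading '#' or space, and a trailing space.
            out.append("\\" + ch)
        else:
            out.append(ch)
    return "".join(out)


if __name__ == "__main__":
    payload = "*)(objectClass=*"
    print(f"(uid={payload})")                       # payload alters the filter structure
    print(f"(uid={escape_filter_value(payload)})")  # payload stays a literal value
    print(f"cn={escape_dn_value('Smith, John')},dc=example,dc=com")
```

Without escaping, the payload closes the `uid` clause and adds a clause of its own; with escaping, the same characters are treated as data. This is why the argument of an escape call is a reasonable place to stop tracking taint.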

Everything together

  • LDAPQuery Concept:
import semmle.python.ApiGraphs

module LDAPQuery {
  abstract class Range extends DataFlow::Node {
    abstract DataFlow::Node getQuery();
  }
}

class LDAPQuery extends DataFlow::Node {
  LDAPQuery::Range range;

  LDAPQuery() { this = range }

  DataFlow::Node getQuery() { result = range.getQuery() }
}

class LDAP2QueryMethods extends string {
  LDAP2QueryMethods() { this in ["search", "search_s", "search_st", "search_ext", "search_ext_s"] }
}

class LDAP2Query extends DataFlow::CallCfgNode, LDAPQuery::Range {
  DataFlow::MethodCallNode searchMethod;

  LDAP2Query() {
    searchMethod.getMethodName() instanceof LDAP2QueryMethods and
    searchMethod.getObject().getALocalSource() =
      API::moduleImport("ldap").getMember("initialize").getACall() and
    this = searchMethod.(DataFlow::CallCfgNode)
  }

  override DataFlow::Node getQuery() {
    result in [
        searchMethod.getArg(0), searchMethod.getArg(2), searchMethod.getArgByName("filterstr")
      ]
  }
}

class LDAP3Query extends DataFlow::CallCfgNode, LDAPQuery::Range {
  DataFlow::MethodCallNode searchMethod;

  LDAP3Query() {
    exists(DataFlow::CallCfgNode connectionCall |
      connectionCall = API::moduleImport("ldap3").getMember("Connection").getACall() and
      searchMethod.getMethodName() = "search" and
      searchMethod.getObject().getALocalSource() = connectionCall and
      connectionCall.getArg(0).getALocalSource() =
        API::moduleImport("ldap3").getMember("Server").getACall() and
      this = searchMethod.(DataFlow::CallCfgNode)
    )
  }

  override DataFlow::Node getQuery() { result = searchMethod.getArg([0, 1]) }
}

from LDAPQuery lq
select lq, lq.getQuery()

Run it!

  • LDAPEscape Concept:
import semmle.python.ApiGraphs

module LDAPEscape {
  abstract class Range extends DataFlow::Node {
    abstract DataFlow::Node getAnInput();
  }
}

class LDAPEscape extends DataFlow::Node {
  LDAPEscape::Range range;

  LDAPEscape() { this = range }

  DataFlow::Node getAnInput() { result = range.getAnInput() }
}

class LDAP2EscapeDNCall extends DataFlow::CallCfgNode, LDAPEscape::Range {
  LDAP2EscapeDNCall() {
    this = API::moduleImport("ldap").getMember("dn").getMember("escape_dn_chars").getACall()
  }

  override DataFlow::Node getAnInput() { result = this.getArg(0) }
}

class LDAP2EscapeFilterCall extends DataFlow::CallCfgNode, LDAPEscape::Range {
  LDAP2EscapeFilterCall() {
    this = API::moduleImport("ldap").getMember("filter").getMember("escape_filter_chars").getACall()
  }

  override DataFlow::Node getAnInput() { result = this.getArg(0) }
}

class LDAP3EscapeDNCall extends DataFlow::CallCfgNode, LDAPEscape::Range {
  LDAP3EscapeDNCall() {
    this =
      API::moduleImport("ldap3")
          .getMember("utils")
          .getMember("dn")
          .getMember("escape_rdn")
          .getACall()
  }

  override DataFlow::Node getAnInput() { result = this.getArg(0) }
}

class LDAP3EscapeFilterCall extends DataFlow::CallCfgNode, LDAPEscape::Range {
  LDAP3EscapeFilterCall() {
    this =
      API::moduleImport("ldap3")
          .getMember("utils")
          .getMember("conv")
          .getMember("escape_filter_chars")
          .getACall()
  }

  override DataFlow::Node getAnInput() { result = this.getArg(0) }
}

from LDAPEscape ldapEscapeCall
select ldapEscapeCall, ldapEscapeCall.getAnInput()

Run it!
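The getQuery predicates in the LDAPQuery models above select arguments by position and by name, so it helps to see which parameter each index lands on. The sketch below uses two hypothetical stand-in functions that copy only the leading parameters of python-ldap's `search_s` and ldap3's `Connection.search` (the remaining ldap3 options are elided); `injectable_arguments` is likewise a made-up helper, and nothing here talks to a server.

```python
import inspect


# Hypothetical stand-ins: they copy only the leading parameters of
# python-ldap's search_s and ldap3's Connection.search, and do no I/O.
def search_s(base, scope, filterstr="(objectClass=*)", attrlist=None, attrsonly=0):
    """Stand-in for python-ldap's LDAPObject.search_s."""


def search(search_base, search_filter, **other_options):
    """Stand-in for ldap3's Connection.search (remaining options elided)."""


def injectable_arguments(func, names, *args, **kwargs):
    """Bind a call's arguments to parameters and keep the injectable ones."""
    bound = inspect.signature(func).bind(*args, **kwargs)
    return {name: bound.arguments[name] for name in names if name in bound.arguments}


if __name__ == "__main__":
    wanted = ("base", "filterstr")
    # Positional call: index 0 is the base DN, index 2 is the filter.
    print(injectable_arguments(search_s, wanted, "dc=example,dc=com", 2, "(uid=bob)"))
    # Keyword call: the filter is only reachable by its name.
    print(injectable_arguments(search_s, wanted, "dc=example,dc=com", 2, filterstr="(uid=bob)"))
    # ldap3: the base DN and the filter are indexes 0 and 1.
    print(injectable_arguments(search, ("search_base", "search_filter"), "dc=example,dc=com", "(uid=bob)"))
```

This matches `getArg(0)`, `getArg(2)` and `getArgByName("filterstr")` for LDAP2Query, and `getArg([0, 1])` for LDAP3Query. Calls that pass the other injectable parameters by keyword (for example `base=` or `search_filter=`) would presumably need extra `getArgByName` cases, which is left as a possible refinement.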

Taint tracking configuration

Once both concepts have been populated, we are ready to get into the last stage of the query: the taint tracking configuration.

Since this is a pretty basic query, we will only override one extra predicate: isSanitizer.

class LDAPInjectionFlowConfig extends TaintTracking::Configuration {
  LDAPInjectionFlowConfig() { this = "LDAPInjectionFlowConfig" }

  override predicate isSource(DataFlow::Node source) { source instanceof RemoteFlowSource }

  override predicate isSink(DataFlow::Node sink) { sink = any(LDAPQuery ldapQuery).getQuery() }

  override predicate isSanitizer(DataFlow::Node sanitizer) {
    sanitizer = any(LDAPEscape ldapEsc).getAnInput()
  }
}

As you can see, RemoteFlowSource is the source, LDAPQuery’s getQuery is the sink, and LDAPEscape’s getAnInput is the sanitizer. The query therefore flags any flow from a RemoteFlowSource to an LDAPQuery’s getQuery, unless that flow passes through an LDAPEscape’s getAnInput.
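As a mental model only, the role of the three predicates can be sketched in Python as reachability over a graph in which sanitizer nodes act as barriers. This is a toy illustration and not how CodeQL evaluates a configuration; the function `tainted_sinks`, the constant `EXAMPLE_EDGES` and every node name are hypothetical.

```python
from collections import deque

# Hypothetical data-flow edges: one path reaches a sink directly, the other
# goes through the argument of an escape call first.
EXAMPLE_EDGES = [
    ("user_param", "raw_filter"),
    ("raw_filter", "sink_raw"),
    ("name_param", "escape_input"),
    ("escape_input", "escaped_value"),
    ("escaped_value", "sink_safe"),
]


def tainted_sinks(edges, sources, sinks, sanitizers):
    """Return the sinks reachable from a source without crossing a sanitizer."""
    graph = {}
    for src, dst in edges:
        graph.setdefault(src, []).append(dst)

    seen = set()
    queue = deque(node for node in sources if node not in sanitizers)
    while queue:
        node = queue.popleft()
        if node in seen:
            continue
        seen.add(node)
        for nxt in graph.get(node, []):
            # Taint never enters a sanitizer node, so the path ends there.
            if nxt not in sanitizers and nxt not in seen:
                queue.append(nxt)
    return sorted(seen & set(sinks))


if __name__ == "__main__":
    sources = {"user_param", "name_param"}
    sinks = {"sink_raw", "sink_safe"}
    print(tainted_sinks(EXAMPLE_EDGES, sources, sinks, {"escape_input"}))
    print(tainted_sinks(EXAMPLE_EDGES, sources, sinks, set()))
```

With `escape_input` declared as a sanitizer only `sink_raw` is reported; with no sanitizers both sinks are.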

Complete query

/**
 * @name LDAP query built from user-controlled sources
 * @description Building an LDAP query from user-controlled sources is vulnerable to insertion of
 *              malicious LDAP code by the user.
 * @kind path-problem
 * @problem.severity error
 * @id py/ldap-injection
 * @tags experimental
 *       security
 *       external/cwe/cwe-090
 */

import python
import semmle.python.dataflow.new.TaintTracking
import semmle.python.dataflow.new.RemoteFlowSources
import semmle.python.ApiGraphs
import DataFlow::PathGraph

module LDAPQuery {
  abstract class Range extends DataFlow::Node {
    abstract DataFlow::Node getQuery();
  }
}

class LDAPQuery extends DataFlow::Node {
  LDAPQuery::Range range;

  LDAPQuery() { this = range }

  DataFlow::Node getQuery() { result = range.getQuery() }
}

module LDAPEscape {
  abstract class Range extends DataFlow::Node {
    abstract DataFlow::Node getAnInput();
  }
}

class LDAPEscape extends DataFlow::Node {
  LDAPEscape::Range range;

  LDAPEscape() { this = range }

  DataFlow::Node getAnInput() { result = range.getAnInput() }
}

class LDAP2QueryMethods extends string {
  LDAP2QueryMethods() { this in ["search", "search_s", "search_st", "search_ext", "search_ext_s"] }
}

class LDAP2Query extends DataFlow::CallCfgNode, LDAPQuery::Range {
  DataFlow::MethodCallNode searchMethod;

  LDAP2Query() {
    searchMethod.getMethodName() instanceof LDAP2QueryMethods and
    searchMethod.getObject().getALocalSource() =
      API::moduleImport("ldap").getMember("initialize").getACall() and
    this = searchMethod.(DataFlow::CallCfgNode)
  }

  override DataFlow::Node getQuery() {
    result in [
        searchMethod.getArg(0), searchMethod.getArg(2), searchMethod.getArgByName("filterstr")
      ]
  }
}

class LDAP3Query extends DataFlow::CallCfgNode, LDAPQuery::Range {
  DataFlow::MethodCallNode searchMethod;

  LDAP3Query() {
    exists(DataFlow::CallCfgNode connectionCall |
      connectionCall = API::moduleImport("ldap3").getMember("Connection").getACall() and
      searchMethod.getMethodName() = "search" and
      searchMethod.getObject().getALocalSource() = connectionCall and
      connectionCall.getArg(0).getALocalSource() =
        API::moduleImport("ldap3").getMember("Server").getACall() and
      this = searchMethod.(DataFlow::CallCfgNode)
    )
  }

  override DataFlow::Node getQuery() { result = searchMethod.getArg([0, 1]) }
}

class LDAP2EscapeDNCall extends DataFlow::CallCfgNode, LDAPEscape::Range {
  LDAP2EscapeDNCall() {
    this = API::moduleImport("ldap").getMember("dn").getMember("escape_dn_chars").getACall()
  }

  override DataFlow::Node getAnInput() { result = this.getArg(0) }
}

class LDAP2EscapeFilterCall extends DataFlow::CallCfgNode, LDAPEscape::Range {
  LDAP2EscapeFilterCall() {
    this = API::moduleImport("ldap").getMember("filter").getMember("escape_filter_chars").getACall()
  }

  override DataFlow::Node getAnInput() { result = this.getArg(0) }
}

class LDAP3EscapeDNCall extends DataFlow::CallCfgNode, LDAPEscape::Range {
  LDAP3EscapeDNCall() {
    this =
      API::moduleImport("ldap3")
          .getMember("utils")
          .getMember("dn")
          .getMember("escape_rdn")
          .getACall()
  }

  override DataFlow::Node getAnInput() { result = this.getArg(0) }
}

class LDAP3EscapeFilterCall extends DataFlow::CallCfgNode, LDAPEscape::Range {
  LDAP3EscapeFilterCall() {
    this =
      API::moduleImport("ldap3")
          .getMember("utils")
          .getMember("conv")
          .getMember("escape_filter_chars")
          .getACall()
  }

  override DataFlow::Node getAnInput() { result = this.getArg(0) }
}

class LDAPInjectionFlowConfig extends TaintTracking::Configuration {
  LDAPInjectionFlowConfig() { this = "LDAPInjectionFlowConfig" }

  override predicate isSource(DataFlow::Node source) { source instanceof RemoteFlowSource }

  override predicate isSink(DataFlow::Node sink) { sink = any(LDAPQuery ldapQuery).getQuery() }

  override predicate isSanitizer(DataFlow::Node sanitizer) {
    sanitizer = any(LDAPEscape ldapEsc).getAnInput()
  }
}

from LDAPInjectionFlowConfig config, DataFlow::PathNode source, DataFlow::PathNode sink
where config.hasFlowPath(source, sink)
select sink.getNode(), source, sink, "$@ LDAP query parameter comes from $@.", sink.getNode(),
  "This", source.getNode(), "a user-provided value"

Run it!

Bonus exercises

If you have enjoyed this post and want to learn further, I encourage you to give these exercises a shot, and feel free to discuss the solutions.

  • Simplify modeling for MethodCallNode avoiding getObject().getALocalSource(). Spoiler: Z2V0QU1ldGhvZENhbGwobWV0aG9kTmFtZSk=
  • Get text in xml.etree.ElementTree.parse(StringIO(xml_content), parser=parser).getroot().text when xml_content is user-controlled data. Sample code (contains spoilers in other parts of the same page).
  • Model your favourite library even though it is already modeled in the official repository.
  • Contribute your favourite security query and earn some cash $$$$.

The end

This is the end of this post. I really hope you enjoyed learning CodeQL for Python and had as great a time as I had writing this!

Jorge