Practical Introduction to CodeQL
TL;DR
This post walks through, in a practical way, everything I have learned over the past few months about CodeQL for Python. I hope you like it as much as I do! :)
- Learning resources
- Environment SetUp
- Concepts
- Query development
- Advanced query modeling
- Bonus exercises
- The end
Learning resources
- CodeQL documentation (link)
- GitHub Security Lab (link):
- GitHub Learning Lab:
- GitHub YouTube channel (sorted by difficulty and learning quality):
- C/C++: CodeQL Live Episode 1
- C/C++: Make Memcpy Safe Again: CodeQL
- C/C++: CVE-2017-13782: CodeQL Study Note
- Tutorial: [Live Stream] CodeQL Code Scanning Language Tutorial
- Java: $3,000 CodeQL query for finding LDAP Injection - Github Security Lab - Hackerone
Environment SetUp
To let you try out the examples in this post, this section explains what LGTM is and how to set up a working CodeQL environment to run the queries on your end.
Remote queries
LGTM.com is a website that hosts the lgtm.com branch of github/codeql, with an online CodeQL editor that lets you run any CodeQL snippet using the core CodeQL libraries.
As you can see, it lets you select several projects to run the query on (you can also create custom lists) and it displays the results nicely. The previous example shows just a string, but the output is much nicer when using @kind path-problem (query metadata) and DataFlow::PathGraph:
This post will refer you to LGTM each time there’s a codeql snippet whose behaviour may be shown.
Automation
The existence of a cloud-based CodeQL “instance” opens up a wide range of ideas regarding automation. Aggressive automation clearly goes against LGTM's ToS, so use this information at your own risk.
gagliardetto/lgtm-cli and JLLeitschuh/lgtm_hack_scripts let you follow repos (for them to be built by LGTM) based on GitHub API search or dependency network, create custom lists, and query already-built projects.
This automation helps measure the impact and precision of the query, and lets you provide results for bounty submissions, if any (see #submission).
Local queries
This is the way I’d recommend to run queries and play with them. Let’s start!
- Clone jorgectf/codeql inside an empty folder.
- Open that folder with VSCode.
- Install the CodeQL extension.
- Checkout the Practical-CodeQL-Introduction branch:
  - Open a terminal (Terminal > New Terminal) and run (cd codeql/ && git checkout Practical-CodeQL-Introduction).
  - OR
  - Go to the Source Control pane, click main and choose Practical-CodeQL-Introduction.
- Go to the Testing pane, expand codeql > python / ql / test > experimental > query-tests > Security > Practical-CodeQL-Introduction and click the “play”/“run” button.
- Once the tests have finished (they will intentionally fail because the results don’t match those from the .expected file) a CodeQL database should have been created.
- Go to the CodeQL pane, click Add a CodeQL database: From a Folder and choose codeql/python/ql/test/experimental/query-tests/Security/Practical-CodeQL-Introduction/Practical-CodeQL-Introduction.testproj.
- Find a file called query.ql inside codeql/python/ql/src/experimental/Security/Practical-CodeQL-Introduction/.
- You are ready to go! Feel free to run any query inside query.ql by writing the desired code and running it (Right Click > CodeQL: Run Query). You may also run a specific snippet by selecting it and choosing right-click > CodeQL: Quick Evaluation.
In case the CodeQL CLI (a binary capable of running everything related to CodeQL) doesn’t get installed, head to Extensions > CodeQL > Extension Settings, find Code QL › Cli: Executable Path, type a random string like “a” into the input field, click outside the field (so that VSCode updates the value) and then remove what you typed. You should see a VSCode notification saying that the CodeQL CLI is now being installed.
Concepts
To fully understand the upcoming points about query development, we need to look at a few concepts (some of which you may already know, but here they are focused on CodeQL).
Source
We may understand a “source” as the very first appearance of the code whose flow we want to follow. For example, a source could be user input or a hardcoded string (matching the form of a specific string), and we will sometimes refer to it as “tainted” data (e.g., TaintTracking: coming from TaintTracking::Configuration, a class that lets us specify and customize the source, sink and several other parts of a flow configuration).
RemoteFlowSource
Since most security-related queries focus on checking whether user input flows into a specific part of the code (e.g., a function’s argument), CodeQL introduced a structure (see #concepts-again) that gathers every kind of user input so that developers don't have to worry about it. (Since CodeQL is under development, some frameworks may not be covered yet, but the objective of this structure is to hold as many user-input-providing functions as possible.)
import python
import semmle.python.dataflow.new.RemoteFlowSources
from RemoteFlowSource rfs // create a 'rfs' variable of type RemoteFlowSource
select rfs // return all of its appearances
You may see some structures used inside an any() expression. This is because the from clause can be avoided like this:
import python
import semmle.python.dataflow.new.RemoteFlowSources
select any(RemoteFlowSource rfs) // select any RemoteFlowSource appearance
Source in Regular Expression Injection query
Given the following snippet:
@app.route("/direct")
def direct():
    unsafe_pattern = request.args["pattern"]
    re.search(unsafe_pattern, "foo")
Since the vulnerability we are looking for happens when user input flows into the first argument of a regular expression operation (regular expression injection), the source here would be request.args["pattern"]. Even though there are other ways to model this vulnerability (as seen below), the source of the flow will stay the same because request.args["pattern"] is the very first appearance of user input (the exact thing whose flow we want to track).
@app.route("/compiled")
def compiled():
    unsafe_pattern = request.args["pattern"]
    compiled_pattern = re.compile(unsafe_pattern)
    compiled_pattern.search("foo")

@app.route("/inline")
def inline():
    unsafe_pattern = request.args["pattern"]
    re.compile(unsafe_pattern).search("foo")
(see #advanced-modeling)
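To see why the three call shapes above deserve the same source, here is a minimal runnable sketch with Flask stripped out. The function names mirror the routes, but the pattern and text parameters are assumptions standing in for request.args["pattern"] and the searched string, and the sample pattern is purely illustrative.

```python
import re


def direct(pattern: str, text: str):
    # re.search receives the (user-controlled) pattern directly.
    return re.search(pattern, text)


def compiled(pattern: str, text: str):
    # The pattern is compiled first and stored in a variable.
    compiled_pattern = re.compile(pattern)
    return compiled_pattern.search(text)


def inline(pattern: str, text: str):
    # Compilation and execution are chained in one expression.
    return re.compile(pattern).search(text)


# Hypothetical user-chosen pattern: it decides what gets matched.
user_pattern = "f.*"
results = [f(user_pattern, "foo bar") for f in (direct, compiled, inline)]
matched = [m.group(0) for m in results]
```

In all three cases the same user-controlled string ends up driving the regex engine, which is why the modeling later on has to recognise each of these shapes.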
Source in LDAP Injection query
Given the following snippets (python2 and python3 examples):
@app.route("/normal2")
def normal2():
    unsafe_dc = request.args['dc']
    unsafe_filter = request.args['username']
    dn = "dc={}".format(unsafe_dc)
    search_filter = "(foo={})".format(unsafe_filter)
    ldap_connection = ldap.initialize("ldap://127.0.0.1")
    user = ldap_connection.search_s(
        dn, ldap.SCOPE_SUBTREE, search_filter)

@app.route("/normal3")
def normal3():
    unsafe_dc = request.args['dc']
    unsafe_filter = request.args['username']
    dn = "dc={}".format(unsafe_dc)
    search_filter = "(user={})".format(unsafe_filter)
    srv = ldap3.Server('ldap://127.0.0.1')
    conn = ldap3.Connection(srv, user=dn, auto_bind=True)
    conn.search(dn, search_filter)
In both cases (LDAP injection), the source is still user input: request.args['dc'] and request.args['username'].
(see #advanced-modeling)
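As a rough illustration of why that user input matters, the sketch below reuses the same "(user={})".format(...) pattern on plain strings. The helper name build_search_filter and the payload are hypothetical, chosen only to show how the structure of the filter can change; no LDAP server or library is involved.

```python
def build_search_filter(username: str) -> str:
    # Same formatting pattern as search_filter in the snippets above.
    return "(user={})".format(username)


benign_filter = build_search_filter("alice")

# Hypothetical payload: closes the original clause and appends a second one.
injected_filter = build_search_filter("*)(objectClass=*")
```

The benign value stays inside a single clause, while the payload produces two clauses, which is the kind of change the query is meant to flag.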
Source in XXE query
@app.route("/lxml.etree.fromstring")
def lxml_fromstring():
    xml_content = request.args['xml_content']
    return lxml.etree.fromstring(xml_content).text

@app.route("/lxml.etree.XML")
def lxml_XML():
    xml_content = request.args['xml_content']
    return lxml.etree.XML(xml_content).text

@app.route("/lxml.etree.parse")
def lxml_parse():
    xml_content = request.args['xml_content']
    return lxml.etree.parse(StringIO(xml_content)).text
Yet again (XXE), the very first appearance of user input is request.args['xml_content'], so that is our source.
(see #advanced-modeling)
Sink
As the opposite of a source, the “sink” is the place the tainted data has to reach for the code to be vulnerable.
Given this simple snippet:
@app.route("/demo")
def demo():
    cmd = request.args["pattern"]
    result = os.popen(cmd).read()  # [1]
    return f"{cmd} has returned {result}"  # [2]
As you may have noticed, the actual last place request.args["pattern"] (our source) appears/flows to (i.e., where our source sinks) is [2] ({cmd} and {result} respectively), but according to the previous definition, the actual sink in this query (the last place it has to reach to be vulnerable) would be [1] (os.popen(cmd).read()).
Sink in Regular Expression Injection query
Given the following snippet:
@app.route("/direct")
def direct():
    unsafe_pattern = request.args["pattern"]
    re.search(unsafe_pattern, "foo")
The sink in this example would be re.search’s first argument (i.e., the first argument of re’s search method call), unsafe_pattern.
@app.route("/inline")
def inline():
    unsafe_pattern = request.args["pattern"]
    re.compile(unsafe_pattern).search("foo")
This example’s vulnerable call is a bit tricky, because it embeds a method call inside another method call. The sink would be re.compile’s first argument (i.e., the first argument of re’s compile method call), where that compile call is in turn the object of the search method call.
This snippet may help to understand the approach:
import python
import semmle.python.dataflow.new.DataFlow
from DataFlow::MethodCallNode mc, DataFlow::Node mco
where
mc.getLocation().getFile().getBaseName().matches("Regex%") and // just restricting the file to be queried
mco = mc.getObject() and
mco instanceof DataFlow::MethodCallNode
select mco, mc
@app.route("/compile")
def compile():
    unsafe_pattern = request.args["pattern"]
    compiled_pattern = re.compile(unsafe_pattern)
    compiled_pattern.search("")
In this example, the approach would be the same, but we have to introduce getALocalSource(), a predicate in charge of finding where the variable's value comes from (i.e., where it gets declared).
import python
import semmle.python.dataflow.new.DataFlow
from DataFlow::MethodCallNode mc, DataFlow::Node mco
where
mc.getLocation().getFile().getBaseName().matches("Regex%") and // just restricting the file to be queried
mco = mc.getObject().getALocalSource() and
mco instanceof DataFlow::MethodCallNode
select mco, mc
However, we are facing some false positives (that actually make sense). This happens because we are not restricting the MethodCallNodes to those of the re library; we are just looking for specific structures.
(see #advanced-modeling)
Sink in LDAP Injection query
Given the following snippet:
@app.route("/normal2")
def normal2():
    unsafe_dc = request.args['dc']
    unsafe_filter = request.args['username']
    dn = "dc={}".format(unsafe_dc)
    search_filter = "(user={})".format(unsafe_filter)
    ldap_connection = ldap.initialize("ldap://127.0.0.1")
    user = ldap_connection.search_s(
        dn, ldap.SCOPE_SUBTREE, search_filter)
We can use MethodCallNode and getALocalSource() again to find ldap_connection.search_s’s first/third argument (ldap_connection being a variable holding the result of ldap.initialize); in other words, we have to find the first/third argument of a method call whose object’s local source is a call to ldap’s initialize method.
@app.route("/normal3")
def normal3():
    unsafe_dc = request.args['dc']
    unsafe_filter = request.args['username']
    dn = "dc={}".format(unsafe_dc)
    search_filter = "(user={})".format(unsafe_filter)
    srv = ldap3.Server('ldap://127.0.0.1')
    conn = ldap3.Connection(srv, user=dn, auto_bind=True)
    conn.search(dn, search_filter)
In this python3 example, the modeling would have to search for the first/second argument of the conn.search call, conn being a variable holding an ldap3.Connection whose first argument’s local source is ldap3.Server.
(see #advanced-modeling)
Sink in XXE query
Given the following snippet:
@app.route("/lxml.etree.fromstring")
def lxml_fromstring():
    xml_content = request.args['xml_content']
    return lxml.etree.fromstring(xml_content).text
The sink would be the first argument of the lxml.etree.fromstring call.
@app.route("/lxml.etree.parse")
def lxml_parse():
    xml_content = request.args['xml_content']
    return lxml.etree.parse(StringIO(xml_content)).text
In this example, the sink is again the first arg… wait… the first argument of lxml.etree.parse is actually StringIO(xml_content)! Does that mean that we should be looking for the first argument of the StringIO call when it is the first argument of lxml.etree.parse? That would imply modeling all of these related functions (like BytesIO) just for the sink.
That would be bad practice in terms of effectiveness, since that modeling idea would not cover the following example, and we would be losing the power of taint flow analysis (taint tracking).
@app.route("/lxml.etree.parse")
def lxml_parse():
    xml_content = request.args['xml_content']
    xml_content = StringIO(xml_content)
    return lxml.etree.parse(xml_content).text
Because of that, we should be using some taint tracking configuration features like isAdditionalTaintStep and sanitizers.
Taint tracking configuration predicates
These predicates are like “extras” that let us specify some details for our taint tracking configuration.
Additional taint steps
Additional taint steps let us specify additional “jumps” that the flow may make in order to “bypass” known functions. If specified, once the flow ends (the specified source doesn’t flow anymore) CodeQL applies the specified steps and continues looking for flow.
For example:
@app.route("/lxml.etree.parse")
def lxml_parse():
    xml_content = request.args['xml_content']
    xml_content = StringIO(xml_content)
    return lxml.etree.parse(xml_content).text
CodeQL taint tracking will see that request.args['xml_content'] flows to StringIO(here), and would stop, since the next step would be lxml.etree.parse(here), where here is StringIO(request.args['xml_content']) rather than just request.args['xml_content']. In other words, the first argument of lxml.etree.parse is seen as the result of StringIO (even though the code is vulnerable). This happens because CodeQL may stop taint flow analysis if the tainted data flows into a function that changes its content. In this case, StringIO returns an in-memory file-like object whose content is the provided argument.
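The point about StringIO can be checked with the standard library alone. This is a small sketch, assuming a hardcoded xml_content string in place of request.args['xml_content']; the variable names are illustrative.

```python
from io import BytesIO, StringIO

xml_content = "<root>hello</root>"  # stand-in for request.args['xml_content']

# Wrapping the string produces a new in-memory file-like object...
text_stream = StringIO(xml_content)
bytes_stream = BytesIO(xml_content.encode("utf-8"))

# ...which is no longer a plain str, yet still carries the same content.
is_plain_string = isinstance(text_stream, str)
text_payload = text_stream.read()
bytes_payload = bytes_stream.read()
```

The wrapper is a different object from the original string, but reading it yields exactly the user-provided content, which is why the flow should be allowed to continue through it.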
To make it work, we should specify an additional taint step: StringIO’s first argument being the nodeFrom and StringIO’s entire call being the nodeTo.
(isAdditionalTaintStep predicate override inside a taint tracking configuration)
override predicate isAdditionalTaintStep(DataFlow::Node nodeFrom, DataFlow::Node nodeTo) {
  exists(DataFlow::CallCfgNode ioCalls |
    ioCalls = API::moduleImport("io").getMember(["StringIO", "BytesIO"]).getACall() and
    nodeFrom = ioCalls.getArg(0) and
    nodeTo = ioCalls
  )
}
import python
import semmle.python.dataflow.new.DataFlow
import semmle.python.ApiGraphs
from DataFlow::CallCfgNode ioCalls, DataFlow::Node nodeFrom, DataFlow::Node nodeTo
where
ioCalls.getLocation().getFile().getBaseName().matches("XXE%") and // just restricting the file to be queried
ioCalls = API::moduleImport("io").getMember(["StringIO", "BytesIO"]).getACall() and
nodeFrom = ioCalls.getArg(0) and
nodeTo = ioCalls
select ioCalls, nodeFrom, nodeTo
Sanitizers
Sanitizers, as the opposite of additional taint steps, let us specify functions or behaviours we don’t want the CodeQL flow to follow. If specified, each time the flow takes a step, CodeQL checks that this specific step/behaviour isn’t declared as a sanitizer (if it is, the flow will stop).
For example, given the following snippet:
@app.route("/direct")
def direct():
    unsafe_pattern = request.args['pattern']
    safe_pattern = re.escape(unsafe_pattern)
    re.search(safe_pattern, "")
In case CodeQL saw re.escape as a function that doesn’t decontaminate the source (the data stays tainted, so the flow wouldn’t stop), we should specify it as a sanitizer.
By specifying re.escape’s first argument (i.e., the first argument of re’s escape method call) as the node of isSanitizer, the flow will stop whenever it reaches that position.
override predicate isSanitizer(DataFlow::Node sanitizer) {
  sanitizer = API::moduleImport("re").getMember("escape").getACall().getArg(0)
}
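To see why re.escape deserves to be treated as a sanitizer, the following sketch compares a raw pattern with its escaped version. The sample pattern and haystack strings are assumptions picked for illustration.

```python
import re

unsafe_pattern = ".*"  # stand-in for a user-provided pattern
haystack = "anything at all"

# Unescaped, the metacharacters are interpreted and match the whole string.
unescaped_match = re.search(unsafe_pattern, haystack)

# Escaped, the same input is only matched literally.
safe_pattern = re.escape(unsafe_pattern)
escaped_match = re.search(safe_pattern, haystack)
literal_match = re.search(safe_pattern, "a literal .* here")
```

After escaping, the user can no longer change the meaning of the regular expression, so a flow that goes through re.escape is not interesting for this query.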
Furthermore, we may use isSanitizerGuard to specify another situation in which we want the flow to stop. For example, StringConstCompare (according to its qldoc: A validation of unknown node by comparing with a constant string value.):
override predicate isSanitizerGuard(DataFlow::BarrierGuard guard) {
  guard instanceof StringConstCompare
}
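One way to picture additional taint steps and sanitizers together is as edits to a flow graph. The sketch below is only a toy model under that assumption: it is not how CodeQL is implemented, and the function tainted_nodes and every node label are made up for illustration.

```python
from collections import deque


def tainted_nodes(edges, source, extra_steps=(), sanitizers=()):
    """Breadth-first reachability over a toy flow graph."""
    graph = {}
    for start, end in list(edges) + list(extra_steps):
        graph.setdefault(start, set()).add(end)

    seen = {source}
    queue = deque([source])
    while queue:
        node = queue.popleft()
        for nxt in graph.get(node, ()):
            # A sanitizer node blocks the flow; it is never marked as tainted.
            if nxt in sanitizers or nxt in seen:
                continue
            seen.add(nxt)
            queue.append(nxt)
    return seen


# XXE example: no default edge goes from StringIO's argument to its result.
xxe_edges = [("request.args", "StringIO arg"), ("StringIO call", "parse arg")]
string_io_step = [("StringIO arg", "StringIO call")]
without_step = tainted_nodes(xxe_edges, "request.args")
with_step = tainted_nodes(xxe_edges, "request.args", extra_steps=string_io_step)

# Regex example: the flow passes through re.escape unless it is a sanitizer.
regex_edges = [
    ("request.args", "re.escape arg"),
    ("re.escape arg", "re.escape call"),
    ("re.escape call", "re.search arg"),
]
unsanitized = tainted_nodes(regex_edges, "request.args")
sanitized = tainted_nodes(regex_edges, "request.args", sanitizers={"re.escape arg"})
```

In this picture an additional taint step adds an edge that lets the flow reach the sink, while a sanitizer removes a node so that everything behind it becomes unreachable.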
Concepts ¿again?
A CodeQL “Concept” is a structure that gathers many different modelings under a single class; each modeling becomes part of the Concept by extending its Range.
For example, RemoteFlowSource’s modeling (although it is not inside Concepts.qll) works in much the same way. This is how flask’s request is developed to extend RemoteFlowSource. Because of that, every time RemoteFlowSource is used with python/ql/lib/semmle/python/frameworks/Flask.qll imported, it also refers to the extended structure.
As an example of an actual Concept inside Concepts.qll, we may see LDAPQuery from the LDAP Injection query:
- Concept declaration inside Concepts.qll.
- Extending the concept with python2’s LDAP code modeling.
- Extending the concept with python3’s LDAP code modeling.
- Using the concept to match all the extended APIs.
Concepts' predicates are created for Concepts to be fully customizable. (Notice that RemoteFlowSource does not have any useful predicate while LDAPQuery has a getQuery one, so that LDAPQuery can be used as the modeling of a call to a search method and its getQuery as the collection of all inputs belonging to an LDAP query.)
Query development
Basic approaches
- Finding calls of library methods:
import semmle.python.ApiGraphs
select API::moduleImport("re").getAMember().getACall()
This lets us get a general idea of how the library is being used across the codebase.
You can specify the method name using getMember(name).
- Getting arguments from method calls:
import semmle.python.ApiGraphs
from DataFlow::CallCfgNode call, DataFlow::Node arg
where
call = API::moduleImport(_).getAMember().getACall() and
arg in [call.getArgByName("auto_bind"), call.getArg(2)]
select arg
We are using _ in API::moduleImport(_) to get all the module imports, as if we used getMember(_) instead of getAMember(). The query finds any call to a library.method and gets the keyword argument auto_bind and the third positional argument.
- Finding calls of any-level library methods:
import semmle.python.ApiGraphs
select API::moduleImport("re").getAMember*().getACall()
- Finding a string that flows to an argument:
import python
import semmle.python.ApiGraphs
from StrConst str
where
DataFlow::exprNode(str)
.(DataFlow::LocalSourceNode)
.flowsTo(API::builtin("eval").getACall().getArg(0))
select str.getText()
- Finding a specific string that flows to an argument:
import python
import semmle.python.ApiGraphs
from StrConst str
where
str.getText().matches("second%") and
DataFlow::exprNode(str)
.(DataFlow::LocalSourceNode)
.flowsTo(API::builtin("eval").getACall().getArg(0))
select str.getText()
See this documentation page for the rest of the predicates similar to matches().
More examples.
- Void predicates:
predicate doSomething() { any() }
where doSomething()
select "Hello predicate!"
predicate doSomething() { none() }
where doSomething()
select "Hello predicate!"
any()/none() can also be a condition.
import semmle.python.ApiGraphs
predicate doSomething() {
exists(API::moduleImport("re").getMember("match").getACall())
}
where doSomething()
select "Hello predicate!"
import semmle.python.ApiGraphs
predicate doSomething() {
exists(API::moduleImport("re").getMember("motch").getACall())
}
where doSomething()
select "Hello predicate!"
doSomething() will succeed and continue the execution if calls to re.m(a|o)tch exist.
- Creating a custom class and querying it:
import python
class CustomClass extends StrConst {
  CustomClass() { this.getText().matches("this%") }

  predicate doSomething() { this.getText().matches("%demo%") }
}
from CustomClass a
where a.doSomething()
select a.getText()
This would be the same as using StrConst directly in the from clause and setting the conditions in the where clause.
Codebase distribution
Just before digging into pure query development, let’s see how the code is distributed across the codebase.
Placed inside codeql/python/ql/src/experimental/Security/CWE-XXX should be the main query (LDAPInjection.ql), the .qhelp file (see #documentation) and simple examples of what the query covers. Example
Placed inside codeql/python/ql/test/experimental/query-tests/Security/CWE-XXX should be all the tests for the query, a .qlref pointing to the previous “main” query and a .expected file (see #tests).
Placed inside codeql/python/ql/src/experimental/semmle/python should be the rest of the modeling:
- Concepts
  - ...experimental/semmle/python/Concepts.qll
- Frameworks and libraries' modeling:
  - ...experimental/semmle/python/frameworks
  - ...experimental/semmle/python/libraries
  - ...experimental/semmle/python/templates
- TaintTracking configurations and query-specific modeling:
  - ...experimental/semmle/python/security
Modeling
Concepts
The first thing we should be doing while developing a query is thinking about a proper way of making its Concepts.
- RegexExecution and RegexEscape modeling
- LDAPQuery and LDAPEscape modeling
- XMLParsing and XMLParser modeling
- JWTEncoding and JWTDecoding modeling
As you may have noticed, the above concepts cover the main point of the query (without taking into account the vulnerability itself).
Frameworks/Libraries
In this stage, we should be modeling the libraries or frameworks related to the vulnerability extending the Concepts:
- re (Regex Injection) modeling
- LDAP (LDAP Injection) modeling
- XML (XXE) modeling
- JWT libraries modeling:
Taint tracking configuration and query-specific modeling
Finally, it’s time to model the taint tracking config (source, sink, sanitizers and/or additional taint steps):
class QUERYFlowConfig extends TaintTracking::Configuration {
  QUERYFlowConfig() { this = "QUERYFlowConfig" }

  override predicate isSource(DataFlow::Node source) {
    source instanceof SOURCE // [1]
  }

  override predicate isSink(DataFlow::Node sink) {
    sink instanceof SINK // [2]
  }

  override predicate isSanitizerGuard(DataFlow::BarrierGuard guard) {
    guard instanceof StringConstCompare // [3]
  }

  override predicate isAdditionalTaintStep(DataFlow::Node nodeFrom, DataFlow::Node nodeTo) { // [4]
    nodeFrom instanceof PREV_STEP and
    nodeTo instanceof NEXT_STEP
  }
}
This is the usual syntax of a taint tracking configuration. You may add or remove any predicate as you wish (except isSource and isSink, which are always needed).
[1] is where the source is declared. Injection vulnerabilities tend to involve user input flowing into a specific function, so we would use RemoteFlowSource, as in source instanceof RemoteFlowSource (from semmle.python.dataflow.new.RemoteFlowSources) (see #remoteflowsource).
[2] is where the sink is declared. For this we may have a concept like LDAPQuery with a getQuery predicate. To use it we may write sink = any(LDAPQuery foo).getQuery(). Using any() keeps it easy to read, but you could also use an exists() clause declaring a variable of type LDAPQuery and setting the sink to that variable’s .getQuery():
exists(LDAPQuery lq |
  sink = lq.getQuery()
)
[3] is where the sanitizer guard is declared (it could also be a sanitizer declared with isSanitizer()) (see #sanitizers).
[4] is where the additional taint step is declared (see #additional-taint-steps).
- Regex Injection: taint tracking config with a query-specific modeling in order to display the used regex method in the provided alert.
- LDAP Injection: pure taint tracking config.
- XXE: taint tracking config with “extras”.
Query-specific modeling is everything related to the objective of the query rather than to the library modeling involved.
For example, here the query-specific modeling uses LDAPBind (a modeling based on the ldap package) to get the binds that hold a None, empty or unset password.
Basic example of a taint tracking configuration
/**
* A taint-tracking configuration for detecting code injections.
*/
class CodeInjectionFlowConfig extends TaintTracking::Configuration {
  CodeInjectionFlowConfig() { this = "CodeInjectionFlowConfig" }

  override predicate isSource(DataFlow::Node source) {
    source instanceof RemoteFlowSource
  }

  override predicate isSink(DataFlow::Node sink) {
    sink = API::builtin("eval").getACall().getArg(0)
  }
}
This taint tracking configuration will detect all RemoteFlowSources flowing to the first argument of any eval call.
Let’s give it a try against the following snippet:
from flask import Flask, request

app = Flask(__name__)

@app.route("/flow1")
def flow1():
    code = request.args["code"]
    eval(code)

@app.route("/flow2")
def flow2():
    email = request.args["email"]
    eval("./send_email {email}".format(email=email))

def flow3_extra(text):
    return text.split("\n")

@app.route("/flow3")
def flow3():
    text = request.args["text"]
    eval(flow3_extra(text))

@app.route("/flow4")
def flow4():
    text = request.args["text"]
    tixt = text
    toxt = flow3_extra(tixt)
    tuxt = toxt
    eval(tuxt)

@app.route("/flow1_good")
def flow1_good():
    code = request.args["code"]
    if code == "print('Hello, Wo... CodeQL!')":
        eval(code)
In this snippet we are testing:
- A simple flow, flow1, in which the GET parameter code gets assigned to a variable, and then that variable is used as the first argument of an eval call.
- A flow, flow2, in which the GET parameter email gets assigned to a variable, and then that variable is used as an argument when formatting the string passed as the first argument of an eval call.
- A tricky flow involving a function, flow3, in which the GET parameter text gets assigned to a variable, and then that variable is used as the first argument to flow3_extra, which returns the text split by \n (LF); that return value is used as the first argument of an eval call.
- A longer flow, flow4, in which the GET parameter text gets assigned to a variable, which is then assigned to another one and used as the first argument of flow3_extra, which splits it by \n and returns it; the result is assigned to a variable, which is assigned to yet another one and finally used as the first argument of an eval call.
- A guarded case, flow1_good, in which the GET parameter code only reaches eval after being compared with a constant string.
Our query would be something like this:
/**
* @kind path-problem
*/
import python
import semmle.python.dataflow.new.TaintTracking
import semmle.python.dataflow.new.RemoteFlowSources
import semmle.python.ApiGraphs
import DataFlow::PathGraph
/**
* A taint-tracking configuration for detecting code injections.
*/
class CodeInjectionFlowConfig extends TaintTracking::Configuration {
  CodeInjectionFlowConfig() { this = "CodeInjectionFlowConfig" }

  override predicate isSource(DataFlow::Node source) {
    source instanceof RemoteFlowSource
  }

  override predicate isSink(DataFlow::Node sink) {
    sink = API::builtin("eval").getACall().getArg(0)
  }
}
from CodeInjectionFlowConfig config, DataFlow::PathNode source, DataFlow::PathNode sink
where
config.hasFlowPath(source, sink)
select sink.getNode(), source, sink, "$@ eval argument comes from a $@",
sink.getNode(), "This", source.getNode(), "user-provided value"
Basically, we are asking CodeQL for every source and sink pair connected by a flow path under our configuration, where source is a RemoteFlowSource and sink is the first argument to an eval call. Since we are using DataFlow::PathNodes and @kind path-problem, the results are displayed so that the flow can be easily followed (i.e., each step/jump is shown).
As you may have seen, every function except flow1_good is vulnerable, yet this query flags them all. As shown in #sanitizers, we can add a sanitizer guard like StringConstCompare to keep CodeQL from following the flow through the == comparison.
/**
* @kind path-problem
*/
import python
import semmle.python.dataflow.new.TaintTracking
import semmle.python.dataflow.new.RemoteFlowSources
import semmle.python.ApiGraphs
import semmle.python.dataflow.new.BarrierGuards
import DataFlow::PathGraph
/**
* A taint-tracking configuration for detecting code injections.
*/
class CodeInjectionFlowConfig extends TaintTracking::Configuration {
  CodeInjectionFlowConfig() { this = "CodeInjectionFlowConfig" }

  override predicate isSource(DataFlow::Node source) {
    source instanceof RemoteFlowSource
  }

  override predicate isSink(DataFlow::Node sink) {
    sink = API::builtin("eval").getACall().getArg(0)
  }

  override predicate isSanitizerGuard(DataFlow::BarrierGuard guard) {
    guard instanceof StringConstCompare
  }
}
from CodeInjectionFlowConfig config, DataFlow::PathNode source, DataFlow::PathNode sink
where
config.hasFlowPath(source, sink)
select sink.getNode(), source, sink, "$@ eval argument comes from a $@",
sink.getNode(), "This", source.getNode(), "user-provided value"
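The reasoning behind treating the == comparison as a guard can be sketched in plain Python. This is a minimal illustration, assuming a hypothetical constant ALLOWED_SNIPPET in place of the string used in flow1_good and a helper named guarded_eval that does not appear in the test file.

```python
ALLOWED_SNIPPET = "1 + 1"  # hypothetical constant, stands in for the string in flow1_good


def guarded_eval(code: str):
    # Only the exact constant reaches eval, mirroring the == check in flow1_good.
    if code == ALLOWED_SNIPPET:
        return eval(code)
    return None


allowed_result = guarded_eval("1 + 1")
blocked_result = guarded_eval("__import__('os').getcwd()")
```

Inside the guarded branch the value can only be the constant the developer wrote, so there is nothing user-controlled left to report.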
Tests
Basic tests
These are placed inside codeql/python/ql/src/experimental/Security/CWE-XXX, perhaps inside a subfolder if there are several. These tests are in charge of showing a basic pattern the query matches. Example
Advanced tests
These are placed inside codeql/python/ql/test/experimental/query-tests/Security/CWE-XXX and should cover every different situation of vulnerable and non-vulnerable (with sanitizers) code patterns. Example
The .qlref file tells the test runner where the query using those tests is. The .expected file starts out empty and can then be filled with the results of the tests, so that the CodeQL engineers (or reviewers) can check that the query produces its expected results.
Documentation (qhelp and qldocs)
The .qhelp file is an explanation of the query providing an overview of it, the recommendations to fix the vulnerability, an example of what the query looks for (basic tests) and references.
qldocs is the term used for the documentation inside the code. Since CodeQL can be a bit tricky when modeling large patterns, the documentation helps everyone understand the approach taken:
Submission
Once the query is finished and has found some results (at least one new or already-known CVE), you may make a Pull Request to github/codeql (the latter prerequisite applies only to CodeQL’s bug bounty program, see its HackerOne hacktivity) and open an issue in github/securitylab.
Advanced query modeling
In this section we will be covering each part of the following queries to see specific examples.
Regular expression injection
Concepts
The main “structures” we will need in the taint tracking configuration are:
- Regular expression executions (calls to a regex operation) (link) with:
  - A predicate to get the input argument holding the regular expression.
  - A predicate to get the method being used.
- Regular expression escaping functions with a predicate to get the input being escaped. (link)
Let’s populate them!
re library modeling
To begin with, we should gather all the re methods that execute a provided regex:
class RegexExecutionMethods extends string {
  RegexExecutionMethods() {
    this in ["match", "fullmatch", "search", "split", "findall", "finditer", "sub", "subn"]
  }
}
Then it is easy to check whether a given method is one of those we are looking for (in the actual modeling, the method in [1] is not hardcoded, it is statically obtained):
class RegexExecutionMethods extends string {
  RegexExecutionMethods() {
    this in ["match", "fullmatch", "search", "split", "findall", "finditer", "sub", "subn"]
  }
}

from string method
where
  method = "search" and // [1]
  method instanceof RegexExecutionMethods
select method
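As a quick sanity check of that list on the Python side, the sketch below confirms that every name exists both as a function of the re module and as a method of a compiled pattern. The constant name REGEX_EXECUTION_METHODS and the sample pattern are assumptions made for this example.

```python
import re

REGEX_EXECUTION_METHODS = [
    "match", "fullmatch", "search", "split", "findall", "finditer", "sub", "subn",
]

# Names available as module-level functions: re.search(pattern, ...)
on_module = [
    name for name in REGEX_EXECUTION_METHODS if callable(getattr(re, name, None))
]

# Names available on a compiled pattern: re.compile(pattern).search(...)
compiled_pattern = re.compile("fo+")
on_compiled = [
    name
    for name in REGEX_EXECUTION_METHODS
    if callable(getattr(compiled_pattern, name, None))
]
```

Because the same names show up in both places, a single list of method names can serve the direct calls and the calls made on the result of re.compile.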
Let’s model the execution methods:
@app.route("/direct")
def direct():
    unsafe_pattern = request.args['pattern']
    safe_pattern = re.escape(unsafe_pattern)
    re.search(safe_pattern, "")

@app.route("/compile")
def compile():
    unsafe_pattern = request.args['pattern']
    safe_pattern = re.escape(unsafe_pattern)
    compiled_pattern = re.compile(safe_pattern)
    compiled_pattern.search("")
- Finding re.(RegexExecutionMethods):
import semmle.python.ApiGraphs
class RegexExecutionMethods extends string {
  RegexExecutionMethods() {
    this in ["match", "fullmatch", "search", "split", "findall", "finditer", "sub", "subn"]
  }
}
from DataFlow::CallCfgNode c
where
c.getLocation().getFile().getBaseName().matches("Regex%") and // just restricting the file to be queried
c = API::moduleImport("re").getMember(any(RegexExecutionMethods r)).getACall()
select c
- Finding a MethodCallNode whose object is re.compile:
import semmle.python.ApiGraphs
from DataFlow::MethodCallNode mcn
where
mcn.getLocation().getFile().getBaseName().matches("Regex%") and // just restricting the file to be queried
mcn.getObject() = API::moduleImport("re").getMember("compile").getACall()
select mcn
However, this misses the case in which the result of re.compile is first assigned to a variable. To cover it we are going to use getALocalSource(). Since the local source node of re.compile() is re.compile() itself, we will get all the cases, including calls of the form variable.search() where variable = re.compile().
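To make the two shapes concrete, here is a small Python sketch of the code patterns the query has to recognise. The function names search_inline and search_via_variable are made up for this illustration; only re.compile and the compiled pattern's search method come from the standard library.

```python
import re


def search_inline(pattern: str, text: str) -> bool:
    # The method is called directly on the result of re.compile().
    return re.compile(pattern).search(text) is not None


def search_via_variable(pattern: str, text: str) -> bool:
    # The compiled pattern is stored in a local variable first, so the
    # object of the method call is a variable read, not the call itself.
    compiled = re.compile(pattern)
    return compiled.search(text) is not None
```

Both functions behave identically at runtime, but only the first one has re.compile() as the syntactic object of the method call, which is why the query needs getALocalSource() to catch the second.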
import semmle.python.ApiGraphs
from DataFlow::MethodCallNode mcn
where
mcn.getLocation().getFile().getBaseName().matches("Regex%") and // just restricting the file to be queried
mcn.getObject().getALocalSource() = API::moduleImport("re").getMember("compile").getACall()
select mcn
Moreover, you may have noticed that, even though the concept will need a getRegex predicate to hold the argument to re.compile, and we could have written this snippet with a CallCfgNode as the main variable (instead of a MethodCallNode) to make it more visual, RegexExecution will hold the calls to re.(RegexExecutionMethods), so the predicate is easier to deal with this way. Otherwise, we would have to create a class variable to correlate the MethodCallNode with the class’s CallCfgNode.
- Restricting the MethodCallNode method to RegexExecutionMethods:
import semmle.python.ApiGraphs
class RegexExecutionMethods extends string {
RegexExecutionMethods() {
this in ["match", "fullmatch", "search", "split", "findall", "finditer", "sub", "subn"]
}
}
from DataFlow::MethodCallNode mcn
where
mcn.getLocation().getFile().getBaseName().matches("Regex%") and // just restricting the file to be queried
mcn.getObject().getALocalSource() = API::moduleImport("re").getMember("compile").getACall() and
mcn.getMethodName() instanceof RegexExecutionMethods
select mcn
- Getting the arguments:
import semmle.python.ApiGraphs
class RegexExecutionMethods extends string {
RegexExecutionMethods() {
this in ["match", "fullmatch", "search", "split", "findall", "finditer", "sub", "subn"]
}
}
from DataFlow::CallCfgNode c
where
c.getLocation().getFile().getBaseName().matches("Regex%") and // just restricting the file to be queried
c = API::moduleImport("re").getMember(any(RegexExecutionMethods r)).getACall()
select c, c.getArg(0)
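As a quick sanity check on why getArg(0) is the interesting argument for the module-level functions, the following Python sketch inspects the standard library itself. The names REGEX_EXECUTION_METHODS and first_parameter are hypothetical helpers for this post, not part of CodeQL or re.

```python
import inspect
import re

# Same method names as the RegexExecutionMethods class in the query.
REGEX_EXECUTION_METHODS = [
    "match", "fullmatch", "search", "split",
    "findall", "finditer", "sub", "subn",
]


def first_parameter(function_name: str) -> str:
    # Look up re.<function_name> and return the name of its first parameter.
    function = getattr(re, function_name)
    return next(iter(inspect.signature(function).parameters))


first_parameters = {name: first_parameter(name) for name in REGEX_EXECUTION_METHODS}
```

Every entry should report pattern as its first parameter. Note that this only holds for the module-level functions: on a compiled pattern object the first argument is the string being searched, so the regex has to be taken from the re.compile call instead.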
In this example, getting the re method in use directly from c would be a bit tricky, since we would have to get the attribute of the function of the call node with something like .getFunction().asExpr().(Attribute), which would only work under certain conditions. Because of this complexity (done here in a query-specific modeling), we can instead declare a string variable that is an instance of RegexExecutionMethods, pass it to getMember, and then use it in the select clause:
import semmle.python.ApiGraphs
class RegexExecutionMethods extends string {
RegexExecutionMethods() {
this in ["match", "fullmatch", "search", "split", "findall", "finditer", "sub", "subn"]
}
}
from DataFlow::CallCfgNode c, string method
where
c.getLocation().getFile().getBaseName().matches("Regex%") and // just restricting the file to be queried
method instanceof RegexExecutionMethods and
c = API::moduleImport("re").getMember(method).getACall()
select c, c.getArg(0), method
To accomplish this later in the class modeling, we will create a class variable so the value can be shared between predicates.
import semmle.python.ApiGraphs
class RegexExecutionMethods extends string {
RegexExecutionMethods() {
this in ["match", "fullmatch", "search", "split", "findall", "finditer", "sub", "subn"]
}
}
from DataFlow::MethodCallNode mcn
where
mcn.getLocation().getFile().getBaseName().matches("Regex%") and // just restricting the file to be queried
mcn.getObject().getALocalSource() = API::moduleImport("re").getMember("compile").getACall() and
mcn.getMethodName() instanceof RegexExecutionMethods
select mcn, mcn.getArg(0), mcn.getMethodName()
- Making the DirectRegex and CompiledRegex classes:
import semmle.python.ApiGraphs
class RegexExecutionMethods extends string {
RegexExecutionMethods() {
this in ["match", "fullmatch", "search", "split", "findall", "finditer", "sub", "subn"]
}
}
class DirectRegex extends DataFlow::CallCfgNode {
string reMethod;
DirectRegex() {
this = API::moduleImport("re").getMember(reMethod).getACall() and
reMethod instanceof RegexExecutionMethods
}
DataFlow::Node getRegex() { result = this.getArg(0) }
DataFlow::Node getReMethod() { result = this }
string getReMethodName() { result = "re." + reMethod }
}
from DirectRegex re
select re, re.getRegex(), re.getReMethodName()
import semmle.python.ApiGraphs
class RegexExecutionMethods extends string {
RegexExecutionMethods() {
this in ["match", "fullmatch", "search", "split", "findall", "finditer", "sub", "subn"]
}
}
class CompiledRegex extends DataFlow::MethodCallNode {
DataFlow::CallCfgNode compileCall;
CompiledRegex() {
compileCall = API::moduleImport("re").getMember("compile").getACall() and
this.getObject().getALocalSource() = compileCall and
this.getMethodName() instanceof RegexExecutionMethods
}
DataFlow::Node getRegex() { result = compileCall.getArg(0) }
DataFlow::Node getReMethod() { result = compileCall }
string getReMethodName() { result = "re.compile" }
}
from CompiledRegex re
select re, re.getRegex(), re.getReMethodName()
- Merging and extending RegexExecution::Range:
import semmle.python.ApiGraphs
module RegexExecution {
abstract class Range extends DataFlow::Node {
abstract DataFlow::Node getRegex();
abstract DataFlow::Node getReMethod();
abstract string getReMethodName();
}
}
class RegexExecution extends DataFlow::Node {
RegexExecution::Range range;
RegexExecution() { this = range }
DataFlow::Node getRegex() { result = range.getRegex() }
DataFlow::Node getReMethod() { result = range.getReMethod() }
string getReMethodName() { result = range.getReMethodName() }
}
class RegexExecutionMethods extends string {
RegexExecutionMethods() {
this in ["match", "fullmatch", "search", "split", "findall", "finditer", "sub", "subn"]
}
}
class DirectRegex extends DataFlow::CallCfgNode, RegexExecution::Range {
string reMethod;
DirectRegex() {
this = API::moduleImport("re").getMember(reMethod).getACall() and
reMethod instanceof RegexExecutionMethods
}
override DataFlow::Node getRegex() { result = this.getArg(0) }
override DataFlow::Node getReMethod() { result = this }
override string getReMethodName() { result = "re." + reMethod }
}
class CompiledRegex extends DataFlow::MethodCallNode, RegexExecution::Range {
DataFlow::CallCfgNode compileCall;
CompiledRegex() {
compileCall = API::moduleImport("re").getMember("compile").getACall() and
this.getObject().getALocalSource() = compileCall and
this.getMethodName() instanceof RegexExecutionMethods
}
override DataFlow::Node getRegex() { result = compileCall.getArg(0) }
override DataFlow::Node getReMethod() { result = compileCall }
override string getReMethodName() { result = "re.compile" }
}
from RegexExecution re
select re, re.getRegex(), re.getReMethodName()
Let’s model the sanitizer!
- Finding the call:
import semmle.python.ApiGraphs
from DataFlow::CallCfgNode reEscapeCall
where reEscapeCall = API::moduleImport("re").getMember("escape").getACall()
select reEscapeCall
- Getting the argument:
import semmle.python.ApiGraphs
from DataFlow::CallCfgNode reEscapeCall, DataFlow::Node reEscaped
where
reEscapeCall = API::moduleImport("re").getMember("escape").getACall() and
reEscaped = reEscapeCall.getArg(0)
select reEscapeCall, reEscaped
Since there’s only one escape call related to regular expressions in re that we know of, we will skip the escape concept. Creating one is good practice though: it makes it very easy to add a new escape call later, and every query depending on these regex escape calls would automatically pick it up.
Taint tracking configuration
class RegexInjectionFlowConfig extends TaintTracking::Configuration {
RegexInjectionFlowConfig() { this = "RegexInjectionFlowConfig" }
override predicate isSource(DataFlow::Node source) { source instanceof RemoteFlowSource }
override predicate isSink(DataFlow::Node sink) { sink = any(RegexExecution re).getRegex() }
override predicate isSanitizer(DataFlow::Node sanitizer) {
sanitizer = API::moduleImport("re").getMember("escape").getACall().getArg(0)
}
}
With this configuration, CodeQL will try to find every user input that flows to the regex argument of any regex-executing call without passing through the first argument of a re.escape call.
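To see what the sanitizer buys us at runtime, here is a minimal Python sketch comparing an escaped and an unescaped use of the same input. The helper names contains_literal and contains_pattern are invented for this example; re.escape and re.search are the real standard library calls.

```python
import re


def contains_literal(user_input: str, text: str) -> bool:
    # Escaped: every metacharacter in user_input only matches itself.
    return re.search(re.escape(user_input), text) is not None


def contains_pattern(user_input: str, text: str) -> bool:
    # Unescaped: user_input is interpreted as a regular expression.
    return re.search(user_input, text) is not None
```

With the input ".*", contains_pattern matches any text at all, while contains_literal only matches text that really contains a dot followed by an asterisk. That difference is what the query treats as the boundary between tainted and sanitized data.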
Complete query
/**
* @name Regular expression injection
* @description User input should not be used in regular expressions without first being escaped,
* otherwise a malicious user may be able to inject an expression that could require
* exponential time on certain inputs.
* @kind path-problem
* @problem.severity error
* @id py/regex-injection
* @tags security
* external/cwe/cwe-730
* external/cwe/cwe-400
*/
import python
import semmle.python.dataflow.new.TaintTracking
import semmle.python.dataflow.new.RemoteFlowSources
import semmle.python.ApiGraphs
import DataFlow::PathGraph
module RegexExecution {
abstract class Range extends DataFlow::Node {
abstract DataFlow::Node getRegex();
abstract DataFlow::Node getReMethod();
abstract string getReMethodName();
}
}
class RegexExecution extends DataFlow::Node {
RegexExecution::Range range;
RegexExecution() { this = range }
DataFlow::Node getRegex() { result = range.getRegex() }
DataFlow::Node getReMethod() { result = range.getReMethod() }
string getReMethodName() { result = range.getReMethodName() }
}
class RegexExecutionMethods extends string {
RegexExecutionMethods() {
this in ["match", "fullmatch", "search", "split", "findall", "finditer", "sub", "subn"]
}
}
class DirectRegex extends DataFlow::CallCfgNode, RegexExecution::Range {
string reMethod;
DirectRegex() {
this = API::moduleImport("re").getMember(reMethod).getACall() and
reMethod instanceof RegexExecutionMethods
}
override DataFlow::Node getRegex() { result = this.getArg(0) }
override DataFlow::Node getReMethod() { result = this }
override string getReMethodName() { result = "re." + reMethod }
}
class CompiledRegex extends DataFlow::MethodCallNode, RegexExecution::Range {
DataFlow::CallCfgNode compileCall;
CompiledRegex() {
compileCall = API::moduleImport("re").getMember("compile").getACall() and
this.getObject().getALocalSource() = compileCall and
this.getMethodName() instanceof RegexExecutionMethods
}
override DataFlow::Node getRegex() { result = compileCall.getArg(0) }
override DataFlow::Node getReMethod() { result = compileCall }
override string getReMethodName() { result = "re.compile" }
}
class RegexInjectionFlowConfig extends TaintTracking::Configuration {
RegexInjectionFlowConfig() { this = "RegexInjectionFlowConfig" }
override predicate isSource(DataFlow::Node source) { source instanceof RemoteFlowSource }
override predicate isSink(DataFlow::Node sink) { sink = any(RegexExecution re).getRegex() }
override predicate isSanitizer(DataFlow::Node sanitizer) {
sanitizer = API::moduleImport("re").getMember("escape").getACall().getArg(0)
}
}
from RegexInjectionFlowConfig config, DataFlow::PathNode source, DataFlow::PathNode sink
where config.hasFlowPath(source, sink)
select sink.getNode(), source, sink, "$@ regular expression is constructed from a $@.",
sink.getNode(), "This", source.getNode(), "user-provided value"
In order to display getReMethod and getReMethodName in the select clause, we have to create a query-specific sink class. We can then cast the sink node to that class and access its predicates.
/**
* @name Regular expression injection
* @description User input should not be used in regular expressions without first being escaped,
* otherwise a malicious user may be able to inject an expression that could require
* exponential time on certain inputs.
* @kind path-problem
* @problem.severity error
* @id py/regex-injection
* @tags security
* external/cwe/cwe-730
* external/cwe/cwe-400
*/
import python
import semmle.python.dataflow.new.TaintTracking
import semmle.python.dataflow.new.RemoteFlowSources
import semmle.python.ApiGraphs
import DataFlow::PathGraph
module RegexExecution {
abstract class Range extends DataFlow::Node {
abstract DataFlow::Node getRegex();
abstract DataFlow::Node getReMethod();
abstract string getReMethodName();
}
}
class RegexExecution extends DataFlow::Node {
RegexExecution::Range range;
RegexExecution() { this = range }
DataFlow::Node getRegex() { result = range.getRegex() }
DataFlow::Node getReMethod() { result = range.getReMethod() }
string getReMethodName() { result = range.getReMethodName() }
}
class RegexExecutionMethods extends string {
RegexExecutionMethods() {
this in ["match", "fullmatch", "search", "split", "findall", "finditer", "sub", "subn"]
}
}
class DirectRegex extends DataFlow::CallCfgNode, RegexExecution::Range {
string reMethod;
DirectRegex() {
this = API::moduleImport("re").getMember(reMethod).getACall() and
reMethod instanceof RegexExecutionMethods
}
override DataFlow::Node getRegex() { result = this.getArg(0) }
override DataFlow::Node getReMethod() { result = this }
override string getReMethodName() { result = "re." + reMethod }
}
class CompiledRegex extends DataFlow::MethodCallNode, RegexExecution::Range {
DataFlow::CallCfgNode compileCall;
CompiledRegex() {
compileCall = API::moduleImport("re").getMember("compile").getACall() and
this.getObject().getALocalSource() = compileCall and
this.getMethodName() instanceof RegexExecutionMethods
}
override DataFlow::Node getRegex() { result = compileCall.getArg(0) }
override DataFlow::Node getReMethod() { result = compileCall }
override string getReMethodName() { result = "re.compile" }
}
class RegexInjectionSink extends DataFlow::Node {
string reMethodName;
DataFlow::Node reMethod;
RegexInjectionSink() {
exists(RegexExecution reExec |
this = reExec.getRegex() and
reMethod = reExec.getReMethod() and
reMethodName = reExec.getReMethodName()
)
}
DataFlow::Node getReMethod() { result = reMethod }
string getReMethodName() { result = reMethodName }
}
class RegexInjectionFlowConfig extends TaintTracking::Configuration {
RegexInjectionFlowConfig() { this = "RegexInjectionFlowConfig" }
override predicate isSource(DataFlow::Node source) { source instanceof RemoteFlowSource }
override predicate isSink(DataFlow::Node sink) { sink instanceof RegexInjectionSink }
override predicate isSanitizer(DataFlow::Node sanitizer) {
sanitizer = API::moduleImport("re").getMember("escape").getACall().getArg(0)
}
}
from RegexInjectionFlowConfig config, DataFlow::PathNode source, DataFlow::PathNode sink
where config.hasFlowPath(source, sink)
select sink.getNode(), source, sink,
"$@ regular expression is constructed from a $@ and executed by $@.", sink.getNode(), "This",
source.getNode(), "user-provided value", sink.getNode().(RegexInjectionSink).getReMethod(),
sink.getNode().(RegexInjectionSink).getReMethodName()
LDAP Injection
Concepts
The main “structures” we will need in the taint tracking configuration are:
- LDAP queries with a predicate to get the argument holding the query to be executed. (Link)
- LDAP escape functions with a predicate to get the argument holding the input being escaped. (Link)
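Before modeling anything, it helps to see why both structures matter. The Python sketch below builds an LDAP filter with plain string formatting and then with escaping. build_filter and escape_filter_value are hypothetical helpers written for this post: the escaping follows the special characters listed in RFC 4515 and is only a stand-in for what the libraries' own escape functions do, not their implementation.

```python
# Characters that must be escaped inside an LDAP filter value (RFC 4515).
_FILTER_ESCAPES = {
    "\\": r"\5c",
    "*": r"\2a",
    "(": r"\28",
    ")": r"\29",
    "\x00": r"\00",
}


def escape_filter_value(value: str) -> str:
    # Replace each special character with its backslash-hex form.
    return "".join(_FILTER_ESCAPES.get(char, char) for char in value)


def build_filter(username: str) -> str:
    # Vulnerable: the user input is formatted straight into the filter.
    return "(uid={})".format(username)


def build_safe_filter(username: str) -> str:
    # Same filter, but the input can no longer change its structure.
    return "(uid={})".format(escape_filter_value(username))
```

An input such as "*)(uid=*" turns the unsafe filter into "(uid=*)(uid=*)", which changes the structure of the query, while the safe version keeps the whole input inside a single value. The query we are about to write looks for exactly this: user input reaching the query without going through an escape call.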
Let’s populate them!
LDAP library modeling
LDAP 2
The first step is to create a class holding all the methods in charge of executing an LDAP query. Let’s begin with python2’s ldap:
private class LDAP2QueryMethods extends string {
LDAP2QueryMethods() {
this in ["search", "search_s", "search_st", "search_ext", "search_ext_s"]
}
}
Then it is easy to check whether a given method is among the ones we are looking for (in the actual modeling, the method name is not hardcoded):
class LDAP2QueryMethods extends string {
LDAP2QueryMethods() {
this in ["search", "search_s", "search_st", "search_ext", "search_ext_s"]
}
}
from string method
where
method = "search_s" and
method instanceof LDAP2QueryMethods
select method
Let’s model the query methods:
import ldap
from flask import Flask, request
app = Flask(__name__)
@app.route("/normal2")
def normal2():
    unsafe_dc = request.args['dc']
    unsafe_filter = request.args['username']
    dn = "dc={}".format(unsafe_dc)
    search_filter = "(foo={})".format(unsafe_filter)
    ldap_connection = ldap.initialize("ldap://127.0.0.1")
    user = ldap_connection.search_s(
        dn, ldap.SCOPE_SUBTREE, search_filter)
- Finding ldap.initialize():
import semmle.python.ApiGraphs
from DataFlow::CallCfgNode c
where
c.getLocation().getFile().getBaseName().matches("LDAP_%") and // just restricting the file to be queried
c = API::moduleImport("ldap").getMember("initialize").getACall()
select c
API::moduleImport("ldap") returns a reference to the ldap library, getMember("initialize") returns a reference to ldap’s initialize function, and getACall() gets a call such as ldap.initialize("ldap://127.0.0.1").
- Finding a MethodCallNode whose object’s local source is the call found previously:
import semmle.python.ApiGraphs
from DataFlow::MethodCallNode searchMethod
where
searchMethod.getLocation().getFile().getBaseName().matches("LDAP_%") and // just restricting the file to be queried
searchMethod.getObject().getALocalSource() =
API::moduleImport("ldap").getMember("initialize").getACall()
select searchMethod
You may have noticed that the main modeling uses DataFlow::AttrRead, a structure meant for reads of an object’s attribute. I can’t remember why I used it, perhaps because MethodCallNode hadn’t been introduced yet, but anyway, we will be using MethodCallNode.
- Restricting searchMethod to LDAP2QueryMethods:
import semmle.python.ApiGraphs
class LDAP2QueryMethods extends string {
LDAP2QueryMethods() { this in ["search", "search_s", "search_st", "search_ext", "search_ext_s"] }
}
from DataFlow::MethodCallNode searchMethod
where
searchMethod.getLocation().getFile().getBaseName().matches("LDAP_%") and // just restricting the file to be queried
searchMethod.getMethodName() instanceof LDAP2QueryMethods and
searchMethod.getObject().getALocalSource() =
API::moduleImport("ldap").getMember("initialize").getACall()
select searchMethod
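The restriction on searchMethod’s method name is written before the check that its object’s local source is a call to ldap.initialize, with the idea that if there is no LDAP2QueryMethods call, the latter need not be computed. Bear in mind that QL is declarative, so the evaluator is free to reorder conjuncts and the written order is not guaranteed to affect performance.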
- Getting the arguments:
import semmle.python.ApiGraphs
class LDAP2QueryMethods extends string {
LDAP2QueryMethods() { this in ["search", "search_s", "search_st", "search_ext", "search_ext_s"] }
}
from DataFlow::MethodCallNode searchMethod, DataFlow::Node arg
where
searchMethod.getLocation().getFile().getBaseName().matches("LDAP_%") and // just restricting the file to be queried
searchMethod.getMethodName() instanceof LDAP2QueryMethods and
searchMethod.getObject().getALocalSource() =
API::moduleImport("ldap").getMember("initialize").getACall() and
arg in [searchMethod.getArg(0), searchMethod.getArg(2), searchMethod.getArgByName("filterstr")]
select searchMethod, arg
arg in [...] is a set-literal membership check: arg takes each of the values listed, so the query produces one result row per listed argument that exists on the call.
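If the set literal feels abstract, the following Python sketch does roughly the same selection with the standard ast module. It is purely syntactic (it does not check that the receiver comes from ldap.initialize) and it is not how CodeQL evaluates the query; QUERY_METHODS and query_arguments are names made up for this illustration.

```python
import ast

# Same method names as the LDAP2QueryMethods class in the query.
QUERY_METHODS = {"search", "search_s", "search_st", "search_ext", "search_ext_s"}


def query_arguments(source: str) -> list[str]:
    """Return the source text of each DN/filter argument of a search call."""
    found = []
    for node in ast.walk(ast.parse(source)):
        if (
            isinstance(node, ast.Call)
            and isinstance(node.func, ast.Attribute)
            and node.func.attr in QUERY_METHODS
        ):
            # Candidates: positional 0 (DN), positional 2 (filter)
            # and the keyword argument named filterstr.
            candidates = [arg for index, arg in enumerate(node.args) if index in (0, 2)]
            candidates += [kw.value for kw in node.keywords if kw.arg == "filterstr"]
            found.extend(ast.unparse(candidate) for candidate in candidates)
    return found
```

Just like in the query, a call that passes the filter positionally and one that passes it as filterstr= both yield two arguments, and a candidate that is absent from the call simply contributes nothing.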
- Making an LDAP2Query class:
import semmle.python.ApiGraphs
class LDAP2QueryMethods extends string {
LDAP2QueryMethods() { this in ["search", "search_s", "search_st", "search_ext", "search_ext_s"] }
}
class LDAP2Query extends DataFlow::CallCfgNode {
DataFlow::MethodCallNode searchMethod;
LDAP2Query() {
searchMethod.getMethodName() instanceof LDAP2QueryMethods and
searchMethod.getObject().getALocalSource() =
API::moduleImport("ldap").getMember("initialize").getACall() and
this = searchMethod.(DataFlow::CallCfgNode)
}
DataFlow::Node getQuery() {
result in [
searchMethod.getArg(0), searchMethod.getArg(2), searchMethod.getArgByName("filterstr")
]
}
}
from LDAP2Query lq
select lq, lq.getQuery()
Notice that, in order to use searchMethod in a class predicate (getQuery), we need to declare a class variable, DataFlow::MethodCallNode searchMethod;. Otherwise, the only thing correlating getQuery with its own LDAP2Query would be this, and we would need more type casts than we do this way.
- Extending LDAPQuery::Range:
import semmle.python.ApiGraphs
module LDAPQuery {
abstract class Range extends DataFlow::Node {
abstract DataFlow::Node getQuery();
}
}
class LDAPQuery extends DataFlow::Node {
LDAPQuery::Range range;
LDAPQuery() { this = range }
DataFlow::Node getQuery() { result = range.getQuery() }
}
class LDAP2QueryMethods extends string {
LDAP2QueryMethods() { this in ["search", "search_s", "search_st", "search_ext", "search_ext_s"] }
}
class LDAP2Query extends DataFlow::CallCfgNode, LDAPQuery::Range {
DataFlow::MethodCallNode searchMethod;
LDAP2Query() {
searchMethod.getMethodName() instanceof LDAP2QueryMethods and
searchMethod.getObject().getALocalSource() =
API::moduleImport("ldap").getMember("initialize").getACall() and
this = searchMethod.(DataFlow::CallCfgNode) // [1]
}
override DataFlow::Node getQuery() {
result in [
searchMethod.getArg(0), searchMethod.getArg(2), searchMethod.getArgByName("filterstr")
]
}
}
from LDAPQuery lq // [2]
select lq, lq.getQuery() // [3]
Notice the use of this inside LDAP2Query [1] (since LDAPQuery gathers every query call, this refers to the LDAP2Query itself, that is, the call to an LDAP2QueryMethods method), the use of LDAPQuery in the from clause [2], and the use of LDAPQuery’s Concept predicate getQuery() in the select clause [3].
Now that python2’s LDAPQuery is modeled, we can move on to modeling its escape methods:
- ldap.dn.escape_dn_chars (source)
- ldap.filter.escape_filter_chars (documentation)
- Finding the calls:
import semmle.python.ApiGraphs
from DataFlow::CallCfgNode ldap2EscapeCall
where
ldap2EscapeCall in [
API::moduleImport("ldap").getMember("dn").getMember("escape_dn_chars").getACall(),
API::moduleImport("ldap").getMember("filter").getMember("escape_filter_chars").getACall()
]
select ldap2EscapeCall, ldap2EscapeCall.getArg(0)
- Wrapping them in their own classes:
import semmle.python.ApiGraphs
class LDAP2EscapeDNCall extends DataFlow::CallCfgNode {
LDAP2EscapeDNCall() {
this = API::moduleImport("ldap").getMember("dn").getMember("escape_dn_chars").getACall()
}
DataFlow::Node getAnInput() { result = this.getArg(0) }
}
class LDAP2EscapeFilterCall extends DataFlow::CallCfgNode {
LDAP2EscapeFilterCall() {
this = API::moduleImport("ldap").getMember("filter").getMember("escape_filter_chars").getACall()
}
DataFlow::Node getAnInput() { result = this.getArg(0) }
}
from DataFlow::CallCfgNode ldap2EscapeCall
where
ldap2EscapeCall instanceof LDAP2EscapeDNCall or
ldap2EscapeCall instanceof LDAP2EscapeFilterCall
select ldap2EscapeCall
As you may have noticed, we can’t use ldap2EscapeCall.getAnInput() in the select statement. This happens because we are dealing with a variable whose type is CallCfgNode, and this specific type does not have a getAnInput predicate. We could write something like ldap2EscapeCall.(LDAP2EscapeDNCall).getAnInput(), which works whenever the cast succeeds, but using Concepts is easier.
- Extending LDAPEscape::Range:
import semmle.python.ApiGraphs
module LDAPEscape {
abstract class Range extends DataFlow::Node {
abstract DataFlow::Node getAnInput();
}
}
class LDAPEscape extends DataFlow::Node {
LDAPEscape::Range range;
LDAPEscape() { this = range }
DataFlow::Node getAnInput() { result = range.getAnInput() }
}
class LDAP2EscapeDNCall extends DataFlow::CallCfgNode, LDAPEscape::Range {
LDAP2EscapeDNCall() {
this = API::moduleImport("ldap").getMember("dn").getMember("escape_dn_chars").getACall()
}
override DataFlow::Node getAnInput() { result = this.getArg(0) }
}
class LDAP2EscapeFilterCall extends DataFlow::CallCfgNode, LDAPEscape::Range {
LDAP2EscapeFilterCall() {
this = API::moduleImport("ldap").getMember("filter").getMember("escape_filter_chars").getACall()
}
override DataFlow::Node getAnInput() { result = this.getArg(0) }
}
from LDAPEscape ldapEscapeCall
select ldapEscapeCall, ldapEscapeCall.getAnInput()
Since we are done with python2’s ldap, we are ready to get python3’s ldap3 modeling done!
LDAP 3
Given the following vulnerable snippet:
import ldap3
from flask import Flask, request
app = Flask(__name__)
@app.route("/normal")
def normal():
    unsafe_dc = request.args['dc']
    unsafe_filter = request.args['username']
    dn = "dc={}".format(unsafe_dc)
    search_filter = "(user={})".format(unsafe_filter)
    srv = ldap3.Server('ldap://127.0.0.1')
    conn = ldap3.Connection(srv, user=dn, auto_bind=True)
    conn.search(dn, search_filter)
- Finding ldap3.Connection():
import semmle.python.ApiGraphs
from DataFlow::CallCfgNode c
where
c.getLocation().getFile().getBaseName().matches("LDAP3%") and // just restricting the file to be queried
c = API::moduleImport("ldap3").getMember("Connection").getACall()
select c
API::moduleImport("ldap3") returns a reference to the ldap3 library, getMember("Connection") returns a reference to ldap3’s Connection class, and getACall() gets a call such as ldap3.Connection(srv, user=dn, auto_bind=True).
- Finding a MethodCallNode whose object’s local source is the call found previously:
import semmle.python.ApiGraphs
from DataFlow::MethodCallNode searchMethod
where
searchMethod.getLocation().getFile().getBaseName().matches("LDAP3%") and // just restricting the file to be queried
searchMethod.getObject().getALocalSource() =
API::moduleImport("ldap3").getMember("Connection").getACall()
select searchMethod
As in python2’s ldap modeling, you may have noticed that the main modeling uses DataFlow::AttrRead, a structure meant for reads of an object’s attribute. I can’t remember why I used it, perhaps because MethodCallNode hadn’t been introduced yet, but anyway, we will be using MethodCallNode.
- Restricting the searchMethod method to search:
import semmle.python.ApiGraphs
from DataFlow::MethodCallNode searchMethod
where
searchMethod.getLocation().getFile().getBaseName().matches("LDAP3%") and // just restricting the file to be queried
searchMethod.getMethodName() = "search" and
searchMethod.getObject().getALocalSource() =
API::moduleImport("ldap3").getMember("Connection").getACall()
select searchMethod
- Bonus: correlating the local source of ldap3.Connection()’s first argument with ldap3.Server().
Since any use of ldap3.Connection implies a call to ldap3.Server (otherwise ldap3.Connection wouldn’t work), applying the correlation is a great way to practice.
import semmle.python.ApiGraphs
from DataFlow::MethodCallNode searchMethod, DataFlow::CallCfgNode connectionCall
where
searchMethod.getLocation().getFile().getBaseName().matches("LDAP3%") and // just restricting the file to be queried
connectionCall = API::moduleImport("ldap3").getMember("Connection").getACall() and
searchMethod.getMethodName() = "search" and
searchMethod.getObject().getALocalSource() = connectionCall and
connectionCall.getArg(0).getALocalSource() =
API::moduleImport("ldap3").getMember("Server").getACall()
select searchMethod
- Getting the arguments:
import semmle.python.ApiGraphs
from DataFlow::MethodCallNode searchMethod, DataFlow::CallCfgNode connectionCall, DataFlow::Node arg
where
searchMethod.getLocation().getFile().getBaseName().matches("LDAP3%") and // just restricting the file to be queried
connectionCall = API::moduleImport("ldap3").getMember("Connection").getACall() and
searchMethod.getMethodName() = "search" and
searchMethod.getObject().getALocalSource() = connectionCall and
connectionCall.getArg(0).getALocalSource() =
API::moduleImport("ldap3").getMember("Server").getACall() and
arg = searchMethod.getArg([0, 1])
select searchMethod, arg
- Making an LDAP3Query class:
import semmle.python.ApiGraphs
class LDAP3Query extends DataFlow::CallCfgNode {
DataFlow::MethodCallNode searchMethod;
LDAP3Query() {
exists(DataFlow::CallCfgNode connectionCall |
connectionCall = API::moduleImport("ldap3").getMember("Connection").getACall() and
searchMethod.getMethodName() = "search" and
searchMethod.getObject().getALocalSource() = connectionCall and
connectionCall.getArg(0).getALocalSource() =
API::moduleImport("ldap3").getMember("Server").getACall() and
this = searchMethod.(DataFlow::CallCfgNode)
)
}
DataFlow::Node getQuery() { result = searchMethod.getArg([0, 1]) }
}
from LDAP3Query lq
select lq, lq.getQuery()
- Extending LDAPQuery::Range:
import semmle.python.ApiGraphs
module LDAPQuery {
abstract class Range extends DataFlow::Node {
abstract DataFlow::Node getQuery();
}
}
class LDAPQuery extends DataFlow::Node {
LDAPQuery::Range range;
LDAPQuery() { this = range }
DataFlow::Node getQuery() { result = range.getQuery() }
}
class LDAP3Query extends DataFlow::CallCfgNode, LDAPQuery::Range {
DataFlow::MethodCallNode searchMethod;
LDAP3Query() {
exists(DataFlow::CallCfgNode connectionCall |
connectionCall = API::moduleImport("ldap3").getMember("Connection").getACall() and
searchMethod.getMethodName() = "search" and
searchMethod.getObject().getALocalSource() = connectionCall and
connectionCall.getArg(0).getALocalSource() =
API::moduleImport("ldap3").getMember("Server").getACall() and
this = searchMethod.(DataFlow::CallCfgNode)
)
}
override DataFlow::Node getQuery() { result = searchMethod.getArg([0, 1]) }
}
from LDAPQuery lq
select lq, lq.getQuery()
Now that python3’s LDAPQuery is modeled, we can move on to modeling its escape methods:
import semmle.python.ApiGraphs
from DataFlow::CallCfgNode ldap3EscapeCall
where
ldap3EscapeCall in [
API::moduleImport("ldap3").getMember("utils").getMember("dn").getMember("escape_rdn").getACall(),
API::moduleImport("ldap3").getMember("utils").getMember("conv").getMember("escape_filter_chars").getACall()
]
select ldap3EscapeCall, ldap3EscapeCall.getArg(0)
- Wrapping them in their own classes:
import semmle.python.ApiGraphs
class LDAP3EscapeDNCall extends DataFlow::CallCfgNode {
LDAP3EscapeDNCall() {
this =
API::moduleImport("ldap3")
.getMember("utils")
.getMember("dn")
.getMember("escape_rdn")
.getACall()
}
DataFlow::Node getAnInput() { result = this.getArg(0) }
}
class LDAP3EscapeFilterCall extends DataFlow::CallCfgNode {
LDAP3EscapeFilterCall() {
this =
API::moduleImport("ldap3")
.getMember("utils")
.getMember("conv")
.getMember("escape_filter_chars")
.getACall()
}
DataFlow::Node getAnInput() { result = this.getArg(0) }
}
from DataFlow::CallCfgNode ldap3EscapeCall
where
ldap3EscapeCall instanceof LDAP3EscapeDNCall or
ldap3EscapeCall instanceof LDAP3EscapeFilterCall
select ldap3EscapeCall
- Extending LDAPEscape::Range:
import semmle.python.ApiGraphs
module LDAPEscape {
abstract class Range extends DataFlow::Node {
abstract DataFlow::Node getAnInput();
}
}
class LDAPEscape extends DataFlow::Node {
LDAPEscape::Range range;
LDAPEscape() { this = range }
DataFlow::Node getAnInput() { result = range.getAnInput() }
}
class LDAP3EscapeDNCall extends DataFlow::CallCfgNode, LDAPEscape::Range {
LDAP3EscapeDNCall() {
this =
API::moduleImport("ldap3")
.getMember("utils")
.getMember("dn")
.getMember("escape_rdn")
.getACall()
}
override DataFlow::Node getAnInput() { result = this.getArg(0) }
}
class LDAP3EscapeFilterCall extends DataFlow::CallCfgNode, LDAPEscape::Range {
LDAP3EscapeFilterCall() {
this =
API::moduleImport("ldap3")
.getMember("utils")
.getMember("conv")
.getMember("escape_filter_chars")
.getACall()
}
override DataFlow::Node getAnInput() { result = this.getArg(0) }
}
from LDAPEscape ldapEscapeCall
select ldapEscapeCall, ldapEscapeCall.getAnInput()
Everything together
LDAPQuery Concept:
import semmle.python.ApiGraphs

module LDAPQuery {
  abstract class Range extends DataFlow::Node {
    abstract DataFlow::Node getQuery();
  }
}

class LDAPQuery extends DataFlow::Node {
  LDAPQuery::Range range;

  LDAPQuery() { this = range }

  DataFlow::Node getQuery() { result = range.getQuery() }
}

class LDAP2QueryMethods extends string {
  LDAP2QueryMethods() { this in ["search", "search_s", "search_st", "search_ext", "search_ext_s"] }
}

class LDAP2Query extends DataFlow::CallCfgNode, LDAPQuery::Range {
  DataFlow::MethodCallNode searchMethod;

  LDAP2Query() {
    searchMethod.getMethodName() instanceof LDAP2QueryMethods and
    searchMethod.getObject().getALocalSource() =
      API::moduleImport("ldap").getMember("initialize").getACall() and
    this = searchMethod.(DataFlow::CallCfgNode)
  }

  override DataFlow::Node getQuery() {
    result in [
        searchMethod.getArg(0), searchMethod.getArg(2), searchMethod.getArgByName("filterstr")
      ]
  }
}

class LDAP3Query extends DataFlow::CallCfgNode, LDAPQuery::Range {
  DataFlow::MethodCallNode searchMethod;

  LDAP3Query() {
    exists(DataFlow::CallCfgNode connectionCall |
      connectionCall = API::moduleImport("ldap3").getMember("Connection").getACall() and
      searchMethod.getMethodName() = "search" and
      searchMethod.getObject().getALocalSource() = connectionCall and
      connectionCall.getArg(0).getALocalSource() =
        API::moduleImport("ldap3").getMember("Server").getACall() and
      this = searchMethod.(DataFlow::CallCfgNode)
    )
  }

  override DataFlow::Node getQuery() { result = searchMethod.getArg([0, 1]) }
}

from LDAPQuery lq
select lq, lq.getQuery()
LDAPEscape Concept:
import semmle.python.ApiGraphs

module LDAPEscape {
  abstract class Range extends DataFlow::Node {
    abstract DataFlow::Node getAnInput();
  }
}

class LDAPEscape extends DataFlow::Node {
  LDAPEscape::Range range;

  LDAPEscape() { this = range }

  DataFlow::Node getAnInput() { result = range.getAnInput() }
}

class LDAP2EscapeDNCall extends DataFlow::CallCfgNode, LDAPEscape::Range {
  LDAP2EscapeDNCall() {
    this = API::moduleImport("ldap").getMember("dn").getMember("escape_dn_chars").getACall()
  }

  override DataFlow::Node getAnInput() { result = this.getArg(0) }
}

class LDAP2EscapeFilterCall extends DataFlow::CallCfgNode, LDAPEscape::Range {
  LDAP2EscapeFilterCall() {
    this = API::moduleImport("ldap").getMember("filter").getMember("escape_filter_chars").getACall()
  }

  override DataFlow::Node getAnInput() { result = this.getArg(0) }
}

class LDAP3EscapeDNCall extends DataFlow::CallCfgNode, LDAPEscape::Range {
  LDAP3EscapeDNCall() {
    this =
      API::moduleImport("ldap3")
          .getMember("utils")
          .getMember("dn")
          .getMember("escape_rdn")
          .getACall()
  }

  override DataFlow::Node getAnInput() { result = this.getArg(0) }
}

class LDAP3EscapeFilterCall extends DataFlow::CallCfgNode, LDAPEscape::Range {
  LDAP3EscapeFilterCall() {
    this =
      API::moduleImport("ldap3")
          .getMember("utils")
          .getMember("conv")
          .getMember("escape_filter_chars")
          .getACall()
  }

  override DataFlow::Node getAnInput() { result = this.getArg(0) }
}

from LDAPEscape ldapEscapeCall
select ldapEscapeCall, ldapEscapeCall.getAnInput()
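To make concrete what the modeled escape functions protect against, here is a minimal sketch in plain Python of RFC 4515-style filter escaping. It is not the ldap3 or python-ldap implementation; escape_filter_value, build_filter and the payload are hypothetical names made up for this illustration.

```python
# Sketch only: a hand-rolled stand-in for a library escape function.
# Each filter metacharacter is replaced by a backslash plus its hex code.
_FILTER_ESCAPES = {
    "\\": r"\5c",
    "*": r"\2a",
    "(": r"\28",
    ")": r"\29",
    "\x00": r"\00",
}


def escape_filter_value(value: str) -> str:
    """Escape a user-supplied value before placing it inside a filter."""
    # Working character by character avoids escaping a backslash twice.
    return "".join(_FILTER_ESCAPES.get(ch, ch) for ch in value)


def build_filter(username: str, escape: bool) -> str:
    """Build a (uid=...) filter, optionally escaping the user-supplied part."""
    value = escape_filter_value(username) if escape else username
    return f"(uid={value})"


# Hypothetical attacker input that tries to close the clause and add its own.
payload = "*)(objectClass=*"

unsafe = build_filter(payload, escape=False)
safe = build_filter(payload, escape=True)

print(unsafe)  # the payload changes the structure of the filter
print(safe)    # the payload stays inside the uid value
```

In the unescaped case the parentheses and wildcards from the payload become part of the filter syntax, which is exactly the situation the query is meant to report. Once escaped, the same input is treated as a literal value.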
Taint tracking configuration
Once both concepts have been populated, we are ready for the last stage of the query: the taint tracking configuration.
Since this is a pretty basic query, we will override only one predicate besides isSource and isSink: isSanitizer.
class LDAPInjectionFlowConfig extends TaintTracking::Configuration {
  LDAPInjectionFlowConfig() { this = "LDAPInjectionFlowConfig" }

  override predicate isSource(DataFlow::Node source) { source instanceof RemoteFlowSource }

  override predicate isSink(DataFlow::Node sink) { sink = any(LDAPQuery ldapQuery).getQuery() }

  override predicate isSanitizer(DataFlow::Node sanitizer) {
    sanitizer = any(LDAPEscape ldapEsc).getAnInput()
  }
}
As you can see, we are setting RemoteFlowSource as the source, LDAPQuery’s getQuery as the sink and LDAPEscape’s getAnInput as a sanitizer. The query will therefore flag any flow from a RemoteFlowSource to an LDAPQuery’s getQuery, unless that flow passes through an LDAPEscape’s getAnInput, in which case the path is cut at the sanitizer.
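The effect of isSanitizer can be pictured as a graph search in which sanitizer nodes may never be entered. The following toy model in plain Python is only a sketch of that idea, not how CodeQL actually evaluates a configuration; the flagged_paths helper and all node names are made up for this example.

```python
from collections import deque


def flagged_paths(edges, sources, sinks, sanitizers):
    """Return one path per sink reachable from a source without entering a sanitizer."""
    results = []
    for source in sources:
        if source in sanitizers:
            continue
        queue = deque([[source]])
        seen = {source}
        while queue:
            path = queue.popleft()
            node = path[-1]
            if node in sinks:
                results.append(path)
                continue
            for nxt in edges.get(node, ()):
                # A sanitizer node blocks every path that would go through it.
                if nxt in seen or nxt in sanitizers:
                    continue
                seen.add(nxt)
                queue.append(path + [nxt])
    return results


# Hypothetical flow graph: the same user input reaches two searches,
# one directly and one through the argument of an escape call.
edges = {
    "user_input": ["raw_filter", "escape_arg"],
    "raw_filter": ["unsafe_search_filter"],
    "escape_arg": ["escaped_filter"],
    "escaped_filter": ["safe_search_filter"],
}
sources = ["user_input"]
sinks = {"unsafe_search_filter", "safe_search_filter"}
sanitizers = {"escape_arg"}

flagged = flagged_paths(edges, sources, sinks, sanitizers)
for path in flagged:
    print(" -> ".join(path))
```

With the sanitizer in place only the direct path is reported. Passing an empty sanitizer set would report both searches, which is the false positive the isSanitizer predicate is there to avoid.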
Complete query
/**
 * @name LDAP query built from user-controlled sources
 * @description Building an LDAP query from user-controlled sources is vulnerable to insertion of
 *              malicious LDAP code by the user.
 * @kind path-problem
 * @problem.severity error
 * @id py/ldap-injection
 * @tags experimental
 *       security
 *       external/cwe/cwe-090
 */

import python
import semmle.python.dataflow.new.TaintTracking
import semmle.python.dataflow.new.RemoteFlowSources
import semmle.python.ApiGraphs
import DataFlow::PathGraph

module LDAPQuery {
  abstract class Range extends DataFlow::Node {
    abstract DataFlow::Node getQuery();
  }
}

class LDAPQuery extends DataFlow::Node {
  LDAPQuery::Range range;

  LDAPQuery() { this = range }

  DataFlow::Node getQuery() { result = range.getQuery() }
}

module LDAPEscape {
  abstract class Range extends DataFlow::Node {
    abstract DataFlow::Node getAnInput();
  }
}

class LDAPEscape extends DataFlow::Node {
  LDAPEscape::Range range;

  LDAPEscape() { this = range }

  DataFlow::Node getAnInput() { result = range.getAnInput() }
}

class LDAP2QueryMethods extends string {
  LDAP2QueryMethods() { this in ["search", "search_s", "search_st", "search_ext", "search_ext_s"] }
}

class LDAP2Query extends DataFlow::CallCfgNode, LDAPQuery::Range {
  DataFlow::MethodCallNode searchMethod;

  LDAP2Query() {
    searchMethod.getMethodName() instanceof LDAP2QueryMethods and
    searchMethod.getObject().getALocalSource() =
      API::moduleImport("ldap").getMember("initialize").getACall() and
    this = searchMethod.(DataFlow::CallCfgNode)
  }

  override DataFlow::Node getQuery() {
    result in [
        searchMethod.getArg(0), searchMethod.getArg(2), searchMethod.getArgByName("filterstr")
      ]
  }
}

class LDAP3Query extends DataFlow::CallCfgNode, LDAPQuery::Range {
  DataFlow::MethodCallNode searchMethod;

  LDAP3Query() {
    exists(DataFlow::CallCfgNode connectionCall |
      connectionCall = API::moduleImport("ldap3").getMember("Connection").getACall() and
      searchMethod.getMethodName() = "search" and
      searchMethod.getObject().getALocalSource() = connectionCall and
      connectionCall.getArg(0).getALocalSource() =
        API::moduleImport("ldap3").getMember("Server").getACall() and
      this = searchMethod.(DataFlow::CallCfgNode)
    )
  }

  override DataFlow::Node getQuery() { result = searchMethod.getArg([0, 1]) }
}

class LDAP2EscapeDNCall extends DataFlow::CallCfgNode, LDAPEscape::Range {
  LDAP2EscapeDNCall() {
    this = API::moduleImport("ldap").getMember("dn").getMember("escape_dn_chars").getACall()
  }

  override DataFlow::Node getAnInput() { result = this.getArg(0) }
}

class LDAP2EscapeFilterCall extends DataFlow::CallCfgNode, LDAPEscape::Range {
  LDAP2EscapeFilterCall() {
    this = API::moduleImport("ldap").getMember("filter").getMember("escape_filter_chars").getACall()
  }

  override DataFlow::Node getAnInput() { result = this.getArg(0) }
}

class LDAP3EscapeDNCall extends DataFlow::CallCfgNode, LDAPEscape::Range {
  LDAP3EscapeDNCall() {
    this =
      API::moduleImport("ldap3")
          .getMember("utils")
          .getMember("dn")
          .getMember("escape_rdn")
          .getACall()
  }

  override DataFlow::Node getAnInput() { result = this.getArg(0) }
}

class LDAP3EscapeFilterCall extends DataFlow::CallCfgNode, LDAPEscape::Range {
  LDAP3EscapeFilterCall() {
    this =
      API::moduleImport("ldap3")
          .getMember("utils")
          .getMember("conv")
          .getMember("escape_filter_chars")
          .getACall()
  }

  override DataFlow::Node getAnInput() { result = this.getArg(0) }
}

class LDAPInjectionFlowConfig extends TaintTracking::Configuration {
  LDAPInjectionFlowConfig() { this = "LDAPInjectionFlowConfig" }

  override predicate isSource(DataFlow::Node source) { source instanceof RemoteFlowSource }

  override predicate isSink(DataFlow::Node sink) { sink = any(LDAPQuery ldapQuery).getQuery() }

  override predicate isSanitizer(DataFlow::Node sanitizer) {
    sanitizer = any(LDAPEscape ldapEsc).getAnInput()
  }
}

from LDAPInjectionFlowConfig config, DataFlow::PathNode source, DataFlow::PathNode sink
where config.hasFlowPath(source, sink)
select sink.getNode(), source, sink, "$@ LDAP query parameter comes from $@.", sink.getNode(),
  "This", source.getNode(), "a user-provided value"
Bonus exercises
If you have enjoyed this post and want to learn further, I encourage you to give these exercises a shot and feel free to discuss the solutions.
- Simplify modeling for MethodCallNode avoiding getObject().getALocalSource(). Spoiler: Z2V0QU1ldGhvZENhbGwobWV0aG9kTmFtZSk=
- Get text in xml.etree.ElementTree.parse(StringIO(xml_content), parser=parser).getroot().text when xml_content is user-controlled data. Sample code (contains spoilers in other parts of the same page).
- Model your favourite library even though it is already modeled in the official repository.
- Contribute your favourite security query and earn some cash $$$$.
The end
This is the end of this post. I really hope you enjoyed learning CodeQL for Python and had as great a time reading it as I had writing it!