Skip to content

Commit 8715d29

Browse files
committed
Upload LDAP Improper authentication query, qhelp and tests
1 parent f45916e commit 8715d29

File tree

6 files changed

+270
-0
lines changed

6 files changed

+270
-0
lines changed
Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
<!DOCTYPE qhelp PUBLIC "-//Semmle//qhelp//EN" "qhelp.dtd">
2+
3+
<qhelp>
4+
5+
<overview>
6+
<p>If an LDAP connection doesn't carry any kind of authentication, gets produced an access to information
7+
in the LDAP directory.</p>
8+
9+
<p>Simple authentication in LDAP can be used with three different mechanisms:<p>
10+
11+
<li>Anonymous Authentication Mechanism by performing a bind request with a username and password value of zero length.</li>
12+
<li>Unauthenticated Authentication Mechanism by performing a bind request with a password value of zero length.</li>
13+
<li>Name/Password Authentication Mechanism by performing a bind request with a password value of non-zero length.</li>
14+
15+
</overview>
16+
17+
<recommendation>
18+
<p>Every LDAP authentication should be done by using a password taken from a safe place.
19+
<recommendation>
20+
21+
<references>
22+
<li>
23+
SonarSource
24+
<a href="https://rules.sonarsource.com/python/type/Vulnerability/RSPEC-4433">RSPEC-4433</a>
25+
</li>
26+
<li>
27+
CWE-
28+
<a href="https://cwe.mitre.org/data/definitions/287.html">287</a>
29+
</references>
30+
31+
</qhelp>
Lines changed: 85 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,85 @@
1+
/**
2+
* @name Python LDAP Improper Authentication
3+
* @description Check if a user-controlled query carry no authentication
4+
* @kind path-problem
5+
* @problem.severity warning
6+
* @id python/ldap-improper-auth
7+
* @tags experimental
8+
* security
9+
* external/cwe/cwe-287
10+
*/
11+
12+
import python
13+
import semmle.python.dataflow.new.RemoteFlowSources
14+
import semmle.python.dataflow.new.DataFlow
15+
import semmle.python.dataflow.new.TaintTracking
16+
import semmle.python.dataflow.new.internal.TaintTrackingPublic
17+
import DataFlow::PathGraph
18+
19+
// LDAP2
20+
class BindSink extends DataFlow::Node {
21+
BindSink() {
22+
exists(SsaVariable bindVar, CallNode bindCall, CallNode searchCall |
23+
// get variable initializing the connection
24+
bindVar.getDefinition().getImmediateDominator() = Value::named("ldap.initialize").getACall() and
25+
// get a call using that variable
26+
bindVar.getAUse().getImmediateDominator() = bindCall and
27+
// restrict call to any bind method
28+
bindCall.getNode().getFunc().(Attribute).getName().matches("%bind%") and
29+
(
30+
// check second argument (password)
31+
bindCall.getArg(1).getNode() instanceof None or
32+
count(bindCall.getAnArg()) = 1
33+
) and
34+
// get another call using that variable
35+
bindVar.getAUse().getNode() = searchCall.getNode().getFunc().(Attribute).getObject() and
36+
// restrict call to any search method
37+
searchCall.getNode().getFunc().(Attribute).getName().matches("%search%") and
38+
// set the third argument as sink
39+
this.asExpr() = searchCall.getArg(2).getNode()
40+
)
41+
}
42+
}
43+
44+
// LDAP3
45+
class ConnectionSink extends DataFlow::Node {
46+
ConnectionSink() {
47+
exists(SsaVariable connectionVar, CallNode connectionCall, CallNode searchCall |
48+
// get call initializing the connection
49+
connectionCall = Value::named("ldap3.Connection").getACall() and
50+
(
51+
// check password argument
52+
not exists(connectionCall.getArgByName("password")) or
53+
connectionCall.getArgByName("password").pointsTo(Value::named("None"))
54+
) and
55+
// get the variable initializing the connection
56+
connectionVar.getDefinition().getImmediateDominator() = connectionCall and
57+
// get a call using that variable
58+
connectionVar.getAUse().getNode() = searchCall.getNode().getFunc().(Attribute).getObject() and
59+
// restrict call to any search method
60+
searchCall.getNode().getFunc().(Attribute).getName().matches("%search%") and
61+
// set the second argument as sink
62+
this.asExpr() = searchCall.getArg(1).getNode()
63+
)
64+
}
65+
}
66+
67+
class LDAPImproperAuthSink extends DataFlow::Node {
68+
LDAPImproperAuthSink() {
69+
this instanceof BindSink or
70+
this instanceof ConnectionSink
71+
}
72+
}
73+
74+
class LDAPImproperAuthConfig extends TaintTracking::Configuration {
75+
LDAPImproperAuthConfig() { this = "LDAPImproperAuthConfig" }
76+
77+
override predicate isSource(DataFlow::Node source) { source instanceof RemoteFlowSource }
78+
79+
override predicate isSink(DataFlow::Node sink) { sink instanceof LDAPImproperAuthSink }
80+
}
81+
82+
from LDAPImproperAuthConfig config, DataFlow::PathNode source, DataFlow::PathNode sink
83+
where config.hasFlowPath(source, sink)
84+
select sink.getNode(), source, sink, "$@ LDAP query executes $@.", sink.getNode(), "This",
85+
source.getNode(), "a user-provided value without authentication."
Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
from ldap3 import Server, Connection, ALL
2+
from flask import request, Flask
3+
4+
app = Flask(__name__)
5+
6+
7+
@app.route("/passwordNone")
8+
def passwordNone():
9+
dn = request.args['dc']
10+
search_filter = request.args['search']
11+
12+
srv = Server('servername', get_info=ALL)
13+
conn = Connection(srv, user='user_dn', password=None)
14+
status, result, response, _ = conn.search(dn, search_filter)
15+
return result
16+
17+
18+
@app.route("/notPassword")
19+
def notPassword():
20+
dn = request.args['dc']
21+
search_filter = request.args['search']
22+
23+
srv = Server('servername', get_info=ALL)
24+
conn = Connection(srv, user='user_dn')
25+
status, result, response, _ = conn.search(dn, search_filter)
26+
return result
27+
28+
29+
# if __name__ == "__main__":
30+
# app.run(debug=True)
Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
from ldap3 import Server, Connection, ALL
2+
from flask import request, Flask
3+
import os
4+
5+
app = Flask(__name__)
6+
7+
8+
@app.route("/passwordFromEnv")
9+
def passwordFromEnv():
10+
dn = request.args['dc']
11+
search_filter = request.args['search']
12+
13+
srv = Server('servername', get_info=ALL)
14+
conn = Connection(srv, user='user_dn',
15+
password=os.environ.get('LDAP_PASSWORD'))
16+
status, result, response, _ = conn.search(dn, search_filter)
17+
return result
18+
19+
# if __name__ == "__main__":
20+
# app.run(debug=True)
Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
from flask import request, Flask
2+
import ldap
3+
4+
app = Flask(__name__)
5+
6+
7+
@app.route("/simple_bind")
8+
def simple_bind():
9+
dn = request.args['dc']
10+
search_filter = request.args['search']
11+
12+
ldap_connection = ldap.initialize("ldap://127.0.0.1:1337")
13+
ldap_connection.simple_bind('cn=root')
14+
user = ldap_connection.search_s(dn, ldap.SCOPE_SUBTREE, search_filter)
15+
return user[0]
16+
17+
18+
@app.route("/simple_bind_s")
19+
def simple_bind_s():
20+
dn = request.args['dc']
21+
search_filter = request.args['search']
22+
23+
ldap_connection = ldap.initialize("ldap://127.0.0.1:1337")
24+
ldap_connection.simple_bind_s('cn=root')
25+
user = ldap_connection.search_s(dn, ldap.SCOPE_SUBTREE, search_filter)
26+
return user[0]
27+
28+
29+
@app.route("/bind_s")
30+
def bind_s():
31+
dn = request.args['dc']
32+
search_filter = request.args['search']
33+
34+
ldap_connection = ldap.initialize("ldap://127.0.0.1:1337")
35+
ldap_connection.bind_s('cn=root', None)
36+
user = ldap_connection.search_s(dn, ldap.SCOPE_SUBTREE, search_filter)
37+
return user[0]
38+
39+
40+
@app.route("/bind")
41+
def bind():
42+
dn = request.args['dc']
43+
search_filter = request.args['search']
44+
45+
ldap_connection = ldap.initialize("ldap://127.0.0.1:1337")
46+
ldap_connection.bind('cn=root', None)
47+
user = ldap_connection.search_s(dn, ldap.SCOPE_SUBTREE, search_filter)
48+
return user[0]
49+
50+
51+
# if __name__ == "__main__":
52+
# app.run(debug=True)
Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
from flask import request, Flask
2+
import ldap
3+
import os
4+
5+
app = Flask(__name__)
6+
7+
8+
@app.route("/simple_bind")
9+
def simple_bind():
10+
dn = request.args['dc']
11+
search_filter = request.args['search']
12+
13+
ldap_connection = ldap.initialize("ldap://127.0.0.1:1337")
14+
ldap_connection.simple_bind('cn=root', os.environ.get('LDAP_PASSWORD'))
15+
user = ldap_connection.search_s(dn, ldap.SCOPE_SUBTREE, search_filter)
16+
return user[0]
17+
18+
19+
@app.route("/simple_bind_s")
20+
def simple_bind_s():
21+
dn = request.args['dc']
22+
search_filter = request.args['search']
23+
24+
ldap_connection = ldap.initialize("ldap://127.0.0.1:1337")
25+
ldap_connection.simple_bind_s('cn=root', os.environ.get('LDAP_PASSWORD'))
26+
user = ldap_connection.search_s(dn, ldap.SCOPE_SUBTREE, search_filter)
27+
return user[0]
28+
29+
30+
@app.route("/bind_s")
31+
def bind_s():
32+
dn = request.args['dc']
33+
search_filter = request.args['search']
34+
35+
ldap_connection = ldap.initialize("ldap://127.0.0.1:1337")
36+
ldap_connection.bind_s('cn=root', os.environ.get('LDAP_PASSWORD'))
37+
user = ldap_connection.search_s(dn, ldap.SCOPE_SUBTREE, search_filter)
38+
return user[0]
39+
40+
41+
@app.route("/bind")
42+
def bind():
43+
dn = request.args['dc']
44+
search_filter = request.args['search']
45+
46+
ldap_connection = ldap.initialize("ldap://127.0.0.1:1337")
47+
ldap_connection.bind('cn=root', os.environ.get('LDAP_PASSWORD'))
48+
user = ldap_connection.search_s(dn, ldap.SCOPE_SUBTREE, search_filter)
49+
return user[0]
50+
51+
# if __name__ == "__main__":
52+
# app.run(debug=True)

0 commit comments

Comments
 (0)