Skip to content

Commit 2c2d96e

Browse files
joost-j and Schamper authored
Add LDAP search filter parser (#97)
Co-authored-by: Schamper <[email protected]>
1 parent f4a55fc commit 2c2d96e

File tree

2 files changed

+889
-0
lines changed

2 files changed

+889
-0
lines changed

dissect/util/ldap.py

Lines changed: 237 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,237 @@
1+
from __future__ import annotations
2+
3+
import operator
4+
import re
5+
from enum import Enum
6+
7+
from dissect.util.exceptions import Error
8+
9+
10+
class InvalidQueryError(Error):
    """Raised when an LDAP search filter string is rejected by validation or cannot be parsed."""
12+
13+
14+
class LogicalOperator(Enum):
    """Logical operators that combine child filters inside a nested LDAP search filter.

    Each member's value is the single character that follows the opening parenthesis of a nested filter.
    """

    AND = "&"
    OR = "|"
    NOT = "!"
18+
19+
20+
# Raw operator characters ("&", "|", "!") as a tuple, the form ``str.startswith`` accepts for multiple prefixes.
_LOGICAL_OPERATORS = tuple(op.value for op in LogicalOperator)
21+
22+
23+
class ComparisonOperator(Enum):
    """Comparison operators that can appear in a simple (non-nested) LDAP search filter.

    Each member's value is the operator text as it appears between the attribute and the value.
    """

    GE = ">="
    LE = "<="
    GT = ">"
    LT = "<"
    EQ = "="
    APPROX = "~="
    BIT = ":="
    # Marker for the ``attribute:rule:=value`` form; it is recognised by a dedicated regular expression
    # rather than by searching for the bare ":" character.
    EXTENDED = ":"
32+
33+
34+
# Every comparison operator except EXTENDED, which is detected separately through ``_RE_EXTENDED``.
_NORMAL_COMPARISON_OPERATORS = [op for op in ComparisonOperator if op != ComparisonOperator.EXTENDED]
35+
# The same operators ordered longest-first, so two-character operators such as ">=" are tried before ">" and "=".
_SORTED_COMPARISON_OPERATORS = sorted(_NORMAL_COMPARISON_OPERATORS, key=lambda op: len(op.value), reverse=True)
36+
37+
# Matches the extended form ``attribute:rule:=value``; the groups are attribute, rule and an optional value.
_RE_EXTENDED = re.compile(r"(.+?):(.+?):=(.+)?")
38+
39+
40+
class SearchFilter:
    """Represents an LDAP search filter (simple or nested).

    A nested filter combines child filters with a logical operator (``&``, ``|`` or ``!``). A simple filter
    compares one attribute against a value using a comparison operator.

    Args:
        query: The LDAP search filter string.

    Raises:
        InvalidQueryError: If the query fails basic syntax validation, or if a simple filter contains no
            comparison operator.
    """

    def __init__(self, query: str) -> None:
        self.query: str = query

        self.children: list[SearchFilter] = []
        self.operator: LogicalOperator | ComparisonOperator | None = None
        self.attribute: str | None = None
        self.value: str | None = None
        self._extended_rule: str | None = None

        _validate_syntax(query)

        # The character right after the opening parenthesis decides whether this is a nested filter.
        if query[1:-1].startswith(_LOGICAL_OPERATORS):
            self._parse_nested()
        else:
            self._parse_simple()

    def __repr__(self) -> str:
        if self.is_nested():
            return f"<SearchFilter nested operator={self.operator.value!r} children={self.children}>"
        return f"<SearchFilter attribute={self.attribute!r} operator={self.operator.value!r} value={self.value}>"

    @classmethod
    def parse(cls, query: str, optimize: bool = True) -> SearchFilter:
        """Parse an LDAP query into a filter object, with optional optimization.

        Args:
            query: The LDAP search filter string.
            optimize: Whether to pass the parsed filter through ``optimize_ldap_query``.

        Returns:
            The parsed filter, or the optimized filter when ``optimize`` is true.
        """
        result = cls(query)
        if optimize:
            return optimize_ldap_query(result)[0]
        return result

    def is_nested(self) -> bool:
        """Return whether the filter is nested (i.e., contains logical operators and child filters)."""
        return isinstance(self.operator, LogicalOperator)

    def format(self) -> str:
        """Format the search filter back into an LDAP query string."""
        if self.is_nested():
            children = "".join(child.format() for child in self.children)
            return f"({self.operator.value}{children})"

        if self.operator == ComparisonOperator.EXTENDED:
            return f"({self.attribute}:{self._extended_rule}:={self.value})"

        return f"({self.attribute}{self.operator.value}{self.value})"

    def _parse_simple(self) -> None:
        """Parse a simple ``attribute<operator>value`` filter.

        The first (left-most) comparison operator separates the attribute from the value. Everything after it
        belongs to the value, so values that themselves contain operator characters, such as the
        distinguished name in ``(memberOf=CN=Users,DC=example,DC=com)``, are parsed instead of rejected.

        Raises:
            InvalidQueryError: If the filter contains no comparison operator.
        """
        query = self.query[1:-1]

        # Check for extended matching rules first
        if ":" in query and (match := _RE_EXTENDED.match(query)):
            self.operator = ComparisonOperator.EXTENDED
            self.attribute, self._extended_rule, self.value = match.groups()
            return

        # Locate the left-most operator. Operators are visited longest-first and only a strictly smaller
        # index replaces the current candidate, so ">=" wins over ">" when both start at the same position.
        found: ComparisonOperator | None = None
        found_at = -1
        for op in _SORTED_COMPARISON_OPERATORS:
            index = query.find(op.value)
            if index == -1:
                continue
            if found is None or index < found_at:
                found, found_at = op, index

        if found is None:
            raise InvalidQueryError(
                f"No comparison operator found in query: {self.query}. "
                f"Expected one of {[op.value for op in _NORMAL_COMPARISON_OPERATORS]}."
            )

        self.operator = found
        self.attribute = query[:found_at]
        self.value = query[found_at + len(found.value) :]

    def _parse_nested(self) -> None:
        """Parse a nested filter by splitting it into parenthesised child filters."""
        query = self.query[1:-1]
        self.operator = LogicalOperator(query[0])

        # Position 0 holds the logical operator; children start right after it.
        start = 1
        while start < len(query):
            # Scan forward from the child's opening parenthesis until its matching close is consumed.
            end = start + 1
            depth = 1

            while end < len(query) and depth > 0:
                if query[end] == "(":
                    depth += 1
                elif query[end] == ")":
                    depth -= 1
                end += 1

            # Each child is validated and parsed recursively by the constructor.
            self.children.append(SearchFilter(query[start:end]))
            start = end
148+
149+
150+
# Sort weight per attribute name, used when ordering the children of a nested filter: children are sorted
# ascending by weight, and attributes not listed here receive the highest weight in this table.
_ATTRIBUTE_WEIGHTS = {
    "objectGUID": 1,
    "distinguishedName": 1,
    "sAMAccountName": 2,
    "userPrincipalName": 2,
    "mail": 2,
    "sAMAccountType": 3,
    "servicePrincipalName": 3,
    "userAccountControl": 4,
    "memberOf": 5,
    "member": 5,
    "pwdLastSet": 5,
    "primaryGroupID": 6,
    "whenCreated": 6,
    "ou": 6,
    "lastLogonTimestamp": 6,
    "cn": 7,
    "givenName": 7,
    "name": 7,
    "telephoneNumber": 7,
    "objectCategory": 8,
    "description": 9,
    "objectClass": 10,
}
174+
175+
176+
def _attribute_weight(attribute: str | None) -> int:
    """Return the sort weight for an attribute name, falling back to the highest known weight."""
    fallback = max(_ATTRIBUTE_WEIGHTS.values())
    if attribute is None:
        return fallback

    weight = _ATTRIBUTE_WEIGHTS.get(attribute)
    if weight is not None:
        return weight

    # LDAP attribute names are case-insensitive, so "samaccountname" must weigh the same as "sAMAccountName".
    folded = attribute.lower()
    return next((value for name, value in _ATTRIBUTE_WEIGHTS.items() if name.lower() == folded), fallback)


def optimize_ldap_query(query: SearchFilter) -> tuple[SearchFilter, int]:
    """Optimize an LDAP query in-place.

    Removes redundant conditions and sorts filters and conditions based on how specific they are.

    A single-child AND/OR is replaced by its (optimized) child, so the returned filter is not necessarily
    the object that was passed in. Children of a nested filter are sorted ascending by weight.

    Args:
        query: The LDAP query to optimize.

    Returns:
        A tuple containing the optimized LDAP query and its weight.
    """
    # Base case: simple filter
    if not query.is_nested():
        return query, _attribute_weight(query.attribute)

    if len(query.children) == 1:
        # Simplify single-child AND/OR: the wrapper adds nothing, so the child takes its place.
        if query.operator in (LogicalOperator.AND, LogicalOperator.OR):
            return optimize_ldap_query(query.children[0])

        # Handle NOT: keep the wrapper, optimize what it negates.
        if query.operator == LogicalOperator.NOT:
            child, weight = optimize_ldap_query(query.children[0])

            query.children[0] = child
            query.query = query.format()

            return query, weight

    # Sort nested children by weight
    if len(query.children) > 1:
        children = sorted((optimize_ldap_query(child) for child in query.children), key=operator.itemgetter(1))

        query.children = [child for child, _ in children]
        # Keep the stored query string in sync with the reordered children.
        query.query = query.format()

        return query, max(weight for _, weight in children)

    # Nested filter without children: nothing to optimize, treat it as least specific.
    return query, max(_ATTRIBUTE_WEIGHTS.values())
214+
215+
216+
def _validate_syntax(query: str) -> None:
    """Validate basic LDAP query syntax.

    Checks that the query is non-empty, wrapped in a single pair of outer parentheses, has properly nested
    parentheses, and contains no empty or doubled opening parentheses.

    Args:
        query: The LDAP query to validate.

    Raises:
        InvalidQueryError: If any of the checks fails.
    """
    if not query:
        raise InvalidQueryError("Empty query")

    if not query.startswith("(") or not query.endswith(")"):
        raise InvalidQueryError(f"Query must be wrapped in parentheses: {query}")

    if query.count("(") != query.count(")"):
        raise InvalidQueryError(f"Unbalanced parentheses in query: {query}")

    # Check for empty parentheses
    if "()" in query:
        raise InvalidQueryError(f"Empty parentheses found in query: {query}")

    # Check for queries that start with double opening parentheses
    if query.startswith("(("):
        raise InvalidQueryError(f"Invalid query structure: {query}")

    # Equal counts do not prove correct nesting: "(a=b)(c=d)" and "(a=b))((c=d)" both have matching counts.
    # The outer parenthesis opened at position 0 may only be closed by the very last character.
    depth = 0
    last = len(query) - 1
    for index, char in enumerate(query):
        if char == "(":
            depth += 1
        elif char == ")":
            depth -= 1
            if depth == 0 and index != last:
                raise InvalidQueryError(f"Unbalanced parentheses in query: {query}")

0 commit comments

Comments
 (0)