Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
125 changes: 123 additions & 2 deletions scapy/layers/ldap.py
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,10 @@

# Typing imports
from typing import (
Any,
Dict,
List,
Union,
)

# Elements of protocol
Expand Down Expand Up @@ -891,6 +894,29 @@ class LDAP_DelResponse(ASN1_Packet):
)


# Modify DN Operation
# https://datatracker.ietf.org/doc/html/rfc4511#section-4.9


class LDAP_ModifyDNRequest(ASN1_Packet):
ASN1_codec = ASN1_Codecs.BER
ASN1_root = ASN1F_SEQUENCE(
LDAPDN("entry", ""),
LDAPDN("newrdn", ""),
ASN1F_BOOLEAN("deleteoldrdn", ASN1_BOOLEAN(False)),
ASN1F_optional(LDAPDN("newSuperior", None, implicit_tag=0xA0)),
implicit_tag=ASN1_Class_LDAP.ModifyDNRequest,
)


class LDAP_ModifyDNResponse(ASN1_Packet):
ASN1_codec = ASN1_Codecs.BER
ASN1_root = ASN1F_SEQUENCE(
*LDAPResult,
implicit_tag=ASN1_Class_LDAP.ModifyDNResponse,
)


# Abandon Operation
# https://datatracker.ietf.org/doc/html/rfc4511#section-4.11

Expand Down Expand Up @@ -1024,6 +1050,8 @@ class LDAP(ASN1_Packet):
LDAP_AddResponse,
LDAP_DelRequest,
LDAP_DelResponse,
LDAP_ModifyDNRequest,
LDAP_ModifyDNResponse,
LDAP_UnbindRequest,
LDAP_ExtendedResponse,
),
Expand Down Expand Up @@ -2128,7 +2156,7 @@ def search(
attrsOnly=0,
attributes: List[str] = [],
controls: List[LDAP_Control] = [],
):
) -> Dict[str, List[Any]]:
"""
Perform a LDAP search.

Expand Down Expand Up @@ -2189,7 +2217,12 @@ def search(
resp.show()
raise TimeoutError("Search timed out.")
# Now, reassemble the results
_s = lambda x: x.decode(errors="backslashreplace")

def _s(x):
try:
return x.decode()
except UnicodeDecodeError:
return x

def _ssafe(x):
if self._TEXT_REG.match(x):
Expand Down Expand Up @@ -2272,6 +2305,94 @@ def modify(
resp=resp,
)

def add(
self,
entry: str,
attributes: Union[Dict[str, List[Any]], List[ASN1_Packet]],
controls: List[LDAP_Control] = [],
):
"""
Perform a LDAP add request.

:param attributes: the attributes to add. We support two formats:
- a list of LDAP_Attribute (or LDAP_PartialAttribute)
- a dict following {attribute: [list of values]}

:returns:
"""
# We handle the two cases in the type of attributes
if isinstance(attributes, dict):
attributes = [
LDAP_Attribute(
type=ASN1_STRING(k),
values=[
LDAP_AttributeValue(
value=ASN1_STRING(x),
)
for x in v
],
)
for k, v in attributes.items()
]

resp = self.sr1(
LDAP_AddRequest(
entry=ASN1_STRING(entry),
attributes=attributes,
),
controls=controls,
timeout=3,
)
if LDAP_AddResponse not in resp.protocolOp or resp.protocolOp.resultCode != 0:
raise LDAP_Exception(
"LDAP add failed !",
resp=resp,
)

def modifydn(
self,
entry: str,
newdn: str,
deleteoldrdn=True,
controls: List[LDAP_Control] = [],
):
"""
Perform a LDAP modify DN request.

..note:: This functions calculates the relative DN and superior required for
LDAP ModifyDN automatically.

:param entry: the DN of the entry to rename.
:param newdn: the new FULL DN of the entry.
:returns:
"""
# RFC4511 sect 4.9
# Calculate the newrdn (relative DN) and superior
newrdn, newSuperior = newdn.split(",", 1)
_, cur_superior = entry.split(",", 1)
# If the superior hasn't changed, don't update it.
if cur_superior == newSuperior:
newSuperior = None
# Send the request
resp = self.sr1(
LDAP_ModifyDNRequest(
entry=entry,
newrdn=newrdn,
newSuperior=newSuperior,
deleteoldrdn=deleteoldrdn,
),
controls=controls,
timeout=3,
)
if (
LDAP_ModifyDNResponse not in resp.protocolOp
or resp.protocolOp.resultCode != 0
):
raise LDAP_Exception(
"LDAP modify failed !",
resp=resp,
)

def close(self):
if self.verb:
print("X Connection closed\n")
Expand Down
Loading