diff --git a/scapy/layers/ldap.py b/scapy/layers/ldap.py index 3ad956e70db..43ee02347e7 100644 --- a/scapy/layers/ldap.py +++ b/scapy/layers/ldap.py @@ -99,7 +99,10 @@ # Typing imports from typing import ( + Any, + Dict, List, + Union, ) # Elements of protocol @@ -891,6 +894,29 @@ class LDAP_DelResponse(ASN1_Packet): ) +# Modify DN Operation +# https://datatracker.ietf.org/doc/html/rfc4511#section-4.9 + + +class LDAP_ModifyDNRequest(ASN1_Packet): + ASN1_codec = ASN1_Codecs.BER + ASN1_root = ASN1F_SEQUENCE( + LDAPDN("entry", ""), + LDAPDN("newrdn", ""), + ASN1F_BOOLEAN("deleteoldrdn", ASN1_BOOLEAN(False)), + ASN1F_optional(LDAPDN("newSuperior", None, implicit_tag=0xA0)), + implicit_tag=ASN1_Class_LDAP.ModifyDNRequest, + ) + + +class LDAP_ModifyDNResponse(ASN1_Packet): + ASN1_codec = ASN1_Codecs.BER + ASN1_root = ASN1F_SEQUENCE( + *LDAPResult, + implicit_tag=ASN1_Class_LDAP.ModifyDNResponse, + ) + + # Abandon Operation # https://datatracker.ietf.org/doc/html/rfc4511#section-4.11 @@ -1024,6 +1050,8 @@ class LDAP(ASN1_Packet): LDAP_AddResponse, LDAP_DelRequest, LDAP_DelResponse, + LDAP_ModifyDNRequest, + LDAP_ModifyDNResponse, LDAP_UnbindRequest, LDAP_ExtendedResponse, ), @@ -2128,7 +2156,7 @@ def search( attrsOnly=0, attributes: List[str] = [], controls: List[LDAP_Control] = [], - ): + ) -> Dict[str, List[Any]]: """ Perform a LDAP search. @@ -2189,7 +2217,12 @@ def search( resp.show() raise TimeoutError("Search timed out.") # Now, reassemble the results - _s = lambda x: x.decode(errors="backslashreplace") + + def _s(x): + try: + return x.decode() + except UnicodeDecodeError: + return x def _ssafe(x): if self._TEXT_REG.match(x): @@ -2272,6 +2305,94 @@ def modify( resp=resp, ) + def add( + self, + entry: str, + attributes: Union[Dict[str, List[Any]], List[ASN1_Packet]], + controls: List[LDAP_Control] = [], + ): + """ + Perform a LDAP add request. + + :param attributes: the attributes to add. We support two formats: + - a list of LDAP_Attribute (or LDAP_PartialAttribute) + - a dict following {attribute: [list of values]} + + :returns: + """ + # We handle the two cases in the type of attributes + if isinstance(attributes, dict): + attributes = [ + LDAP_Attribute( + type=ASN1_STRING(k), + values=[ + LDAP_AttributeValue( + value=ASN1_STRING(x), + ) + for x in v + ], + ) + for k, v in attributes.items() + ] + + resp = self.sr1( + LDAP_AddRequest( + entry=ASN1_STRING(entry), + attributes=attributes, + ), + controls=controls, + timeout=3, + ) + if LDAP_AddResponse not in resp.protocolOp or resp.protocolOp.resultCode != 0: + raise LDAP_Exception( + "LDAP add failed !", + resp=resp, + ) + + def modifydn( + self, + entry: str, + newdn: str, + deleteoldrdn=True, + controls: List[LDAP_Control] = [], + ): + """ + Perform a LDAP modify DN request. + + ..note:: This functions calculates the relative DN and superior required for + LDAP ModifyDN automatically. + + :param entry: the DN of the entry to rename. + :param newdn: the new FULL DN of the entry. + :returns: + """ + # RFC4511 sect 4.9 + # Calculate the newrdn (relative DN) and superior + newrdn, newSuperior = newdn.split(",", 1) + _, cur_superior = entry.split(",", 1) + # If the superior hasn't changed, don't update it. + if cur_superior == newSuperior: + newSuperior = None + # Send the request + resp = self.sr1( + LDAP_ModifyDNRequest( + entry=entry, + newrdn=newrdn, + newSuperior=newSuperior, + deleteoldrdn=deleteoldrdn, + ), + controls=controls, + timeout=3, + ) + if ( + LDAP_ModifyDNResponse not in resp.protocolOp + or resp.protocolOp.resultCode != 0 + ): + raise LDAP_Exception( + "LDAP modify failed !", + resp=resp, + ) + def close(self): if self.verb: print("X Connection closed\n")