diff --git a/doc/scapy/layers/smb.rst b/doc/scapy/layers/smb.rst index 20482142246..6b86d80f8ff 100644 --- a/doc/scapy/layers/smb.rst +++ b/doc/scapy/layers/smb.rst @@ -76,7 +76,7 @@ You might be wondering if you can pass the ``HashNT`` of the password of the use .. code:: python - >>> smbclient("server1.domain.local", ssp=KerberosSSP(SPN="cifs/server1", UPN="Administrator@domain.local", PASSWORD="password")) + >>> smbclient("server1.domain.local", ssp=KerberosSSP(UPN="Administrator@domain.local", PASSWORD="password")) **smbclient using a** :class:`~scapy.layers.ntlm.KerberosSSP` **created by** `Ticketer++ `_: @@ -155,7 +155,6 @@ Let's write a script that connects to a share and list the files in the root fol KerberosSSP( UPN="Administrator@domain.local", PASSWORD=password, - SPN="cifs/server1", ) ]) # Connect to the server diff --git a/scapy/config.py b/scapy/config.py index d302b257840..2a385c2f579 100755 --- a/scapy/config.py +++ b/scapy/config.py @@ -599,6 +599,7 @@ class ExtsManager(importlib.abc.MetaPathFinder): def __init__(self): self.exts: List[ScapyExt] = [] self.all_specs: Dict[str, ScapyExt.ScapyExtSpec] = {} + self._loaded: List[str] = [] # Add to meta_path as we are an import provider if self not in sys.meta_path: sys.meta_path.append(self) @@ -628,6 +629,9 @@ def load(self, extension: str): :param extension: the name of the extension, as installed. """ + if extension in self._loaded: + return + try: import importlib.metadata except ImportError: @@ -686,6 +690,7 @@ def load(self, extension: str): # Add to the extension list self.exts.append(ext) + self._loaded.append(extension) # If there are bash autocompletions, add them if ext.bash_completions: diff --git a/scapy/layers/gssapi.py b/scapy/layers/gssapi.py index f1743770eef..547e09a4734 100644 --- a/scapy/layers/gssapi.py +++ b/scapy/layers/gssapi.py @@ -455,6 +455,7 @@ def GSS_Init_sec_context( self, Context: CONTEXT, token=None, + target_name: Optional[str] = None, req_flags: Optional[GSS_C_FLAGS] = None, chan_bindings: GssChannelBindings = GSS_C_NO_CHANNEL_BINDINGS, ): diff --git a/scapy/layers/http.py b/scapy/layers/http.py index fede3e60594..016337738fc 100644 --- a/scapy/layers/http.py +++ b/scapy/layers/http.py @@ -942,6 +942,7 @@ def request( self.sspcontext, token, status = self.ssp.GSS_Init_sec_context( self.sspcontext, ssp_blob, + target_name="http/" + host, req_flags=0, chan_bindings=self.chan_bindings, ) diff --git a/scapy/layers/kerberos.py b/scapy/layers/kerberos.py index 9e292dcf7e2..39e734fe724 100644 --- a/scapy/layers/kerberos.py +++ b/scapy/layers/kerberos.py @@ -143,6 +143,12 @@ from scapy.layers.smb2 import STATUS_ERREF from scapy.layers.x509 import X509_AlgorithmIdentifier +# Redirect exports from RFC3961 +try: + from scapy.libs.rfc3961 import * # noqa: F401,F403 +except ImportError: + pass + # Typing imports from typing import ( Optional, @@ -4008,7 +4014,8 @@ class KerberosSSP(SSP): :param ST: the service ticket to use for access. If not provided, will be retrieved - :param SPN: the SPN of the service to use + :param SPN: the SPN of the service to use. If not provided, will use the + target_name provided in the GSS_Init_sec_context :param UPN: The client UPN :param DC_IP: (optional) is ST+KEY are not provided, will need to contact the KDC at this IP. If not provided, will perform dc locator. @@ -4506,6 +4513,7 @@ def GSS_Init_sec_context( self, Context: CONTEXT, token=None, + target_name: Optional[str] = None, req_flags: Optional[GSS_C_FLAGS] = None, chan_bindings: GssChannelBindings = GSS_C_NO_CHANNEL_BINDINGS, ): @@ -4536,8 +4544,8 @@ def GSS_Init_sec_context( # Do we have a ST? if self.ST is None: # Client sends an AP-req - if not self.SPN: - raise ValueError("Missing SPN attribute") + if not self.SPN and not target_name: + raise ValueError("Missing SPN/target_name attribute") additional_tickets = [] if self.U2U: try: @@ -4559,7 +4567,7 @@ def GSS_Init_sec_context( # Use TGT res = krb_tgs_req( upn=self.UPN, - spn=self.SPN, + spn=self.SPN or target_name, ip=self.DC_IP, sessionkey=self.KEY, ticket=self.TGT, @@ -4571,7 +4579,7 @@ def GSS_Init_sec_context( # Ask for TGT then ST res = krb_as_and_tgs( upn=self.UPN, - spn=self.SPN, + spn=self.SPN or target_name, ip=self.DC_IP, key=self.KEY, password=self.PASSWORD, diff --git a/scapy/layers/ldap.py b/scapy/layers/ldap.py index ee358de34c4..c09e07e64c1 100644 --- a/scapy/layers/ldap.py +++ b/scapy/layers/ldap.py @@ -1800,6 +1800,7 @@ def __init__( verb=True, ): self.sock = None + self.host = None self.verb = verb self.ssl = False self.sslcontext = None @@ -1815,7 +1816,7 @@ def __init__( def connect( self, - ip, + host, port=None, use_ssl=False, sslcontext=None, @@ -1826,7 +1827,7 @@ def connect( """ Initiate a connection - :param ip: the IP or hostname to connect to. + :param host: the IP or hostname to connect to. :param port: the port to connect to. (Default: 389 or 636) :param use_ssl: whether to use LDAPS or not. (Default: False) @@ -1844,17 +1845,18 @@ def connect( port = 389 sock = socket.socket() self.timeout = timeout + self.host = host sock.settimeout(timeout) if self.verb: print( "\u2503 Connecting to %s on port %s%s..." % ( - ip, + host, port, " with SSL" if self.ssl else "", ) ) - sock.connect((ip, port)) + sock.connect((host, port)) if self.verb: print( conf.color_theme.green( @@ -1872,7 +1874,7 @@ def connect( context = ssl.create_default_context() else: context = self.sslcontext - sock = context.wrap_socket(sock, server_hostname=sni or ip) + sock = context.wrap_socket(sock, server_hostname=sni or host) # Wrap the socket in a Scapy socket if self.ssl: self.sock = SSLStreamSocket(sock, LDAP) @@ -2042,6 +2044,7 @@ def bind( # 2. First exchange: Negotiate self.sspcontext, token, status = self.ssp.GSS_Init_sec_context( self.sspcontext, + target_name="ldap/" + self.host, req_flags=( GSS_C_FLAGS.GSS_C_REPLAY_FLAG | GSS_C_FLAGS.GSS_C_SEQUENCE_FLAG @@ -2068,6 +2071,7 @@ def bind( self.sspcontext, token, status = self.ssp.GSS_Init_sec_context( self.sspcontext, GSSAPI_BLOB(val), + target_name="ldap/" + self.host, chan_bindings=self.chan_bindings, ) resp = self.sr1( @@ -2090,6 +2094,7 @@ def bind( # GSSAPI or SPNEGO self.sspcontext, token, status = self.ssp.GSS_Init_sec_context( self.sspcontext, + target_name="ldap/" + self.host, req_flags=( # Required flags for GSSAPI: RFC4752 sect 3.1 GSS_C_FLAGS.GSS_C_REPLAY_FLAG @@ -2122,6 +2127,7 @@ def bind( self.sspcontext, token, status = self.ssp.GSS_Init_sec_context( self.sspcontext, GSSAPI_BLOB(val), + target_name="ldap/" + self.host, chan_bindings=self.chan_bindings, ) else: diff --git a/scapy/layers/msrpce/msnrpc.py b/scapy/layers/msrpce/msnrpc.py index a07cb42aa9b..fef1007e562 100644 --- a/scapy/layers/msrpce/msnrpc.py +++ b/scapy/layers/msrpce/msnrpc.py @@ -475,7 +475,12 @@ def GSS_VerifyMICEx(self, Context, msgs, signature): self._unsecure(Context, msgs, signature, False) def GSS_Init_sec_context( - self, Context, token=None, req_flags: Optional[GSS_C_FLAGS] = None + self, + Context: CONTEXT, + token=None, + target_name: Optional[str] = None, + req_flags: Optional[GSS_C_FLAGS] = None, + chan_bindings: bytes = GSS_C_NO_CHANNEL_BINDINGS, ): if Context is None: Context = self.CONTEXT(True, req_flags=req_flags, AES=self.AES) diff --git a/scapy/layers/msrpce/rpcclient.py b/scapy/layers/msrpce/rpcclient.py index 0d3dec573dd..6e81221dfec 100644 --- a/scapy/layers/msrpce/rpcclient.py +++ b/scapy/layers/msrpce/rpcclient.py @@ -76,6 +76,7 @@ def __init__(self, transport, ndr64=False, ndrendian="little", verb=True, **kwar self.ndr64 = ndr64 self.ndrendian = ndrendian self.verb = verb + self.host = None self.auth_level = kwargs.pop("auth_level", DCE_C_AUTHN_LEVEL.NONE) self.auth_context_id = kwargs.pop("auth_context_id", 0) self.ssp = kwargs.pop("ssp", None) # type: SSP @@ -100,7 +101,7 @@ def from_smblink(cls, smbcli, smb_kwargs={}, **kwargs): ) return client - def connect(self, ip, port=None, timeout=5, smb_kwargs={}): + def connect(self, host, port=None, timeout=5, smb_kwargs={}): """ Initiate a connection """ @@ -113,14 +114,15 @@ def connect(self, ip, port=None, timeout=5, smb_kwargs={}): raise ValueError( "Can't guess the port for transport: %s" % self.transport ) + self.host = host sock = socket.socket() sock.settimeout(timeout) if self.verb: print( "\u2503 Connecting to %s on port %s via %s..." - % (ip, port, repr(self.transport)) + % (host, port, repr(self.transport)) ) - sock.connect((ip, port)) + sock.connect((host, port)) if self.verb: print( conf.color_theme.green( @@ -313,6 +315,7 @@ def _bind(self, interface, reqcls, respcls): else 0 ) ), + target_name="host/" + self.host, ) if status not in [GSS_S_CONTINUE_NEEDED, GSS_S_COMPLETE]: # Authentication failed. @@ -349,6 +352,7 @@ def _bind(self, interface, reqcls, respcls): self.sspcontext, token, status = self.ssp.GSS_Init_sec_context( self.sspcontext, token=resp.auth_verifier.auth_value, + target_name="host/" + self.host, ) if status in [GSS_S_CONTINUE_NEEDED, GSS_S_COMPLETE]: # Authentication should continue, in two ways: @@ -390,6 +394,7 @@ def _bind(self, interface, reqcls, respcls): self.sspcontext, token, status = self.ssp.GSS_Init_sec_context( self.sspcontext, token=resp.auth_verifier.auth_value, + target_name="host/" + self.host, ) # Check context acceptance if ( diff --git a/scapy/layers/ntlm.py b/scapy/layers/ntlm.py index cb73963bbdc..7f92430e1f8 100644 --- a/scapy/layers/ntlm.py +++ b/scapy/layers/ntlm.py @@ -1388,6 +1388,7 @@ def GSS_Init_sec_context( self, Context: CONTEXT, token=None, + target_name: Optional[str] = None, req_flags: Optional[GSS_C_FLAGS] = None, chan_bindings: GssChannelBindings = GSS_C_NO_CHANNEL_BINDINGS, ): diff --git a/scapy/layers/smb2.py b/scapy/layers/smb2.py index 7a6e2be44ed..a19c5b97159 100644 --- a/scapy/layers/smb2.py +++ b/scapy/layers/smb2.py @@ -1594,7 +1594,7 @@ def post_build(self, pkt, pay): }, config=[ ("Offset", _NTLM_ENUM.OFFSET), - ] + ], ) + pay ) diff --git a/scapy/layers/smbclient.py b/scapy/layers/smbclient.py index 64b96589612..14eb47f8cf5 100644 --- a/scapy/layers/smbclient.py +++ b/scapy/layers/smbclient.py @@ -20,7 +20,6 @@ import threading from scapy.automaton import ATMT, Automaton, ObjectPipe -from scapy.base_classes import Net from scapy.config import conf from scapy.error import Scapy_Exception from scapy.fields import UTCTimeField @@ -29,8 +28,6 @@ CLIUtil, pretty_list, human_size, - valid_ip, - valid_ip6, ) from scapy.volatile import RandUUID @@ -40,12 +37,6 @@ GSS_S_CONTINUE_NEEDED, GSS_C_FLAGS, ) -from scapy.layers.inet6 import Net6 -from scapy.layers.kerberos import ( - KerberosSSP, - krb_as_and_tgs, - _parse_upn, -) from scapy.layers.msrpce.raw.ms_srvs import ( LPSHARE_ENUM_STRUCT, NetrShareEnum_Request, @@ -54,7 +45,6 @@ ) from scapy.layers.ntlm import ( NTLMSSP, - MD4le, ) from scapy.layers.smb import ( SMBNegotiate_Request, @@ -146,7 +136,7 @@ def __init__(self, sock, ssp=None, *args, **kwargs): self.REQUIRE_ENCRYPTION = kwargs.pop("REQUIRE_ENCRYPTION", False) self.RETRY = kwargs.pop("RETRY", 0) # optionally: retry n times session setup self.SMB2 = kwargs.pop("SMB2", False) # optionally: start directly in SMB2 - self.SERVER_NAME = kwargs.pop("SERVER_NAME", "") + self.HOST = kwargs.pop("HOST", "") # Store supported dialects if "DIALECTS" in kwargs: self.DIALECTS = kwargs.pop("DIALECTS") @@ -361,7 +351,7 @@ def on_negotiate_smb2(self): # TODO support compression and RDMA SMB2_Negotiate_Context() / SMB2_Netname_Negotiate_Context_ID( - NetName=self.SERVER_NAME, + NetName=self.HOST, ), SMB2_Negotiate_Context() / SMB2_Signing_Capabilities( @@ -458,6 +448,7 @@ def NEGOTIATED(self, ssp_blob=None): ssp_tuple = self.session.ssp.GSS_Init_sec_context( self.session.sspcontext, token=ssp_blob, + target_name="cifs/" + self.HOST if self.HOST else None, req_flags=( GSS_C_FLAGS.GSS_C_MUTUAL_FLAG | (GSS_C_FLAGS.GSS_C_INTEG_FLAG if self.session.SigningRequired else 0) @@ -618,6 +609,7 @@ def AUTHENTICATED(self, ssp_blob=None): self.session.sspcontext, _, status = self.session.ssp.GSS_Init_sec_context( self.session.sspcontext, token=ssp_blob, + target_name="cifs/" + self.HOST if self.HOST else None, ) if status != GSS_S_COMPLETE: raise ValueError("Internal error: the SSP completed with an error.") @@ -1098,17 +1090,18 @@ def close(self): @conf.commands.register class smbclient(CLIUtil): r""" - A simple smbclient CLI + A simple SMB client CLI powered by Scapy :param target: can be a hostname, the IPv4 or the IPv6 to connect to :param UPN: the upn to use (DOMAIN/USER, DOMAIN\USER, USER@DOMAIN or USER) :param guest: use guest mode (over NTLM) :param ssp: if provided, use this SSP for auth. - :param kerberos: if available, whether to use Kerberos or not :param kerberos_required: require kerberos :param port: the TCP port. default 445 - :param password: (string) if provided, used for auth - :param HashNt: (bytes) if provided, used for auth (NTLM) + :param password: if provided, used for auth + :param HashNt: if provided, used for auth (NTLM) + :param HashAes256Sha96: if provided, used for auth (Kerberos) + :param HashAes128Sha96: if provided, used for auth (Kerberos) :param ST: if provided, the service ticket to use (Kerberos) :param KEY: if provided, the session key associated to the ticket (Kerberos) :param cli: CLI mode (default True). False to use for scripting @@ -1125,9 +1118,10 @@ def __init__( UPN: str = None, password: str = None, guest: bool = False, - kerberos: bool = True, kerberos_required: bool = False, - HashNt: str = None, + HashNt: bytes = None, + HashAes256Sha96: bytes = None, + HashAes128Sha96: bytes = None, port: int = 445, timeout: int = 2, debug: int = 0, @@ -1141,71 +1135,30 @@ def __init__( ): if cli: self._depcheck() - hostname = None - # Check if target is a hostname / Check IP - if ":" in target: - family = socket.AF_INET6 - if not valid_ip6(target): - hostname = target - target = str(Net6(target)) - else: - family = socket.AF_INET - if not valid_ip(target): - hostname = target - target = str(Net(target)) assert UPN or ssp or guest, "Either UPN, ssp or guest must be provided !" # Do we need to build a SSP? if ssp is None: # Create the SSP (only if not guest mode) if not guest: - # Check UPN - try: - _, realm = _parse_upn(UPN) - if realm == ".": - # Local - kerberos = False - except ValueError: - # not a UPN: NTLM - kerberos = False - # Do we need to ask the password? - if HashNt is None and password is None and ST is None: - # yes. - from prompt_toolkit import prompt - - password = prompt("Password: ", is_password=True) - ssps = [] - # Kerberos - if kerberos and hostname: - if ST is None: - resp = krb_as_and_tgs( - upn=UPN, - spn="cifs/%s" % hostname, - password=password, - debug=debug, - ) - if resp is not None: - ST, KEY = resp.tgsrep.ticket, resp.sessionkey - if ST: - ssps.append(KerberosSSP(UPN=UPN, ST=ST, KEY=KEY, debug=debug)) - elif kerberos_required: - raise ValueError( - "Kerberos required but target isn't a hostname !" - ) - elif kerberos_required: - raise ValueError( - "Kerberos required but domain not specified in the UPN, " - "or target isn't a hostname !" - ) - # NTLM - if not kerberos_required: - if HashNt is None and password is not None: - HashNt = MD4le(password) - ssps.append(NTLMSSP(UPN=UPN, HASHNT=HashNt)) - # Build the SSP - ssp = SPNEGOSSP(ssps) + ssp = SPNEGOSSP.from_cli_arguments( + UPN=UPN, + target=target, + password=password, + HashNt=HashNt, + HashAes256Sha96=HashAes256Sha96, + HashAes128Sha96=HashAes128Sha96, + ST=ST, + KEY=KEY, + kerberos_required=kerberos_required, + ) else: # Guest mode ssp = None + # Check if target is IPv4 or IPv6 + if ":" in target: + family = socket.AF_INET6 + else: + family = socket.AF_INET # Open socket sock = socket.socket(family, socket.SOCK_STREAM) # Configure socket for SMB: @@ -1218,11 +1171,13 @@ def __init__( sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPINTVL, 10) # Timeout & connect sock.settimeout(timeout) + if debug: + print("Connecting to %s:%s" % (target, port)) sock.connect((target, port)) self.extra_create_options = [] # Wrap with the automaton self.timeout = timeout - kwargs.setdefault("SERVER_NAME", target) + kwargs.setdefault("HOST", target) self.sock = SMB_Client.from_tcpsock( sock, ssp=ssp, diff --git a/scapy/layers/spnego.py b/scapy/layers/spnego.py index fa1f5e8926f..75ee2202416 100644 --- a/scapy/layers/spnego.py +++ b/scapy/layers/spnego.py @@ -38,6 +38,7 @@ ASN1F_optional, ) from scapy.asn1packet import ASN1_Packet +from scapy.base_classes import Net from scapy.fields import ( FieldListField, LEIntEnumField, @@ -56,7 +57,12 @@ XStrLenField, ) from scapy.packet import Packet, bind_layers +from scapy.utils import ( + valid_ip, + valid_ip6, +) +from scapy.layers.inet6 import Net6 from scapy.layers.gssapi import ( GSSAPI_BLOB, GSSAPI_BLOB_SIGNATURE, @@ -75,8 +81,12 @@ # SSP Providers from scapy.layers.kerberos import ( Kerberos, + KerberosSSP, + _parse_upn, ) from scapy.layers.ntlm import ( + NTLMSSP, + MD4le, NEGOEX_EXCHANGE_NTLM, NTLM_Header, _NTLMPayloadField, @@ -619,6 +629,116 @@ def __init__(self, ssps, **kwargs): self.force_supported_mechtypes = kwargs.pop("force_supported_mechtypes", None) super(SPNEGOSSP, self).__init__(**kwargs) + @classmethod + def from_cli_arguments( + cls, + UPN: str, + target: str, + password: str = None, + HashNt: bytes = None, + HashAes256Sha96: bytes = None, + HashAes128Sha96: bytes = None, + kerberos_required: bool = False, + ST=None, + KEY=None, + debug: int = 0, + ): + """ + Initialize a SPNEGOSSP from a list of many arguments. + This is useful in a CLI, with NTLM and Kerberos supported by default. + + :param UPN: the UPN of the user to use. + :param target: the target IP/hostname entered by the user. + :param kerberos_required: require kerberos + :param password: (string) if provided, used for auth + :param HashNt: (bytes) if provided, used for auth (NTLM) + :param HashAes256Sha96: (bytes) if provided, used for auth (Kerberos) + :param HashAes128Sha96: (bytes) if provided, used for auth (Kerberos) + :param ST: if provided, the service ticket to use (Kerberos) + :param KEY: if ST provided, the session key associated to the ticket (Kerberos). + Else, the user secret key. + """ + kerberos = True + hostname = None + # Check if target is a hostname / Check IP + if ":" in target: + if not valid_ip6(target): + hostname = target + target = str(Net6(target)) + else: + if not valid_ip(target): + hostname = target + target = str(Net(target)) + # Check UPN + try: + _, realm = _parse_upn(UPN) + if realm == ".": + # Local + kerberos = False + except ValueError: + # not a UPN: NTLM only + kerberos = False + # Do we need to ask the password? + if HashNt is None and password is None and ST is None: + # yes. + from prompt_toolkit import prompt + + password = prompt("Password: ", is_password=True) + ssps = [] + # Kerberos + if kerberos and hostname: + # Get ticket if we don't already have one. + if ST is None: + # In this case, KEY is supposed to be the user's key. + from scapy.libs.rfc3961 import Key, EncryptionType + + if KEY is None and HashAes256Sha96: + KEY = Key( + EncryptionType.AES256_CTS_HMAC_SHA1_96, + HashAes256Sha96, + ) + elif KEY is None and HashAes128Sha96: + KEY = Key( + EncryptionType.AES128_CTS_HMAC_SHA1_96, + HashAes128Sha96, + ) + elif KEY is None and HashNt: + KEY = Key( + EncryptionType.RC4_HMAC, + HashNt, + ) + # Make a SSP that only has a UPN and secret. + ssps.append( + KerberosSSP( + UPN=UPN, + PASSWORD=password, + KEY=KEY, + debug=debug, + ) + ) + else: + # We have a ST, use it with the key. + ssps.append( + KerberosSSP( + UPN=UPN, + ST=ST, + KEY=KEY, + debug=debug, + ) + ) + elif kerberos_required: + raise ValueError( + "Kerberos required but domain not specified in the UPN, " + "or target isn't a hostname !" + ) + # NTLM + if not kerberos_required: + if HashNt is None and password is not None: + HashNt = MD4le(password) + ssps.append(NTLMSSP(UPN=UPN, HASHNT=HashNt)) + # Build the SSP + return cls(ssps) + def _extract_gssapi(self, Context, x): status, otherMIC, rawToken = None, None, False # Extract values from GSSAPI @@ -718,6 +838,7 @@ def _common_spnego_handler( Context, IsClient, token=None, + target_name: Optional[str] = None, req_flags=None, chan_bindings: GssChannelBindings = GSS_C_NO_CHANNEL_BINDINGS, ): @@ -779,13 +900,10 @@ def _common_spnego_handler( # The currently provided token is for this SSP ! # Pass it to the sub ssp, with its own context if IsClient: - ( - Context.sub_context, - tok, - status, - ) = Context.ssp.GSS_Init_sec_context( + Context.sub_context, tok, status = Context.ssp.GSS_Init_sec_context( Context.sub_context, token=token, + target_name=target_name, req_flags=Context.req_flags, chan_bindings=chan_bindings, ) @@ -946,6 +1064,7 @@ def GSS_Init_sec_context( self, Context: CONTEXT, token=None, + target_name: Optional[str] = None, req_flags: Optional[GSS_C_FLAGS] = None, chan_bindings: GssChannelBindings = GSS_C_NO_CHANNEL_BINDINGS, ): @@ -953,6 +1072,7 @@ def GSS_Init_sec_context( Context, True, token=token, + target_name=target_name, req_flags=req_flags, chan_bindings=chan_bindings, ) diff --git a/scapy/libs/rfc3961.py b/scapy/libs/rfc3961.py index baa041a1c4c..858a9fa2073 100644 --- a/scapy/libs/rfc3961.py +++ b/scapy/libs/rfc3961.py @@ -22,10 +22,12 @@ # TODO: support cipher states... __all__ = [ - "EncryptionType", "ChecksumType", - "Key", + "EncryptionType", "InvalidChecksum", + "KRB_FX_CF2", + "Key", + "SP800108_KDFCTR", "_rfc1964pad", ] diff --git a/scapy/utils.py b/scapy/utils.py index 2a381dac9d8..320349243c6 100644 --- a/scapy/utils.py +++ b/scapy/utils.py @@ -3940,13 +3940,14 @@ def AutoArgparse( # Process the parameters positional = [] noargument = [] + hexarguments = [] parameters = {} for param in inspect.signature(func).parameters.values(): if not param.annotation: continue noarg = False - parname = param.name - paramkwargs = {} + parname = param.name.replace("_", "-") + paramkwargs: Dict[str, Any] = {} if param.annotation is bool: if param.default is True: parname = "no-" + parname @@ -3954,6 +3955,9 @@ def AutoArgparse( else: paramkwargs["action"] = "store_true" noarg = True + elif param.annotation is bytes: + paramkwargs["type"] = str + hexarguments.append(parname) elif param.annotation in [str, int, float]: paramkwargs["type"] = param.annotation else: @@ -3971,6 +3975,14 @@ def AutoArgparse( paramkwargs["action"] = "append" if param.name in argsdoc: paramkwargs["help"] = argsdoc[param.name] + if param.annotation is bytes: + paramkwargs["help"] = "(hex) " + paramkwargs["help"] + elif param.annotation is bool: + paramkwargs["help"] = "(flag) " + paramkwargs["help"] + else: + paramkwargs["help"] = ( + "(%s) " % param.annotation.__name__ + paramkwargs["help"] + ) # Add to the parameter list parameters[parname] = paramkwargs if noarg: @@ -3992,11 +4004,25 @@ def AutoArgparse( # Add parameters to parser for parname, paramkwargs in parameters.items(): - parser.add_argument(parname, **paramkwargs) # type: ignore + parser.add_argument(parname, **paramkwargs) # Now parse the sys.argv parameters params = vars(parser.parse_args()) + # Convert hex parameters if provided + for p in hexarguments: + if params[p] is not None: + try: + params[p] = bytes.fromhex(params[p]) + except ValueError: + print( + conf.color_theme.fail( + "ERROR: the value of parameter %s " + "'%s' is not valid hexadecimal !" % (p, params[p]) + ) + ) + return None + # Act as in interactive mode conf.logLevel = 20 from scapy.themes import DefaultTheme @@ -4011,7 +4037,7 @@ def AutoArgparse( } ) except AssertionError as ex: - print("ERROR: " + str(ex)) + print(conf.color_theme.fail("ERROR: " + str(ex))) parser.print_help() return None