diff --git a/scapy/config.py b/scapy/config.py index 1a70b7cf440..6c10d791684 100755 --- a/scapy/config.py +++ b/scapy/config.py @@ -7,11 +7,11 @@ Implementation of the configuration object. """ - import atexit import copy import functools import os +import pathlib import re import socket import sys @@ -538,7 +538,7 @@ def __repr__(self): class ScapyExt: - __slots__ = ["specs", "name", "version"] + __slots__ = ["specs", "name", "version", "bash_completions"] class MODE(Enum): LAYERS = "layers" @@ -554,6 +554,7 @@ class ScapyExtSpec: def __init__(self): self.specs: Dict[str, 'ScapyExt.ScapyExtSpec'] = {} + self.bash_completions = {} def config(self, name, version): self.name = name @@ -576,6 +577,9 @@ def register(self, name, mode, path, default=None): spec.default = bool(importlib.util.find_spec(spec.fullname)) self.specs[fullname] = spec + def register_bashcompletion(self, script: pathlib.Path): + self.bash_completions[script.name] = script + def __repr__(self): return "" % ( self.name, @@ -585,18 +589,19 @@ def __repr__(self): class ExtsManager(importlib.abc.MetaPathFinder): - __slots__ = ["exts", "_loaded", "all_specs"] + __slots__ = ["exts", "all_specs"] - SCAPY_PLUGIN_CLASSIFIER = 'Framework :: Scapy' - GPLV2_CLASSIFIERS = [ - 'License :: OSI Approved :: GNU General Public License v2 (GPLv2)', - 'License :: OSI Approved :: GNU General Public License v2 or later (GPLv2+)', + GPLV2_LICENCES = [ + "GPL-2.0-only", + "GPL-2.0-or-later", ] def __init__(self): self.exts: List[ScapyExt] = [] self.all_specs: Dict[str, ScapyExt.ScapyExtSpec] = {} - self._loaded = [] + # Add to meta_path as we are an import provider + if self not in sys.meta_path: + sys.meta_path.append(self) def find_spec(self, fullname, path, target=None): if fullname in self.all_specs: @@ -606,7 +611,10 @@ def invalidate_caches(self): pass def _register_spec(self, spec): + # Register to known specs self.all_specs[spec.fullname] = spec + + # If default=True, inject it in the currently loaded modules if spec.default: loader = importlib.util.LazyLoader(spec.spec.loader) spec.spec.loader = loader @@ -614,73 +622,76 @@ def _register_spec(self, spec): sys.modules[spec.fullname] = module loader.exec_module(module) - def load(self): + def load(self, extension: str): + """ + Load a scapy extension. + + :param extension: the name of the extension, as installed. + """ try: import importlib.metadata except ImportError: + raise ImportError("Cannot import importlib.metadata ! Upgrade Python.") + + # Get extension distribution + distr = importlib.metadata.distribution(extension) + + # Check the classifiers + if distr.metadata.get('License-Expression', None) not in self.GPLV2_LICENCES: + log_loading.warning( + "'%s' has no GPLv2 classifier therefore cannot be loaded." % extension + ) return - for distr in importlib.metadata.distributions(): - if any( - v == self.SCAPY_PLUGIN_CLASSIFIER - for k, v in distr.metadata.items() if k == 'Classifier' - ): - try: - # Python 3.13 raises an internal warning when calling this - with warnings.catch_warnings(): - warnings.filterwarnings("ignore", category=DeprecationWarning) - pkg = next( - k - for k, v in - importlib.metadata.packages_distributions().items() - if distr.name in v - ) - except KeyError: - pkg = distr.name - if pkg in self._loaded: - continue - if not any( - v in self.GPLV2_CLASSIFIERS - for k, v in distr.metadata.items() if k == 'Classifier' - ): - log_loading.warning( - "'%s' has no GPLv2 classifier therefore cannot be loaded." % pkg # noqa: E501 - ) - continue - self._loaded.append(pkg) - ext = ScapyExt() - try: - scapy_ext = importlib.import_module(pkg) - except Exception as ex: - log_loading.warning( - "'%s' failed during import with %s" % ( - pkg, - ex - ) - ) - continue - try: - scapy_ext_func = scapy_ext.scapy_ext - except AttributeError: - log_loading.info( - "'%s' included the Scapy Framework specifier " - "but did not include a scapy_ext" % pkg - ) - continue - try: - scapy_ext_func(ext) - except Exception as ex: - log_loading.warning( - "'%s' failed during initialization with %s" % ( - pkg, - ex - ) + + # Create the extension + ext = ScapyExt() + + # Get the top-level declared "import packages" + # HACK: not available nicely in importlib :/ + packages = distr.read_text("top_level.txt").split() + + for package in packages: + scapy_ext = importlib.import_module(package) + + # We initialize the plugin by calling it's 'scapy_ext' function + try: + scapy_ext_func = scapy_ext.scapy_ext + except AttributeError: + log_loading.warning( + "'%s' does not look like a Scapy plugin !" % extension + ) + return + try: + scapy_ext_func(ext) + except Exception as ex: + log_loading.warning( + "'%s' failed during initialization with %s" % ( + extension, + ex ) - continue - for spec in ext.specs.values(): - self._register_spec(spec) - self.exts.append(ext) - if self not in sys.meta_path: - sys.meta_path.append(self) + ) + return + + # Register all the specs provided by this extension + for spec in ext.specs.values(): + self._register_spec(spec) + + # Add to the extension list + self.exts.append(ext) + + # If there are bash autocompletions, add them + if ext.bash_completions: + from scapy.main import _add_bash_autocompletion + + for name, script in ext.bash_completions.items(): + _add_bash_autocompletion(name, script) + + def loadall(self) -> None: + """ + Load all extensions registered in conf. + """ + for extension in conf.load_extensions: + self.load(extension) def __repr__(self): from scapy.utils import pretty_list @@ -1033,6 +1044,8 @@ class Conf(ConfClass): #: netcache holds time-based caches for net operations netcache: NetCache = NetCache() geoip_city = None + #: Scapy extensions that are loaded automatically on load + load_extensions: List[str] = [] # can, tls, http and a few others are not loaded by default load_layers: List[str] = [ 'bluetooth', @@ -1170,10 +1183,6 @@ def __getattribute__(self, attr): conf = Conf() # type: Conf -# Python 3.8 Only -if sys.version_info >= (3, 8): - conf.exts.load() - def crypto_validator(func): # type: (DecoratorCallable) -> DecoratorCallable diff --git a/scapy/main.py b/scapy/main.py index fea7f1ec302..663d5308b8c 100644 --- a/scapy/main.py +++ b/scapy/main.py @@ -9,20 +9,22 @@ import builtins -import pathlib -import sys -import os -import getopt import code -import gzip +import getopt import glob +import gzip import importlib import io -from itertools import zip_longest import logging +import os +import pathlib import pickle +import shutil +import sys import types import warnings + +from itertools import zip_longest from random import choice # Never add any global import, in main.py, that would trigger a @@ -101,6 +103,15 @@ def _probe_cache_folder(*cf): ) +def _probe_share_folder(*cf): + # type: (str) -> Optional[pathlib.Path] + return _probe_xdg_folder( + "XDG_DATA_HOME", + os.path.join(os.path.expanduser("~"), ".local", "share"), + *cf + ) + + def _check_perms(file: Union[pathlib.Path, str]) -> None: """ Checks that the permissions of a file are properly user-specific, if sudo is used. @@ -203,6 +214,22 @@ def _validate_local(k): DEFAULT_PRESTART_FILE = None DEFAULT_STARTUP_FILE = None +# https://github.com/scop/bash-completion/blob/main/README.md#faq +if "BASH_COMPLETION_USER_DIR" in os.environ: + BASH_COMPLETION_USER_DIR: Optional[pathlib.Path] = pathlib.Path( + os.environ["BASH_COMPLETION_USER_DIR"] + ) +else: + BASH_COMPLETION_USER_DIR = _probe_share_folder("bash-completion") + +if BASH_COMPLETION_USER_DIR: + BASH_COMPLETION_FOLDER: Optional[pathlib.Path] = ( + BASH_COMPLETION_USER_DIR / "completions" + ) +else: + BASH_COMPLETION_FOLDER = None + + # Default scapy prestart.py config file DEFAULT_PRESTART = """ @@ -219,6 +246,12 @@ def _validate_local(k): # disable INFO: tags related to dependencies missing # log_loading.setLevel(logging.WARNING) +# extensions to load by default +conf.load_extensions = [ + # "scapy-red", + # "scapy-rpc", +] + # force-use libpcap # conf.use_pcap = True """.strip() @@ -237,6 +270,31 @@ def _usage(): sys.exit(0) +def _add_bash_autocompletion(fname: str, script: pathlib.Path) -> None: + """ + Util function used most notably in setup.py to add a bash autocompletion script. + """ + try: + if BASH_COMPLETION_FOLDER is None: + raise OSError() + + # If already defined, exit. + dest = BASH_COMPLETION_FOLDER / fname + if dest.exists(): + return + + # Check that bash autocompletion folder exists + if not BASH_COMPLETION_FOLDER.exists(): + BASH_COMPLETION_FOLDER.mkdir(parents=True, exist_ok=True) + _check_perms(BASH_COMPLETION_FOLDER) + + # Copy file + shutil.copy(script, BASH_COMPLETION_FOLDER) + except OSError: + log_loading.warning("Bash autocompletion script could not be copied.", + exc_info=True) + + ###################### # Extension system # ###################### @@ -808,6 +866,10 @@ def interact(mydict=None, _locals=SESSION ) + # Load extensions (Python 3.8 Only) + if sys.version_info >= (3, 8): + conf.exts.loadall() + if conf.fancy_banner: banner_text = get_fancy_banner() else: diff --git a/scapy/utils.py b/scapy/utils.py index f7ecf6d447c..bf84eb80710 100644 --- a/scapy/utils.py +++ b/scapy/utils.py @@ -3891,7 +3891,10 @@ def loop(self, debug: int = 0) -> None: print("Output processor failed with error: %s" % ex) -def AutoArgparse(func: DecoratorCallable) -> None: +def AutoArgparse( + func: DecoratorCallable, + _parseonly: bool = False, +) -> Optional[Tuple[List[str], List[str]]]: """ Generate an Argparse call from a function, then call this function. @@ -3929,17 +3932,15 @@ def AutoArgparse(func: DecoratorCallable) -> None: argsdoc[argparam] = argdesc else: desc = "" - # Now build the argparse.ArgumentParser - parser = argparse.ArgumentParser( - prog=func.__name__, - description=desc, - formatter_class=argparse.ArgumentDefaultsHelpFormatter, - ) + # Process the parameters positional = [] + noargument = [] + parameters = {} for param in inspect.signature(func).parameters.values(): if not param.annotation: continue + noarg = False parname = param.name paramkwargs = {} if param.annotation is bool: @@ -3948,6 +3949,7 @@ def AutoArgparse(func: DecoratorCallable) -> None: paramkwargs["action"] = "store_false" else: paramkwargs["action"] = "store_true" + noarg = True elif param.annotation in [str, int, float]: paramkwargs["type"] = param.annotation else: @@ -3965,9 +3967,32 @@ def AutoArgparse(func: DecoratorCallable) -> None: paramkwargs["action"] = "append" if param.name in argsdoc: paramkwargs["help"] = argsdoc[param.name] + # Add to the parameter list + parameters[parname] = paramkwargs + if noarg: + noargument.append(parname) + + if _parseonly: + # An internal mode used to generate bash autocompletion, do it then exit. + return ( + [x for x in parameters if x not in positional] + ["--help"], + [x for x in noargument if x not in positional] + ["--help"], + ) + + # Now build the argparse.ArgumentParser + parser = argparse.ArgumentParser( + prog=func.__name__, + description=desc, + formatter_class=argparse.ArgumentDefaultsHelpFormatter, + ) + + # Add parameters to parser + for parname, paramkwargs in parameters.items(): parser.add_argument(parname, **paramkwargs) # type: ignore + # Now parse the sys.argv parameters params = vars(parser.parse_args()) + # Act as in interactive mode conf.logLevel = 20 from scapy.themes import DefaultTheme @@ -3984,6 +4009,7 @@ def AutoArgparse(func: DecoratorCallable) -> None: except AssertionError as ex: print("ERROR: " + str(ex)) parser.print_help() + return None #######################