Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
157 changes: 83 additions & 74 deletions scapy/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,11 @@
Implementation of the configuration object.
"""


import atexit
import copy
import functools
import os
import pathlib
import re
import socket
import sys
Expand Down Expand Up @@ -538,7 +538,7 @@ def __repr__(self):


class ScapyExt:
__slots__ = ["specs", "name", "version"]
__slots__ = ["specs", "name", "version", "bash_completions"]

class MODE(Enum):
LAYERS = "layers"
Expand All @@ -554,6 +554,7 @@ class ScapyExtSpec:

def __init__(self):
self.specs: Dict[str, 'ScapyExt.ScapyExtSpec'] = {}
self.bash_completions = {}

def config(self, name, version):
self.name = name
Expand All @@ -576,6 +577,9 @@ def register(self, name, mode, path, default=None):
spec.default = bool(importlib.util.find_spec(spec.fullname))
self.specs[fullname] = spec

def register_bashcompletion(self, script: pathlib.Path):
self.bash_completions[script.name] = script

def __repr__(self):
return "<ScapyExt %s %s (%s specs)>" % (
self.name,
Expand All @@ -585,18 +589,19 @@ def __repr__(self):


class ExtsManager(importlib.abc.MetaPathFinder):
__slots__ = ["exts", "_loaded", "all_specs"]
__slots__ = ["exts", "all_specs"]

SCAPY_PLUGIN_CLASSIFIER = 'Framework :: Scapy'
GPLV2_CLASSIFIERS = [
'License :: OSI Approved :: GNU General Public License v2 (GPLv2)',
'License :: OSI Approved :: GNU General Public License v2 or later (GPLv2+)',
GPLV2_LICENCES = [
"GPL-2.0-only",
"GPL-2.0-or-later",
]

def __init__(self):
self.exts: List[ScapyExt] = []
self.all_specs: Dict[str, ScapyExt.ScapyExtSpec] = {}
self._loaded = []
# Add to meta_path as we are an import provider
if self not in sys.meta_path:
sys.meta_path.append(self)

def find_spec(self, fullname, path, target=None):
if fullname in self.all_specs:
Expand All @@ -606,81 +611,87 @@ def invalidate_caches(self):
pass

def _register_spec(self, spec):
# Register to known specs
self.all_specs[spec.fullname] = spec

# If default=True, inject it in the currently loaded modules
if spec.default:
loader = importlib.util.LazyLoader(spec.spec.loader)
spec.spec.loader = loader
module = importlib.util.module_from_spec(spec.spec)
sys.modules[spec.fullname] = module
loader.exec_module(module)

def load(self):
def load(self, extension: str):
"""
Load a scapy extension.

:param extension: the name of the extension, as installed.
"""
try:
import importlib.metadata
except ImportError:
raise ImportError("Cannot import importlib.metadata ! Upgrade Python.")

# Get extension distribution
distr = importlib.metadata.distribution(extension)

# Check the classifiers
if distr.metadata.get('License-Expression', None) not in self.GPLV2_LICENCES:
log_loading.warning(
"'%s' has no GPLv2 classifier therefore cannot be loaded." % extension
)
return
for distr in importlib.metadata.distributions():
if any(
v == self.SCAPY_PLUGIN_CLASSIFIER
for k, v in distr.metadata.items() if k == 'Classifier'
):
try:
# Python 3.13 raises an internal warning when calling this
with warnings.catch_warnings():
warnings.filterwarnings("ignore", category=DeprecationWarning)
pkg = next(
k
for k, v in
importlib.metadata.packages_distributions().items()
if distr.name in v
)
except KeyError:
pkg = distr.name
if pkg in self._loaded:
continue
if not any(
v in self.GPLV2_CLASSIFIERS
for k, v in distr.metadata.items() if k == 'Classifier'
):
log_loading.warning(
"'%s' has no GPLv2 classifier therefore cannot be loaded." % pkg # noqa: E501
)
continue
self._loaded.append(pkg)
ext = ScapyExt()
try:
scapy_ext = importlib.import_module(pkg)
except Exception as ex:
log_loading.warning(
"'%s' failed during import with %s" % (
pkg,
ex
)
)
continue
try:
scapy_ext_func = scapy_ext.scapy_ext
except AttributeError:
log_loading.info(
"'%s' included the Scapy Framework specifier "
"but did not include a scapy_ext" % pkg
)
continue
try:
scapy_ext_func(ext)
except Exception as ex:
log_loading.warning(
"'%s' failed during initialization with %s" % (
pkg,
ex
)

# Create the extension
ext = ScapyExt()

# Get the top-level declared "import packages"
# HACK: not available nicely in importlib :/
packages = distr.read_text("top_level.txt").split()

for package in packages:
scapy_ext = importlib.import_module(package)

# We initialize the plugin by calling it's 'scapy_ext' function
try:
scapy_ext_func = scapy_ext.scapy_ext
except AttributeError:
log_loading.warning(
"'%s' does not look like a Scapy plugin !" % extension
)
return
try:
scapy_ext_func(ext)
except Exception as ex:
log_loading.warning(
"'%s' failed during initialization with %s" % (
extension,
ex
)
continue
for spec in ext.specs.values():
self._register_spec(spec)
self.exts.append(ext)
if self not in sys.meta_path:
sys.meta_path.append(self)
)
return

# Register all the specs provided by this extension
for spec in ext.specs.values():
self._register_spec(spec)

# Add to the extension list
self.exts.append(ext)

# If there are bash autocompletions, add them
if ext.bash_completions:
from scapy.main import _add_bash_autocompletion

for name, script in ext.bash_completions.items():
_add_bash_autocompletion(name, script)

def loadall(self) -> None:
"""
Load all extensions registered in conf.
"""
for extension in conf.load_extensions:
self.load(extension)

def __repr__(self):
from scapy.utils import pretty_list
Expand Down Expand Up @@ -1033,6 +1044,8 @@ class Conf(ConfClass):
#: netcache holds time-based caches for net operations
netcache: NetCache = NetCache()
geoip_city = None
#: Scapy extensions that are loaded automatically on load
load_extensions: List[str] = []
# can, tls, http and a few others are not loaded by default
load_layers: List[str] = [
'bluetooth',
Expand Down Expand Up @@ -1170,10 +1183,6 @@ def __getattribute__(self, attr):

conf = Conf() # type: Conf

# Python 3.8 Only
if sys.version_info >= (3, 8):
conf.exts.load()


def crypto_validator(func):
# type: (DecoratorCallable) -> DecoratorCallable
Expand Down
74 changes: 68 additions & 6 deletions scapy/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,20 +9,22 @@


import builtins
import pathlib
import sys
import os
import getopt
import code
import gzip
import getopt
import glob
import gzip
import importlib
import io
from itertools import zip_longest
import logging
import os
import pathlib
import pickle
import shutil
import sys
import types
import warnings

from itertools import zip_longest
from random import choice

# Never add any global import, in main.py, that would trigger a
Expand Down Expand Up @@ -101,6 +103,15 @@ def _probe_cache_folder(*cf):
)


def _probe_share_folder(*cf):
# type: (str) -> Optional[pathlib.Path]
return _probe_xdg_folder(
"XDG_DATA_HOME",
os.path.join(os.path.expanduser("~"), ".local", "share"),
*cf
)


def _check_perms(file: Union[pathlib.Path, str]) -> None:
"""
Checks that the permissions of a file are properly user-specific, if sudo is used.
Expand Down Expand Up @@ -203,6 +214,22 @@ def _validate_local(k):
DEFAULT_PRESTART_FILE = None
DEFAULT_STARTUP_FILE = None

# https://github.com/scop/bash-completion/blob/main/README.md#faq
if "BASH_COMPLETION_USER_DIR" in os.environ:
BASH_COMPLETION_USER_DIR: Optional[pathlib.Path] = pathlib.Path(
os.environ["BASH_COMPLETION_USER_DIR"]
)
else:
BASH_COMPLETION_USER_DIR = _probe_share_folder("bash-completion")

if BASH_COMPLETION_USER_DIR:
BASH_COMPLETION_FOLDER: Optional[pathlib.Path] = (
BASH_COMPLETION_USER_DIR / "completions"
)
else:
BASH_COMPLETION_FOLDER = None


# Default scapy prestart.py config file

DEFAULT_PRESTART = """
Expand All @@ -219,6 +246,12 @@ def _validate_local(k):
# disable INFO: tags related to dependencies missing
# log_loading.setLevel(logging.WARNING)

# extensions to load by default
conf.load_extensions = [
# "scapy-red",
# "scapy-rpc",
]

# force-use libpcap
# conf.use_pcap = True
""".strip()
Expand All @@ -237,6 +270,31 @@ def _usage():
sys.exit(0)


def _add_bash_autocompletion(fname: str, script: pathlib.Path) -> None:
"""
Util function used most notably in setup.py to add a bash autocompletion script.
"""
try:
if BASH_COMPLETION_FOLDER is None:
raise OSError()

# If already defined, exit.
dest = BASH_COMPLETION_FOLDER / fname
if dest.exists():
return

# Check that bash autocompletion folder exists
if not BASH_COMPLETION_FOLDER.exists():
BASH_COMPLETION_FOLDER.mkdir(parents=True, exist_ok=True)
_check_perms(BASH_COMPLETION_FOLDER)

# Copy file
shutil.copy(script, BASH_COMPLETION_FOLDER)
except OSError:
log_loading.warning("Bash autocompletion script could not be copied.",
exc_info=True)


######################
# Extension system #
######################
Expand Down Expand Up @@ -808,6 +866,10 @@ def interact(mydict=None,
_locals=SESSION
)

# Load extensions (Python 3.8 Only)
if sys.version_info >= (3, 8):
conf.exts.loadall()

if conf.fancy_banner:
banner_text = get_fancy_banner()
else:
Expand Down
Loading