From ce445263e6b6e0c3cf8ddbc7ce9147cdc771af18 Mon Sep 17 00:00:00 2001 From: Svein Ove Aas Date: Sat, 25 Oct 2025 03:55:50 +0100 Subject: [PATCH] feat: Add Landlock LSM sandbox for filesystem isolation MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Implements Linux Landlock sandboxing to restrict filesystem access when ComfyUI is running. This provides defense-in-depth against malicious custom nodes or workflows that attempt to access sensitive files. How it works: - Uses Linux Landlock LSM (kernel 5.13+) via direct syscalls - Restricts write access to specific directories (output, input, temp, user) - Restricts read access to only what's needed (codebase, models, system libs) - Handles ABI versions 1-5, including IOCTL_DEV for GPU access on v5+ - Exits with error if --enable-landlock is set but Landlock unavailable Write access granted to: - ComfyUI output, input, temp, and user directories - System temp directory (for torch/backends) - SQLite database directory (if configured) - Paths specified via --landlock-allow-writable Read access granted to: - ComfyUI codebase directory - All configured model directories (including extra_model_paths.yaml) - Python installation and site-packages - System libraries (/usr, /lib, /lib64, /opt, /etc, /proc, /sys) - /nix (on NixOS systems) - /dev (with ioctl for GPU access) - Paths specified via --landlock-allow-readable Usage: python main.py --enable-landlock python main.py --enable-landlock --landlock-allow-writable /extra/dir python main.py --enable-landlock --landlock-allow-readable ~/.cache/huggingface Requirements: - Linux with kernel 5.13+ (fails with error on unsupported systems) - Once enabled, restrictions cannot be lifted for the process lifetime - Network access is not restricted (Landlock FS only) 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude --- comfy/cli_args.py | 3 + main.py | 8 ++ utils/landlock.py | 332 ++++++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 343 insertions(+) create mode 100644 utils/landlock.py diff --git a/comfy/cli_args.py b/comfy/cli_args.py index 5f0dfaa10799..d6c64ea05d76 100644 --- a/comfy/cli_args.py +++ b/comfy/cli_args.py @@ -47,6 +47,9 @@ def __call__(self, parser, namespace, values, option_string=None): parser.add_argument("--output-directory", type=str, default=None, help="Set the ComfyUI output directory. Overrides --base-directory.") parser.add_argument("--temp-directory", type=str, default=None, help="Set the ComfyUI temp directory (default is in the ComfyUI directory). Overrides --base-directory.") parser.add_argument("--input-directory", type=str, default=None, help="Set the ComfyUI input directory. Overrides --base-directory.") +parser.add_argument("--enable-landlock", action="store_true", help="Use the Linux Landlock LSM to restrict filesystem writes to known ComfyUI and cache directories.") +parser.add_argument("--landlock-allow-writable", action="append", default=[], metavar="PATH", help="Extra directories that remain writable when --enable-landlock is set. Can be provided multiple times.") +parser.add_argument("--landlock-allow-readable", action="append", default=[], metavar="PATH", help="Extra directories to allow read access when --enable-landlock is set. Can be provided multiple times.") parser.add_argument("--auto-launch", action="store_true", help="Automatically launch ComfyUI in the default browser.") parser.add_argument("--disable-auto-launch", action="store_true", help="Disable auto launching the browser.") parser.add_argument("--cuda-device", type=int, default=None, metavar="DEVICE_ID", help="Set the id of the cuda device this instance will use. All other devices will not be visible.") diff --git a/main.py b/main.py index e1b0f1620bbc..48775676a496 100644 --- a/main.py +++ b/main.py @@ -9,6 +9,7 @@ from app.logger import setup_logger import itertools import utils.extra_config +import utils.landlock import logging import sys from comfy_execution.progress import get_progress_state @@ -311,6 +312,13 @@ def start_comfyui(asyncio_loop=None): folder_paths.set_temp_directory(temp_dir) cleanup_temp() + if args.enable_landlock: + logging.info("Enabling Landlock") + landlock_ok = utils.landlock.enable_landlock(args, logging.getLogger("landlock")) + if not landlock_ok: + logging.critical("Requested Landlock sandbox but it could not be enabled. Exiting.") + sys.exit(1) + if args.windows_standalone_build: try: import new_updater diff --git a/utils/landlock.py b/utils/landlock.py new file mode 100644 index 000000000000..1a2a7df15f65 --- /dev/null +++ b/utils/landlock.py @@ -0,0 +1,332 @@ +import ctypes +import errno +import logging +import os +import sys +import tempfile +from dataclasses import dataclass + +# Landlock constants copied from linux/landlock.h +PR_SET_NO_NEW_PRIVS = 38 + +LANDLOCK_RULE_PATH_BENEATH = 1 +LANDLOCK_CREATE_RULESET_VERSION = 1 + +LANDLOCK_ACCESS_FS_EXECUTE = 1 << 0 +LANDLOCK_ACCESS_FS_WRITE_FILE = 1 << 1 +LANDLOCK_ACCESS_FS_READ_FILE = 1 << 2 +LANDLOCK_ACCESS_FS_READ_DIR = 1 << 3 +LANDLOCK_ACCESS_FS_REMOVE_DIR = 1 << 4 +LANDLOCK_ACCESS_FS_REMOVE_FILE = 1 << 5 +LANDLOCK_ACCESS_FS_MAKE_CHAR = 1 << 6 +LANDLOCK_ACCESS_FS_MAKE_DIR = 1 << 7 +LANDLOCK_ACCESS_FS_MAKE_REG = 1 << 8 +LANDLOCK_ACCESS_FS_MAKE_SOCK = 1 << 9 +LANDLOCK_ACCESS_FS_MAKE_FIFO = 1 << 10 +LANDLOCK_ACCESS_FS_MAKE_BLOCK = 1 << 11 +LANDLOCK_ACCESS_FS_MAKE_SYM = 1 << 12 +LANDLOCK_ACCESS_FS_REFER = 1 << 13 +LANDLOCK_ACCESS_FS_TRUNCATE = 1 << 14 +LANDLOCK_ACCESS_FS_IOCTL_DEV = 1 << 15 # ABI v5+ + +# Pre-computed access masks +FS_READ_ACCESS = ( + LANDLOCK_ACCESS_FS_READ_FILE + | LANDLOCK_ACCESS_FS_READ_DIR + | LANDLOCK_ACCESS_FS_EXECUTE +) +FS_WRITE_ACCESS = ( + FS_READ_ACCESS + | LANDLOCK_ACCESS_FS_WRITE_FILE + | LANDLOCK_ACCESS_FS_MAKE_DIR + | LANDLOCK_ACCESS_FS_MAKE_REG + | LANDLOCK_ACCESS_FS_MAKE_SOCK + | LANDLOCK_ACCESS_FS_MAKE_FIFO + | LANDLOCK_ACCESS_FS_MAKE_BLOCK + | LANDLOCK_ACCESS_FS_MAKE_CHAR + | LANDLOCK_ACCESS_FS_MAKE_SYM + | LANDLOCK_ACCESS_FS_REMOVE_DIR + | LANDLOCK_ACCESS_FS_REMOVE_FILE +) + +# Syscall numbers are ABI-stable across all 64-bit Linux architectures +SYS_LANDLOCK_CREATE_RULESET = 444 +SYS_LANDLOCK_ADD_RULE = 445 +SYS_LANDLOCK_RESTRICT_SELF = 446 + + +class _RulesetAttr(ctypes.Structure): + _fields_ = [("handled_access_fs", ctypes.c_uint64)] + + +class _PathBeneathAttr(ctypes.Structure): + _fields_ = [ + ("allowed_access", ctypes.c_uint64), + ("parent_fd", ctypes.c_int32), + ("reserved", ctypes.c_uint32), + ] + + +@dataclass(frozen=True) +class LandlockRules: + read_paths: set[str] + write_paths: set[str] + ioctl_paths: set[str] + + +def _normalize_paths(paths: set[str]) -> set[str]: + normalized = set() + for path in paths: + if not path: + continue + normalized.add(os.path.realpath(path)) + return normalized + + +class LandlockEnforcer: + def __init__(self, logger: logging.Logger | None = None): + self.log = logger or logging.getLogger(__name__) + self.libc = ctypes.CDLL(None, use_errno=True) + self.libc.syscall.restype = ctypes.c_long + self.libc.prctl.restype = ctypes.c_int + + def _syscall(self, syscall_nr, *args) -> tuple[int | None, int]: + ctypes.set_errno(0) + res = self.libc.syscall(ctypes.c_long(syscall_nr), *args) + if res == -1: + return None, ctypes.get_errno() + return res, 0 + + def _abi_version(self) -> int: + res, err = self._syscall( + SYS_LANDLOCK_CREATE_RULESET, + ctypes.c_void_p(0), + ctypes.c_size_t(0), + ctypes.c_uint(LANDLOCK_CREATE_RULESET_VERSION), + ) + if res is None: + if err in (errno.ENOSYS, errno.EOPNOTSUPP): + return 0 + return -err + return res + + def _create_ruleset(self, handled_access: int) -> tuple[int | None, int]: + ruleset = _RulesetAttr(ctypes.c_uint64(handled_access)) + return self._syscall( + SYS_LANDLOCK_CREATE_RULESET, + ctypes.byref(ruleset), + ctypes.c_size_t(ctypes.sizeof(ruleset)), + ctypes.c_uint(0), + ) + + def _add_rule(self, ruleset_fd: int, path: str, access_mask: int, allow_ioctl: bool) -> bool: + if allow_ioctl: + access_mask |= LANDLOCK_ACCESS_FS_IOCTL_DEV + + try: + dir_fd = os.open(path, os.O_PATH | os.O_CLOEXEC) + except OSError as exc: + self.log.warning("Landlock: skipping %s (%s)", path, exc) + return False + + try: + rule = _PathBeneathAttr( + ctypes.c_uint64(access_mask), ctypes.c_int32(dir_fd), ctypes.c_uint32(0) + ) + res, err = self._syscall( + SYS_LANDLOCK_ADD_RULE, + ctypes.c_int(ruleset_fd), + ctypes.c_int(LANDLOCK_RULE_PATH_BENEATH), + ctypes.byref(rule), + ctypes.c_uint(0), + ) + if res is None: + self.log.warning("Landlock: failed to add %s (errno=%s)", path, err) + return False + return True + finally: + os.close(dir_fd) + + def _restrict_self(self, ruleset_fd: int) -> bool: + if self.libc.prctl(PR_SET_NO_NEW_PRIVS, 1, 0, 0, 0) != 0: + self.log.warning( + "Landlock: prctl(PR_SET_NO_NEW_PRIVS) failed (errno=%s)", + ctypes.get_errno(), + ) + return False + + res, err = self._syscall(SYS_LANDLOCK_RESTRICT_SELF, ctypes.c_int(ruleset_fd), ctypes.c_uint(0)) + if res is None: + self.log.warning("Landlock: restrict_self failed (errno=%s)", err) + return False + return True + + def apply(self, rules: LandlockRules) -> bool: + if not sys.platform.startswith("linux"): + self.log.info("Landlock: not a Linux platform, skipping.") + return False + + abi_version = self._abi_version() + if abi_version <= 0: + self.log.info("Landlock: not available on this kernel (abi=%s).", abi_version) + return False + + read_access = FS_READ_ACCESS + handled_write_access = FS_WRITE_ACCESS + allowed_write_access = ( + read_access + | LANDLOCK_ACCESS_FS_WRITE_FILE + | LANDLOCK_ACCESS_FS_MAKE_DIR + | LANDLOCK_ACCESS_FS_MAKE_REG + | LANDLOCK_ACCESS_FS_REMOVE_DIR + | LANDLOCK_ACCESS_FS_REMOVE_FILE + ) # leave other handled rights (symlinks, device nodes, sockets) denied + if abi_version >= 2: + handled_write_access |= LANDLOCK_ACCESS_FS_TRUNCATE + allowed_write_access |= LANDLOCK_ACCESS_FS_TRUNCATE + if abi_version >= 3: + handled_write_access |= LANDLOCK_ACCESS_FS_REFER + allowed_write_access |= LANDLOCK_ACCESS_FS_REFER + + handled_access = handled_write_access | read_access + ioctl_supported = abi_version >= 5 + if ioctl_supported: + handled_access |= LANDLOCK_ACCESS_FS_IOCTL_DEV + + write_paths = _normalize_paths(rules.write_paths) + read_paths = _normalize_paths(rules.read_paths) - write_paths + # In theory these could require write or read access. Though in practice it's just /dev. + ioctl_paths = _normalize_paths(rules.ioctl_paths) + + if ioctl_paths and not ioctl_supported: + self.log.info( + "Landlock: ioctl access requested but ABI %s has no support; continuing without ioctl.", + abi_version, + ) + + ruleset_fd = None + ruleset_fd, err = self._create_ruleset(handled_access) + if ruleset_fd is None: + self.log.warning("Landlock: failed to create ruleset (errno=%s)", err) + return False + + try: + for path in write_paths: + if path != os.path.sep: + try: + os.makedirs(path, exist_ok=True) + except Exception as exc: + self.log.warning("Landlock: unable to prepare %s (%s)", path, exc) + return False + if not self._add_rule( + ruleset_fd, path, allowed_write_access, ioctl_supported and path in ioctl_paths + ): + return False + + for path in read_paths: + self._add_rule(ruleset_fd, path, read_access, ioctl_supported and path in ioctl_paths) + + if not self._restrict_self(ruleset_fd): + return False + finally: + if ruleset_fd is not None: + os.close(ruleset_fd) + + if write_paths: + self.log.info( + "Landlock enabled (ABI %s). Writable roots: %s", + abi_version, + ", ".join(sorted(write_paths)), + ) + else: + self.log.info("Landlock enabled (ABI %s). No writable roots configured.", abi_version) + return True + + +_landlock_applied = False + + +def build_default_rules(args) -> LandlockRules: + import folder_paths + from urllib.parse import urlparse + + write_paths: set[str] = { + folder_paths.get_output_directory(), + folder_paths.get_input_directory(), + folder_paths.get_temp_directory(), + folder_paths.get_user_directory(), + } + + ioctl_paths: set[str] = set() + + # Torch and some backends use system temp and /dev/shm + write_paths.add(tempfile.gettempdir()) + + if args.temp_directory: + write_paths.add(os.path.join(os.path.abspath(args.temp_directory), "temp")) + + db_url = getattr(args, "database_url", None) + if db_url and db_url.startswith("sqlite"): + parsed = urlparse(db_url) + if parsed.scheme == "sqlite" and parsed.path: + write_paths.add(os.path.abspath(os.path.dirname(parsed.path))) + + for path in args.landlock_allow_writable or []: + if path: + write_paths.add(path) + + # Build read paths - only what's actually needed + read_paths: set[str] = set() + + # ComfyUI codebase + read_paths.add(folder_paths.base_path) + + # All configured model directories (includes extra_model_paths.yaml) + for folder_name in folder_paths.folder_names_and_paths: + for path in folder_paths.folder_names_and_paths[folder_name][0]: + read_paths.add(path) + + # Python installation and site-packages + read_paths.add(sys.prefix) + if sys.base_prefix != sys.prefix: + read_paths.add(sys.base_prefix) + for path in sys.path: + if path and os.path.isdir(path): + read_paths.add(path) + + # System libraries (required for shared libs, CUDA, etc.) + for system_path in ["/usr", "/lib", "/lib64", "/opt", "/etc", "/proc", "/sys"]: + if os.path.exists(system_path): + read_paths.add(system_path) + + # NixOS: /nix/store contains the entire system + if os.path.exists("/nix"): + read_paths.add("/nix") + + # /dev needs write + ioctl for CUDA/GPU access + write_paths.add("/dev") + ioctl_paths.add("/dev") + + # User-specified additional read paths + for path in getattr(args, "landlock_allow_readable", None) or []: + if path: + read_paths.add(path) + + return LandlockRules(read_paths=_normalize_paths(read_paths), write_paths=_normalize_paths(write_paths), ioctl_paths=_normalize_paths(ioctl_paths)) + + +def enable_landlock(args, logger: logging.Logger | None = None) -> bool: + global _landlock_applied + + if _landlock_applied: + return True + if not getattr(args, "enable_landlock", False): + return False + + enforcer = LandlockEnforcer(logger) + try: + _landlock_applied = enforcer.apply(build_default_rules(args)) + except Exception: + enforcer.log.exception("Landlock: unexpected failure while applying ruleset.") + _landlock_applied = False + return _landlock_applied