Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions comfy/cli_args.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,9 @@ def __call__(self, parser, namespace, values, option_string=None):
parser.add_argument("--output-directory", type=str, default=None, help="Set the ComfyUI output directory. Overrides --base-directory.")
parser.add_argument("--temp-directory", type=str, default=None, help="Set the ComfyUI temp directory (default is in the ComfyUI directory). Overrides --base-directory.")
parser.add_argument("--input-directory", type=str, default=None, help="Set the ComfyUI input directory. Overrides --base-directory.")
parser.add_argument("--enable-landlock", action="store_true", help="Use the Linux Landlock LSM to restrict filesystem writes to known ComfyUI and cache directories.")
parser.add_argument("--landlock-allow-writable", action="append", default=[], metavar="PATH", help="Extra directories that remain writable when --enable-landlock is set. Can be provided multiple times.")
parser.add_argument("--landlock-allow-readable", action="append", default=[], metavar="PATH", help="Extra directories to allow read access when --enable-landlock is set. Can be provided multiple times.")
parser.add_argument("--auto-launch", action="store_true", help="Automatically launch ComfyUI in the default browser.")
parser.add_argument("--disable-auto-launch", action="store_true", help="Disable auto launching the browser.")
parser.add_argument("--cuda-device", type=int, default=None, metavar="DEVICE_ID", help="Set the id of the cuda device this instance will use. All other devices will not be visible.")
Expand Down
8 changes: 8 additions & 0 deletions main.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
from app.logger import setup_logger
import itertools
import utils.extra_config
import utils.landlock
import logging
import sys
from comfy_execution.progress import get_progress_state
Expand Down Expand Up @@ -311,6 +312,13 @@ def start_comfyui(asyncio_loop=None):
folder_paths.set_temp_directory(temp_dir)
cleanup_temp()

if args.enable_landlock:
logging.info("Enabling Landlock")
landlock_ok = utils.landlock.enable_landlock(args, logging.getLogger("landlock"))
if not landlock_ok:
logging.critical("Requested Landlock sandbox but it could not be enabled. Exiting.")
sys.exit(1)

if args.windows_standalone_build:
try:
import new_updater
Expand Down
332 changes: 332 additions & 0 deletions utils/landlock.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,332 @@
import ctypes
import errno
import logging
import os
import sys
import tempfile
from dataclasses import dataclass

# Landlock constants copied from linux/landlock.h
PR_SET_NO_NEW_PRIVS = 38

LANDLOCK_RULE_PATH_BENEATH = 1
LANDLOCK_CREATE_RULESET_VERSION = 1

LANDLOCK_ACCESS_FS_EXECUTE = 1 << 0
LANDLOCK_ACCESS_FS_WRITE_FILE = 1 << 1
LANDLOCK_ACCESS_FS_READ_FILE = 1 << 2
LANDLOCK_ACCESS_FS_READ_DIR = 1 << 3
LANDLOCK_ACCESS_FS_REMOVE_DIR = 1 << 4
LANDLOCK_ACCESS_FS_REMOVE_FILE = 1 << 5
LANDLOCK_ACCESS_FS_MAKE_CHAR = 1 << 6
LANDLOCK_ACCESS_FS_MAKE_DIR = 1 << 7
LANDLOCK_ACCESS_FS_MAKE_REG = 1 << 8
LANDLOCK_ACCESS_FS_MAKE_SOCK = 1 << 9
LANDLOCK_ACCESS_FS_MAKE_FIFO = 1 << 10
LANDLOCK_ACCESS_FS_MAKE_BLOCK = 1 << 11
LANDLOCK_ACCESS_FS_MAKE_SYM = 1 << 12
LANDLOCK_ACCESS_FS_REFER = 1 << 13
LANDLOCK_ACCESS_FS_TRUNCATE = 1 << 14
LANDLOCK_ACCESS_FS_IOCTL_DEV = 1 << 15 # ABI v5+

# Pre-computed access masks
FS_READ_ACCESS = (
LANDLOCK_ACCESS_FS_READ_FILE
| LANDLOCK_ACCESS_FS_READ_DIR
| LANDLOCK_ACCESS_FS_EXECUTE
)
FS_WRITE_ACCESS = (
FS_READ_ACCESS
| LANDLOCK_ACCESS_FS_WRITE_FILE
| LANDLOCK_ACCESS_FS_MAKE_DIR
| LANDLOCK_ACCESS_FS_MAKE_REG
| LANDLOCK_ACCESS_FS_MAKE_SOCK
| LANDLOCK_ACCESS_FS_MAKE_FIFO
| LANDLOCK_ACCESS_FS_MAKE_BLOCK
| LANDLOCK_ACCESS_FS_MAKE_CHAR
| LANDLOCK_ACCESS_FS_MAKE_SYM
| LANDLOCK_ACCESS_FS_REMOVE_DIR
| LANDLOCK_ACCESS_FS_REMOVE_FILE
)

# Syscall numbers are ABI-stable across all 64-bit Linux architectures
SYS_LANDLOCK_CREATE_RULESET = 444
SYS_LANDLOCK_ADD_RULE = 445
SYS_LANDLOCK_RESTRICT_SELF = 446


class _RulesetAttr(ctypes.Structure):
_fields_ = [("handled_access_fs", ctypes.c_uint64)]


class _PathBeneathAttr(ctypes.Structure):
_fields_ = [
("allowed_access", ctypes.c_uint64),
("parent_fd", ctypes.c_int32),
("reserved", ctypes.c_uint32),
]


@dataclass(frozen=True)
class LandlockRules:
read_paths: set[str]
write_paths: set[str]
ioctl_paths: set[str]


def _normalize_paths(paths: set[str]) -> set[str]:
normalized = set()
for path in paths:
if not path:
continue
normalized.add(os.path.realpath(path))
return normalized


class LandlockEnforcer:
def __init__(self, logger: logging.Logger | None = None):
self.log = logger or logging.getLogger(__name__)
self.libc = ctypes.CDLL(None, use_errno=True)
self.libc.syscall.restype = ctypes.c_long
self.libc.prctl.restype = ctypes.c_int

def _syscall(self, syscall_nr, *args) -> tuple[int | None, int]:
ctypes.set_errno(0)
res = self.libc.syscall(ctypes.c_long(syscall_nr), *args)
if res == -1:
return None, ctypes.get_errno()
return res, 0

def _abi_version(self) -> int:
res, err = self._syscall(
SYS_LANDLOCK_CREATE_RULESET,
ctypes.c_void_p(0),
ctypes.c_size_t(0),
ctypes.c_uint(LANDLOCK_CREATE_RULESET_VERSION),
)
if res is None:
if err in (errno.ENOSYS, errno.EOPNOTSUPP):
return 0
return -err
return res

def _create_ruleset(self, handled_access: int) -> tuple[int | None, int]:
ruleset = _RulesetAttr(ctypes.c_uint64(handled_access))
return self._syscall(
SYS_LANDLOCK_CREATE_RULESET,
ctypes.byref(ruleset),
ctypes.c_size_t(ctypes.sizeof(ruleset)),
ctypes.c_uint(0),
)

def _add_rule(self, ruleset_fd: int, path: str, access_mask: int, allow_ioctl: bool) -> bool:
if allow_ioctl:
access_mask |= LANDLOCK_ACCESS_FS_IOCTL_DEV

try:
dir_fd = os.open(path, os.O_PATH | os.O_CLOEXEC)
except OSError as exc:
self.log.warning("Landlock: skipping %s (%s)", path, exc)
return False

try:
rule = _PathBeneathAttr(
ctypes.c_uint64(access_mask), ctypes.c_int32(dir_fd), ctypes.c_uint32(0)
)
res, err = self._syscall(
SYS_LANDLOCK_ADD_RULE,
ctypes.c_int(ruleset_fd),
ctypes.c_int(LANDLOCK_RULE_PATH_BENEATH),
ctypes.byref(rule),
ctypes.c_uint(0),
)
if res is None:
self.log.warning("Landlock: failed to add %s (errno=%s)", path, err)
return False
return True
finally:
os.close(dir_fd)

def _restrict_self(self, ruleset_fd: int) -> bool:
if self.libc.prctl(PR_SET_NO_NEW_PRIVS, 1, 0, 0, 0) != 0:
self.log.warning(
"Landlock: prctl(PR_SET_NO_NEW_PRIVS) failed (errno=%s)",
ctypes.get_errno(),
)
return False

res, err = self._syscall(SYS_LANDLOCK_RESTRICT_SELF, ctypes.c_int(ruleset_fd), ctypes.c_uint(0))
if res is None:
self.log.warning("Landlock: restrict_self failed (errno=%s)", err)
return False
return True

def apply(self, rules: LandlockRules) -> bool:
if not sys.platform.startswith("linux"):
self.log.info("Landlock: not a Linux platform, skipping.")
return False

abi_version = self._abi_version()
if abi_version <= 0:
self.log.info("Landlock: not available on this kernel (abi=%s).", abi_version)
return False

read_access = FS_READ_ACCESS
handled_write_access = FS_WRITE_ACCESS
allowed_write_access = (
read_access
| LANDLOCK_ACCESS_FS_WRITE_FILE
| LANDLOCK_ACCESS_FS_MAKE_DIR
| LANDLOCK_ACCESS_FS_MAKE_REG
| LANDLOCK_ACCESS_FS_REMOVE_DIR
| LANDLOCK_ACCESS_FS_REMOVE_FILE
) # leave other handled rights (symlinks, device nodes, sockets) denied
if abi_version >= 2:
handled_write_access |= LANDLOCK_ACCESS_FS_TRUNCATE
allowed_write_access |= LANDLOCK_ACCESS_FS_TRUNCATE
if abi_version >= 3:
handled_write_access |= LANDLOCK_ACCESS_FS_REFER
allowed_write_access |= LANDLOCK_ACCESS_FS_REFER

handled_access = handled_write_access | read_access
ioctl_supported = abi_version >= 5
if ioctl_supported:
handled_access |= LANDLOCK_ACCESS_FS_IOCTL_DEV

write_paths = _normalize_paths(rules.write_paths)
read_paths = _normalize_paths(rules.read_paths) - write_paths
# In theory these could require write or read access. Though in practice it's just /dev.
ioctl_paths = _normalize_paths(rules.ioctl_paths)

if ioctl_paths and not ioctl_supported:
self.log.info(
"Landlock: ioctl access requested but ABI %s has no support; continuing without ioctl.",
abi_version,
)

ruleset_fd = None
ruleset_fd, err = self._create_ruleset(handled_access)
if ruleset_fd is None:
self.log.warning("Landlock: failed to create ruleset (errno=%s)", err)
return False

try:
for path in write_paths:
if path != os.path.sep:
try:
os.makedirs(path, exist_ok=True)
except Exception as exc:
self.log.warning("Landlock: unable to prepare %s (%s)", path, exc)
return False
if not self._add_rule(
ruleset_fd, path, allowed_write_access, ioctl_supported and path in ioctl_paths
):
return False

for path in read_paths:
self._add_rule(ruleset_fd, path, read_access, ioctl_supported and path in ioctl_paths)

if not self._restrict_self(ruleset_fd):
return False
finally:
if ruleset_fd is not None:
os.close(ruleset_fd)

if write_paths:
self.log.info(
"Landlock enabled (ABI %s). Writable roots: %s",
abi_version,
", ".join(sorted(write_paths)),
)
else:
self.log.info("Landlock enabled (ABI %s). No writable roots configured.", abi_version)
return True


_landlock_applied = False


def build_default_rules(args) -> LandlockRules:
import folder_paths
from urllib.parse import urlparse

write_paths: set[str] = {
folder_paths.get_output_directory(),
folder_paths.get_input_directory(),
folder_paths.get_temp_directory(),
folder_paths.get_user_directory(),
}

ioctl_paths: set[str] = set()

# Torch and some backends use system temp and /dev/shm
write_paths.add(tempfile.gettempdir())

if args.temp_directory:
write_paths.add(os.path.join(os.path.abspath(args.temp_directory), "temp"))

db_url = getattr(args, "database_url", None)
if db_url and db_url.startswith("sqlite"):
parsed = urlparse(db_url)
if parsed.scheme == "sqlite" and parsed.path:
write_paths.add(os.path.abspath(os.path.dirname(parsed.path)))

for path in args.landlock_allow_writable or []:
if path:
write_paths.add(path)

# Build read paths - only what's actually needed
read_paths: set[str] = set()

# ComfyUI codebase
read_paths.add(folder_paths.base_path)

# All configured model directories (includes extra_model_paths.yaml)
for folder_name in folder_paths.folder_names_and_paths:
for path in folder_paths.folder_names_and_paths[folder_name][0]:
read_paths.add(path)

# Python installation and site-packages
read_paths.add(sys.prefix)
if sys.base_prefix != sys.prefix:
read_paths.add(sys.base_prefix)
for path in sys.path:
if path and os.path.isdir(path):
read_paths.add(path)

# System libraries (required for shared libs, CUDA, etc.)
for system_path in ["/usr", "/lib", "/lib64", "/opt", "/etc", "/proc", "/sys"]:
if os.path.exists(system_path):
read_paths.add(system_path)

# NixOS: /nix/store contains the entire system
if os.path.exists("/nix"):
read_paths.add("/nix")

# /dev needs write + ioctl for CUDA/GPU access
write_paths.add("/dev")
ioctl_paths.add("/dev")

# User-specified additional read paths
for path in getattr(args, "landlock_allow_readable", None) or []:
if path:
read_paths.add(path)

return LandlockRules(read_paths=_normalize_paths(read_paths), write_paths=_normalize_paths(write_paths), ioctl_paths=_normalize_paths(ioctl_paths))


def enable_landlock(args, logger: logging.Logger | None = None) -> bool:
global _landlock_applied

if _landlock_applied:
return True
if not getattr(args, "enable_landlock", False):
return False

enforcer = LandlockEnforcer(logger)
try:
_landlock_applied = enforcer.apply(build_default_rules(args))
except Exception:
enforcer.log.exception("Landlock: unexpected failure while applying ruleset.")
_landlock_applied = False
return _landlock_applied
Loading