Skip to content

Commit 2c8e1e8

Browse files
Baughnclaude
andcommitted
feat: Add Landlock LSM sandbox for filesystem isolation
Implements Linux Landlock sandboxing to restrict filesystem access when ComfyUI is running. This provides defense-in-depth against malicious custom nodes or workflows that attempt to access sensitive files. How it works: - Uses Linux Landlock LSM (kernel 5.13+) via direct syscalls - Restricts write access to specific directories (output, input, temp, user) - Restricts read access to only what's needed (codebase, models, system libs) - Handles ABI versions 1-5, including IOCTL_DEV for GPU access on v5+ - Exits with error if --enable-landlock is set but Landlock unavailable Write access granted to: - ComfyUI output, input, temp, and user directories - System temp directory (for torch/backends) - SQLite database directory (if configured) - Paths specified via --landlock-allow-writable Read access granted to: - ComfyUI codebase directory - All configured model directories (including extra_model_paths.yaml) - Python installation and site-packages - System libraries (/usr, /lib, /lib64, /opt, /etc, /proc, /sys) - /nix (on NixOS systems) - /dev (with ioctl for GPU access) - Paths specified via --landlock-allow-readable Usage: python main.py --enable-landlock python main.py --enable-landlock --landlock-allow-writable /extra/dir python main.py --enable-landlock --landlock-allow-readable ~/.cache/huggingface Requirements: - Linux with kernel 5.13+ (fails with error on unsupported systems) - Once enabled, restrictions cannot be lifted for the process lifetime - Network access is not restricted (Landlock FS only) 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude <[email protected]>
1 parent 52e778f commit 2c8e1e8

File tree

3 files changed

+314
-0
lines changed

3 files changed

+314
-0
lines changed

comfy/cli_args.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,9 @@ def __call__(self, parser, namespace, values, option_string=None):
4747
parser.add_argument("--output-directory", type=str, default=None, help="Set the ComfyUI output directory. Overrides --base-directory.")
4848
parser.add_argument("--temp-directory", type=str, default=None, help="Set the ComfyUI temp directory (default is in the ComfyUI directory). Overrides --base-directory.")
4949
parser.add_argument("--input-directory", type=str, default=None, help="Set the ComfyUI input directory. Overrides --base-directory.")
50+
parser.add_argument("--enable-landlock", action="store_true", help="Use the Linux Landlock LSM to restrict filesystem writes to known ComfyUI and cache directories.")
51+
parser.add_argument("--landlock-allow-writable", action="append", default=[], metavar="PATH", help="Extra directories that remain writable when --enable-landlock is set. Can be provided multiple times.")
52+
parser.add_argument("--landlock-allow-readable", action="append", default=[], metavar="PATH", help="Extra directories to allow read access when --enable-landlock is set. Can be provided multiple times.")
5053
parser.add_argument("--auto-launch", action="store_true", help="Automatically launch ComfyUI in the default browser.")
5154
parser.add_argument("--disable-auto-launch", action="store_true", help="Disable auto launching the browser.")
5255
parser.add_argument("--cuda-device", type=int, default=None, metavar="DEVICE_ID", help="Set the id of the cuda device this instance will use. All other devices will not be visible.")

main.py

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
from app.logger import setup_logger
1010
import itertools
1111
import utils.extra_config
12+
import utils.landlock
1213
import logging
1314
import sys
1415
from comfy_execution.progress import get_progress_state
@@ -311,6 +312,13 @@ def start_comfyui(asyncio_loop=None):
311312
folder_paths.set_temp_directory(temp_dir)
312313
cleanup_temp()
313314

315+
if args.enable_landlock:
316+
logging.info("Enabling Landlock")
317+
landlock_ok = utils.landlock.enable_landlock(args, logging.getLogger("landlock"))
318+
if not landlock_ok:
319+
logging.critical("Requested Landlock sandbox but it could not be enabled. Exiting.")
320+
sys.exit(1)
321+
314322
if args.windows_standalone_build:
315323
try:
316324
import new_updater

utils/landlock.py

Lines changed: 303 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,303 @@
1+
import ctypes
2+
import errno
3+
import logging
4+
import os
5+
import sys
6+
import tempfile
7+
from dataclasses import dataclass
8+
9+
# Landlock constants copied from linux/landlock.h
10+
PR_SET_NO_NEW_PRIVS = 38
11+
12+
LANDLOCK_RULE_PATH_BENEATH = 1
13+
LANDLOCK_CREATE_RULESET_VERSION = 1
14+
15+
LANDLOCK_ACCESS_FS_EXECUTE = 1 << 0
16+
LANDLOCK_ACCESS_FS_WRITE_FILE = 1 << 1
17+
LANDLOCK_ACCESS_FS_READ_FILE = 1 << 2
18+
LANDLOCK_ACCESS_FS_READ_DIR = 1 << 3
19+
LANDLOCK_ACCESS_FS_REMOVE_DIR = 1 << 4
20+
LANDLOCK_ACCESS_FS_REMOVE_FILE = 1 << 5
21+
LANDLOCK_ACCESS_FS_MAKE_CHAR = 1 << 6
22+
LANDLOCK_ACCESS_FS_MAKE_DIR = 1 << 7
23+
LANDLOCK_ACCESS_FS_MAKE_REG = 1 << 8
24+
LANDLOCK_ACCESS_FS_MAKE_SOCK = 1 << 9
25+
LANDLOCK_ACCESS_FS_MAKE_FIFO = 1 << 10
26+
LANDLOCK_ACCESS_FS_MAKE_BLOCK = 1 << 11
27+
LANDLOCK_ACCESS_FS_MAKE_SYM = 1 << 12
28+
LANDLOCK_ACCESS_FS_REFER = 1 << 13
29+
LANDLOCK_ACCESS_FS_TRUNCATE = 1 << 14
30+
LANDLOCK_ACCESS_FS_IOCTL_DEV = 1 << 15 # ABI v5+
31+
32+
# Pre-computed access masks
33+
FS_READ_ACCESS = (
34+
LANDLOCK_ACCESS_FS_READ_FILE
35+
| LANDLOCK_ACCESS_FS_READ_DIR
36+
| LANDLOCK_ACCESS_FS_EXECUTE
37+
)
38+
FS_WRITE_ACCESS = (
39+
FS_READ_ACCESS
40+
| LANDLOCK_ACCESS_FS_WRITE_FILE
41+
| LANDLOCK_ACCESS_FS_MAKE_DIR
42+
| LANDLOCK_ACCESS_FS_MAKE_REG
43+
)
44+
45+
# Syscall numbers are ABI-stable across all 64-bit Linux architectures
46+
SYS_LANDLOCK_CREATE_RULESET = 444
47+
SYS_LANDLOCK_ADD_RULE = 445
48+
SYS_LANDLOCK_RESTRICT_SELF = 446
49+
50+
51+
class _RulesetAttr(ctypes.Structure):
52+
_fields_ = [("handled_access_fs", ctypes.c_uint64)]
53+
54+
55+
class _PathBeneathAttr(ctypes.Structure):
56+
_fields_ = [
57+
("allowed_access", ctypes.c_uint64),
58+
("parent_fd", ctypes.c_int32),
59+
("reserved", ctypes.c_uint32),
60+
]
61+
62+
63+
@dataclass(frozen=True)
64+
class LandlockRules:
65+
read_paths: set[str]
66+
write_paths: set[str]
67+
ioctl_paths: set[str]
68+
69+
70+
def _normalize_paths(paths: set[str]) -> set[str]:
71+
normalized = set()
72+
for path in paths:
73+
if not path:
74+
continue
75+
normalized.add(os.path.realpath(path))
76+
return normalized
77+
78+
79+
class LandlockEnforcer:
80+
def __init__(self, logger: logging.Logger | None = None):
81+
self.log = logger or logging.getLogger(__name__)
82+
self.libc = ctypes.CDLL(None, use_errno=True)
83+
self.libc.syscall.restype = ctypes.c_long
84+
self.libc.prctl.restype = ctypes.c_int
85+
86+
def _syscall(self, syscall_nr, *args) -> tuple[int | None, int]:
87+
ctypes.set_errno(0)
88+
res = self.libc.syscall(ctypes.c_long(syscall_nr), *args)
89+
if res == -1:
90+
return None, ctypes.get_errno()
91+
return res, 0
92+
93+
def _abi_version(self) -> int:
94+
res, err = self._syscall(
95+
SYS_LANDLOCK_CREATE_RULESET,
96+
ctypes.c_void_p(0),
97+
ctypes.c_size_t(0),
98+
ctypes.c_uint(LANDLOCK_CREATE_RULESET_VERSION),
99+
)
100+
if res is None:
101+
if err in (errno.ENOSYS, errno.EOPNOTSUPP):
102+
return 0
103+
return -err
104+
return res
105+
106+
def _create_ruleset(self, handled_access: int) -> tuple[int | None, int]:
107+
ruleset = _RulesetAttr(ctypes.c_uint64(handled_access))
108+
return self._syscall(
109+
SYS_LANDLOCK_CREATE_RULESET,
110+
ctypes.byref(ruleset),
111+
ctypes.c_size_t(ctypes.sizeof(ruleset)),
112+
ctypes.c_uint(0),
113+
)
114+
115+
def _add_rule(self, ruleset_fd: int, path: str, access_mask: int, ioctl: bool) -> bool:
116+
if ioctl:
117+
access_mask |= LANDLOCK_ACCESS_FS_IOCTL_DEV
118+
119+
try:
120+
dir_fd = os.open(path, os.O_PATH | os.O_CLOEXEC)
121+
except OSError as exc:
122+
self.log.warning("Landlock: skipping %s (%s)", path, exc)
123+
return False
124+
125+
try:
126+
rule = _PathBeneathAttr(
127+
ctypes.c_uint64(access_mask), ctypes.c_int32(dir_fd), ctypes.c_uint32(0)
128+
)
129+
res, err = self._syscall(
130+
SYS_LANDLOCK_ADD_RULE,
131+
ctypes.c_int(ruleset_fd),
132+
ctypes.c_int(LANDLOCK_RULE_PATH_BENEATH),
133+
ctypes.byref(rule),
134+
ctypes.c_uint(0),
135+
)
136+
if res is None:
137+
self.log.warning("Landlock: failed to add %s (errno=%s)", path, err)
138+
return False
139+
return True
140+
finally:
141+
os.close(dir_fd)
142+
143+
def _restrict_self(self, ruleset_fd: int) -> bool:
144+
if self.libc.prctl(PR_SET_NO_NEW_PRIVS, 1, 0, 0, 0) != 0:
145+
self.log.warning(
146+
"Landlock: prctl(PR_SET_NO_NEW_PRIVS) failed (errno=%s)",
147+
ctypes.get_errno(),
148+
)
149+
return False
150+
151+
res, err = self._syscall(SYS_LANDLOCK_RESTRICT_SELF, ctypes.c_int(ruleset_fd), ctypes.c_uint(0))
152+
if res is None:
153+
self.log.warning("Landlock: restrict_self failed (errno=%s)", err)
154+
return False
155+
return True
156+
157+
def apply(self, rules: LandlockRules) -> bool:
158+
if not sys.platform.startswith("linux"):
159+
self.log.info("Landlock: not a Linux platform, skipping.")
160+
return False
161+
162+
abi_version = self._abi_version()
163+
if abi_version <= 0:
164+
self.log.info("Landlock: not available on this kernel (abi=%s).", abi_version)
165+
return False
166+
167+
read_access = FS_READ_ACCESS
168+
write_access = FS_WRITE_ACCESS
169+
if abi_version >= 2:
170+
write_access |= LANDLOCK_ACCESS_FS_TRUNCATE
171+
172+
# Build handled_access from all permissions we might grant
173+
handled_access = read_access | write_access
174+
if abi_version >= 5:
175+
handled_access |= LANDLOCK_ACCESS_FS_IOCTL_DEV
176+
177+
ruleset_fd, err = self._create_ruleset(handled_access)
178+
if ruleset_fd is None:
179+
self.log.warning("Landlock: failed to create ruleset (errno=%s)", err)
180+
return False
181+
182+
write_paths = _normalize_paths(rules.write_paths)
183+
read_paths = _normalize_paths(rules.read_paths) - write_paths
184+
# In theory these could require write or read access. Though in practice it's just /dev.
185+
ioctl_paths = _normalize_paths(rules.ioctl_paths)
186+
187+
for path in write_paths:
188+
if path != os.path.sep:
189+
try:
190+
os.makedirs(path, exist_ok=True)
191+
except Exception as exc:
192+
self.log.warning("Landlock: unable to prepare %s (%s)", path, exc)
193+
return False
194+
if not self._add_rule(ruleset_fd, path, write_access, path in ioctl_paths):
195+
return False
196+
197+
for path in read_paths:
198+
self._add_rule(ruleset_fd, path, read_access, path in ioctl_paths)
199+
200+
try:
201+
if not self._restrict_self(ruleset_fd):
202+
return False
203+
finally:
204+
os.close(ruleset_fd)
205+
206+
if write_paths:
207+
self.log.info(
208+
"Landlock enabled (ABI %s). Writable roots: %s",
209+
abi_version,
210+
", ".join(sorted(write_paths)),
211+
)
212+
else:
213+
self.log.info("Landlock enabled (ABI %s). No writable roots configured.", abi_version)
214+
return True
215+
216+
217+
_landlock_applied = False
218+
219+
220+
def build_default_rules(args) -> LandlockRules:
221+
import folder_paths
222+
from urllib.parse import urlparse
223+
224+
write_paths: set[str] = {
225+
folder_paths.get_output_directory(),
226+
folder_paths.get_input_directory(),
227+
folder_paths.get_temp_directory(),
228+
folder_paths.get_user_directory(),
229+
}
230+
231+
ioctl_paths: set[str] = set()
232+
233+
# Torch and some backends use system temp and /dev/shm
234+
write_paths.add(tempfile.gettempdir())
235+
236+
if args.temp_directory:
237+
write_paths.add(os.path.join(os.path.abspath(args.temp_directory), "temp"))
238+
239+
db_url = getattr(args, "database_url", None)
240+
if db_url and db_url.startswith("sqlite"):
241+
parsed = urlparse(db_url)
242+
if parsed.scheme == "sqlite" and parsed.path:
243+
write_paths.add(os.path.abspath(os.path.dirname(parsed.path)))
244+
245+
for path in args.landlock_allow_writable or []:
246+
if path:
247+
write_paths.add(path)
248+
249+
# Build read paths - only what's actually needed
250+
read_paths: set[str] = set()
251+
252+
# ComfyUI codebase
253+
read_paths.add(folder_paths.base_path)
254+
255+
# All configured model directories (includes extra_model_paths.yaml)
256+
for folder_name in folder_paths.folder_names_and_paths:
257+
for path in folder_paths.folder_names_and_paths[folder_name][0]:
258+
read_paths.add(path)
259+
260+
# Python installation and site-packages
261+
read_paths.add(sys.prefix)
262+
if sys.base_prefix != sys.prefix:
263+
read_paths.add(sys.base_prefix)
264+
for path in sys.path:
265+
if path and os.path.isdir(path):
266+
read_paths.add(path)
267+
268+
# System libraries (required for shared libs, CUDA, etc.)
269+
for system_path in ["/usr", "/lib", "/lib64", "/opt", "/etc", "/proc", "/sys"]:
270+
if os.path.exists(system_path):
271+
read_paths.add(system_path)
272+
273+
# NixOS: /nix/store contains the entire system
274+
if os.path.exists("/nix"):
275+
read_paths.add("/nix")
276+
277+
# /dev needs write + ioctl for CUDA/GPU access
278+
write_paths.add("/dev")
279+
ioctl_paths.add("/dev")
280+
281+
# User-specified additional read paths
282+
for path in getattr(args, "landlock_allow_readable", None) or []:
283+
if path:
284+
read_paths.add(path)
285+
286+
return LandlockRules(read_paths=_normalize_paths(read_paths), write_paths=_normalize_paths(write_paths), ioctl_paths=_normalize_paths(ioctl_paths))
287+
288+
289+
def enable_landlock(args, logger: logging.Logger | None = None) -> bool:
290+
global _landlock_applied
291+
292+
if _landlock_applied:
293+
return True
294+
if not getattr(args, "enable_landlock", False):
295+
return False
296+
297+
enforcer = LandlockEnforcer(logger)
298+
try:
299+
_landlock_applied = enforcer.apply(build_default_rules(args))
300+
except Exception:
301+
enforcer.log.exception("Landlock: unexpected failure while applying ruleset.")
302+
_landlock_applied = False
303+
return _landlock_applied

0 commit comments

Comments
 (0)