Skip to content

Commit 6aa5264

Browse files
committed
Fix path traversal and symlink escape in COW filesystem handler
Signed-off-by: Cong Wang <cwang@multikernel.io>
1 parent 34920b0 commit 6aa5264

File tree

3 files changed

+155
-17
lines changed

3 files changed

+155
-17
lines changed

src/sandlock/cowfs/_branch.py

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -106,8 +106,13 @@ def ensure_cow_copy(self, rel_path: str) -> Path:
106106

107107
upper_file.parent.mkdir(parents=True, exist_ok=True)
108108

109-
if lower_file.exists():
110-
shutil.copy2(str(lower_file), str(upper_file))
109+
if lower_file.exists() and not lower_file.is_symlink():
110+
shutil.copy2(str(lower_file), str(upper_file),
111+
follow_symlinks=False)
112+
elif lower_file.is_symlink():
113+
# Copy symlink itself, not its target
114+
link_target = os.readlink(str(lower_file))
115+
os.symlink(link_target, str(upper_file))
111116

112117
return upper_file
113118

src/sandlock/cowfs/_handler.py

Lines changed: 56 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,16 @@ def matches(self, path: str) -> bool:
4444
"""Check if a path is under the COW workdir."""
4545
return path.startswith(self._workdir_str + "/") or path == self._workdir_str
4646

47+
def _safe_rel(self, path: str) -> str | None:
48+
"""Compute relative path and reject traversal escapes.
49+
50+
Returns the relative path, or None if it escapes the workdir.
51+
"""
52+
rel = os.path.relpath(path, self._workdir_str)
53+
if rel == ".." or rel.startswith("../"):
54+
return None
55+
return rel
56+
4757
def handle_open(self, path: str, flags: int) -> str | None:
4858
"""Determine the real path to open for a COW-intercepted openat.
4959
@@ -52,7 +62,9 @@ def handle_open(self, path: str, flags: int) -> str | None:
5262
if flags & O_DIRECTORY:
5363
return None
5464

55-
rel_path = os.path.relpath(path, self._workdir_str)
65+
rel_path = self._safe_rel(path)
66+
if rel_path is None:
67+
return None
5668

5769
# Deleted file — new create or ENOENT
5870
if self._branch.is_deleted(rel_path):
@@ -78,7 +90,9 @@ def handle_unlink(self, path: str, is_dir: bool = False) -> bool:
7890
7991
Returns True if handled, False to let kernel handle.
8092
"""
81-
rel_path = os.path.relpath(path, self._workdir_str)
93+
rel_path = self._safe_rel(path)
94+
if rel_path is None:
95+
return False
8296
upper_file = self._branch.upper_dir / rel_path
8397
lower_file = Path(self._workdir_str) / rel_path
8498

@@ -102,7 +116,9 @@ def handle_unlink(self, path: str, is_dir: bool = False) -> bool:
102116

103117
def handle_mkdir(self, path: str, mode: int) -> bool:
104118
"""Handle mkdirat: create directory in upper."""
105-
rel_path = os.path.relpath(path, self._workdir_str)
119+
rel_path = self._safe_rel(path)
120+
if rel_path is None:
121+
return False
106122
self._branch._deleted.discard(rel_path)
107123
upper_dir = self._branch.upper_dir / rel_path
108124
upper_dir.mkdir(parents=True, exist_ok=True)
@@ -113,7 +129,9 @@ def handle_stat(self, path: str) -> str | None:
113129
114130
Returns the real path to stat, or None if deleted/nonexistent.
115131
"""
116-
rel_path = os.path.relpath(path, self._workdir_str)
132+
rel_path = self._safe_rel(path)
133+
if rel_path is None:
134+
return None
117135

118136
if self._branch.is_deleted(rel_path):
119137
return None
@@ -125,8 +143,10 @@ def handle_stat(self, path: str) -> str | None:
125143

126144
def handle_rename(self, old_path: str, new_path: str) -> bool:
127145
"""Handle rename: rename in upper dir."""
128-
old_rel = os.path.relpath(old_path, self._workdir_str)
129-
new_rel = os.path.relpath(new_path, self._workdir_str)
146+
old_rel = self._safe_rel(old_path)
147+
new_rel = self._safe_rel(new_path)
148+
if old_rel is None or new_rel is None:
149+
return False
130150

131151
old_upper = self._branch.ensure_cow_copy(old_rel)
132152
new_upper = self._branch.upper_dir / new_rel
@@ -162,8 +182,17 @@ def list_merged_dir(self, rel_path: str) -> list[str]:
162182
return sorted(entries)
163183

164184
def handle_symlink(self, target: str, linkpath: str) -> bool:
165-
"""Handle symlink: create symlink in upper."""
166-
rel_path = os.path.relpath(linkpath, self._workdir_str)
185+
"""Handle symlink: create symlink in upper.
186+
187+
Rejects absolute or traversal symlink targets to prevent
188+
escaping the COW layer on commit.
189+
"""
190+
rel_path = self._safe_rel(linkpath)
191+
if rel_path is None:
192+
return False
193+
# Reject symlink targets that escape the workdir
194+
if os.path.isabs(target) or ".." in target.split("/"):
195+
return False
167196
self._branch._deleted.discard(rel_path)
168197
upper_link = self._branch.upper_dir / rel_path
169198
upper_link.parent.mkdir(parents=True, exist_ok=True)
@@ -172,8 +201,10 @@ def handle_symlink(self, target: str, linkpath: str) -> bool:
172201

173202
def handle_link(self, oldpath: str, newpath: str) -> bool:
174203
"""Handle link: create hard link in upper."""
175-
old_rel = os.path.relpath(oldpath, self._workdir_str)
176-
new_rel = os.path.relpath(newpath, self._workdir_str)
204+
old_rel = self._safe_rel(oldpath)
205+
new_rel = self._safe_rel(newpath)
206+
if old_rel is None or new_rel is None:
207+
return False
177208
old_upper = self._branch.ensure_cow_copy(old_rel)
178209
new_upper = self._branch.upper_dir / new_rel
179210
new_upper.parent.mkdir(parents=True, exist_ok=True)
@@ -182,14 +213,18 @@ def handle_link(self, oldpath: str, newpath: str) -> bool:
182213

183214
def handle_chmod(self, path: str, mode: int) -> bool:
184215
"""Handle chmod: chmod in upper (COW copy if needed)."""
185-
rel_path = os.path.relpath(path, self._workdir_str)
216+
rel_path = self._safe_rel(path)
217+
if rel_path is None:
218+
return False
186219
upper_file = self._branch.ensure_cow_copy(rel_path)
187220
os.chmod(str(upper_file), mode)
188221
return True
189222

190223
def handle_readlink(self, path: str) -> str | None:
191224
"""Handle readlink: resolve symlink from upper or lower."""
192-
rel_path = os.path.relpath(path, self._workdir_str)
225+
rel_path = self._safe_rel(path)
226+
if rel_path is None:
227+
return None
193228

194229
if self._branch.is_deleted(rel_path):
195230
return None
@@ -205,15 +240,19 @@ def handle_readlink(self, path: str) -> str | None:
205240

206241
def handle_truncate(self, path: str, length: int) -> bool:
207242
"""Handle truncate: truncate in upper (COW copy if needed)."""
208-
rel_path = os.path.relpath(path, self._workdir_str)
243+
rel_path = self._safe_rel(path)
244+
if rel_path is None:
245+
return False
209246
upper_file = self._branch.ensure_cow_copy(rel_path)
210247
os.truncate(str(upper_file), length)
211248
return True
212249

213250
def handle_chown(self, path: str, uid: int, gid: int,
214251
follow_symlinks: bool = True) -> bool:
215252
"""Handle chown/fchownat: chown in upper (COW copy if needed)."""
216-
rel_path = os.path.relpath(path, self._workdir_str)
253+
rel_path = self._safe_rel(path)
254+
if rel_path is None:
255+
return False
217256
upper_file = self._branch.ensure_cow_copy(rel_path)
218257
os.chown(str(upper_file), uid, gid,
219258
follow_symlinks=follow_symlinks)
@@ -226,7 +265,9 @@ def handle_utimens(self, path: str,
226265
227266
times is (atime, mtime) as floats, or None for current time.
228267
"""
229-
rel_path = os.path.relpath(path, self._workdir_str)
268+
rel_path = self._safe_rel(path)
269+
if rel_path is None:
270+
return False
230271
upper_file = self._branch.ensure_cow_copy(rel_path)
231272
os.utime(str(upper_file), times=times,
232273
follow_symlinks=follow_symlinks)

tests/test_branchfs.py

Lines changed: 92 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -296,6 +296,98 @@ def test_nested_sandbox_inherits_parent_branch(self, mock_create, mock_mount):
296296
)
297297

298298

299+
class TestCowHandlerPathTraversal(unittest.TestCase):
300+
"""Verify that COW handler rejects path traversal attacks."""
301+
302+
def setUp(self):
303+
import tempfile
304+
self._tmpdir = tempfile.mkdtemp(prefix="sandlock_cow_test_")
305+
self._workdir = Path(self._tmpdir) / "workdir"
306+
self._workdir.mkdir()
307+
# Create a file in the workdir
308+
(self._workdir / "hello.txt").write_text("hello")
309+
310+
from sandlock.cowfs._branch import CowBranch
311+
from sandlock.cowfs._handler import CowHandler
312+
self._branch = CowBranch(self._workdir)
313+
self._branch.create()
314+
self._handler = CowHandler(self._branch)
315+
316+
def tearDown(self):
317+
import shutil
318+
shutil.rmtree(self._tmpdir, ignore_errors=True)
319+
320+
def test_open_traversal_rejected(self):
321+
result = self._handler.handle_open(
322+
str(self._workdir) + "/../../../etc/passwd", 0)
323+
self.assertIsNone(result)
324+
325+
def test_stat_traversal_rejected(self):
326+
result = self._handler.handle_stat(
327+
str(self._workdir) + "/../../../etc/passwd")
328+
self.assertIsNone(result)
329+
330+
def test_unlink_traversal_rejected(self):
331+
result = self._handler.handle_unlink(
332+
str(self._workdir) + "/../../../etc/passwd")
333+
self.assertFalse(result)
334+
335+
def test_mkdir_traversal_rejected(self):
336+
result = self._handler.handle_mkdir(
337+
str(self._workdir) + "/../../../tmp/evil", 0o755)
338+
self.assertFalse(result)
339+
340+
def test_rename_traversal_rejected(self):
341+
result = self._handler.handle_rename(
342+
str(self._workdir) + "/hello.txt",
343+
str(self._workdir) + "/../../../tmp/stolen")
344+
self.assertFalse(result)
345+
346+
def test_symlink_absolute_target_rejected(self):
347+
result = self._handler.handle_symlink(
348+
"/etc/shadow",
349+
str(self._workdir) + "/evil_link")
350+
self.assertFalse(result)
351+
352+
def test_symlink_traversal_target_rejected(self):
353+
result = self._handler.handle_symlink(
354+
"../../../etc/shadow",
355+
str(self._workdir) + "/evil_link")
356+
self.assertFalse(result)
357+
358+
def test_symlink_safe_target_allowed(self):
359+
result = self._handler.handle_symlink(
360+
"hello.txt",
361+
str(self._workdir) + "/good_link")
362+
self.assertTrue(result)
363+
364+
def test_link_traversal_rejected(self):
365+
result = self._handler.handle_link(
366+
str(self._workdir) + "/../../../etc/passwd",
367+
str(self._workdir) + "/stolen")
368+
self.assertFalse(result)
369+
370+
def test_chmod_traversal_rejected(self):
371+
result = self._handler.handle_chmod(
372+
str(self._workdir) + "/../../../etc/passwd", 0o777)
373+
self.assertFalse(result)
374+
375+
def test_readlink_traversal_rejected(self):
376+
result = self._handler.handle_readlink(
377+
str(self._workdir) + "/../../../etc/passwd")
378+
self.assertIsNone(result)
379+
380+
def test_truncate_traversal_rejected(self):
381+
result = self._handler.handle_truncate(
382+
str(self._workdir) + "/../../../etc/passwd", 0)
383+
self.assertFalse(result)
384+
385+
def test_legitimate_path_works(self):
386+
result = self._handler.handle_open(
387+
str(self._workdir) + "/hello.txt", 0)
388+
self.assertIsNotNone(result)
389+
390+
299391
class TestExports(unittest.TestCase):
300392
def test_enums_importable(self):
301393
from sandlock import FsIsolation, BranchAction

0 commit comments

Comments
 (0)