Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 38 additions & 1 deletion infra/cifuzz/http_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,40 @@
_HTTP_REQUEST_TIMEOUT = 10


def _safe_extract(zip_file, extract_directory):
"""Extracts all members of |zip_file| into |extract_directory|, rejecting
any member whose resolved destination path escapes |extract_directory|
(Zip Slip defence).

Args:
zip_file: An open zipfile.ZipFile object.
extract_directory: The directory to extract into.

Raises:
ValueError: If a ZIP entry would be extracted outside extract_directory.
"""
# Resolve the destination once so symlinks in the directory itself are
# handled correctly before we compare member paths against it.
real_extract_dir = os.path.realpath(extract_directory) + os.sep

for member in zip_file.infolist():
# os.path.join correctly handles absolute member names (e.g. '/etc/passwd')
# by discarding the base, so we normalise via realpath instead.
member_dest = os.path.realpath(
os.path.join(extract_directory, member.filename))

# Directories end with sep after realpath; add sep to the base so that a
# prefix match cannot be fooled by a sibling directory named
# "extract_directory_evil".
if not (member_dest + os.sep).startswith(real_extract_dir):
raise ValueError(
'Zip Slip blocked: member %r would be extracted to %r, '
'which is outside the target directory %r.' %
(member.filename, member_dest, extract_directory))

zip_file.extract(member, extract_directory)


def download_and_unpack_zip(url, extract_directory, headers=None):
"""Downloads and unpacks a zip file from an HTTP URL.

Expand All @@ -56,10 +90,13 @@ def download_and_unpack_zip(url, extract_directory, headers=None):

try:
with zipfile.ZipFile(tmp_file.name, 'r') as zip_file:
zip_file.extractall(extract_directory)
_safe_extract(zip_file, extract_directory)
except zipfile.BadZipFile:
logging.error('Error unpacking zip from %s. Bad Zipfile.', url)
return False
except ValueError as err:
logging.error('Error unpacking zip from %s. %s', url, err)
return False

return True

Expand Down
45 changes: 45 additions & 0 deletions infra/cifuzz/http_utils_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,11 @@
# limitations under the License.
"""Tests for http_utils.py"""

import io
import os
import tempfile
import unittest
import zipfile
from unittest import mock

from pyfakefs import fake_filesystem_unittest
Expand All @@ -23,6 +27,47 @@
mock_get_response = mock.MagicMock(status_code=200, content=b'')


class SafeExtractTest(unittest.TestCase):
"""Tests for _safe_extract (Zip Slip defence)."""

def _make_zip(self, members):
"""Returns a BytesIO holding a ZIP archive with the given member names."""
buf = io.BytesIO()
with zipfile.ZipFile(buf, 'w') as zf:
for name in members:
zf.writestr(name, 'data')
buf.seek(0)
return buf

def test_normal_member_extracted(self):
"""Normal members inside the target directory are extracted without error."""
with tempfile.TemporaryDirectory() as tmpdir:
buf = self._make_zip(['subdir/file.txt', 'root.txt'])
with zipfile.ZipFile(buf) as zf:
http_utils._safe_extract(zf, tmpdir)
self.assertTrue(os.path.exists(os.path.join(tmpdir, 'root.txt')))
self.assertTrue(
os.path.exists(os.path.join(tmpdir, 'subdir', 'file.txt')))

def test_zip_slip_path_traversal_blocked(self):
"""A member with '../' path traversal raises ValueError."""
with tempfile.TemporaryDirectory() as tmpdir:
buf = self._make_zip(['../../../../tmp/evil.txt'])
with zipfile.ZipFile(buf) as zf:
with self.assertRaises(ValueError) as ctx:
http_utils._safe_extract(zf, tmpdir)
self.assertIn('Zip Slip blocked', str(ctx.exception))

def test_absolute_member_path_blocked(self):
"""A member with an absolute path outside the target dir is blocked."""
with tempfile.TemporaryDirectory() as tmpdir:
buf = self._make_zip(['/etc/passwd'])
with zipfile.ZipFile(buf) as zf:
with self.assertRaises(ValueError) as ctx:
http_utils._safe_extract(zf, tmpdir)
self.assertIn('Zip Slip blocked', str(ctx.exception))


class DownloadUrlTest(unittest.TestCase):
"""Tests that download_url works."""
URL = 'https://example.com/file'
Expand Down