Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 16 additions & 11 deletions aws_doc_sdk_examples_tools/file_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
from shutil import rmtree

from pathspec import GitIgnoreSpec
from aws_doc_sdk_examples_tools.fs import Fs, PathFs


def match_path_to_specs(path: Path, specs: List[GitIgnoreSpec]) -> bool:
Expand All @@ -21,7 +22,7 @@ def match_path_to_specs(path: Path, specs: List[GitIgnoreSpec]) -> bool:


def walk_with_gitignore(
root: Path, specs: List[GitIgnoreSpec] = []
root: Path, specs: List[GitIgnoreSpec] = [], fs: Fs = PathFs()
) -> Generator[Path, None, None]:
"""
Starting from a root directory, walk the file system yielding a path for each file.
Expand All @@ -30,27 +31,31 @@ def walk_with_gitignore(
fiddling with a number of flags.
"""
gitignore = root / ".gitignore"
if gitignore.exists():
with open(root / ".gitignore", "r", encoding="utf-8") as ignore_file:
specs = [*specs, GitIgnoreSpec.from_lines(ignore_file.readlines())]
for entry in os.scandir(root):
path = Path(entry.path)
gitignore_stat = fs.stat(gitignore)
if gitignore_stat.exists:
lines = fs.readlines(gitignore)
specs = [*specs, GitIgnoreSpec.from_lines(lines)]

for path in fs.list(root):
if not match_path_to_specs(path, specs):
if entry.is_dir():
yield from walk_with_gitignore(path, specs)
path_stat = fs.stat(path)
if path_stat.is_dir:
yield from walk_with_gitignore(path, specs, fs)
else:
yield path
# Don't yield .gitignore files themselves
if path.name != ".gitignore":
yield path


def get_files(
    root: Path, skip: Callable[[Path], bool] = lambda _: False, fs: Fs = PathFs()
) -> Generator[Path, None, None]:
    """
    Yield every file under `root` that survives both filters.

    Candidates come from walk_with_gitignore, which is handed the same `fs`;
    a candidate is dropped when `skip` returns a truthy value for it. The
    default `skip` rejects nothing.
    """
    yield from (
        candidate
        for candidate in walk_with_gitignore(root, fs=fs)
        if not skip(candidate)
    )

Expand Down
187 changes: 187 additions & 0 deletions aws_doc_sdk_examples_tools/file_utils_test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,187 @@
# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
# SPDX-License-Identifier: Apache-2.0

"""
Tests for file_utils.py with filesystem abstraction.
"""

from pathlib import Path

from aws_doc_sdk_examples_tools.fs import RecordFs
from aws_doc_sdk_examples_tools.file_utils import walk_with_gitignore, get_files


class TestWalkWithGitignore:
    """Exercise walk_with_gitignore against an in-memory RecordFs."""

    def test_basic_directory_traversal(self):
        """All files are yielded when the tree has no .gitignore."""
        tree = {
            Path("/root/file1.py"): "print('file1')",
            Path("/root/file2.js"): "console.log('file2')",
        }

        walked = walk_with_gitignore(Path("/root"), fs=RecordFs(tree))

        assert sorted(walked) == sorted(
            [
                Path("/root/file1.py"),
                Path("/root/file2.js"),
            ]
        )

    def test_gitignore_filtering(self):
        """Paths matching the root .gitignore patterns are left out."""
        tree = {
            Path("/root/.gitignore"): "*.tmp\n*.log\n",
            Path("/root/keep.py"): "print('keep')",
            Path("/root/ignore.tmp"): "temporary",
            Path("/root/keep.js"): "console.log('keep')",
            Path("/root/debug.log"): "log content",
        }

        walked = walk_with_gitignore(Path("/root"), fs=RecordFs(tree))

        # The .gitignore file itself must not show up in the results.
        assert sorted(walked) == sorted(
            [
                Path("/root/keep.py"),
                Path("/root/keep.js"),
            ]
        )

    def test_no_gitignore_file(self):
        """Without a .gitignore, nothing is filtered out."""
        tree = {
            Path("/root/file1.py"): "print('file1')",
            Path("/root/file2.js"): "console.log('file2')",
            Path("/root/file3.txt"): "text content",
        }

        walked = walk_with_gitignore(Path("/root"), fs=RecordFs(tree))

        assert sorted(walked) == sorted(
            [
                Path("/root/file1.py"),
                Path("/root/file2.js"),
                Path("/root/file3.txt"),
            ]
        )

    def test_empty_directory(self):
        """Walking a root with no recorded entries yields nothing."""
        walked = list(walk_with_gitignore(Path("/empty"), fs=RecordFs({})))

        assert walked == []

    def test_directory_with_only_gitignore(self):
        """A directory holding nothing but a .gitignore yields nothing."""
        tree = {
            Path("/root/.gitignore"): "*.tmp\n",
        }

        walked = list(walk_with_gitignore(Path("/root"), fs=RecordFs(tree)))

        assert walked == []

    def test_nested_gitignores(self):
        """Rules from a parent .gitignore and a nested one both apply."""
        tree = {
            # The root-level .gitignore excludes *.log files.
            Path("/root/.gitignore"): "*.log\n",
            Path("/root/keep.py"): "print('keep')",
            # Expected to be excluded by the root rule.
            Path("/root/debug.log"): "root log",
            # The nested directory adds its own rule excluding *.tmp files.
            Path("/root/subdir/.gitignore"): "*.tmp\n",
            Path("/root/subdir/keep.js"): "console.log('keep')",
            # Expected to be excluded by the subdir rule.
            Path("/root/subdir/ignore.tmp"): "temporary",
            # Expected to be excluded by the inherited root rule.
            Path("/root/subdir/keep.log"): "nested log",
        }

        walked = walk_with_gitignore(Path("/root"), fs=RecordFs(tree))

        # Only paths that match no gitignore pattern at any level remain.
        assert sorted(walked) == sorted(
            [
                Path("/root/keep.py"),
                Path("/root/subdir/keep.js"),
            ]
        )


class TestGetFiles:
    """Exercise get_files against an in-memory RecordFs."""

    def test_get_files_basic(self):
        """With the default skip, every recorded file is returned."""
        tree = {
            Path("/root/file1.py"): "print('file1')",
            Path("/root/file2.js"): "console.log('file2')",
        }

        found = get_files(Path("/root"), fs=RecordFs(tree))

        assert sorted(found) == sorted(
            [
                Path("/root/file1.py"),
                Path("/root/file2.js"),
            ]
        )

    def test_get_files_with_skip_function(self):
        """Files for which the skip callable returns True are omitted."""
        tree = {
            Path("/root/keep.py"): "print('keep')",
            Path("/root/skip.py"): "print('skip')",
            Path("/root/keep.js"): "console.log('keep')",
            Path("/root/skip.js"): "console.log('skip')",
        }

        def name_mentions_skip(candidate: Path) -> bool:
            return "skip" in candidate.name

        found = get_files(Path("/root"), skip=name_mentions_skip, fs=RecordFs(tree))

        assert sorted(found) == sorted(
            [
                Path("/root/keep.py"),
                Path("/root/keep.js"),
            ]
        )

    def test_get_files_with_gitignore_and_skip(self):
        """Gitignore patterns and the skip callable are applied together."""
        tree = {
            Path("/root/.gitignore"): "*.tmp\n",
            Path("/root/keep.py"): "print('keep')",
            Path("/root/skip.py"): "print('skip')",
            Path("/root/ignore.tmp"): "temporary",
            Path("/root/keep.js"): "console.log('keep')",
        }

        def name_mentions_skip(candidate: Path) -> bool:
            return "skip" in candidate.name

        found = get_files(Path("/root"), skip=name_mentions_skip, fs=RecordFs(tree))

        assert sorted(found) == sorted(
            [
                Path("/root/keep.py"),
                Path("/root/keep.js"),
            ]
        )
21 changes: 19 additions & 2 deletions aws_doc_sdk_examples_tools/fs.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ def glob(self, path: Path, glob: str) -> Generator[Path, None, None]:
return path.glob(glob)

def read(self, path: Path) -> str:
    """Return the entire contents of the file at `path` as text."""
    # The encoding is pinned to UTF-8 rather than left to the interpreter's default.
    return path.read_text(encoding="utf-8")

def readlines(self, path: Path) -> List[str]:
Expand Down Expand Up @@ -118,7 +118,24 @@ def mkdir(self, path: Path):
self.fs.setdefault(path, "")

def list(self, path: Path) -> List[Path]:
    """
    Return the immediate children of `path`, sorted.

    A child is either a recorded entry directly under `path` or the first
    directory component leading to a deeper recorded entry, so directories
    that exist only implicitly (through the files below them) are listed too.

    Containment is decided with pathlib rather than by string prefix, so the
    result does not depend on the platform's path separator, works for a
    relative root such as ``Path(".")``, and never lists `path` as a child
    of itself.

    :param path: Directory to list. If `stat` reports it as a file, the
        result is empty.
    :return: Sorted list of unique child paths, each exactly one level below
        `path`.
    """
    # A file has no children.
    if self.stat(path).is_file:
        return []

    children = set()
    for item in self.fs.keys():
        # `parents` holds only proper ancestors, so this matches entries at
        # any depth below `path` but excludes `path` itself.
        if path in item.parents:
            # Keep only the first component below `path`; deeper entries
            # collapse onto the directory that contains them.
            first_part = item.relative_to(path).parts[0]
            children.add(path / first_part)

    return sorted(children)


fs = PathFs()