Skip to content

Commit 3e366b4

Browse files
committed
Add new DirEntry class
1 parent c86a536 commit 3e366b4

38 files changed

+700
-515
lines changed

dissect/target/filesystem.py

Lines changed: 145 additions & 67 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
import pathlib
77
import stat
88
from collections import defaultdict
9+
from functools import cache
910
from typing import TYPE_CHECKING, Any, BinaryIO, Final
1011

1112
from dissect.target.exceptions import (
@@ -24,6 +25,8 @@
2425
if TYPE_CHECKING:
2526
from collections.abc import Callable, Iterator
2627

28+
from typing_extensions import Self
29+
2730
from dissect.target.target import Target
2831

2932
FILESYSTEMS: list[type[Filesystem]] = []
@@ -191,7 +194,7 @@ def iterdir(self, path: str) -> Iterator[str]:
191194
"""
192195
return self.get(path).iterdir()
193196

194-
def scandir(self, path: str) -> Iterator[FilesystemEntry]:
197+
def scandir(self, path: str) -> Iterator[DirEntry]:
195198
"""Iterate over the contents of a directory, return them as :class:`DirEntry`'s.
196199
197200
Args:
@@ -222,7 +225,7 @@ def listdir_ext(self, path: str) -> list[FilesystemEntry]:
222225
Returns:
223226
A list of FilesystemEntry's.
224227
"""
225-
return list(self.scandir(path))
228+
return [e.get() for e in self.scandir(path)]
226229

227230
def walk(
228231
self,
@@ -251,7 +254,7 @@ def walk_ext(
251254
topdown: bool = True,
252255
onerror: Callable[[Exception], None] | None = None,
253256
followlinks: bool = False,
254-
) -> Iterator[tuple[list[FilesystemEntry], list[FilesystemEntry], list[FilesystemEntry]]]:
257+
) -> Iterator[tuple[list[FilesystemEntry], list[DirEntry], list[DirEntry]]]:
255258
"""Recursively walk a directory pointed to by ``path``, returning :class:`FilesystemEntry` of files
256259
and directories.
257260
@@ -266,7 +269,7 @@ def walk_ext(
266269
"""
267270
return self.get(path).walk_ext(topdown, onerror, followlinks)
268271

269-
def recurse(self, path: str) -> Iterator[FilesystemEntry]:
272+
def recurse(self, path: str) -> Iterator[DirEntry]:
270273
"""Recursively walk a directory and yield contents as :class:`DirEntry`.
271274
272275
Does not follow symbolic links.
@@ -519,7 +522,7 @@ def __repr__(self) -> str:
519522
def __str__(self) -> str:
520523
return str(self.path)
521524

522-
def _resolve(self, follow_symlinks: bool = True) -> FilesystemEntry:
525+
def _resolve(self, follow_symlinks: bool = True) -> Self:
523526
"""Helper method to resolve symbolic links.
524527
525528
If ``follow_symlinks`` is ``False``, this function is effectively a no-op.
@@ -535,7 +538,7 @@ def _resolve(self, follow_symlinks: bool = True) -> FilesystemEntry:
535538
return self.readlink_ext()
536539
return self
537540

538-
def get(self, path: str) -> FilesystemEntry:
541+
def get(self, path: str) -> Self:
539542
"""Retrieve a :class:`FilesystemEntry` relative to this entry.
540543
541544
Args:
@@ -560,9 +563,10 @@ def iterdir(self) -> Iterator[str]:
560563
Returns:
561564
An iterator of directory entries as path strings.
562565
"""
563-
raise NotImplementedError
566+
for entry in self.scandir():
567+
yield entry.name
564568

565-
def scandir(self) -> Iterator[FilesystemEntry]:
569+
def scandir(self) -> Iterator[DirEntry]:
566570
"""Iterate over the contents of a directory, yields :class:`DirEntry`.
567571
568572
Returns:
@@ -578,13 +582,13 @@ def listdir(self) -> list[str]:
578582
"""
579583
return list(self.iterdir())
580584

581-
def listdir_ext(self) -> list[FilesystemEntry]:
585+
def listdir_ext(self) -> list[Self]:
582586
"""List the contents of a directory as a list of :class:`FilesystemEntry`.
583587
584588
Returns:
585589
A list of :class:`FilesystemEntry`.
586590
"""
587-
return list(self.scandir())
591+
return [e.get() for e in self.scandir()]
588592

589593
def walk(
590594
self,
@@ -615,7 +619,7 @@ def walk_ext(
615619
topdown: bool = True,
616620
onerror: Callable[[Exception], None] | None = None,
617621
followlinks: bool = False,
618-
) -> Iterator[tuple[list[FilesystemEntry], list[FilesystemEntry], list[FilesystemEntry]]]:
622+
) -> Iterator[tuple[list[Self], list[Self], list[Self]]]:
619623
"""Recursively walk a directory and yield its contents as :class:`FilesystemEntry` split in a tuple of
620624
lists of files, directories and symlinks.
621625
@@ -629,13 +633,13 @@ def walk_ext(
629633
"""
630634
yield from fsutil.walk_ext(self, topdown, onerror, followlinks)
631635

632-
def recurse(self) -> Iterator[FilesystemEntry]:
633-
"""Recursively walk a directory and yield its contents as :class:`FilesystemEntry`.
636+
def recurse(self) -> Iterator[DirEntry]:
637+
"""Recursively walk a directory and yield its contents as :class:`DirEntry`.
634638
635639
Does not follow symbolic links.
636640
637641
Returns:
638-
An iterator of :class:`FilesystemEntry`.
642+
An iterator of :class:`DirEntry`.
639643
"""
640644
yield from fsutil.recurse(self)
641645

@@ -651,7 +655,7 @@ def glob(self, pattern: str) -> Iterator[str]:
651655
for entry in self.glob_ext(pattern):
652656
yield entry.path
653657

654-
def glob_ext(self, pattern: str) -> Iterator[FilesystemEntry]:
658+
def glob_ext(self, pattern: str) -> Iterator[Self]:
655659
"""Iterate over the directory part of ``pattern``, returning entries matching
656660
``pattern`` as :class:`FilesystemEntry`.
657661
@@ -746,7 +750,7 @@ def readlink(self) -> str:
746750
The path the link points to."""
747751
raise NotImplementedError
748752

749-
def readlink_ext(self) -> FilesystemEntry:
753+
def readlink_ext(self) -> Self:
750754
"""Read the link where this entry points to, return the resulting path as :class:`FilesystemEntry`.
751755
752756
If it is a symlink and returns the string that corresponds to that path.
@@ -841,14 +845,109 @@ def hash(self, algos: list[str] | list[Callable] | None = None) -> tuple[str]:
841845
return hashutil.common(self.open())
842846

843847

848+
class DirEntry:
    """Directory entry base class. Closely models ``os.DirEntry``.

    Filesystem implementations are encouraged to subclass this class to provide efficient
    implementations of the various methods.

    Args:
        fs: The filesystem the entry belongs to.
        path: The path of the parent directory.
        name: The name of the entry.
        entry: The raw entry backing this directory entry.
    """

    def __init__(self, fs: Filesystem, path: str, name: str, entry: Any):
        self.fs = fs
        """The filesystem the entry belongs to."""
        self.path = fsutil.join(path, name, alt_separator=self.fs.alt_separator)
        """The full path of the entry."""
        self.name = name
        """The name of the entry."""
        self.entry = entry
        """The raw entry backing this directory entry."""

        # Shadow the method with a per-instance cached wrapper so that is_dir(), is_file(),
        # is_symlink() and inode() share one stat call per ``follow_symlinks`` value.
        # ``self.stat`` is looked up on the instance's class, so subclass overrides get wrapped too.
        self.stat = cache(self.stat)

    def __fspath__(self) -> str:
        """Return the full path of the entry."""
        return self.path

    def __repr__(self) -> str:
        return f"<DirEntry {self.name!r}>"

    def get(self) -> FilesystemEntry:
        """Retrieve the :class:`FilesystemEntry` this directory entry points to.

        Subclasses should override this method to provide an efficient implementation.
        """
        return self.fs.get(self.path)

    def is_dir(self, *, follow_symlinks: bool = True) -> bool:
        """Return whether this entry is a directory or a symbolic link pointing to a directory.

        Returns ``False`` if the entry can not be found.

        Subclasses should override this method to provide an efficient implementation.
        """
        try:
            return stat.S_ISDIR(self.stat(follow_symlinks=follow_symlinks).st_mode)
        except FileNotFoundError:
            return False

    def is_file(self, *, follow_symlinks: bool = True) -> bool:
        """Return whether this entry is a file or a symbolic link pointing to a file.

        Returns ``False`` if the entry can not be found.

        Subclasses should override this method to provide an efficient implementation.
        """
        try:
            return stat.S_ISREG(self.stat(follow_symlinks=follow_symlinks).st_mode)
        except FileNotFoundError:
            return False

    def is_symlink(self) -> bool:
        """Return whether this entry is a symbolic link.

        Returns ``False`` if the entry can not be found, consistent with :meth:`is_dir`,
        :meth:`is_file` and ``os.DirEntry.is_symlink``.

        Subclasses should override this method to provide an efficient implementation.
        """
        try:
            return stat.S_ISLNK(self.stat(follow_symlinks=False).st_mode)
        except FileNotFoundError:
            return False

    def is_junction(self) -> bool:
        """Return whether this entry is a junction (only valid for NTFS)."""
        return False

    def stat(self, *, follow_symlinks: bool = True) -> fsutil.stat_result:
        """Return the stat information of this entry.

        Subclasses should override this method to provide an efficient implementation.

        Note that this may return slightly different information than a "full" stat on the full filesystem entry,
        as in most cases this will generate a stat based on the information available in the directory entry only.
        """
        return self.fs.stat(self.path, follow_symlinks=follow_symlinks)

    def inode(self) -> int:
        """Return the inode number of this entry (taken from a non-following stat)."""
        return self.stat(follow_symlinks=False).st_ino
931+
932+
class VirtualDirEntry(DirEntry):
    """Directory entry whose backing ``entry`` is already a :class:`FilesystemEntry`."""

    fs: VirtualFilesystem
    entry: FilesystemEntry

    def get(self) -> FilesystemEntry:
        """Return the backing entry directly instead of looking it up by path."""
        backing = self.entry
        return backing

    def stat(self, *, follow_symlinks: bool = True) -> fsutil.stat_result:
        """Return the stat information reported by the backing entry itself."""
        backing = self.entry
        return backing.stat(follow_symlinks=follow_symlinks)
941+
942+
844943
class VirtualDirectory(FilesystemEntry):
845944
"""Virtual directory implementation. Backed by a dict."""
846945

847946
def __init__(self, fs: VirtualFilesystem, path: str):
848947
super().__init__(fs, path, None)
849948
self.up = None
850-
self.top = None
851-
self.entries = {}
949+
self.top: FilesystemEntry | None = None
950+
self.entries: dict[str, FilesystemEntry] = {}
852951

853952
def __getitem__(self, item: str) -> FilesystemEntry:
854953
if not self.fs.case_sensitive:
@@ -879,25 +978,11 @@ def add(self, name: str, entry: FilesystemEntry) -> None:
879978
def get(self, path: str) -> FilesystemEntry:
880979
return self.fs.get(path, relentry=self)
881980

882-
def iterdir(self) -> Iterator[str]:
981+
def scandir(self) -> Iterator[DirEntry]:
883982
yielded = set()
884-
for entry in self.entries:
885-
yield entry
886-
yielded.add(entry)
887-
888-
# self.top used to be a reference to a filesystem. This is now a reference to
889-
# any filesystem entry, usually the root of a filesystem.
890-
if self.top:
891-
for entry in self.top.iterdir():
892-
if entry in yielded or (not self.fs.case_sensitive and entry.lower() in yielded):
893-
continue
894-
yield entry
895-
896-
def scandir(self) -> Iterator[FilesystemEntry]:
897-
yielded = set()
898-
for entry in self.entries.values():
899-
yield entry
900-
yielded.add(entry.name)
983+
for name, entry in self.entries.items():
984+
yield VirtualDirEntry(self.fs, self.path, entry.name, entry)
985+
yielded.add(name)
901986

902987
# self.top used to be a reference to a filesystem. This is now a reference to
903988
# any filesystem entry, usually the root of a filesystem.
@@ -970,10 +1055,7 @@ def get(self, path: str) -> FilesystemEntry:
9701055
return self
9711056
raise NotADirectoryError(f"'{self.path}' is not a directory")
9721057

973-
def iterdir(self) -> Iterator[str]:
974-
raise NotADirectoryError(f"'{self.path}' is not a directory")
975-
976-
def scandir(self) -> Iterator[FilesystemEntry]:
1058+
def scandir(self) -> Iterator[DirEntry]:
9771059
raise NotADirectoryError(f"'{self.path}' is not a directory")
9781060

9791061
def open(self) -> BinaryIO:
@@ -1056,10 +1138,7 @@ def lattr(self) -> Any:
10561138
def get(self, path: str) -> FilesystemEntry:
10571139
return self.fs.get(path, self)
10581140

1059-
def iterdir(self) -> Iterator[str]:
1060-
yield from self.readlink_ext().iterdir()
1061-
1062-
def scandir(self) -> Iterator[FilesystemEntry]:
1141+
def scandir(self) -> Iterator[DirEntry]:
10631142
yield from self.readlink_ext().scandir()
10641143

10651144
def open(self) -> BinaryIO:
@@ -1517,6 +1596,14 @@ def __getattr__(self, attr: str) -> Any:
15171596
return object.__getattribute__(self, attr)
15181597

15191598

1599+
class LayerDirEntry(DirEntry):
    """Directory entry backed by a list of :class:`DirEntry` objects."""

    fs: LayerFilesystem
    entry: list[DirEntry]

    def get(self) -> LayerFilesystemEntry:
        """Resolve each backing directory entry and wrap the results in a :class:`LayerFilesystemEntry`."""
        resolved = [layer_entry.get() for layer_entry in self.entry]
        return LayerFilesystemEntry(self.fs, self.path, resolved)
1605+
1606+
15201607
class LayerFilesystemEntry(FilesystemEntry):
15211608
def __init__(self, fs: Filesystem, path: str, entry: FilesystemEntry):
15221609
super().__init__(fs, path, EntryList(entry))
@@ -1542,24 +1629,12 @@ def get(self, path: str) -> FilesystemEntry:
15421629
def open(self) -> BinaryIO:
15431630
return self._resolve()._exec("open")
15441631

1545-
def iterdir(self) -> Iterator[str]:
1546-
yielded = {".", ".."}
1547-
selfentry = self._resolve()
1548-
for fsentry in selfentry.entries:
1549-
for entry_name in fsentry.iterdir():
1550-
name = entry_name if selfentry.fs.case_sensitive else entry_name.lower()
1551-
if name in yielded:
1552-
continue
1553-
1554-
yield entry_name
1555-
yielded.add(name)
1556-
1557-
def scandir(self) -> Iterator[LayerFilesystemEntry]:
1632+
def scandir(self) -> Iterator[LayerDirEntry]:
15581633
# Every entry is actually a list of entries from the different
15591634
# overlaying FSes, of which each may implement a different function
15601635
# like .stat() or .open()
15611636
items = defaultdict(list)
1562-
selfentry = self._resolve()
1637+
selfentry: LayerFilesystemEntry = self._resolve()
15631638
for fsentry in selfentry.entries:
15641639
for entry in fsentry.scandir():
15651640
name = entry.name if selfentry.fs.case_sensitive else entry.name.lower()
@@ -1572,9 +1647,7 @@ def scandir(self) -> Iterator[LayerFilesystemEntry]:
15721647
# The filename for the first entry is taken. Note that in case of
15731648
# non case-sensitive FSes, the different entries from the
15741649
# overlaying FSes may have different casing of the name.
1575-
entry_name = entries[0].name
1576-
path = fsutil.join(selfentry.path, entry_name, alt_separator=selfentry.fs.alt_separator)
1577-
yield LayerFilesystemEntry(selfentry.fs, path, entries)
1650+
yield DirEntry(selfentry.fs, selfentry.path, entries[0].name, entries)
15781651

15791652
def is_file(self, follow_symlinks: bool = True) -> bool:
15801653
try:
@@ -1627,6 +1700,15 @@ def get(self, path: str, relentry: LayerFilesystemEntry | None = None) -> RootFi
16271700
return entry
16281701

16291702

1703+
class RootDirEntry(LayerDirEntry):
    """Layer directory entry that resolves to a :class:`RootFilesystemEntry`."""

    fs: RootFilesystem

    def get(self) -> RootFilesystemEntry:
        """Build the layered entry via the parent class, then re-tag it as a :class:`RootFilesystemEntry`."""
        resolved = super().get()
        # Reassign the class in place rather than constructing a second object.
        resolved.__class__ = RootFilesystemEntry
        return resolved
1710+
1711+
16301712
class RootFilesystemEntry(LayerFilesystemEntry):
16311713
fs: RootFilesystem
16321714

@@ -1640,14 +1722,10 @@ def open(self) -> BinaryIO:
16401722
self.fs.target.log.trace("%r::open()", self)
16411723
return super().open()
16421724

1643-
def iterdir(self) -> Iterator[str]:
1644-
self.fs.target.log.trace("%r::iterdir()", self)
1645-
yield from super().iterdir()
1646-
1647-
def scandir(self) -> Iterator[RootFilesystemEntry]:
1725+
def scandir(self) -> Iterator[DirEntry]:
16481726
self.fs.target.log.trace("%r::scandir()", self)
16491727
for entry in super().scandir():
1650-
entry.__class__ = RootFilesystemEntry
1728+
entry.__class__ = RootDirEntry
16511729
yield entry
16521730

16531731
def is_file(self, follow_symlinks: bool = True) -> bool:

0 commit comments

Comments
 (0)