diff --git a/README.rst b/README.rst index 2369bc9..bb3305f 100644 --- a/README.rst +++ b/README.rst @@ -103,7 +103,7 @@ littlefs-python comes bundled with a command-line tool, ``littlefs-python``, tha .. code:: console $ littlefs-python --help - usage: littlefs-python [-h] [--version] {create,extract,list} ... + usage: littlefs-python [-h] [--version] {create,extract,list,repl} ... Create, extract and inspect LittleFS filesystem images. Use one of the commands listed below, the '-h' / '--help' option can be used on each command @@ -114,10 +114,11 @@ littlefs-python comes bundled with a command-line tool, ``littlefs-python``, tha --version show program's version number and exit Available Commands: - {create,extract,list} + {create,extract,list,repl} create Create LittleFS image from file/directory contents. extract Extract LittleFS image contents to a directory. list List LittleFS image contents. + repl Inspect an existing LittleFS image through an interactive shell. To create a littlefs binary image: @@ -135,6 +136,17 @@ To extract the contents of a littlefs binary image: $ littlefs-python extract lfs.bin output/ --block-size=4096 +To inspect or debug an existing image without extracting it first you can start a +simple REPL. It provides shell-like commands such as ``ls``, ``tree``, ``put``, ``get`` +and ``rm`` that operate directly on the image data: + +.. code:: console + + $ littlefs-python repl lfs.bin --block-size=4096 + Mounted remote littlefs. + littlefs> ls + README.rst + Development Setup ================= diff --git a/src/littlefs/__init__.py b/src/littlefs/__init__.py index 3c7d8b0..6dce6d3 100644 --- a/src/littlefs/__init__.py +++ b/src/littlefs/__init__.py @@ -34,6 +34,7 @@ "LittleFS", "LittleFSError", "UserContext", + "UserContextFile", "UserContextWinDisk", "__LFS_DISK_VERSION__", "__LFS_VERSION__", @@ -48,7 +49,7 @@ # Package not installed pass -from .context import UserContext, UserContextWinDisk +from .context import UserContext, UserContextFile, UserContextWinDisk if TYPE_CHECKING: from .lfs import LFSStat diff --git a/src/littlefs/__main__.py b/src/littlefs/__main__.py index 21292ca..d237aca 100644 --- a/src/littlefs/__main__.py +++ b/src/littlefs/__main__.py @@ -6,6 +6,8 @@ from littlefs import LittleFS, __version__ from littlefs.errors import LittleFSError +from littlefs.repl import LittleFSRepl +from littlefs.context import UserContextFile # Dictionary mapping suffixes to their size in bytes _suffix_map = { @@ -110,7 +112,7 @@ def create(parser: argparse.ArgumentParser, args: argparse.Namespace) -> int: compact_fs.fs_grow(args.block_count) data = compact_fs.context.buffer if not args.no_pad: - data = data.ljust(args.fs_size, b"\xFF") + data = data.ljust(args.fs_size, b"\xff") else: data = fs.context.buffer @@ -188,6 +190,47 @@ def extract(parser: argparse.ArgumentParser, args: argparse.Namespace) -> int: return 0 +def repl(parser: argparse.ArgumentParser, args: argparse.Namespace) -> int: + """Inspect an existing LittleFS image through an interactive shell.""" + + source: Path = args.source + if not source.is_file(): + parser.error(f"Source image '{source}' does not exist.") + + image_size = source.stat().st_size + if not image_size or image_size % args.block_size: + parser.error( + f"Image size ({image_size} bytes) is not a multiple of the supplied block size ({args.block_size})." + ) + + block_count = image_size // args.block_size + if block_count == 0: + parser.error("Image is smaller than a single block; cannot mount.") + + context = UserContextFile(str(source)) + fs = LittleFS( + context=context, + block_size=args.block_size, + block_count=block_count, + name_max=args.name_max, + mount=False, + ) + + shell = LittleFSRepl(fs) + try: + try: + shell.do_mount() + except LittleFSError as exc: + parser.error(f"Failed to mount '{source}': {exc}") + shell.cmdloop() + finally: + if shell._mounted: + with suppress(LittleFSError): + fs.unmount() + + return 0 + + def get_parser(): if sys.argv[0].endswith("__main__.py"): prog = f"python -m littlefs" @@ -299,6 +342,19 @@ def add_command(handler, name="", help=""): help="LittleFS block size.", ) + parser_repl = add_command(repl) + parser_repl.add_argument( + "source", + type=Path, + help="Source LittleFS filesystem binary.", + ) + parser_repl.add_argument( + "--block-size", + type=size_parser, + required=True, + help="LittleFS block size.", + ) + return parser diff --git a/src/littlefs/context.py b/src/littlefs/context.py index d697e77..d127975 100644 --- a/src/littlefs/context.py +++ b/src/littlefs/context.py @@ -1,6 +1,7 @@ import logging import typing import ctypes +import os if typing.TYPE_CHECKING: from .lfs import LFSConfig @@ -78,6 +79,60 @@ def sync(self, cfg: "LFSConfig") -> int: return 0 +class UserContextFile(UserContext): + """File-backed context using the standard library""" + + def __init__(self, file_path: str, *, create: bool = False) -> None: + mode = "r+b" + if not os.path.exists(file_path): + if not create: + raise FileNotFoundError(f"Context file '{file_path}' does not exist") + mode = "w+b" + + self._path = file_path + self._fh = open(file_path, mode) + + def read(self, cfg: "LFSConfig", block: int, off: int, size: int) -> bytearray: + logging.getLogger(__name__).debug("LFS Read : Block: %d, Offset: %d, Size=%d" % (block, off, size)) + start = block * cfg.block_size + off + self._fh.seek(start) + data = self._fh.read(size) + + if len(data) < size: + data += b"\xff" * (size - len(data)) + + return bytearray(data) + + def prog(self, cfg: "LFSConfig", block: int, off: int, data: bytes) -> int: + logging.getLogger(__name__).debug("LFS Prog : Block: %d, Offset: %d, Data=%r" % (block, off, data)) + start = block * cfg.block_size + off + self._fh.seek(start) + self._fh.write(data) + return 0 + + def erase(self, cfg: "LFSConfig", block: int) -> int: + logging.getLogger(__name__).debug("LFS Erase: Block: %d" % block) + start = block * cfg.block_size + self._fh.seek(start) + self._fh.write(b"\xff" * cfg.block_size) + return 0 + + def sync(self, cfg: "LFSConfig") -> int: + self._fh.flush() + os.fsync(self._fh.fileno()) + return 0 + + def close(self) -> None: + if not self._fh.closed: + self._fh.close() + + def __del__(self): + try: + self.close() + except Exception: + pass + + try: import win32file except ImportError: diff --git a/src/littlefs/repl.py b/src/littlefs/repl.py new file mode 100644 index 0000000..ddd706d --- /dev/null +++ b/src/littlefs/repl.py @@ -0,0 +1,281 @@ +from __future__ import annotations + +import cmd +import posixpath +import shlex +import sys +from pathlib import Path +from typing import BinaryIO + +from littlefs import LittleFS, LFSStat + + +class LittleFSRepl(cmd.Cmd): + """Interactive shell for inspecting a LittleFS volume via the provided filesystem object.""" + + prompt = "littlefs> " + + def __init__(self, fs: LittleFS) -> None: + """Initialize the shell with a LittleFS handle.""" + super().__init__() + self._fs = fs + self._mounted = False + self._cwd = "/" + + def onecmd(self, line: str) -> bool: + """Dispatch a command while converting unexpected errors to readable messages.""" + try: + return super().onecmd(line) + except Exception as exc: # noqa: BLE001 + print(f"{exc.__class__.__name__}: {exc}") + return False + + def _ensure_mounted(self) -> None: + """Raise if the remote filesystem is not mounted yet.""" + if not self._mounted: + raise RuntimeError("Filesystem is not mounted. Run 'mount' first.") + + def _resolve_path(self, raw_path: str | None) -> str: + """Normalize local CLI paths into absolute LittleFS paths.""" + candidate = raw_path.strip() if raw_path and raw_path.strip() else self._cwd + if not candidate.startswith("/"): + candidate = posixpath.join(self._cwd, candidate) + normalized = posixpath.normpath(candidate) + return "/" if normalized in ("", ".") else normalized + + def _resolve_remote_destination(self, raw_path: str, basename: str) -> str: + """Return the full remote destination path, adding basename when needed.""" + if not raw_path: + # Put in root + return self._resolve_path(basename) + + if raw_path.endswith("/"): + # Remote ends with a slash, attach name + return self._resolve_path(posixpath.join(raw_path, basename)) + + # Full filepath provided + return self._resolve_path(raw_path) + + @staticmethod + def _copy_stream(src: BinaryIO, dst: BinaryIO) -> None: + """Copy data from src to dst in small chunks.""" + while True: + chunk = src.read(16 * 1024) + if not chunk: + break + dst.write(chunk) + + def do_mount(self, _: str = "") -> None: + """Attempt to mount the remote filesystem.""" + if self._mounted: + raise RuntimeError("Filesystem already mounted.") + self._fs.mount() + self._mounted = True + self._cwd = "/" + print("Mounted remote littlefs.") + + def do_unmount(self, _: str = "") -> None: + """Unmount the remote filesystem.""" + if not self._mounted: + raise RuntimeError("Filesystem already unmounted.") + self._fs.unmount() + self._mounted = False + self._cwd = "/" + print("Unmounted remote littlefs.") + + def do_ls(self, line: str = "") -> None: + """List directory contents for the provided path.""" + self._ensure_mounted() + target = self._resolve_path(line) + entries = self._fs.listdir(target) + for fname in entries: + print(fname) + + def do_dir(self, line: str = "") -> None: + """Alias for ls (Windows muscle memory).""" + self.do_ls(line) + + def do_cd(self, line: str = "") -> None: + """Change the current working directory.""" + self._ensure_mounted() + target = "/" if not line.strip() else self._resolve_path(line) + stat = self._fs.stat(target) + if stat.type != LFSStat.TYPE_DIR: + raise NotADirectoryError(f"Not a directory: {target}") + self._cwd = target + print(self._cwd) + + def do_pwd(self, _: str = "") -> None: + """Print the current working directory.""" + self._ensure_mounted() + print(self._cwd) + + def do_tree(self, line: str = "") -> None: + """Print the directory tree rooted at the provided path.""" + self._ensure_mounted() + root = self._resolve_path(line) + print(root) + self._print_tree(root, "") + + def _print_tree(self, path: str, prefix: str) -> None: + """Recursive helper for do_tree that renders a tree view.""" + entries = sorted(self._fs.listdir(path)) + for idx, name in enumerate(entries): + child = posixpath.join(path, name) + stat = self._fs.stat(child) + + is_dir = stat.type == LFSStat.TYPE_DIR + connector = "`-- " if idx == len(entries) - 1 else "|-- " + label = f"{name}/" if is_dir else name + print(f"{prefix}{connector}{label}") + if is_dir: + extension = " " if idx == len(entries) - 1 else "| " + self._print_tree(child, prefix + extension) + + def do_put(self, line: str) -> None: + """Copy a local file to the LittleFS volume.""" + self._ensure_mounted() + args = shlex.split(line) + if not args: + raise ValueError("Usage: put [remote_path]") + + local_path = Path(args[0]).expanduser() + if not local_path.is_file(): + raise FileNotFoundError(f"Local file not found: {local_path}") + + remote_arg = args[1] if len(args) > 1 else None + remote_target = self._resolve_remote_destination(remote_arg or "", local_path.name) + parent = posixpath.dirname(remote_target) + + if parent and parent != "/": + self._fs.makedirs(parent, exist_ok=True) + + with open(local_path, "rb") as src, self._fs.open(remote_target, "wb") as dst: + self._copy_stream(src, dst) + + print(f"Put {local_path} -> {remote_target}") + + def do_get(self, line: str) -> None: + """Copy a remote file from the LittleFS volume to the host.""" + self._ensure_mounted() + args = shlex.split(line) + if not args: + raise ValueError("Usage: get [local_path]") + remote_path = self._resolve_path(args[0]) + local_arg = args[1] if len(args) > 1 else None + remote_name = Path(remote_path).name or "download.bin" + local_path = self._resolve_local_destination(local_arg, remote_name) + if local_path.parent and not local_path.parent.exists(): + local_path.parent.mkdir(parents=True, exist_ok=True) + with self._fs.open(remote_path, "rb") as src, open(local_path, "wb") as dst: + self._copy_stream(src, dst) + print(f"Got {remote_path} -> {local_path}") + + def do_cat(self, line: str = "") -> None: + """Print the contents of a remote file to stdout.""" + self._ensure_mounted() + args = shlex.split(line) + if not args: + raise ValueError("Usage: cat ") + remote_path = self._resolve_path(args[0]) + with self._fs.open(remote_path, "rb") as src: + self._copy_stream(src, sys.stdout.buffer) + sys.stdout.buffer.flush() + + def do_xxd(self, line: str = "") -> None: + """Show a hexadecimal dump of a remote file.""" + self._ensure_mounted() + args = shlex.split(line) + if not args: + raise ValueError("Usage: xxd ") + + remote_path = self._resolve_path(args[0]) + offset = 0 + with self._fs.open(remote_path, "rb") as src: + while True: + chunk = src.read(16) + if not chunk: + break + hex_pairs = [f"{byte:02x}" for byte in chunk] + first_half = " ".join(hex_pairs[:8]) + second_half = " ".join(hex_pairs[8:]) + hex_line = f"{first_half:<23} {second_half:<23}".rstrip() + ascii_line = "".join(chr(b) if 32 <= b < 127 else "." for b in chunk) + print(f"{offset:08x}: {hex_line:<48} {ascii_line}") + offset += len(chunk) + + def do_mkdir(self, line: str) -> None: + """Create a directory (use -p to create parents).""" + self._ensure_mounted() + args = shlex.split(line) + if not args: + raise ValueError("Usage: mkdir [-p] ") + recursive = False + target_arg = args[0] + if args[0] == "-p": + if len(args) < 2: + raise ValueError("Usage: mkdir [-p] ") + recursive = True + target_arg = args[1] + target = self._resolve_path(target_arg) + if recursive: + self._fs.makedirs(target, exist_ok=True) + else: + self._fs.mkdir(target) + print(f"Created directory {target}") + + def do_mv(self, line: str) -> None: + """Rename or move a file or directory.""" + self._ensure_mounted() + args = shlex.split(line) + if len(args) != 2: + raise ValueError("Usage: mv ") + src = self._resolve_path(args[0]) + dst = self._resolve_path(args[1]) + self._fs.rename(src, dst) + if self._cwd == src: + self._cwd = dst + elif self._cwd.startswith(f"{src}/"): + suffix = self._cwd[len(src) :] + self._cwd = posixpath.normpath(f"{dst}{suffix}") + print(f"Moved {src} -> {dst}") + + def do_rm(self, line: str) -> None: + """Remove a file or directory (use -r for recursive removal).""" + self._ensure_mounted() + args = shlex.split(line) + if not args: + raise ValueError("Usage: rm [-r] ") + recursive = False + target_arg = args[0] + if args[0] == "-r": + if len(args) < 2: + raise ValueError("Usage: rm [-r] ") + recursive = True + target_arg = args[1] + target = self._resolve_path(target_arg) + self._fs.remove(target, recursive=recursive) + print(f"Removed {target}") + + def _resolve_local_destination(self, raw_path: str | None, remote_name: str) -> Path: + """Determine the local path where a remote file should be stored.""" + if not raw_path: + # Filename only + return Path(remote_name) + + expanded = Path(raw_path).expanduser() + if raw_path.endswith(("/", "\\")) or expanded.is_dir(): + return expanded / remote_name + + return expanded + + def do_exit(self, _): + """Exit the CLI session.""" + return True + + def default(self, line: str): + """Handle EOF shortcuts or fall back to cmd.Cmd implementation.""" + if line == "EOF": + print() + return self.do_exit(line) + return super().default(line) diff --git a/test/test_context.py b/test/test_context.py new file mode 100644 index 0000000..db5c15c --- /dev/null +++ b/test/test_context.py @@ -0,0 +1,41 @@ +import pytest + +from littlefs import LittleFS +from littlefs.context import UserContextFile + + +def test_user_context_file_requires_existing(tmp_path): + missing = tmp_path / "missing.bin" + + with pytest.raises(FileNotFoundError): + UserContextFile(str(missing)) + + +def test_user_context_file_persists_between_mounts(tmp_path): + block_size = 128 + block_count = 32 + backing = tmp_path / "littlefs.bin" + + ctx = UserContextFile(str(backing), create=True) + fs = LittleFS(context=ctx, block_size=block_size, block_count=block_count) + + with fs.open("hello.txt", "w") as fh: + fh.write("hello world") + + with fs.open("data.bin", "wb") as fh: + fh.write(bytes.fromhex("de ad be ef")) + + fs.unmount() + ctx.close() + + ctx2 = UserContextFile(str(backing)) + fs2 = LittleFS(context=ctx2, block_size=block_size, block_count=block_count) + + with fs2.open("hello.txt", "r") as fh: + assert fh.read() == "hello world" + + with fs2.open("data.bin", "rb") as fh: + assert fh.read() == bytes.fromhex("de ad be ef") + + fs2.unmount() + ctx2.close()