Skip to content

Commit 07c921d

Browse files
Add option to log to file mount SD Card (#299)
Co-authored-by: Nate Gay <[email protected]> Co-authored-by: Nate Gay <[email protected]>
1 parent 7ec6e0c commit 07c921d

File tree

10 files changed

+444
-20
lines changed

10 files changed

+444
-20
lines changed

.pre-commit-config.yaml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ repos:
2222
language: pygrep
2323
types: [python]
2424
files: ^circuitpython-workspaces/flight-software
25-
exclude: ^circuitpython-workspaces/flight-software/src/pysquared/beacon\.py|^circuitpython-workspaces/flight-software/src/pysquared/logger\.py|^circuitpython-workspaces/flight-software/src/pysquared/rtc/manager/microcontroller\.py
25+
exclude: ^circuitpython-workspaces/flight-software/src/pysquared/beacon\.py|^circuitpython-workspaces/flight-software/src/pysquared/logger\.py|^circuitpython-workspaces/flight-software/src/pysquared/rtc/manager/microcontroller\.py|^circuitpython-workspaces/flight-software/src/pysquared/hardware/sd_card/manager/sd_card\.py
2626

2727
- repo: local
2828
hooks:
@@ -32,7 +32,7 @@ repos:
3232
entry: '# pyright:? *.*false'
3333
language: pygrep
3434
types: [python]
35-
files: ^circuitpython-workspaces/flight-software
35+
files: ^circuitpython-workspaces/flight-software|^cpython-workspace/
3636

3737
- repo: https://github.com/codespell-project/codespell
3838
rev: v2.4.1
Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
"""
2+
This module provides utilities that can run during the boot process by adding them to boot.py.
3+
"""
Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
"""File includes utilities for managing the filesystem during the boot process."""
2+
3+
import os
4+
import time
5+
6+
import storage
7+
8+
9+
def mkdir(
10+
path: str,
11+
storage_action_delay: float = 0.02,
12+
) -> None:
13+
"""
14+
Create directories on internal storage during boot.
15+
16+
In CircuitPython the internal storage is not writable by default. In order to mount
17+
any external storage (such as an SD Card) the drive must be remounted in read/write mode.
18+
This function handles the necessary steps to safely create a directory on the internal
19+
storage during boot.
20+
21+
Args:
22+
mount_point: Path to mount point
23+
storage_action_delay: Delay after storage actions to ensure stability
24+
25+
Usage:
26+
```python
27+
from pysquared.boot.filesystem import mkdir
28+
mkdir("/sd")
29+
```
30+
"""
31+
try:
32+
storage.disable_usb_drive()
33+
time.sleep(storage_action_delay)
34+
print("Disabled USB drive")
35+
36+
storage.remount("/", False)
37+
time.sleep(storage_action_delay)
38+
print("Remounted root filesystem")
39+
40+
try:
41+
os.mkdir(path)
42+
print(f"Mount point {path} created.")
43+
except OSError:
44+
print(f"Mount point {path} already exists.")
45+
46+
finally:
47+
storage.enable_usb_drive()
48+
time.sleep(storage_action_delay)
49+
print("Enabled USB drive")
Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
"""
2+
This module provides an interface for controlling SD cards.
3+
"""
Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
"""This module provides a SD Card class to manipulate the sd card filesystem"""
2+
3+
import sdcardio
4+
import storage
5+
from busio import SPI
6+
from microcontroller import Pin
7+
8+
from ...exception import HardwareInitializationError
9+
10+
11+
class SDCardManager:
12+
"""Class providing various functionalities related to USB and SD card operations."""
13+
14+
"""Initializing class, remounting storage, and initializing SD card"""
15+
16+
def __init__(
17+
self,
18+
spi_bus: SPI,
19+
chip_select: Pin,
20+
baudrate: int = 400000,
21+
mount_path: str = "/sd",
22+
) -> None:
23+
try:
24+
sd = sdcardio.SDCard(spi_bus, chip_select, baudrate)
25+
vfs = storage.VfsFat(sd) # type: ignore # Issue: https://github.com/adafruit/Adafruit_CircuitPython_Typing/issues/51
26+
storage.mount(vfs, mount_path)
27+
except Exception as e:
28+
raise HardwareInitializationError("Failed to initialize SD Card") from e

circuitpython-workspaces/flight-software/src/pysquared/logger.py

Lines changed: 38 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
"""
1414

1515
import json
16+
import os
1617
import time
1718
import traceback
1819
from collections import OrderedDict
@@ -76,6 +77,8 @@ class LogLevel:
7677
class Logger:
7778
"""Handles logging messages with different severity levels."""
7879

80+
_log_dir: str | None = None
81+
7982
def __init__(
8083
self,
8184
error_counter: Counter,
@@ -86,13 +89,13 @@ def __init__(
8689
Initializes the Logger instance.
8790
8891
Args:
89-
error_counter (Counter): Counter for error occurrences.
90-
log_level (int): Initial log level.
91-
colorized (bool): Whether to colorize output.
92+
error_counter: Counter for error occurrences.
93+
log_level: Initial log level.
94+
colorized: Whether to colorize output.
9295
"""
9396
self._error_counter: Counter = error_counter
9497
self._log_level: int = log_level
95-
self.colorized: bool = colorized
98+
self._colorized: bool = colorized
9699

97100
def _can_print_this_level(self, level_value: int) -> bool:
98101
"""
@@ -151,24 +154,19 @@ def _log(self, level: str, level_value: int, message: str, **kwargs) -> None:
151154

152155
json_order.update(kwargs)
153156

154-
try:
155-
json_output = json.dumps(json_order)
156-
except TypeError as e:
157-
json_output = json.dumps(
158-
OrderedDict(
159-
[
160-
("time", asctime),
161-
("level", "ERROR"),
162-
("msg", f"Failed to serialize log message: {e}"),
163-
]
164-
),
165-
)
157+
json_output = json.dumps(json_order)
166158

167159
if self._can_print_this_level(level_value):
168-
if self.colorized:
160+
if self._log_dir is not None:
161+
file = self._log_dir + os.sep + "activity.log"
162+
with open(file, "a") as f:
163+
f.write(json_output + "\n")
164+
165+
if self._colorized:
169166
json_output = json_output.replace(
170167
f'"level": "{level}"', f'"level": "{LogColors[level]}"'
171168
)
169+
172170
print(json_output)
173171

174172
def debug(self, message: str, **kwargs: object) -> None:
@@ -235,3 +233,26 @@ def get_error_count(self) -> int:
235233
int: The number of errors logged.
236234
"""
237235
return self._error_counter.get()
236+
237+
def set_log_dir(self, log_dir: str) -> None:
238+
"""
239+
Sets the log directory for file logging.
240+
241+
Args:
242+
log_dir (str): Directory to save log files.
243+
244+
Raises:
245+
ValueError: If the provided path is not a valid directory.
246+
"""
247+
try:
248+
# Octal number 0o040000 is the stat mode indicating the file being stat'd is a directory
249+
directory_mode: int = 0o040000
250+
st_mode = os.stat(log_dir)[0]
251+
if st_mode != directory_mode:
252+
raise ValueError(
253+
f"Logging path must be a directory, received {st_mode}."
254+
)
255+
except OSError as e:
256+
raise ValueError("Invalid logging path.") from e
257+
258+
self._log_dir = log_dir
Lines changed: 177 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,177 @@
1+
"""Unit tests for the filesystem class.
2+
3+
This file contains unit tests for the `filesystem` class, which provides utilities
4+
for managing the filesystem during the boot process. The tests focus on verifying that
5+
the correct sequence of filesystem operations.
6+
"""
7+
8+
import os
9+
import shutil
10+
import sys
11+
import tempfile
12+
from unittest.mock import MagicMock, patch
13+
14+
import pytest
15+
16+
sys.modules["storage"] = MagicMock()
17+
from pysquared.boot.filesystem import mkdir # noqa: E402
18+
19+
20+
@pytest.fixture(autouse=True)
21+
def test_dir():
22+
"""Sets up a temporary directory for testing and cleans it up afterwards."""
23+
temp_dir = tempfile.mkdtemp()
24+
25+
test_dir = os.path.join(temp_dir, "mytestdir")
26+
yield test_dir
27+
28+
shutil.rmtree(temp_dir, ignore_errors=True)
29+
30+
31+
@patch("pysquared.boot.filesystem.storage")
32+
@patch("pysquared.boot.filesystem.time.sleep")
33+
def test_mkdir_success(
34+
mock_sleep: MagicMock,
35+
mock_storage: MagicMock,
36+
test_dir: str,
37+
capsys: pytest.CaptureFixture[str],
38+
) -> None:
39+
"""Test the mkdir function for successful directory creation."""
40+
mkdir(test_dir, 0.02)
41+
42+
# Verify each expected call was made
43+
mock_storage.disable_usb_drive.assert_called_once()
44+
mock_storage.remount.assert_called_once_with("/", False)
45+
mock_storage.enable_usb_drive.assert_called_once()
46+
47+
assert mock_sleep.call_count == 3
48+
mock_sleep.assert_called_with(0.02)
49+
50+
# Verify correct print messages
51+
captured = capsys.readouterr()
52+
assert "Disabled USB drive" in captured.out
53+
assert "Remounted root filesystem" in captured.out
54+
assert f"Mount point {test_dir} created." in captured.out
55+
assert "Enabled USB drive" in captured.out
56+
57+
# directory exists
58+
assert os.path.exists(test_dir)
59+
60+
61+
@patch("pysquared.boot.filesystem.storage")
62+
@patch("pysquared.boot.filesystem.time.sleep")
63+
@patch("pysquared.boot.filesystem.os.mkdir")
64+
def test_mkdir_directory_already_exists(
65+
mock_os_mkdir: MagicMock,
66+
mock_sleep: MagicMock,
67+
mock_storage: MagicMock,
68+
test_dir: str,
69+
capsys: pytest.CaptureFixture[str],
70+
) -> None:
71+
"""Test the mkdir function when directory already exists."""
72+
mock_os_mkdir.side_effect = OSError("Directory already exists")
73+
74+
mkdir(test_dir, 0.02)
75+
76+
# Verify all storage operations still performed
77+
mock_storage.disable_usb_drive.assert_called_once()
78+
mock_storage.remount.assert_called_once_with("/", False)
79+
mock_os_mkdir.assert_called_once_with(test_dir)
80+
mock_storage.enable_usb_drive.assert_called_once()
81+
82+
# Verify correct print messages
83+
captured = capsys.readouterr()
84+
assert "Disabled USB drive" in captured.out
85+
assert "Remounted root filesystem" in captured.out
86+
assert f"Mount point {test_dir} already exists." in captured.out
87+
assert "Enabled USB drive" in captured.out
88+
89+
90+
@patch("pysquared.boot.filesystem.storage")
91+
@patch("pysquared.boot.filesystem.time.sleep")
92+
@patch("pysquared.boot.filesystem.os.mkdir")
93+
def test_mkdir_custom_delay(
94+
mock_os_mkdir: MagicMock,
95+
mock_sleep: MagicMock,
96+
mock_storage: MagicMock,
97+
test_dir: str,
98+
) -> None:
99+
"""Test the mkdir function with custom storage action delay."""
100+
custom_delay = 0.05
101+
mkdir(test_dir, custom_delay)
102+
103+
# Verify sleep was called with custom delay
104+
assert mock_sleep.call_count == 3
105+
mock_sleep.assert_called_with(custom_delay)
106+
107+
108+
@patch("pysquared.boot.filesystem.storage")
109+
@patch("pysquared.boot.filesystem.time.sleep")
110+
@patch("pysquared.boot.filesystem.os.mkdir")
111+
def test_mkdir_default_delay(
112+
mock_os_mkdir: MagicMock,
113+
mock_sleep: MagicMock,
114+
mock_storage: MagicMock,
115+
test_dir: str,
116+
) -> None:
117+
"""Test the mkdir function with default storage action delay."""
118+
mkdir(test_dir) # No delay parameter provided
119+
120+
# Verify sleep was called with default delay
121+
assert mock_sleep.call_count == 3
122+
mock_sleep.assert_called_with(0.02)
123+
124+
125+
@patch("pysquared.boot.filesystem.storage")
126+
@patch("pysquared.boot.filesystem.time.sleep")
127+
@patch("pysquared.boot.filesystem.os.mkdir")
128+
def test_mkdir_storage_disable_exception(
129+
mock_os_mkdir: MagicMock,
130+
mock_sleep: MagicMock,
131+
mock_storage: MagicMock,
132+
test_dir: str,
133+
) -> None:
134+
"""Test mkdir function when storage.disable_usb_drive() raises an exception."""
135+
mock_storage.disable_usb_drive.side_effect = Exception("USB disable failed")
136+
137+
with pytest.raises(Exception, match="USB disable failed"):
138+
mkdir(test_dir, 0.02)
139+
140+
# Verify that enable_usb_drive is still called in finally block
141+
mock_storage.enable_usb_drive.assert_called_once()
142+
143+
144+
@patch("pysquared.boot.filesystem.storage")
145+
@patch("pysquared.boot.filesystem.time.sleep")
146+
def test_mkdir_storage_remount_exception(
147+
mock_sleep: MagicMock,
148+
mock_storage: MagicMock,
149+
test_dir: str,
150+
) -> None:
151+
"""Test mkdir function when storage.remount() raises an exception."""
152+
mock_storage.remount.side_effect = Exception("Remount failed")
153+
154+
with pytest.raises(Exception, match="Remount failed"):
155+
mkdir(test_dir, 0.02)
156+
157+
# Verify that enable_usb_drive is still called in finally block
158+
mock_storage.enable_usb_drive.assert_called_once()
159+
160+
161+
@patch("pysquared.boot.filesystem.storage")
162+
@patch("pysquared.boot.filesystem.time.sleep")
163+
@patch("pysquared.boot.filesystem.os.mkdir")
164+
def test_mkdir_mkdir_exception_not_oserror(
165+
mock_os_mkdir: MagicMock,
166+
mock_sleep: MagicMock,
167+
mock_storage: MagicMock,
168+
test_dir: str,
169+
) -> None:
170+
"""Test mkdir function when os.mkdir() raises a non-OSError exception."""
171+
mock_os_mkdir.side_effect = ValueError("Invalid path")
172+
173+
with pytest.raises(ValueError, match="Invalid path"):
174+
mkdir(test_dir, 0.02)
175+
176+
# Verify that enable_usb_drive is still called in finally block
177+
mock_storage.enable_usb_drive.assert_called_once()

cpython-workspaces/flight-software-unit-tests/src/unit-tests/hardware/test_burnwire.py renamed to cpython-workspaces/flight-software-unit-tests/src/unit-tests/hardware/burnwire/manager/test_burnwire.py

File renamed without changes.

0 commit comments

Comments
 (0)