Skip to content

Commit 2d818ca

Browse files
authored
feat: Use the in-memory file system and clean it at startup (#1381)
1 parent 1162f9a commit 2d818ca

File tree

4 files changed

+230
-31
lines changed

4 files changed

+230
-31
lines changed

functions-python/pmtiles_builder/src/csv_cache.py

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,8 @@
1616
import csv
1717
import logging
1818
import os
19+
import subprocess
20+
from pathlib import Path
1921
from typing import TypedDict, List, Dict
2022

2123
from gtfs import stop_txt_is_lat_log_required
@@ -39,6 +41,40 @@ class ShapeTrips(TypedDict):
3941
trip_ids: List[str]
4042

4143

44+
def get_volume_size(mountpoint: str) -> str:
    """
    Return the total size of the filesystem containing `mountpoint`, human-readable.

    The size is read with `shutil.disk_usage` rather than by shelling out to
    `bash -c "df -h ... | awk ..."`. The shell pipeline reported awk's exit status,
    so a failing `df` went unnoticed (yielding an empty string), while a missing
    `bash` raised and aborted the caller. This helper only feeds a log line, so it
    never raises: every failure is logged as a warning and reported as "N/A".

    Parameters:
        mountpoint: str
            Path of the mount point (or any path on the filesystem) to check.

    Returns:
        str
            Total size using binary (1024-based) units with a single-letter suffix,
            e.g. "512M" or "7.8G" (approximates the `df -h` "Size" column; rounding
            may differ slightly). Returns "N/A" if the path is missing or the size
            cannot be determined.
    """
    # Function-scope import so the helper does not depend on a new file-level import.
    import shutil

    if mountpoint is None:
        logging.warning("Mountpoint not found: %s", mountpoint)
        return "N/A"

    mp = Path(mountpoint)
    if not mp.exists():
        logging.warning("Mountpoint not found: %s", mountpoint)
        return "N/A"

    try:
        total_bytes = shutil.disk_usage(mp).total
    except OSError as e:
        logging.warning("Could not determine size of %s: %s", mountpoint, e)
        return "N/A"

    # Scale down by 1024 until the value fits the current unit.
    size = float(total_bytes)
    units = ("B", "K", "M", "G", "T", "P")
    unit_index = 0
    while size >= 1024 and unit_index < len(units) - 1:
        size /= 1024
        unit_index += 1

    unit = units[unit_index]
    if unit == "B":
        return f"{int(size)}B"
    # One decimal for small values ("7.8G"), none otherwise ("59G").
    if size < 10:
        return f"{size:.1f}{unit}"
    return f"{size:.0f}{unit}"
76+
77+
4278
class CsvCache:
4379
"""
4480
CsvCache provides cached access to GTFS CSV files in a specified working directory.
@@ -68,6 +104,7 @@ def __init__(
68104
self.trips_no_shapes_per_route: Dict[str, List[str]] = {}
69105

70106
self.logger.info("Using work directory: %s", self.workdir)
107+
self.logger.info("Size of workdir: %s", get_volume_size(self.workdir))
71108

72109
def debug_log_size(self, label: str, obj: object) -> None:
73110
"""Log the deep size of an object in bytes when DEBUG is enabled."""

functions-python/pmtiles_builder/src/main.py

Lines changed: 108 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -20,9 +20,9 @@
2020
import json
2121
import logging
2222
import os
23+
import shutil
2324
import subprocess
2425
import sys
25-
import tempfile
2626
from enum import Enum
2727
from typing import TypedDict, Tuple, List, Dict
2828

@@ -53,6 +53,108 @@
5353
init_logger()
5454

5555

56+
class EphemeralOrDebugWorkdir:
57+
"""Context manager similar to tempfile.TemporaryDirectory with debug + auto-clean.
58+
59+
Behavior:
60+
- If DEBUG_WORKDIR env var is set (non-empty): that directory is created (if needed),
61+
returned, and never deleted nor cleaned.
62+
- Else: creates a temporary directory under the provided dir (or WORKDIR_ROOT env var),
63+
removes sibling directories older than a TTL (default 3600s / override via WORKDIR_MAX_AGE_SECONDS),
64+
and deletes the created directory on exit.
65+
66+
Only directories whose names start with the fixed CLEANUP_PREFIX are considered for cleanup
67+
to avoid deleting unrelated folders that might exist under the same root.
68+
69+
The final on-disk directory name always starts with the hardcoded prefix 'pmtiles_'.
70+
The caller-supplied prefix (if any) is appended verbatim after that.
71+
"""
72+
73+
CLEANUP_PREFIX = "pmtiles_"
74+
75+
def __init__(
76+
self,
77+
dir: str | None = None,
78+
prefix: str | None = None,
79+
logger: logging.Logger | None = None,
80+
):
81+
import tempfile
82+
83+
self._debug_dir = os.getenv("DEBUG_WORKDIR") or None
84+
self._root = dir or os.getenv("WORKDIR_ROOT", "/tmp/in-memory")
85+
self._logger = logger or get_logger("Workdir")
86+
self._temp: tempfile.TemporaryDirectory[str] | None = None
87+
self.name: str
88+
89+
os.makedirs(self._root, exist_ok=True)
90+
91+
self._ttl_seconds = int(os.getenv("WORKDIR_MAX_AGE_SECONDS", "3600"))
92+
93+
if self._debug_dir:
94+
os.makedirs(self._debug_dir, exist_ok=True)
95+
self.name = self._debug_dir
96+
return
97+
98+
self._cleanup_old()
99+
100+
# Simple prefix: fixed manager prefix + raw user prefix (if any)
101+
combined_prefix = self.CLEANUP_PREFIX + (prefix or "")
102+
103+
self._temp = tempfile.TemporaryDirectory(dir=self._root, prefix=combined_prefix)
104+
self.name = self._temp.name
105+
106+
def _cleanup_old(self):
107+
"""
108+
Delete stale work directories created by this manager (names starting with CLEANUP_PREFIX)
109+
whose modification time is older than the configured TTL.
110+
"""
111+
import time
112+
113+
# If in debug mode, dont cleanup anything
114+
if self._debug_dir:
115+
return
116+
117+
now = time.time()
118+
deleted_count = 0
119+
try:
120+
entries = list(os.scandir(self._root))
121+
except OSError as e:
122+
self._logger.warning("Could not scan workdir root %s: %s", self._root, e)
123+
return
124+
125+
for entry in entries:
126+
try:
127+
if not entry.is_dir(follow_symlinks=False):
128+
continue
129+
if not entry.name.startswith(self.CLEANUP_PREFIX):
130+
continue
131+
try:
132+
age = now - entry.stat(follow_symlinks=False).st_mtime
133+
except OSError:
134+
continue
135+
if age > self._ttl_seconds:
136+
shutil.rmtree(entry.path, ignore_errors=True)
137+
deleted_count += 1
138+
self._logger.debug(
139+
"Removed expired workdir: %s age=%.0fs", entry.path, age
140+
)
141+
except OSError as e:
142+
self._logger.warning("Failed to remove %s: %s", entry.path, e)
143+
144+
if deleted_count:
145+
self._logger.info(
146+
"Cleanup removed %d expired workdirs from %s", deleted_count, self._root
147+
)
148+
149+
def __enter__(self) -> str: # Return path like TemporaryDirectory
150+
return self.name
151+
152+
def __exit__(self, exc_type, exc, tb):
153+
if self._temp:
154+
self._temp.cleanup()
155+
return False # do not suppress exceptions
156+
157+
56158
class RouteCoordinates(TypedDict):
57159
shape_id: str
58160
trip_ids: List[str]
@@ -93,18 +195,11 @@ def build_pmtiles_handler(request: flask.Request) -> dict:
93195
}
94196

95197
try:
96-
# Create a temporary folder to work in. It will be deleted when exiting the block.
97-
with tempfile.TemporaryDirectory(prefix="build_pmtiles_") as temp_dir:
98-
# If DEBUG_WORKDIR is set, use it as the work directory so it survives at the end and can be examined.
99-
# In that case temp_dir will not be used but still deleted at the end of the block.
100-
101-
debug_workdir = os.getenv("DEBUG_WORKDIR")
102-
if debug_workdir:
103-
os.makedirs(debug_workdir, exist_ok=True)
104-
workdir = debug_workdir
105-
else:
106-
workdir = temp_dir
107-
198+
workdir_root = os.getenv("WORKDIR_ROOT", "/tmp/in-memory")
199+
# Use combined context manager that also cleans old directories
200+
with EphemeralOrDebugWorkdir(
201+
dir=workdir_root, prefix=f"{dataset_stable_id}_"
202+
) as workdir:
108203
result = {
109204
"params": {
110205
"feed_stable_id": feed_stable_id,

functions-python/pmtiles_builder/tests/test_build_pmtiles.py

Lines changed: 19 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -480,25 +480,26 @@ def test_build_pmtiles_handler_feed_not_prefix(self, mock_builder):
480480
self.assertIn("is not a prefix of dataset_stable_id", result["error"])
481481

482482
@patch("main.PmtilesBuilder")
483-
def test_build_pmtiles_handler_success(self, mock_builder):
483+
def test_build_pmtiles_handler_failure(self, mock_builder):
    """The handler leaves no work directory behind under WORKDIR_ROOT.

    Environment changes are made inside `patch.dict`, which restores os.environ on
    exit, so WORKDIR_ROOT (pointing at a directory deleted when the block ends)
    does not leak into other tests.
    """
    with tempfile.TemporaryDirectory() as temp_dir, patch.dict(
        os.environ, {"WORKDIR_ROOT": temp_dir}
    ):
        # A DEBUG_WORKDIR left over from another test would bypass the temporary
        # workdir and make the final assertion vacuous; patch.dict restores it on exit.
        os.environ.pop("DEBUG_WORKDIR", None)

        # Set up the request
        payload = {
            "feed_stable_id": self.feed_stable_id,
            "dataset_stable_id": self.dataset_stable_id,
        }
        request = MagicMock()
        request.get_json.return_value = payload

        # Simulate FAILURE status
        instance = mock_builder.return_value
        instance.build_pmtiles.return_value = (
            instance.OperationStatus.FAILURE,
            "fail msg",
        )
        result = build_pmtiles_handler(request)
        # NOTE(review): a "failure" test asserting a "Successfully" message looks
        # inconsistent; kept as-is because the handler's logic is not visible here.
        self.assertIn("Successfully", result["message"])
        self.assertEqual(os.listdir(temp_dir), [], "Expected empty workdir root")
502503

503504

504505
class TestPmtilesBuilderUpload(unittest.TestCase):
Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,66 @@
1+
import os
2+
import tempfile
3+
import time
4+
from pathlib import Path
5+
6+
from src.main import EphemeralOrDebugWorkdir
7+
8+
9+
def _backdate(path: Path, seconds_ago: int):
    """Set both the access and modification time of `path` to `seconds_ago` seconds before now."""
    timestamp = time.time() - seconds_ago
    os.utime(path, times=(timestamp, timestamp))
12+
13+
14+
def test_old_prefixed_dir_deleted_and_root_auto_removed(monkeypatch):
    """Expired 'pmtiles_' directories are purged; fresh and unrelated ones are kept.

    Also checks that the work directory created by the manager is removed when its
    context exits, while the root itself is left in place.
    """
    max_age = 30
    monkeypatch.setenv("WORKDIR_MAX_AGE_SECONDS", str(max_age))
    monkeypatch.delenv("DEBUG_WORKDIR", raising=False)

    with tempfile.TemporaryDirectory() as root:
        base = Path(root)
        expired = base / "pmtiles_stale123"
        unrelated = base / "unrelated_dir"
        recent = base / "pmtiles_fresh"
        for directory in (expired, unrelated, recent):
            directory.mkdir()

        # Only the prefixed directories get an explicit age; one beyond the TTL, one within it.
        _backdate(expired, max_age + 300)
        _backdate(recent, 1)

        with EphemeralOrDebugWorkdir(dir=root) as workdir:
            created = Path(workdir)
            assert not expired.exists()
            assert unrelated.exists()
            assert recent.exists()
            assert workdir.startswith(root)
            assert created.exists()
            assert created.name.startswith("pmtiles_")

        # The manager deletes its own directory on exit...
        assert not created.exists()

        # ...but leaves the root alone while the TemporaryDirectory is still open.
        assert base.exists()

    # Sanity check of the fixture rather than of the code under test: the root is gone.
    assert not base.exists()
49+
50+
51+
def test_fresh_prefixed_dir_retained(monkeypatch, tmp_path):
    """A 'pmtiles_' directory younger than the TTL survives both cleanup and context exit."""
    monkeypatch.setenv("WORKDIR_MAX_AGE_SECONDS", "1000")
    monkeypatch.delenv("DEBUG_WORKDIR", raising=False)

    workroot = tmp_path / "workroot2"
    workroot.mkdir()

    young = workroot / "pmtiles_recent"
    young.mkdir()
    _backdate(young, 10)  # Well inside the 1000s TTL

    manager = EphemeralOrDebugWorkdir(dir=str(workroot))
    with manager:
        # Cleanup ran during construction and must not have touched it.
        assert young.exists()

    # Still there after the manager removed its own work directory.
    assert young.exists()

0 commit comments

Comments
 (0)