
Commit ac5a1ba

feat: optimize pmtiles generation even more
1 parent: d040e10

29 files changed: +2793 additions, -1582 deletions
Lines changed: 39 additions & 0 deletions
@@ -0,0 +1,39 @@
+import csv
+
+from base_processor import BaseProcessor
+from csv_cache import AGENCY_FILE
+
+
+class AgenciesProcessor(BaseProcessor):
+    def __init__(
+        self,
+        csv_cache,
+        logger=None,
+    ):
+        super().__init__(AGENCY_FILE, csv_cache, logger)
+        self.agencies = {}
+
+    def process_file(self):
+        with open(self.filepath, "r", encoding=self.encoding, newline="") as f:
+            header = f.readline()
+            if not header:
+                return
+            columns = next(csv.reader([header]))
+
+            agency_id_index = self.csv_cache.get_index(columns, "agency_id")
+            agency_name_index = self.csv_cache.get_index(columns, "agency_name")
+
+            for line in f:
+                if not line.strip():
+                    continue
+
+                row = self.csv_parser.parse(line)
+                agency_id = self.csv_cache.get_safe_value_from_index(
+                    row, agency_id_index
+                )
+                agency_name = self.csv_cache.get_safe_value_from_index(
+                    row, agency_name_index
+                )
+
+                if agency_id:
+                    self.agencies[agency_id] = agency_name
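For context, a minimal usage sketch of the new processor. The module names and the CsvCache constructor arguments are assumptions for illustration; the commit page does not show the file paths, and the real orchestration lives elsewhere in the builder:

    # Hypothetical driver code: module names and CsvCache(workdir=...) are assumed.
    from csv_cache import CsvCache
    from agencies_processor import AgenciesProcessor

    cache = CsvCache(workdir="/tmp/gtfs-feed")  # assumed keyword; the diff only shows `self.workdir = workdir`
    processor = AgenciesProcessor(cache)
    processor.process()  # BaseProcessor.process() resolves the path, then calls process_file()
    print(processor.agencies)  # {agency_id: agency_name, ...}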
Lines changed: 69 additions & 0 deletions
@@ -0,0 +1,69 @@
+from fast_csv_parser import FastCsvParser
+from shared.helpers.utils import detect_encoding
+from shared.helpers.runtime_metrics import track_metrics
+import os
+
+
+class BaseProcessor:
+    """
+    Minimal base class for processors that read a GTFS CSV and write derived outputs.
+
+    Responsibilities
+    - Resolve the absolute path of the input file via the shared CsvCache.
+    - Detect file encoding and initialize a fast CSV parser.
+    - Provide a safe early-return when the input file is absent (some GTFS files are optional).
+    - Delegate the actual line-by-line work to subclasses via `process_file()`.
+
+    Lifecycle contract
+    - Subclasses should override `process_file(self)`; do not call it directly.
+
+    Notes on flags
+    - `no_download` and `no_delete` are orchestration hints used by the caller (e.g., the builder) to
+      decide whether to download the source file and whether to delete it afterward. The base class does
+      not use these flags directly; they are honored by the orchestrator.
+    """
+
+    def __init__(
+        self, filename, csv_cache, logger=None, no_download=False, no_delete=False
+    ):
+        self.filename = filename
+        self.csv_cache = csv_cache
+        self.logger = logger or csv_cache.logger
+        self.no_download = no_download
+        self.no_delete = no_delete
+
+        # Will be populated during `process()`
+        self.filepath = None
+        self.csv_parser = None
+        self.encoding = None
+
+    @track_metrics(metrics=("time", "memory", "cpu"))
+    def process(self):
+        """
+        Entry point called by orchestrators to run a processor in a safe, uniform way.
+
+        """
+        self.filepath = self.csv_cache.get_path(self.filename)
+        # If the target file does not exist in the workdir, skip processing.
+        # This avoids exceptions for optional files and keeps pipelines resilient.
+        if not os.path.exists(self.filepath):
+            # We don't raise an exception here because the presence of mandatory files has been verified elsewhere.
+            self.logger.debug(
+                "File not present, skipping processing: %s", self.filepath
+            )
+            return
+        self.csv_parser = FastCsvParser()
+        self.encoding = detect_encoding(filename=self.filepath, logger=self.logger)
+        self.logger.debug("Begin processing file %s", self.filename)
+        self.process_file()
+
+    def process_file(self):
+        """
+        Hook for subclasses to implement the actual processing logic.
+
+        Contract
+        - May assume `self.filepath`, `self.csv_parser`, and `self.encoding` are initialized.
+        - Should handle empty files gracefully (e.g., by returning early).
+        - Should not raise on benign format issues where possible; prefer logging and continue.
+        """
+        pass
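Given the lifecycle contract above, a new processor only needs a constructor and a `process_file` override. A minimal illustrative subclass, assuming the same streaming pattern as AgenciesProcessor (this class is not part of the commit; the routes.txt column names come from the GTFS spec):

    import csv

    from base_processor import BaseProcessor


    class RoutesProcessor(BaseProcessor):
        """Illustrative only: shows the subclassing contract, not code from this commit."""

        def __init__(self, csv_cache, logger=None):
            super().__init__("routes.txt", csv_cache, logger)
            self.route_names = {}

        def process_file(self):
            # Per the contract, self.filepath, self.csv_parser, and self.encoding
            # are initialized by BaseProcessor.process() before this hook runs.
            with open(self.filepath, "r", encoding=self.encoding, newline="") as f:
                header = f.readline()
                if not header:
                    return  # empty file: return early, per the contract
                columns = next(csv.reader([header]))
                route_id_idx = self.csv_cache.get_index(columns, "route_id")
                name_idx = self.csv_cache.get_index(columns, "route_short_name")
                for line in f:
                    if not line.strip():
                        continue
                    row = self.csv_parser.parse(line)
                    route_id = self.csv_cache.get_safe_value_from_index(row, route_id_idx)
                    if route_id:
                        self.route_names[route_id] = self.csv_cache.get_safe_value_from_index(row, name_idx)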

functions-python/pmtiles_builder/src/csv_cache.py

Lines changed: 51 additions & 157 deletions
@@ -13,20 +13,16 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 #
-import csv
 import logging
 import os
 import subprocess
 from pathlib import Path
-from typing import TypedDict, List, Dict
+from typing import TypedDict, List
 
-from gtfs import stop_txt_is_lat_log_required
 from pympler import asizeof
 
 from shared.helpers.logger import get_logger
-from shared.helpers.transform import get_safe_value_from_csv, get_safe_float_from_csv
-
-from shared.helpers.utils import detect_encoding
+from shared.helpers.transform import get_safe_value, get_safe_float, get_safe_int
 
 STOP_TIMES_FILE = "stop_times.txt"
 SHAPES_FILE = "shapes.txt"
@@ -43,20 +39,13 @@ class ShapeTrips(TypedDict):
 
 def get_volume_size(mountpoint: str):
     """
-    Returns the total size of the specified filesystem mount point in a human-readable format.
-
-    This function uses the `df` command-line utility to determine the size of the filesystem
-    mounted at the path specified by `mountpoint`. If the mount point does not exist, the function
-    prints an error message to the standard error and returns "N/A".
+    Return the total size of the filesystem at `mountpoint` in a human-readable string (e.g., "10G").
 
-    Parameters:
-    mountpoint: str
-        The filesystem mount point path to check.
+    Implementation notes
+    - Uses the system `df -h` command piped through awk to extract the size column.
+    - Requires a valid path; when the mountpoint doesn't exist, a warning is logged and "N/A" is returned.
+    - Intended primarily for diagnostics in logs (does not affect processing logic).
 
-    Returns:
-    str
-        The total size of the specified filesystem mount point in human-readable format. If the
-        mount point is not found, returns "N/A".
     """
     mp = Path(mountpoint)
     if not mp.exists():
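The new docstring above describes shelling out to `df`. A self-contained sketch of that approach, assuming `df -h` output where field 2 of the data row is the total size (this illustrates the documented behavior; it is not the function body, which sits outside this hunk):

    import subprocess
    from pathlib import Path

    def volume_size_sketch(mountpoint: str) -> str:
        # Sketch only: mirrors the documented behavior of get_volume_size.
        if not Path(mountpoint).exists():
            return "N/A"
        # `df -h PATH` prints a header row plus one data row; field 2 is the size, e.g. "10G".
        out = subprocess.run(["df", "-h", mountpoint], capture_output=True, text=True, check=True)
        return out.stdout.splitlines()[1].split()[1]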
@@ -77,10 +66,18 @@ def get_volume_size(mountpoint: str):
 
 class CsvCache:
     """
-    CsvCache provides cached access to GTFS CSV files in a specified working directory.
-    It lazily loads and caches file contents as lists of dictionaries, and offers
-    helper methods to retrieve relationships between routes, trips, stops, and shapes.
-    It lazily loads because not all files are necessarily needed.
+    Lightweight utility for working-directory file paths and safe CSV extraction helpers.
+
+    What it provides
+    - Path resolution: `get_path(filename)` -> absolute path under the configured workdir.
+    - Safe CSV accessors: helpers to get values/ints/floats from parsed rows by index without raising.
+    - Column index lookup: helper to map header names to indices safely.
+    - Diagnostics: optional deep-size logging of objects in DEBUG, and workdir size at initialization.
+
+    Notes
+    - This class does not currently implement an in-memory cache of full CSVs; processors stream files
+      directly and use these helpers for robust parsing.
+    - Constants for common GTFS filenames are exposed at module level (e.g., TRIPS_FILE, STOPS_FILE).
     """
 
     def __init__(
@@ -95,19 +92,11 @@ def __init__(
 
         self.workdir = workdir
 
-        self.file_data = {}
-        self.trip_to_stops: Dict[str, List[str]] = None
-        self.route_to_trip = None
-        self.route_to_shape: Dict[str, Dict[str, ShapeTrips]] = None
-        self.stop_to_route = None
-        self.stop_to_coordinates = None
-        self.trips_no_shapes_per_route: Dict[str, List[str]] = {}
-
         self.logger.info("Using work directory: %s", self.workdir)
         self.logger.info("Size of workdir: %s", get_volume_size(self.workdir))
 
     def debug_log_size(self, label: str, obj: object) -> None:
-        """Log the deep size of an object in bytes when DEBUG is enabled."""
+        """Log the deep size of `obj` in bytes when DEBUG is enabled (best-effort, may fall back silently)."""
         if self.logger.isEnabledFor(logging.DEBUG):
             try:
                 size_bytes = asizeof.asizeof(obj)
@@ -116,133 +105,38 @@ def debug_log_size(self, label: str, obj: object) -> None:
                 self.logger.debug("asizeof Failed to compute size for %s: %s", label, e)
 
     def get_path(self, filename: str) -> str:
+        """Return the absolute path for `filename` under the current workdir."""
         return os.path.join(self.workdir, filename)
 
-    def get_file(self, filename) -> list[dict]:
-        if self.file_data.get(filename) is None:
-            self.file_data[filename] = self._read_csv(self.get_path(filename))
-            self.debug_log_size(f"file data for {filename}", self.file_data[filename])
-        return self.file_data[filename]
-
-    def add_data(self, filename: str, data: list[dict]):
-        self.file_data[filename] = data
-
-    def _read_csv(self, filename) -> list[dict]:
-        """
-        Reads the content of a CSV file and returns it as a list of dictionaries
-        where each dictionary represents a row.
-
-        Parameters:
-        filename (str): The file path of the CSV file to be read.
-
-        Raises:
-        Exception: If there is an error during file opening or reading. The raised
-        exception will include the original error message along with the file name.
-
-        Returns:
-        list[dict]: A list of dictionaries, each representing a row in the CSV file.
-        """
-        try:
-            self.logger.debug("Loading %s", filename)
-            encoding = detect_encoding(filename, logger=self.logger)
-            with open(filename, newline="", encoding=encoding) as f:
-                return list(csv.DictReader(f))
-        except Exception as e:
-            raise Exception(f"Failed to read CSV file {filename}: {e}") from e
-
-    def clear_trip_from_route(self):
-        self.route_to_trip = None
-
-    def get_shape_from_route(self, route_id) -> Dict[str, ShapeTrips]:
-        """
-        Returns the shape_ids with associated trip_id information for a given route_id from the trips file.
-        The relationship from the route to the shape is via the trips file.
-        Parameters:
-        route_id(str): The route identifier to look up.
-
-        Returns:
-        A dict mapping each shape_id to its ShapeTrips entry.
-        Example return value: {'shape_id1': {'shape_id': 'shape_id1', 'trip_ids': ['trip1', 'trip2']},
-                               'shape_id2': {'shape_id': 'shape_id2', 'trip_ids': ['trip3']}}
-        """
-        if self.route_to_shape is None:
-            self._build_route_to_shape()
-        return self.route_to_shape.get(route_id, {})
-
-    def _build_route_to_shape(self):
-        self.route_to_shape = {}
-        for row in self.get_file(TRIPS_FILE):
-            route_id = get_safe_value_from_csv(row, "route_id")
-            shape_id = get_safe_value_from_csv(row, "shape_id")
-            trip_id = get_safe_value_from_csv(row, "trip_id")
-            if route_id and trip_id:
-                if shape_id:
-                    route_shapes = self.route_to_shape.setdefault(route_id, {})
-                    shape_trips = route_shapes.setdefault(
-                        shape_id, {"shape_id": shape_id, "trip_ids": []}
-                    )
-                    shape_trips["trip_ids"].append(trip_id)
-                else:
-                    # Registering the trip without a shape for this route for later retrieval.
-                    trip_no_shapes = (
-                        self.trips_no_shapes_per_route.get(route_id)
-                        if route_id in self.trips_no_shapes_per_route
-                        else None
-                    )
-                    if trip_no_shapes is None:
-                        trip_no_shapes = []
-                        self.trips_no_shapes_per_route[route_id] = trip_no_shapes
-                    trip_no_shapes.append(trip_id)
-
-    def clear_shape_from_route(self):
-        self.route_to_shape = None
-
-    def get_trips_without_shape_from_route(self, route_id) -> List[str]:
-        return self.trips_no_shapes_per_route.get(route_id, [])
-
-    def _build_trip_to_stops(self):
-        self.trip_to_stops = {}
-        for row in self.get_file(STOP_TIMES_FILE):
-            trip_id = get_safe_value_from_csv(row, "trip_id")
-            stop_id = get_safe_value_from_csv(row, "stop_id")
-            if trip_id and stop_id:
-                trip_to_stops = self.trip_to_stops.setdefault(trip_id, [])
-                trip_to_stops.append(stop_id)
-
-    def clear_stops_from_trip(self):
-        self.trip_to_stops = None
-
-    def get_stops_from_trip(self, trip_id):
-        if self.trip_to_stops is None:
-            self._build_trip_to_stops()
-        return self.trip_to_stops.get(trip_id, [])
-
-    def _build_stop_to_coordinates(self):
-        self.stop_to_coordinates = {}
-        for s in self.get_file(STOPS_FILE):
-            row_stop_id = get_safe_value_from_csv(s, "stop_id")
-            row_stop_lon = get_safe_float_from_csv(s, "stop_lon")
-            row_stop_lat = get_safe_float_from_csv(s, "stop_lat")
-            if row_stop_id is None:
-                self.logger.warning("Missing stop id: %s", s)
-                continue
-            if row_stop_lon is None or row_stop_lat is None:
-                if stop_txt_is_lat_log_required(s):
-                    self.logger.warning("Missing stop latitude and longitude: %s", s)
-                else:
-                    self.logger.debug(
-                        "Missing optional stop latitude and longitude: %s", s
-                    )
-                continue
-            self.stop_to_coordinates[row_stop_id] = (row_stop_lon, row_stop_lat)
-
-    def get_coordinates_for_stop(self, stop_id) -> tuple[float, float] | None:
-        if self.stop_to_coordinates is None:
-            self._build_stop_to_coordinates()
-        return self.stop_to_coordinates.get(stop_id, None)
-
-    def clear_coordinate_for_stops(self):
-        self.stop_to_coordinates = None
-
     def set_workdir(self, workdir):
+        """Update the working directory used to resolve file paths (does not move files)."""
         self.workdir = workdir
+
+    @staticmethod
+    def get_index(columns, column_name):
+        """Return the index of `column_name` in the header list `columns`, or None if absent."""
+        try:
+            return columns.index(column_name)
+        except ValueError:
+            return None
+
+    @staticmethod
+    def get_safe_value_from_index(columns, index, default_value: str = None):
+        """Safely fetch a value from `columns` at `index`, applying default and transform semantics."""
+        return (
+            get_safe_value(columns[index], default_value)
+            if index is not None and index < len(columns)
+            else default_value
+        )
+
+    @staticmethod
+    def get_safe_float_from_index(columns, index):
+        """Fetch a value and coerce to float using standard transform rules (returns None on invalid)."""
+        raw_value = CsvCache.get_safe_value_from_index(columns, index)
+        return get_safe_float(raw_value)
+
+    @staticmethod
+    def get_safe_int_from_index(columns, index):
+        """Fetch a value and coerce to int using standard transform rules (returns None on invalid)."""
+        raw_value = CsvCache.get_safe_value_from_index(columns, index)
+        return get_safe_int(raw_value)
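Taken together, these static helpers replace the old dict-based accessors with index-based ones that never raise on short or malformed rows. A small usage sketch (header and row values are invented; the exact coercion semantics of `get_safe_value` come from shared.helpers.transform):

    # Illustrative values only.
    columns = ["stop_id", "stop_name", "stop_lat", "stop_lon"]
    row = ["S1", "Main St", "45.50", "-73.57"]

    lat_idx = CsvCache.get_index(columns, "stop_lat")             # 2
    zone_idx = CsvCache.get_index(columns, "zone_id")             # None (column absent)
    lat = CsvCache.get_safe_float_from_index(row, lat_idx)        # 45.5
    zone = CsvCache.get_safe_value_from_index(row, zone_idx, "")  # "" (falls back to the default)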
