Skip to content

Commit 115ba31

Browse files
authored
Strip column names before using them. Added a bit of robustness. (#1444)
1 parent 9f8d57d commit 115ba31

File tree

10 files changed

+292
-165
lines changed

10 files changed

+292
-165
lines changed

functions-python/pmtiles_builder/src/agencies_processor.py

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,3 @@
1-
import csv
2-
31
from base_processor import BaseProcessor
42
from csv_cache import AGENCY_FILE
53

@@ -16,9 +14,9 @@ def __init__(
1614
def process_file(self):
1715
with open(self.filepath, "r", encoding=self.encoding, newline="") as f:
1816
header = f.readline()
19-
if not header:
17+
columns = self.csv_parser.parse_header(header)
18+
if not columns:
2019
return
21-
columns = next(csv.reader([header]))
2220

2321
agency_id_index = self.csv_cache.get_index(columns, "agency_id")
2422
agency_name_index = self.csv_cache.get_index(columns, "agency_name")

functions-python/pmtiles_builder/src/fast_csv_parser.py

Lines changed: 18 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,5 +19,21 @@ def parse(
1919
) -> List[str]: # pragma: no cover (behavior tested indirectly)
2020
if '"' in line:
2121
self._lines_with_quotes += 1
22-
return next(csv.reader([line]))
23-
return line.rstrip("\r\n").split(",")
22+
row = next(
23+
csv.reader([line]), []
24+
) # default to empty list if iterator is exhausted
25+
else:
26+
row = line.rstrip("\r\n").split(",")
27+
28+
return [c.strip() for c in row]
29+
30+
@staticmethod
31+
def parse_header(header: str) -> List[str]:
32+
"""Parse a CSV header line into a list of column names.
33+
Ignore leading/trailing whitespace around column names.
34+
35+
"""
36+
if not header:
37+
return []
38+
columns = next(csv.reader([header]))
39+
return [c.strip() for c in columns]

functions-python/pmtiles_builder/src/routes_processor.py

Lines changed: 86 additions & 67 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,3 @@
1-
import csv
21
import json
32
from typing import TextIO, Dict, List
43

@@ -53,79 +52,99 @@ def process_file(self):
5352
) as routes_json_file:
5453
geojson_file.write('{"type": "FeatureCollection", "features": [\n')
5554
routes_json_file.write("[\n")
56-
with open(self.filepath, "r", encoding=self.encoding, newline="") as f:
57-
header = f.readline()
58-
if not header:
59-
return
60-
columns = next(csv.reader([header]))
61-
62-
route_id_index = csv_cache.get_index(columns, "route_id")
63-
agency_id_index = csv_cache.get_index(columns, "agency_id")
64-
route_short_name_index = csv_cache.get_index(
65-
columns, "route_short_name"
66-
)
67-
route_long_name_index = csv_cache.get_index(columns, "route_long_name")
68-
route_type_index = csv_cache.get_index(columns, "route_type")
69-
route_text_color_index = csv_cache.get_index(
70-
columns, "route_text_color"
71-
)
72-
route_color_index = csv_cache.get_index(columns, "route_color")
73-
74-
line_number = 1
75-
for line in f:
76-
if not line.strip():
77-
continue
78-
79-
row = self.csv_parser.parse(line)
80-
route_id = csv_cache.get_safe_value_from_index(row, route_id_index)
81-
agency_id = csv_cache.get_safe_value_from_index(
82-
row, agency_id_index, "default"
83-
)
84-
route_short_name = csv_cache.get_safe_value_from_index(
85-
row, route_short_name_index
55+
try:
56+
with open(self.filepath, "r", encoding=self.encoding, newline="") as f:
57+
header = f.readline()
58+
columns = self.csv_parser.parse_header(header)
59+
if not columns:
60+
return
61+
62+
route_id_index = csv_cache.get_index(columns, "route_id")
63+
agency_id_index = csv_cache.get_index(columns, "agency_id")
64+
route_short_name_index = csv_cache.get_index(
65+
columns, "route_short_name"
8666
)
87-
route_long_name = csv_cache.get_safe_value_from_index(
88-
row, route_long_name_index
67+
route_long_name_index = csv_cache.get_index(
68+
columns, "route_long_name"
8969
)
90-
route_type = csv_cache.get_safe_value_from_index(
91-
row, route_type_index
92-
)
93-
route_color = csv_cache.get_safe_value_from_index(
94-
row, route_color_index
95-
)
96-
route_text_color = csv_cache.get_safe_value_from_index(
97-
row, route_text_color_index
70+
route_type_index = csv_cache.get_index(columns, "route_type")
71+
route_text_color_index = csv_cache.get_index(
72+
columns, "route_text_color"
9873
)
74+
route_color_index = csv_cache.get_index(columns, "route_color")
9975

100-
# Pass all parsed values to add_to_routes_geojson
101-
self.add_to_routes_geojson(
102-
geojson_file=geojson_file,
103-
route_id=route_id,
104-
agency_id=agency_id,
105-
route_short_name=route_short_name,
106-
route_long_name=route_long_name,
107-
route_type=route_type,
108-
route_color=route_color,
109-
route_text_color=route_text_color,
110-
)
76+
if route_id_index is None:
77+
self.logger.warning(
78+
"Missing required route_id column in routes header; skipping routes processing"
79+
)
80+
return
11181

112-
self.add_to_routes_json(
113-
routes_json_file=routes_json_file,
114-
route_id=route_id,
115-
route_short_name=route_short_name,
116-
route_long_name=route_long_name,
117-
route_type=route_type,
118-
route_color=route_color,
119-
route_text_color=route_text_color,
120-
)
82+
line_number = 0
83+
for line in f:
84+
line_number += 1
85+
if not line.strip():
86+
continue
12187

122-
if line_number % 100 == 0 or line_number == 1:
123-
self.logger.debug(
124-
"Processed route %d (route_id: %s)", line_number, route_id
125-
)
88+
row = self.csv_parser.parse(line)
89+
route_id = csv_cache.get_safe_value_from_index(
90+
row, route_id_index
91+
)
92+
agency_id = csv_cache.get_safe_value_from_index(
93+
row, agency_id_index, "default"
94+
)
95+
route_short_name = csv_cache.get_safe_value_from_index(
96+
row, route_short_name_index
97+
)
98+
route_long_name = csv_cache.get_safe_value_from_index(
99+
row, route_long_name_index
100+
)
101+
route_type = csv_cache.get_safe_value_from_index(
102+
row, route_type_index
103+
)
104+
route_color = csv_cache.get_safe_value_from_index(
105+
row, route_color_index
106+
)
107+
route_text_color = csv_cache.get_safe_value_from_index(
108+
row, route_text_color_index
109+
)
110+
111+
# Pass all parsed values to add_to_routes_geojson
112+
self.add_to_routes_geojson(
113+
geojson_file=geojson_file,
114+
route_id=route_id,
115+
agency_id=agency_id,
116+
route_short_name=route_short_name,
117+
route_long_name=route_long_name,
118+
route_type=route_type,
119+
route_color=route_color,
120+
route_text_color=route_text_color,
121+
)
126122

127-
geojson_file.write("\n]}")
128-
routes_json_file.write("\n]")
123+
self.add_to_routes_json(
124+
routes_json_file=routes_json_file,
125+
route_id=route_id,
126+
route_short_name=route_short_name,
127+
route_long_name=route_long_name,
128+
route_type=route_type,
129+
route_color=route_color,
130+
route_text_color=route_text_color,
131+
)
132+
133+
if line_number % 100 == 0 or line_number == 1:
134+
self.logger.debug(
135+
"Processed route %d (route_id: %s)", line_number, route_id
136+
)
137+
finally:
138+
# Ensure we always close the JSON arrays even on early return or exceptions.
139+
try:
140+
geojson_file.write("\n]}")
141+
except Exception:
142+
# best-effort: don't let closing failures mask the original error
143+
pass
144+
try:
145+
routes_json_file.write("\n]")
146+
except Exception:
147+
pass
129148

130149
if self.missing_coordinates_routes:
131150
self.logger.info(

functions-python/pmtiles_builder/src/routes_processor_for_colors.py

Lines changed: 14 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,34 +1,36 @@
1-
import csv
2-
31
from base_processor import BaseProcessor
42
from csv_cache import ROUTES_FILE
53

64

75
class RoutesProcessorForColors(BaseProcessor):
86
"""Read routes.txt to map route_id → route_color for later use.
9-
Routes processing is split in two to avoid circular dependencies: StopsProcessor can rely on route colors
10-
without requiring the full routes build to have run.
11-
The input file is retained for the next pass over routes.txt in RoutesProcessor (no_delete=True).
7+
8+
Routes processing is split in two to avoid circular dependencies: StopsProcessor can
9+
rely on route colors without requiring the full routes build to have run. The
10+
input file is retained for the next pass over routes.txt in RoutesProcessor
11+
(no_delete=True).
1212
"""
1313

14-
def __init__(
15-
self,
16-
csv_cache,
17-
logger=None,
18-
):
14+
def __init__(self, csv_cache, logger=None):
1915
super().__init__(ROUTES_FILE, csv_cache, logger, no_delete=True)
2016
self.route_colors_map = {}
2117

2218
def process_file(self):
2319
with open(self.filepath, "r", encoding=self.encoding, newline="") as f:
2420
header = f.readline()
25-
if not header:
21+
columns = self.csv_parser.parse_header(header)
22+
if not columns:
2623
return
27-
columns = next(csv.reader([header]))
2824

2925
route_id_index = self.csv_cache.get_index(columns, "route_id")
3026
route_color_index = self.csv_cache.get_index(columns, "route_color")
3127

28+
if route_id_index is None:
29+
self.logger.warning(
30+
"Missing required route_id column in routes header; skipping routes processing for colors"
31+
)
32+
return
33+
3234
for line in f:
3335
if not line.strip():
3436
continue

functions-python/pmtiles_builder/src/shapes_processor.py

Lines changed: 15 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,4 @@
11
import collections
2-
import csv
32
import os
43
import psutil
54

@@ -8,7 +7,6 @@
87
from base_processor import BaseProcessor
98
from csv_cache import SHAPES_FILE
109
from shared.helpers.runtime_metrics import track_metrics
11-
from shared.helpers.utils import detect_encoding
1210

1311

1412
class ShapesProcessor(BaseProcessor):
@@ -29,18 +27,27 @@ def process_file(self):
2927
process = psutil.Process(os.getpid())
3028

3129
try:
32-
encoding = detect_encoding(filename=self.filepath, logger=self.logger)
33-
with open(self.filepath, "r", encoding=encoding, newline="") as f:
30+
with open(self.filepath, "r", encoding=self.encoding, newline="") as f:
3431
header = f.readline()
35-
if not header:
32+
columns = self.csv_parser.parse_header(header)
33+
if not columns:
3634
return
37-
columns = next(csv.reader([header]))
35+
3836
shape_id_index = csv_cache.get_index(columns, "shape_id")
3937
lon_idx = csv_cache.get_index(columns, "shape_pt_lon")
4038
lat_idx = csv_cache.get_index(columns, "shape_pt_lat")
4139
seq_idx = csv_cache.get_index(columns, "shape_pt_sequence")
4240

41+
# If any required column index is None, warn and skip processing.
42+
if None in (shape_id_index, lon_idx, lat_idx, seq_idx):
43+
self.logger.warning(
44+
"Missing required columns in shapes header; skipping shapes processing"
45+
)
46+
return
47+
4348
for line in f:
49+
line_count += 1
50+
4451
try:
4552
if not line.strip():
4653
continue
@@ -52,7 +59,6 @@ def process_file(self):
5259
)
5360

5461
self.unique_shape_id_counts[shape_id] += 1
55-
line_count += 1
5662
if line_count % 1_000_000 == 0:
5763
mem_mb = process.memory_info().rss / (
5864
1024 * 1024
@@ -89,6 +95,8 @@ def process_file(self):
8995
f.readline() # Skip header
9096
needs_sorting = False
9197
for line in f:
98+
line_count += 1
99+
92100
try:
93101
if not line.strip():
94102
continue
@@ -120,7 +128,6 @@ def process_file(self):
120128

121129
positions_in_coordinates_arrays[shape_id] = position + 1
122130

123-
line_count += 1
124131
if line_count % 1_000_000 == 0:
125132
mem_mb = process.memory_info().rss / (
126133
1024 * 1024

functions-python/pmtiles_builder/src/stop_times_processor.py

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,3 @@
1-
import csv
21
from collections import defaultdict
32
from typing import Dict, List
43

@@ -20,13 +19,19 @@ def process_file(self):
2019
trip_to_stops: Dict[str, List[tuple]] = {}
2120
with open(self.filepath, "r", encoding=self.encoding, newline="") as f:
2221
header = f.readline()
23-
if not header:
22+
columns = self.csv_parser.parse_header(header)
23+
if not columns:
2424
return
25-
columns = next(csv.reader([header]))
2625
stop_id_index = self.csv_cache.get_index(columns, "stop_id")
2726
trip_id_index = self.csv_cache.get_index(columns, "trip_id")
2827
seq_index = self.csv_cache.get_index(columns, "stop_sequence")
2928

29+
if trip_id_index is None:
30+
self.logger.warning(
31+
"Missing required trip_id column in stop_times header; skipping stop_times processing"
32+
)
33+
return
34+
3035
# Collect unique trips without shapes across all routes (for parsing only)
3136
trips_without_shape_set = set()
3237
for trip_list in self.trips_processor.trips_no_shapes_per_route.values():
@@ -37,13 +42,13 @@ def process_file(self):
3742
seq_fallback_counter = defaultdict(int)
3843

3944
for line in f:
45+
line_count += 1
4046
if not line.strip():
4147
continue
4248
row = self.csv_parser.parse(line)
4349
stop_id = self.csv_cache.get_safe_value_from_index(row, stop_id_index)
4450
trip_id = self.csv_cache.get_safe_value_from_index(row, trip_id_index)
4551

46-
line_count += 1
4752
if line_count % 1_000_000 == 0:
4853
self.logger.debug(
4954
"Processed %d lines of %s", line_count, self.filename

0 commit comments

Comments (0)