Skip to content

Commit f79881c

Browse files
authored
fix(bus): regeneration of bus datasets hangs on old days (#667)
* temporary change to limit metrics regeneration to 2025 - OOM errors processing 2023 data. * add fall2025 bus dataset to tableau job lists * need partition template otherwise it loops through and processes the whole directory...many times over. * need to do it for the original bus_mapping files as well * set bus n days to include fall rating that started aug 24 * setting a date in write_bus_metrics causes it to always reprocess all of the files from that date forward. this is causing us to process all of 2025 constantly. * remove import unused
1 parent cc36eee commit f79881c

File tree

2 files changed

+51
-2
lines changed

2 files changed

+51
-2
lines changed

src/lamp_py/tableau/jobs/lamp_jobs.py

Lines changed: 45 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
from datetime import date
12
import os
23
from lamp_py.bus_performance_manager.events_joined import TMDailyWorkPiece
34
from lamp_py.common.gtfs_types import RouteType
@@ -7,6 +8,9 @@
78
)
89

910
from lamp_py.runtime_utils.remote_files import (
11+
LAMP,
12+
S3_ARCHIVE,
13+
S3Location,
1014
springboard_rt_vehicle_positions,
1115
springboard_devgreen_rt_vehicle_positions,
1216
springboard_rt_trip_updates, # main feed, all lines, unique records
@@ -26,6 +30,13 @@
2630
tableau_rail_subway,
2731
)
2832

33+
34+
from lamp_py.runtime_utils.remote_files import (
35+
bus_events,
36+
)
37+
from lamp_py.tableau.conversions.convert_bus_performance_data import apply_bus_analysis_conversions
38+
from lamp_py.tableau.jobs.bus_performance import bus_schema
39+
2940
from lamp_py.tableau.jobs.filtered_hyper import FilteredHyperJob, days_ago
3041
from lamp_py.tableau.jobs.rt_rail import HyperRtRail
3142
from lamp_py.utils.filter_bank import FilterBankRtTripUpdates, FilterBankRtVehiclePositions
@@ -111,7 +122,7 @@
111122
dataframe_filter=None,
112123
parquet_filter=None,
113124
tableau_project_name=LAMP_API_PROJECT,
114-
partition_template="",
125+
partition_template="operator_map_pii_{yy}{mm:02d}{dd:02d}.parquet",
115126
)
116127

117128
HyperBusOperatorMappingAll = FilteredHyperJob(
@@ -122,7 +133,39 @@
122133
dataframe_filter=None,
123134
parquet_filter=None,
124135
tableau_project_name=LAMP_API_PROJECT,
125-
partition_template="",
136+
partition_template="operator_map_pii_{yy}{mm:02d}{dd:02d}.parquet",
137+
)
138+
139+
HyperBusFall2025 = FilteredHyperJob(
140+
remote_input_location=bus_events,
141+
remote_output_location=S3Location(
142+
bucket=S3_ARCHIVE,
143+
prefix=os.path.join(LAMP, "bus_rating_datasets", "year=2025", "Fall2025_BusMetrics.parquet"),
144+
version="1.0",
145+
),
146+
start_date=date(2025, 8, 24),
147+
end_date=date(2025, 12, 13),
148+
processed_schema=bus_schema,
149+
dataframe_filter=apply_bus_analysis_conversions,
150+
parquet_filter=None,
151+
tableau_project_name=LAMP_API_PROJECT,
152+
partition_template="{yy}{mm:02d}{dd:02d}.parquet",
153+
)
154+
155+
HyperBusOperatorFall2025 = FilteredHyperJob(
156+
remote_input_location=bus_operator_mapping,
157+
remote_output_location=S3Location(
158+
bucket=S3_ARCHIVE,
159+
prefix=os.path.join(LAMP, "bus_rating_datasets", "year=2025", "Fall2025_Operator.parquet"),
160+
version="1.0",
161+
),
162+
start_date=date(2025, 8, 24),
163+
end_date=date(2025, 12, 13),
164+
processed_schema=TMDailyWorkPiece.to_pyarrow_schema(),
165+
dataframe_filter=None,
166+
parquet_filter=None,
167+
tableau_project_name=LAMP_API_PROJECT,
168+
partition_template="operator_map_pii_{yy}{mm:02d}{dd:02d}.parquet",
126169
)
127170

128171
# light rail and heavy rail - Enum Types < 2 == 0, 1

src/lamp_py/tableau/pipeline.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@
1717
HyperGtfsRtVehiclePositionsHeavyRail,
1818
HyperRtRailSubway,
1919
HyperRtRailCommuter,
20+
HyperBusFall2025,
21+
HyperBusOperatorFall2025,
2022
)
2123
from lamp_py.tableau.jobs.rt_alerts import HyperRtAlerts
2224
from lamp_py.tableau.jobs.gtfs_rail import (
@@ -91,6 +93,8 @@ def start_hyper_updates() -> None:
9193
HyperGtfsRtVehiclePositionsHeavyRail,
9294
HyperGtfsRtTripUpdatesHeavyRail,
9395
HyperGtfsRtVehiclePositionsAllLightRail,
96+
HyperBusFall2025,
97+
HyperBusOperatorFall2025,
9498
]
9599

96100
for job in hyper_jobs:
@@ -138,6 +142,8 @@ def start_bus_parquet_updates() -> None:
138142
HyperBusOperatorMappingRecent,
139143
HyperBusPerformanceAll(),
140144
HyperBusOperatorMappingAll,
145+
HyperBusFall2025,
146+
HyperBusOperatorFall2025,
141147
]
142148

143149
for job in parquet_update_jobs:

0 commit comments

Comments
 (0)