Skip to content

Commit 02391c2

Browse files
authored
Compute days_ago at runtime (#685)
1 parent 689715e commit 02391c2

File tree

4 files changed

+40
-39
lines changed

4 files changed

+40
-39
lines changed

runners/run_gtfs_rt_parquet_converter.py

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66
from lamp_py.bus_performance_manager.events_joined import TMDailyWorkPiece
77
from lamp_py.tableau.conversions import convert_gtfs_rt_trip_updates, convert_gtfs_rt_vehicle_position
88
from lamp_py.tableau.hyper import HyperJob
9-
from lamp_py.tableau.jobs.filtered_hyper import FilteredHyperJob, days_ago
9+
from lamp_py.tableau.jobs.filtered_hyper import FilteredHyperJob
1010
from lamp_py.tableau.pipeline import (
1111
GTFS_RT_TABLEAU_PROJECT,
1212
HyperGtfsRtVehiclePositions,
@@ -87,7 +87,7 @@ def start_gtfs_rt_parquet_updates_local() -> None:
8787
HyperGtfsRtVehiclePositions = FilteredHyperJob(
8888
remote_input_location=springboard_rt_vehicle_positions,
8989
remote_output_location=tableau_rt_vehicle_positions_lightrail_60_day,
90-
start_date=days_ago(60),
90+
start_date=60,
9191
processed_schema=convert_gtfs_rt_vehicle_position.LightRailTerminalVehiclePositions.to_pyarrow_schema(),
9292
dataframe_filter=convert_gtfs_rt_vehicle_position.lrtp,
9393
parquet_filter=FilterBankRtVehiclePositions.ParquetFilter.light_rail,
@@ -97,7 +97,7 @@ def start_gtfs_rt_parquet_updates_local() -> None:
9797
HyperGtfsRtTripUpdates = FilteredHyperJob(
9898
remote_input_location=springboard_rt_trip_updates,
9999
remote_output_location=tableau_rt_trip_updates_lightrail_60_day,
100-
start_date=days_ago(60),
100+
start_date=60,
101101
processed_schema=convert_gtfs_rt_trip_updates.LightRailTerminalTripUpdates.to_pyarrow_schema(),
102102
dataframe_filter=convert_gtfs_rt_trip_updates.lrtp_prod,
103103
parquet_filter=FilterBankRtTripUpdates.ParquetFilter.light_rail,
@@ -107,7 +107,7 @@ def start_gtfs_rt_parquet_updates_local() -> None:
107107
HyperGtfsRtVehiclePositionsHeavyRail = FilteredHyperJob(
108108
remote_input_location=springboard_rt_vehicle_positions,
109109
remote_output_location=tableau_rt_vehicle_positions_heavyrail_30_day,
110-
start_date=days_ago(60),
110+
start_date=60,
111111
processed_schema=convert_gtfs_rt_vehicle_position.HeavyRailTerminalVehiclePositions.to_pyarrow_schema(),
112112
dataframe_filter=convert_gtfs_rt_vehicle_position.heavyrail,
113113
parquet_filter=FilterBankRtVehiclePositions.ParquetFilter.heavy_rail,
@@ -117,7 +117,7 @@ def start_gtfs_rt_parquet_updates_local() -> None:
117117
HyperGtfsRtTripUpdatesHeavyRail = FilteredHyperJob(
118118
remote_input_location=springboard_rt_trip_updates,
119119
remote_output_location=tableau_rt_trip_updates_heavyrail_30_day,
120-
start_date=days_ago(30),
120+
start_date=30,
121121
processed_schema=convert_gtfs_rt_trip_updates.HeavyRailTerminalTripUpdates.to_pyarrow_schema(),
122122
dataframe_filter=convert_gtfs_rt_trip_updates.heavyrail,
123123
parquet_filter=FilterBankRtTripUpdates.ParquetFilter.heavy_rail,
@@ -127,7 +127,7 @@ def start_gtfs_rt_parquet_updates_local() -> None:
127127
HyperDevGreenGtfsRtVehiclePositions = FilteredHyperJob(
128128
remote_input_location=springboard_devgreen_rt_vehicle_positions,
129129
remote_output_location=tableau_devgreen_rt_vehicle_positions_lightrail_60_day,
130-
start_date=days_ago(60),
130+
start_date=60,
131131
processed_schema=convert_gtfs_rt_vehicle_position.LightRailTerminalVehiclePositions.to_pyarrow_schema(),
132132
dataframe_filter=convert_gtfs_rt_vehicle_position.lrtp,
133133
parquet_filter=FilterBankRtVehiclePositions.ParquetFilter.light_rail,
@@ -137,7 +137,7 @@ def start_gtfs_rt_parquet_updates_local() -> None:
137137
HyperDevGreenGtfsRtTripUpdates = FilteredHyperJob(
138138
remote_input_location=springboard_devgreen_rt_trip_updates,
139139
remote_output_location=tableau_devgreen_rt_trip_updates_lightrail_60_day,
140-
start_date=days_ago(60),
140+
start_date=60,
141141
processed_schema=convert_gtfs_rt_trip_updates.LightRailTerminalTripUpdates.to_pyarrow_schema(),
142142
dataframe_filter=convert_gtfs_rt_trip_updates.lrtp_devgreen,
143143
parquet_filter=FilterBankRtTripUpdates.ParquetFilter.light_rail,

runners/run_parquet_hyper_lamp.py

Lines changed: 13 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -4,9 +4,8 @@
44

55
from lamp_py.tableau.hyper import HyperJob
66
from lamp_py.tableau.jobs.bus_performance import HyperBusPerformanceRecent, HyperBusPerformanceAll
7-
from lamp_py.tableau.jobs.filtered_hyper import FilteredHyperJob, days_ago
8-
from datetime import date
97
from lamp_py.tableau.jobs.filtered_hyper import FilteredHyperJob
8+
from datetime import date, timedelta
109

1110
from lamp_py.runtime_utils.remote_files import (
1211
LAMP,
@@ -47,18 +46,18 @@
4746
TestHyperBusPerformanceRecent = HyperBusPerformanceRecent
4847
TestHyperBusPerformanceAll = HyperBusPerformanceAll
4948

50-
51-
TestHyperGtfsRtVehiclePositions.start_date = days_ago(1)
52-
TestHyperGtfsRtTripUpdates.start_date = days_ago(1)
53-
TestHyperGtfsRtVehiclePositionsHeavyRail.start_date = days_ago(1)
54-
TestHyperGtfsRtTripUpdatesHeavyRail.start_date = days_ago(1)
55-
TestHyperGtfsRtVehiclePositionsAllLightRail.start_date = days_ago(1)
56-
TestHyperDevGreenGtfsRtVehiclePositions.start_date = days_ago(1)
57-
TestHyperDevGreenGtfsRtTripUpdates.start_date = days_ago(1)
58-
TestHyperBusOperatorMappingRecent.start_date = days_ago(1)
59-
TestHyperBusOperatorMappingAll.start_date = days_ago(1)
60-
# TestHyperRtRailSubway.start_date=days_ago(1)
61-
# TestHyperRtRailCommuter.start_date=days_ago(1)
49+
yesterday = date.today() - timedelta(1)
50+
TestHyperGtfsRtVehiclePositions.start_date = yesterday
51+
TestHyperGtfsRtTripUpdates.start_date = yesterday
52+
TestHyperGtfsRtVehiclePositionsHeavyRail.start_date = yesterday
53+
TestHyperGtfsRtTripUpdatesHeavyRail.start_date = yesterday
54+
TestHyperGtfsRtVehiclePositionsAllLightRail.start_date = yesterday
55+
TestHyperDevGreenGtfsRtVehiclePositions.start_date = yesterday
56+
TestHyperDevGreenGtfsRtTripUpdates.start_date = yesterday
57+
TestHyperBusOperatorMappingRecent.start_date = yesterday
58+
TestHyperBusOperatorMappingAll.start_date = yesterday
59+
# TestHyperRtRailSubway.start_date=yesterday
60+
# TestHyperRtRailCommuter.start_date=yesterday
6261

6362
TestHyperBus = FilteredHyperJob(
6463
remote_input_location=bus_events,

src/lamp_py/tableau/jobs/filtered_hyper.py

Lines changed: 10 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
from typing import Callable
2-
from datetime import date, datetime, timedelta
2+
from datetime import date, timedelta
33
import pyarrow
44
import pyarrow.parquet as pq
55
import pyarrow.dataset as pd
@@ -16,11 +16,6 @@
1616
from lamp_py.aws.s3 import file_list_from_s3, file_list_from_s3_date_range
1717

1818

19-
def days_ago(num_days: int) -> date:
20-
"""helper function to get a date() object set to num_days ago"""
21-
return (datetime.now() - timedelta(days=num_days)).date()
22-
23-
2419
# pylint: disable=R0917,R0902,R0913
2520
# pylint too many local variables (more than 15)
2621
class FilteredHyperJob(HyperJob):
@@ -32,20 +27,27 @@ def __init__(
3227
remote_output_location: S3Location,
3328
processed_schema: pyarrow.schema,
3429
tableau_project_name: str,
35-
start_date: date | None = days_ago(7), # default this the past week of data
36-
end_date: date | None = datetime.now().date(),
30+
start_date: date | int | None = None,
31+
end_date: date | None = None,
3732
partition_template: str = "year={yy}/month={mm}/day={dd}/",
3833
parquet_preprocess: Callable[[pyarrow.Table], pyarrow.Table] | None = None,
3934
parquet_filter: pc.Expression | None = None,
4035
dataframe_filter: Callable[[pl.DataFrame], pl.DataFrame] | None = None,
4136
) -> None:
37+
"""Validate start_date and end_date and assign properties if so."""
4238
HyperJob.__init__(
4339
self,
4440
hyper_file_name=remote_output_location.prefix.rsplit("/")[-1].replace(".parquet", ".hyper"),
4541
remote_parquet_path=remote_output_location.s3_uri,
4642
lamp_version=remote_output_location.version,
4743
project_name=tableau_project_name,
4844
)
45+
if end_date is not None:
46+
assert isinstance(start_date, date)
47+
elif isinstance(start_date, int):
48+
end_date = date.today()
49+
start_date = end_date - timedelta(days=start_date)
50+
4951
self.remote_input_location = remote_input_location
5052
self.remote_output_location = remote_output_location
5153
self.processed_schema = processed_schema # expected output schema passed in

src/lamp_py/tableau/jobs/lamp_jobs.py

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@
3737
from lamp_py.tableau.conversions.convert_bus_performance_data import apply_bus_analysis_conversions
3838
from lamp_py.tableau.jobs.bus_performance import bus_schema
3939

40-
from lamp_py.tableau.jobs.filtered_hyper import FilteredHyperJob, days_ago
40+
from lamp_py.tableau.jobs.filtered_hyper import FilteredHyperJob
4141
from lamp_py.tableau.jobs.rt_rail import HyperRtRail
4242
from lamp_py.utils.filter_bank import FilterBankRtTripUpdates, FilterBankRtVehiclePositions
4343

@@ -47,7 +47,7 @@
4747
HyperGtfsRtVehiclePositions = FilteredHyperJob(
4848
remote_input_location=springboard_rt_vehicle_positions,
4949
remote_output_location=tableau_rt_vehicle_positions_lightrail_60_day,
50-
start_date=days_ago(60),
50+
start_date=60,
5151
processed_schema=convert_gtfs_rt_vehicle_position.LightRailTerminalVehiclePositions.to_pyarrow_schema(),
5252
dataframe_filter=convert_gtfs_rt_vehicle_position.lrtp,
5353
parquet_filter=FilterBankRtVehiclePositions.ParquetFilter.light_rail,
@@ -57,7 +57,7 @@
5757
HyperGtfsRtTripUpdates = FilteredHyperJob(
5858
remote_input_location=springboard_lrtp_trip_updates,
5959
remote_output_location=tableau_rt_trip_updates_lightrail_60_day,
60-
start_date=days_ago(60),
60+
start_date=60,
6161
processed_schema=convert_gtfs_rt_trip_updates.LightRailTerminalTripUpdates.to_pyarrow_schema(),
6262
dataframe_filter=convert_gtfs_rt_trip_updates.lrtp_prod,
6363
parquet_filter=FilterBankRtTripUpdates.ParquetFilter.light_rail,
@@ -67,7 +67,7 @@
6767
HyperGtfsRtVehiclePositionsHeavyRail = FilteredHyperJob(
6868
remote_input_location=springboard_rt_vehicle_positions,
6969
remote_output_location=tableau_rt_vehicle_positions_heavyrail_30_day,
70-
start_date=days_ago(30),
70+
start_date=30,
7171
processed_schema=convert_gtfs_rt_vehicle_position.HeavyRailTerminalVehiclePositions.to_pyarrow_schema(),
7272
dataframe_filter=convert_gtfs_rt_vehicle_position.heavyrail,
7373
parquet_filter=FilterBankRtVehiclePositions.ParquetFilter.heavy_rail,
@@ -77,7 +77,7 @@
7777
HyperGtfsRtTripUpdatesHeavyRail = FilteredHyperJob(
7878
remote_input_location=springboard_rt_trip_updates,
7979
remote_output_location=tableau_rt_trip_updates_heavyrail_30_day,
80-
start_date=days_ago(30),
80+
start_date=30,
8181
processed_schema=convert_gtfs_rt_trip_updates.HeavyRailTerminalTripUpdates.to_pyarrow_schema(),
8282
dataframe_filter=convert_gtfs_rt_trip_updates.heavyrail,
8383
parquet_filter=FilterBankRtTripUpdates.ParquetFilter.heavy_rail,
@@ -87,7 +87,7 @@
8787
HyperGtfsRtVehiclePositionsAllLightRail = FilteredHyperJob(
8888
remote_input_location=springboard_rt_vehicle_positions,
8989
remote_output_location=tableau_rt_vehicle_positions_all_light_rail_7_day,
90-
start_date=days_ago(7),
90+
start_date=7,
9191
processed_schema=convert_gtfs_rt_vehicle_position.VehiclePositions.to_pyarrow_schema(),
9292
dataframe_filter=convert_gtfs_rt_vehicle_position.apply_gtfs_rt_vehicle_positions_timezone_conversions,
9393
parquet_filter=FilterBankRtVehiclePositions.ParquetFilter.light_rail,
@@ -97,7 +97,7 @@
9797
HyperDevGreenGtfsRtVehiclePositions = FilteredHyperJob(
9898
remote_input_location=springboard_devgreen_rt_vehicle_positions,
9999
remote_output_location=tableau_devgreen_rt_vehicle_positions_lightrail_60_day,
100-
start_date=days_ago(60),
100+
start_date=60,
101101
processed_schema=convert_gtfs_rt_vehicle_position.LightRailTerminalVehiclePositions.to_pyarrow_schema(),
102102
dataframe_filter=convert_gtfs_rt_vehicle_position.lrtp,
103103
parquet_filter=FilterBankRtVehiclePositions.ParquetFilter.light_rail,
@@ -107,7 +107,7 @@
107107
HyperDevGreenGtfsRtTripUpdates = FilteredHyperJob(
108108
remote_input_location=springboard_devgreen_lrtp_trip_updates,
109109
remote_output_location=tableau_devgreen_rt_trip_updates_lightrail_60_day,
110-
start_date=days_ago(60),
110+
start_date=60,
111111
processed_schema=convert_gtfs_rt_trip_updates.LightRailTerminalTripUpdates.to_pyarrow_schema(),
112112
dataframe_filter=convert_gtfs_rt_trip_updates.lrtp_devgreen,
113113
parquet_filter=FilterBankRtTripUpdates.ParquetFilter.light_rail,
@@ -117,7 +117,7 @@
117117
HyperBusOperatorMappingRecent = FilteredHyperJob(
118118
remote_input_location=bus_operator_mapping,
119119
remote_output_location=tableau_bus_operator_mapping_recent,
120-
start_date=days_ago(7),
120+
start_date=7,
121121
processed_schema=TMDailyWorkPiece.to_pyarrow_schema(),
122122
dataframe_filter=None,
123123
parquet_filter=None,
@@ -128,7 +128,7 @@
128128
HyperBusOperatorMappingAll = FilteredHyperJob(
129129
remote_input_location=bus_operator_mapping,
130130
remote_output_location=tableau_bus_operator_mapping_all,
131-
start_date=days_ago(60),
131+
start_date=60,
132132
processed_schema=TMDailyWorkPiece.to_pyarrow_schema(),
133133
dataframe_filter=None,
134134
parquet_filter=None,

0 commit comments

Comments
 (0)