Skip to content

Commit 800a39d

Browse files
authored
Factor num_days_ago out of start_date (#688)
1 parent 0800284 commit 800a39d

File tree

3 files changed

+50
-42
lines changed

3 files changed

+50
-42
lines changed

runners/run_parquet_hyper_lamp.py

Lines changed: 22 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55
from lamp_py.tableau.hyper import HyperJob
66
from lamp_py.tableau.jobs.bus_performance import HyperBusPerformanceRecent, HyperBusPerformanceAll
77
from lamp_py.tableau.jobs.filtered_hyper import FilteredHyperJob
8-
from datetime import date, timedelta
8+
from datetime import date
99

1010
from lamp_py.runtime_utils.remote_files import (
1111
LAMP,
@@ -46,18 +46,18 @@
4646
TestHyperBusPerformanceRecent = HyperBusPerformanceRecent
4747
TestHyperBusPerformanceAll = HyperBusPerformanceAll
4848

49-
yesterday = date.today() - timedelta(1)
50-
TestHyperGtfsRtVehiclePositions.start_date = yesterday
51-
TestHyperGtfsRtTripUpdates.start_date = yesterday
52-
TestHyperGtfsRtVehiclePositionsHeavyRail.start_date = yesterday
53-
TestHyperGtfsRtTripUpdatesHeavyRail.start_date = yesterday
54-
TestHyperGtfsRtVehiclePositionsAllLightRail.start_date = yesterday
55-
TestHyperDevGreenGtfsRtVehiclePositions.start_date = yesterday
56-
TestHyperDevGreenGtfsRtTripUpdates.start_date = yesterday
57-
TestHyperBusOperatorMappingRecent.start_date = yesterday
58-
TestHyperBusOperatorMappingAll.start_date = yesterday
59-
# TestHyperRtRailSubway.start_date=yesterday
60-
# TestHyperRtRailCommuter.start_date=yesterday
49+
yesterday = 1
50+
TestHyperGtfsRtVehiclePositions.num_days_ago = yesterday
51+
TestHyperGtfsRtTripUpdates.num_days_ago = yesterday
52+
TestHyperGtfsRtVehiclePositionsHeavyRail.num_days_ago = yesterday
53+
TestHyperGtfsRtTripUpdatesHeavyRail.num_days_ago = yesterday
54+
TestHyperGtfsRtVehiclePositionsAllLightRail.num_days_ago = yesterday
55+
TestHyperDevGreenGtfsRtVehiclePositions.num_days_ago = yesterday
56+
TestHyperDevGreenGtfsRtTripUpdates.num_days_ago = yesterday
57+
TestHyperBusOperatorMappingRecent.num_days_ago = yesterday
58+
TestHyperBusOperatorMappingAll.num_days_ago = yesterday
59+
# TestHyperRtRailSubway.num_days_ago=yesterday
60+
# TestHyperRtRailCommuter.num_days_ago=yesterday
6161

6262
TestHyperBus = FilteredHyperJob(
6363
remote_input_location=bus_events,
@@ -78,16 +78,16 @@ def start_hyper() -> None:
7878
"""Run all HyperFile Update Jobs"""
7979

8080
hyper_jobs: List[HyperJob] = [
81-
TestHyperGtfsRtVehiclePositions,
82-
TestHyperGtfsRtTripUpdates,
83-
TestHyperGtfsRtVehiclePositionsHeavyRail,
84-
TestHyperGtfsRtTripUpdatesHeavyRail,
85-
TestHyperGtfsRtVehiclePositionsAllLightRail,
81+
# TestHyperGtfsRtVehiclePositions,
82+
# TestHyperGtfsRtTripUpdates,
83+
# TestHyperGtfsRtVehiclePositionsHeavyRail,
84+
# TestHyperGtfsRtTripUpdatesHeavyRail,
85+
# TestHyperGtfsRtVehiclePositionsAllLightRail,
8686
TestHyperDevGreenGtfsRtVehiclePositions,
87-
TestHyperDevGreenGtfsRtTripUpdates,
88-
TestHyperBusOperatorMappingRecent,
89-
TestHyperBusOperatorMappingAll,
90-
TestHyperBus,
87+
# TestHyperDevGreenGtfsRtTripUpdates,
88+
# TestHyperBusOperatorMappingRecent,
89+
# TestHyperBusOperatorMappingAll,
90+
# TestHyperBus,
9191
# TestHyperRtRailSubway,
9292
# TestHyperRtRailCommuter,
9393
]

src/lamp_py/tableau/jobs/filtered_hyper.py

Lines changed: 19 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
from typing import Callable
2-
from datetime import date, datetime, timedelta
2+
from datetime import date, timedelta
33
import pyarrow
44
import pyarrow.parquet as pq
55
import pyarrow.dataset as pd
@@ -27,7 +27,8 @@ def __init__(
2727
remote_output_location: S3Location,
2828
processed_schema: pyarrow.schema,
2929
tableau_project_name: str,
30-
start_date: date | int | None = None,
30+
num_days_ago: int | None = None,
31+
start_date: date | None = None,
3132
end_date: date | None = None,
3233
partition_template: str = "year={yy}/month={mm}/day={dd}/",
3334
parquet_preprocess: Callable[[pyarrow.Table], pyarrow.Table] | None = None,
@@ -49,11 +50,12 @@ def __init__(
4950
self.partition_template = partition_template
5051

5152
if start_date is not None and end_date is not None:
52-
assert isinstance(start_date, date)
53+
assert num_days_ago is None # only set num_days_ago or start_date and end_date
5354
assert start_date <= end_date
5455

5556
self.start_date = start_date
5657
self.end_date = end_date
58+
self.num_days_ago = num_days_ago
5759

5860
self.parquet_preprocess = parquet_preprocess # level 1 | complex preprocess
5961
self.parquet_filter = parquet_filter # level 2 | by column and simple filter
@@ -67,11 +69,11 @@ def create_parquet(self, _: DatabaseManager | None) -> None:
6769
self.update_parquet(None)
6870

6971
def update_parquet(self, _: DatabaseManager | None) -> bool:
70-
return self.create_tableau_parquet(partition_template=self.partition_template)
72+
return self.create_tableau_parquet(partition_template=self.partition_template, num_days_ago=self.num_days_ago)
7173

7274
# pylint: disable=R0914, R0912
7375
# pylint: too many local variables (R0914, more than 15) and too many branches (R0912)
74-
def create_tableau_parquet(self, partition_template: str) -> bool:
76+
def create_tableau_parquet(self, partition_template: str, num_days_ago: int | None) -> bool:
7577
"""
7678
Join files into a single parquet file for upload to Tableau. Apply filters and conversions as necessary.
7779
@@ -84,11 +86,6 @@ def create_tableau_parquet(self, partition_template: str) -> bool:
8486
True if parquet created, False otherwise
8587
"""
8688
process_logger = ProcessLogger("filtered_hyper_create_parquet")
87-
if isinstance(self.start_date, int):
88-
end_datetime = datetime.now()
89-
process_logger.add_metadata(now=end_datetime)
90-
self.end_date = end_datetime.date()
91-
self.start_date = self.end_date - timedelta(days=self.start_date)
9289

9390
if self.start_date is not None and self.end_date is not None:
9491
# Limitation: filtered hyper only works in whole days.
@@ -103,6 +100,18 @@ def create_tableau_parquet(self, partition_template: str) -> bool:
103100
end_date=self.end_date,
104101
start_date=self.start_date,
105102
)
103+
process_logger.add_metadata(start_date=self.start_date, end_date=self.end_date)
104+
elif isinstance(num_days_ago, int):
105+
end_date = date.today()
106+
start_date = end_date - timedelta(days=num_days_ago)
107+
s3_uris = file_list_from_s3_date_range(
108+
bucket_name=self.remote_input_location.bucket,
109+
file_prefix=self.remote_input_location.prefix,
110+
path_template=partition_template,
111+
end_date=end_date,
112+
start_date=start_date,
113+
)
114+
process_logger.add_metadata(start_date=start_date, end_date=end_date)
106115
else:
107116
s3_uris = file_list_from_s3(
108117
bucket_name=self.remote_input_location.bucket,
@@ -115,7 +124,6 @@ def create_tableau_parquet(self, partition_template: str) -> bool:
115124
format="parquet",
116125
filesystem=S3FileSystem(),
117126
)
118-
process_logger.add_metadata(start_date=self.start_date, end_date=self.end_date)
119127
process_logger.log_start()
120128
if len(ds_paths) == 0:
121129
process_logger.add_metadata(n_paths_zero=len(ds_paths))

src/lamp_py/tableau/jobs/lamp_jobs.py

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@
4747
HyperGtfsRtVehiclePositions = FilteredHyperJob(
4848
remote_input_location=springboard_rt_vehicle_positions,
4949
remote_output_location=tableau_rt_vehicle_positions_lightrail_60_day,
50-
start_date=60,
50+
num_days_ago=60,
5151
processed_schema=convert_gtfs_rt_vehicle_position.LightRailTerminalVehiclePositions.to_pyarrow_schema(),
5252
dataframe_filter=convert_gtfs_rt_vehicle_position.lrtp,
5353
parquet_filter=FilterBankRtVehiclePositions.ParquetFilter.light_rail,
@@ -57,7 +57,7 @@
5757
HyperGtfsRtTripUpdates = FilteredHyperJob(
5858
remote_input_location=springboard_lrtp_trip_updates,
5959
remote_output_location=tableau_rt_trip_updates_lightrail_60_day,
60-
start_date=60,
60+
num_days_ago=60,
6161
processed_schema=convert_gtfs_rt_trip_updates.LightRailTerminalTripUpdates.to_pyarrow_schema(),
6262
dataframe_filter=convert_gtfs_rt_trip_updates.lrtp_prod,
6363
parquet_filter=FilterBankRtTripUpdates.ParquetFilter.light_rail,
@@ -67,7 +67,7 @@
6767
HyperGtfsRtVehiclePositionsHeavyRail = FilteredHyperJob(
6868
remote_input_location=springboard_rt_vehicle_positions,
6969
remote_output_location=tableau_rt_vehicle_positions_heavyrail_30_day,
70-
start_date=30,
70+
num_days_ago=30,
7171
processed_schema=convert_gtfs_rt_vehicle_position.HeavyRailTerminalVehiclePositions.to_pyarrow_schema(),
7272
dataframe_filter=convert_gtfs_rt_vehicle_position.heavyrail,
7373
parquet_filter=FilterBankRtVehiclePositions.ParquetFilter.heavy_rail,
@@ -77,7 +77,7 @@
7777
HyperGtfsRtTripUpdatesHeavyRail = FilteredHyperJob(
7878
remote_input_location=springboard_rt_trip_updates,
7979
remote_output_location=tableau_rt_trip_updates_heavyrail_30_day,
80-
start_date=30,
80+
num_days_ago=30,
8181
processed_schema=convert_gtfs_rt_trip_updates.HeavyRailTerminalTripUpdates.to_pyarrow_schema(),
8282
dataframe_filter=convert_gtfs_rt_trip_updates.heavyrail,
8383
parquet_filter=FilterBankRtTripUpdates.ParquetFilter.heavy_rail,
@@ -87,7 +87,7 @@
8787
HyperGtfsRtVehiclePositionsAllLightRail = FilteredHyperJob(
8888
remote_input_location=springboard_rt_vehicle_positions,
8989
remote_output_location=tableau_rt_vehicle_positions_all_light_rail_7_day,
90-
start_date=7,
90+
num_days_ago=7,
9191
processed_schema=convert_gtfs_rt_vehicle_position.VehiclePositions.to_pyarrow_schema(),
9292
dataframe_filter=convert_gtfs_rt_vehicle_position.apply_gtfs_rt_vehicle_positions_timezone_conversions,
9393
parquet_filter=FilterBankRtVehiclePositions.ParquetFilter.light_rail,
@@ -97,7 +97,7 @@
9797
HyperDevGreenGtfsRtVehiclePositions = FilteredHyperJob(
9898
remote_input_location=springboard_devgreen_rt_vehicle_positions,
9999
remote_output_location=tableau_devgreen_rt_vehicle_positions_lightrail_60_day,
100-
start_date=60,
100+
num_days_ago=60,
101101
processed_schema=convert_gtfs_rt_vehicle_position.LightRailTerminalVehiclePositions.to_pyarrow_schema(),
102102
dataframe_filter=convert_gtfs_rt_vehicle_position.lrtp,
103103
parquet_filter=FilterBankRtVehiclePositions.ParquetFilter.light_rail,
@@ -107,7 +107,7 @@
107107
HyperDevGreenGtfsRtTripUpdates = FilteredHyperJob(
108108
remote_input_location=springboard_devgreen_lrtp_trip_updates,
109109
remote_output_location=tableau_devgreen_rt_trip_updates_lightrail_60_day,
110-
start_date=60,
110+
num_days_ago=60,
111111
processed_schema=convert_gtfs_rt_trip_updates.LightRailTerminalTripUpdates.to_pyarrow_schema(),
112112
dataframe_filter=convert_gtfs_rt_trip_updates.lrtp_devgreen,
113113
parquet_filter=FilterBankRtTripUpdates.ParquetFilter.light_rail,
@@ -117,7 +117,7 @@
117117
HyperBusOperatorMappingRecent = FilteredHyperJob(
118118
remote_input_location=bus_operator_mapping,
119119
remote_output_location=tableau_bus_operator_mapping_recent,
120-
start_date=7,
120+
num_days_ago=7,
121121
processed_schema=TMDailyWorkPiece.to_pyarrow_schema(),
122122
dataframe_filter=None,
123123
parquet_filter=None,
@@ -128,7 +128,7 @@
128128
HyperBusOperatorMappingAll = FilteredHyperJob(
129129
remote_input_location=bus_operator_mapping,
130130
remote_output_location=tableau_bus_operator_mapping_all,
131-
start_date=60,
131+
num_days_ago=60,
132132
processed_schema=TMDailyWorkPiece.to_pyarrow_schema(),
133133
dataframe_filter=None,
134134
parquet_filter=None,

0 commit comments

Comments
 (0)