Commit 54bdc3d

implement a lookback (#65)
1 parent e0e8bfd commit 54bdc3d

File tree

2 files changed: +60 −16 lines changed


main.py

Lines changed: 30 additions & 9 deletions
@@ -145,19 +145,40 @@ def _delete_blobs(
     )


+def _fetch_blobs(bucket, blob_type="downloads", past_partition=None, partition=None):
+    # Get the processed files we're loading
+
+    if past_partition is not None:
+        folder = f"processed/{past_partition}"
+        prefix = f"{folder}/{blob_type}-"
+        source_blobs = list(
+            bucket.list_blobs(prefix=prefix, max_results=MAX_BLOBS_PER_RUN)
+        )
+        if len(source_blobs) > 0:
+            return (source_blobs, prefix)
+
+    folder = f"processed/{partition}"
+    prefix = f"{folder}/{blob_type}-"
+    source_blobs = list(bucket.list_blobs(prefix=prefix, max_results=MAX_BLOBS_PER_RUN))
+    return (source_blobs, prefix)
+
+
 def load_processed_files_into_bigquery(event, context):
     continue_publishing = False
     if "attributes" in event and "partition" in event["attributes"]:
         # Check to see if we've manually triggered the function and provided a partition
+        past_partition = None
         partition = event["attributes"]["partition"]
         if "continue_publishing" in event["attributes"]:
             continue_publishing = bool(event["attributes"]["continue_publishing"])
     else:
         # Otherwise, this was triggered via cron, use the current time
+        # checking the past day first
+        past_partition = (
+            datetime.datetime.utcnow() - datetime.timedelta(days=1)
+        ).strftime("%Y%m%d")
         partition = datetime.datetime.utcnow().strftime("%Y%m%d")

-    folder = f"processed/{partition}"
-
     # Load the data into the dataset(s)
     job_config = bigquery.LoadJobConfig()
     job_config.source_format = bigquery.SourceFormat.NEWLINE_DELIMITED_JSON
@@ -168,17 +189,17 @@ def load_processed_files_into_bigquery(event, context):

     bigquery_client = bigquery.Client()

-    # Get the processed files we're loading
-    download_prefix = f"{folder}/downloads-"
-    download_source_blobs = list(
-        bucket.list_blobs(prefix=download_prefix, max_results=MAX_BLOBS_PER_RUN)
+    download_source_blobs, download_prefix = _fetch_blobs(
+        bucket,
+        blob_type="downloads",
+        past_partition=past_partition,
+        partition=partition,
     )
     download_source_uris = [
         f"gs://{blob.bucket.name}/{blob.name}" for blob in download_source_blobs
     ]
-    simple_prefix = f"{folder}/simple-"
-    simple_source_blobs = list(
-        bucket.list_blobs(prefix=simple_prefix, max_results=MAX_BLOBS_PER_RUN)
+    simple_source_blobs, simple_prefix = _fetch_blobs(
+        bucket, blob_type="simple", past_partition=past_partition, partition=partition
     )
     simple_source_uris = [
         f"gs://{blob.bucket.name}/{blob.name}" for blob in simple_source_blobs

test_functions.py

Lines changed: 30 additions & 7 deletions
@@ -115,24 +115,32 @@ def upload_from_file(self, file_handler, rewind=False):
     ],
 )
 @pytest.mark.parametrize(
-    "blobs, expected_load_jobs, expected_delete_calls",
+    "blobs, simple_fetch_current, expected_load_jobs, expected_delete_calls",
     [
-        ({"simple": [], "downloads": ["blob0", "blob1", "blob2"]}, 1, 3),
-        ({"simple": ["blob0", "blob1", "blob2"], "downloads": []}, 1, 3),
+        ({"simple": [], "downloads": ["blob0", "blob1", "blob2"]}, True, 1, 3),
+        ({"simple": ["blob0", "blob1", "blob2"], "downloads": []}, True, 1, 3),
         (
             {
                 "simple": ["blob0", "blob1", "blob2"],
                 "downloads": ["blob0", "blob1", "blob2"],
             },
+            True,
             2,
             6,
         ),
+        (
+            {"simple": ["pastblob0", "pastblob1"], "downloads": ["blob0", "blob1"]},
+            False,
+            2,
+            4,
+        ),
     ],
 )
 def test_load_processed_files_into_bigquery(
     monkeypatch,
     bigquery_dataset,
     blobs,
+    simple_fetch_current,
     expected_load_jobs,
     expected_delete_calls,
 ):
@@ -150,11 +158,22 @@ def test_load_processed_files_into_bigquery(
         name="blobname", bucket=bucket, delete=pretend.call_recorder(lambda: None)
     )

+    past_partition = (datetime.datetime.utcnow() - datetime.timedelta(days=1)).strftime(
+        "%Y%m%d"
+    )
+    partition = datetime.datetime.utcnow().strftime("%Y%m%d")
+
     def _generate_blob_list(prefix, max_results):
         if "simple" in prefix:
-            _blobs = blobs["simple"]
+            if past_partition in prefix:
+                _blobs = [b for b in blobs["simple"] if b.startswith("past")]
+            else:
+                _blobs = blobs["simple"]
         elif "downloads" in prefix:
-            _blobs = blobs["downloads"]
+            if past_partition in prefix:
+                _blobs = [b for b in blobs["downloads"] if b.startswith("past")]
+            else:
+                _blobs = blobs["downloads"]
         else:
             _blobs = []
         blob_list = [blob_stub for b in _blobs]
@@ -204,17 +223,21 @@ def fake_batch(*a, **kw):

     event = {}
     context = pretend.stub()
-    partition = datetime.datetime.utcnow().strftime("%Y%m%d")

     main.load_processed_files_into_bigquery(event, context)

     assert storage_client_stub.bucket.calls == [
         pretend.call(RESULT_BUCKET),
     ]
-    assert bucket_stub.list_blobs.calls == [
+    expected_list_blob_calls = [
+        pretend.call(prefix=f"processed/{past_partition}/downloads-", max_results=1000),
         pretend.call(prefix=f"processed/{partition}/downloads-", max_results=1000),
+        pretend.call(prefix=f"processed/{past_partition}/simple-", max_results=1000),
         pretend.call(prefix=f"processed/{partition}/simple-", max_results=1000),
     ]
+    if not simple_fetch_current:
+        expected_list_blob_calls = expected_list_blob_calls[:3]
+    assert bucket_stub.list_blobs.calls == expected_list_blob_calls
     assert (
         load_job_stub.result.calls
         == [pretend.call()] * len(bigquery_dataset.split()) * expected_load_jobs
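
In the newly added parametrized case the "simple" blobs all start with "past", so the stubbed lookback listing for the simple- prefix returns results and _fetch_blobs never lists today's simple- prefix; slicing expected_list_blob_calls to its first three entries mirrors that. A hypothetical illustration of the resulting prefix order, using made-up partition values:

past_partition, partition = "20240101", "20240102"  # example values only

# Order of bucket.list_blobs prefixes when the lookback finds "simple" blobs;
# the current simple- prefix is never requested in this case.
expected_prefixes = [
    f"processed/{past_partition}/downloads-",  # lookback miss for downloads
    f"processed/{partition}/downloads-",       # fall back to today's downloads
    f"processed/{past_partition}/simple-",     # lookback hit for simple
]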
