Skip to content

Commit 33880cc

Browse files
authored
Fix prefixes and source URIs (#57)
* Fix prefixes and source URIs
* Update test
1 parent 46438fb commit 33880cc

File tree

2 files changed

+21
-14
lines changed

2 files changed

+21
-14
lines changed

main.py

Lines changed: 11 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -124,7 +124,7 @@ def load_processed_files_into_bigquery(event, context):
124124
# Otherwise, this was triggered via cron, use the current time
125125
partition = datetime.datetime.utcnow().strftime("%Y%m%d")
126126

127-
folder = f"gs://{RESULT_BUCKET}/processed/{partition}"
127+
folder = f"processed/{partition}"
128128

129129
# Load the data into the dataset(s)
130130
job_config = bigquery.LoadJobConfig()
@@ -137,10 +137,16 @@ def load_processed_files_into_bigquery(event, context):
137137
bigquery_client = bigquery.Client()
138138

139139
# Get the processed files we're loading
140-
download_prefix = f"{folder}/downloads-*.json"
141-
download_source_uris = bucket.list_blobs(prefix=download_prefix)
142-
simple_prefix = f"{folder}/simple-*.json"
143-
simple_source_uris = bucket.list_blobs(prefix=simple_prefix)
140+
download_prefix = f"{folder}/downloads-"
141+
download_source_blobs = bucket.list_blobs(prefix=download_prefix)
142+
download_source_uris = [
143+
f"gs://{blob.bucket.name}/{blob.name}" for blob in download_source_blobs
144+
]
145+
simple_prefix = f"{folder}/simple-"
146+
simple_source_blobs = bucket.list_blobs(prefix=simple_prefix)
147+
simple_source_uris = [
148+
f"gs://{blob.bucket.name}/{blob.name}" for blob in simple_source_blobs
149+
]
144150

145151
for DATASET in DATASETS:
146152
dataset_ref = bigquery.dataset.DatasetReference.from_string(

test_functions.py

Lines changed: 10 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -147,8 +147,12 @@ def test_load_processed_files_into_bigquery(
147147

148148
blob_lists = {}
149149

150+
bucket = pretend.stub(name=RESULT_BUCKET)
151+
152+
blob_stub = pretend.stub(name="blobname", bucket=bucket)
153+
150154
def _generate_blob_list(prefix):
151-
blob_list = pretend.stub(prefix=prefix)
155+
blob_list = [blob_stub]
152156
blob_lists[prefix] = blob_list
153157
return blob_list
154158

@@ -199,17 +203,14 @@ def _generate_blob_list(prefix):
199203
pretend.call(RESULT_BUCKET),
200204
]
201205
assert bucket_stub.list_blobs.calls == [
202-
pretend.call(
203-
prefix=f"gs://my-result-bucket/processed/{partition}/downloads-*.json"
204-
),
205-
pretend.call(
206-
prefix=f"gs://my-result-bucket/processed/{partition}/simple-*.json"
207-
),
206+
pretend.call(prefix=f"processed/{partition}/downloads-"),
207+
pretend.call(prefix=f"processed/{partition}/simple-"),
208208
]
209209
assert (
210210
load_job_stub.result.calls
211211
== [pretend.call()] * len(bigquery_dataset.split()) * 2
212212
)
213-
assert set(bucket_stub.delete_blobs.calls) == set(
214-
pretend.call(blobs=blobs) for blobs in blob_lists.values()
213+
assert (
214+
bucket_stub.delete_blobs.calls
215+
== [pretend.call(blobs=[f"gs://{RESULT_BUCKET}/{blob_stub.name}"])] * 2
215216
)

0 commit comments

Comments
 (0)