Skip to content

Commit 6f8cd1d

Browse files
authored
New function to load processed files (#55)
* New function to load processed files
* Rename test file
* Add test
* Update deployments
1 parent bb76058 commit 6f8cd1d

File tree

4 files changed

+293
-89
lines changed

4 files changed

+293
-89
lines changed

cloudbuild.yaml

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,3 +19,21 @@ steps:
1919
'--entry-point', 'process_fastly_log',
2020
'--retry'
2121
]
22+
# Deploy the staging loader: consumes the staging publisher topic and loads
# processed result files into BigQuery.
- name: 'gcr.io/cloud-builders/gcloud'
  args: [
    'functions', 'deploy', 'linehaul-staging-publisher',
    '--trigger-topic', 'linehaul-staging-publisher-topic',
    '--runtime', 'python37',
    '--source', '.',
    '--entry-point', 'load_processed_files_into_bigquery',
    '--retry'
  ]
# Deploy the production loader: same entry point, production topic.
- name: 'gcr.io/cloud-builders/gcloud'
  args: [
    'functions', 'deploy', 'linehaul-publisher',
    '--trigger-topic', 'linehaul-publisher-topic',
    '--runtime', 'python37',
    '--source', '.',
    '--entry-point', 'load_processed_files_into_bigquery',
    '--retry'
  ]

main.py

Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
import arrow
22
import cattr
33

4+
import base64
45
import datetime
56
import os
67
import json
@@ -113,3 +114,62 @@ def process_fastly_log(data, context):
113114
except exceptions.NotFound:
114115
# Sometimes we try to delete twice
115116
pass
117+
118+
119+
def load_processed_files_into_bigquery(event, context):
    """Load one partition's processed result files from GCS into BigQuery.

    Triggered via Pub/Sub, either by cron (no attributes: load the current
    UTC day's partition) or manually with an explicit ``partition``
    attribute in ``YYYYMMDD`` form.

    Args:
        event: Pub/Sub event payload; ``event["attributes"]["partition"]``
            optionally selects the partition to load.
        context: Cloud Functions event metadata (unused).
    """
    if "attributes" in event and "partition" in event["attributes"]:
        # Check to see if we've manually triggered the function and provided a partition
        partition = event["attributes"]["partition"]
    else:
        # Otherwise, this was triggered via cron, use the current time
        partition = datetime.datetime.utcnow().strftime("%Y%m%d")

    # Bucket-relative prefix of this partition's processed files.
    # NOTE: list_blobs() takes a bare object-name prefix -- no "gs://bucket/"
    # scheme and no "*" wildcards (wildcards are a load_table_from_uri
    # feature, not a listing feature).
    prefix = f"processed/{partition}"

    # Configure the load jobs: newline-delimited JSON, tolerant of fields
    # that are not present in the destination schema.
    job_config = bigquery.LoadJobConfig()
    job_config.source_format = bigquery.SourceFormat.NEWLINE_DELIMITED_JSON
    job_config.ignore_unknown_values = True

    storage_client = storage.Client()
    bucket = storage_client.bucket(RESULT_BUCKET)

    bigquery_client = bigquery.Client()

    # Materialize the listings: each is used more than once (one load per
    # dataset, plus the final delete), and list_blobs() returns a
    # single-use iterator.
    download_blobs = list(bucket.list_blobs(prefix=f"{prefix}/downloads-"))
    simple_blobs = list(bucket.list_blobs(prefix=f"{prefix}/simple-"))

    # load_table_from_uri() expects "gs://" URI strings, not Blob objects.
    download_source_uris = [
        f"gs://{RESULT_BUCKET}/{blob.name}" for blob in download_blobs
    ]
    simple_source_uris = [
        f"gs://{RESULT_BUCKET}/{blob.name}" for blob in simple_blobs
    ]

    for dataset in DATASETS:
        dataset_ref = bigquery.dataset.DatasetReference.from_string(
            dataset, default_project=DEFAULT_PROJECT
        )

        # Load the files for the downloads table. Skip when the partition
        # has no matching files: an empty source-URI list is rejected by
        # BigQuery. (rewind= is a load_table_from_file-only parameter and
        # is not passed here.)
        if download_source_uris:
            load_job = bigquery_client.load_table_from_uri(
                download_source_uris,
                dataset_ref.table(DOWNLOAD_TABLE),
                job_id_prefix="linehaul_file_downloads",
                location="US",
                job_config=job_config,
            )
            load_job.result()  # Block until the load completes (or raises).
            print(
                f"Loaded {load_job.output_rows} rows into "
                f"{dataset}:{DOWNLOAD_TABLE}"
            )

        # Load the files for the simple table (distinct job_id_prefix so
        # the two kinds of load job are distinguishable in BigQuery).
        if simple_source_uris:
            load_job = bigquery_client.load_table_from_uri(
                simple_source_uris,
                dataset_ref.table(SIMPLE_TABLE),
                job_id_prefix="linehaul_file_simple",
                location="US",
                job_config=job_config,
            )
            load_job.result()
            print(
                f"Loaded {load_job.output_rows} rows into "
                f"{dataset}:{SIMPLE_TABLE}"
            )

    # Delete the source files only after every dataset has loaded them.
    if download_blobs:
        bucket.delete_blobs(blobs=download_blobs)
    if simple_blobs:
        bucket.delete_blobs(blobs=simple_blobs)

test_function.py

Lines changed: 0 additions & 89 deletions
This file was deleted.

0 commit comments

Comments (0)