
Commit fb416cf

allow for continuance of processing a given partition (#64)
* allow for continuance of processing a given partition
* upgrade
* upgrade google deps
1 parent: 1d21e23 · commit: fb416cf

File tree

4 files changed: +164 additions, −80 deletions


.github/workflows/test.yml

Lines changed: 2 additions & 0 deletions
@@ -11,6 +11,8 @@ jobs:
       uses: actions/setup-python@v2
       with:
         python-version: '3.7'
+    - name: Upgrade python tooling
+      run: python -m pip install --upgrade pip setuptools wheel
     - name: Install dependencies
       run: python -m pip install -r requirements.txt
     - name: Install test dependencies

main.py

Lines changed: 16 additions & 1 deletion
@@ -13,7 +13,7 @@
 from linehaul.events.parser import parse, Download, Simple

 from google.api_core import exceptions
-from google.cloud import bigquery, storage
+from google.cloud import bigquery, storage, pubsub_v1

 _cattr = cattr.Converter()
 _cattr.register_unstructure_hook(
@@ -22,6 +22,7 @@

 DEFAULT_PROJECT = os.environ.get("GCP_PROJECT", "the-psf")
 RESULT_BUCKET = os.environ.get("RESULT_BUCKET")
+PUBSUB_TOPIC = os.environ.get("PUBSUB_TOPIC")

 # Multiple datasets can be specified by separating them with whitespace
 # Datasets in other projects can be referenced by using the full dataset id:
@@ -120,9 +121,12 @@ def process_fastly_log(data, context):


 def load_processed_files_into_bigquery(event, context):
+    continue_publishing = False
     if "attributes" in event and "partition" in event["attributes"]:
         # Check to see if we've manually triggered the function and provided a partition
         partition = event["attributes"]["partition"]
+        if "continue_publishing" in event["attributes"]:
+            continue_publishing = bool(event["attributes"]["continue_publishing"])
     else:
         # Otherwise, this was triggered via cron, use the current time
         partition = datetime.datetime.utcnow().strftime("%Y%m%d")
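Worth noting when triggering this path: Pub/Sub message attributes arrive as strings, and bool() of any non-empty string is True, so even continue_publishing="False" would enable continuance. A quick illustration:

# bool() only checks whether the string is empty, not the word it contains.
bool("true")    # True
bool("False")   # True  (non-empty string)
bool("")        # False (the empty string is the only falsy string)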
@@ -192,3 +196,14 @@ def load_processed_files_into_bigquery(event, context):
         with storage_client.batch():
             for blob in simple_source_blobs:
                 blob.delete()
+
+    if continue_publishing and (
+        len(download_source_blobs) > 0 or len(simple_source_blobs) > 0
+    ):
+        publisher = pubsub_v1.PublisherClient()
+        topic_path = publisher.topic_path(DEFAULT_PROJECT, PUBSUB_TOPIC)
+        publisher.publish(
+            topic_path,
+            partition=partition,
+            continue_publishing=str(continue_publishing),
+        )
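
For context, a minimal sketch (not part of this commit) of how a re-run could be kicked off manually by publishing a message with these attributes to the function's trigger topic; the project id comes from the diff's DEFAULT_PROJECT default, while the topic name, partition value, and empty data payload are placeholders/assumptions:

# Hypothetical manual trigger: publish a message whose attributes ask the
# function to process a specific partition and to keep re-publishing until
# the source blobs are drained. Topic name and partition are placeholders.
from google.cloud import pubsub_v1

publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path("the-psf", "linehaul-processing")  # placeholder topic

future = publisher.publish(
    topic_path,
    b"",                          # payload is unused; the attributes carry the signal
    partition="20240101",         # YYYYMMDD partition to (re)process
    continue_publishing="true",   # any non-empty string is read as truthy
)
print(future.result())            # message id, once the publish completes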

requirements.in

Lines changed: 1 addition & 0 deletions
@@ -4,3 +4,4 @@ packaging
 pyparsing
 google-cloud-storage
 google-cloud-bigquery
+google-cloud-pubsub
