
Commit 5bbd867

Handle multiple datasets (#5)
* Update test to monkeypatch env vars
* Handle multiple datasets
* Split on whitespace instead
1 parent f2cb6a9 commit 5bbd867

File tree

2 files changed: 74 additions, 47 deletions


main.py

Lines changed: 36 additions & 29 deletions
@@ -19,8 +19,8 @@
     arrow.Arrow, lambda o: o.format("YYYY-MM-DD HH:mm:ss ZZ")
 )
 
-
-DATASET = os.environ.get("BIGQUERY_DATASET")
+# Multiple datasets can be specified by separating them with whitespace
+DATASETS = os.environ.get("BIGQUERY_DATASET", "").strip().split()
 SIMPLE_TABLE = os.environ.get("BIGQUERY_SIMPLE_TABLE")
 DOWNLOAD_TABLE = os.environ.get("BIGQUERY_DOWNLOAD_TABLE")
 RESULT_BUCKET = os.environ.get("RESULT_BUCKET")
@@ -81,37 +81,43 @@ def process_fastly_log(data, context):
         f"Processed gs://{data['bucket']}/{data['name']}: {total} lines, {simple_lines} simple_requests, {download_lines} file_downloads, {unprocessed_lines} unprocessed"
     )
 
-    dataset_ref = bigquery_client.dataset(DATASET)
-
+    # Load the data into the dataset(s)
     job_config = bigquery.LoadJobConfig()
     job_config.source_format = bigquery.SourceFormat.NEWLINE_DELIMITED_JSON
     job_config.ignore_unknown_values = True
 
-    if download_lines > 0:
-        load_job = bigquery_client.load_table_from_file(
-            download_results_file,
-            dataset_ref.table(DOWNLOAD_TABLE),
-            job_id_prefix="linehaul_file_downloads",
-            location="US",
-            job_config=job_config,
-            rewind=True,
-        )
-        load_job.result()
-        print(f"Loaded {load_job.output_rows} rows into {DATASET}:{DOWNLOAD_TABLE}")
-
-    if simple_lines > 0:
-        load_job = bigquery_client.load_table_from_file(
-            simple_results_file,
-            dataset_ref.table(SIMPLE_TABLE),
-            job_id_prefix="linehaul_file_downloads",
-            location="US",
-            job_config=job_config,
-            rewind=True,
-        )
-        load_job.result()
-        print(f"Loaded {load_job.output_rows} rows into {DATASET}:{SIMPLE_TABLE}")
-
-    bucket = storage_client.bucket(RESULT_BUCKET)
+    for DATASET in DATASETS:
+        dataset_ref = bigquery_client.dataset(DATASET)
+        if download_lines > 0:
+            load_job = bigquery_client.load_table_from_file(
+                download_results_file,
+                dataset_ref.table(DOWNLOAD_TABLE),
+                job_id_prefix="linehaul_file_downloads",
+                location="US",
+                job_config=job_config,
+                rewind=True,
+            )
+            load_job.result()
+            print(
+                f"Loaded {load_job.output_rows} rows into {DATASET}:{DOWNLOAD_TABLE}"
+            )
+
+        if simple_lines > 0:
+            load_job = bigquery_client.load_table_from_file(
+                simple_results_file,
+                dataset_ref.table(SIMPLE_TABLE),
+                job_id_prefix="linehaul_file_downloads",
+                location="US",
+                job_config=job_config,
+                rewind=True,
+            )
+            load_job.result()
+            print(
+                f"Loaded {load_job.output_rows} rows into {DATASET}:{SIMPLE_TABLE}"
+            )
+
+        bucket = storage_client.bucket(RESULT_BUCKET)
 
     if unprocessed_lines > 0:
         blob = bucket.blob(f"unprocessed/{default_partition}/{identifier}.txt")
         try:
@@ -120,6 +126,7 @@ def process_fastly_log(data, context):
             # Be opprotunistic about unprocessed files...
             pass
 
+    # Remove the log file we processed
     try:
         bob_logs_log_blob.delete()
     except exceptions.NotFound:
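
For reference, a minimal sketch of how the new DATASETS parsing behaves (the dataset names below are made up for illustration): a single value keeps working as before, multiple names separated by any whitespace become separate load targets, and an unset or empty variable yields an empty list, so the load loop simply does nothing.

import os

# Hypothetical values for illustration only.
os.environ["BIGQUERY_DATASET"] = "linehaul_prod"
assert os.environ.get("BIGQUERY_DATASET", "").strip().split() == ["linehaul_prod"]

os.environ["BIGQUERY_DATASET"] = "linehaul_prod  linehaul_backup"
assert os.environ.get("BIGQUERY_DATASET", "").strip().split() == [
    "linehaul_prod",
    "linehaul_backup",
]

del os.environ["BIGQUERY_DATASET"]
assert os.environ.get("BIGQUERY_DATASET", "").strip().split() == []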

test_function.py

Lines changed: 38 additions & 18 deletions
@@ -1,39 +1,58 @@
 from pathlib import Path
+from importlib import reload
 
 import pretend
 import pytest
 
 import main
 
-
-DATASET = "my-bigquery-dataset"
-SIMPLE_TABLE = "my-simple-table"
-DOWNLOAD_TABLE = "my-download-table"
+BIGQUERY_DATASET = "my-bigquery-dataset"
+BIGQUERY_SIMPLE_TABLE = "my-simple-table"
+BIGQUERY_DOWNLOAD_TABLE = "my-download-table"
 RESULT_BUCKET = "my-result-bucket"
 
 
+@pytest.mark.parametrize(
+    "bigquery_dataset, expected_dataset_calls",
+    [
+        ("my-bigquery-dataset", [pretend.call("my-bigquery-dataset")]),
+        (
+            "my-bigquery-dataset some-other-dataset",
+            [pretend.call("my-bigquery-dataset"), pretend.call("some-other-dataset")],
+        ),
+    ],
+)
 @pytest.mark.parametrize(
     "log_filename, table_name, expected",
     [
         (
             "downloads-2021-01-07-20-55-2021-01-07T20-55-00.000-B8Hs_G6d6xN61En2ypwk.log.gz",
-            DOWNLOAD_TABLE,
+            BIGQUERY_DOWNLOAD_TABLE,
             b'{"timestamp": "2021-01-07 20:54:54 +00:00", "url": "/packages/f7/12/ec3f2e203afa394a149911729357aa48affc59c20e2c1c8297a60f33f133/threadpoolctl-2.1.0-py3-none-any.whl", "project": "threadpoolctl", "file": {"filename": "threadpoolctl-2.1.0-py3-none-any.whl", "project": "threadpoolctl", "version": "2.1.0", "type": "bdist_wheel"}, "tls_protocol": "TLSv1.2", "tls_cipher": "ECDHE-RSA-AES128-GCM-SHA256", "country_code": "US", "details": {"installer": {"name": "pip", "version": "20.1.1"}, "python": "3.7.9", "implementation": {"name": "CPython", "version": "3.7.9"}, "distro": {"name": "Debian GNU/Linux", "version": "9", "id": "stretch", "libc": {"lib": "glibc", "version": "2.24"}}, "system": {"name": "Linux", "release": "4.15.0-112-generic"}, "cpu": "x86_64", "openssl_version": "OpenSSL 1.1.0l 10 Sep 2019", "setuptools_version": "47.1.0", "ci": null}}\n'
             b'{"timestamp": "2021-01-07 20:54:54 +00:00", "url": "/packages/cd/f9/8fad70a3bd011a6be7c5c6067278f006a25341eb39d901fbda307e26804c/django_crum-0.7.9-py2.py3-none-any.whl", "project": "django-crum", "file": {"filename": "django_crum-0.7.9-py2.py3-none-any.whl", "project": "django-crum", "version": "0.7.9", "type": "bdist_wheel"}, "tls_protocol": "TLSv1.2", "tls_cipher": "ECDHE-RSA-AES128-GCM-SHA256", "country_code": "US", "details": {"installer": {"name": "pip", "version": "20.0.2"}, "python": "3.8.5", "implementation": {"name": "CPython", "version": "3.8.5"}, "distro": {"name": "Ubuntu", "version": "16.04", "id": "xenial", "libc": {"lib": "glibc", "version": "2.23"}}, "system": {"name": "Linux", "release": "4.4.0-1113-aws"}, "cpu": "x86_64", "openssl_version": "OpenSSL 1.0.2g 1 Mar 2016", "setuptools_version": "44.1.0", "ci": null}}\n',
         ),
         (
             "simple-2021-01-07-20-55-2021-01-07T20-55-00.000-3wuB00t9tqgbGLFI2fSI.log.gz",
-            SIMPLE_TABLE,
+            BIGQUERY_SIMPLE_TABLE,
             b'{"timestamp": "2021-01-07 20:54:52 +00:00", "url": "/simple/azureml-model-management-sdk/", "project": "azureml-model-management-sdk", "tls_protocol": "TLSv1.3", "tls_cipher": "AES256-GCM", "country_code": "US", "details": {"installer": {"name": "pip", "version": "20.0.2"}, "python": "3.7.5", "implementation": {"name": "CPython", "version": "3.7.5"}, "distro": {"name": "Ubuntu", "version": "18.04", "id": "bionic", "libc": {"lib": "glibc", "version": "2.27"}}, "system": {"name": "Linux", "release": "4.15.0-1092-azure"}, "cpu": "x86_64", "openssl_version": "OpenSSL 1.1.1 11 Sep 2018", "setuptools_version": "45.2.0", "ci": null}}\n'
             b'{"timestamp": "2021-01-07 20:54:52 +00:00", "url": "/simple/pyrsistent/", "project": "pyrsistent", "tls_protocol": "TLSv1.3", "tls_cipher": "AES256-GCM", "country_code": "US", "details": {"installer": {"name": "pip", "version": "20.0.2"}, "python": "3.8.5", "implementation": {"name": "CPython", "version": "3.8.5"}, "distro": {"name": "Ubuntu", "version": "20.04", "id": "focal", "libc": {"lib": "glibc", "version": "2.31"}}, "system": {"name": "Linux", "release": "5.4.72-flatcar"}, "cpu": "x86_64", "openssl_version": "OpenSSL 1.1.1f 31 Mar 2020", "setuptools_version": "45.2.0", "ci": true}}\n',
         ),
     ],
 )
-def test_function(monkeypatch, log_filename, table_name, expected):
-    monkeypatch.setattr(main, "DATASET", DATASET)
-    monkeypatch.setattr(main, "SIMPLE_TABLE", SIMPLE_TABLE)
-    monkeypatch.setattr(main, "DOWNLOAD_TABLE", DOWNLOAD_TABLE)
-    monkeypatch.setattr(main, "RESULT_BUCKET", RESULT_BUCKET)
+def test_function(
+    monkeypatch,
+    log_filename,
+    table_name,
+    expected,
+    bigquery_dataset,
+    expected_dataset_calls,
+):
+    monkeypatch.setenv("BIGQUERY_DATASET", bigquery_dataset)
+    monkeypatch.setenv("BIGQUERY_SIMPLE_TABLE", BIGQUERY_SIMPLE_TABLE)
+    monkeypatch.setenv("BIGQUERY_DOWNLOAD_TABLE", BIGQUERY_DOWNLOAD_TABLE)
+    monkeypatch.setenv("RESULT_BUCKET", RESULT_BUCKET)
+
+    reload(main)
 
     def _download_to_file(file_handler):
         with open(Path(".") / "fixtures" / log_filename, "rb") as f:
@@ -85,12 +104,11 @@ def _load_table_from_file(fh, *a, **kw):
 
     main.process_fastly_log(data, context)
 
-    assert storage_client_stub.bucket.calls == [
-        pretend.call("my-bucket"),
+    assert storage_client_stub.bucket.calls == [pretend.call("my-bucket")] + [
         pretend.call(RESULT_BUCKET),
-    ]
+    ] * len(expected_dataset_calls)
     assert bucket_stub.get_blob.calls == [pretend.call(log_filename)]
-    assert bigquery_client_stub.dataset.calls == [pretend.call(DATASET)]
+    assert bigquery_client_stub.dataset.calls == expected_dataset_calls
     assert bigquery_client_stub.load_table_from_file.calls == [
         pretend.call(
             bigquery_client_stub.load_table_from_file.calls[0].args[0],  # shh
@@ -100,8 +118,10 @@ def _load_table_from_file(fh, *a, **kw):
             job_config=job_config_stub,
             rewind=True,
         )
-    ]
-    assert dataset_stub.table.calls == [pretend.call(table_name)]
+    ] * len(expected_dataset_calls)
+    assert dataset_stub.table.calls == [pretend.call(table_name)] * len(
+        expected_dataset_calls
+    )
     assert blob_stub.delete.calls == [pretend.call()]
-    assert load_job_stub.result.calls == [pretend.call()]
+    assert load_job_stub.result.calls == [pretend.call()] * len(expected_dataset_calls)
     assert load_job_stub._result == expected
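
The test moves from monkeypatch.setattr to monkeypatch.setenv plus reload(main) because DATASETS is now computed from the environment at import time, so patching attributes on an already-imported module would not exercise the new parsing. A rough sketch of that pattern, assuming the main module from this commit (the test name and dataset string below are arbitrary examples):

from importlib import reload

import main


def test_datasets_parsing(monkeypatch):
    # Patch the environment, then re-execute main's module-level code so
    # main.DATASETS is rebuilt from the patched value.
    monkeypatch.setenv("BIGQUERY_DATASET", "my-bigquery-dataset some-other-dataset")
    reload(main)
    assert main.DATASETS == ["my-bigquery-dataset", "some-other-dataset"]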
