Commit bb76058

Split processing out from loading into BQ (#53)
* write results to intermediate bucket rather than loading to bigquery on demand
* make blob uri aware of date in file, not date of process time
* fix tests
* test function completely
* restore gutted config for load function
1 parent 8d68205 commit bb76058

4 files changed: +46 / -126 lines
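
The gist of the change, visible in the main.py diff below: process_fastly_log no longer touches BigQuery at all. It parses the log, derives a date partition from the earliest timestamp found in the file (rather than the processing time), and writes the newline-delimited JSON results to an intermediate bucket for a separate load function to pick up later. A minimal sketch of that flow, assuming a RESULT_BUCKET environment variable and hypothetical names (records, file_name, results_file) standing in for what process_fastly_log builds while streaming the downloaded log:

# Sketch only; "records", "file_name" and "results_file" are hypothetical stand-ins
# for what process_fastly_log builds while streaming the downloaded log.
import os

import arrow
from google.cloud import storage

RESULT_BUCKET = os.environ.get("RESULT_BUCKET")


def upload_results(records, file_name, results_file):
    # Partition by the earliest event timestamp in the file, not by wall-clock
    # processing time, so a replayed log lands in the same prefix as the original run.
    min_timestamp = arrow.utcnow()
    for record in records:
        min_timestamp = min(min_timestamp, record.timestamp)
    partition = min_timestamp.strftime("%Y%m%d")

    # Write the newline-delimited JSON into the intermediate bucket; loading it
    # into BigQuery is now a separate step.
    bucket = storage.Client().bucket(RESULT_BUCKET)
    blob = bucket.blob(f"processed/{partition}/downloads-{file_name}.json")
    blob.upload_from_file(results_file, rewind=True)

The real function keeps separate temporary files for simple requests and file downloads and uploads each under its own prefix, as the diff shows.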

main.py

Lines changed: 15 additions & 43 deletions
@@ -20,6 +20,8 @@
 )

 DEFAULT_PROJECT = os.environ.get("GCP_PROJECT", "the-psf")
+RESULT_BUCKET = os.environ.get("RESULT_BUCKET")
+
 # Multiple datasets can be specified by separating them with whitespace
 # Datasets in other projects can be referenced by using the full dataset id:
 # <project_id>.<dataset_name>
@@ -28,16 +30,13 @@
 DATASETS = os.environ.get("BIGQUERY_DATASET", "").strip().split()
 SIMPLE_TABLE = os.environ.get("BIGQUERY_SIMPLE_TABLE")
 DOWNLOAD_TABLE = os.environ.get("BIGQUERY_DOWNLOAD_TABLE")
-RESULT_BUCKET = os.environ.get("RESULT_BUCKET")

 prefix = {Simple.__name__: "simple_requests", Download.__name__: "file_downloads"}


 def process_fastly_log(data, context):
     storage_client = storage.Client()
-    bigquery_client = bigquery.Client()
-    identifier = os.path.basename(data["name"]).split("-", 3)[-1].rstrip(".log.gz")
-    default_partition = datetime.datetime.utcnow().strftime("%Y%m%d")
+    file_name = os.path.basename(data["name"]).rstrip(".log.gz")

     print(f"Beginning processing for gs://{data['bucket']}/{data['name']}")

@@ -59,9 +58,11 @@ def process_fastly_log(data, context):
         simple_results_file = stack.enter_context(NamedTemporaryFile())
         download_results_file = stack.enter_context(NamedTemporaryFile())

+        min_timestamp = arrow.utcnow()
         for line in input_file:
             try:
                 res = parse(line.decode())
+                min_timestamp = min(min_timestamp, res.timestamp)
                 if res is not None:
                     if res.__class__.__name__ == Simple.__name__:
                         simple_results_file.write(
@@ -88,47 +89,18 @@ def process_fastly_log(data, context):
             f"Processed gs://{data['bucket']}/{data['name']}: {total} lines, {simple_lines} simple_requests, {download_lines} file_downloads, {unprocessed_lines} unprocessed"
         )

-        # Load the data into the dataset(s)
-        job_config = bigquery.LoadJobConfig()
-        job_config.source_format = bigquery.SourceFormat.NEWLINE_DELIMITED_JSON
-        job_config.ignore_unknown_values = True
-
-        for DATASET in DATASETS:
-            dataset_ref = bigquery.dataset.DatasetReference.from_string(
-                DATASET, default_project=DEFAULT_PROJECT
-            )
-            if download_lines > 0:
-                load_job = bigquery_client.load_table_from_file(
-                    download_results_file,
-                    dataset_ref.table(DOWNLOAD_TABLE),
-                    job_id_prefix="linehaul_file_downloads",
-                    location="US",
-                    job_config=job_config,
-                    rewind=True,
-                )
-                load_job.result()
-                print(
-                    f"Loaded {load_job.output_rows} rows into {DATASET}:{DOWNLOAD_TABLE}"
-                )
-
-            if simple_lines > 0:
-                load_job = bigquery_client.load_table_from_file(
-                    simple_results_file,
-                    dataset_ref.table(SIMPLE_TABLE),
-                    job_id_prefix="linehaul_file_downloads",
-                    location="US",
-                    job_config=job_config,
-                    rewind=True,
-                )
-                load_job.result()
-                print(
-                    f"Loaded {load_job.output_rows} rows into {DATASET}:{SIMPLE_TABLE}"
-                )
-
-        bucket = storage_client.bucket(RESULT_BUCKET)
+        bucket = storage_client.bucket(RESULT_BUCKET)
+        partition = min_timestamp.strftime("%Y%m%d")
+
+        if simple_lines > 0:
+            blob = bucket.blob(f"processed/{partition}/simple-{file_name}.json")
+            blob.upload_from_file(simple_results_file, rewind=True)
+        if download_lines > 0:
+            blob = bucket.blob(f"processed/{partition}/downloads-{file_name}.json")
+            blob.upload_from_file(download_results_file, rewind=True)

         if unprocessed_lines > 0:
-            blob = bucket.blob(f"unprocessed/{default_partition}/{identifier}.txt")
+            blob = bucket.blob(f"unprocessed/{partition}/{file_name}.txt")
             try:
                 blob.upload_from_file(unprocessed_file, rewind=True)
             except Exception:
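
For concreteness, a small worked example of where a processed object now lands (the values mirror the test fixtures below). Note that file_name keeps the log's own "downloads-"/"simple-" prefix, so the resulting object names carry it twice:

import os

# Name of the incoming log object, as delivered by the storage trigger.
data = {"name": "downloads-2021-01-07-20-55-2021-01-07T20-55-00.000-B8Hs_G6d6xN61En2ypwk.log.gz"}

file_name = os.path.basename(data["name"]).rstrip(".log.gz")
partition = "20210107"  # earliest timestamp observed inside the file

print(f"processed/{partition}/downloads-{file_name}.json")
# processed/20210107/downloads-downloads-2021-01-07-20-55-2021-01-07T20-55-00.000-B8Hs_G6d6xN61En2ypwk.json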

test_function.py

Lines changed: 31 additions & 83 deletions
@@ -7,57 +7,38 @@
 import main

 GCP_PROJECT = "my-gcp-project"
-BIGQUERY_DATASET = "my-bigquery-dataset"
-BIGQUERY_SIMPLE_TABLE = "my-simple-table"
-BIGQUERY_DOWNLOAD_TABLE = "my-download-table"
 RESULT_BUCKET = "my-result-bucket"

-
-@pytest.mark.parametrize(
-    "bigquery_dataset, expected_from_string_calls",
-    [
-        (
-            "my-bigquery-dataset",
-            [pretend.call("my-bigquery-dataset", default_project=GCP_PROJECT)],
-        ),
-        (
-            "my-bigquery-dataset some-other-dataset",
-            [
-                pretend.call("my-bigquery-dataset", default_project=GCP_PROJECT),
-                pretend.call("some-other-dataset", default_project=GCP_PROJECT),
-            ],
-        ),
-    ],
-)
 @pytest.mark.parametrize(
-    "log_filename, table_name, expected",
+    "log_filename, expected_data, expected_unprocessed, expected_unprocessed_filename, expected_data_filename",
     [
         (
             "downloads-2021-01-07-20-55-2021-01-07T20-55-00.000-B8Hs_G6d6xN61En2ypwk.log.gz",
-            BIGQUERY_DOWNLOAD_TABLE,
             b'{"timestamp": "2021-01-07 20:54:54 +00:00", "url": "/packages/f7/12/ec3f2e203afa394a149911729357aa48affc59c20e2c1c8297a60f33f133/threadpoolctl-2.1.0-py3-none-any.whl", "project": "threadpoolctl", "file": {"filename": "threadpoolctl-2.1.0-py3-none-any.whl", "project": "threadpoolctl", "version": "2.1.0", "type": "bdist_wheel"}, "tls_protocol": "TLSv1.2", "tls_cipher": "ECDHE-RSA-AES128-GCM-SHA256", "country_code": "US", "details": {"installer": {"name": "pip", "version": "20.1.1"}, "python": "3.7.9", "implementation": {"name": "CPython", "version": "3.7.9"}, "distro": {"name": "Debian GNU/Linux", "version": "9", "id": "stretch", "libc": {"lib": "glibc", "version": "2.24"}}, "system": {"name": "Linux", "release": "4.15.0-112-generic"}, "cpu": "x86_64", "openssl_version": "OpenSSL 1.1.0l 10 Sep 2019", "setuptools_version": "47.1.0", "ci": null}}\n'
             b'{"timestamp": "2021-01-07 20:54:54 +00:00", "url": "/packages/cd/f9/8fad70a3bd011a6be7c5c6067278f006a25341eb39d901fbda307e26804c/django_crum-0.7.9-py2.py3-none-any.whl", "project": "django-crum", "file": {"filename": "django_crum-0.7.9-py2.py3-none-any.whl", "project": "django-crum", "version": "0.7.9", "type": "bdist_wheel"}, "tls_protocol": "TLSv1.2", "tls_cipher": "ECDHE-RSA-AES128-GCM-SHA256", "country_code": "US", "details": {"installer": {"name": "pip", "version": "20.0.2"}, "python": "3.8.5", "implementation": {"name": "CPython", "version": "3.8.5"}, "distro": {"name": "Ubuntu", "version": "16.04", "id": "xenial", "libc": {"lib": "glibc", "version": "2.23"}}, "system": {"name": "Linux", "release": "4.4.0-1113-aws"}, "cpu": "x86_64", "openssl_version": "OpenSSL 1.0.2g 1 Mar 2016", "setuptools_version": "44.1.0", "ci": null}}\n',
+            b'download|Thu, 07 Jan 2021 20:54:56 GMT|US|/packages/c5/db/e56e6b4bbac7c4a06de1c50de6fe1ef3810018ae11732a50f15f62c7d050/enum34-1.1.6-py2-none-any.whl|TLSv1.2|ECDHE-RSA-AES128-GCM-SHA256|enum34|1.1.6|bdist_wheel|(null)\n',
+            "unprocessed/20210107/downloads-2021-01-07-20-55-2021-01-07T20-55-00.000-B8Hs_G6d6xN61En2ypwk.txt",
+            "processed/20210107/downloads-downloads-2021-01-07-20-55-2021-01-07T20-55-00.000-B8Hs_G6d6xN61En2ypwk.json",
         ),
         (
             "simple-2021-01-07-20-55-2021-01-07T20-55-00.000-3wuB00t9tqgbGLFI2fSI.log.gz",
-            BIGQUERY_SIMPLE_TABLE,
             b'{"timestamp": "2021-01-07 20:54:52 +00:00", "url": "/simple/azureml-model-management-sdk/", "project": "azureml-model-management-sdk", "tls_protocol": "TLSv1.3", "tls_cipher": "AES256-GCM", "country_code": "US", "details": {"installer": {"name": "pip", "version": "20.0.2"}, "python": "3.7.5", "implementation": {"name": "CPython", "version": "3.7.5"}, "distro": {"name": "Ubuntu", "version": "18.04", "id": "bionic", "libc": {"lib": "glibc", "version": "2.27"}}, "system": {"name": "Linux", "release": "4.15.0-1092-azure"}, "cpu": "x86_64", "openssl_version": "OpenSSL 1.1.1 11 Sep 2018", "setuptools_version": "45.2.0", "ci": null}}\n'
             b'{"timestamp": "2021-01-07 20:54:52 +00:00", "url": "/simple/pyrsistent/", "project": "pyrsistent", "tls_protocol": "TLSv1.3", "tls_cipher": "AES256-GCM", "country_code": "US", "details": {"installer": {"name": "pip", "version": "20.0.2"}, "python": "3.8.5", "implementation": {"name": "CPython", "version": "3.8.5"}, "distro": {"name": "Ubuntu", "version": "20.04", "id": "focal", "libc": {"lib": "glibc", "version": "2.31"}}, "system": {"name": "Linux", "release": "5.4.72-flatcar"}, "cpu": "x86_64", "openssl_version": "OpenSSL 1.1.1f 31 Mar 2020", "setuptools_version": "45.2.0", "ci": true}}\n',
+            b'simple|Thu, 07 Jan 2021 20:54:52 GMT|US|/simple/numpy/|TLSv1.2|ECDHE-RSA-AES128-GCM-SHA256||||(null)\n',
+            "unprocessed/20210107/simple-2021-01-07-20-55-2021-01-07T20-55-00.000-3wuB00t9tqgbGLFI2fSI.txt",
+            "processed/20210107/simple-simple-2021-01-07-20-55-2021-01-07T20-55-00.000-3wuB00t9tqgbGLFI2fSI.json",
         ),
     ],
 )
 def test_function(
     monkeypatch,
     log_filename,
-    table_name,
-    expected,
-    bigquery_dataset,
-    expected_from_string_calls,
+    expected_data,
+    expected_unprocessed,
+    expected_data_filename,
+    expected_unprocessed_filename,
 ):
     monkeypatch.setenv("GCP_PROJECT", GCP_PROJECT)
-    monkeypatch.setenv("BIGQUERY_DATASET", bigquery_dataset)
-    monkeypatch.setenv("BIGQUERY_SIMPLE_TABLE", BIGQUERY_SIMPLE_TABLE)
-    monkeypatch.setenv("BIGQUERY_DOWNLOAD_TABLE", BIGQUERY_DOWNLOAD_TABLE)
     monkeypatch.setenv("RESULT_BUCKET", RESULT_BUCKET)

     reload(main)
@@ -66,46 +47,28 @@ def _download_to_file(file_handler):
         with open(Path(".") / "fixtures" / log_filename, "rb") as f:
             file_handler.write(f.read())

-    blob_stub = pretend.stub(
+    get_blob_stub = pretend.stub(
         download_to_file=_download_to_file, delete=pretend.call_recorder(lambda: None),
     )
-    bucket_stub = pretend.stub(get_blob=pretend.call_recorder(lambda a: blob_stub),)
-    storage_client_stub = pretend.stub(
-        bucket=pretend.call_recorder(lambda a: bucket_stub),
-    )
-    monkeypatch.setattr(
-        main, "storage", pretend.stub(Client=lambda: storage_client_stub)
-    )

-    table_stub = pretend.stub()
-    dataset_stub = pretend.stub(table=pretend.call_recorder(lambda a: table_stub))
-    load_job_stub = pretend.stub(
-        result=pretend.call_recorder(lambda: None), output_rows=pretend.stub(),
-    )
+    blobs = {}
+    class Blob(object):
+        def __init__(self, blob_uri):
+            self.uri = blob_uri
+            self.data = None
+            blobs[blob_uri] = self

-    def _load_table_from_file(fh, *a, **kw):
-        fh.flush()
-        with open(fh.name, "rb") as f:
-            load_job_stub._result = f.read()
-        return load_job_stub
+        def upload_from_file(self, file_handler, rewind=False):
+            if rewind:
+                file_handler.seek(0)
+            self.data = file_handler.read()

-    bigquery_client_stub = pretend.stub(
-        load_table_from_file=pretend.call_recorder(_load_table_from_file),
-    )
-    job_config_stub = pretend.stub()
-    dataset_reference_stub = pretend.stub(
-        from_string=pretend.call_recorder(lambda *a, **kw: dataset_stub)
+    bucket_stub = pretend.stub(get_blob=pretend.call_recorder(lambda a: get_blob_stub), blob=pretend.call_recorder(lambda a: Blob(a)),)
+    storage_client_stub = pretend.stub(
+        bucket=pretend.call_recorder(lambda a: bucket_stub),
     )
-
     monkeypatch.setattr(
-        main,
-        "bigquery",
-        pretend.stub(
-            Client=lambda: bigquery_client_stub,
-            LoadJobConfig=lambda: job_config_stub,
-            SourceFormat=pretend.stub(NEWLINE_DELIMITED_JSON=pretend.stub()),
-            dataset=pretend.stub(DatasetReference=dataset_reference_stub),
-        ),
+        main, "storage", pretend.stub(Client=lambda: storage_client_stub)
     )

     data = {
@@ -118,24 +81,9 @@ def _load_table_from_file(fh, *a, **kw):

     assert storage_client_stub.bucket.calls == [pretend.call("my-bucket")] + [
         pretend.call(RESULT_BUCKET),
-    ] * len(expected_from_string_calls)
+    ]
     assert bucket_stub.get_blob.calls == [pretend.call(log_filename)]
-    assert dataset_reference_stub.from_string.calls == expected_from_string_calls
-    assert bigquery_client_stub.load_table_from_file.calls == [
-        pretend.call(
-            bigquery_client_stub.load_table_from_file.calls[0].args[0], # shh
-            table_stub,
-            job_id_prefix="linehaul_file_downloads",
-            location="US",
-            job_config=job_config_stub,
-            rewind=True,
-        )
-    ] * len(expected_from_string_calls)
-    assert dataset_stub.table.calls == [pretend.call(table_name)] * len(
-        expected_from_string_calls
-    )
-    assert blob_stub.delete.calls == [pretend.call()]
-    assert load_job_stub.result.calls == [pretend.call()] * len(
-        expected_from_string_calls
-    )
-    assert load_job_stub._result == expected
+    assert bucket_stub.blob.calls == [pretend.call(expected_data_filename), pretend.call(expected_unprocessed_filename)]
+    assert get_blob_stub.delete.calls == [pretend.call()]
+    assert blobs[expected_data_filename].data == expected_data
+    assert blobs[expected_unprocessed_filename].data == expected_unprocessed
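
The test leans on two pretend idioms: pretend.stub builds an object with exactly the attributes you pass in, and pretend.call_recorder wraps a callable and records every invocation in its .calls list as pretend.call objects, which is what the assertions above compare against. A self-contained illustration (not part of the commit):

import pretend

# A fake bucket whose blob() both returns a value and records how it was called.
recorder = pretend.call_recorder(lambda name: f"blob:{name}")
bucket = pretend.stub(blob=recorder)

bucket.blob("processed/20210107/example.json")
assert bucket.blob.calls == [pretend.call("processed/20210107/example.json")]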
