Commit 352ae94

fixup publisher (#59)
* fixup publisher
* one batch
1 parent aee40d1 commit 352ae94

File tree

2 files changed: +67 −54 lines changed

main.py

Lines changed: 36 additions & 29 deletions
@@ -31,7 +31,9 @@
 DATASETS = os.environ.get("BIGQUERY_DATASET", "").strip().split()
 SIMPLE_TABLE = os.environ.get("BIGQUERY_SIMPLE_TABLE")
 DOWNLOAD_TABLE = os.environ.get("BIGQUERY_DOWNLOAD_TABLE")
-MAX_BLOBS_PER_RUN = int(os.environ.get("MAX_BLOBS_PER_RUN", '5000')) # Cannot exceed 10,000
+MAX_BLOBS_PER_RUN = int(
+    os.environ.get("MAX_BLOBS_PER_RUN", "5000")
+) # Cannot exceed 10,000
 
 prefix = {Simple.__name__: "simple_requests", Download.__name__: "file_downloads"}
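
The "Cannot exceed 10,000" comment appears to refer to BigQuery's documented limit of 10,000 source URIs per load job. A minimal sketch (not part of this commit) of failing fast when the variable is misconfigured, assuming that is the limit intended:

    import os

    MAX_BLOBS_PER_RUN = int(os.environ.get("MAX_BLOBS_PER_RUN", "5000"))
    # Assumption: the cap maps to BigQuery's 10,000-source-URI-per-load-job limit.
    if MAX_BLOBS_PER_RUN > 10_000:
        raise ValueError("MAX_BLOBS_PER_RUN cannot exceed 10,000")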

@@ -139,15 +141,15 @@ def load_processed_files_into_bigquery(event, context):
 
     # Get the processed files we're loading
     download_prefix = f"{folder}/downloads-"
-    download_source_blobs = bucket.list_blobs(
-        prefix=download_prefix, max_results=MAX_BLOBS_PER_RUN
+    download_source_blobs = list(
+        bucket.list_blobs(prefix=download_prefix, max_results=MAX_BLOBS_PER_RUN)
     )
     download_source_uris = [
         f"gs://{blob.bucket.name}/{blob.name}" for blob in download_source_blobs
     ]
     simple_prefix = f"{folder}/simple-"
-    simple_source_blobs = bucket.list_blobs(
-        prefix=simple_prefix, max_results=MAX_BLOBS_PER_RUN
+    simple_source_blobs = list(
+        bucket.list_blobs(prefix=simple_prefix, max_results=MAX_BLOBS_PER_RUN)
     )
     simple_source_uris = [
         f"gs://{blob.bucket.name}/{blob.name}" for blob in simple_source_blobs
@@ -158,27 +160,32 @@ def load_processed_files_into_bigquery(event, context):
             DATASET, default_project=DEFAULT_PROJECT
         )
 
-        # Load the files for the downloads table
-        load_job = bigquery_client.load_table_from_uri(
-            download_source_uris,
-            dataset_ref.table(DOWNLOAD_TABLE),
-            job_id_prefix="linehaul_file_downloads",
-            location="US",
-            job_config=job_config,
-        )
-        load_job.result()
-        print(f"Loaded {load_job.output_rows} rows into {DATASET}:{DOWNLOAD_TABLE}")
-
-        # Load the files for the simple table
-        load_job = bigquery_client.load_table_from_uri(
-            simple_source_uris,
-            dataset_ref.table(SIMPLE_TABLE),
-            job_id_prefix="linehaul_simple_requests",
-            location="US",
-            job_config=job_config,
-        )
-        load_job.result()
-        print(f"Loaded {load_job.output_rows} rows into {DATASET}:{SIMPLE_TABLE}")
-
-    bucket.delete_blobs(blobs=download_source_uris)
-    bucket.delete_blobs(blobs=simple_source_uris)
+        if len(download_source_uris) > 0:
+            # Load the files for the downloads table
+            load_job = bigquery_client.load_table_from_uri(
+                download_source_uris,
+                dataset_ref.table(DOWNLOAD_TABLE),
+                job_id_prefix="linehaul_file_downloads",
+                location="US",
+                job_config=job_config,
+            )
+            load_job.result()
+            print(f"Loaded {load_job.output_rows} rows into {DATASET}:{DOWNLOAD_TABLE}")
+
+        if len(simple_source_uris) > 0:
+            # Load the files for the simple table
+            load_job = bigquery_client.load_table_from_uri(
+                simple_source_uris,
+                dataset_ref.table(SIMPLE_TABLE),
+                job_id_prefix="linehaul_simple_requests",
+                location="US",
+                job_config=job_config,
+            )
+            load_job.result()
+            print(f"Loaded {load_job.output_rows} rows into {DATASET}:{SIMPLE_TABLE}")
+
+    with storage_client.batch():
+        for blob in download_source_blobs:
+            blob.delete()
+        for blob in simple_source_blobs:
+            blob.delete()
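
Two things change in the cleanup path. The old code handed full `gs://` URIs to `Bucket.delete_blobs`, which expects blob names or `Blob` objects, and it deleted without batching. The new code deletes the materialized `Blob` objects inside `storage_client.batch()`, which queues each `delete()` and sends the queued operations together when the context exits — the "one batch" of the commit message. A minimal sketch of the pattern with a hypothetical bucket name:

    from google.cloud import storage

    client = storage.Client()
    bucket = client.bucket("example-bucket")  # hypothetical bucket
    blobs = list(bucket.list_blobs(prefix="processed/"))

    # Inside the batch context, each delete() is deferred; the deferred
    # operations are sent as batched requests when the block exits.
    with client.batch():
        for blob in blobs:
            blob.delete()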

test_functions.py

Lines changed: 31 additions & 25 deletions
@@ -1,3 +1,4 @@
+import contextlib
 import datetime
 from pathlib import Path
 from importlib import reload
@@ -114,28 +115,26 @@ def upload_from_file(self, file_handler, rewind=False):
     ],
 )
 @pytest.mark.parametrize(
-    "log_filename, table_name, expected",
+    "blobs, expected_load_jobs, expected_delete_calls",
     [
+        ({"simple": [], "downloads": ["blob0", "blob1", "blob2"]}, 1, 3),
+        ({"simple": ["blob0", "blob1", "blob2"], "downloads": []}, 1, 3),
         (
-            "downloads-2021-01-07-20-55-2021-01-07T20-55-00.000-B8Hs_G6d6xN61En2ypwk.log.gz",
-            BIGQUERY_DOWNLOAD_TABLE,
-            b'{"timestamp": "2021-01-07 20:54:54 +00:00", "url": "/packages/f7/12/ec3f2e203afa394a149911729357aa48affc59c20e2c1c8297a60f33f133/threadpoolctl-2.1.0-py3-none-any.whl", "project": "threadpoolctl", "file": {"filename": "threadpoolctl-2.1.0-py3-none-any.whl", "project": "threadpoolctl", "version": "2.1.0", "type": "bdist_wheel"}, "tls_protocol": "TLSv1.2", "tls_cipher": "ECDHE-RSA-AES128-GCM-SHA256", "country_code": "US", "details": {"installer": {"name": "pip", "version": "20.1.1"}, "python": "3.7.9", "implementation": {"name": "CPython", "version": "3.7.9"}, "distro": {"name": "Debian GNU/Linux", "version": "9", "id": "stretch", "libc": {"lib": "glibc", "version": "2.24"}}, "system": {"name": "Linux", "release": "4.15.0-112-generic"}, "cpu": "x86_64", "openssl_version": "OpenSSL 1.1.0l 10 Sep 2019", "setuptools_version": "47.1.0", "ci": null}}\n'
-            b'{"timestamp": "2021-01-07 20:54:54 +00:00", "url": "/packages/cd/f9/8fad70a3bd011a6be7c5c6067278f006a25341eb39d901fbda307e26804c/django_crum-0.7.9-py2.py3-none-any.whl", "project": "django-crum", "file": {"filename": "django_crum-0.7.9-py2.py3-none-any.whl", "project": "django-crum", "version": "0.7.9", "type": "bdist_wheel"}, "tls_protocol": "TLSv1.2", "tls_cipher": "ECDHE-RSA-AES128-GCM-SHA256", "country_code": "US", "details": {"installer": {"name": "pip", "version": "20.0.2"}, "python": "3.8.5", "implementation": {"name": "CPython", "version": "3.8.5"}, "distro": {"name": "Ubuntu", "version": "16.04", "id": "xenial", "libc": {"lib": "glibc", "version": "2.23"}}, "system": {"name": "Linux", "release": "4.4.0-1113-aws"}, "cpu": "x86_64", "openssl_version": "OpenSSL 1.0.2g 1 Mar 2016", "setuptools_version": "44.1.0", "ci": null}}\n',
-        ),
-        (
-            "simple-2021-01-07-20-55-2021-01-07T20-55-00.000-3wuB00t9tqgbGLFI2fSI.log.gz",
-            BIGQUERY_SIMPLE_TABLE,
-            b'{"timestamp": "2021-01-07 20:54:52 +00:00", "url": "/simple/azureml-model-management-sdk/", "project": "azureml-model-management-sdk", "tls_protocol": "TLSv1.3", "tls_cipher": "AES256-GCM", "country_code": "US", "details": {"installer": {"name": "pip", "version": "20.0.2"}, "python": "3.7.5", "implementation": {"name": "CPython", "version": "3.7.5"}, "distro": {"name": "Ubuntu", "version": "18.04", "id": "bionic", "libc": {"lib": "glibc", "version": "2.27"}}, "system": {"name": "Linux", "release": "4.15.0-1092-azure"}, "cpu": "x86_64", "openssl_version": "OpenSSL 1.1.1 11 Sep 2018", "setuptools_version": "45.2.0", "ci": null}}\n'
-            b'{"timestamp": "2021-01-07 20:54:52 +00:00", "url": "/simple/pyrsistent/", "project": "pyrsistent", "tls_protocol": "TLSv1.3", "tls_cipher": "AES256-GCM", "country_code": "US", "details": {"installer": {"name": "pip", "version": "20.0.2"}, "python": "3.8.5", "implementation": {"name": "CPython", "version": "3.8.5"}, "distro": {"name": "Ubuntu", "version": "20.04", "id": "focal", "libc": {"lib": "glibc", "version": "2.31"}}, "system": {"name": "Linux", "release": "5.4.72-flatcar"}, "cpu": "x86_64", "openssl_version": "OpenSSL 1.1.1f 31 Mar 2020", "setuptools_version": "45.2.0", "ci": true}}\n',
+            {
+                "simple": ["blob0", "blob1", "blob2"],
+                "downloads": ["blob0", "blob1", "blob2"],
+            },
+            2,
+            6,
         ),
     ],
 )
 def test_load_processed_files_into_bigquery(
     monkeypatch,
-    log_filename,
-    table_name,
-    expected,
     bigquery_dataset,
+    blobs,
+    expected_load_jobs,
+    expected_delete_calls,
 ):
     monkeypatch.setenv("GCP_PROJECT", GCP_PROJECT)
     monkeypatch.setenv("BIGQUERY_DATASET", bigquery_dataset)
@@ -145,23 +144,33 @@ def test_load_processed_files_into_bigquery(
 
     reload(main)
 
-    blob_lists = {}
-
     bucket = pretend.stub(name=RESULT_BUCKET)
 
-    blob_stub = pretend.stub(name="blobname", bucket=bucket)
+    blob_stub = pretend.stub(
+        name="blobname", bucket=bucket, delete=pretend.call_recorder(lambda: None)
+    )
 
     def _generate_blob_list(prefix, max_results):
-        blob_list = [blob_stub]
-        blob_lists[prefix] = blob_list
+        if "simple" in prefix:
+            _blobs = blobs["simple"]
+        elif "downloads" in prefix:
+            _blobs = blobs["downloads"]
+        else:
+            _blobs = []
+        blob_list = [blob_stub for b in _blobs]
         return blob_list
 
     bucket_stub = pretend.stub(
         list_blobs=pretend.call_recorder(_generate_blob_list),
-        delete_blobs=pretend.call_recorder(lambda *a, **kw: None),
     )
+
+    @contextlib.contextmanager
+    def fake_batch(*a, **kw):
+        yield True
+
     storage_client_stub = pretend.stub(
         bucket=pretend.call_recorder(lambda a: bucket_stub),
+        batch=fake_batch,
     )
     monkeypatch.setattr(
         main, "storage", pretend.stub(Client=lambda: storage_client_stub)
@@ -208,9 +217,6 @@ def _generate_blob_list(prefix, max_results):
     ]
     assert (
         load_job_stub.result.calls
-        == [pretend.call()] * len(bigquery_dataset.split()) * 2
-    )
-    assert (
-        bucket_stub.delete_blobs.calls
-        == [pretend.call(blobs=[f"gs://{RESULT_BUCKET}/{blob_stub.name}"])] * 2
+        == [pretend.call()] * len(bigquery_dataset.split()) * expected_load_jobs
     )
+    assert blob_stub.delete.calls == [pretend.call()] * expected_delete_calls
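
These assertions work because `pretend.call_recorder` wraps a callable so that every invocation is appended to a `.calls` list of `pretend.call` objects, which can then be compared with `==`. A minimal sketch of the pattern:

    import pretend

    delete = pretend.call_recorder(lambda: None)
    delete()
    delete()
    assert delete.calls == [pretend.call()] * 2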
