|
13 | 13 | from linehaul.events.parser import parse, Download, Simple
|
14 | 14 |
|
15 | 15 | from google.api_core import exceptions
|
| 16 | +from google.api_core.retry import Retry |
16 | 17 | from google.cloud import bigquery, storage, pubsub_v1
|
17 | 18 |
|
18 | 19 | _cattr = cattr.Converter()
|
@@ -120,6 +121,30 @@ def process_fastly_log(data, context):
|
120 | 121 | pass
|
121 | 122 |
|
122 | 123 |
|
| 124 | +@Retry() |
| 125 | +def _delete_blobs( |
| 126 | + storage_client, |
| 127 | + download_source_blobs, |
| 128 | + download_prefix, |
| 129 | + simple_source_blobs, |
| 130 | + simple_prefix, |
| 131 | +): |
| 132 | + if len(download_source_blobs) > 0: |
| 133 | + with storage_client.batch(): |
| 134 | + for blob in download_source_blobs: |
| 135 | + blob.delete() |
| 136 | + print( |
| 137 | + f"Deleted {len(download_source_blobs)} blobs from gs://{RESULT_BUCKET}/{download_prefix}" |
| 138 | + ) |
| 139 | + if len(simple_source_blobs) > 0: |
| 140 | + with storage_client.batch(): |
| 141 | + for blob in simple_source_blobs: |
| 142 | + blob.delete() |
| 143 | + print( |
| 144 | + f"Deleted {len(simple_source_blobs)} blobs from gs://{RESULT_BUCKET}/{simple_prefix}" |
| 145 | + ) |
| 146 | + |
| 147 | + |
123 | 148 | def load_processed_files_into_bigquery(event, context):
|
124 | 149 | continue_publishing = False
|
125 | 150 | if "attributes" in event and "partition" in event["attributes"]:
|
@@ -188,20 +213,13 @@ def load_processed_files_into_bigquery(event, context):
|
188 | 213 | load_job.result()
|
189 | 214 | print(f"Loaded {load_job.output_rows} rows into {DATASET}:{SIMPLE_TABLE}")
|
190 | 215 |
|
191 |
| - if len(download_source_blobs) > 0: |
192 |
| - with storage_client.batch(): |
193 |
| - for blob in download_source_blobs: |
194 |
| - blob.delete() |
195 |
| - print( |
196 |
| - f"Deleted {len(download_source_blobs)} blobs from gs://{RESULT_BUCKET}/{download_prefix}" |
197 |
| - ) |
198 |
| - if len(simple_source_blobs) > 0: |
199 |
| - with storage_client.batch(): |
200 |
| - for blob in simple_source_blobs: |
201 |
| - blob.delete() |
202 |
| - print( |
203 |
| - f"Deleted {len(simple_source_blobs)} blobs from gs://{RESULT_BUCKET}/{simple_prefix}" |
204 |
| - ) |
| 216 | + _delete_blobs( |
| 217 | + storage_client, |
| 218 | + download_source_blobs, |
| 219 | + download_prefix, |
| 220 | + simple_source_blobs, |
| 221 | + simple_prefix, |
| 222 | + ) |
205 | 223 |
|
206 | 224 | if continue_publishing and (
|
207 | 225 | len(download_source_blobs) > 0 or len(simple_source_blobs) > 0
|
|
0 commit comments