|
16 | 16 |
|
17 | 17 | from google.cloud import datastore
|
18 | 18 | from google.cloud import storage
|
| 19 | +from google.cloud.exceptions import NotFound |
19 | 20 | from google.cloud.storage import retry
|
20 | 21 | from google.cloud.datastore.query import PropertyFilter
|
21 | 22 |
|
22 | 23 | import argparse
|
23 | 24 | import os
|
24 | 25 | import functools
|
25 | 26 |
|
26 |
| -MAX_BATCH_SIZE = 500 |
27 | 27 | MAX_QUERY_SIZE = 30
|
28 | 28 |
|
29 | 29 |
|
@@ -168,29 +168,34 @@ def main() -> None:
|
168 | 168 | result_to_fix = [r for r in result if r['source_of_truth'] == 2]
|
169 | 169 | print(f"There are {len(result_to_fix)} bugs to operate on...")
|
170 | 170 |
|
171 |
| - # Chunk the results to modify in acceptably sized batches for the API. |
172 |
| - for batch in range(0, len(result_to_fix), MAX_BATCH_SIZE): |
173 |
| - try: |
174 |
| - with ds_client.transaction() as xact: |
175 |
| - for bug in result_to_fix[batch:batch + MAX_BATCH_SIZE]: |
176 |
| - bug_in_gcs = objname_for_bug(ds_client, bug) |
177 |
| - if args.verbose: |
178 |
| - print(f"Resetting creation time for {bug_in_gcs['uri']}") |
179 |
| - if not args.dryrun: |
| 171 | + try: |
| 172 | + with ds_client.transaction() as xact: |
| 173 | + for bug in result_to_fix: |
| 174 | + bug_in_gcs = objname_for_bug(ds_client, bug) |
| 175 | + if args.verbose: |
| 176 | + print(f"Resetting creation time for {bug_in_gcs['uri']}") |
| 177 | + if not args.dryrun: |
| 178 | + try: |
180 | 179 | reset_object_creation(bug_in_gcs["bucket"], bug_in_gcs["path"],
|
181 | 180 | args.tmpdir)
|
182 |
| - bug["import_last_modified"] = None |
183 |
| - if args.verbose: |
184 |
| - print(f"Resetting import_last_modified for {bug['db_id']}") |
185 |
| - print(f"Review at {url_base}{bug['db_id']} when reimport completes.") |
186 |
| - xact.put(bug) |
187 |
| - if args.dryrun: |
188 |
| - raise Exception("Dry run mode. Preventing transaction from commiting") # pylint: disable=broad-exception-raised |
189 |
| - except Exception as e: |
190 |
| - # Don't have the first batch's transaction-aborting exception stop |
191 |
| - # subsequent batches from being attempted. |
192 |
| - if args.dryrun and e.args[0].startswith("Dry run mode"): |
193 |
| - pass |
| 181 | + except NotFound as e: |
| 182 | + if args.verbose: |
| 183 | + print(f"Skipping, got {e}\n") |
| 184 | + continue |
| 185 | + bug["import_last_modified"] = None |
| 186 | + if args.verbose: |
| 187 | + print(f"Resetting import_last_modified for {bug['db_id']}") |
| 188 | + print(f"Review at {url_base}{bug['db_id']} when reimport completes.") |
| 189 | + xact.put(bug) |
| 190 | + if args.dryrun: |
| 191 | + raise Exception("Dry run mode. Preventing transaction from commiting") # pylint: disable=broad-exception-raised |
| 192 | + except Exception as e: |
| 193 | + # Swallow only the deliberate transaction-aborting exception raised |
| 194 | + # in dry run mode; any other exception is re-raised. |
| 195 | + if args.dryrun and e.args[0].startswith("Dry run mode"): |
| 196 | + pass |
| 197 | + else: |
| 198 | + raise |
194 | 199 |
|
195 | 200 |
|
196 | 201 | if __name__ == "__main__":
|
|
0 commit comments