Commit 42c39d9
fix(importer): batch puts together (#4428)
The sequential puts are holding up the importer. Try batching them to see if it improves things.
1 parent 2518722 commit 42c39d9
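For context on the fix: the importer previously wrote each Bug entity to Datastore one put() at a time; this change collects records and flushes them with chunked ndb.put_multi() calls instead. A minimal before/after sketch of the pattern using google-cloud-ndb (illustrative only; `bugs` stands in for the importer's real entities):

from google.cloud import ndb

_NDB_PUT_BATCH_SIZE = 500  # Datastore allows at most 500 entity writes per commit.

def put_all_sequential(bugs: list[ndb.Model]):
  # Before: one RPC round trip per entity; N records cost N RPCs.
  for bug in bugs:
    bug.put()

def put_all_batched(bugs: list[ndb.Model]):
  # After: one RPC per chunk of up to _NDB_PUT_BATCH_SIZE entities.
  for i in range(0, len(bugs), _NDB_PUT_BATCH_SIZE):
    ndb.put_multi(bugs[i:i + _NDB_PUT_BATCH_SIZE])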

File tree

2 files changed (+77, -33 lines)


deployment/clouddeploy/gke-workers/base/importer.yaml

Lines changed: 2 additions & 2 deletions
@@ -27,10 +27,10 @@ spec:
         resources:
           requests:
             cpu: "1"
-            memory: "1G"
+            memory: "4G"
           limits:
             cpu: "1"
-            memory: "2G"
+            memory: "8G"
       nodeSelector:
         cloud.google.com/gke-nodepool: importer-pool
restartPolicy: Never
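The memory bump presumably accompanies the batching change, since records are now accumulated in memory before being flushed in bulk. For reference, the container's resources block after this commit, reconstructed from the hunk above (surrounding indentation approximated):

resources:
  requests:
    cpu: "1"
    memory: "4G"
  limits:
    cpu: "1"
    memory: "8G"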

gcp/workers/importer/importer.py

Lines changed: 75 additions & 31 deletions
@@ -56,6 +56,7 @@
 _BUCKET_THREAD_COUNT = 20
 _HTTP_LAST_MODIFIED_FORMAT = '%a, %d %b %Y %H:%M:%S %Z'
 _TIMEOUT_SECONDS = 60
+_NDB_PUT_BATCH_SIZE = 500

 _client_store = threading.local()
@@ -610,6 +611,7 @@ def _process_updates_git(self, source_repo: osv.SourceRepository):
        changed_entries[rel_path] = None

     import_failure_logs = []
+    changed_entries_to_process = []
     # Create tasks for changed files.
     for changed_entry, ts in changed_entries.items():
       path = os.path.join(osv.repo_path(repo), changed_entry)
@@ -641,10 +643,17 @@ def _process_updates_git(self, source_repo: osv.SourceRepository):

       logging.info('Re-analysis triggered for %s', changed_entry)
       original_sha256 = osv.sha256(path)
-      put_if_newer(vuln, source_repo.name, path)
-
-      self._request_analysis_external(
-          source_repo, original_sha256, changed_entry, source_timestamp=ts)
+      # Collect for batch processing
+      changed_entries_to_process.append(
+          (vuln, path, ts, original_sha256, changed_entry))
+
+    if changed_entries_to_process:
+      put_if_newer_batch(
+          [(v, p) for v, p, _, _, _ in changed_entries_to_process],
+          source_repo.name)
+      for vuln, path, ts, orig_sha256, entry in changed_entries_to_process:
+        self._request_analysis_external(
+            source_repo, orig_sha256, entry, source_timestamp=ts)

     replace_importer_log(storage.Client(), source_repo.name,
                          self._public_log_bucket, import_failure_logs)
@@ -733,8 +742,7 @@ def _process_updates_bucket(self, source_repo: osv.SourceRepository):
       logging.info('Requesting analysis of bucket entry: %s/%s',
                    source_repo.bucket, blob_name)

-      for vuln in vulns:
-        put_if_newer(vuln, source_repo.name, blob_name)
+      put_if_newer_batch([(v, blob_name) for v in vulns], source_repo.name)

       self._request_analysis_external(
           source_repo,
@@ -952,6 +960,7 @@ def _process_updates_rest(self, source_repo: osv.SourceRepository):
       vulns_last_modified = last_update_date
     logging.info('%d records to consider', len(vulns))
     # Create tasks for changed files.
+    vulns_to_process = []
     for vuln in vulns:
       import_failure_logs = []
       vuln_modified = vuln.modified.ToDatetime(datetime.UTC)
@@ -981,15 +990,9 @@ def _process_updates_rest(self, source_repo: osv.SourceRepository):
             self._record_quality_finding(source_repo.name, bug_id)
             continue

-          put_if_newer(v, source_repo.name, v.id + source_repo.extension)
-          logging.info('Requesting analysis of REST record: %s',
-                       vuln.id + source_repo.extension)
           ts = None if ignore_last_import else vuln_modified
-          self._request_analysis_external(
-              source_repo,
-              osv.sha256_bytes(single_vuln.text.encode()),
-              vuln.id + source_repo.extension,
-              source_timestamp=ts)
+          vulns_to_process.append((v, vuln.id + source_repo.extension, ts,
+                                   osv.sha256_bytes(single_vuln.text.encode())))
       except osv.sources.KeyPathError:
         # Key path doesn't exist in the vulnerability.
         # No need to log a full error, as this is expected result.
@@ -1001,6 +1004,13 @@ def _process_updates_rest(self, source_repo: osv.SourceRepository):
         import_failure_logs.append(f'Failed to parse vulnerability "{vuln.id}"')
         continue

+    if vulns_to_process:
+      put_if_newer_batch([(v, p) for v, p, _, _ in vulns_to_process],
+                         source_repo.name)
+      for v, path, ts, sha256 in vulns_to_process:
+        self._request_analysis_external(
+            source_repo, sha256, path, source_timestamp=ts)
+
     replace_importer_log(storage.Client(), source_repo.name,
                          self._public_log_bucket, import_failure_logs)

@@ -1235,25 +1245,59 @@ def put_if_newer(vuln: vulnerability_pb2.Vulnerability, source: str, path: str):
   unchanged. Does not write if vuln's modified date is older than what's already
   in datastore.
   """
-  preprocess_vuln(vuln)
-  bug = osv.Bug.get_by_id(vuln.id)
-  if bug is None:
-    bug = new_bug_from_vuln(vuln, source, path)
-    bug.put()
-    log_update_latency(bug)
-    return
+  put_if_newer_batch([(vuln, path)], source)

-  # Only update if the incoming vulnerability is newer.
-  orig_modified = vuln.modified.ToDatetime(datetime.UTC)
-  if bug.import_last_modified and orig_modified <= bug.import_last_modified:
-    logging.info(
-        'Skipping update for %s because incoming modification time'
-        ' (%s) is not newer than existing record (%s)', vuln.id, orig_modified,
-        bug.import_last_modified)
+
+def put_if_newer_batch(
+    vulns_and_paths: list[tuple[vulnerability_pb2.Vulnerability,
+                                str]], source: str):
+  """Try to write vulnerabilities to datastore in batch, keeping enumerated
+  versions if unchanged. Does not write if vuln's modified date is older than
+  what's already in datastore.
+  """
+  if not vulns_and_paths:
     return
-  update_bug_from_vuln(bug, vuln, source, path)
-  bug.put()
-  log_update_latency(bug)
+
+  # Deduplicate by vuln.id, keeping the last one.
+  unique_vulns_and_paths = {}
+  for vuln, path in vulns_and_paths:
+    unique_vulns_and_paths[vuln.id] = (vuln, path)
+  vulns_and_paths = list(unique_vulns_and_paths.values())
+
+  for vuln, _ in vulns_and_paths:
+    preprocess_vuln(vuln)
+
+  keys = [ndb.Key(osv.Bug, v.id) for v, _ in vulns_and_paths]
+  existing_bugs = []
+  for i in range(0, len(keys), _NDB_PUT_BATCH_SIZE):
+    batch_keys = keys[i:i + _NDB_PUT_BATCH_SIZE]
+    existing_bugs.extend(ndb.get_multi(batch_keys))
+
+  bugs_to_put = []
+  for i, (vuln, path) in enumerate(vulns_and_paths):
+    bug = existing_bugs[i]
+    if bug is None:
+      bug = new_bug_from_vuln(vuln, source, path)
+      bugs_to_put.append(bug)
+      continue
+
+    # Only update if the incoming vulnerability is newer.
+    orig_modified = vuln.modified.ToDatetime(datetime.UTC)
+    if bug.import_last_modified and orig_modified <= bug.import_last_modified:
+      logging.info(
+          'Skipping update for %s because incoming modification time'
+          ' (%s) is not newer than existing record (%s)', vuln.id,
+          orig_modified, bug.import_last_modified)
+      continue
+    update_bug_from_vuln(bug, vuln, source, path)
+    bugs_to_put.append(bug)
+
+  if bugs_to_put:
+    for i in range(0, len(bugs_to_put), _NDB_PUT_BATCH_SIZE):
+      batch = bugs_to_put[i:i + _NDB_PUT_BATCH_SIZE]
+      ndb.put_multi(batch)
+      for bug in batch:
+        log_update_latency(bug)

def main():
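A note on the lookup step in put_if_newer_batch: ndb.get_multi returns results in key order, with None placeholders for keys that have no stored entity, which is what lets the loop above distinguish brand-new records from updates. A standalone sketch of that pattern (`Bug` here is a stand-in model, not the real osv.Bug):

from google.cloud import ndb

class Bug(ndb.Model):  # stand-in for osv.Bug
  import_last_modified = ndb.DateTimeProperty()

client = ndb.Client()
with client.context():
  keys = [ndb.Key(Bug, 'EXAMPLE-2025-0001'), ndb.Key(Bug, 'EXAMPLE-2025-0002')]
  for key, bug in zip(keys, ndb.get_multi(keys)):
    if bug is None:
      print(f'{key.id()} is new')            # create path (new_bug_from_vuln)
    else:
      print(f'{key.id()} already imported')  # compare import_last_modified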
