Skip to content

Commit 22a6b26

Browse files
Undo batching and fetch names only, for a 10% speedup
1 parent 015857c commit 22a6b26

File tree

2 files changed

+26
-22
lines changed

2 files changed

+26
-22
lines changed

src/clusterfuzz/_internal/fuzzing/corpus_manager.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -665,7 +665,7 @@ def get_proto_corpus(bucket_name,
665665
urls = itertools.islice(urls, max_download_urls)
666666
corpus_urls = dict(
667667
storage.sign_urls_for_existing_files(urls, include_delete_urls))
668-
668+
669669
upload_urls = storage.get_arbitrary_signed_upload_urls(
670670
gcs_url, num_uploads=max_upload_urls)
671671
corpus = uworker_msg_pb2.Corpus( # pylint: disable=no-member

src/clusterfuzz/_internal/google_cloud_utils/storage.py

Lines changed: 25 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
import collections
1717
import copy
1818
import datetime
19+
import itertools
1920
import json
2021
import os
2122
import shutil
@@ -125,7 +126,7 @@ def get_bucket(self, name):
125126
"""Get a bucket."""
126127
raise NotImplementedError
127128

128-
def list_blobs(self, remote_path, recursive=True):
129+
def list_blobs(self, remote_path, recursive=True, names_only=False):
129130
"""List the blobs under the remote path."""
130131
raise NotImplementedError
131132

@@ -230,7 +231,7 @@ def get_bucket(self, name):
230231

231232
raise
232233

233-
def list_blobs(self, remote_path, recursive=True):
234+
def list_blobs(self, remote_path, recursive=True, names_only=False):
234235
"""List the blobs under the remote path."""
235236
bucket_name, path = get_bucket_name_and_path(remote_path)
236237

@@ -246,7 +247,13 @@ def list_blobs(self, remote_path, recursive=True):
246247
else:
247248
delimiter = '/'
248249

249-
iterator = bucket.list_blobs(prefix=path, delimiter=delimiter)
250+
if names_only:
251+
fields = 'items(name),nextPageToken'
252+
iterator = bucket.list_blobs(
253+
prefix=path, delimiter=delimiter, fields=fields)
254+
else:
255+
iterator = bucket.list_blobs(prefix=path, delimiter=delimiter)
256+
250257
for blob in iterator:
251258
properties['bucket'] = bucket_name
252259
properties['name'] = blob.name
@@ -581,8 +588,9 @@ def _list_files_nonrecursive(self, fs_path):
581588
for filename in os.listdir(fs_path):
582589
yield os.path.join(fs_path, filename)
583590

584-
def list_blobs(self, remote_path, recursive=True):
591+
def list_blobs(self, remote_path, recursive=True, names_only=False):
585592
"""List the blobs under the remote path."""
593+
del names_only
586594
bucket, _ = get_bucket_name_and_path(remote_path)
587595
fs_path = self.convert_path(remote_path)
588596

@@ -1086,7 +1094,7 @@ def get_blobs(cloud_storage_path, recursive=True):
10861094
exception_types=_TRANSIENT_ERRORS)
10871095
def list_blobs(cloud_storage_path, recursive=True):
10881096
"""Return blob names under the given cloud storage path."""
1089-
for blob in _provider().list_blobs(cloud_storage_path, recursive=recursive):
1097+
for blob in _provider().list_blobs(cloud_storage_path, recursive=recursive, names_only=True):
10901098
yield blob['name']
10911099

10921100

@@ -1355,28 +1363,24 @@ def delete_signed_urls(urls):
13551363
logs.info('Done deleting URLs.')
13561364

13571365

1358-
def _sign_urls_for_existing_files(
1359-
urls_and_include_delete_urls: Tuple[List[str], bool],
1366+
def _sign_urls_for_existing_file(
1367+
url_and_include_delete_urls: Tuple[List[str], bool],
13601368
minutes: int = SIGNED_URL_EXPIRATION_MINUTES) -> Tuple[str, str]:
1361-
corpus_element_urls, include_delete_urls = urls_and_include_delete_urls
1362-
signed_urls = []
1363-
for corpus_element_url in corpus_element_urls:
1364-
download_url = get_signed_download_url(corpus_element_url, minutes)
1365-
if include_delete_urls:
1366-
delete_url = sign_delete_url(corpus_element_url, minutes)
1367-
else:
1368-
delete_url = ''
1369-
signed_urls.append(download_url, delete_url)
1370-
return signed_urls
1369+
corpus_element_url, include_delete_urls = url_and_include_delete_urls
1370+
download_url = get_signed_download_url(corpus_element_url, minutes)
1371+
if include_delete_urls:
1372+
delete_url = sign_delete_url(corpus_element_url, minutes)
1373+
else:
1374+
delete_url = ''
1375+
return (download_url, delete_url)
13711376

13721377

13731378
def sign_urls_for_existing_files(urls,
13741379
include_delete_urls) -> List[Tuple[str, str]]:
13751380
logs.info('Signing URLs for existing files.')
1376-
url_batches = utils.batched(urls, 2)
1377-
args = ((url_batch, include_delete_urls) for url_batch in url_batches)
1381+
args = ((url, include_delete_urls) for url in urls)
13781382
with concurrency.make_pool(cpu_bound=True, max_pool_size=2) as pool:
1379-
result = pool.map(_sign_urls_for_existing_files, args)
1383+
result = pool.map(_sign_urls_for_existing_file, args)
13801384
logs.info('Done signing URLs for existing files.')
13811385
return result
13821386

@@ -1415,6 +1419,6 @@ def get_arbitrary_signed_upload_urls(remote_directory: str,
14151419
with concurrency.make_pool(
14161420
_POOL_SIZE, cpu_bound=True, max_pool_size=2) as pool:
14171421
url_batches = utils.batched(urls, 2)
1418-
result = list(pool.map(get_signed_upload_urls, url_batches))
1422+
result = itertools.chain(*pool.map(get_signed_upload_urls, url_batches))
14191423
logs.info('Done signing URLs for arbitrary uploads.')
14201424
return result

0 commit comments

Comments (0)