Skip to content

Commit c17c3ff

Browse files
Bring back parallel signing for tworkers. (#4447)
It's important for speed, is probably safe on tworkers, and it will be very easy to notice if things go wrong, since the preprocess queue will pile up. This PR also fixes a bad bug where max_pool_size was not obeyed (make_pool used max() instead of min()); in practice, since this code never runs on machines with more than 2 cores, it's unlikely to have mattered. Partially undoes #4430.
1 parent e83c7f4 commit c17c3ff

File tree

2 files changed

+16
-3
lines changed

2 files changed

+16
-3
lines changed

src/clusterfuzz/_internal/base/concurrency.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ def map(self, f, l):
3636
def make_pool(pool_size=POOL_SIZE, cpu_bound=False, max_pool_size=None):
3737
"""Returns a pool that can (usually) execute tasks concurrently."""
3838
if max_pool_size is not None:
39-
pool_size = max(pool_size, max_pool_size)
39+
pool_size = min(pool_size, max_pool_size)
4040

4141
# Don't use processes on Windows and unittests to avoid hangs.
4242
if (environment.get_value('PY_UNITTESTS') or

src/clusterfuzz/_internal/google_cloud_utils/storage.py

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1369,7 +1369,7 @@ def sign_urls_for_existing_files(urls,
13691369
include_delete_urls) -> List[Tuple[str, str]]:
13701370
logs.info('Signing URLs for existing files.')
13711371
args = ((url, include_delete_urls) for url in urls)
1372-
result = list(map(_sign_urls_for_existing_file, args))
1372+
result = maybe_parallel_map(_sign_urls_for_existing_file, args)
13731373
logs.info('Done signing URLs for existing files.')
13741374
return result
13751375

@@ -1379,6 +1379,19 @@ def get_arbitrary_signed_upload_url(remote_directory):
13791379
get_arbitrary_signed_upload_urls(remote_directory, num_uploads=1))[0]
13801380

13811381

1382+
def maybe_parallel_map(func, arguments):
1383+
"""Wrapper around pool.map so we don't do it on OSS-Fuzz hosts which
1384+
will OOM."""
1385+
if not environment.is_tworker():
1386+
# TODO(b/metzman): When the rearch is done, internal google CF won't have
1387+
# tworkers, but maybe should be using parallel.
1388+
return list(map(func, arguments))
1389+
1390+
max_size = 2
1391+
with concurrency.make_pool(cpu_bound=True, max_pool_size=max_size) as pool:
1392+
return list(pool.map(func, arguments))
1393+
1394+
13821395
def get_arbitrary_signed_upload_urls(remote_directory: str,
13831396
num_uploads: int) -> List[str]:
13841397
"""Returns |num_uploads| number of signed upload URLs to upload files with
@@ -1404,6 +1417,6 @@ def get_arbitrary_signed_upload_urls(remote_directory: str,
14041417

14051418
urls = (f'{base_path}-{idx}' for idx in range(num_uploads))
14061419
logs.info('Signing URLs for arbitrary uploads.')
1407-
result = list(map(get_signed_upload_url, urls))
1420+
result = maybe_parallel_map(get_signed_upload_url, urls)
14081421
logs.info('Done signing URLs for arbitrary uploads.')
14091422
return result

0 commit comments

Comments
 (0)