Commit ed4d762
[tworker] Speed up fuzz_task preprocess.
Attempt to make the tworkers faster so they can scale up to OSS-Fuzz. Make the following changes, based on profiling:

1. Sign URLs in parallel.
2. Don't do postprocess tasks. Pulling from a queue that yields nothing is relatively expensive.
3. Don't clean up after tasks, since preprocess doesn't change the state of the machine.
4. Don't sign delete URLs for the corpus in fuzz task; we don't delete in fuzz task anyway.
5. Limit the number of corpus upload URLs in fuzz task to the number we will actually use.
6. Memoize to avoid repeatedly parsing YAML.
7. (Unrelated to preprocess) Remove an overly conservative, useless limit on async downloads.
1 parent b8b457f commit ed4d762

File tree

6 files changed: +50 -16 lines changed

src/clusterfuzz/_internal/base/concurrency.py

Lines changed: 21 additions & 2 deletions
@@ -21,11 +21,30 @@
 POOL_SIZE = multiprocessing.cpu_count()


+class InProcessPool:
+
+  def __init__(self, size):
+    del size
+
+  def map(self, f, l):
+    return list(map(f, l))
+
+
 @contextlib.contextmanager
-def make_pool(pool_size=POOL_SIZE):
+def make_pool(pool_size=POOL_SIZE, cpu_bound=False, max_pool_size=None):
+  """Returns a pool that can (usually) execute tasks concurrently."""
+  if max_pool_size is not None:
+    pool_size = max(pool_size, max_pool_size)
+
   # Don't use processes on Windows and unittests to avoid hangs.
   if (environment.get_value('PY_UNITTESTS') or
       environment.platform() == 'WINDOWS'):
-    yield futures.ThreadPoolExecutor(pool_size)
+    if cpu_bound:
+      yield InProcessPool(pool_size)
+    else:
+      yield futures.ThreadPoolExecutor(pool_size)
   else:
     yield futures.ProcessPoolExecutor(pool_size)
+
+
+# TODO(metzman): Find out if batching makes things even faster.
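The cpu_bound branch above exists because URL signing is CPU-bound work: when processes are unavailable (unit tests, Windows), a ThreadPoolExecutor only adds scheduling overhead under the GIL, so a serial in-process map is the better fallback. A minimal standalone sketch of the same idea, where the use_processes flag stands in for the real code's environment checks:

import contextlib
from concurrent import futures


class InProcessPool:
  """Pool-like object that runs work serially in the current process."""

  def __init__(self, size):
    del size  # Unused; accepted only for interface compatibility.

  def map(self, f, l):
    return list(map(f, l))


@contextlib.contextmanager
def make_pool(pool_size=4, cpu_bound=False, use_processes=True):
  """Yields a pool that can (usually) execute tasks concurrently."""
  if not use_processes:
    if cpu_bound:
      # Threads can't speed up CPU-bound work under the GIL; run serially.
      yield InProcessPool(pool_size)
    else:
      yield futures.ThreadPoolExecutor(pool_size)
  else:
    yield futures.ProcessPoolExecutor(pool_size)


if __name__ == '__main__':
  with make_pool(cpu_bound=True, use_processes=False) as pool:
    print(pool.map(lambda x: x * x, range(5)))  # -> [0, 1, 4, 9, 16]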

src/clusterfuzz/_internal/base/tasks/__init__.py

Lines changed: 0 additions & 4 deletions
@@ -317,10 +317,6 @@ def get_preprocess_task():

 def tworker_get_task():
   assert environment.is_tworker()
-  task = get_postprocess_task()
-  if task:
-    return task
-
   return get_preprocess_task()


src/clusterfuzz/_internal/bot/tasks/commands.py

Lines changed: 2 additions & 1 deletion
@@ -467,6 +467,7 @@ def process_command_impl(task_name, task_argument, job_name, high_end,
     return run_command(task_name, task_argument, job_name, uworker_env)
   finally:
     # Final clean up.
-    cleanup_task_state()
+    if not environment.is_tworker():
+      cleanup_task_state()
     if 'CF_TASK_ID' in os.environ:
       del os.environ['CF_TASK_ID']

src/clusterfuzz/_internal/bot/tasks/utasks/fuzz_task.py

Lines changed: 15 additions & 3 deletions
@@ -127,6 +127,15 @@ def get(self) -> uworker_msg_pb2.BlobUploadUrl:
     return url


+def _get_max_testcases() -> int:
+  return environment.get_value('MAX_TESTCASES', 1)
+
+
+def _get_max_corpus_uploads_per_task():
+  number_of_fuzzer_runs = _get_max_testcases()
+  return MAX_NEW_CORPUS_FILES * number_of_fuzzer_runs
+
+
 class Crash:
   """Represents a crash (before creating a testcase)."""

@@ -1497,7 +1506,7 @@ def do_engine_fuzzing(self, engine_impl):

     self.fuzz_task_output.app_revision = environment.get_value('APP_REVISION')
     # Do the actual fuzzing.
-    for fuzzing_round in range(environment.get_value('MAX_TESTCASES', 1)):
+    for fuzzing_round in range(_get_max_testcases()):
       logs.info(f'Fuzzing round {fuzzing_round}.')
       try:
         with _TrackFuzzTime(self.fully_qualified_fuzzer_name,
@@ -1572,7 +1581,7 @@ def do_blackbox_fuzzing(self, fuzzer, fuzzer_directory, job_type):
       thread_timeout = test_timeout

     # Determine number of testcases to process.
-    testcase_count = environment.get_value('MAX_TESTCASES')
+    testcase_count = _get_max_testcases()

     # For timeout multiplier greater than 1, we need to decrease testcase count
     # to prevent exceeding task lease time.
@@ -2023,7 +2032,10 @@ def utask_preprocess(fuzzer_name, job_type, uworker_env):
       uworker_io.entity_to_protobuf(fuzz_target))
   fuzz_task_input.corpus.CopyFrom(
       corpus_manager.get_fuzz_target_corpus(
-          fuzzer_name, fuzz_target.project_qualified_name()).serialize())
+          fuzzer_name,
+          fuzz_target.project_qualified_name(),
+          include_delete_urls=False,
+          max_upload_urls=_get_max_corpus_uploads_per_task()).serialize())

   for _ in range(MAX_CRASHES_UPLOADED):
     url = fuzz_task_input.crash_upload_urls.add()
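The two new helpers let preprocess size the corpus upload budget exactly: one fuzzing round can add at most MAX_NEW_CORPUS_FILES files, and a task runs at most MAX_TESTCASES rounds, so any URL beyond their product would be signed but never used. A toy check of the arithmetic (the constants below are hypothetical, not ClusterFuzz's actual values):

MAX_NEW_CORPUS_FILES = 500  # hypothetical per-round cap on new corpus files
MAX_TESTCASES = 2           # hypothetical value of the MAX_TESTCASES env var


def max_corpus_uploads_per_task() -> int:
  # Tight upper bound on the corpus files a single fuzz task can upload.
  return MAX_NEW_CORPUS_FILES * MAX_TESTCASES


assert max_corpus_uploads_per_task() == 1000  # sign 1,000 URLs, no more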

src/clusterfuzz/_internal/google_cloud_utils/storage.py

Lines changed: 12 additions & 4 deletions
@@ -1135,6 +1135,7 @@ def get_object_size(cloud_storage_file_path):
   return int(gcs_object['size'])


+@memoize.wrap(memoize.FifoInMemory(1))
 def blobs_bucket():
   """Get the blobs bucket name."""
   # Allow tests to override blobs bucket name safely.
@@ -1351,12 +1352,17 @@ def _sign_urls_for_existing_file(
   return (download_url, delete_url)


+def _mappable_sign_urls_for_existing_file(url_and_include_delete_urls):
+  url, include_delete_urls = url_and_include_delete_urls
+  return _sign_urls_for_existing_file(url, include_delete_urls)
+
+
 def sign_urls_for_existing_files(urls,
                                  include_delete_urls) -> List[Tuple[str, str]]:
   logs.info('Signing URLs for existing files.')
-  result = [
-      _sign_urls_for_existing_file(url, include_delete_urls) for url in urls
-  ]
+  args = ((url, include_delete_urls) for url in urls)
+  with concurrency.make_pool(cpu_bound=True, max_pool_size=2) as pool:
+    result = pool.map(_mappable_sign_urls_for_existing_file, args)
   logs.info('Done signing URLs for existing files.')
   return result

@@ -1390,6 +1396,8 @@ def get_arbitrary_signed_upload_urls(remote_directory: str,

   urls = (f'{base_path}-{idx}' for idx in range(num_uploads))
   logs.info('Signing URLs for arbitrary uploads.')
-  result = [get_signed_upload_url(url) for url in urls]
+  with concurrency.make_pool(
+      _POOL_SIZE, cpu_bound=True, max_pool_size=2) as pool:
+    result = pool.map(get_signed_upload_url, urls)
   logs.info('Done signing URLs for arbitrary uploads.')
   return result
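Two details worth noting. First, pool.map passes the callable one item per call, so the arguments are packed into (url, include_delete_urls) tuples and unpacked by the _mappable_ wrapper; binding the constant flag with functools.partial would work just as well. Second, the @memoize.wrap(memoize.FifoInMemory(1)) decorator caches the single blobs_bucket() result, so the configuration lookup behind it (which parses YAML, per point 6 of the commit message) runs once per process instead of on every call. A standalone sketch of the map-with-two-arguments pattern, with a stand-in for the real signing logic:

import functools
from concurrent import futures
from typing import List, Tuple


def sign_url(url: str, include_delete_urls: bool) -> Tuple[str, str]:
  # Stand-in for the real, CPU-bound signing logic.
  delete_url = url + '#delete' if include_delete_urls else ''
  return (url + '#download', delete_url)


def _mappable_sign_url(url_and_flag: Tuple[str, bool]) -> Tuple[str, str]:
  # map() hands over one item per call, so unpack the packed arguments here.
  url, include_delete_urls = url_and_flag
  return sign_url(url, include_delete_urls)


def sign_urls(urls: List[str], include_delete_urls: bool):
  args = [(url, include_delete_urls) for url in urls]     # packed tuples
  bound = functools.partial(
      sign_url, include_delete_urls=include_delete_urls)  # equivalent
  with futures.ProcessPoolExecutor(2) as pool:
    packed = list(pool.map(_mappable_sign_url, args))
    assert packed == list(pool.map(bound, urls))
    return packed


if __name__ == '__main__':
  print(sign_urls(['gs://bucket/a', 'gs://bucket/b'], False))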

src/clusterfuzz/_internal/system/fast_http.py

Lines changed: 0 additions & 2 deletions
@@ -36,8 +36,6 @@ def download_urls(urls_and_filepaths: List[Tuple[str, str]]) -> List[bool]:
   batch_size = len(urls_and_filepaths) // concurrency.POOL_SIZE
   # Avoid issues with range when urls is less than _POOL_SIZE.
   batch_size = max(batch_size, len(urls_and_filepaths))
-  # Avoid OOMs by limiting the amount of concurrent downloads.
-  batch_size = min(5, batch_size)

   for idx in range(0, len(urls_and_filepaths), batch_size):
     batch = urls_and_filepaths[idx:idx + batch_size]
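With the min(5, batch_size) clamp gone, the batch size is no longer pinned to five downloads regardless of input size, which is what the commit message calls the overly conservative limit. A small sketch of the effect on the batching arithmetic (numbers are illustrative, and the small-input guard is simplified here to max(batch_size, 1)):

def batch_size(num_urls: int, pool_size: int, cap_at_five: bool) -> int:
  size = max(num_urls // pool_size, 1)  # at least one URL per batch
  if cap_at_five:
    size = min(5, size)  # the old clamp
  return size


# 800 URLs across 8 workers:
print(batch_size(800, 8, cap_at_five=True))   # 5   -> 160 batches
print(batch_size(800, 8, cap_at_five=False))  # 100 -> 8 batches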
