Commit 0a41390
[tworker] Speed up fuzz_task preprocess (#4422)
Attempt to make the tworkers faster so they can scale up to OSS-Fuzz. Based on profiling, make the following changes:

1. Sign URLs in parallel.
2. Don't do postprocess tasks. Pulling from a queue that yields nothing is relatively expensive.
3. Don't clean up after tasks, since preprocess doesn't change the state of the machine.
4. Don't sign delete URLs for the corpus in fuzz task; we don't delete in fuzz task anyway.
5. Limit the number of corpus upload URLs in fuzz task to the number we will actually use.
6. Memoize to avoid parsing YAML repeatedly.
7. (Unrelated to preprocess) Remove an overly conservative, useless limitation on async downloads.
8. Remove an unneeded call to last_updated.
9. Remove an unnecessary sleep when a task is rate limited. This sleep only makes sense for oss-fuzz hosts.
10. Don't download more than 25k testcases. This limit is probably good in general, but for now it's needed because many oss-fuzz projects appear to have no pruning jobs (maybe because they are in zone2).
1 parent 9eed2e2 commit 0a41390

File tree: 7 files changed (+83, -23 lines)


src/clusterfuzz/_internal/base/concurrency.py

Lines changed: 23 additions & 2 deletions

@@ -21,11 +21,32 @@
 POOL_SIZE = multiprocessing.cpu_count()


+class SingleThreadPool:
+  """Single thread pool for when it's not worth using Python's thread
+  implementation."""
+
+  def __init__(self, size):
+    del size
+
+  def map(self, f, l):
+    return list(map(f, l))
+
+
 @contextlib.contextmanager
-def make_pool(pool_size=POOL_SIZE):
+def make_pool(pool_size=POOL_SIZE, cpu_bound=False, max_pool_size=None):
+  """Returns a pool that can (usually) execute tasks concurrently."""
+  if max_pool_size is not None:
+    pool_size = max(pool_size, max_pool_size)
+
   # Don't use processes on Windows and unittests to avoid hangs.
   if (environment.get_value('PY_UNITTESTS') or
       environment.platform() == 'WINDOWS'):
-    yield futures.ThreadPoolExecutor(pool_size)
+    if cpu_bound:
+      yield SingleThreadPool(pool_size)
+    else:
+      yield futures.ThreadPoolExecutor(pool_size)
   else:
     yield futures.ProcessPoolExecutor(pool_size)
+
+
+# TODO(metzman): Find out if batching makes things even faster.
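A hedged usage sketch of the new API (the toy square function is ours, not ClusterFuzz's): with cpu_bound=True on a platform restricted to threads, GIL-bound work runs on SingleThreadPool rather than paying ThreadPoolExecutor overhead for no real parallelism.

from clusterfuzz._internal.base import concurrency

def square(x):
  return x * x

if __name__ == '__main__':
  # SingleThreadPool on Windows/unittests; elsewhere a ProcessPoolExecutor,
  # which does give real parallelism for CPU-bound work like URL signing.
  with concurrency.make_pool(cpu_bound=True, max_pool_size=2) as pool:
    print(list(pool.map(square, range(8))))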

src/clusterfuzz/_internal/base/tasks/__init__.py

Lines changed: 5 additions & 4 deletions

@@ -317,10 +317,11 @@ def get_preprocess_task():

 def tworker_get_task():
   assert environment.is_tworker()
-  task = get_postprocess_task()
-  if task:
-    return task
-
+  # TODO(metzman): Pulling tasks is relatively expensive compared to
+  # preprocessing. It's too expensive to pull twice (once from the postprocess
+  # queue that is probably empty) to do a single preprocess. Investigate
+  # combining preprocess and postprocess queues and allowing pulling of
+  # multiple messages.
   return get_preprocess_task()

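The TODO above points at batched pulls. A hedged sketch of that direction, assuming the task queues are Pub/Sub subscriptions (the project and subscription names here are hypothetical): one pull RPC can lease several messages, amortizing the per-pull round trip that made checking the usually empty postprocess queue expensive.

from google.cloud import pubsub_v1

def lease_tasks(project_id: str, subscription_name: str, count: int = 10):
  # Hypothetical helper, not ClusterFuzz code.
  subscriber = pubsub_v1.SubscriberClient()
  path = subscriber.subscription_path(project_id, subscription_name)
  # A single round trip may return up to `count` messages (often fewer).
  response = subscriber.pull(subscription=path, max_messages=count)
  return response.received_messages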

src/clusterfuzz/_internal/bot/tasks/commands.py

Lines changed: 4 additions & 2 deletions

@@ -233,7 +233,8 @@ def run_command(task_name, task_argument, job_name, uworker_env):
       'argument': task_argument,
   })
   logs.error(f'Rate limited task: {task_name} {task_argument} {job_name}')
-  if task_name == 'fuzz':
+  if task_name == 'fuzz' and not environment.is_tworker():
+    # TODO(b/377885331): Get rid of this when oss-fuzz is migrated.
     # Wait 10 seconds. We don't want to try again immediately because if we
     # tried to run a fuzz task then there is no other task to run.
     time.sleep(environment.get_value('FAIL_WAIT'))

@@ -467,6 +468,7 @@ def process_command_impl(task_name, task_argument, job_name, high_end,
     return run_command(task_name, task_argument, job_name, uworker_env)
   finally:
     # Final clean up.
-    cleanup_task_state()
+    if not environment.is_tworker():
+      cleanup_task_state()
     if 'CF_TASK_ID' in os.environ:
       del os.environ['CF_TASK_ID']
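A simplified sketch of why the sleep is host-only (this loop is illustrative, not ClusterFuzz's actual scheduler): a host bot that is rate limited on fuzz has nothing else to run, so backing off prevents a hot retry loop, while a tworker's queue holds plenty of other work to move on to.

import time

def run_loop(get_task, run_task, is_rate_limited, is_tworker, fail_wait=10.0):
  # All callables here are hypothetical stand-ins.
  while True:
    task = get_task()
    if is_rate_limited(task):
      if task.name == 'fuzz' and not is_tworker:
        # Nothing else to do on a host; avoid spinning on the same task.
        time.sleep(fail_wait)
      continue  # A tworker just pulls the next (different) task.
    run_task(task)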

src/clusterfuzz/_internal/bot/tasks/utasks/fuzz_task.py

Lines changed: 16 additions & 3 deletions

@@ -127,6 +127,15 @@ def get(self) -> uworker_msg_pb2.BlobUploadUrl:
     return url


+def _get_max_testcases() -> int:
+  return environment.get_value('MAX_TESTCASES', 1)
+
+
+def _get_max_corpus_uploads_per_task():
+  number_of_fuzzer_runs = _get_max_testcases()
+  return MAX_NEW_CORPUS_FILES * number_of_fuzzer_runs
+
+
 class Crash:
   """Represents a crash (before creating a testcase)."""

@@ -1497,7 +1506,7 @@ def do_engine_fuzzing(self, engine_impl):

     self.fuzz_task_output.app_revision = environment.get_value('APP_REVISION')
     # Do the actual fuzzing.
-    for fuzzing_round in range(environment.get_value('MAX_TESTCASES', 1)):
+    for fuzzing_round in range(_get_max_testcases()):
       logs.info(f'Fuzzing round {fuzzing_round}.')
       try:
         with _TrackFuzzTime(self.fully_qualified_fuzzer_name,

@@ -1572,7 +1581,7 @@ def do_blackbox_fuzzing(self, fuzzer, fuzzer_directory, job_type):
     thread_timeout = test_timeout

     # Determine number of testcases to process.
-    testcase_count = environment.get_value('MAX_TESTCASES')
+    testcase_count = _get_max_testcases()

     # For timeout multiplier greater than 1, we need to decrease testcase count
     # to prevent exceeding task lease time.

@@ -2023,7 +2032,11 @@ def utask_preprocess(fuzzer_name, job_type, uworker_env):
       uworker_io.entity_to_protobuf(fuzz_target))
   fuzz_task_input.corpus.CopyFrom(
       corpus_manager.get_fuzz_target_corpus(
-          fuzzer_name, fuzz_target.project_qualified_name()).serialize())
+          fuzzer_name,
+          fuzz_target.project_qualified_name(),
+          include_delete_urls=False,
+          max_upload_urls=_get_max_corpus_uploads_per_task(),
+          max_download_urls=25000).serialize())

   for _ in range(MAX_CRASHES_UPLOADED):
     url = fuzz_task_input.crash_upload_urls.add()
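A worked example of the new upload budget (the MAX_NEW_CORPUS_FILES value below is made up; the real constant lives in fuzz_task.py): each fuzzing round can add at most MAX_NEW_CORPUS_FILES corpus files and a task runs MAX_TESTCASES rounds, so their product bounds how many signed upload URLs the task can actually consume.

MAX_NEW_CORPUS_FILES = 1000  # Hypothetical value for illustration.

def max_corpus_uploads_per_task(max_testcases: int) -> int:
  # Mirrors _get_max_corpus_uploads_per_task(): rounds * files-per-round.
  return MAX_NEW_CORPUS_FILES * max_testcases

# With MAX_TESTCASES=2, preprocess signs 2000 upload URLs instead of
# falling back to get_fuzz_target_corpus's flat default of 10000.
print(max_corpus_uploads_per_task(2))  # -> 2000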

src/clusterfuzz/_internal/fuzzing/corpus_manager.py

Lines changed: 21 additions & 5 deletions

@@ -13,6 +13,7 @@
 # limitations under the License.
 """Functions for corpus synchronization with GCS."""

+import itertools
 import os
 import re
 import shutil

@@ -641,15 +642,27 @@ def sync_data_bundle_corpus_to_disk(data_bundle_corpus, directory):
   return len(fails) < MAX_SYNC_ERRORS


+def _last_updated(*args, **kwargs):
+  if environment.is_tworker():
+    return None
+  return storage.last_updated(*args, **kwargs)
+
+
 def get_proto_corpus(bucket_name,
                      bucket_path,
                      max_upload_urls,
-                     include_delete_urls=False):
+                     include_delete_urls=False,
+                     max_download_urls=None):
   """Returns a proto representation of a corpus."""
   gcs_url = _get_gcs_url(bucket_name, bucket_path)
   # TODO(metzman): Allow this step to be skipped by trusted fuzzers.
   urls = (f'{storage.GS_PREFIX}/{bucket_name}/{url}'
           for url in storage.list_blobs(gcs_url))
+
+  if max_download_urls is not None:
+    urls = itertools.islice(urls, max_download_urls)
+    # TODO(metzman): Stop limiting URLs when pruning works on oss-fuzz
+    # again.
   corpus_urls = dict(
       storage.sign_urls_for_existing_files(urls, include_delete_urls))

@@ -660,7 +673,7 @@
       upload_urls=upload_urls,
       gcs_url=gcs_url,
   )
-  last_updated = storage.last_updated(_get_gcs_url(bucket_name, bucket_path))
+  last_updated = _last_updated(_get_gcs_url(bucket_name, bucket_path))
   if last_updated:
     timestamp = timestamp_pb2.Timestamp()  # pylint: disable=no-member
     timestamp.FromDatetime(last_updated)

@@ -688,7 +701,8 @@ def get_fuzz_target_corpus(engine,
                            quarantine=False,
                            include_regressions=False,
                            include_delete_urls=False,
-                           max_upload_urls=10000):
+                           max_upload_urls=10000,
+                           max_download_urls=None):
   """Copies the corpus from gcs to disk. Can run on uworker."""
   fuzz_target_corpus = uworker_msg_pb2.FuzzTargetCorpus()  # pylint: disable=no-member
   bucket_name, bucket_path = get_target_bucket_and_path(

@@ -697,7 +711,8 @@
       bucket_name,
       bucket_path,
       include_delete_urls=include_delete_urls,
-      max_upload_urls=max_upload_urls)
+      max_upload_urls=max_upload_urls,
+      max_download_urls=max_download_urls)
   fuzz_target_corpus.corpus.CopyFrom(corpus)

   assert not (include_regressions and quarantine)

@@ -707,7 +722,8 @@
         bucket_name,
         regressions_bucket_path,
         max_upload_urls=0,  # This is never uploaded to using this mechanism.
-        include_delete_urls=False)  # This is never deleted from.
+        include_delete_urls=False,  # This is never deleted from.
+        max_download_urls=max_download_urls)
   fuzz_target_corpus.regressions_corpus.CopyFrom(regressions_corpus)

   return ProtoFuzzTargetCorpus(engine, project_qualified_target_name,
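A self-contained sketch of the islice trick (the fake blob lister is ours): storage.list_blobs yields lazily, so itertools.islice caps the download URLs without ever materializing the full listing of a huge corpus.

import itertools

def fake_list_blobs():
  # Stand-in for storage.list_blobs() over a very large corpus bucket.
  i = 0
  while True:
    yield f'corpus/testcase-{i:08d}'
    i += 1

urls = (f'gs://my-bucket/{name}' for name in fake_list_blobs())
urls = itertools.islice(urls, 25000)  # At most 25k download URLs signed.
print(sum(1 for _ in urls))  # -> 25000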

src/clusterfuzz/_internal/google_cloud_utils/storage.py

Lines changed: 14 additions & 5 deletions

@@ -1135,6 +1135,7 @@ def get_object_size(cloud_storage_file_path):
   return int(gcs_object['size'])


+@memoize.wrap(memoize.FifoInMemory(1))
 def blobs_bucket():
   """Get the blobs bucket name."""
   # Allow tests to override blobs bucket name safely.

@@ -1351,18 +1352,24 @@ def _sign_urls_for_existing_file(
   return (download_url, delete_url)


+def _mappable_sign_urls_for_existing_file(url_and_include_delete_urls):
+  url, include_delete_urls = url_and_include_delete_urls
+  return _sign_urls_for_existing_file(url, include_delete_urls)
+
+
 def sign_urls_for_existing_files(urls,
                                  include_delete_urls) -> List[Tuple[str, str]]:
   logs.info('Signing URLs for existing files.')
-  result = [
-      _sign_urls_for_existing_file(url, include_delete_urls) for url in urls
-  ]
+  args = ((url, include_delete_urls) for url in urls)
+  with concurrency.make_pool(cpu_bound=True, max_pool_size=2) as pool:
+    result = pool.map(_mappable_sign_urls_for_existing_file, args)
   logs.info('Done signing URLs for existing files.')
   return result


 def get_arbitrary_signed_upload_url(remote_directory):
-  return get_arbitrary_signed_upload_urls(remote_directory, num_uploads=1)[0]
+  return list(
+      get_arbitrary_signed_upload_urls(remote_directory, num_uploads=1))[0]

@@ -1390,6 +1397,8 @@ def get_arbitrary_signed_upload_urls(remote_directory: str,

   urls = (f'{base_path}-{idx}' for idx in range(num_uploads))
   logs.info('Signing URLs for arbitrary uploads.')
-  result = [get_signed_upload_url(url) for url in urls]
+  with concurrency.make_pool(
+      _POOL_SIZE, cpu_bound=True, max_pool_size=2) as pool:
+    result = list(pool.map(get_signed_upload_url, urls))
   logs.info('Done signing URLs for arbitrary uploads.')
   return result
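A toy re-implementation of what the new memoization buys (functools.lru_cache stands in here for ClusterFuzz's memoize.FifoInMemory(1)): blobs_bucket() resolves its answer from parsed project config, so caching the single result avoids re-reading YAML on every signed-URL operation.

import functools

@functools.lru_cache(maxsize=1)
def blobs_bucket_cached():
  # Stand-in for the config/YAML lookup the real blobs_bucket() performs.
  print('parsing config...')  # Printed once; later calls hit the cache.
  return 'my-blobs-bucket'

blobs_bucket_cached()  # Parses config.
blobs_bucket_cached()  # Served from the cache.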

src/clusterfuzz/_internal/system/fast_http.py

Lines changed: 0 additions & 2 deletions

@@ -36,8 +36,6 @@ def download_urls(urls_and_filepaths: List[Tuple[str, str]]) -> List[bool]:
   batch_size = len(urls_and_filepaths) // concurrency.POOL_SIZE
   # Avoid issues with range when urls is less than _POOL_SIZE.
   batch_size = max(batch_size, len(urls_and_filepaths))
-  # Avoid OOMs by limiting the amount of concurrent downloads.
-  batch_size = min(5, batch_size)

   for idx in range(0, len(urls_and_filepaths), batch_size):
     batch = urls_and_filepaths[idx:idx + batch_size]
