Skip to content

Commit d0092e2

Browse files
Start to schedule fuzz tasks on batch in OSS-Fuzz (#4397)
Start to schedule fuzz tasks on batch in OSS-Fuzz. The scheduler will work differently in OSS-Fuzz and Chrome. This commit only implements an OSS-Fuzz version. This version will use job and project weights to decide which fuzzing jobs to schedule. It then adds these tasks to the queue for other bots to preprocess and then for the utask_main_scheduler to actually schedule on batch. For now, we will only do this for 100 CPUs. 1. Add a cron job to run the scheduler every 15 minutes. 2. Improve region handling in batch (still far from complete). 3. Add a function for bulk-adding tasks to the queue, for use by the scheduler. 4. Give fuzz tasks a lower priority on batch than other tasks.
1 parent 613e1c6 commit d0092e2

File tree

14 files changed

+531
-79
lines changed

14 files changed

+531
-79
lines changed

configs/test/batch/batch.yaml

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@ mapping:
2020
disk_size_gb: 110
2121
disk_type: pd-standard
2222
service_account_email: test-clusterfuzz-service-account-email
23-
subnetwork: null
2423
gce_region: 'gce-region'
2524
gce_zone: 'gce-zone'
2625
network: 'projects/google.com:clusterfuzz/global/networks/networkname'

infra/k8s/schedule-fuzz.yaml

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
# Copyright 2024 Google LLC
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
15+
apiVersion: batch/v1
16+
kind: CronJob
17+
metadata:
18+
name: schedule-fuzz
19+
spec:
20+
schedule: "*/10 * * * *"
21+
concurrencyPolicy: Forbid
22+
jobTemplate:
23+
spec:
24+
activeDeadlineSeconds: 900 # 15 minutes.
25+
template:
26+
spec:
27+
containers:
28+
- name: backup
29+
image: gcr.io/clusterfuzz-images/base:091c6c2-202409251610
30+
imagePullPolicy: Always
31+
env:
32+
- name: CLUSTERFUZZ_RELEASE
33+
value: "prod"
34+
- name: RUN_CMD
35+
value: "python3.11 $ROOT_DIR/src/python/bot/startup/run_cron.py schedule_fuzz"
36+
- name: IS_K8S_ENV
37+
value: "true"
38+
- name: DISABLE_MOUNTS
39+
value: "true"
40+
restartPolicy: OnFailure
41+
backoffLimit: 3
Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
# Copyright 2024 Google LLC
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
"""Tools for concurrency/parallelism."""
15+
from concurrent import futures
16+
import contextlib
17+
import multiprocessing
18+
19+
from clusterfuzz._internal.system import environment
20+
21+
POOL_SIZE = multiprocessing.cpu_count()
22+
23+
24+
@contextlib.contextmanager
25+
def make_pool(pool_size=POOL_SIZE):
26+
# Don't use processes on Windows and unittests to avoid hangs.
27+
if (environment.get_value('PY_UNITTESTS') or
28+
environment.platform() == 'WINDOWS'):
29+
yield futures.ThreadPoolExecutor(pool_size)
30+
else:
31+
yield futures.ProcessPoolExecutor(pool_size)

src/clusterfuzz/_internal/base/tasks/__init__.py

Lines changed: 57 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -368,7 +368,8 @@ def __init__(self,
368368
eta=None,
369369
is_command_override=False,
370370
high_end=False,
371-
extra_info=None):
371+
extra_info=None,
372+
is_from_queue=False):
372373
self.command = command
373374
self.argument = argument
374375
self.job = job
@@ -377,6 +378,19 @@ def __init__(self,
377378
self.high_end = high_end
378379
self.extra_info = extra_info
379380

381+
# is_from_queue is a temporary hack to keep track of which fuzz tasks came
382+
# from the queue. Previously all fuzz tasks were picked by the bot when
383+
# there was nothing on the queue. With the rearchitecture, we want fuzz
384+
# tasks that were put on the queue by the schedule_fuzz cron job to be
385+
# executed on batch. is_from_queue is used to do this.
386+
# TODO(b/378684001): This code is very ugly, get rid of it when no more
387+
# fuzz tasks are executed on the bots themselves (i.e. when the rearch
388+
# is complete).
389+
self.is_from_queue = is_from_queue
390+
391+
def __repr__(self):
392+
return f'Task: {self.command} {self.argument} {self.job}'
393+
380394
def attribute(self, _):
381395
return None
382396

@@ -414,11 +428,13 @@ def lease(self):
414428
class PubSubTask(Task):
415429
"""A Pub/Sub task."""
416430

417-
def __init__(self, pubsub_message):
431+
def __init__(self, pubsub_message, is_from_queue=False):
418432
self._pubsub_message = pubsub_message
419433
super().__init__(
420-
self.attribute('command'), self.attribute('argument'),
421-
self.attribute('job'))
434+
self.attribute('command'),
435+
self.attribute('argument'),
436+
self.attribute('job'),
437+
is_from_queue=is_from_queue)
422438

423439
self.extra_info = {
424440
key: value
@@ -524,7 +540,7 @@ def initialize_task(message) -> PubSubTask:
524540
"""Creates a task from |messages|."""
525541

526542
if message.attributes.get('eventType') != 'OBJECT_FINALIZE':
527-
return PubSubTask(message)
543+
return PubSubTask(message, is_from_queue=True)
528544

529545
# Handle postprocess task.
530546
# The GCS API for pub/sub notifications uses the data field unlike
@@ -533,7 +549,7 @@ def initialize_task(message) -> PubSubTask:
533549
name = data['name']
534550
bucket = data['bucket']
535551
output_url_argument = storage.get_cloud_storage_file_path(bucket, name)
536-
return PostprocessPubSubTask(output_url_argument, message)
552+
return PostprocessPubSubTask(output_url_argument, message, is_from_queue=True)
537553

538554

539555
class PostprocessPubSubTask(PubSubTask):
@@ -542,14 +558,21 @@ class PostprocessPubSubTask(PubSubTask):
542558
def __init__(self,
543559
output_url_argument,
544560
pubsub_message,
545-
is_command_override=False):
561+
is_command_override=False,
562+
is_from_queue=False):
546563
command = 'postprocess'
547564
job_type = 'none'
548565
eta = None
549566
high_end = False
550567
grandparent_class = super(PubSubTask, self)
551-
grandparent_class.__init__(command, output_url_argument, job_type, eta,
552-
is_command_override, high_end)
568+
grandparent_class.__init__(
569+
command,
570+
output_url_argument,
571+
job_type,
572+
eta,
573+
is_command_override,
574+
high_end,
575+
is_from_queue=is_from_queue)
553576
self._pubsub_message = pubsub_message
554577

555578

@@ -609,18 +632,36 @@ def add_utask_main(command, input_url, job_type, wait_time=None):
609632
extra_info={'initial_command': initial_command})
610633

611634

635+
def bulk_add_tasks(tasks, queue=None, eta_now=False):
636+
"""Adds |tasks| in bulk to |queue|."""
637+
638+
# Old testcases may pass in queue=None explicitly, so we must check this here.
639+
if queue is None:
640+
queue = default_queue()
641+
642+
# If callers want delays, they must do it themselves, because this function is
643+
# meant to be used for batch tasks which don't need this.
644+
# Use an ETA of right now for batch because we don't need extra delay, there
645+
# is natural delay added by batch, waiting for utask_main_scheduler,
646+
# postprocess etc.
647+
if eta_now:
648+
now = utils.utcnow()
649+
for task in tasks:
650+
task.eta = now
651+
652+
pubsub_client = pubsub.PubSubClient()
653+
pubsub_messages = [task.to_pubsub_message() for task in tasks]
654+
pubsub_client.publish(
655+
pubsub.topic_name(utils.get_application_id(), queue), pubsub_messages)
656+
657+
612658
def add_task(command,
613659
argument,
614660
job_type,
615661
queue=None,
616662
wait_time=None,
617663
extra_info=None):
618664
"""Add a new task to the job queue."""
619-
# Old testcases may pass in queue=None explicitly,
620-
# so we must check this here.
621-
if not queue:
622-
queue = default_queue()
623-
624665
if wait_time is None:
625666
wait_time = random.randint(1, TASK_CREATION_WAIT_INTERVAL)
626667

@@ -636,10 +677,8 @@ def add_task(command,
636677
# Add the task.
637678
eta = utils.utcnow() + datetime.timedelta(seconds=wait_time)
638679
task = Task(command, argument, job_type, eta=eta, extra_info=extra_info)
639-
pubsub_client = pubsub.PubSubClient()
640-
pubsub_client.publish(
641-
pubsub.topic_name(utils.get_application_id(), queue),
642-
[task.to_pubsub_message()])
680+
681+
bulk_add_tasks([task], queue=queue)
643682

644683

645684
def get_task_lease_timeout():

src/clusterfuzz/_internal/bot/tasks/commands.py

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -95,8 +95,6 @@ def cleanup_task_state():
9595

9696
# Call python's garbage collector.
9797
utils.python_gc()
98-
if 'CF_TASK_ID' in os.environ:
99-
del os.environ['CF_TASK_ID']
10098

10199

102100
def is_supported_cpu_arch_for_job():
@@ -255,8 +253,10 @@ def process_command(task):
255253
logs.error('Empty task received.')
256254
return None
257255

256+
# TODO(b/378684001): Remove is_from_queue kludge.
258257
return process_command_impl(task.command, task.argument, task.job,
259-
task.high_end, task.is_command_override)
258+
task.high_end, task.is_command_override,
259+
task.is_from_queue)
260260

261261

262262
def _get_task_id(task_name, task_argument, job_name):
@@ -267,12 +267,13 @@ def _get_task_id(task_name, task_argument, job_name):
267267
# TODO(mbarbella): Rewrite this function to avoid nesting issues.
268268
@set_task_payload
269269
def process_command_impl(task_name, task_argument, job_name, high_end,
270-
is_command_override):
270+
is_command_override, is_from_queue):
271271
"""Implementation of process_command."""
272272
uworker_env = None
273273
environment.set_value('TASK_NAME', task_name)
274274
environment.set_value('TASK_ARGUMENT', task_argument)
275275
environment.set_value('JOB_NAME', job_name)
276+
environment.set_value('IS_FROM_QUEUE', is_from_queue)
276277
if task_name in {'uworker_main', 'postprocess'}:
277278
# We want the id of the task we are processing, not "uworker_main", or
278279
# "postprocess".
@@ -452,3 +453,7 @@ def process_command_impl(task_name, task_argument, job_name, high_end,
452453
finally:
453454
# Final clean up.
454455
cleanup_task_state()
456+
if 'CF_TASK_ID' in os.environ:
457+
del os.environ['CF_TASK_ID']
458+
if 'IS_FROM_QUEUE' in os.environ:
459+
del os.environ['IS_FROM_QUEUE']

src/clusterfuzz/_internal/bot/tasks/setup.py

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -480,7 +480,6 @@ def update_data_bundle(
480480
data_bundle = uworker_io.entity_from_protobuf(data_bundle_corpus.data_bundle,
481481
data_types.DataBundle)
482482
logs.info('Setting up data bundle %s.' % data_bundle)
483-
484483
data_bundle_directory = _prepare_update_data_bundle(fuzzer, data_bundle)
485484

486485
if not _should_update_data_bundle(data_bundle, data_bundle_directory):

src/clusterfuzz/_internal/bot/tasks/task_types.py

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -156,6 +156,19 @@ def preprocess(self, task_argument, job_type, uworker_env):
156156
return download_url
157157

158158

159+
# TODO(b/378684001): Remove this, it's needed for testing but is otherwise a bad
160+
# design.
161+
class UTaskMostlyLocalExecutor(UTask):
162+
163+
@staticmethod
164+
def is_execution_remote(command=None):
165+
del command
166+
if environment.get_value('IS_FROM_QUEUE'):
167+
logs.info('IS FROM QUEUE')
168+
return True
169+
return False
170+
171+
159172
class PostprocessTask(BaseTask):
160173
"""Represents postprocessing of an untrusted task."""
161174

@@ -198,7 +211,7 @@ def execute(self, task_argument, job_type, uworker_env):
198211
'analyze': UTask,
199212
'blame': TrustedTask,
200213
'corpus_pruning': UTask,
201-
'fuzz': UTaskLocalExecutor,
214+
'fuzz': UTaskMostlyLocalExecutor,
202215
'impact': TrustedTask,
203216
'minimize': UTask,
204217
'progression': UTask,

0 commit comments

Comments
 (0)