Skip to content

Commit ccb5ef6

Browse files
Add generic tworker. (#4418)
This tworker will only run preprocess and postprocess. It can do this regardless of the platform of the task. Also, get rid of some kludges used to test utasks in oss-fuzz. b/380104573
1 parent cef1e49 commit ccb5ef6

File tree

12 files changed

+103
-126
lines changed

12 files changed

+103
-126
lines changed

docker/build.sh

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ IMAGES=(
2727
gcr.io/clusterfuzz-images/oss-fuzz/worker
2828
gcr.io/clusterfuzz-images/ci
2929
gcr.io/clusterfuzz-images/utask-main-scheduler
30+
gcr.io/clusterfuzz-images/tworker
3031
gcr.io/clusterfuzz-images/fuchsia
3132
)
3233

docker/tworker/Dockerfile

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
# Copyright 2024 Google LLC
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
FROM gcr.io/clusterfuzz-images/base
15+
16+
# Worker that only reads from queues, preprocesses and postprocesses.
17+
ENV TWORKER=1

src/clusterfuzz/_internal/base/tasks/__init__.py

Lines changed: 34 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,7 @@
8383

8484
POSTPROCESS_QUEUE = 'postprocess'
8585
UTASK_MAINS_QUEUE = 'utask_main'
86+
PREPROCESS_QUEUE = 'preprocess'
8687

8788
# See https://github.com/google/clusterfuzz/issues/3347 for usage
8889
SUBQUEUE_IDENTIFIER = ':'
@@ -281,8 +282,7 @@ def is_done_collecting_messages():
281282
def get_postprocess_task():
282283
"""Gets a postprocess task if one exists."""
283284
# This should only be run on non-preemptible bots.
284-
if not (task_utils.is_remotely_executing_utasks() or
285-
task_utils.get_opted_in_tasks()):
285+
if not task_utils.is_remotely_executing_utasks():
286286
return None
287287
# Postprocess is platform-agnostic, so we run all such tasks on our
288288
# most generic and plentiful bots only. In other words, we avoid
@@ -304,9 +304,29 @@ def allow_all_tasks():
304304
return not environment.get_value('PREEMPTIBLE')
305305

306306

307+
def get_preprocess_task():
308+
pubsub_puller = PubSubPuller(PREPROCESS_QUEUE)
309+
messages = pubsub_puller.get_messages(max_messages=1)
310+
if not messages:
311+
return None
312+
task = get_task_from_message(messages[0])
313+
if task:
314+
logs.info('Pulled from preprocess queue.')
315+
return task
316+
317+
318+
def tworker_get_task():
319+
assert environment.is_tworker()
320+
task = get_postprocess_task()
321+
if task:
322+
return task
323+
324+
return get_preprocess_task()
325+
326+
307327
def get_task():
308-
"""Returns an ordinary (non-postprocess, non-utask_main) task that is pulled
309-
from a ClusterFuzz task queue."""
328+
"""Returns an ordinary (non-utask_main) task that is pulled from a ClusterFuzz
329+
task queue."""
310330
task = get_command_override()
311331
if task:
312332
return task
@@ -319,6 +339,7 @@ def get_task():
319339
task = get_postprocess_task()
320340
if task:
321341
return task
342+
322343
# Check the high-end jobs queue for bots with multiplier greater than 1.
323344
thread_multiplier = environment.get_value('THREAD_MULTIPLIER')
324345
if thread_multiplier and thread_multiplier > 1:
@@ -368,8 +389,7 @@ def __init__(self,
368389
eta=None,
369390
is_command_override=False,
370391
high_end=False,
371-
extra_info=None,
372-
is_from_queue=False):
392+
extra_info=None):
373393
self.command = command
374394
self.argument = argument
375395
self.job = job
@@ -378,16 +398,6 @@ def __init__(self,
378398
self.high_end = high_end
379399
self.extra_info = extra_info
380400

381-
# is_from_queue is a temporary hack to keep track of which fuzz tasks came
382-
# from the queue. Previously all fuzz tasks were picked by the bot when
383-
# there was nothing on the queue. With the rearchitecture, we want fuzz
384-
# tasks that were put on the queue by the schedule_fuzz cron job to be
385-
# executed on batch. is_from_queue is used to do this.
386-
# TODO(b/378684001): This code is very ugly, get rid of it when no more
387-
# fuzz tasks are executed on the bots themselves (i.e. when the rearch
388-
# is complete).
389-
self.is_from_queue = is_from_queue
390-
391401
def __repr__(self):
392402
return f'Task: {self.command} {self.argument} {self.job}'
393403

@@ -428,13 +438,11 @@ def lease(self):
428438
class PubSubTask(Task):
429439
"""A Pub/Sub task."""
430440

431-
def __init__(self, pubsub_message, is_from_queue=False):
441+
def __init__(self, pubsub_message):
432442
self._pubsub_message = pubsub_message
433443
super().__init__(
434-
self.attribute('command'),
435-
self.attribute('argument'),
436-
self.attribute('job'),
437-
is_from_queue=is_from_queue)
444+
self.attribute('command'), self.attribute('argument'),
445+
self.attribute('job'))
438446

439447
self.extra_info = {
440448
key: value
@@ -540,7 +548,7 @@ def initialize_task(message) -> PubSubTask:
540548
"""Creates a task from |messages|."""
541549

542550
if message.attributes.get('eventType') != 'OBJECT_FINALIZE':
543-
return PubSubTask(message, is_from_queue=True)
551+
return PubSubTask(message)
544552

545553
# Handle postprocess task.
546554
# The GCS API for pub/sub notifications uses the data field unlike
@@ -549,7 +557,7 @@ def initialize_task(message) -> PubSubTask:
549557
name = data['name']
550558
bucket = data['bucket']
551559
output_url_argument = storage.get_cloud_storage_file_path(bucket, name)
552-
return PostprocessPubSubTask(output_url_argument, message, is_from_queue=True)
560+
return PostprocessPubSubTask(output_url_argument, message)
553561

554562

555563
class PostprocessPubSubTask(PubSubTask):
@@ -558,21 +566,14 @@ class PostprocessPubSubTask(PubSubTask):
558566
def __init__(self,
559567
output_url_argument,
560568
pubsub_message,
561-
is_command_override=False,
562-
is_from_queue=False):
569+
is_command_override=False):
563570
command = 'postprocess'
564571
job_type = 'none'
565572
eta = None
566573
high_end = False
567574
grandparent_class = super(PubSubTask, self)
568-
grandparent_class.__init__(
569-
command,
570-
output_url_argument,
571-
job_type,
572-
eta,
573-
is_command_override,
574-
high_end,
575-
is_from_queue=is_from_queue)
575+
grandparent_class.__init__(command, output_url_argument, job_type, eta,
576+
is_command_override, high_end)
576577
self._pubsub_message = pubsub_message
577578

578579

src/clusterfuzz/_internal/base/tasks/task_utils.py

Lines changed: 4 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,6 @@
1515
any other module in tasks to prevent circular imports and issues with
1616
appengine."""
1717

18-
from clusterfuzz._internal.config import local_config
1918
from clusterfuzz._internal.system import environment
2019

2120

@@ -26,25 +25,12 @@ def get_command_from_module(full_module_name: str) -> str:
2625
return module_name[:-len('_task')]
2726

2827

29-
def is_remotely_executing_utasks(task=None) -> bool:
28+
def is_remotely_executing_utasks() -> bool:
3029
"""Returns True if the utask_main portions of utasks are being remotely
3130
executed on Google cloud batch."""
32-
if bool(environment.is_production() and
33-
environment.get_value('REMOTE_UTASK_EXECUTION')):
34-
return True
35-
if task is None:
36-
return False
37-
return bool(is_task_opted_into_uworker_execution(task))
38-
39-
40-
def get_opted_in_tasks():
41-
return local_config.ProjectConfig().get('uworker_tasks', [])
42-
43-
44-
def is_task_opted_into_uworker_execution(task: str) -> bool:
45-
# TODO(metzman): Remove this after OSS-Fuzz and Chrome are at parity.
46-
uworker_tasks = get_opted_in_tasks()
47-
return task in uworker_tasks
31+
# TODO(metzman): REMOTE_UTASK_EXECUTION should be a config not an env var.
32+
return (environment.is_production() and
33+
environment.get_value('REMOTE_UTASK_EXECUTION'))
4834

4935

5036
class UworkerMsgParseError(RuntimeError):

src/clusterfuzz/_internal/bot/tasks/commands.py

Lines changed: 18 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -190,11 +190,25 @@ def start_web_server_if_needed():
190190
logs.error('Failed to start web server, skipping.')
191191

192192

193+
def get_command_object(task_name):
194+
"""Returns the command object that execute can be called on."""
195+
task = COMMAND_MAP.get(task_name)
196+
if not environment.is_tworker():
197+
return task
198+
199+
if isinstance(task, task_types.TrustedTask):
200+
# We don't need to execute this remotely.
201+
return task
202+
203+
# Force remote execution.
204+
return task_types.UTask(task_name)
205+
206+
193207
def run_command(task_name, task_argument, job_name, uworker_env):
194208
"""Runs the command."""
195-
task = COMMAND_MAP.get(task_name)
209+
task = get_command_object(task_name)
196210
if not task:
197-
logs.error("Unknown command '%s'" % task_name)
211+
logs.error(f'Unknown command "{task_name}"')
198212
return None
199213

200214
# If applicable, ensure this is the only instance of the task running.
@@ -253,10 +267,8 @@ def process_command(task):
253267
logs.error('Empty task received.')
254268
return None
255269

256-
# TODO(b/378684001): Remove is_from_queue kludge.
257270
return process_command_impl(task.command, task.argument, task.job,
258-
task.high_end, task.is_command_override,
259-
task.is_from_queue)
271+
task.high_end, task.is_command_override)
260272

261273

262274
def _get_task_id(task_name, task_argument, job_name):
@@ -267,13 +279,12 @@ def _get_task_id(task_name, task_argument, job_name):
267279
# TODO(mbarbella): Rewrite this function to avoid nesting issues.
268280
@set_task_payload
269281
def process_command_impl(task_name, task_argument, job_name, high_end,
270-
is_command_override, is_from_queue):
282+
is_command_override):
271283
"""Implementation of process_command."""
272284
uworker_env = None
273285
environment.set_value('TASK_NAME', task_name)
274286
environment.set_value('TASK_ARGUMENT', task_argument)
275287
environment.set_value('JOB_NAME', job_name)
276-
environment.set_value('IS_FROM_QUEUE', is_from_queue)
277288
if task_name in {'uworker_main', 'postprocess'}:
278289
# We want the id of the task we are processing, not "uworker_main", or
279290
# "postprocess".
@@ -456,5 +467,3 @@ def process_command_impl(task_name, task_argument, job_name, high_end,
456467
cleanup_task_state()
457468
if 'CF_TASK_ID' in os.environ:
458469
del os.environ['CF_TASK_ID']
459-
if 'IS_FROM_QUEUE' in os.environ:
460-
del os.environ['IS_FROM_QUEUE']

src/clusterfuzz/_internal/bot/tasks/task_types.py

Lines changed: 5 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@ class TrustedTask(BaseTask):
4646
def execute(self, task_argument, job_type, uworker_env):
4747
# Simple tasks can just use the environment they don't need the uworker env.
4848
del uworker_env
49+
assert not environment.is_tworker()
4950
self.module.execute_task(task_argument, job_type)
5051

5152

@@ -58,6 +59,8 @@ def execute(self, task_argument, job_type, uworker_env):
5859
raise NotImplementedError('Child class must implement.')
5960

6061
def execute_locally(self, task_argument, job_type, uworker_env):
62+
"""Executes the utask locally (on this machine, not on batch)."""
63+
assert not environment.is_tworker()
6164
uworker_input = utasks.tworker_preprocess_no_io(self.module, task_argument,
6265
job_type, uworker_env)
6366
if uworker_input is None:
@@ -119,7 +122,7 @@ class UTask(BaseUTask):
119122

120123
@staticmethod
121124
def is_execution_remote(command=None):
122-
return task_utils.is_remotely_executing_utasks(command)
125+
return task_utils.is_remotely_executing_utasks()
123126

124127
def execute(self, task_argument, job_type, uworker_env):
125128
"""Executes a utask."""
@@ -156,19 +159,6 @@ def preprocess(self, task_argument, job_type, uworker_env):
156159
return download_url
157160

158161

159-
# TODO(b/378684001): Remove this, it's needed for testing but is otherwise a bad
160-
# design.
161-
class UTaskMostlyLocalExecutor(UTask):
162-
163-
@staticmethod
164-
def is_execution_remote(command=None):
165-
del command
166-
if environment.get_value('IS_FROM_QUEUE'):
167-
logs.info('IS FROM QUEUE')
168-
return True
169-
return False
170-
171-
172162
class PostprocessTask(BaseTask):
173163
"""Represents postprocessing of an untrusted task."""
174164

@@ -211,7 +201,7 @@ def execute(self, task_argument, job_type, uworker_env):
211201
'analyze': UTask,
212202
'blame': TrustedTask,
213203
'corpus_pruning': UTask,
214-
'fuzz': UTaskMostlyLocalExecutor,
204+
'fuzz': UTaskLocalExecutor,
215205
'impact': TrustedTask,
216206
'minimize': UTask,
217207
'progression': UTask,

src/clusterfuzz/_internal/cron/schedule_fuzz.py

Lines changed: 8 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@
2020

2121
from googleapiclient import discovery
2222

23-
from clusterfuzz._internal.base import concurrency
2423
from clusterfuzz._internal.base import tasks
2524
from clusterfuzz._internal.base import utils
2625
from clusterfuzz._internal.config import local_config
@@ -167,14 +166,12 @@ def get_fuzz_tasks(self) -> Dict[str, tasks.Task]:
167166

168167
choices = random.choices(
169168
fuzz_task_candidates, weights=weights, k=num_instances)
170-
queues_to_tasks = collections.defaultdict(list)
171-
for fuzz_task_candidate in choices:
172-
queue_tasks = queues_to_tasks[fuzz_task_candidate.queue]
173-
174-
task = tasks.Task('fuzz', fuzz_task_candidate.fuzzer,
175-
fuzz_task_candidate.job)
176-
queue_tasks.append(task)
177-
return queues_to_tasks
169+
fuzz_tasks = [
170+
tasks.Task('fuzz', fuzz_task_candidate.fuzzer, fuzz_task_candidate.job)
171+
for fuzz_task_candidate in choices
172+
]
173+
# TODO(metzman): Remove the queue stuff if it's uneeded for Chrome.
174+
return fuzz_tasks
178175

179176

180177
def get_fuzz_tasks(available_cpus: int) -> [tasks.Task]:
@@ -210,10 +207,8 @@ def schedule_fuzz_tasks() -> bool:
210207
logs.error('No fuzz tasks found to schedule.')
211208
return False
212209

213-
# TODO(b/378684001): Change this to using one queue when oss-fuzz's untrusted
214-
# worker model is deleted.
215-
with concurrency.make_pool() as pool:
216-
list(pool.map(bulk_add, fuzz_tasks.items()))
210+
logs.info(f'Adding {fuzz_tasks} to preprocess queue.')
211+
tasks.bulk_add_tasks(fuzz_tasks, queue=tasks.PREPROCESS_QUEUE, eta_now=True)
217212
logs.info(f'Scheduled {len(fuzz_tasks)} fuzz tasks.')
218213

219214
end = time.time()
@@ -222,11 +217,5 @@ def schedule_fuzz_tasks() -> bool:
222217
return True
223218

224219

225-
def bulk_add(queue_and_tasks):
226-
queue, task_list = queue_and_tasks
227-
logs.info(f'Adding {task_list} to {queue}.')
228-
tasks.bulk_add_tasks(task_list, queue=queue, eta_now=True)
229-
230-
231220
def main():
232221
return schedule_fuzz_tasks()

src/clusterfuzz/_internal/system/environment.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1177,3 +1177,7 @@ def can_testcase_run_on_platform(testcase_platform_id, current_platform_id):
11771177
current_platform_id)
11781178

11791179
return False
1180+
1181+
1182+
def is_tworker():
1183+
return get_value('TWORKER', False)

0 commit comments

Comments
 (0)