Skip to content

Commit 2e0b5c6

Browse files
authored
feat(tasks): Implement OS-based task routing logic (#4995)
This change implements the core application logic to support routing ClusterFuzz tasks to different worker pools based on the target operating system. This is achieved via two main changes: 1. **Task Publishing (`tasks.add_task`):** The `add_task` function now determines the required `base_os_version` for a task by checking the `Job` and `OssFuzzProject` entities. This OS version is then added as a `base_os_version` attribute to the message published on Pub/Sub, effectively "tagging" the task for consumption by the correct worker fleet. 2. **Task Consuming (`get_*_task` functions):** The consumer functions (`get_preprocess_task`, `get_postprocess_task`, `get_utask_mains`) have been updated to be OS-aware. They now read a `BASE_OS_VERSION` environment variable (expected to be set in the Docker image). If this variable is present, they dynamically construct the name of the Pub/Sub subscription to pull from (e.g., `preprocess` becomes `preprocess-ubuntu-24-04`). Unit tests have been added to cover both the publishing and consuming logic. **Dependencies:** This change depends on corresponding infrastructure changes in the `clusterfuzz-config` repository, where the filtered Pub/Sub subscriptions (e.g., `preprocess-ubuntu-24-04`) are created. The code will fail to connect if the subscriptions do not exist. Fixes: [b/441792657](b/441792657)
1 parent 27b53e8 commit 2e0b5c6

File tree

2 files changed

+194
-6
lines changed

2 files changed

+194
-6
lines changed

src/clusterfuzz/_internal/base/tasks/__init__.py

Lines changed: 40 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -123,9 +123,16 @@ def default_queue_suffix():
123123
logs.info(f'QUEUE_OVERRIDE is [{queue_override}]. '
124124
f'Platform is {environment.platform()}')
125125
if queue_override:
126-
return queue_suffix_for_platform(queue_override)
126+
platform = queue_override
127+
else:
128+
platform = environment.platform()
129+
130+
platform_suffix = queue_suffix_for_platform(platform)
131+
base_os_version = environment.get_value('BASE_OS_VERSION')
132+
if base_os_version and 'LINUX' in platform.upper():
133+
platform_suffix = f'{platform_suffix}-{base_os_version}'
127134

128-
return queue_suffix_for_platform(environment.platform())
135+
return platform_suffix
129136

130137

131138
def regular_queue(prefix=JOBS_PREFIX):
@@ -296,7 +303,13 @@ def get_postprocess_task():
296303
# wasting our precious non-linux bots on generic postprocess tasks.
297304
if not environment.platform().lower() == 'linux':
298305
return None
299-
pubsub_puller = PubSubPuller(POSTPROCESS_QUEUE)
306+
307+
queue_name = POSTPROCESS_QUEUE
308+
base_os_version = environment.get_value('BASE_OS_VERSION')
309+
if base_os_version:
310+
queue_name = f'{queue_name}-{base_os_version}'
311+
312+
pubsub_puller = PubSubPuller(queue_name)
300313
logs.info('Pulling from postprocess queue')
301314
messages = pubsub_puller.get_messages(max_messages=1)
302315
if not messages:
@@ -312,7 +325,12 @@ def allow_all_tasks():
312325

313326

314327
def get_preprocess_task():
315-
pubsub_puller = PubSubPuller(PREPROCESS_QUEUE)
328+
queue_name = PREPROCESS_QUEUE
329+
base_os_version = environment.get_value('BASE_OS_VERSION')
330+
if base_os_version:
331+
queue_name = f'{queue_name}-{base_os_version}'
332+
333+
pubsub_puller = PubSubPuller(queue_name)
316334
messages = pubsub_puller.get_messages(max_messages=1)
317335
if not messages:
318336
return None
@@ -587,7 +605,12 @@ def get_task_from_message(message, queue=None, can_defer=True,
587605
def get_utask_mains() -> List[PubSubTask]:
588606
"""Returns a list of tasks for preprocessing many utasks on this bot and then
589607
running the uworker_mains in the same batch job."""
590-
pubsub_puller = PubSubPuller(UTASK_MAIN_QUEUE)
608+
queue_name = UTASK_MAIN_QUEUE
609+
base_os_version = environment.get_value('BASE_OS_VERSION')
610+
if base_os_version:
611+
queue_name = f'{queue_name}-{base_os_version}'
612+
613+
pubsub_puller = PubSubPuller(queue_name)
591614
messages = pubsub_puller.get_messages_time_limited(MAX_UTASKS,
592615
UTASK_QUEUE_PULL_SECONDS)
593616
return handle_multiple_utask_main_messages(messages, UTASK_MAIN_QUEUE)
@@ -758,6 +781,18 @@ def add_task(command,
758781
if not job:
759782
raise Error(f'Job {job_type} not found.')
760783

784+
# Determine base_os_version.
785+
base_os_version = job.base_os_version
786+
if job.is_external():
787+
oss_fuzz_project = data_types.OssFuzzProject.get_by_id(job.project)
788+
if oss_fuzz_project and oss_fuzz_project.base_os_version:
789+
base_os_version = oss_fuzz_project.base_os_version
790+
791+
if base_os_version:
792+
if extra_info is None:
793+
extra_info = {}
794+
extra_info['base_os_version'] = base_os_version
795+
761796
if job.is_external():
762797
external_tasks.add_external_task(command, argument, job)
763798
return

src/clusterfuzz/_internal/tests/core/base/tasks/tasks_test.py

Lines changed: 154 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,11 +12,12 @@
1212
# See the License for the specific language governing permissions and
1313
# limitations under the License.
1414
"""Tests for tasks."""
15-
1615
import unittest
1716
from unittest import mock
1817

1918
from clusterfuzz._internal.base import tasks
19+
from clusterfuzz._internal.datastore import data_types
20+
from clusterfuzz._internal.tests.test_libs import test_utils
2021

2122

2223
class InitializeTaskTest(unittest.TestCase):
@@ -296,3 +297,155 @@ def test_set_queue(self):
296297
task = tasks.get_task_from_message(mock.Mock())
297298

298299
self.assertEqual(task.queue, mock_queue)
300+
301+
302+
@test_utils.with_cloud_emulators('datastore')
303+
@mock.patch('clusterfuzz._internal.base.tasks.bulk_add_tasks')
304+
@mock.patch('clusterfuzz._internal.base.external_tasks.add_external_task')
305+
class AddTaskTest(unittest.TestCase):
306+
"""Tests for add_task."""
307+
308+
def setUp(self):
309+
self.oss_fuzz_project = data_types.OssFuzzProject(
310+
name='d8', base_os_version='ubuntu-24-04')
311+
self.oss_fuzz_project.put()
312+
313+
@mock.patch('clusterfuzz._internal.base.tasks.data_types.Job.query')
314+
def test_add_task_internal_job_with_os_version(
315+
self, mock_job_query, mock_add_external, mock_bulk_add):
316+
"""Test add_task with an internal job and an OS version."""
317+
mock_job = mock.MagicMock()
318+
mock_job.base_os_version = 'ubuntu-20-04'
319+
mock_job.project = 'd8'
320+
mock_job.is_external.return_value = False
321+
mock_job_query.return_value.get.return_value = mock_job
322+
323+
tasks.add_task('regression', '123', 'linux_asan_d8_dbg')
324+
325+
mock_add_external.assert_not_called()
326+
mock_bulk_add.assert_called_once()
327+
task_payload = mock_bulk_add.call_args[0][0][0]
328+
self.assertEqual(task_payload.extra_info['base_os_version'], 'ubuntu-20-04')
329+
330+
@mock.patch('clusterfuzz._internal.base.tasks.data_types.Job.query')
331+
def test_add_task_external_job_with_os_version(
332+
self, mock_job_query, mock_add_external, mock_bulk_add):
333+
"""Test add_task with an external (OSS-Fuzz) job and an OS version."""
334+
mock_job = mock.MagicMock()
335+
mock_job.base_os_version = 'ubuntu-20-04'
336+
mock_job.project = 'd8'
337+
mock_job.is_external.return_value = True
338+
mock_job_query.return_value.get.return_value = mock_job
339+
340+
tasks.add_task('regression', '123', 'linux_asan_d8_dbg')
341+
342+
mock_bulk_add.assert_not_called()
343+
mock_add_external.assert_called_once()
344+
345+
346+
@mock.patch('clusterfuzz._internal.base.tasks.PubSubPuller')
347+
@mock.patch('clusterfuzz._internal.system.environment.get_value')
348+
class GetTaskQueueSelectionTest(unittest.TestCase):
349+
"""Tests for dynamic queue selection in get_*_task functions."""
350+
351+
def test_get_preprocess_task_without_os_version(self, mock_env_get,
352+
mock_puller):
353+
"""Tests that get_preprocess_task selects the default queue."""
354+
mock_puller.return_value.get_messages.return_value = []
355+
mock_env_get.return_value = None
356+
tasks.get_preprocess_task()
357+
mock_puller.assert_called_with('preprocess')
358+
359+
def test_get_preprocess_task_with_os_version(self, mock_env_get, mock_puller):
360+
"""Tests that get_preprocess_task selects the suffixed queue."""
361+
mock_puller.return_value.get_messages.return_value = []
362+
mock_env_get.return_value = 'ubuntu-24-04'
363+
tasks.get_preprocess_task()
364+
mock_puller.assert_called_with('preprocess-ubuntu-24-04')
365+
366+
@mock.patch(
367+
'clusterfuzz._internal.base.tasks.task_utils.is_remotely_executing_utasks'
368+
)
369+
def test_get_postprocess_task_without_os_version(self, mock_is_remote,
370+
mock_env_get, mock_puller):
371+
"""Tests that get_postprocess_task selects the default queue."""
372+
mock_is_remote.return_value = True
373+
mock_puller.return_value.get_messages.return_value = []
374+
with mock.patch(
375+
'clusterfuzz._internal.system.environment.platform') as mock_platform:
376+
mock_platform.return_value.lower.return_value = 'linux'
377+
mock_env_get.return_value = None
378+
tasks.get_postprocess_task()
379+
mock_puller.assert_called_with('postprocess')
380+
381+
@mock.patch(
382+
'clusterfuzz._internal.base.tasks.task_utils.is_remotely_executing_utasks'
383+
)
384+
def test_get_postprocess_task_with_os_version(self, mock_is_remote,
385+
mock_env_get, mock_puller):
386+
"""Tests that get_postprocess_task selects the suffixed queue."""
387+
mock_is_remote.return_value = True
388+
mock_puller.return_value.get_messages.return_value = []
389+
with mock.patch(
390+
'clusterfuzz._internal.system.environment.platform') as mock_platform:
391+
mock_platform.return_value.lower.return_value = 'linux'
392+
mock_env_get.return_value = 'ubuntu-24-04'
393+
tasks.get_postprocess_task()
394+
mock_puller.assert_called_with('postprocess-ubuntu-24-04')
395+
396+
def test_get_utask_mains_without_os_version(self, mock_env_get, mock_puller):
397+
"""Tests that get_utask_mains selects the default queue."""
398+
mock_puller.return_value.get_messages_time_limited.return_value = []
399+
mock_env_get.return_value = None
400+
tasks.get_utask_mains()
401+
mock_puller.assert_called_with('utask_main')
402+
403+
def test_get_utask_mains_with_os_version(self, mock_env_get, mock_puller):
404+
"""Tests that get_utask_mains selects the suffixed queue."""
405+
mock_puller.return_value.get_messages_time_limited.return_value = []
406+
mock_env_get.return_value = 'ubuntu-24-04'
407+
tasks.get_utask_mains()
408+
mock_puller.assert_called_with('utask_main-ubuntu-24-04')
409+
410+
411+
@mock.patch('clusterfuzz._internal.system.environment.get_value')
412+
@mock.patch('clusterfuzz._internal.system.environment.platform')
413+
class QueueNameGenerationTest(unittest.TestCase):
414+
"""Tests for queue name generation functions."""
415+
416+
def test_default_queue_suffix_linux_no_os_version(self, mock_platform,
417+
mock_env_get):
418+
"""Tests queue suffix for Linux without an OS version."""
419+
mock_env_get.side_effect = lambda key, default='': {'QUEUE_OVERRIDE': ''}.get(key, default)
420+
mock_platform.return_value = 'LINUX'
421+
self.assertEqual(tasks.default_queue_suffix(), '-linux')
422+
423+
def test_default_queue_suffix_linux_with_os_version(self, mock_platform,
424+
mock_env_get):
425+
"""Tests queue suffix for Linux with an OS version."""
426+
mock_env_get.side_effect = lambda key, default='': {
427+
'BASE_OS_VERSION': 'ubuntu-24-04',
428+
'QUEUE_OVERRIDE': ''
429+
}.get(key, default)
430+
mock_platform.return_value = 'LINUX'
431+
self.assertEqual(tasks.default_queue_suffix(), '-linux-ubuntu-24-04')
432+
433+
def test_default_queue_suffix_mac_no_os_version(self, mock_platform,
434+
mock_env_get):
435+
"""Tests queue suffix for Mac without an OS version."""
436+
mock_env_get.side_effect = lambda key, default='': {
437+
'BASE_OS_VERSION': '',
438+
'QUEUE_OVERRIDE': ''
439+
}.get(key, default)
440+
mock_platform.return_value = 'MAC'
441+
self.assertEqual(tasks.default_queue_suffix(), '-mac')
442+
443+
def test_default_queue_suffix_mac_with_os_version(self, mock_platform,
444+
mock_env_get):
445+
"""Tests queue suffix for Mac with an OS version (should be ignored)."""
446+
mock_env_get.side_effect = lambda key, default='': {
447+
'BASE_OS_VERSION': 'ubuntu-24-04',
448+
'QUEUE_OVERRIDE': ''
449+
}.get(key, default)
450+
mock_platform.return_value = 'MAC'
451+
self.assertEqual(tasks.default_queue_suffix(), '-mac')

0 commit comments

Comments
 (0)