diff --git a/src/clusterfuzz/_internal/base/tasks/__init__.py b/src/clusterfuzz/_internal/base/tasks/__init__.py index ea0abd261f5..236ffe84fe6 100644 --- a/src/clusterfuzz/_internal/base/tasks/__init__.py +++ b/src/clusterfuzz/_internal/base/tasks/__init__.py @@ -123,9 +123,16 @@ def default_queue_suffix(): logs.info(f'QUEUE_OVERRIDE is [{queue_override}]. ' f'Platform is {environment.platform()}') if queue_override: - return queue_suffix_for_platform(queue_override) + platform = queue_override + else: + platform = environment.platform() + + platform_suffix = queue_suffix_for_platform(platform) + base_os_version = environment.get_value('BASE_OS_VERSION') + if base_os_version and 'LINUX' in platform.upper(): + platform_suffix = f'{platform_suffix}-{base_os_version}' - return queue_suffix_for_platform(environment.platform()) + return platform_suffix def regular_queue(prefix=JOBS_PREFIX): @@ -296,7 +303,13 @@ def get_postprocess_task(): # wasting our precious non-linux bots on generic postprocess tasks. if not environment.platform().lower() == 'linux': return None - pubsub_puller = PubSubPuller(POSTPROCESS_QUEUE) + + queue_name = POSTPROCESS_QUEUE + base_os_version = environment.get_value('BASE_OS_VERSION') + if base_os_version: + queue_name = f'{queue_name}-{base_os_version}' + + pubsub_puller = PubSubPuller(queue_name) logs.info('Pulling from postprocess queue') messages = pubsub_puller.get_messages(max_messages=1) if not messages: @@ -312,7 +325,12 @@ def allow_all_tasks(): def get_preprocess_task(): - pubsub_puller = PubSubPuller(PREPROCESS_QUEUE) + queue_name = PREPROCESS_QUEUE + base_os_version = environment.get_value('BASE_OS_VERSION') + if base_os_version: + queue_name = f'{queue_name}-{base_os_version}' + + pubsub_puller = PubSubPuller(queue_name) messages = pubsub_puller.get_messages(max_messages=1) if not messages: return None @@ -587,7 +605,12 @@ def get_task_from_message(message, queue=None, can_defer=True, def get_utask_mains() -> List[PubSubTask]: """Returns a list of tasks for preprocessing many utasks on this bot and then running the uworker_mains in the same batch job.""" - pubsub_puller = PubSubPuller(UTASK_MAIN_QUEUE) + queue_name = UTASK_MAIN_QUEUE + base_os_version = environment.get_value('BASE_OS_VERSION') + if base_os_version: + queue_name = f'{queue_name}-{base_os_version}' + + pubsub_puller = PubSubPuller(queue_name) messages = pubsub_puller.get_messages_time_limited(MAX_UTASKS, UTASK_QUEUE_PULL_SECONDS) return handle_multiple_utask_main_messages(messages, UTASK_MAIN_QUEUE) @@ -758,6 +781,18 @@ def add_task(command, if not job: raise Error(f'Job {job_type} not found.') + # Determine base_os_version. + base_os_version = job.base_os_version + if job.is_external(): + oss_fuzz_project = data_types.OssFuzzProject.get_by_id(job.project) + if oss_fuzz_project and oss_fuzz_project.base_os_version: + base_os_version = oss_fuzz_project.base_os_version + + if base_os_version: + if extra_info is None: + extra_info = {} + extra_info['base_os_version'] = base_os_version + if job.is_external(): external_tasks.add_external_task(command, argument, job) return diff --git a/src/clusterfuzz/_internal/tests/core/base/tasks/tasks_test.py b/src/clusterfuzz/_internal/tests/core/base/tasks/tasks_test.py index b66f1764c2d..1e51434b661 100644 --- a/src/clusterfuzz/_internal/tests/core/base/tasks/tasks_test.py +++ b/src/clusterfuzz/_internal/tests/core/base/tasks/tasks_test.py @@ -12,11 +12,13 @@ # See the License for the specific language governing permissions and # limitations under the License. """Tests for tasks.""" - import unittest from unittest import mock from clusterfuzz._internal.base import tasks +from clusterfuzz._internal.datastore import data_types +from clusterfuzz._internal.google_cloud_utils import pubsub +from clusterfuzz._internal.tests.test_libs import test_utils class InitializeTaskTest(unittest.TestCase): @@ -296,3 +298,145 @@ def test_set_queue(self): task = tasks.get_task_from_message(mock.Mock()) self.assertEqual(task.queue, mock_queue) + + +@test_utils.with_cloud_emulators('datastore') +@mock.patch('clusterfuzz._internal.base.tasks.bulk_add_tasks') +@mock.patch('clusterfuzz._internal.base.external_tasks.add_external_task') +class AddTaskTest(unittest.TestCase): + """Tests for add_task.""" + + def setUp(self): + self.oss_fuzz_project = data_types.OssFuzzProject( + name='d8', base_os_version='ubuntu-24-04') + self.oss_fuzz_project.put() + + @mock.patch('clusterfuzz._internal.base.tasks.data_types.Job.query') + def test_add_task_with_os_version(self, mock_job_query, mock_add_external, + mock_bulk_add): + """Test that the base_os_version attribute is correctly added.""" + mock_job = mock.MagicMock() + mock_job.base_os_version = 'ubuntu-20-04' + mock_job.project = 'd8' + mock_job_query.return_value.get.return_value = mock_job + + # Scenario 1: Not an external job. Should use the job's OS version and + # call bulk_add_tasks. + mock_job.is_external.return_value = False + tasks.add_task('regression', '123', 'linux_asan_d8_dbg') + mock_add_external.assert_not_called() + mock_bulk_add.assert_called() + task_payload = mock_bulk_add.call_args[0][0][0] + self.assertEqual(task_payload.extra_info['base_os_version'], 'ubuntu-20-04') + + mock_bulk_add.reset_mock() + mock_add_external.reset_mock() + + # Scenario 2: External (OSS-Fuzz) job. Should use the project's OS version + # and call add_external_task. + mock_job.is_external.return_value = True + tasks.add_task('regression', '123', 'linux_asan_d8_dbg') + mock_bulk_add.assert_not_called() + mock_add_external.assert_called() + + +@mock.patch('clusterfuzz._internal.base.tasks.PubSubPuller') +@mock.patch('clusterfuzz._internal.system.environment.get_value') +class GetTaskQueueSelectionTest(unittest.TestCase): + """Tests for dynamic queue selection in get_*_task functions.""" + + def test_get_preprocess_task_queue_selection(self, mock_env_get, mock_puller): + """Tests that get_preprocess_task selects the correct queue.""" + mock_puller.return_value.get_messages.return_value = [] + + # Scenario 1: No OS version ENV. Should use the default queue. + mock_env_get.return_value = None + tasks.get_preprocess_task() + mock_puller.assert_called_with('preprocess') + + mock_puller.reset_mock() + + # Scenario 2: With OS version ENV. Should use the suffixed queue. + mock_env_get.return_value = 'ubuntu-24-04' + tasks.get_preprocess_task() + mock_puller.assert_called_with('preprocess-ubuntu-24-04') + + @mock.patch( + 'clusterfuzz._internal.base.tasks.task_utils.is_remotely_executing_utasks' + ) + def test_get_postprocess_task_queue_selection(self, mock_is_remote, + mock_env_get, mock_puller): + """Tests that get_postprocess_task selects the correct queue.""" + mock_is_remote.return_value = True + mock_puller.return_value.get_messages.return_value = [] + with mock.patch( + 'clusterfuzz._internal.system.environment.platform') as mock_platform: + mock_platform.return_value.lower.return_value = 'linux' + + # Scenario 1: No OS version ENV. + mock_env_get.return_value = None + tasks.get_postprocess_task() + mock_puller.assert_called_with('postprocess') + + mock_puller.reset_mock() + + # Scenario 2: With OS version ENV. + mock_env_get.return_value = 'ubuntu-24-04' + tasks.get_postprocess_task() + mock_puller.assert_called_with('postprocess-ubuntu-24-04') + + def test_get_utask_mains_queue_selection(self, mock_env_get, mock_puller): + """Tests that get_utask_mains selects the correct queue.""" + mock_puller.return_value.get_messages_time_limited.return_value = [] + + # Scenario 1: No OS version ENV. + mock_env_get.return_value = None + tasks.get_utask_mains() + mock_puller.assert_called_with('utask_main') + + mock_puller.reset_mock() + + # Scenario 2: With OS version ENV. + mock_env_get.return_value = 'ubuntu-24-04' + tasks.get_utask_mains() + mock_puller.assert_called_with('utask_main-ubuntu-24-04') + + +@mock.patch('clusterfuzz._internal.system.environment.get_value') +@mock.patch('clusterfuzz._internal.system.environment.platform') +class QueueNameGenerationTest(unittest.TestCase): + """Tests for queue name generation functions.""" + + def test_default_queue_suffix_generation(self, mock_platform, mock_env_get): + """Tests the logic of default_queue_suffix.""" + # Mock QUEUE_OVERRIDE to be unset. + mock_env_get.side_effect = lambda key, default='': { + 'BASE_OS_VERSION': '', + 'QUEUE_OVERRIDE': '' + }.get(key, default) + + # Scenario 1: Linux platform, no OS version. + mock_platform.return_value = 'LINUX' + self.assertEqual(tasks.default_queue_suffix(), '-linux') + + # Scenario 2: Linux platform, with OS version. + mock_env_get.side_effect = lambda key, default='': { + 'BASE_OS_VERSION': 'ubuntu-24-04', + 'QUEUE_OVERRIDE': '' + }.get(key, default) + self.assertEqual(tasks.default_queue_suffix(), '-linux-ubuntu-24-04') + + # Scenario 3: Mac platform, no OS version. + mock_platform.return_value = 'MAC' + mock_env_get.side_effect = lambda key, default='': { + 'BASE_OS_VERSION': '', + 'QUEUE_OVERRIDE': '' + }.get(key, default) + self.assertEqual(tasks.default_queue_suffix(), '-mac') + + # Scenario 4: Mac platform, with OS version (should be ignored). + mock_env_get.side_effect = lambda key, default='': { + 'BASE_OS_VERSION': 'ubuntu-24-04', + 'QUEUE_OVERRIDE': '' + }.get(key, default) + self.assertEqual(tasks.default_queue_suffix(), '-mac')