From 9796861a5b3e82c162211fce5a4035a2cd95c19f Mon Sep 17 00:00:00 2001 From: "google-labs-jules[bot]" <161369871+google-labs-jules[bot]@users.noreply.github.com> Date: Fri, 8 Aug 2025 17:57:43 +0000 Subject: [PATCH 1/2] I've updated the code to route non-Linux tasks to their correct queue. Here's a summary of the changes: I modified the task queuing logic for `utask_main` tasks. Previously, all `utask_main` tasks were sent to the `UTASK_MAIN_QUEUE`, which was problematic for non-Linux jobs that need to run on specific platforms. I've updated `add_utask_main` to check the job's platform. If the platform isn't Linux, it now uses `queue_for_job` with a new `force_true_queue` parameter to find the correct, platform-specific queue. This ensures that non-Linux `utask_main` tasks are routed to the appropriate bots. Linux jobs will continue to use the `UTASK_MAIN_QUEUE` as before. Finally, I added tests to verify the new queuing logic. --- .../_internal/base/tasks/__init__.py | 22 ++++-- .../tests/core/base/tasks/tasks_test.py | 76 +++++++++++++++++++ 2 files changed, 93 insertions(+), 5 deletions(-) diff --git a/src/clusterfuzz/_internal/base/tasks/__init__.py b/src/clusterfuzz/_internal/base/tasks/__init__.py index ea0abd261f..07f0659637 100644 --- a/src/clusterfuzz/_internal/base/tasks/__init__.py +++ b/src/clusterfuzz/_internal/base/tasks/__init__.py @@ -710,12 +710,24 @@ def run(self): def add_utask_main(command, input_url, job_type, wait_time=None): """Adds the utask_main portion of a utask to the utasks queue for scheduling on batch. This should only be done after preprocessing.""" + job = data_types.Job.query(data_types.Job.name == job_type).get() + if not job: + raise Error(f'Job {job_type} not found.') + + # If the job is a linux job, we want to use the generic utask_main queue. + # Otherwise, we should use the job's dedicated queue so that it runs on a bot + # with the correct platform. + if job.platform.upper() == 'LINUX': + queue = UTASK_MAIN_QUEUE + else: + queue = queue_for_job(job_type, force_true_queue=True) + initial_command = environment.get_value('TASK_PAYLOAD') add_task( command, input_url, job_type, - queue=UTASK_MAIN_QUEUE, + queue=queue, wait_time=wait_time, extra_info={'initial_command': initial_command}) @@ -787,9 +799,9 @@ def full_utask_task_model() -> bool: return local_config.ProjectConfig().get('full_utask_model.enabled', False) -def queue_for_platform(platform, is_high_end=False): +def queue_for_platform(platform, is_high_end=False, force_true_queue=False): """Return the queue for the platform.""" - if full_utask_task_model(): + if full_utask_task_model() and not force_true_queue: return PREPROCESS_QUEUE prefix = HIGH_END_JOBS_PREFIX if is_high_end else JOBS_PREFIX return prefix + queue_suffix_for_platform(platform) @@ -802,13 +814,13 @@ def queue_for_testcase(testcase): return queue_for_job(testcase.job_type, is_high_end=is_high_end) -def queue_for_job(job_name, is_high_end=False): +def queue_for_job(job_name, is_high_end=False, force_true_queue=False): """Queue for job.""" job = data_types.Job.query(data_types.Job.name == job_name).get() if not job: raise Error('Job {} not found.'.format(job_name)) - return queue_for_platform(job.platform, is_high_end) + return queue_for_platform(job.platform, is_high_end, force_true_queue) def redo_testcase(testcase, tasks, user_email): diff --git a/src/clusterfuzz/_internal/tests/core/base/tasks/tasks_test.py b/src/clusterfuzz/_internal/tests/core/base/tasks/tasks_test.py index b66f1764c2..a10f318378 100644 --- a/src/clusterfuzz/_internal/tests/core/base/tasks/tasks_test.py +++ b/src/clusterfuzz/_internal/tests/core/base/tasks/tasks_test.py @@ -296,3 +296,79 @@ def test_set_queue(self): task = tasks.get_task_from_message(mock.Mock()) self.assertEqual(task.queue, mock_queue) + + +class AddUTaskMainTest(unittest.TestCase): + """Tests for add_utask_main.""" + + def setUp(self): + self.mock_job = mock.MagicMock() + self.mock_job_query = mock.patch( + 'clusterfuzz._internal.datastore.data_types.Job.query', + return_value=mock.MagicMock(get=lambda: self.mock_job)) + self.mock_add_task = mock.patch( + 'clusterfuzz._internal.base.tasks.add_task').start() + self.mock_environment = mock.patch( + 'clusterfuzz._internal.system.environment.get_value', + return_value='initial_command').start() + self.mock_job_query.start() + + def tearDown(self): + self.mock_job_query.stop() + self.mock_add_task.stop() + self.mock_environment.stop() + + def test_add_utask_main_linux(self): + """Test that linux jobs are added to the utask_main queue.""" + self.mock_job.platform = 'LINUX' + tasks.add_utask_main('command', 'input_url', 'job_type') + self.mock_add_task.assert_called_with( + 'command', + 'input_url', + 'job_type', + queue=tasks.UTASK_MAIN_QUEUE, + wait_time=None, + extra_info={'initial_command': 'initial_command'}) + + def test_add_utask_main_non_linux(self): + """Test that non-linux jobs are added to their specific queue.""" + self.mock_job.platform = 'WINDOWS' + tasks.add_utask_main('command', 'input_url', 'job_type') + self.mock_add_task.assert_called_with( + 'command', + 'input_url', + 'job_type', + queue='jobs-windows', + wait_time=None, + extra_info={'initial_command': 'initial_command'}) + + +class QueueForJobTest(unittest.TestCase): + """Tests for queue_for_job.""" + + def setUp(self): + self.mock_job = mock.MagicMock() + self.mock_job_query = mock.patch( + 'clusterfuzz._internal.datastore.data_types.Job.query', + return_value=mock.MagicMock(get=lambda: self.mock_job)) + self.mock_full_utask_task_model = mock.patch( + 'clusterfuzz._internal.base.tasks.full_utask_task_model').start() + self.mock_job_query.start() + + def tearDown(self): + self.mock_job_query.stop() + self.mock_full_utask_task_model.stop() + + def test_queue_for_job_force_true_queue(self): + """Test that force_true_queue gets the true queue.""" + self.mock_job.platform = 'WINDOWS' + self.mock_full_utask_task_model.return_value = True + queue = tasks.queue_for_job('job_type', force_true_queue=True) + self.assertEqual(queue, 'jobs-windows') + + def test_queue_for_job_no_force(self): + """Test that no force gets the preprocess queue.""" + self.mock_job.platform = 'WINDOWS' + self.mock_full_utask_task_model.return_value = True + queue = tasks.queue_for_job('job_type') + self.assertEqual(queue, tasks.PREPROCESS_QUEUE) From 80c82eec14cdd6607b69761052ab7fdf8eeb561c Mon Sep 17 00:00:00 2001 From: "google-labs-jules[bot]" <161369871+google-labs-jules[bot]@users.noreply.github.com> Date: Fri, 8 Aug 2025 18:21:21 +0000 Subject: [PATCH 2/2] I've updated the code to route non-Linux `utask`s to their correct queue. I modified the task queuing logic for `utask_main` tasks. Previously, all `utask_main` tasks were sent to the `UTASK_MAIN_QUEUE`, which was problematic for non-Linux jobs that need to run on bots with the correct platform. Now, I've updated `add_utask_main` to get the platform from the environment and use `queue_for_platform` to get the correct, platform-specific queue. This ensures that non-Linux `utask_main` tasks are routed to the correct bots. Linux jobs will continue to use the `UTASK_MAIN_QUEUE` as before. I also updated the tests to verify the new queuing logic. --- .../_internal/base/tasks/__init__.py | 15 +++--- .../tests/core/base/tasks/tasks_test.py | 48 +++++++++++++++---- 2 files changed, 45 insertions(+), 18 deletions(-) diff --git a/src/clusterfuzz/_internal/base/tasks/__init__.py b/src/clusterfuzz/_internal/base/tasks/__init__.py index 07f0659637..c7be1c7493 100644 --- a/src/clusterfuzz/_internal/base/tasks/__init__.py +++ b/src/clusterfuzz/_internal/base/tasks/__init__.py @@ -710,17 +710,16 @@ def run(self): def add_utask_main(command, input_url, job_type, wait_time=None): """Adds the utask_main portion of a utask to the utasks queue for scheduling on batch. This should only be done after preprocessing.""" - job = data_types.Job.query(data_types.Job.name == job_type).get() - if not job: - raise Error(f'Job {job_type} not found.') + # The platform of the job is loaded into the environment when this is called. + platform = environment.get_value('PLATFORM') - # If the job is a linux job, we want to use the generic utask_main queue. - # Otherwise, we should use the job's dedicated queue so that it runs on a bot - # with the correct platform. - if job.platform.upper() == 'LINUX': + if platform.upper() == 'LINUX': queue = UTASK_MAIN_QUEUE else: - queue = queue_for_job(job_type, force_true_queue=True) + thread_multiplier = environment.get_value('THREAD_MULTIPLIER') + is_high_end = thread_multiplier and thread_multiplier > 1 + queue = queue_for_platform( + platform, is_high_end=is_high_end, force_true_queue=True) initial_command = environment.get_value('TASK_PAYLOAD') add_task( diff --git a/src/clusterfuzz/_internal/tests/core/base/tasks/tasks_test.py b/src/clusterfuzz/_internal/tests/core/base/tasks/tasks_test.py index a10f318378..b42e21aee9 100644 --- a/src/clusterfuzz/_internal/tests/core/base/tasks/tasks_test.py +++ b/src/clusterfuzz/_internal/tests/core/base/tasks/tasks_test.py @@ -302,25 +302,26 @@ class AddUTaskMainTest(unittest.TestCase): """Tests for add_utask_main.""" def setUp(self): - self.mock_job = mock.MagicMock() - self.mock_job_query = mock.patch( - 'clusterfuzz._internal.datastore.data_types.Job.query', - return_value=mock.MagicMock(get=lambda: self.mock_job)) self.mock_add_task = mock.patch( 'clusterfuzz._internal.base.tasks.add_task').start() self.mock_environment = mock.patch( - 'clusterfuzz._internal.system.environment.get_value', - return_value='initial_command').start() - self.mock_job_query.start() + 'clusterfuzz._internal.system.environment.get_value').start() + self.mock_queue_for_platform = mock.patch( + 'clusterfuzz._internal.base.tasks.queue_for_platform', + return_value='jobs-windows').start() def tearDown(self): - self.mock_job_query.stop() self.mock_add_task.stop() self.mock_environment.stop() + self.mock_queue_for_platform.stop() def test_add_utask_main_linux(self): """Test that linux jobs are added to the utask_main queue.""" - self.mock_job.platform = 'LINUX' + self.mock_environment.side_effect = \ + lambda key, default=None: { + 'PLATFORM': 'LINUX', + 'TASK_PAYLOAD': 'initial_command' + }.get(key, default) tasks.add_utask_main('command', 'input_url', 'job_type') self.mock_add_task.assert_called_with( 'command', @@ -332,7 +333,12 @@ def test_add_utask_main_linux(self): def test_add_utask_main_non_linux(self): """Test that non-linux jobs are added to their specific queue.""" - self.mock_job.platform = 'WINDOWS' + self.mock_environment.side_effect = \ + lambda key, default=None: { + 'PLATFORM': 'WINDOWS', + 'TASK_PAYLOAD': 'initial_command', + 'THREAD_MULTIPLIER': 1 + }.get(key, default) tasks.add_utask_main('command', 'input_url', 'job_type') self.mock_add_task.assert_called_with( 'command', @@ -341,6 +347,28 @@ def test_add_utask_main_non_linux(self): queue='jobs-windows', wait_time=None, extra_info={'initial_command': 'initial_command'}) + self.mock_queue_for_platform.assert_called_with( + 'WINDOWS', is_high_end=False, force_true_queue=True) + + def test_add_utask_main_non_linux_high_end(self): + """Test that non-linux high-end jobs are added to their specific queue.""" + self.mock_environment.side_effect = \ + lambda key, default=None: { + 'PLATFORM': 'WINDOWS', + 'TASK_PAYLOAD': 'initial_command', + 'THREAD_MULTIPLIER': 2 + }.get(key, default) + self.mock_queue_for_platform.return_value = 'high-end-jobs-windows' + tasks.add_utask_main('command', 'input_url', 'job_type') + self.mock_add_task.assert_called_with( + 'command', + 'input_url', + 'job_type', + queue='high-end-jobs-windows', + wait_time=None, + extra_info={'initial_command': 'initial_command'}) + self.mock_queue_for_platform.assert_called_with( + 'WINDOWS', is_high_end=True, force_true_queue=True) class QueueForJobTest(unittest.TestCase):