diff --git a/src/clusterfuzz/_internal/base/tasks/__init__.py b/src/clusterfuzz/_internal/base/tasks/__init__.py index ea0abd261f..c7be1c7493 100644 --- a/src/clusterfuzz/_internal/base/tasks/__init__.py +++ b/src/clusterfuzz/_internal/base/tasks/__init__.py @@ -710,12 +710,23 @@ def run(self): def add_utask_main(command, input_url, job_type, wait_time=None): """Adds the utask_main portion of a utask to the utasks queue for scheduling on batch. This should only be done after preprocessing.""" + # The platform of the job is loaded into the environment when this is called. + platform = environment.get_value('PLATFORM') + + if platform.upper() == 'LINUX': + queue = UTASK_MAIN_QUEUE + else: + thread_multiplier = environment.get_value('THREAD_MULTIPLIER') + is_high_end = thread_multiplier and thread_multiplier > 1 + queue = queue_for_platform( + platform, is_high_end=is_high_end, force_true_queue=True) + initial_command = environment.get_value('TASK_PAYLOAD') add_task( command, input_url, job_type, - queue=UTASK_MAIN_QUEUE, + queue=queue, wait_time=wait_time, extra_info={'initial_command': initial_command}) @@ -787,9 +798,9 @@ def full_utask_task_model() -> bool: return local_config.ProjectConfig().get('full_utask_model.enabled', False) -def queue_for_platform(platform, is_high_end=False): +def queue_for_platform(platform, is_high_end=False, force_true_queue=False): """Return the queue for the platform.""" - if full_utask_task_model(): + if full_utask_task_model() and not force_true_queue: return PREPROCESS_QUEUE prefix = HIGH_END_JOBS_PREFIX if is_high_end else JOBS_PREFIX return prefix + queue_suffix_for_platform(platform) @@ -802,13 +813,13 @@ def queue_for_testcase(testcase): return queue_for_job(testcase.job_type, is_high_end=is_high_end) -def queue_for_job(job_name, is_high_end=False): +def queue_for_job(job_name, is_high_end=False, force_true_queue=False): """Queue for job.""" job = data_types.Job.query(data_types.Job.name == job_name).get() if not job: raise Error('Job {} not found.'.format(job_name)) - return queue_for_platform(job.platform, is_high_end) + return queue_for_platform(job.platform, is_high_end, force_true_queue) def redo_testcase(testcase, tasks, user_email): diff --git a/src/clusterfuzz/_internal/tests/core/base/tasks/tasks_test.py b/src/clusterfuzz/_internal/tests/core/base/tasks/tasks_test.py index b66f1764c2..b42e21aee9 100644 --- a/src/clusterfuzz/_internal/tests/core/base/tasks/tasks_test.py +++ b/src/clusterfuzz/_internal/tests/core/base/tasks/tasks_test.py @@ -296,3 +296,107 @@ def test_set_queue(self): task = tasks.get_task_from_message(mock.Mock()) self.assertEqual(task.queue, mock_queue) + + +class AddUTaskMainTest(unittest.TestCase): + """Tests for add_utask_main.""" + + def setUp(self): + self.mock_add_task = mock.patch( + 'clusterfuzz._internal.base.tasks.add_task').start() + self.mock_environment = mock.patch( + 'clusterfuzz._internal.system.environment.get_value').start() + self.mock_queue_for_platform = mock.patch( + 'clusterfuzz._internal.base.tasks.queue_for_platform', + return_value='jobs-windows').start() + + def tearDown(self): + self.mock_add_task.stop() + self.mock_environment.stop() + self.mock_queue_for_platform.stop() + + def test_add_utask_main_linux(self): + """Test that linux jobs are added to the utask_main queue.""" + self.mock_environment.side_effect = \ + lambda key, default=None: { + 'PLATFORM': 'LINUX', + 'TASK_PAYLOAD': 'initial_command' + }.get(key, default) + tasks.add_utask_main('command', 'input_url', 'job_type') + self.mock_add_task.assert_called_with( + 'command', + 'input_url', + 'job_type', + queue=tasks.UTASK_MAIN_QUEUE, + wait_time=None, + extra_info={'initial_command': 'initial_command'}) + + def test_add_utask_main_non_linux(self): + """Test that non-linux jobs are added to their specific queue.""" + self.mock_environment.side_effect = \ + lambda key, default=None: { + 'PLATFORM': 'WINDOWS', + 'TASK_PAYLOAD': 'initial_command', + 'THREAD_MULTIPLIER': 1 + }.get(key, default) + tasks.add_utask_main('command', 'input_url', 'job_type') + self.mock_add_task.assert_called_with( + 'command', + 'input_url', + 'job_type', + queue='jobs-windows', + wait_time=None, + extra_info={'initial_command': 'initial_command'}) + self.mock_queue_for_platform.assert_called_with( + 'WINDOWS', is_high_end=False, force_true_queue=True) + + def test_add_utask_main_non_linux_high_end(self): + """Test that non-linux high-end jobs are added to their specific queue.""" + self.mock_environment.side_effect = \ + lambda key, default=None: { + 'PLATFORM': 'WINDOWS', + 'TASK_PAYLOAD': 'initial_command', + 'THREAD_MULTIPLIER': 2 + }.get(key, default) + self.mock_queue_for_platform.return_value = 'high-end-jobs-windows' + tasks.add_utask_main('command', 'input_url', 'job_type') + self.mock_add_task.assert_called_with( + 'command', + 'input_url', + 'job_type', + queue='high-end-jobs-windows', + wait_time=None, + extra_info={'initial_command': 'initial_command'}) + self.mock_queue_for_platform.assert_called_with( + 'WINDOWS', is_high_end=True, force_true_queue=True) + + +class QueueForJobTest(unittest.TestCase): + """Tests for queue_for_job.""" + + def setUp(self): + self.mock_job = mock.MagicMock() + self.mock_job_query = mock.patch( + 'clusterfuzz._internal.datastore.data_types.Job.query', + return_value=mock.MagicMock(get=lambda: self.mock_job)) + self.mock_full_utask_task_model = mock.patch( + 'clusterfuzz._internal.base.tasks.full_utask_task_model').start() + self.mock_job_query.start() + + def tearDown(self): + self.mock_job_query.stop() + self.mock_full_utask_task_model.stop() + + def test_queue_for_job_force_true_queue(self): + """Test that force_true_queue gets the true queue.""" + self.mock_job.platform = 'WINDOWS' + self.mock_full_utask_task_model.return_value = True + queue = tasks.queue_for_job('job_type', force_true_queue=True) + self.assertEqual(queue, 'jobs-windows') + + def test_queue_for_job_no_force(self): + """Test that no force gets the preprocess queue.""" + self.mock_job.platform = 'WINDOWS' + self.mock_full_utask_task_model.return_value = True + queue = tasks.queue_for_job('job_type') + self.assertEqual(queue, tasks.PREPROCESS_QUEUE)