Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 16 additions & 5 deletions src/clusterfuzz/_internal/base/tasks/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -710,12 +710,23 @@ def run(self):
def add_utask_main(command, input_url, job_type, wait_time=None):
"""Adds the utask_main portion of a utask to the utasks queue for scheduling
on batch. This should only be done after preprocessing."""
# The platform of the job is loaded into the environment when this is called.
platform = environment.get_value('PLATFORM')

if platform.upper() == 'LINUX':
queue = UTASK_MAIN_QUEUE
else:
thread_multiplier = environment.get_value('THREAD_MULTIPLIER')
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is that still used?

is_high_end = thread_multiplier and thread_multiplier > 1
queue = queue_for_platform(
platform, is_high_end=is_high_end, force_true_queue=True)

initial_command = environment.get_value('TASK_PAYLOAD')
add_task(
command,
input_url,
job_type,
queue=UTASK_MAIN_QUEUE,
queue=queue,
wait_time=wait_time,
extra_info={'initial_command': initial_command})

Expand Down Expand Up @@ -787,9 +798,9 @@ def full_utask_task_model() -> bool:
return local_config.ProjectConfig().get('full_utask_model.enabled', False)


def queue_for_platform(platform, is_high_end=False):
def queue_for_platform(platform, is_high_end=False, force_true_queue=False):
"""Return the queue for the platform."""
if full_utask_task_model():
if full_utask_task_model() and not force_true_queue:
return PREPROCESS_QUEUE
prefix = HIGH_END_JOBS_PREFIX if is_high_end else JOBS_PREFIX
return prefix + queue_suffix_for_platform(platform)
Expand All @@ -802,13 +813,13 @@ def queue_for_testcase(testcase):
return queue_for_job(testcase.job_type, is_high_end=is_high_end)


def queue_for_job(job_name, is_high_end=False):
def queue_for_job(job_name, is_high_end=False, force_true_queue=False):
"""Queue for job."""
job = data_types.Job.query(data_types.Job.name == job_name).get()
if not job:
raise Error('Job {} not found.'.format(job_name))

return queue_for_platform(job.platform, is_high_end)
return queue_for_platform(job.platform, is_high_end, force_true_queue)


def redo_testcase(testcase, tasks, user_email):
Expand Down
104 changes: 104 additions & 0 deletions src/clusterfuzz/_internal/tests/core/base/tasks/tasks_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -296,3 +296,107 @@ def test_set_queue(self):
task = tasks.get_task_from_message(mock.Mock())

self.assertEqual(task.queue, mock_queue)


class AddUTaskMainTest(unittest.TestCase):
"""Tests for add_utask_main."""

def setUp(self):
self.mock_add_task = mock.patch(
'clusterfuzz._internal.base.tasks.add_task').start()
self.mock_environment = mock.patch(
'clusterfuzz._internal.system.environment.get_value').start()
self.mock_queue_for_platform = mock.patch(
'clusterfuzz._internal.base.tasks.queue_for_platform',
return_value='jobs-windows').start()

def tearDown(self):
self.mock_add_task.stop()
self.mock_environment.stop()
self.mock_queue_for_platform.stop()

def test_add_utask_main_linux(self):
"""Test that linux jobs are added to the utask_main queue."""
self.mock_environment.side_effect = \
lambda key, default=None: {
'PLATFORM': 'LINUX',
'TASK_PAYLOAD': 'initial_command'
}.get(key, default)
tasks.add_utask_main('command', 'input_url', 'job_type')
self.mock_add_task.assert_called_with(
'command',
'input_url',
'job_type',
queue=tasks.UTASK_MAIN_QUEUE,
wait_time=None,
extra_info={'initial_command': 'initial_command'})

def test_add_utask_main_non_linux(self):
"""Test that non-linux jobs are added to their specific queue."""
self.mock_environment.side_effect = \
lambda key, default=None: {
'PLATFORM': 'WINDOWS',
'TASK_PAYLOAD': 'initial_command',
'THREAD_MULTIPLIER': 1
}.get(key, default)
tasks.add_utask_main('command', 'input_url', 'job_type')
self.mock_add_task.assert_called_with(
'command',
'input_url',
'job_type',
queue='jobs-windows',
wait_time=None,
extra_info={'initial_command': 'initial_command'})
self.mock_queue_for_platform.assert_called_with(
'WINDOWS', is_high_end=False, force_true_queue=True)

def test_add_utask_main_non_linux_high_end(self):
"""Test that non-linux high-end jobs are added to their specific queue."""
self.mock_environment.side_effect = \
lambda key, default=None: {
'PLATFORM': 'WINDOWS',
'TASK_PAYLOAD': 'initial_command',
'THREAD_MULTIPLIER': 2
}.get(key, default)
self.mock_queue_for_platform.return_value = 'high-end-jobs-windows'
tasks.add_utask_main('command', 'input_url', 'job_type')
self.mock_add_task.assert_called_with(
'command',
'input_url',
'job_type',
queue='high-end-jobs-windows',
wait_time=None,
extra_info={'initial_command': 'initial_command'})
self.mock_queue_for_platform.assert_called_with(
'WINDOWS', is_high_end=True, force_true_queue=True)


class QueueForJobTest(unittest.TestCase):
"""Tests for queue_for_job."""

def setUp(self):
self.mock_job = mock.MagicMock()
self.mock_job_query = mock.patch(
'clusterfuzz._internal.datastore.data_types.Job.query',
return_value=mock.MagicMock(get=lambda: self.mock_job))
self.mock_full_utask_task_model = mock.patch(
'clusterfuzz._internal.base.tasks.full_utask_task_model').start()
self.mock_job_query.start()

def tearDown(self):
self.mock_job_query.stop()
self.mock_full_utask_task_model.stop()

def test_queue_for_job_force_true_queue(self):
"""Test that force_true_queue gets the true queue."""
self.mock_job.platform = 'WINDOWS'
self.mock_full_utask_task_model.return_value = True
queue = tasks.queue_for_job('job_type', force_true_queue=True)
self.assertEqual(queue, 'jobs-windows')

def test_queue_for_job_no_force(self):
"""Test that no force gets the preprocess queue."""
self.mock_job.platform = 'WINDOWS'
self.mock_full_utask_task_model.return_value = True
queue = tasks.queue_for_job('job_type')
self.assertEqual(queue, tasks.PREPROCESS_QUEUE)
Loading