Skip to content

Commit 44d02ae

Browse files
Update get_task to return the queue of the task (google#4699)
get_task has been updated to return the queue from which the task was originally obtained. This is requried so that in case the task cannot be currently executed, it is added back to the right queue. Currently observed that a progression task for a old device like flame from jobs-android queue was being pushed to jobs-android-mte-pixel8 queue
1 parent ebcde24 commit 44d02ae

File tree

3 files changed

+45
-16
lines changed

3 files changed

+45
-16
lines changed

src/clusterfuzz/_internal/base/tasks/__init__.py

Lines changed: 19 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -197,7 +197,7 @@ def get_regular_task(queue=None):
197197
if not messages:
198198
return None
199199

200-
task = get_task_from_message(messages[0])
200+
task = get_task_from_message(messages[0], queue)
201201
if task:
202202
return task
203203

@@ -296,7 +296,7 @@ def get_postprocess_task():
296296
messages = pubsub_puller.get_messages(max_messages=1)
297297
if not messages:
298298
return None
299-
task = get_task_from_message(messages[0])
299+
task = get_task_from_message(messages[0], POSTPROCESS_QUEUE)
300300
if task:
301301
logs.info('Pulled from postprocess queue.')
302302
return task
@@ -311,7 +311,7 @@ def get_preprocess_task():
311311
messages = pubsub_puller.get_messages(max_messages=1)
312312
if not messages:
313313
return None
314-
task = get_task_from_message(messages[0])
314+
task = get_task_from_message(messages[0], PREPROCESS_QUEUE)
315315
if task:
316316
logs.info('Pulled from preprocess queue.')
317317
return task
@@ -377,9 +377,9 @@ def get_task():
377377
return task
378378

379379

380-
def construct_payload(command, argument, job):
380+
def construct_payload(command, argument, job, queue=None):
381381
"""Constructs payload for task, a standard description of tasks."""
382-
return ' '.join([command, str(argument), str(job)])
382+
return ' '.join([command, str(argument), str(job), str(queue)])
383383

384384

385385
class Task:
@@ -392,24 +392,26 @@ def __init__(self,
392392
eta=None,
393393
is_command_override=False,
394394
high_end=False,
395-
extra_info=None):
395+
extra_info=None,
396+
queue=None):
396397
self.command = command
397398
self.argument = argument
398399
self.job = job
399400
self.eta = eta
400401
self.is_command_override = is_command_override
401402
self.high_end = high_end
402403
self.extra_info = extra_info
404+
self.queue = queue
403405

404406
def __repr__(self):
405-
return f'Task: {self.command} {self.argument} {self.job}'
407+
return f'Task: {self.command} {self.argument} {self.job} {self.queue}'
406408

407409
def attribute(self, _):
408410
return None
409411

410412
def payload(self):
411413
"""Get the payload."""
412-
return construct_payload(self.command, self.argument, self.job)
414+
return construct_payload(self.command, self.argument, self.job, self.queue)
413415

414416
def to_pubsub_message(self):
415417
"""Convert the task to a pubsub message."""
@@ -437,6 +439,10 @@ def lease(self):
437439
yield
438440
track_task_end()
439441

442+
def set_queue(self, queue):
443+
self.queue = queue
444+
return self
445+
440446

441447
class PubSubTask(Task):
442448
"""A Pub/Sub task."""
@@ -503,7 +509,7 @@ def dont_retry(self):
503509
self._pubsub_message.ack()
504510

505511

506-
def get_task_from_message(message) -> Optional[PubSubTask]:
512+
def get_task_from_message(message, queue=None) -> Optional[PubSubTask]:
507513
"""Returns a task constructed from the first of |messages| if possible."""
508514
if message is None:
509515
return None
@@ -514,6 +520,7 @@ def get_task_from_message(message) -> Optional[PubSubTask]:
514520
message.ack()
515521
return None
516522

523+
task = task.set_queue(queue)
517524
# Check that this task should be run now (past the ETA). Otherwise we defer
518525
# its execution.
519526
if task.defer():
@@ -528,15 +535,15 @@ def get_utask_mains() -> List[PubSubTask]:
528535
pubsub_puller = PubSubPuller(UTASK_MAINS_QUEUE)
529536
messages = pubsub_puller.get_messages_time_limited(MAX_UTASKS,
530537
UTASK_QUEUE_PULL_SECONDS)
531-
return handle_multiple_utask_main_messages(messages)
538+
return handle_multiple_utask_main_messages(messages, UTASK_MAINS_QUEUE)
532539

533540

534-
def handle_multiple_utask_main_messages(messages) -> List[PubSubTask]:
541+
def handle_multiple_utask_main_messages(messages, queue) -> List[PubSubTask]:
535542
"""Merges tasks specified in |messages| into a list for processing on this
536543
bot."""
537544
tasks = []
538545
for message in messages:
539-
task = get_task_from_message(message)
546+
task = get_task_from_message(message, queue)
540547
if task is None:
541548
continue
542549
tasks.append(task)

src/clusterfuzz/_internal/tests/appengine/common/tasks_test.py

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -85,7 +85,8 @@ def test_high_end(self):
8585
self.assertEqual('test', task.command)
8686
self.assertEqual('high', task.argument)
8787
self.assertEqual('job', task.job)
88-
self.assertEqual('test high job', task.payload())
88+
self.assertEqual('high-end-jobs-linux', task.queue)
89+
self.assertEqual('test high job high-end-jobs-linux', task.payload())
8990

9091
def test_regular(self):
9192
"""Test regular tasks."""
@@ -98,7 +99,8 @@ def test_regular(self):
9899
self.assertEqual('test', task.command)
99100
self.assertEqual('normal', task.argument)
100101
self.assertEqual('job', task.job)
101-
self.assertEqual('test normal job', task.payload())
102+
self.assertEqual('jobs-linux', task.queue)
103+
self.assertEqual('test normal job jobs-linux', task.payload())
102104

103105
def test_preemptible(self):
104106
"""Test preemptible bot tasks."""
@@ -124,7 +126,8 @@ def test_defer(self):
124126
self.assertEqual('test', task.command)
125127
self.assertEqual('normal4', task.argument)
126128
self.assertEqual('job', task.job)
127-
self.assertEqual('test normal4 job', task.payload())
129+
self.assertEqual('jobs-linux', task.queue)
130+
self.assertEqual('test normal4 job jobs-linux', task.payload())
128131

129132
self.assertEqual(3, mock_modify.call_count)
130133
mock_modify.assert_has_calls([
@@ -142,7 +145,8 @@ def test_command_override(self):
142145
self.assertEqual('test', task.command)
143146
self.assertEqual('override', task.argument)
144147
self.assertEqual('job', task.job)
145-
self.assertEqual('test override job', task.payload())
148+
self.assertEqual(None, task.queue)
149+
self.assertEqual('test override job None', task.payload())
146150

147151

148152
class LeaseTaskTest(unittest.TestCase):

src/clusterfuzz/_internal/tests/core/base/tasks/tasks_test.py

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -259,6 +259,7 @@ def test_no_message(self):
259259

260260
def test_success(self):
261261
mock_task = mock.Mock(defer=mock.Mock(return_value=False))
262+
mock_task.set_queue.return_value = mock_task
262263
with mock.patch(
263264
'clusterfuzz._internal.base.tasks.initialize_task',
264265
return_value=mock_task):
@@ -278,3 +279,20 @@ def test_defer(self):
278279
'clusterfuzz._internal.base.tasks.initialize_task',
279280
return_value=mock_task):
280281
self.assertEqual(tasks.get_task_from_message(mock.Mock()), None)
282+
283+
def test_set_queue(self):
284+
"""Tests the set_queue method of a task."""
285+
mock_queue = mock.Mock()
286+
mock_task = mock.Mock()
287+
288+
mock_task.configure_mock(
289+
queue=mock_queue,
290+
set_queue=mock.Mock(return_value=mock_task),
291+
defer=mock.Mock(return_value=False))
292+
293+
with mock.patch(
294+
'clusterfuzz._internal.base.tasks.initialize_task',
295+
return_value=mock_task):
296+
task = tasks.get_task_from_message(mock.Mock())
297+
298+
self.assertEqual(task.queue, mock_queue)

0 commit comments

Comments
 (0)