@@ -197,7 +197,7 @@ def get_regular_task(queue=None):
197197 if not messages :
198198 return None
199199
200- task = get_task_from_message (messages [0 ])
200+ task = get_task_from_message (messages [0 ], queue )
201201 if task :
202202 return task
203203
@@ -296,7 +296,7 @@ def get_postprocess_task():
296296 messages = pubsub_puller .get_messages (max_messages = 1 )
297297 if not messages :
298298 return None
299- task = get_task_from_message (messages [0 ])
299+ task = get_task_from_message (messages [0 ], POSTPROCESS_QUEUE )
300300 if task :
301301 logs .info ('Pulled from postprocess queue.' )
302302 return task
@@ -311,7 +311,7 @@ def get_preprocess_task():
311311 messages = pubsub_puller .get_messages (max_messages = 1 )
312312 if not messages :
313313 return None
314- task = get_task_from_message (messages [0 ])
314+ task = get_task_from_message (messages [0 ], PREPROCESS_QUEUE )
315315 if task :
316316 logs .info ('Pulled from preprocess queue.' )
317317 return task
@@ -377,9 +377,9 @@ def get_task():
377377 return task
378378
379379
380- def construct_payload (command , argument , job ):
380+ def construct_payload (command , argument , job , queue = None ):
381381 """Constructs payload for task, a standard description of tasks."""
382- return ' ' .join ([command , str (argument ), str (job )])
382+ return ' ' .join ([command , str (argument ), str (job ), str ( queue ) ])
383383
384384
385385class Task :
@@ -392,24 +392,26 @@ def __init__(self,
392392 eta = None ,
393393 is_command_override = False ,
394394 high_end = False ,
395- extra_info = None ):
395+ extra_info = None ,
396+ queue = None ):
396397 self .command = command
397398 self .argument = argument
398399 self .job = job
399400 self .eta = eta
400401 self .is_command_override = is_command_override
401402 self .high_end = high_end
402403 self .extra_info = extra_info
404+ self .queue = queue
403405
404406 def __repr__ (self ):
405- return f'Task: { self .command } { self .argument } { self .job } '
407+ return f'Task: { self .command } { self .argument } { self .job } { self . queue } '
406408
407409 def attribute (self , _ ):
408410 return None
409411
410412 def payload (self ):
411413 """Get the payload."""
412- return construct_payload (self .command , self .argument , self .job )
414+ return construct_payload (self .command , self .argument , self .job , self . queue )
413415
414416 def to_pubsub_message (self ):
415417 """Convert the task to a pubsub message."""
@@ -437,6 +439,9 @@ def lease(self):
437439 yield
438440 track_task_end ()
439441
442+ def set_queue (self , queue ):
443+ self .queue = queue
444+ return self
440445
441446class PubSubTask (Task ):
442447 """A Pub/Sub task."""
@@ -503,7 +508,7 @@ def dont_retry(self):
503508 self ._pubsub_message .ack ()
504509
505510
506- def get_task_from_message (message ) -> Optional [PubSubTask ]:
511+ def get_task_from_message (message , queue = None ) -> Optional [PubSubTask ]:
507512 """Returns a task constructed from the first of |messages| if possible."""
508513 if message is None :
509514 return None
@@ -514,6 +519,7 @@ def get_task_from_message(message) -> Optional[PubSubTask]:
514519 message .ack ()
515520 return None
516521
522+ task = task .set_queue (queue )
517523 # Check that this task should be run now (past the ETA). Otherwise we defer
518524 # its execution.
519525 if task .defer ():
@@ -528,15 +534,15 @@ def get_utask_mains() -> List[PubSubTask]:
528534 pubsub_puller = PubSubPuller (UTASK_MAINS_QUEUE )
529535 messages = pubsub_puller .get_messages_time_limited (MAX_UTASKS ,
530536 UTASK_QUEUE_PULL_SECONDS )
531- return handle_multiple_utask_main_messages (messages )
537+ return handle_multiple_utask_main_messages (messages , UTASK_MAINS_QUEUE )
532538
533539
534- def handle_multiple_utask_main_messages (messages ) -> List [PubSubTask ]:
540+ def handle_multiple_utask_main_messages (messages , queue ) -> List [PubSubTask ]:
535541 """Merges tasks specified in |messages| into a list for processing on this
536542 bot."""
537543 tasks = []
538544 for message in messages :
539- task = get_task_from_message (message )
545+ task = get_task_from_message (message , queue )
540546 if task is None :
541547 continue
542548 tasks .append (task )
0 commit comments