@@ -283,7 +283,7 @@ def __init__(self,
283283 ):
284284
285285 self .workflow_end = False
286- self .workflow_start_message = None # type : Optional[MonitoringMessage]
286+ self .workflow_start_message : Optional [MonitoringMessage ] = None
287287 self .logdir = logdir
288288 os .makedirs (self .logdir , exist_ok = True )
289289
@@ -299,10 +299,10 @@ def __init__(self,
299299 self .batching_interval = batching_interval
300300 self .batching_threshold = batching_threshold
301301
302- self .pending_priority_queue = queue .Queue () # type: queue.Queue[TaggedMonitoringMessage]
303- self .pending_node_queue = queue .Queue () # type: queue.Queue[MonitoringMessage]
304- self .pending_block_queue = queue .Queue () # type: queue.Queue[MonitoringMessage]
305- self .pending_resource_queue = queue .Queue () # type: queue.Queue[MonitoringMessage]
302+ self .pending_priority_queue : queue .Queue [ TaggedMonitoringMessage ] = queue .Queue ()
303+ self .pending_node_queue : queue .Queue [ MonitoringMessage ] = queue .Queue ()
304+ self .pending_block_queue : queue .Queue [ MonitoringMessage ] = queue .Queue ()
305+ self .pending_resource_queue : queue .Queue [ MonitoringMessage ] = queue .Queue ()
306306
307307 def start (self ,
308308 priority_queue : "queue.Queue[TaggedMonitoringMessage]" ,
@@ -351,18 +351,18 @@ def start(self,
351351 If that happens, the message will be added to deferred_resource_messages and processed later.
352352
353353 """
354- inserted_tasks = set () # type : Set[object]
354+ inserted_tasks : Set [object ] = set ()
355355
356356 """
357357 like inserted_tasks but for task,try tuples
358358 """
359- inserted_tries = set () # type : Set[Any]
359+ inserted_tries : Set [Any ] = set ()
360360
361361 # for any task ID, we can defer exactly one message, which is the
362362 # assumed-to-be-unique first message (with first message flag set).
363363 # The code prior to this patch will discard previous message in
364364 # the case of multiple messages to defer.
365- deferred_resource_messages = {} # type: MonitoringMessage
365+ deferred_resource_messages : MonitoringMessage = {}
366366
367367 exception_happened = False
368368
@@ -505,7 +505,7 @@ def start(self,
505505 "Got {} messages from block queue" .format (len (block_info_messages )))
506506 # block_info_messages is possibly a nested list of dict (at different polling times)
507507 # Each dict refers to the info of a job/block at one polling time
508- block_messages_to_insert = [] # type: List[Any ]
508+ block_messages_to_insert : List [ Any ] = [ ]
509509 for block_msg in block_info_messages :
510510 block_messages_to_insert .extend (block_msg )
511511 self ._insert (table = BLOCK , messages = block_messages_to_insert )
@@ -686,7 +686,7 @@ def _insert(self, table: str, messages: List[MonitoringMessage]) -> None:
686686 logger .exception ("Rollback failed" )
687687
688688 def _get_messages_in_batch (self , msg_queue : "queue.Queue[X]" ) -> List [X ]:
689- messages = [] # type: List[X ]
689+ messages : List [ X ] = [ ]
690690 start = time .time ()
691691 while True :
692692 if time .time () - start >= self .batching_interval or len (messages ) >= self .batching_threshold :
0 commit comments