@@ -375,7 +375,7 @@ def start(self) -> None:
375375
376376 self .zmq_context .destroy ()
377377 delta = time .time () - start
378- logger .info ("Processed {} tasks in {} seconds" . format ( self . count , delta ) )
378+ logger .info (f "Processed { self . count } tasks in { delta } seconds" )
379379 logger .warning ("Exiting" )
380380
381381 def process_task_outgoing_incoming (
@@ -396,17 +396,16 @@ def process_task_outgoing_incoming(
396396 try :
397397 msg = json .loads (message [1 ].decode ('utf-8' ))
398398 except Exception :
399- logger .warning ("Got Exception reading message from manager: {!r}" .format (
400- manager_id ), exc_info = True )
401- logger .debug ("Message: \n {!r}\n " .format (message [1 ]))
399+ logger .warning (f"Got Exception reading message from manager: { manager_id !r} " , exc_info = True )
400+ logger .debug ("Message:\n %r\n " , message [1 ])
402401 return
403402
404403 # perform a bit of validation on the structure of the deserialized
405404 # object, at least enough to behave like a deserialization error
406405 # in obviously malformed cases
407406 if not isinstance (msg , dict ) or 'type' not in msg :
408407 logger .error (f"JSON message was not correctly formatted from manager: { manager_id !r} " )
409- logger .debug ("Message: \n {!r} \n " . format ( message [1 ]) )
408+ logger .debug ("Message:\n %r \n " , message [1 ])
410409 return
411410
412411 if msg ['type' ] == 'registration' :
@@ -425,7 +424,7 @@ def process_task_outgoing_incoming(
425424 self .connected_block_history .append (msg ['block_id' ])
426425
427426 interesting_managers .add (manager_id )
428- logger .info ("Adding manager: {!r} to ready queue" . format ( manager_id ) )
427+ logger .info (f "Adding manager: { manager_id !r} to ready queue" )
429428 m = self ._ready_managers [manager_id ]
430429
431430 # m is a ManagerRecord, but msg is a dict[Any,Any] and so can
@@ -434,12 +433,12 @@ def process_task_outgoing_incoming(
434433 # later.
435434 m .update (msg ) # type: ignore[typeddict-item]
436435
437- logger .info ("Registration info for manager {!r}: {}" . format ( manager_id , msg ) )
436+ logger .info (f "Registration info for manager { manager_id !r} : { msg } " )
438437 self ._send_monitoring_info (monitoring_radio , m )
439438
440439 if (msg ['python_v' ].rsplit ("." , 1 )[0 ] != self .current_platform ['python_v' ].rsplit ("." , 1 )[0 ] or
441440 msg ['parsl_v' ] != self .current_platform ['parsl_v' ]):
442- logger .error ("Manager {!r} has incompatible version info with the interchange" . format ( manager_id ) )
441+ logger .error (f "Manager { manager_id !r} has incompatible version info with the interchange" )
443442 logger .debug ("Setting kill event" )
444443 kill_event .set ()
445444 e = VersionMismatch ("py.v={} parsl.v={}" .format (self .current_platform ['python_v' ].rsplit ("." , 1 )[0 ],
@@ -452,16 +451,15 @@ def process_task_outgoing_incoming(
452451 self .results_outgoing .send (pkl_package )
453452 logger .error ("Sent failure reports, shutting down interchange" )
454453 else :
455- logger .info ("Manager {!r} has compatible Parsl version {}" .format (manager_id , msg ['parsl_v' ]))
456- logger .info ("Manager {!r} has compatible Python version {}" .format (manager_id ,
457- msg ['python_v' ].rsplit ("." , 1 )[0 ]))
454+ logger .info (f"Manager { manager_id !r} has compatible Parsl version { msg ['parsl_v' ]} " )
455+ logger .info (f"Manager { manager_id !r} has compatible Python version { msg ['python_v' ].rsplit ('.' , 1 )[0 ]} " )
458456 elif msg ['type' ] == 'heartbeat' :
459457 self ._ready_managers [manager_id ]['last_heartbeat' ] = time .time ()
460- logger .debug ("Manager {!r} sent heartbeat via tasks connection" . format ( manager_id ) )
458+ logger .debug ("Manager %r sent heartbeat via tasks connection" , manager_id )
461459 self .task_outgoing .send_multipart ([manager_id , b'' , PKL_HEARTBEAT_CODE ])
462460 elif msg ['type' ] == 'drain' :
463461 self ._ready_managers [manager_id ]['draining' ] = True
464- logger .debug (f "Manager { manager_id !r } requested drain" )
462+ logger .debug ("Manager %r requested drain" , manager_id )
465463 else :
466464 logger .error (f"Unexpected message type received from manager: { msg ['type' ]} " )
467465 logger .debug ("leaving task_outgoing section" )
@@ -484,9 +482,11 @@ def expire_drained_managers(self, interesting_managers: Set[bytes], monitoring_r
484482 def process_tasks_to_send (self , interesting_managers : Set [bytes ]) -> None :
485483 # Check if there are tasks that could be sent to managers
486484
487- logger .debug ("Managers count (interesting/total): {interesting}/{total}" .format (
488- total = len (self ._ready_managers ),
489- interesting = len (interesting_managers )))
485+ logger .debug (
486+ "Managers count (interesting/total): {}/{}" ,
487+ len (interesting_managers ),
488+ len (self ._ready_managers )
489+ )
490490
491491 if interesting_managers and not self .pending_task_queue .empty ():
492492 shuffled_managers = self .manager_selector .sort_managers (self ._ready_managers , interesting_managers )
@@ -497,7 +497,7 @@ def process_tasks_to_send(self, interesting_managers: Set[bytes]) -> None:
497497 tasks_inflight = len (m ['tasks' ])
498498 real_capacity = m ['max_capacity' ] - tasks_inflight
499499
500- if ( real_capacity and m [' active' ] and not m [' draining' ]) :
500+ if real_capacity and m [" active" ] and not m [" draining" ] :
501501 tasks = self .get_tasks (real_capacity )
502502 if tasks :
503503 self .task_outgoing .send_multipart ([manager_id , b'' , pickle .dumps (tasks )])
@@ -506,19 +506,19 @@ def process_tasks_to_send(self, interesting_managers: Set[bytes]) -> None:
506506 tids = [t ['task_id' ] for t in tasks ]
507507 m ['tasks' ].extend (tids )
508508 m ['idle_since' ] = None
509- logger .debug ("Sent tasks: {} to manager {!r}" . format ( tids , manager_id ) )
509+ logger .debug ("Sent tasks: %s to manager %r" , tids , manager_id )
510510 # recompute real_capacity after sending tasks
511511 real_capacity = m ['max_capacity' ] - tasks_inflight
512512 if real_capacity > 0 :
513- logger .debug ("Manager {!r} has free capacity {}" . format ( manager_id , real_capacity ) )
513+ logger .debug ("Manager %r has free capacity %s" , manager_id , real_capacity )
514514 # ... so keep it in the interesting_managers list
515515 else :
516- logger .debug ("Manager {!r} is now saturated" . format ( manager_id ) )
516+ logger .debug ("Manager %r is now saturated" , manager_id )
517517 interesting_managers .remove (manager_id )
518518 else :
519519 interesting_managers .remove (manager_id )
520520 # logger.debug("Nothing to send to manager {}".format(manager_id))
521- logger .debug ("leaving _ready_managers section, with {} managers still interesting" . format ( len (interesting_managers ) ))
521+ logger .debug ("leaving _ready_managers section, with %s managers still interesting" , len (interesting_managers ))
522522 else :
523523 logger .debug ("either no interesting managers or no tasks, so skipping manager pass" )
524524
@@ -528,9 +528,9 @@ def process_results_incoming(self, interesting_managers: Set[bytes], monitoring_
528528 logger .debug ("entering results_incoming section" )
529529 manager_id , * all_messages = self .results_incoming .recv_multipart ()
530530 if manager_id not in self ._ready_managers :
531- logger .warning ("Received a result from a un-registered manager: {!r}" . format ( manager_id ) )
531+ logger .warning (f "Received a result from a un-registered manager: { manager_id !r} " )
532532 else :
533- logger .debug (f "Got { len ( all_messages ) } result items in batch from manager { manager_id !r } " )
533+ logger .debug ("Got %s result items in batch from manager %r" , len ( all_messages ), manager_id )
534534
535535 b_messages = []
536536
@@ -548,10 +548,10 @@ def process_results_incoming(self, interesting_managers: Set[bytes], monitoring_
548548
549549 monitoring_radio .send (r ['payload' ])
550550 elif r ['type' ] == 'heartbeat' :
551- logger .debug (f "Manager { manager_id !r } sent heartbeat via results connection" )
551+ logger .debug ("Manager %r sent heartbeat via results connection" , manager_id )
552552 b_messages .append ((p_message , r ))
553553 else :
554- logger .error ("Interchange discarding result_queue message of unknown type: {}" . format ( r [ ' type' ]) )
554+ logger .error ("Interchange discarding result_queue message of unknown type: %s" , r [ " type" ] )
555555
556556 got_result = False
557557 m = self ._ready_managers [manager_id ]
@@ -560,14 +560,16 @@ def process_results_incoming(self, interesting_managers: Set[bytes], monitoring_
560560 if r ['type' ] == 'result' :
561561 got_result = True
562562 try :
563- logger .debug (f "Removing task { r [ 'task_id' ] } from manager record { manager_id !r } " )
563+ logger .debug ("Removing task %s from manager record %r" , r [ "task_id" ], manager_id )
564564 m ['tasks' ].remove (r ['task_id' ])
565565 except Exception :
566566 # If we reach here, there's something very wrong.
567- logger .exception ("Ignoring exception removing task_id {} for manager {!r} with task list {}" .format (
567+ logger .exception (
568+ "Ignoring exception removing task_id %s for manager %r with task list %s" ,
568569 r ['task_id' ],
569570 manager_id ,
570- m ['tasks' ]))
571+ m ["tasks" ]
572+ )
571573
572574 b_messages_to_send = []
573575 for (b_message , _ ) in b_messages :
@@ -578,7 +580,7 @@ def process_results_incoming(self, interesting_managers: Set[bytes], monitoring_
578580 self .results_outgoing .send_multipart (b_messages_to_send )
579581 logger .debug ("Sent messages on results_outgoing" )
580582
581- logger .debug (f "Current tasks on manager { manager_id !r } : { m [ ' tasks' ] } " )
583+ logger .debug ("Current tasks on manager %r: %s" , manager_id , m [ " tasks" ] )
582584 if len (m ['tasks' ]) == 0 and m ['idle_since' ] is None :
583585 m ['idle_since' ] = time .time ()
584586
0 commit comments