@@ -505,13 +505,24 @@ def process_results_incoming(self, interesting_managers: Set[bytes], monitoring_
505505 else :
506506 logger .debug ("Got %s result items in batch from manager %r" , len (all_messages ), manager_id )
507507
508- b_messages = []
508+ m = self ._ready_managers [manager_id ]
509+ b_messages_to_send = []
509510
510511 for p_message in all_messages :
511512 r = pickle .loads (p_message )
512513 if r ['type' ] == 'result' :
513514 # process this for task ID and forward to executor
514- b_messages .append ((p_message , r ))
515+ logger .debug ("Removing task %s from manager record %r" , r ["task_id" ], manager_id )
516+ try :
517+ m ['tasks' ].remove (r ['task_id' ])
518+ b_messages_to_send .append (p_message )
519+ except Exception :
520+ logger .exception (
521+ "Ignoring exception removing task_id %s for manager %r with task list %s" ,
522+ r ['task_id' ],
523+ manager_id ,
524+ m ["tasks" ]
525+ )
515526 elif r ['type' ] == 'monitoring' :
516527 # the monitoring code makes the assumption that no
517528 # monitoring messages will be received if monitoring
@@ -525,43 +536,19 @@ def process_results_incoming(self, interesting_managers: Set[bytes], monitoring_
525536 else :
526537 logger .error ("Interchange discarding result_queue message of unknown type: %s" , r ["type" ])
527538
528- got_result = False
529- m = self ._ready_managers [manager_id ]
530- for (_ , r ) in b_messages :
531- assert 'type' in r , f"Message is missing type entry: { r } "
532- if r ['type' ] == 'result' :
533- got_result = True
534- try :
535- logger .debug ("Removing task %s from manager record %r" , r ["task_id" ], manager_id )
536- m ['tasks' ].remove (r ['task_id' ])
537- except Exception :
538- # If we reach here, there's something very wrong.
539- logger .exception (
540- "Ignoring exception removing task_id %s for manager %r with task list %s" ,
541- r ['task_id' ],
542- manager_id ,
543- m ["tasks" ]
544- )
545-
546- b_messages_to_send = []
547- for (b_message , _ ) in b_messages :
548- b_messages_to_send .append (b_message )
549-
550539 if b_messages_to_send :
551540 logger .debug ("Sending messages on results_outgoing" )
552541 self .results_outgoing .send_multipart (b_messages_to_send )
553542 logger .debug ("Sent messages on results_outgoing" )
554543
555- logger .debug ("Current tasks on manager %r: %s" , manager_id , m ["tasks" ])
556- if len (m ['tasks' ]) == 0 and m ['idle_since' ] is None :
557- m ['idle_since' ] = time .time ()
558-
559- # A manager is only made interesting here if a result was
560- # received, which means there should be capacity for a new
561- # task now. Heartbeats and monitoring messages do not make a
562- # manager become interesting.
563- if got_result :
544+ # At least one result received, so manager now has idle capacity
564545 interesting_managers .add (manager_id )
546+
547+ if len (m ['tasks' ]) == 0 and m ['idle_since' ] is None :
548+ m ['idle_since' ] = time .time ()
549+
550+ logger .debug ("Current tasks on manager %r: %s" , manager_id , m ["tasks" ])
551+
565552 logger .debug ("leaving results_incoming section" )
566553
567554 def expire_bad_managers (self , interesting_managers : Set [bytes ], monitoring_radio : Optional [MonitoringRadioSender ]) -> None :
0 commit comments