@@ -118,6 +118,7 @@ def __init__(
118118
119119 self ._port_key_tracker = _PortKeyTracker ()
120120 self ._task_uploading : Task | None = None
121+ self ._task_uploading_followup : Task | None = None
121122 self ._task_scheduler_worker : Task | None = None
122123 self ._schedule_all_ports_for_upload : bool = False
123124
@@ -148,9 +149,6 @@ async def _upload_ports() -> None:
148149 task_name = f"outputs_manager_port_keys-{ '_' .join (port_keys )} "
149150 self ._task_uploading = create_task (_upload_ports (), name = task_name )
150151
151- # used to retain task to avoid early garabage collection
152- cleanup_task : Task | None = None
153-
154152 def _remove_downloads (future : Future ) -> None :
155153 # pylint: disable=protected-access
156154 if future ._exception is not None :
@@ -174,15 +172,20 @@ def _remove_downloads(future: Future) -> None:
174172 except Exception as e : # pylint: disable=broad-except
175173 self ._last_upload_error_tracker [port_key ] = e
176174
177- nonlocal cleanup_task
178- cleanup_task = create_task (self ._port_key_tracker .remove_all_uploading ())
175+ self ._task_uploading_followup = create_task (
176+ self ._port_key_tracker .remove_all_uploading ()
177+ )
179178
180179 self ._task_uploading .add_done_callback (_remove_downloads )
181180
182181 async def _uploading_task_cancel (self ) -> None :
183182 if self ._task_uploading is not None :
184183 await _cancel_task (self ._task_uploading , self .task_cancellation_timeout_s )
185184 await self ._port_key_tracker .move_all_uploading_to_pending ()
185+ if self ._task_uploading_followup is not None :
186+ await _cancel_task (
187+ self ._task_uploading_followup , self .task_cancellation_timeout_s
188+ )
186189
187190 async def _scheduler_worker (self ) -> None :
188191 if await self ._port_key_tracker .are_pending_ports_uploading ():
0 commit comments