Skip to content
Merged
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,7 @@ def __init__(

self._port_key_tracker = _PortKeyTracker()
self._task_uploading: Task | None = None
self._task_uploading_followup: Task | None = None
self._task_scheduler_worker: Task | None = None
self._schedule_all_ports_for_upload: bool = False

Expand Down Expand Up @@ -171,14 +172,22 @@ def _remove_downloads(future: Future) -> None:
except Exception as e: # pylint: disable=broad-except
self._last_upload_error_tracker[port_key] = e

create_task(self._port_key_tracker.remove_all_uploading())
self._task_uploading_followup = create_task(
self._port_key_tracker.remove_all_uploading()
)

self._task_uploading.add_done_callback(_remove_downloads)

async def _uploading_task_cancel(self) -> None:
if self._task_uploading is not None:
await _cancel_task(self._task_uploading, self.task_cancellation_timeout_s)
await self._port_key_tracker.move_all_uploading_to_pending()
self._task_uploading = None
if self._task_uploading_followup is not None:
await _cancel_task(
self._task_uploading_followup, self.task_cancellation_timeout_s
)
self._task_uploading_followup = None

async def _scheduler_worker(self) -> None:
if await self._port_key_tracker.are_pending_ports_uploading():
Expand Down
Loading