diff --git a/services/autoscaling/src/simcore_service_autoscaling/modules/dask.py b/services/autoscaling/src/simcore_service_autoscaling/modules/dask.py index cf16afc2b5f..65572f52c43 100644 --- a/services/autoscaling/src/simcore_service_autoscaling/modules/dask.py +++ b/services/autoscaling/src/simcore_service_autoscaling/modules/dask.py @@ -136,7 +136,8 @@ async def _list_cluster_known_tasks( ) -> _DaskClusterTasks: def _list_on_scheduler( dask_scheduler: distributed.Scheduler, - ) -> _DaskClusterTasks: + ) -> dict[str, Any]: + # NOTE: _DaskClusterTasks uses cannot be used here because of serialization issues worker_to_processing_tasks = defaultdict(list) unrunnable_tasks = {} for task_key, task_state in dask_scheduler.tasks.items(): @@ -153,10 +154,10 @@ def _list_on_scheduler( task_state.resource_restrictions or {} ) | {DASK_WORKER_THREAD_RESOURCE_NAME: 1} - return _DaskClusterTasks( - processing=worker_to_processing_tasks, # type: ignore[typeddict-item] - unrunnable=unrunnable_tasks, # type: ignore[typeddict-item] - ) + return { + "processing": worker_to_processing_tasks, + "unrunnable": unrunnable_tasks, + } list_of_tasks: _DaskClusterTasks = await client.run_on_scheduler(_list_on_scheduler) _logger.debug("found tasks: %s", list_of_tasks)