Skip to content

Commit 35318e5

Browse files
authored
🎨Dynamic-sidecar: report progress on sidecar task status (#8687)
1 parent eabe396 commit 35318e5

File tree

2 files changed

+55
-12
lines changed

2 files changed

+55
-12
lines changed

services/director-v2/src/simcore_service_director_v2/modules/dynamic_sidecar/docker_api/_core.py

Lines changed: 28 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
import logging
22
import re
3-
from collections.abc import Mapping
3+
from collections.abc import Awaitable, Callable, Mapping
44
from typing import Any, Final
55

66
import aiodocker
@@ -32,7 +32,12 @@
3232
)
3333
from ....models.dynamic_services_scheduler import NetworkId, SchedulerData, ServiceId
3434
from ....utils.dict_utils import get_leaf_key_paths, nested_update
35-
from ..docker_states import TASK_STATES_RUNNING, extract_task_state
35+
from ..docker_states import (
36+
TASK_STATES_COMPLETE,
37+
TASK_STATES_FAILED,
38+
TASK_STATES_RUNNING,
39+
extract_task_state,
40+
)
3641
from ..errors import DockerServiceNotFoundError, DynamicSidecarError, GenericDockerError
3742
from ._utils import docker_client
3843

@@ -176,9 +181,24 @@ async def _get_service_latest_task(service_id: str) -> Mapping[str, Any]:
176181
raise
177182

178183

184+
_TASK_STATE_TO_PROGRESS_MAPPING: Final[dict[str, float]] = (
185+
{
186+
"pending": 0.2,
187+
"assigned": 0.4,
188+
"preparing": 0.6,
189+
"starting": 0.8,
190+
"running": 1.0,
191+
}
192+
| dict.fromkeys(TASK_STATES_COMPLETE, 1.0)
193+
| dict.fromkeys(TASK_STATES_FAILED, 1.0)
194+
)
195+
196+
179197
async def get_dynamic_sidecar_placement(
180198
service_id: str,
181199
dynamic_services_scheduler_settings: DynamicServicesSchedulerSettings,
200+
*,
201+
progress_update: Callable[[float], Awaitable[None]] | None = None,
182202
) -> DockerNodeID:
183203
"""
184204
Waits until the service has a task in `running` state and
@@ -207,6 +227,12 @@ async def _get_task_data_when_service_running(service_id: str) -> Mapping[str, A
207227
"""
208228
task = await _get_service_latest_task(service_id)
209229
service_state = task["Status"]["State"]
230+
if progress_update:
231+
# estimate progress based on task state
232+
233+
await progress_update(
234+
_TASK_STATE_TO_PROGRESS_MAPPING.get(service_state, 0.0)
235+
)
210236

211237
if service_state not in TASK_STATES_RUNNING:
212238
raise TryAgain

services/director-v2/src/simcore_service_director_v2/modules/dynamic_sidecar/scheduler/_core/_event_create_sidecars.py

Lines changed: 27 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -275,33 +275,50 @@ async def action(cls, app: FastAPI, scheduler_data: SchedulerData) -> None:
275275
dynamic_sidecar_service_final_spec = _merge_service_base_and_user_specs(
276276
dynamic_sidecar_service_spec_base, user_specific_service_spec
277277
)
278-
rabbit_message = ProgressRabbitMessageNode.model_construct(
278+
sidecar_pull_started_msg = ProgressRabbitMessageNode.model_construct(
279279
user_id=scheduler_data.user_id,
280280
project_id=scheduler_data.project_id,
281281
node_id=scheduler_data.node_uuid,
282282
progress_type=ProgressType.SIDECARS_PULLING,
283283
report=ProgressReport(actual_value=0, total=1),
284284
)
285-
await rabbitmq_client.publish(rabbit_message.channel_name, rabbit_message)
285+
await rabbitmq_client.publish(
286+
ProgressRabbitMessageNode.get_channel_name(), sidecar_pull_started_msg
287+
)
286288
dynamic_sidecar_id = await create_service_and_get_id(
287289
dynamic_sidecar_service_final_spec,
288290
app_settings.DIRECTOR_V2_DOCKER_HUB_REGISTRY,
289291
)
292+
await rabbitmq_client.publish(
293+
ProgressRabbitMessageNode.get_channel_name(),
294+
sidecar_pull_started_msg.model_copy(
295+
update={"report": ProgressReport(actual_value=0.1, total=1)}
296+
),
297+
)
298+
290299
# constrain service to the same node
300+
async def progress_update(current: float) -> None:
301+
await rabbitmq_client.publish(
302+
ProgressRabbitMessageNode.get_channel_name(),
303+
sidecar_pull_started_msg.model_copy(
304+
update={"report": ProgressReport(actual_value=current, total=1)}
305+
),
306+
)
307+
291308
scheduler_data.dynamic_sidecar.docker_node_id = (
292309
await get_dynamic_sidecar_placement(
293-
dynamic_sidecar_id, dynamic_services_scheduler_settings
310+
dynamic_sidecar_id,
311+
dynamic_services_scheduler_settings,
312+
progress_update=progress_update,
294313
)
295314
)
296315

297-
rabbit_message = ProgressRabbitMessageNode.model_construct(
298-
user_id=scheduler_data.user_id,
299-
project_id=scheduler_data.project_id,
300-
node_id=scheduler_data.node_uuid,
301-
progress_type=ProgressType.SIDECARS_PULLING,
302-
report=ProgressReport(actual_value=1, total=1),
316+
await rabbitmq_client.publish(
317+
ProgressRabbitMessageNode.get_channel_name(),
318+
sidecar_pull_started_msg.model_copy(
319+
update={"report": ProgressReport(actual_value=1, total=1)}
320+
),
303321
)
304-
await rabbitmq_client.publish(rabbit_message.channel_name, rabbit_message)
305322

306323
await constrain_service_to_node(
307324
service_name=scheduler_data.service_name,

0 commit comments

Comments
 (0)