Skip to content

Commit afb6eac

Browse files
author
Andrei Neagu
committed
progress updates are now async and saved to storage
1 parent a027afd commit afb6eac

File tree

11 files changed

+86
-45
lines changed

11 files changed

+86
-45
lines changed

packages/models-library/src/models_library/api_schemas_long_running_tasks/base.py

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
import logging
2+
from collections.abc import Awaitable, Callable
23
from typing import Annotated, TypeAlias
34

45
from pydantic import BaseModel, Field, field_validator, validate_call
@@ -22,8 +23,16 @@ class TaskProgress(BaseModel):
2223
message: ProgressMessage = ""
2324
percent: ProgressPercent = 0.0
2425

26+
# used to propagate progress updates internally
27+
_update_callback: Callable[["TaskProgress"], Awaitable[None]] | None = None
28+
29+
def set_update_callback(
30+
self, callback: Callable[["TaskProgress"], Awaitable[None]]
31+
) -> None:
32+
self._update_callback = callback
33+
2534
@validate_call
26-
def update(
35+
async def update(
2736
self,
2837
*,
2938
message: ProgressMessage | None = None,
@@ -40,6 +49,14 @@ def update(
4049

4150
_logger.debug("Progress update: %s", f"{self}")
4251

52+
if self._update_callback is not None:
53+
await self._update_callback(self)
54+
else:
55+
_logger.warning(
56+
"No update callback set for TaskProgress %s, progress will not be propagated",
57+
self.task_id,
58+
)
59+
4360
@classmethod
4461
def create(cls, task_id: TaskId | None = None) -> "TaskProgress":
4562
return cls(task_id=task_id)

packages/service-library/src/servicelib/long_running_tasks/task.py

Lines changed: 17 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
import asyncio
22
import datetime
3+
import functools
34
import inspect
45
import logging
56
import traceback
@@ -378,6 +379,16 @@ def _get_task_id(self, task_name: str, *, is_unique: bool) -> TaskId:
378379
unique_part = "unique" if is_unique else f"{uuid4()}"
379380
return f"{self.namespace}.{task_name}.{unique_part}"
380381

382+
async def _update_progress(
383+
self,
384+
task_id: TaskId,
385+
task_context: TaskContext | None,
386+
task_progress: TaskProgress,
387+
) -> None:
388+
tracked_data = await self._get_tracked_task(task_id, task_context)
389+
tracked_data.task_progress = task_progress
390+
await self._tasks_data.set_task_data(task_id=task_id, value=tracked_data)
391+
381392
async def start_task(
382393
self,
383394
registered_task_name: RegisteredTaskName,
@@ -410,14 +421,18 @@ async def start_task(
410421
)
411422

412423
task_progress = TaskProgress.create(task_id=task_id)
424+
# set update callback
425+
task_progress.set_update_callback(
426+
functools.partial(self._update_progress, task_id, task_context)
427+
)
413428

414429
# bind the task with progress 0 and 1
415430
async def _progress_task(progress: TaskProgress, handler: TaskProtocol):
416-
progress.update(message="starting", percent=0)
431+
await progress.update(message="starting", percent=0)
417432
try:
418433
return await handler(progress, **task_kwargs)
419434
finally:
420-
progress.update(message="finished", percent=1)
435+
await progress.update(message="finished", percent=1)
421436

422437
async_task = asyncio.create_task(
423438
_progress_task(task_progress, task), name=task_name

packages/service-library/tests/aiohttp/long_running_tasks/conftest.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ async def _string_list_task(
3636
for index in range(num_strings):
3737
generated_strings.append(f"{index}")
3838
await asyncio.sleep(sleep_time)
39-
progress.update(message="generated item", percent=index / num_strings)
39+
await progress.update(message="generated item", percent=index / num_strings)
4040
if fail:
4141
msg = "We were asked to fail!!"
4242
raise RuntimeError(msg)

packages/service-library/tests/aiohttp/long_running_tasks/test_long_running_tasks.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,10 @@
3333
"redis",
3434
]
3535

36+
pytest_simcore_ops_services_selection = [
37+
"redis-commander",
38+
]
39+
3640

3741
@pytest.fixture
3842
def app(

packages/service-library/tests/fastapi/long_running_tasks/test_long_running_tasks.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,7 @@ async def _string_list_task(
5252
for index in range(num_strings):
5353
generated_strings.append(f"{index}")
5454
await asyncio.sleep(sleep_time)
55-
progress.update(message="generated item", percent=index / num_strings)
55+
await progress.update(message="generated item", percent=index / num_strings)
5656
if fail:
5757
msg = "We were asked to fail!!"
5858
raise RuntimeError(msg)

packages/service-library/tests/long_running_tasks/test_long_running_tasks_task.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,7 @@ async def a_background_task(
5151
"""sleeps and raises an error or returns 42"""
5252
for i in range(total_sleep):
5353
await asyncio.sleep(1)
54-
progress.update(percent=(i + 1) / total_sleep)
54+
await progress.update(percent=(i + 1) / total_sleep)
5555
if raise_when_finished:
5656
msg = "raised this error as instructed"
5757
raise RuntimeError(msg)

services/director-v2/src/simcore_service_director_v2/api/routes/dynamic_scheduler.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -106,7 +106,7 @@ async def _task_remove_service_containers(
106106
async def _progress_callback(
107107
message: ProgressMessage, percent: ProgressPercent | None, _: TaskId
108108
) -> None:
109-
progress.update(message=message, percent=percent)
109+
await progress.update(message=message, percent=percent)
110110

111111
await dynamic_sidecars_scheduler.remove_service_containers(
112112
node_uuid=node_uuid, progress_callback=_progress_callback
@@ -171,7 +171,7 @@ async def _task_save_service_state(
171171
async def _progress_callback(
172172
message: ProgressMessage, percent: ProgressPercent | None, _: TaskId
173173
) -> None:
174-
progress.update(message=message, percent=percent)
174+
await progress.update(message=message, percent=percent)
175175

176176
await dynamic_sidecars_scheduler.save_service_state(
177177
node_uuid=node_uuid, progress_callback=_progress_callback
@@ -218,7 +218,7 @@ async def _task_push_service_outputs(
218218
async def _progress_callback(
219219
message: ProgressMessage, percent: ProgressPercent | None, _: TaskId
220220
) -> None:
221-
progress.update(message=message, percent=percent)
221+
await progress.update(message=message, percent=percent)
222222

223223
await dynamic_sidecars_scheduler.push_service_outputs(
224224
node_uuid=node_uuid, progress_callback=_progress_callback

services/director-v2/src/simcore_service_director_v2/modules/dynamic_sidecar/scheduler/_core/_events_utils.py

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -217,7 +217,7 @@ async def service_remove_sidecar_proxy_docker_networks_and_volumes(
217217
if set_were_state_and_outputs_saved is not None:
218218
scheduler_data.dynamic_sidecar.were_state_and_outputs_saved = True
219219

220-
task_progress.update(
220+
await task_progress.update(
221221
message="removing dynamic sidecar stack", percent=ProgressPercent(0.1)
222222
)
223223
await remove_dynamic_sidecar_stack(
@@ -232,7 +232,7 @@ async def service_remove_sidecar_proxy_docker_networks_and_volumes(
232232
node_id=scheduler_data.node_uuid,
233233
)
234234

235-
task_progress.update(message="removing network", percent=ProgressPercent(0.2))
235+
await task_progress.update(message="removing network", percent=ProgressPercent(0.2))
236236
await remove_dynamic_sidecar_network(scheduler_data.dynamic_sidecar_network_name)
237237

238238
if scheduler_data.dynamic_sidecar.were_state_and_outputs_saved:
@@ -243,7 +243,7 @@ async def service_remove_sidecar_proxy_docker_networks_and_volumes(
243243
)
244244
else:
245245
# Remove all dy-sidecar associated volumes from node
246-
task_progress.update(
246+
await task_progress.update(
247247
message="removing volumes", percent=ProgressPercent(0.3)
248248
)
249249
with log_context(_logger, logging.DEBUG, f"removing volumes '{node_uuid}'"):
@@ -265,7 +265,7 @@ async def service_remove_sidecar_proxy_docker_networks_and_volumes(
265265
scheduler_data.service_name,
266266
)
267267

268-
task_progress.update(
268+
await task_progress.update(
269269
message="removing project networks", percent=ProgressPercent(0.8)
270270
)
271271
used_projects_networks = await get_projects_networks_containers(
@@ -284,7 +284,7 @@ async def service_remove_sidecar_proxy_docker_networks_and_volumes(
284284
await app.state.dynamic_sidecar_scheduler.scheduler.remove_service_from_observation(
285285
scheduler_data.node_uuid
286286
)
287-
task_progress.update(
287+
await task_progress.update(
288288
message="finished removing resources", percent=ProgressPercent(1)
289289
)
290290

services/director-v2/tests/unit/test_modules_dynamic_sidecar_docker_service_specs_sidecar.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,11 @@
5555
"RABBIT_PORT",
5656
"RABBIT_SECURE",
5757
"RABBIT_USER",
58+
"REDIS_HOST",
59+
"REDIS_PASSWORD",
60+
"REDIS_PORT",
61+
"REDIS_SECURE",
62+
"REDIS_USER",
5863
"S3_ACCESS_KEY",
5964
"S3_BUCKET_NAME",
6065
"S3_ENDPOINT",

0 commit comments

Comments
 (0)