Skip to content

Commit 0c10248

Browse files
fix methods
1 parent c390a3e commit 0c10248

File tree

3 files changed

+31
-19
lines changed

3 files changed

+31
-19
lines changed

services/storage/src/simcore_service_storage/api/rpc/_paths.py

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -25,17 +25,18 @@ async def compute_path_size(
2525
location_id: LocationID,
2626
path: Path,
2727
) -> AsyncJobGet:
28+
task_name = remote_compute_path_size.__name__
2829
task_uuid = await get_celery_client(app).submit_task(
2930
task_metadata=TaskMetadata(
30-
name=remote_compute_path_size.__name__,
31+
name=task_name,
3132
),
3233
task_context=job_id_data.model_dump(),
3334
user_id=job_id_data.user_id,
3435
location_id=location_id,
3536
path=path,
3637
)
3738

38-
return AsyncJobGet(job_id=task_uuid)
39+
return AsyncJobGet(job_id=task_uuid, job_name=task_name)
3940

4041

4142
@router.expose(reraise_if_error_type=None)
@@ -45,13 +46,14 @@ async def delete_paths(
4546
location_id: LocationID,
4647
paths: set[Path],
4748
) -> AsyncJobGet:
49+
task_name = remote_delete_paths.__name__
4850
task_uuid = await get_celery_client(app).submit_task(
4951
task_metadata=TaskMetadata(
50-
name=remote_delete_paths.__name__,
52+
name=task_name,
5153
),
5254
task_context=job_id_data.model_dump(),
5355
user_id=job_id_data.user_id,
5456
location_id=location_id,
5557
paths=paths,
5658
)
57-
return AsyncJobGet(job_id=task_uuid)
59+
return AsyncJobGet(job_id=task_uuid, job_name=task_name)

services/storage/src/simcore_service_storage/modules/celery/backends/_redis.py

Lines changed: 11 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -69,8 +69,9 @@ async def list_tasks(self, task_context: TaskContext) -> list[Task]:
6969
+ build_task_id_prefix(task_context)
7070
+ _CELERY_TASK_ID_KEY_SEPARATOR
7171
)
72+
search_key_len = len(search_key)
73+
7274
keys: list[str] = []
73-
tasks = []
7475
pipe = self._redis_client_sdk.redis.pipeline()
7576
async for key in self._redis_client_sdk.redis.scan_iter(
7677
match=search_key + "*", count=_CELERY_TASK_SCAN_COUNT_PER_BATCH
@@ -82,19 +83,18 @@ async def list_tasks(self, task_context: TaskContext) -> list[Task]:
8283
else key
8384
)
8485
keys.append(_key)
85-
86-
for key in keys:
87-
pipe.hget(key, _CELERY_TASK_METADATA_KEY)
86+
pipe.hget(_key, _CELERY_TASK_METADATA_KEY)
8887

8988
results = await pipe.execute()
90-
for key, task_metadata in zip(keys, results, strict=False):
91-
tasks.append(
92-
Task(
93-
uuid=TaskUUID(key.removeprefix(search_key)),
94-
metadata=TaskMetadata.model_validate_json(task_metadata),
95-
)
89+
90+
return [
91+
Task(
92+
uuid=TaskUUID(key[search_key_len:]),
93+
metadata=TaskMetadata.model_validate_json(metadata),
9694
)
97-
return tasks
95+
for key, metadata in zip(keys, results, strict=True)
96+
if metadata is not None
97+
]
9898

9999
async def remove_task(self, task_id: TaskID) -> None:
100100
await self._redis_client_sdk.redis.delete(_build_key(task_id)) # type: ignore

services/storage/tests/unit/test_modules_celery.py

Lines changed: 14 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
from simcore_service_storage.modules.celery.errors import TransferrableCeleryError
2424
from simcore_service_storage.modules.celery.models import (
2525
TaskContext,
26+
TaskMetadata,
2627
TaskState,
2728
)
2829
from simcore_service_storage.modules.celery.utils import (
@@ -111,7 +112,9 @@ async def test_submitting_task_calling_async_function_results_with_success_state
111112
task_context = TaskContext(user_id=42)
112113

113114
task_uuid = await celery_client.submit_task(
114-
fake_file_processor.__name__,
115+
TaskMetadata(
116+
name=fake_file_processor.__name__,
117+
),
115118
task_context=task_context,
116119
files=[f"file{n}" for n in range(5)],
117120
)
@@ -139,7 +142,10 @@ async def test_submitting_task_with_failure_results_with_error(
139142
task_context = TaskContext(user_id=42)
140143

141144
task_uuid = await celery_client.submit_task(
142-
failure_task.__name__, task_context=task_context
145+
TaskMetadata(
146+
name=failure_task.__name__,
147+
),
148+
task_context=task_context,
143149
)
144150

145151
for attempt in Retrying(
@@ -162,7 +168,9 @@ async def test_aborting_task_results_with_aborted_state(
162168
task_context = TaskContext(user_id=42)
163169

164170
task_uuid = await celery_client.submit_task(
165-
dreamer_task.__name__,
171+
TaskMetadata(
172+
name=dreamer_task.__name__,
173+
),
166174
task_context=task_context,
167175
)
168176

@@ -188,7 +196,9 @@ async def test_listing_task_uuids_contains_submitted_task(
188196
task_context = TaskContext(user_id=42)
189197

190198
task_uuid = await celery_client.submit_task(
191-
dreamer_task.__name__,
199+
TaskMetadata(
200+
name=dreamer_task.__name__,
201+
),
192202
task_context=task_context,
193203
)
194204

0 commit comments

Comments
 (0)