Skip to content

Commit b965df1

Browse files
add task deletion
1 parent 391ca3b commit b965df1

File tree

2 files changed

+26
-4
lines changed

2 files changed

+26
-4
lines changed

services/storage/src/simcore_service_storage/api/rpc/_async_jobs.py

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@
3232

3333

3434
@router.expose(reraise_if_error_type=(JobSchedulerError,))
35-
async def cancel(app: FastAPI, job_id: AsyncJobId, job_id_data: AsyncJobNameData):
35+
async def abort(app: FastAPI, job_id: AsyncJobId, job_id_data: AsyncJobNameData):
3636
assert app # nosec
3737
assert job_id_data # nosec
3838
try:
@@ -44,6 +44,19 @@ async def cancel(app: FastAPI, job_id: AsyncJobId, job_id_data: AsyncJobNameData
4444
raise JobSchedulerError(exc=f"{exc}") from exc
4545

4646

47+
@router.expose(reraise_if_error_type=(JobSchedulerError,))
48+
async def deletet(app: FastAPI, job_id: AsyncJobId, job_id_data: AsyncJobNameData):
49+
assert app # nosec
50+
assert job_id_data # nosec
51+
try:
52+
await get_celery_client(app).delete_task(
53+
task_context=job_id_data.model_dump(),
54+
task_uuid=job_id,
55+
)
56+
except CeleryError as exc:
57+
raise JobSchedulerError(exc=f"{exc}") from exc
58+
59+
4760
@router.expose(reraise_if_error_type=(JobSchedulerError,))
4861
async def status(
4962
app: FastAPI, job_id: AsyncJobId, job_id_data: AsyncJobNameData

services/storage/src/simcore_service_storage/modules/celery/client.py

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -78,10 +78,20 @@ async def abort_task(self, task_context: TaskContext, task_uuid: TaskUUID) -> No
7878
with log_context(
7979
_logger,
8080
logging.DEBUG,
81-
msg=f"Abort task: {task_context=} {task_uuid=}",
81+
msg=f"task abortion: {task_context=} {task_uuid=}",
8282
):
8383
await self._abort_task(task_context, task_uuid)
8484

85+
async def delete_task(self, task_context: TaskContext, task_uuid: TaskUUID) -> None:
86+
with log_context(
87+
_logger,
88+
logging.DEBUG,
89+
msg=f"task deletion: {task_context=} {task_uuid=}",
90+
):
91+
task_id = build_task_id(task_context, task_uuid)
92+
await self._task_info_store.remove_task(task_id)
93+
await self._forget_task(task_id)
94+
8595
@make_async()
8696
def _forget_task(self, task_id: TaskID) -> None:
8797
AbortableAsyncResult(task_id, app=self._celery_app).forget()
@@ -100,8 +110,7 @@ async def get_task_result(
100110
if async_result.ready():
101111
task_metadata = await self._task_info_store.get_task_metadata(task_id)
102112
if task_metadata is not None and task_metadata.ephemeral:
103-
await self._task_info_store.remove_task(task_id)
104-
await self._forget_task(task_id)
113+
await self.delete_task(task_context, task_uuid)
105114
return result
106115

107116
async def _get_task_progress_report(

0 commit comments

Comments
 (0)