Skip to content
Merged
Show file tree
Hide file tree
Changes from 15 commits
Commits
Show all changes
37 commits
Select commit Hold shift + click to select a range
4c88efb
rename
giancarloromeo Apr 23, 2025
391ca3b
add delete endpoint
giancarloromeo Apr 23, 2025
b965df1
add task deletion
giancarloromeo Apr 23, 2025
79126a8
fix api specs
giancarloromeo Apr 23, 2025
ae5581a
update openapi-spec
giancarloromeo Apr 23, 2025
3357e59
update method
giancarloromeo Apr 23, 2025
49b7f0d
update method
giancarloromeo Apr 23, 2025
36f7c51
fix name
giancarloromeo Apr 23, 2025
11b44b3
revert
giancarloromeo Apr 23, 2025
bd3149f
rename
giancarloromeo Apr 23, 2025
cfd9074
remove aborted task
giancarloromeo Apr 23, 2025
920ff30
update specs
giancarloromeo Apr 23, 2025
3b320db
rename
giancarloromeo Apr 23, 2025
6403a98
rename
giancarloromeo Apr 23, 2025
0bd2df3
rename
giancarloromeo Apr 23, 2025
84a64fc
rename
giancarloromeo Apr 24, 2025
507eecd
rename
giancarloromeo Apr 24, 2025
9b7b15c
update description
giancarloromeo Apr 24, 2025
5c6661d
Merge branch 'master' into is7563/clean-export-data-tasks-once-really…
giancarloromeo Apr 24, 2025
5e4cd00
Merge remote-tracking branch 'upstream/master' into is7563/clean-expo…
giancarloromeo Apr 24, 2025
5d2e114
Merge branch 'is7563/clean-export-data-tasks-once-really-consumed' of…
giancarloromeo Apr 24, 2025
c30c8d8
continue
giancarloromeo Apr 24, 2025
9969b7d
update test
giancarloromeo Apr 24, 2025
004b02a
rename
giancarloromeo Apr 24, 2025
4001b92
update name
giancarloromeo Apr 24, 2025
b0fa11b
move forget
giancarloromeo Apr 24, 2025
f4aed48
simplify
giancarloromeo Apr 24, 2025
c192715
continue
giancarloromeo Apr 24, 2025
c031079
fix test
giancarloromeo Apr 24, 2025
e63162c
Merge branch 'master' into is7563/clean-export-data-tasks-once-really…
giancarloromeo Apr 24, 2025
7fccacb
move build_task_id
giancarloromeo Apr 24, 2025
7cfffd7
Merge branch 'is7563/clean-export-data-tasks-once-really-consumed' of…
giancarloromeo Apr 24, 2025
75dc910
test is gone
giancarloromeo Apr 24, 2025
12a64d2
more test
giancarloromeo Apr 24, 2025
525cb15
Merge branch 'master' into is7563/clean-export-data-tasks-once-really…
giancarloromeo Apr 24, 2025
f8c88ae
Merge branch 'master' into is7563/clean-export-data-tasks-once-really…
giancarloromeo Apr 24, 2025
2c6d047
Merge branch 'master' into is7563/clean-export-data-tasks-once-really…
giancarloromeo Apr 24, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 14 additions & 2 deletions api/specs/web-server/_long_running_tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,8 +53,20 @@ def get_async_job_status(

@router.delete(
"/tasks/{task_id}",
name="cancel_and_delete_task",
description="Cancels and deletes a task",
name="delete_task",
description="Deletes a task",
responses=_export_data_responses,
status_code=status.HTTP_204_NO_CONTENT,
)
def delete_async_job(
_path_params: Annotated[_PathParam, Depends()],
): ...


@router.post(
"/tasks/{task_id}:cancel",
name="cancel_task",
description="Cancels a task",
responses=_export_data_responses,
status_code=status.HTTP_204_NO_CONTENT,
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,22 @@ async def cancel(
)


async def delete(
rabbitmq_rpc_client: RabbitMQRPCClient,
*,
rpc_namespace: RPCNamespace,
job_id: AsyncJobId,
job_id_data: AsyncJobNameData,
) -> None:
await rabbitmq_rpc_client.request(
rpc_namespace,
TypeAdapter(RPCMethodName).validate_python("delete"),
job_id=job_id,
job_id_data=job_id_data,
timeout_s=_DEFAULT_TIMEOUT_S,
)


async def status(
rabbitmq_rpc_client: RabbitMQRPCClient,
*,
Expand Down Expand Up @@ -222,7 +238,7 @@ async def wait_and_get_result(
)
except (TimeoutError, CancelledError) as error:
try:
await cancel(
await abort(
rabbitmq_rpc_client,
rpc_namespace=rpc_namespace,
job_id=job_id,
Expand Down Expand Up @@ -254,7 +270,7 @@ async def submit_and_wait(
except (TimeoutError, CancelledError) as error:
if async_job_rpc_get is not None:
try:
await cancel(
await abort(
rabbitmq_rpc_client,
rpc_namespace=rpc_namespace,
job_id=async_job_rpc_get.job_id,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,19 @@ async def cancel(app: FastAPI, job_id: AsyncJobId, job_id_data: AsyncJobNameData
raise JobSchedulerError(exc=f"{exc}") from exc


@router.expose(reraise_if_error_type=(JobSchedulerError,))
async def delete(app: FastAPI, job_id: AsyncJobId, job_id_data: AsyncJobNameData):
assert app # nosec
assert job_id_data # nosec
try:
await get_celery_client(app).delete_task(
task_context=job_id_data.model_dump(),
task_uuid=job_id,
)
except CeleryError as exc:
raise JobSchedulerError(exc=f"{exc}") from exc


@router.expose(reraise_if_error_type=(JobSchedulerError,))
async def status(
app: FastAPI, job_id: AsyncJobId, job_id_data: AsyncJobNameData
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@
class CeleryTaskClient:
_celery_app: Celery
_celery_settings: CelerySettings
_task_store: TaskInfoStore
_task_info_store: TaskInfoStore

async def submit_task(
self,
Expand Down Expand Up @@ -63,22 +63,34 @@ async def submit_task(
if task_metadata.ephemeral
else self._celery_settings.CELERY_RESULT_EXPIRES
)
await self._task_store.create_task(task_id, task_metadata, expiry=expiry)
await self._task_info_store.create_task(
task_id, task_metadata, expiry=expiry
)
return task_uuid

@make_async()
def _abort_task(self, task_context: TaskContext, task_uuid: TaskUUID) -> None:
AbortableAsyncResult(
build_task_id(task_context, task_uuid), app=self._celery_app
).abort()
def _abort_task(self, task_id: TaskID) -> None:
AbortableAsyncResult(task_id, app=self._celery_app).abort()

async def abort_task(self, task_context: TaskContext, task_uuid: TaskUUID) -> None:
with log_context(
_logger,
logging.DEBUG,
msg=f"Abort task: {task_context=} {task_uuid=}",
msg=f"task abortion: {task_context=} {task_uuid=}",
):
task_id = build_task_id(task_context, task_uuid)
await self._abort_task(task_id)
await self._task_info_store.remove_task(task_id)

async def delete_task(self, task_context: TaskContext, task_uuid: TaskUUID) -> None:
with log_context(
_logger,
logging.DEBUG,
msg=f"task deletion: {task_context=} {task_uuid=}",
):
await self._abort_task(task_context, task_uuid)
task_id = build_task_id(task_context, task_uuid)
await self._task_info_store.remove_task(task_id)
await self._forget_task(task_id)

@make_async()
def _forget_task(self, task_id: TaskID) -> None:
Expand All @@ -96,18 +108,17 @@ async def get_task_result(
async_result = self._celery_app.AsyncResult(task_id)
result = async_result.result
if async_result.ready():
task_metadata = await self._task_store.get_task_metadata(task_id)
task_metadata = await self._task_info_store.get_task_metadata(task_id)
if task_metadata is not None and task_metadata.ephemeral:
await self._task_store.remove_task(task_id)
await self._forget_task(task_id)
await self.delete_task(task_context, task_uuid)
return result

async def _get_task_progress_report(
self, task_context: TaskContext, task_uuid: TaskUUID, task_state: TaskState
) -> ProgressReport:
if task_state in (TaskState.STARTED, TaskState.RETRY, TaskState.ABORTED):
task_id = build_task_id(task_context, task_uuid)
progress = await self._task_store.get_task_progress(task_id)
progress = await self._task_info_store.get_task_progress(task_id)
if progress is not None:
return progress
if task_state in (
Expand Down Expand Up @@ -153,4 +164,4 @@ async def list_tasks(self, task_context: TaskContext) -> list[Task]:
logging.DEBUG,
msg=f"Listing tasks: {task_context=}",
):
return await self._task_store.list_tasks(task_context)
return await self._task_info_store.list_tasks(task_context)
Original file line number Diff line number Diff line change
Expand Up @@ -3127,8 +3127,49 @@ paths:
delete:
tags:
- long-running-tasks
summary: Cancel And Delete Task
description: Cancels and deletes a task
summary: Delete Task
description: Deletes a task
operationId: delete_async_job
parameters:
- name: task_id
in: path
required: true
schema:
type: string
title: Task Id
responses:
'204':
description: Successful Response
'404':
content:
application/json:
schema:
$ref: '#/components/schemas/EnvelopedError'
description: Not Found
'403':
content:
application/json:
schema:
$ref: '#/components/schemas/EnvelopedError'
description: Forbidden
'410':
content:
application/json:
schema:
$ref: '#/components/schemas/EnvelopedError'
description: Gone
'500':
content:
application/json:
schema:
$ref: '#/components/schemas/EnvelopedError'
description: Internal Server Error
/v0/tasks/{task_id}:cancel:
post:
tags:
- long-running-tasks
summary: Cancel Task
description: Cancels a task
operationId: abort_async_job
parameters:
- name: task_id
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,30 @@ async def get_async_job_status(request: web.Request) -> web.Response:

@routes.delete(
_task_prefix + "/{task_id}",
name="delete_async_job",
)
@login_required
@permission_required("storage.files.*")
@handle_export_data_exceptions
async def delete_async_job(request: web.Request) -> web.Response:

_req_ctx = RequestContext.model_validate(request)

rabbitmq_rpc_client = get_rabbitmq_rpc_client(request.app)
async_job_get = parse_request_path_parameters_as(_StorageAsyncJobId, request)
await async_jobs.delete(
rabbitmq_rpc_client=rabbitmq_rpc_client,
rpc_namespace=STORAGE_RPC_NAMESPACE,
job_id=async_job_get.task_id,
job_id_data=AsyncJobNameData(
user_id=_req_ctx.user_id, product_name=_req_ctx.product_name
),
)
return web.Response(status=status.HTTP_204_NO_CONTENT)


@routes.post(
_task_prefix + "/{task_id}:cancel",
name="abort_async_job",
)
@login_required
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -554,6 +554,39 @@ async def test_abort_async_jobs(
backend_result_or_exception,
)

response = await client.post(f"/{API_VERSION}/tasks/{_job_id}:cancel")
assert response.status == expected_status


@pytest.mark.parametrize("user_role", _user_roles)
@pytest.mark.parametrize(
"backend_result_or_exception, expected_status",
[
(
AsyncJobAbort(result=True, job_id=AsyncJobId(_faker.uuid4())),
status.HTTP_204_NO_CONTENT,
),
(JobSchedulerError(exc=_faker.text()), status.HTTP_500_INTERNAL_SERVER_ERROR),
(JobMissingError(job_id=_faker.uuid4()), status.HTTP_404_NOT_FOUND),
],
ids=lambda x: type(x).__name__,
)
async def test_delete_async_jobs(
user_role: UserRole,
logged_user: UserInfoDict,
client: TestClient,
create_storage_rpc_client_mock: Callable[[str, str, Any], None],
faker: Faker,
backend_result_or_exception: Any,
expected_status: int,
):
_job_id = AsyncJobId(faker.uuid4())
create_storage_rpc_client_mock(
"simcore_service_webserver.tasks._rest",
f"async_jobs.{async_jobs.delete.__name__}",
backend_result_or_exception,
)

response = await client.delete(f"/{API_VERSION}/tasks/{_job_id}")
assert response.status == expected_status

Expand Down Expand Up @@ -645,7 +678,7 @@ async def test_get_user_async_jobs(
TaskStatus,
),
(
"DELETE",
"POST",
"abort_href",
async_jobs.cancel.__name__,
AsyncJobAbort(result=True, job_id=AsyncJobId(_faker.uuid4())),
Expand Down
Loading