|
20 | 20 | from models_library.projects_nodes_io import LocationID, StorageFileID |
21 | 21 | from pydantic import AnyUrl, ByteSize, TypeAdapter |
22 | 22 | from servicelib.aiohttp import status |
23 | | -from servicelib.celery.models import TaskMetadata, TaskUUID |
| 23 | +from servicelib.celery.models import TaskFilter, TaskMetadata, TaskUUID |
24 | 24 | from servicelib.celery.task_manager import TaskManager |
25 | 25 | from servicelib.logging_utils import log_context |
26 | 26 | from yarl import URL |
@@ -287,17 +287,18 @@ async def complete_upload_file( |
287 | 287 | # NOTE: completing a multipart upload on AWS can take up to several minutes |
288 | 288 | # if it returns slow we return a 202 - Accepted, the client will have to check later |
289 | 289 | # for completeness |
290 | | - async_job_name_data = AsyncJobFilter( |
| 290 | + job_filter = AsyncJobFilter( |
291 | 291 | user_id=query_params.user_id, |
292 | 292 | product_name=_UNDEFINED_PRODUCT_NAME_FOR_WORKER_TASKS, # NOTE: I would need to change the API here |
293 | 293 | client_name=_ASYNC_JOB_CLIENT_NAME, |
294 | 294 | ) |
| 295 | + task_filter = TaskFilter.from_async_job_filter(job_filter) |
295 | 296 | task_uuid = await task_manager.submit_task( |
296 | 297 | TaskMetadata( |
297 | 298 | name=remote_complete_upload_file.__name__, |
298 | 299 | ), |
299 | | - task_filter=async_job_name_data, |
300 | | - user_id=async_job_name_data.user_id, |
| 300 | + task_filter=task_filter, |
| 301 | + user_id=job_filter.user_id, |
301 | 302 | location_id=location_id, |
302 | 303 | file_id=file_id, |
303 | 304 | body=body, |
@@ -344,19 +345,20 @@ async def is_completed_upload_file( |
344 | 345 | # therefore we wait a bit to see if it completes fast and return a 204 |
345 | 346 | # if it returns slow we return a 202 - Accepted, the client will have to check later |
346 | 347 | # for completeness |
347 | | - async_job_name_data = AsyncJobFilter( |
| 348 | + job_filter = AsyncJobFilter( |
348 | 349 | user_id=query_params.user_id, |
349 | 350 | product_name=_UNDEFINED_PRODUCT_NAME_FOR_WORKER_TASKS, # NOTE: I would need to change the API here |
350 | 351 | client_name=_ASYNC_JOB_CLIENT_NAME, |
351 | 352 | ) |
| 353 | + task_filter = TaskFilter.from_async_job_filter(job_filter) |
352 | 354 | task_status = await task_manager.get_task_status( |
353 | | - task_filter=async_job_name_data, task_uuid=TaskUUID(future_id) |
| 355 | + task_filter=task_filter, task_uuid=TaskUUID(future_id) |
354 | 356 | ) |
355 | 357 | # first check if the task is in the app |
356 | 358 | if task_status.is_done: |
357 | 359 | task_result = TypeAdapter(FileMetaData).validate_python( |
358 | 360 | await task_manager.get_task_result( |
359 | | - task_filter=async_job_name_data, |
| 361 | + task_filter=task_filter, |
360 | 362 | task_uuid=TaskUUID(future_id), |
361 | 363 | ) |
362 | 364 | ) |
|
0 commit comments