|
3 | 3 | from urllib.parse import quote |
4 | 4 |
|
5 | 5 | from fastapi import APIRouter, Depends, Header, Request |
6 | | -from models_library.api_schemas_rpc_async_jobs.async_jobs import AsyncJobFilter |
7 | 6 | from models_library.api_schemas_storage.storage_schemas import ( |
8 | 7 | FileMetaDataGet, |
9 | 8 | FileMetaDataGetv010, |
|
18 | 17 | ) |
19 | 18 | from models_library.generics import Envelope |
20 | 19 | from models_library.projects_nodes_io import LocationID, StorageFileID |
| 20 | +from models_library.users import UserID |
21 | 21 | from pydantic import AnyUrl, ByteSize, TypeAdapter |
22 | 22 | from servicelib.aiohttp import status |
23 | 23 | from servicelib.celery.models import TaskFilter, TaskMetadata, TaskUUID |
24 | 24 | from servicelib.celery.task_manager import TaskManager |
25 | 25 | from servicelib.logging_utils import log_context |
26 | 26 | from yarl import URL |
27 | 27 |
|
| 28 | +from ..._meta import APP_NAME |
28 | 29 | from ...dsm import get_dsm_provider |
29 | 30 | from ...exceptions.errors import FileMetaDataNotFoundError |
30 | 31 | from ...models import ( |
|
41 | 42 | from .._worker_tasks._files import complete_upload_file as remote_complete_upload_file |
42 | 43 | from .dependencies.celery import get_task_manager |
43 | 44 |
|
44 | | -_ASYNC_JOB_CLIENT_NAME: Final[str] = "storage" |
| 45 | + |
| 46 | +def _get_task_filter(*, user_id: UserID) -> TaskFilter: |
| 47 | + _data = { |
| 48 | + "user_id": user_id, |
| 49 | + "client_name": APP_NAME, |
| 50 | + } |
| 51 | + return TaskFilter().model_validate(_data) |
45 | 52 |
|
46 | 53 |
|
47 | 54 | _logger = logging.getLogger(__name__) |
@@ -287,18 +294,13 @@ async def complete_upload_file( |
287 | 294 | # NOTE: completing a multipart upload on AWS can take up to several minutes |
288 | 295 | # if it returns slow we return a 202 - Accepted, the client will have to check later |
289 | 296 | # for completeness |
290 | | - job_filter = AsyncJobFilter( |
291 | | - user_id=query_params.user_id, |
292 | | - product_name=_UNDEFINED_PRODUCT_NAME_FOR_WORKER_TASKS, # NOTE: I would need to change the API here |
293 | | - client_name=_ASYNC_JOB_CLIENT_NAME, |
294 | | - ) |
295 | | - task_filter = TaskFilter.model_validate(job_filter.model_dump()) |
| 297 | + task_filter = _get_task_filter(user_id=query_params.user_id) |
296 | 298 | task_uuid = await task_manager.submit_task( |
297 | 299 | TaskMetadata( |
298 | 300 | name=remote_complete_upload_file.__name__, |
299 | 301 | ), |
300 | 302 | task_filter=task_filter, |
301 | | - user_id=job_filter.user_id, |
| 303 | + user_id=query_params.user_id, |
302 | 304 | location_id=location_id, |
303 | 305 | file_id=file_id, |
304 | 306 | body=body, |
@@ -345,12 +347,7 @@ async def is_completed_upload_file( |
345 | 347 | # therefore we wait a bit to see if it completes fast and return a 204 |
346 | 348 | # if it returns slow we return a 202 - Accepted, the client will have to check later |
347 | 349 | # for completeness |
348 | | - job_filter = AsyncJobFilter( |
349 | | - user_id=query_params.user_id, |
350 | | - product_name=_UNDEFINED_PRODUCT_NAME_FOR_WORKER_TASKS, # NOTE: I would need to change the API here |
351 | | - client_name=_ASYNC_JOB_CLIENT_NAME, |
352 | | - ) |
353 | | - task_filter = TaskFilter.model_validate(job_filter.model_dump()) |
| 350 | + task_filter = _get_task_filter(user_id=query_params.user_id) |
354 | 351 | task_status = await task_manager.get_task_status( |
355 | 352 | task_filter=task_filter, task_uuid=TaskUUID(future_id) |
356 | 353 | ) |
|
0 commit comments