Skip to content

Commit a66d944

Browse files
committed
further fixes to OwnerMetadata
1 parent 87c63ef commit a66d944

File tree

6 files changed

+29
-33
lines changed

6 files changed

+29
-33
lines changed

services/api-server/src/simcore_service_api_server/_service_function_jobs_task_client.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@
4747
FunctionJobCacheNotFoundError,
4848
)
4949
from .models.api_resources import JobLinks
50-
from .models.domain.celery_models import ApiWorkerTaskFilter
50+
from .models.domain.celery_models import ApiServerOwnerMetadata
5151
from .models.schemas.functions import FunctionJobCreationTaskStatus
5252
from .models.schemas.jobs import JobInputs, JobPricingSpecification
5353
from .services_http.webserver import AuthSession
@@ -79,7 +79,7 @@ async def _celery_task_status(
7979
) -> FunctionJobCreationTaskStatus:
8080
if job_creation_task_id is None:
8181
return FunctionJobCreationTaskStatus.NOT_YET_SCHEDULED
82-
task_filter = ApiWorkerTaskFilter(
82+
task_filter = ApiServerOwnerMetadata(
8383
user_id=user_id,
8484
product_name=product_name,
8585
)
@@ -379,7 +379,7 @@ async def create_function_job_creation_task(
379379
)
380380

381381
# run function in celery task
382-
task_filter = ApiWorkerTaskFilter(
382+
task_filter = ApiServerOwnerMetadata(
383383
user_id=user_identity.user_id, product_name=user_identity.product_name
384384
)
385385

services/api-server/src/simcore_service_api_server/api/routes/tasks.py

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,9 @@
1717
from models_library.users import UserID
1818
from servicelib.celery.models import TaskState, TaskUUID
1919
from servicelib.fastapi.dependencies import get_app
20-
from simcore_service_api_server.models.domain.celery_models import ApiWorkerTaskFilter
20+
from simcore_service_api_server.models.domain.celery_models import (
21+
ApiServerOwnerMetadata,
22+
)
2123

2224
from ...models.schemas.base import ApiServerEnvelope
2325
from ...models.schemas.errors import ErrorGet
@@ -58,7 +60,7 @@ async def list_tasks(
5860
product_name: Annotated[ProductName, Depends(get_product_name)],
5961
):
6062
task_manager = get_task_manager(app)
61-
task_filter = ApiWorkerTaskFilter(
63+
task_filter = ApiServerOwnerMetadata(
6264
user_id=user_id,
6365
product_name=product_name,
6466
)
@@ -103,7 +105,7 @@ async def get_task_status(
103105
product_name: Annotated[ProductName, Depends(get_product_name)],
104106
):
105107
task_manager = get_task_manager(app)
106-
task_filter = ApiWorkerTaskFilter(
108+
task_filter = ApiServerOwnerMetadata(
107109
user_id=user_id,
108110
product_name=product_name,
109111
)
@@ -141,7 +143,7 @@ async def cancel_task(
141143
product_name: Annotated[ProductName, Depends(get_product_name)],
142144
):
143145
task_manager = get_task_manager(app)
144-
task_filter = ApiWorkerTaskFilter(
146+
task_filter = ApiServerOwnerMetadata(
145147
user_id=user_id,
146148
product_name=product_name,
147149
)
@@ -176,7 +178,7 @@ async def get_task_result(
176178
product_name: Annotated[ProductName, Depends(get_product_name)],
177179
):
178180
task_manager = get_task_manager(app)
179-
task_filter = ApiWorkerTaskFilter(
181+
task_filter = ApiServerOwnerMetadata(
180182
user_id=user_id,
181183
product_name=product_name,
182184
)

services/api-server/src/simcore_service_api_server/models/domain/celery_models.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@
3333
)
3434

3535

36-
class ApiWorkerTaskFilter(OwnerMetadata):
36+
class ApiServerOwnerMetadata(OwnerMetadata):
3737
user_id: UserID
3838
product_name: ProductName
3939
owner: Annotated[

services/api-server/src/simcore_service_api_server/services_rpc/async_jobs.py

Lines changed: 9 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@
44
from models_library.api_schemas_rpc_async_jobs.async_jobs import (
55
AsyncJobGet,
66
AsyncJobId,
7-
AsyncJobOwnerMetadata,
87
AsyncJobResult,
98
AsyncJobStatus,
109
)
@@ -15,6 +14,7 @@
1514
JobSchedulerError,
1615
)
1716
from models_library.api_schemas_storage import STORAGE_RPC_NAMESPACE
17+
from servicelib.celery.models import OwnerMetadata
1818
from servicelib.rabbitmq._client_rpc import RabbitMQRPCClient
1919
from servicelib.rabbitmq.rpc_interfaces.async_jobs import async_jobs
2020

@@ -41,13 +41,13 @@ class AsyncJobClient:
4141
}
4242
)
4343
async def cancel(
44-
self, *, job_id: AsyncJobId, job_filter: AsyncJobOwnerMetadata
44+
self, *, job_id: AsyncJobId, owner_metadata: OwnerMetadata
4545
) -> None:
4646
return await async_jobs.cancel(
4747
self._rabbitmq_rpc_client,
4848
rpc_namespace=STORAGE_RPC_NAMESPACE,
4949
job_id=job_id,
50-
owner_metadata=job_filter,
50+
owner_metadata=owner_metadata,
5151
)
5252

5353
@_exception_mapper(
@@ -56,13 +56,13 @@ async def cancel(
5656
}
5757
)
5858
async def status(
59-
self, *, job_id: AsyncJobId, job_filter: AsyncJobOwnerMetadata
59+
self, *, job_id: AsyncJobId, owner_metadata: OwnerMetadata
6060
) -> AsyncJobStatus:
6161
return await async_jobs.status(
6262
self._rabbitmq_rpc_client,
6363
rpc_namespace=STORAGE_RPC_NAMESPACE,
6464
job_id=job_id,
65-
job_filter=job_filter,
65+
owner_metadata=owner_metadata,
6666
)
6767

6868
@_exception_mapper(
@@ -74,25 +74,23 @@ async def status(
7474
}
7575
)
7676
async def result(
77-
self, *, job_id: AsyncJobId, job_filter: AsyncJobOwnerMetadata
77+
self, *, job_id: AsyncJobId, owner_metadata: OwnerMetadata
7878
) -> AsyncJobResult:
7979
return await async_jobs.result(
8080
self._rabbitmq_rpc_client,
8181
rpc_namespace=STORAGE_RPC_NAMESPACE,
8282
job_id=job_id,
83-
job_filter=job_filter,
83+
owner_metadata=owner_metadata,
8484
)
8585

8686
@_exception_mapper(
8787
rpc_exception_map={
8888
JobSchedulerError: TaskSchedulerError,
8989
}
9090
)
91-
async def list_jobs(
92-
self, *, job_filter: AsyncJobOwnerMetadata
93-
) -> list[AsyncJobGet]:
91+
async def list_jobs(self, *, owner_metadata: OwnerMetadata) -> list[AsyncJobGet]:
9492
return await async_jobs.list_jobs(
9593
self._rabbitmq_rpc_client,
9694
rpc_namespace=STORAGE_RPC_NAMESPACE,
97-
job_filter=job_filter,
95+
owner_metadata=owner_metadata,
9896
)

services/api-server/src/simcore_service_api_server/services_rpc/storage.py

Lines changed: 5 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -3,26 +3,21 @@
33

44
from models_library.api_schemas_rpc_async_jobs.async_jobs import (
55
AsyncJobGet,
6-
AsyncJobOwnerMetadata,
76
)
87
from models_library.api_schemas_webserver.storage import PathToExport
98
from models_library.products import ProductName
109
from models_library.users import UserID
1110
from servicelib.rabbitmq._client_rpc import RabbitMQRPCClient
1211
from servicelib.rabbitmq.rpc_interfaces.storage import simcore_s3 as storage_rpc
12+
from simcore_service_api_server.models.domain.celery_models import (
13+
ApiServerOwnerMetadata,
14+
)
1315

14-
from .._meta import APP_NAME
1516
from ..exceptions.service_errors_utils import service_exception_mapper
1617

1718
_exception_mapper = partial(service_exception_mapper, service_name="Storage")
1819

1920

20-
def get_job_filter(user_id: UserID, product_name: ProductName) -> AsyncJobOwnerMetadata:
21-
return AsyncJobOwnerMetadata(
22-
user_id=user_id, product_name=product_name, owner=APP_NAME
23-
)
24-
25-
2621
@dataclass(frozen=True, kw_only=True)
2722
class StorageService:
2823
_rpc_client: RabbitMQRPCClient
@@ -38,9 +33,8 @@ async def start_data_export(
3833
self._rpc_client,
3934
paths_to_export=paths_to_export,
4035
export_as="download_link",
41-
job_filter=get_job_filter(
42-
user_id=self._user_id,
43-
product_name=self._product_name,
36+
owner_metadata=ApiServerOwnerMetadata(
37+
user_id=self._user_id, product_name=self._product_name
4438
),
4539
)
4640
return async_job_get

services/api-server/tests/unit/api_functions/celery/test_functions_celery.py

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,9 @@
5656
)
5757
from simcore_service_api_server.exceptions.backend_errors import BaseBackEndError
5858
from simcore_service_api_server.models.api_resources import JobLinks
59-
from simcore_service_api_server.models.domain.celery_models import ApiWorkerTaskFilter
59+
from simcore_service_api_server.models.domain.celery_models import (
60+
ApiServerOwnerMetadata,
61+
)
6062
from simcore_service_api_server.models.domain.functions import (
6163
PreRegisteredFunctionJobData,
6264
)
@@ -280,7 +282,7 @@ async def test_celery_error_propagation(
280282
with_api_server_celery_worker: TestWorkController,
281283
):
282284

283-
task_filter = ApiWorkerTaskFilter(
285+
task_filter = ApiServerOwnerMetadata(
284286
user_id=user_identity.user_id,
285287
product_name=user_identity.product_name,
286288
)

0 commit comments

Comments
 (0)