Skip to content
Merged
Show file tree
Hide file tree
Changes from 11 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
# pylint: disable=too-many-instance-attributes
import contextlib
import logging
from dataclasses import dataclass
from typing import Final

from celery_library.errors import TaskNotFoundError
from common_library.exclude import as_dict_exclude_none
from models_library.functions import (
FunctionClass,
Expand Down Expand Up @@ -32,6 +33,10 @@
from models_library.users import UserID
from servicelib.celery.models import TaskMetadata, TasksQueue, TaskUUID
from servicelib.celery.task_manager import TaskManager
from servicelib.logging_errors import create_troubleshooting_log_kwargs
from simcore_service_api_server.models.schemas.functions import (
FunctionJobCreationTaskStatus,
)
from sqlalchemy.ext.asyncio import AsyncEngine

from ._service_function_jobs import FunctionJobService
Expand All @@ -43,15 +48,13 @@
)
from .models.api_resources import JobLinks
from .models.domain.celery_models import ApiWorkerTaskFilter
from .models.schemas.functions import FunctionJobCreationTaskStatus
from .models.schemas.jobs import JobInputs, JobPricingSpecification
from .services_http.webserver import AuthSession
from .services_rpc.storage import StorageService
from .services_rpc.wb_api_server import WbApiRpcClient

_JOB_CREATION_TASK_STATUS_PREFIX: Final[str] = "JOB_CREATION_TASK_STATUS_"
_JOB_CREATION_TASK_NOT_YET_SCHEDULED_STATUS: Final[str] = (
f"{_JOB_CREATION_TASK_STATUS_PREFIX}NOT_YET_SCHEDULED"
)
_logger = logging.getLogger(__name__)


def join_inputs(
Expand All @@ -75,15 +78,26 @@ async def _celery_task_status(
product_name: ProductName,
) -> str:
if job_creation_task_id is None:
return _JOB_CREATION_TASK_NOT_YET_SCHEDULED_STATUS
return FunctionJobCreationTaskStatus.NOT_YET_SCHEDULED
task_filter = ApiWorkerTaskFilter(
user_id=user_id,
product_name=product_name,
)
task_status = await task_manager.get_task_status(
task_uuid=TaskUUID(job_creation_task_id), task_filter=task_filter
)
return f"{_JOB_CREATION_TASK_STATUS_PREFIX}{task_status.task_state}"
try:
task_status = await task_manager.get_task_status(
task_uuid=TaskUUID(job_creation_task_id), task_filter=task_filter
)
return FunctionJobCreationTaskStatus[task_status.task_state]
except TaskNotFoundError as err:
user_msg = f"Job creation task not found for task_uuid={TaskUUID(job_creation_task_id)}"
_logger.exception(
**create_troubleshooting_log_kwargs(
user_msg,
error=err,
tip="This probably means the celery task failed, because the task should have created the project_id.",
)
)
return FunctionJobCreationTaskStatus.ERROR


@dataclass(frozen=True, kw_only=True)
Expand Down Expand Up @@ -367,7 +381,7 @@ async def create_function_job_creation_task(
task_uuid = await self._celery_task_manager.submit_task(
TaskMetadata(
name="run_function",
ephemeral=True,
ephemeral=False,
queue=TasksQueue.API_WORKER_QUEUE,
),
task_filter=task_filter,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
FunctionJobCollectionsListFilters,
FunctionJobID,
)
from simcore_service_api_server.models.schemas.functions_filters import (
from simcore_service_api_server.models.schemas.functions import (
FunctionJobsListFilters,
)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
from ...models.domain.functions import PageRegisteredFunctionJobWithorWithoutStatus
from ...models.pagination import PaginationParams
from ...models.schemas.errors import ErrorGet
from ...models.schemas.functions_filters import FunctionJobsListFilters
from ...models.schemas.functions import FunctionJobsListFilters
from ...services_rpc.wb_api_server import WbApiRpcClient
from ..dependencies.authentication import get_current_user_id, get_product_name
from ..dependencies.functions import (
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
# pylint: disable=protected-access

from enum import StrEnum
from typing import Annotated, Final

from models_library.functions import FunctionID, FunctionJobCollectionID, FunctionJobID
from pydantic import BaseModel, ConfigDict, Field
from servicelib.celery.models import TaskState

_JOB_TASK_RUN_STATUS_PREFIX: Final[str] = "JOB_TASK_RUN_STATUS_"


class FunctionJobsListFilters(BaseModel):
"""Filters for listing function jobs"""

function_id: Annotated[
FunctionID | None,
Field(
description="Filter by function ID pattern",
),
] = None

function_job_ids: Annotated[
list[FunctionJobID] | None,
Field(
description="Filter by function job IDs",
),
] = None

function_job_collection_id: Annotated[
FunctionJobCollectionID | None,
Field(
description="Filter by function job collection ID",
),
] = None

model_config = ConfigDict(
extra="ignore",
)


class FunctionJobCreationTaskStatus(StrEnum):
PENDING = f"{_JOB_TASK_RUN_STATUS_PREFIX}PENDING"
STARTED = f"{_JOB_TASK_RUN_STATUS_PREFIX}STARTED"
RETRY = f"{_JOB_TASK_RUN_STATUS_PREFIX}RETRY"
SUCCESS = f"{_JOB_TASK_RUN_STATUS_PREFIX}SUCCESS"
FAILURE = f"{_JOB_TASK_RUN_STATUS_PREFIX}FAILURE"
NOT_YET_SCHEDULED = "JOB_TASK_NOT_YET_SCHEDULED" # api-server custom status
ERROR = "JOB_TASK_CREATION_FAILURE" # api-server custom status


assert {elm._name_ for elm in TaskState}.union({"NOT_YET_SCHEDULED", "ERROR"}) == {
elm._name_ for elm in FunctionJobCreationTaskStatus
}

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -36,11 +36,12 @@
from servicelib.celery.models import TaskFilter, TaskState, TaskStatus, TaskUUID
from simcore_service_api_server._meta import API_VTAG
from simcore_service_api_server._service_function_jobs_task_client import (
_JOB_CREATION_TASK_NOT_YET_SCHEDULED_STATUS,
_JOB_CREATION_TASK_STATUS_PREFIX,
FunctionJobTaskClientService,
)
from simcore_service_api_server.api.dependencies import services as service_dependencies
from simcore_service_api_server.models.schemas.functions import (
FunctionJobCreationTaskStatus,
)
from simcore_service_api_server.models.schemas.jobs import JobStatus

_faker = Faker()
Expand Down Expand Up @@ -368,11 +369,9 @@ async def _update_function_job_status_side_effect(*args, **kwargs):
):
assert data["status"] == job_status
elif project_job_id is None and job_creation_task_id is None:
assert data["status"] == _JOB_CREATION_TASK_NOT_YET_SCHEDULED_STATUS
assert data["status"] == FunctionJobCreationTaskStatus.NOT_YET_SCHEDULED
elif project_job_id is None and job_creation_task_id is not None:
assert (
data["status"] == f"{_JOB_CREATION_TASK_STATUS_PREFIX}{celery_task_state}"
)
assert data["status"] == FunctionJobCreationTaskStatus[celery_task_state.name]
else:
pytest.fail("Unexpected combination of parameters")

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
# pylint: disable=redefined-outer-name

from collections.abc import Callable

import pytest
from celery_library.errors import TaskNotFoundError
from faker import Faker
from models_library.products import ProductName
from models_library.progress_bar import ProgressReport
from models_library.users import UserID
from pytest_mock import MockerFixture, MockType
from servicelib.celery.models import TaskState, TaskStatus, TaskUUID
from servicelib.celery.task_manager import TaskManager
from simcore_service_api_server._service_function_jobs_task_client import (
_celery_task_status,
)
from simcore_service_api_server.models.schemas.functions import (
FunctionJobCreationTaskStatus,
)

_faker = Faker()


@pytest.fixture
async def create_mock_task_manager(
mocker: MockerFixture,
) -> Callable[[TaskStatus | Exception], MockType]:

def _(status_or_exception: TaskStatus | Exception) -> MockType:
mock_task_manager = mocker.Mock(spec=TaskManager)
if isinstance(status_or_exception, Exception):

async def _raise(*args, **kwargs):
raise status_or_exception

mock_task_manager.get_task_status.side_effect = _raise
else:
mock_task_manager.get_task_status.return_value = status_or_exception
return mock_task_manager

return _


@pytest.mark.parametrize(
"status_or_exception",
[
TaskStatus(
task_uuid=TaskUUID(_faker.uuid4()),
task_state=state,
progress_report=ProgressReport(actual_value=3.14),
)
for state in list(TaskState)
]
+ [TaskNotFoundError(task_id=_faker.uuid4())],
)
@pytest.mark.parametrize("job_creation_task_id", [_faker.uuid4(), None])
async def test_celery_status_conversion(
status_or_exception: TaskStatus | Exception,
job_creation_task_id: str | None,
create_mock_task_manager: Callable[[TaskStatus | Exception], MockType],
user_id: UserID,
product_name: ProductName,
):

mock_task_manager = create_mock_task_manager(status_or_exception)

status = await _celery_task_status(
job_creation_task_id=job_creation_task_id,
task_manager=mock_task_manager,
user_id=user_id,
product_name=product_name,
)

if job_creation_task_id is None:
assert status == FunctionJobCreationTaskStatus.NOT_YET_SCHEDULED
elif isinstance(status_or_exception, TaskNotFoundError):
assert status == FunctionJobCreationTaskStatus.ERROR
elif isinstance(status_or_exception, TaskStatus):
assert (
status == FunctionJobCreationTaskStatus[status_or_exception.task_state.name]
)
else:
pytest.fail("Unexpected test input")
Loading