diff --git a/packages/models-library/src/models_library/api_schemas_directorv2/comp_runs.py b/packages/models-library/src/models_library/api_schemas_directorv2/comp_runs.py index 005c9c95412d..7dc2b03c41b7 100644 --- a/packages/models-library/src/models_library/api_schemas_directorv2/comp_runs.py +++ b/packages/models-library/src/models_library/api_schemas_directorv2/comp_runs.py @@ -1,9 +1,10 @@ from datetime import datetime -from typing import Annotated, Any, NamedTuple +from typing import Any, NamedTuple +from models_library.services_types import ServiceRunID from pydantic import ( + AnyUrl, BaseModel, - BeforeValidator, ConfigDict, PositiveInt, ) @@ -62,20 +63,16 @@ class ComputationRunRpcGetPage(NamedTuple): total: PositiveInt -def _none_to_zero_float_pre_validator(value: Any): - if value is None: - return 0.0 - return value - - class ComputationTaskRpcGet(BaseModel): project_uuid: ProjectID node_id: NodeID state: RunningState - progress: Annotated[float, BeforeValidator(_none_to_zero_float_pre_validator)] + progress: float image: dict[str, Any] started_at: datetime | None ended_at: datetime | None + log_download_link: AnyUrl | None + service_run_id: ServiceRunID model_config = ConfigDict( json_schema_extra={ @@ -92,6 +89,8 @@ class ComputationTaskRpcGet(BaseModel): }, "started_at": "2023-01-11 13:11:47.293595", "ended_at": "2023-01-11 13:11:47.293595", + "log_download_link": "https://example.com/logs", + "service_run_id": "comp_1_12e0c8b2-bad6-40fb-9948-8dec4f65d4d9_1", } ] } diff --git a/packages/models-library/src/models_library/api_schemas_webserver/computations.py b/packages/models-library/src/models_library/api_schemas_webserver/computations.py index 6a49129034fa..4fdb56a5d4e8 100644 --- a/packages/models-library/src/models_library/api_schemas_webserver/computations.py +++ b/packages/models-library/src/models_library/api_schemas_webserver/computations.py @@ -1,8 +1,10 @@ from datetime import datetime +from decimal import Decimal from typing import Annotated, Any from common_library.basic_types import DEFAULT_FACTORY from pydantic import ( + AnyUrl, BaseModel, ConfigDict, Field, @@ -123,3 +125,6 @@ class ComputationTaskRestGet(OutputSchema): image: dict[str, Any] started_at: datetime | None ended_at: datetime | None + log_download_link: AnyUrl | None + node_name: str + osparc_credits: Decimal | None diff --git a/packages/models-library/src/models_library/computations.py b/packages/models-library/src/models_library/computations.py new file mode 100644 index 000000000000..6da595db5e19 --- /dev/null +++ b/packages/models-library/src/models_library/computations.py @@ -0,0 +1,24 @@ +from datetime import datetime +from decimal import Decimal +from typing import Any + +from pydantic import AnyUrl, BaseModel + +from .projects import ProjectID +from .projects_nodes_io import NodeID +from .projects_state import RunningState + + +class ComputationTaskWithAttributes(BaseModel): + project_uuid: ProjectID + node_id: NodeID + state: RunningState + progress: float + image: dict[str, Any] + started_at: datetime | None + ended_at: datetime | None + log_download_link: AnyUrl | None + + # Attributes added by the webserver + node_name: str + osparc_credits: Decimal | None diff --git a/packages/service-library/src/servicelib/rabbitmq/rpc_interfaces/resource_usage_tracker/credit_transactions.py b/packages/service-library/src/servicelib/rabbitmq/rpc_interfaces/resource_usage_tracker/credit_transactions.py index 855f6c055dc1..17740881bb69 100644 --- a/packages/service-library/src/servicelib/rabbitmq/rpc_interfaces/resource_usage_tracker/credit_transactions.py +++ b/packages/service-library/src/servicelib/rabbitmq/rpc_interfaces/resource_usage_tracker/credit_transactions.py @@ -1,4 +1,5 @@ import logging +from decimal import Decimal from typing import Final from models_library.api_schemas_resource_usage_tracker import ( @@ -12,6 +13,7 @@ from models_library.projects import ProjectID from models_library.rabbitmq_basic_types import RPCMethodName from models_library.resource_tracker import CreditTransactionStatus +from models_library.services_types import ServiceRunID from models_library.wallets import WalletID from pydantic import NonNegativeInt, TypeAdapter @@ -82,3 +84,21 @@ async def pay_project_debt( new_wallet_transaction=new_wallet_transaction, timeout_s=_DEFAULT_TIMEOUT_S, ) + + +@log_decorator(_logger, level=logging.DEBUG) +async def get_transaction_current_credits_by_service_run_id( + rabbitmq_rpc_client: RabbitMQRPCClient, + *, + service_run_id: ServiceRunID, +) -> Decimal: + result = await rabbitmq_rpc_client.request( + RESOURCE_USAGE_TRACKER_RPC_NAMESPACE, + _RPC_METHOD_NAME_ADAPTER.validate_python( + "get_transaction_current_credits_by_service_run_id" + ), + service_run_id=service_run_id, + timeout_s=_DEFAULT_TIMEOUT_S, + ) + assert isinstance(result, Decimal) # nosec + return result diff --git a/packages/service-library/src/servicelib/rabbitmq/rpc_interfaces/resource_usage_tracker/errors.py b/packages/service-library/src/servicelib/rabbitmq/rpc_interfaces/resource_usage_tracker/errors.py index 9ee0c7a4324a..8729c77462d2 100644 --- a/packages/service-library/src/servicelib/rabbitmq/rpc_interfaces/resource_usage_tracker/errors.py +++ b/packages/service-library/src/servicelib/rabbitmq/rpc_interfaces/resource_usage_tracker/errors.py @@ -1,8 +1,7 @@ from common_library.errors_classes import OsparcErrorMixin -class LicensesBaseError(OsparcErrorMixin, Exception): - ... +class LicensesBaseError(OsparcErrorMixin, Exception): ... class NotEnoughAvailableSeatsError(LicensesBaseError): @@ -36,11 +35,13 @@ class WalletTransactionError(OsparcErrorMixin, Exception): msg_template = "{msg}" +class CreditTransactionNotFoundError(OsparcErrorMixin, Exception): ... + + ### Pricing Plans Error -class PricingPlanBaseError(OsparcErrorMixin, Exception): - ... +class PricingPlanBaseError(OsparcErrorMixin, Exception): ... class PricingUnitDuplicationError(PricingPlanBaseError): diff --git a/services/director-v2/src/simcore_service_director_v2/api/routes/computations_tasks.py b/services/director-v2/src/simcore_service_director_v2/api/routes/computations_tasks.py index e7806162d3be..45f24d13835b 100644 --- a/services/director-v2/src/simcore_service_director_v2/api/routes/computations_tasks.py +++ b/services/director-v2/src/simcore_service_director_v2/api/routes/computations_tasks.py @@ -20,15 +20,13 @@ from models_library.projects_nodes_io import NodeID from models_library.users import UserID from servicelib.utils import logged_gather -from simcore_sdk.node_ports_common.exceptions import NodeportsException -from simcore_sdk.node_ports_v2 import FileLinkType from starlette import status from ...models.comp_pipelines import CompPipelineAtDB from ...models.comp_tasks import CompTaskAtDB from ...modules.db.repositories.comp_pipelines import CompPipelinesRepository from ...modules.db.repositories.comp_tasks import CompTasksRepository -from ...utils.dask import get_service_log_file_download_link +from ...utils import dask as dask_utils from ..dependencies.database import get_repository log = logging.getLogger(__name__) @@ -81,31 +79,6 @@ async def analyze_pipeline( return PipelineInfo(pipeline_dag, all_tasks, filtered_tasks) -async def _get_task_log_file( - user_id: UserID, project_id: ProjectID, node_id: NodeID -) -> TaskLogFileGet: - try: - log_file_url = await get_service_log_file_download_link( - user_id, project_id, node_id, file_link_type=FileLinkType.PRESIGNED - ) - - except NodeportsException as err: - # Unexpected error: Cannot determine the cause of failure - # to get donwload link and cannot handle it automatically. - # Will treat it as "not available" and log a warning - log_file_url = None - log.warning( - "Failed to get log-file of %s: %s.", - f"{user_id=}/{project_id=}/{node_id=}", - err, - ) - - return TaskLogFileGet( - task_id=node_id, - download_link=log_file_url, - ) - - # ROUTES HANDLERS -------------------------------------------------------------- @@ -133,7 +106,7 @@ async def get_all_tasks_log_files( tasks_logs_files: list[TaskLogFileGet] = await logged_gather( *[ - _get_task_log_file(user_id, project_id, node_id) + dask_utils.get_task_log_file(user_id, project_id, node_id) for node_id in iter_task_ids ], reraise=True, @@ -165,7 +138,7 @@ async def get_task_log_file( detail=[f"No task_id={node_uuid} found under computation {project_id}"], ) - return await _get_task_log_file(user_id, project_id, node_uuid) + return await dask_utils.get_task_log_file(user_id, project_id, node_uuid) @router.post( diff --git a/services/director-v2/src/simcore_service_director_v2/api/rpc/_computations.py b/services/director-v2/src/simcore_service_director_v2/api/rpc/_computations.py index dbc32828e36c..9d922a8ccb42 100644 --- a/services/director-v2/src/simcore_service_director_v2/api/rpc/_computations.py +++ b/services/director-v2/src/simcore_service_director_v2/api/rpc/_computations.py @@ -2,16 +2,22 @@ from fastapi import FastAPI from models_library.api_schemas_directorv2.comp_runs import ( ComputationRunRpcGetPage, + ComputationTaskRpcGet, ComputationTaskRpcGetPage, ) +from models_library.api_schemas_directorv2.computations import TaskLogFileGet from models_library.products import ProductName from models_library.projects import ProjectID from models_library.rest_ordering import OrderBy +from models_library.services_types import ServiceRunID from models_library.users import UserID from servicelib.rabbitmq import RPCRouter +from servicelib.utils import limited_gather +from simcore_service_director_v2.models.comp_tasks import ComputationTaskForRpcDBGet from ...modules.db.repositories.comp_runs import CompRunsRepository from ...modules.db.repositories.comp_tasks import CompTasksRepository +from ...utils import dask as dask_utils router = RPCRouter() @@ -29,19 +35,33 @@ async def list_computations_latest_iteration_page( order_by: OrderBy | None = None, ) -> ComputationRunRpcGetPage: comp_runs_repo = CompRunsRepository.instance(db_engine=app.state.engine) - total, comp_runs = await comp_runs_repo.list_for_user__only_latest_iterations( - product_name=product_name, - user_id=user_id, - offset=offset, - limit=limit, - order_by=order_by, + total, comp_runs_output = ( + await comp_runs_repo.list_for_user__only_latest_iterations( + product_name=product_name, + user_id=user_id, + offset=offset, + limit=limit, + order_by=order_by, + ) ) return ComputationRunRpcGetPage( - items=comp_runs, + items=comp_runs_output, total=total, ) +async def _fetch_task_log( + user_id: UserID, project_id: ProjectID, task: ComputationTaskForRpcDBGet +) -> TaskLogFileGet | None: + if not task.state.is_running(): + return await dask_utils.get_task_log_file( + user_id=user_id, + project_id=project_id, + node_id=task.node_id, + ) + return None + + @router.expose(reraise_if_error_type=()) async def list_computations_latest_iteration_tasks_page( app: FastAPI, @@ -59,13 +79,43 @@ async def list_computations_latest_iteration_tasks_page( assert user_id # nosec NOTE: Whether user_id has access to the project was checked in the webserver comp_tasks_repo = CompTasksRepository.instance(db_engine=app.state.engine) - total, comp_runs = await comp_tasks_repo.list_computational_tasks_rpc_domain( + comp_runs_repo = CompRunsRepository.instance(db_engine=app.state.engine) + + comp_latest_run = await comp_runs_repo.get( + user_id=user_id, project_id=project_id, iteration=None # Returns last iteration + ) + + total, comp_tasks = await comp_tasks_repo.list_computational_tasks_rpc_domain( project_id=project_id, offset=offset, limit=limit, order_by=order_by, ) + + # Run all log fetches concurrently + log_files = await limited_gather( + *[_fetch_task_log(user_id, project_id, task) for task in comp_tasks], + limit=20, + ) + + comp_tasks_output = [ + ComputationTaskRpcGet( + project_uuid=task.project_uuid, + node_id=task.node_id, + state=task.state, + progress=task.progress, + image=task.image, + started_at=task.started_at, + ended_at=task.ended_at, + log_download_link=log_file.download_link if log_file else None, + service_run_id=ServiceRunID.get_resource_tracking_run_id_for_computational( + user_id, project_id, task.node_id, comp_latest_run.iteration + ), + ) + for task, log_file in zip(comp_tasks, log_files, strict=True) + ] + return ComputationTaskRpcGetPage( - items=comp_runs, + items=comp_tasks_output, total=total, ) diff --git a/services/director-v2/src/simcore_service_director_v2/models/comp_tasks.py b/services/director-v2/src/simcore_service_director_v2/models/comp_tasks.py index 6d9835dd032c..3d10ddc20706 100644 --- a/services/director-v2/src/simcore_service_director_v2/models/comp_tasks.py +++ b/services/director-v2/src/simcore_service_director_v2/models/comp_tasks.py @@ -16,6 +16,7 @@ from models_library.services_resources import BootMode from pydantic import ( BaseModel, + BeforeValidator, ByteSize, ConfigDict, Field, @@ -257,3 +258,19 @@ def to_db_model(self, **exclusion_rules) -> dict[str, Any]: ] }, ) + + +def _none_to_zero_float_pre_validator(value: Any): + if value is None: + return 0.0 + return value + + +class ComputationTaskForRpcDBGet(BaseModel): + project_uuid: ProjectID + node_id: NodeID + state: RunningState + progress: Annotated[float, BeforeValidator(_none_to_zero_float_pre_validator)] + image: dict[str, Any] + started_at: dt.datetime | None + ended_at: dt.datetime | None diff --git a/services/director-v2/src/simcore_service_director_v2/modules/db/repositories/comp_tasks/_core.py b/services/director-v2/src/simcore_service_director_v2/modules/db/repositories/comp_tasks/_core.py index b5f36eda07fa..8c32f978e2d1 100644 --- a/services/director-v2/src/simcore_service_director_v2/modules/db/repositories/comp_tasks/_core.py +++ b/services/director-v2/src/simcore_service_director_v2/modules/db/repositories/comp_tasks/_core.py @@ -4,7 +4,6 @@ import arrow import sqlalchemy as sa -from models_library.api_schemas_directorv2.comp_runs import ComputationTaskRpcGet from models_library.basic_types import IDStr from models_library.errors import ErrorDict from models_library.projects import ProjectAtDB, ProjectID @@ -20,7 +19,7 @@ from sqlalchemy.dialects.postgresql import insert from .....core.errors import ComputationalTaskNotFoundError -from .....models.comp_tasks import CompTaskAtDB +from .....models.comp_tasks import CompTaskAtDB, ComputationTaskForRpcDBGet from .....modules.resource_usage_tracker_client import ResourceUsageTrackerClient from .....utils.computations import to_node_class from .....utils.db import DB_TO_RUNNING_STATE, RUNNING_STATE_TO_DB @@ -85,7 +84,7 @@ async def list_computational_tasks_rpc_domain( limit: int = 20, # ordering order_by: OrderBy | None = None, - ) -> tuple[int, list[ComputationTaskRpcGet]]: + ) -> tuple[int, list[ComputationTaskForRpcDBGet]]: if order_by is None: order_by = OrderBy(field=IDStr("task_id")) # default ordering @@ -126,7 +125,7 @@ async def list_computational_tasks_rpc_domain( total_count = await conn.scalar(count_query) items = [ - ComputationTaskRpcGet.model_validate( + ComputationTaskForRpcDBGet.model_validate( { **row, "state": DB_TO_RUNNING_STATE[row["state"]], # Convert the state diff --git a/services/director-v2/src/simcore_service_director_v2/utils/dask.py b/services/director-v2/src/simcore_service_director_v2/utils/dask.py index 85e48275a9b8..18e67f2dfc14 100644 --- a/services/director-v2/src/simcore_service_director_v2/utils/dask.py +++ b/services/director-v2/src/simcore_service_director_v2/utils/dask.py @@ -19,6 +19,7 @@ ) from dask_task_models_library.container_tasks.utils import parse_dask_job_id from fastapi import FastAPI +from models_library.api_schemas_directorv2.computations import TaskLogFileGet from models_library.api_schemas_directorv2.services import NodeRequirements from models_library.docker import DockerLabelKey, StandardSimcoreDockerLabels from models_library.errors import ErrorDict @@ -32,6 +33,7 @@ from servicelib.logging_utils import log_catch, log_context from simcore_sdk import node_ports_v2 from simcore_sdk.node_ports_common.exceptions import ( + NodeportsException, S3InvalidPathError, StorageInvalidCall, ) @@ -342,7 +344,7 @@ async def compute_task_envs( return task_envs -async def get_service_log_file_download_link( +async def _get_service_log_file_download_link( user_id: UserID, project_id: ProjectID, node_id: NodeID, @@ -367,6 +369,31 @@ async def get_service_log_file_download_link( return None +async def get_task_log_file( + user_id: UserID, project_id: ProjectID, node_id: NodeID +) -> TaskLogFileGet: + try: + log_file_url = await _get_service_log_file_download_link( + user_id, project_id, node_id, file_link_type=FileLinkType.PRESIGNED + ) + + except NodeportsException as err: + # Unexpected error: Cannot determine the cause of failure + # to get donwload link and cannot handle it automatically. + # Will treat it as "not available" and log a warning + log_file_url = None + _logger.warning( + "Failed to get log-file of %s: %s.", + f"{user_id=}/{project_id=}/{node_id=}", + err, + ) + + return TaskLogFileGet( + task_id=node_id, + download_link=log_file_url, + ) + + async def clean_task_output_and_log_files_if_invalid( db_engine: AsyncEngine, user_id: UserID, diff --git a/services/resource-usage-tracker/src/simcore_service_resource_usage_tracker/api/rpc/_credit_transactions.py b/services/resource-usage-tracker/src/simcore_service_resource_usage_tracker/api/rpc/_credit_transactions.py index 6e7cfcd5ad00..a2b15d5e6202 100644 --- a/services/resource-usage-tracker/src/simcore_service_resource_usage_tracker/api/rpc/_credit_transactions.py +++ b/services/resource-usage-tracker/src/simcore_service_resource_usage_tracker/api/rpc/_credit_transactions.py @@ -1,3 +1,5 @@ +from decimal import Decimal + from fastapi import FastAPI from models_library.api_schemas_resource_usage_tracker.credit_transactions import ( CreditTransactionCreateBody, @@ -6,9 +8,11 @@ from models_library.products import ProductName from models_library.projects import ProjectID from models_library.resource_tracker import CreditTransactionStatus +from models_library.services_types import ServiceRunID from models_library.wallets import WalletID from servicelib.rabbitmq import RPCRouter from servicelib.rabbitmq.rpc_interfaces.resource_usage_tracker.errors import ( + CreditTransactionNotFoundError, WalletTransactionError, ) @@ -31,6 +35,18 @@ async def get_wallet_total_credits( ) +@router.expose(reraise_if_error_type=(CreditTransactionNotFoundError,)) +async def get_transaction_current_credits_by_service_run_id( + app: FastAPI, + *, + service_run_id: ServiceRunID, +) -> Decimal: + return await credit_transactions.get_transaction_current_credits_by_service_run_id( + db_engine=app.state.engine, + service_run_id=service_run_id, + ) + + @router.expose(reraise_if_error_type=()) async def get_project_wallet_total_credits( app: FastAPI, diff --git a/services/resource-usage-tracker/src/simcore_service_resource_usage_tracker/services/credit_transactions.py b/services/resource-usage-tracker/src/simcore_service_resource_usage_tracker/services/credit_transactions.py index 3cd60cbce9a1..d2108df0a2b2 100644 --- a/services/resource-usage-tracker/src/simcore_service_resource_usage_tracker/services/credit_transactions.py +++ b/services/resource-usage-tracker/src/simcore_service_resource_usage_tracker/services/credit_transactions.py @@ -1,3 +1,4 @@ +from decimal import Decimal from typing import Annotated from fastapi import Depends @@ -12,6 +13,7 @@ CreditTransactionId, CreditTransactionStatus, ) +from models_library.services_types import ServiceRunID from models_library.wallets import WalletID from servicelib.rabbitmq import RabbitMQClient from servicelib.rabbitmq.rpc_interfaces.resource_usage_tracker.errors import ( @@ -214,3 +216,16 @@ async def pay_project_debt( task_suffix_name=f"sum_and_publish_credits_wallet_id{current_wallet_transaction_create.wallet_id}", fire_and_forget_tasks_collection=rut_fire_and_forget_tasks, ) + + +async def get_transaction_current_credits_by_service_run_id( + db_engine: AsyncEngine, + *, + service_run_id: ServiceRunID, +) -> Decimal: + return ( + await credit_transactions_db.get_transaction_current_credits_by_service_run_id( + db_engine, + service_run_id=service_run_id, + ) + ) diff --git a/services/resource-usage-tracker/src/simcore_service_resource_usage_tracker/services/modules/db/credit_transactions_db.py b/services/resource-usage-tracker/src/simcore_service_resource_usage_tracker/services/modules/db/credit_transactions_db.py index d51e344d11c8..f28ab1c54de8 100644 --- a/services/resource-usage-tracker/src/simcore_service_resource_usage_tracker/services/modules/db/credit_transactions_db.py +++ b/services/resource-usage-tracker/src/simcore_service_resource_usage_tracker/services/modules/db/credit_transactions_db.py @@ -9,7 +9,11 @@ from models_library.products import ProductName from models_library.projects import ProjectID from models_library.resource_tracker import CreditTransactionId, CreditTransactionStatus +from models_library.services_types import ServiceRunID from models_library.wallets import WalletID +from servicelib.rabbitmq.rpc_interfaces.resource_usage_tracker.errors import ( + CreditTransactionNotFoundError, +) from simcore_postgres_database.models.resource_tracker_credit_transactions import ( resource_tracker_credit_transactions, ) @@ -208,3 +212,22 @@ async def sum_wallet_credits( wallet_id=wallet_id, available_osparc_credits=Decimal(0) ) return WalletTotalCredits(wallet_id=wallet_id, available_osparc_credits=row[0]) + + +async def get_transaction_current_credits_by_service_run_id( + engine: AsyncEngine, + connection: AsyncConnection | None = None, + *, + service_run_id: ServiceRunID, +) -> Decimal: + async with transaction_context(engine, connection) as conn: + select_stmt = sa.select( + resource_tracker_credit_transactions.c.osparc_credits + ).where( + resource_tracker_credit_transactions.c.service_run_id == f"{service_run_id}" + ) + result = await conn.execute(select_stmt) + row = result.first() + if row is None: + raise CreditTransactionNotFoundError + return Decimal(row[0]) diff --git a/services/resource-usage-tracker/tests/unit/with_dbs/test_api_service_runs__list_billable.py b/services/resource-usage-tracker/tests/unit/with_dbs/test_api_service_runs__list_billable.py index a9027414d5da..16a95eeb8def 100644 --- a/services/resource-usage-tracker/tests/unit/with_dbs/test_api_service_runs__list_billable.py +++ b/services/resource-usage-tracker/tests/unit/with_dbs/test_api_service_runs__list_billable.py @@ -1,5 +1,6 @@ from collections.abc import Iterator from datetime import datetime, timedelta, timezone +from decimal import Decimal import pytest import sqlalchemy as sa @@ -14,7 +15,10 @@ from models_library.rest_ordering import OrderBy, OrderDirection from servicelib.rabbitmq import RabbitMQRPCClient from servicelib.rabbitmq._errors import RPCServerError -from servicelib.rabbitmq.rpc_interfaces.resource_usage_tracker import service_runs +from servicelib.rabbitmq.rpc_interfaces.resource_usage_tracker import ( + credit_transactions, + service_runs, +) from simcore_postgres_database.models.resource_tracker_credit_transactions import ( resource_tracker_credit_transactions, ) @@ -111,8 +115,20 @@ async def test_rpc_list_service_runs_which_was_billed( assert len(result.items) == 2 assert result.total == 2 - assert result.items[0].credit_cost < 0 + _get_credit_cost = result.items[0].credit_cost + assert _get_credit_cost + assert _get_credit_cost < 0 assert result.items[0].transaction_status in list(CreditTransactionStatus) + _get_service_run_id = result.items[0].service_run_id + + result = ( + await credit_transactions.get_transaction_current_credits_by_service_run_id( + rpc_client, + service_run_id=_get_service_run_id, + ) + ) + assert isinstance(result, Decimal) + assert result == _get_credit_cost @pytest.mark.rpc_test() diff --git a/services/web/server/src/simcore_service_webserver/api/v0/openapi.yaml b/services/web/server/src/simcore_service_webserver/api/v0/openapi.yaml index 131f484114cf..8ca2372bd35a 100644 --- a/services/web/server/src/simcore_service_webserver/api/v0/openapi.yaml +++ b/services/web/server/src/simcore_service_webserver/api/v0/openapi.yaml @@ -9175,6 +9175,19 @@ components: format: date-time - type: 'null' title: Endedat + logDownloadLink: + anyOf: + - type: string + - type: 'null' + title: Logdownloadlink + nodeName: + type: string + title: Nodename + osparcCredits: + anyOf: + - type: string + - type: 'null' + title: Osparccredits type: object required: - projectUuid @@ -9184,6 +9197,9 @@ components: - image - startedAt - endedAt + - logDownloadLink + - nodeName + - osparcCredits title: ComputationTaskRestGet ConnectServiceToPricingPlanBodyParams: properties: diff --git a/services/web/server/src/simcore_service_webserver/director_v2/_computations_service.py b/services/web/server/src/simcore_service_webserver/director_v2/_computations_service.py index 1e5d035f83c5..693f8b77a630 100644 --- a/services/web/server/src/simcore_service_webserver/director_v2/_computations_service.py +++ b/services/web/server/src/simcore_service_webserver/director_v2/_computations_service.py @@ -1,16 +1,23 @@ +from decimal import Decimal + from aiohttp import web from models_library.api_schemas_directorv2.comp_runs import ( ComputationRunRpcGetPage, - ComputationTaskRpcGetPage, ) +from models_library.computations import ComputationTaskWithAttributes from models_library.products import ProductName from models_library.projects import ProjectID from models_library.rest_ordering import OrderBy from models_library.users import UserID from pydantic import NonNegativeInt from servicelib.rabbitmq.rpc_interfaces.director_v2 import computations +from servicelib.rabbitmq.rpc_interfaces.resource_usage_tracker import ( + credit_transactions, +) +from servicelib.utils import limited_gather -from ..projects.api import check_user_project_permission +from ..products.products_service import is_product_billable +from ..projects.api import check_user_project_permission, get_project_dict_legacy from ..rabbitmq import get_rabbitmq_rpc_client @@ -26,7 +33,7 @@ async def list_computations_latest_iteration( ) -> ComputationRunRpcGetPage: """Returns the list of computations (only latest iterations)""" rpc_client = get_rabbitmq_rpc_client(app) - return await computations.list_computations_latest_iteration_page( + _runs_get = await computations.list_computations_latest_iteration_page( rpc_client, product_name=product_name, user_id=user_id, @@ -35,6 +42,12 @@ async def list_computations_latest_iteration( order_by=order_by, ) + # NOTE: MD: Get project metadata + # NOTE: MD: Get Root project name + assert _runs_get # nosec + + return _runs_get + async def list_computations_latest_iteration_tasks( app: web.Application, @@ -46,7 +59,7 @@ async def list_computations_latest_iteration_tasks( limit: NonNegativeInt, # ordering order_by: OrderBy, -) -> ComputationTaskRpcGetPage: +) -> tuple[int, list[ComputationTaskWithAttributes]]: """Returns the list of tasks for the latest iteration of a computation""" await check_user_project_permission( @@ -54,7 +67,7 @@ async def list_computations_latest_iteration_tasks( ) rpc_client = get_rabbitmq_rpc_client(app) - return await computations.list_computations_latest_iteration_tasks_page( + _tasks_get = await computations.list_computations_latest_iteration_tasks_page( rpc_client, product_name=product_name, user_id=user_id, @@ -63,3 +76,44 @@ async def list_computations_latest_iteration_tasks( limit=limit, order_by=order_by, ) + + # Get node names (for all project nodes) + project_dict = await get_project_dict_legacy(app, project_uuid=project_id) + workbench = project_dict["workbench"] + + _service_run_ids = [item.service_run_id for item in _tasks_get.items] + _is_product_billable = await is_product_billable(app, product_name=product_name) + _service_run_osparc_credits: list[Decimal | None] + if _is_product_billable: + # NOTE: MD: can be improved with a single batch call + _service_run_osparc_credits = await limited_gather( + *[ + credit_transactions.get_transaction_current_credits_by_service_run_id( + rpc_client, service_run_id=_run_id + ) + for _run_id in _service_run_ids + ], + limit=20, + ) + else: + _service_run_osparc_credits = [None for _ in _service_run_ids] + + # Final output + _tasks_get_output = [ + ComputationTaskWithAttributes( + project_uuid=item.project_uuid, + node_id=item.node_id, + state=item.state, + progress=item.progress, + image=item.image, + started_at=item.started_at, + ended_at=item.ended_at, + log_download_link=item.log_download_link, + node_name=workbench[f"{item.node_id}"].get("label", ""), + osparc_credits=credits_or_none, + ) + for item, credits_or_none in zip( + _tasks_get.items, _service_run_osparc_credits, strict=True + ) + ] + return _tasks_get.total, _tasks_get_output diff --git a/services/web/server/src/simcore_service_webserver/director_v2/_controller/computations_rest.py b/services/web/server/src/simcore_service_webserver/director_v2/_controller/computations_rest.py index 9631d10f27ed..3a32ed966cae 100644 --- a/services/web/server/src/simcore_service_webserver/director_v2/_controller/computations_rest.py +++ b/services/web/server/src/simcore_service_webserver/director_v2/_controller/computations_rest.py @@ -100,25 +100,27 @@ async def list_computations_latest_iteration_tasks( ) path_params = parse_request_path_parameters_as(ComputationTaskPathParams, request) - _get = await _computations_service.list_computations_latest_iteration_tasks( - request.app, - product_name=req_ctx.product_name, - user_id=req_ctx.user_id, - project_id=path_params.project_id, - # pagination - offset=query_params.offset, - limit=query_params.limit, - # ordering - order_by=OrderBy.model_construct(**query_params.order_by.model_dump()), + _total, _items = ( + await _computations_service.list_computations_latest_iteration_tasks( + request.app, + product_name=req_ctx.product_name, + user_id=req_ctx.user_id, + project_id=path_params.project_id, + # pagination + offset=query_params.offset, + limit=query_params.limit, + # ordering + order_by=OrderBy.model_construct(**query_params.order_by.model_dump()), + ) ) page = Page[ComputationTaskRestGet].model_validate( paginate_data( chunk=[ ComputationTaskRestGet.model_validate(task, from_attributes=True) - for task in _get.items + for task in _items ], - total=_get.total, + total=_total, limit=query_params.limit, offset=query_params.offset, request_url=request.url, diff --git a/services/web/server/src/simcore_service_webserver/products/_repository.py b/services/web/server/src/simcore_service_webserver/products/_repository.py index 16a677b0c829..197e7781aca9 100644 --- a/services/web/server/src/simcore_service_webserver/products/_repository.py +++ b/services/web/server/src/simcore_service_webserver/products/_repository.py @@ -16,6 +16,7 @@ ProductPriceInfo, get_product_latest_price_info_or_none, get_product_latest_stripe_info_or_none, + is_payment_enabled, ) from simcore_postgres_database.utils_repos import ( pass_or_acquire_connection, @@ -231,3 +232,10 @@ async def auto_create_products_groups( product_groups_map[product_name] = product_group_id return product_groups_map + + async def is_product_billable( + self, product_name: str, connection: AsyncConnection | None = None + ) -> bool: + """This function returns False even if the product price is defined, but is 0""" + async with pass_or_acquire_connection(self.engine, connection) as conn: + return await is_payment_enabled(conn, product_name=product_name) diff --git a/services/web/server/src/simcore_service_webserver/products/_service.py b/services/web/server/src/simcore_service_webserver/products/_service.py index 032f20d8083e..234198e08e72 100644 --- a/services/web/server/src/simcore_service_webserver/products/_service.py +++ b/services/web/server/src/simcore_service_webserver/products/_service.py @@ -108,6 +108,13 @@ async def get_credit_amount( return CreditResult(product_name=product_name, credit_amount=credit_amount) +async def is_product_billable( + app: web.Application, *, product_name: ProductName +) -> bool: + repo = ProductRepository.create_from_app(app) + return await repo.is_product_billable(product_name=product_name) + + async def get_product_stripe_info( app: web.Application, *, product_name: ProductName ) -> ProductStripeInfo: diff --git a/services/web/server/src/simcore_service_webserver/products/products_service.py b/services/web/server/src/simcore_service_webserver/products/products_service.py index d21a0e9a27e5..2e888de625a9 100644 --- a/services/web/server/src/simcore_service_webserver/products/products_service.py +++ b/services/web/server/src/simcore_service_webserver/products/products_service.py @@ -3,6 +3,7 @@ get_product, get_product_stripe_info, get_product_ui, + is_product_billable, list_products, list_products_names, ) @@ -14,6 +15,7 @@ "get_product_ui", "list_products", "list_products_names", + "is_product_billable", ) # nopycln: file diff --git a/services/web/server/src/simcore_service_webserver/projects/_projects_service.py b/services/web/server/src/simcore_service_webserver/projects/_projects_service.py index b7a7d484c165..02cb2eaa338d 100644 --- a/services/web/server/src/simcore_service_webserver/projects/_projects_service.py +++ b/services/web/server/src/simcore_service_webserver/projects/_projects_service.py @@ -252,6 +252,17 @@ async def get_project_type( return await db.get_project_type(project_uuid) +async def get_project_dict_legacy( + app: web.Application, project_uuid: ProjectID +) -> ProjectDict: + db: ProjectDBAPI = app[APP_PROJECT_DBAPI] + assert db # nosec + project, _ = await db.get_project_dict_and_type( + f"{project_uuid}", + ) + return project + + # # UPDATE project ----------------------------------------------------- # diff --git a/services/web/server/src/simcore_service_webserver/projects/api.py b/services/web/server/src/simcore_service_webserver/projects/api.py index 3c9c409f2adf..787901094d2b 100644 --- a/services/web/server/src/simcore_service_webserver/projects/api.py +++ b/services/web/server/src/simcore_service_webserver/projects/api.py @@ -10,12 +10,13 @@ create_project_group_without_checking_permissions, delete_project_group_without_checking_permissions, ) -from ._projects_service import delete_project_by_user +from ._projects_service import delete_project_by_user, get_project_dict_legacy __all__: tuple[str, ...] = ( "check_user_project_permission", "create_project_group_without_checking_permissions", "delete_project_group_without_checking_permissions", + "get_project_dict_legacy", "has_user_project_access_rights", "list_projects", "delete_project_by_user", diff --git a/services/web/server/tests/unit/with_dbs/01/test_director_v2_handlers.py b/services/web/server/tests/unit/with_dbs/01/test_director_v2_handlers.py index 0e51d74ecca4..272ae3b10c72 100644 --- a/services/web/server/tests/unit/with_dbs/01/test_director_v2_handlers.py +++ b/services/web/server/tests/unit/with_dbs/01/test_director_v2_handlers.py @@ -154,18 +154,17 @@ def mock_rpc_list_computations_latest_iteration_tasks( @pytest.fixture def mock_rpc_list_computations_latest_iteration_tasks_page( mocker: MockerFixture, + user_project: ProjectDict, ) -> ComputationTaskRpcGetPage: + workbench_ids = list(user_project["workbench"].keys()) + example = ComputationTaskRpcGet.model_config["json_schema_extra"]["examples"][0] + example["node_id"] = workbench_ids[0] + return mocker.patch( "simcore_service_webserver.director_v2._computations_service.computations.list_computations_latest_iteration_tasks_page", spec=True, return_value=ComputationTaskRpcGetPage( - items=[ - ComputationTaskRpcGet.model_validate( - ComputationTaskRpcGet.model_config["json_schema_extra"]["examples"][ - 0 - ] - ) - ], + items=[ComputationTaskRpcGet.model_validate(example)], total=1, ), )