Skip to content

Commit c888fcc

Browse files
🎨 Improve web-api listing task manager (download logs, credits, node name) (#7656)
1 parent c131aca commit c888fcc

File tree

23 files changed

+369
-84
lines changed

23 files changed

+369
-84
lines changed

packages/models-library/src/models_library/api_schemas_directorv2/comp_runs.py

Lines changed: 8 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,10 @@
11
from datetime import datetime
2-
from typing import Annotated, Any, NamedTuple
2+
from typing import Any, NamedTuple
33

4+
from models_library.services_types import ServiceRunID
45
from pydantic import (
6+
AnyUrl,
57
BaseModel,
6-
BeforeValidator,
78
ConfigDict,
89
PositiveInt,
910
)
@@ -62,20 +63,16 @@ class ComputationRunRpcGetPage(NamedTuple):
6263
total: PositiveInt
6364

6465

65-
def _none_to_zero_float_pre_validator(value: Any):
66-
if value is None:
67-
return 0.0
68-
return value
69-
70-
7166
class ComputationTaskRpcGet(BaseModel):
7267
project_uuid: ProjectID
7368
node_id: NodeID
7469
state: RunningState
75-
progress: Annotated[float, BeforeValidator(_none_to_zero_float_pre_validator)]
70+
progress: float
7671
image: dict[str, Any]
7772
started_at: datetime | None
7873
ended_at: datetime | None
74+
log_download_link: AnyUrl | None
75+
service_run_id: ServiceRunID
7976

8077
model_config = ConfigDict(
8178
json_schema_extra={
@@ -92,6 +89,8 @@ class ComputationTaskRpcGet(BaseModel):
9289
},
9390
"started_at": "2023-01-11 13:11:47.293595",
9491
"ended_at": "2023-01-11 13:11:47.293595",
92+
"log_download_link": "https://example.com/logs",
93+
"service_run_id": "comp_1_12e0c8b2-bad6-40fb-9948-8dec4f65d4d9_1",
9594
}
9695
]
9796
}

packages/models-library/src/models_library/api_schemas_webserver/computations.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,10 @@
11
from datetime import datetime
2+
from decimal import Decimal
23
from typing import Annotated, Any
34

45
from common_library.basic_types import DEFAULT_FACTORY
56
from pydantic import (
7+
AnyUrl,
68
BaseModel,
79
ConfigDict,
810
Field,
@@ -123,3 +125,6 @@ class ComputationTaskRestGet(OutputSchema):
123125
image: dict[str, Any]
124126
started_at: datetime | None
125127
ended_at: datetime | None
128+
log_download_link: AnyUrl | None
129+
node_name: str
130+
osparc_credits: Decimal | None
Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
from datetime import datetime
2+
from decimal import Decimal
3+
from typing import Any
4+
5+
from pydantic import AnyUrl, BaseModel
6+
7+
from .projects import ProjectID
8+
from .projects_nodes_io import NodeID
9+
from .projects_state import RunningState
10+
11+
12+
class ComputationTaskWithAttributes(BaseModel):
13+
project_uuid: ProjectID
14+
node_id: NodeID
15+
state: RunningState
16+
progress: float
17+
image: dict[str, Any]
18+
started_at: datetime | None
19+
ended_at: datetime | None
20+
log_download_link: AnyUrl | None
21+
22+
# Attributes added by the webserver
23+
node_name: str
24+
osparc_credits: Decimal | None

packages/service-library/src/servicelib/rabbitmq/rpc_interfaces/resource_usage_tracker/credit_transactions.py

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
import logging
2+
from decimal import Decimal
23
from typing import Final
34

45
from models_library.api_schemas_resource_usage_tracker import (
@@ -12,6 +13,7 @@
1213
from models_library.projects import ProjectID
1314
from models_library.rabbitmq_basic_types import RPCMethodName
1415
from models_library.resource_tracker import CreditTransactionStatus
16+
from models_library.services_types import ServiceRunID
1517
from models_library.wallets import WalletID
1618
from pydantic import NonNegativeInt, TypeAdapter
1719

@@ -82,3 +84,21 @@ async def pay_project_debt(
8284
new_wallet_transaction=new_wallet_transaction,
8385
timeout_s=_DEFAULT_TIMEOUT_S,
8486
)
87+
88+
89+
@log_decorator(_logger, level=logging.DEBUG)
90+
async def get_transaction_current_credits_by_service_run_id(
91+
rabbitmq_rpc_client: RabbitMQRPCClient,
92+
*,
93+
service_run_id: ServiceRunID,
94+
) -> Decimal:
95+
result = await rabbitmq_rpc_client.request(
96+
RESOURCE_USAGE_TRACKER_RPC_NAMESPACE,
97+
_RPC_METHOD_NAME_ADAPTER.validate_python(
98+
"get_transaction_current_credits_by_service_run_id"
99+
),
100+
service_run_id=service_run_id,
101+
timeout_s=_DEFAULT_TIMEOUT_S,
102+
)
103+
assert isinstance(result, Decimal) # nosec
104+
return result

packages/service-library/src/servicelib/rabbitmq/rpc_interfaces/resource_usage_tracker/errors.py

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,7 @@
11
from common_library.errors_classes import OsparcErrorMixin
22

33

4-
class LicensesBaseError(OsparcErrorMixin, Exception):
5-
...
4+
class LicensesBaseError(OsparcErrorMixin, Exception): ...
65

76

87
class NotEnoughAvailableSeatsError(LicensesBaseError):
@@ -36,11 +35,13 @@ class WalletTransactionError(OsparcErrorMixin, Exception):
3635
msg_template = "{msg}"
3736

3837

38+
class CreditTransactionNotFoundError(OsparcErrorMixin, Exception): ...
39+
40+
3941
### Pricing Plans Error
4042

4143

42-
class PricingPlanBaseError(OsparcErrorMixin, Exception):
43-
...
44+
class PricingPlanBaseError(OsparcErrorMixin, Exception): ...
4445

4546

4647
class PricingUnitDuplicationError(PricingPlanBaseError):

services/director-v2/src/simcore_service_director_v2/api/routes/computations_tasks.py

Lines changed: 3 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -20,15 +20,13 @@
2020
from models_library.projects_nodes_io import NodeID
2121
from models_library.users import UserID
2222
from servicelib.utils import logged_gather
23-
from simcore_sdk.node_ports_common.exceptions import NodeportsException
24-
from simcore_sdk.node_ports_v2 import FileLinkType
2523
from starlette import status
2624

2725
from ...models.comp_pipelines import CompPipelineAtDB
2826
from ...models.comp_tasks import CompTaskAtDB
2927
from ...modules.db.repositories.comp_pipelines import CompPipelinesRepository
3028
from ...modules.db.repositories.comp_tasks import CompTasksRepository
31-
from ...utils.dask import get_service_log_file_download_link
29+
from ...utils import dask as dask_utils
3230
from ..dependencies.database import get_repository
3331

3432
log = logging.getLogger(__name__)
@@ -81,31 +79,6 @@ async def analyze_pipeline(
8179
return PipelineInfo(pipeline_dag, all_tasks, filtered_tasks)
8280

8381

84-
async def _get_task_log_file(
85-
user_id: UserID, project_id: ProjectID, node_id: NodeID
86-
) -> TaskLogFileGet:
87-
try:
88-
log_file_url = await get_service_log_file_download_link(
89-
user_id, project_id, node_id, file_link_type=FileLinkType.PRESIGNED
90-
)
91-
92-
except NodeportsException as err:
93-
# Unexpected error: Cannot determine the cause of failure
94-
# to get donwload link and cannot handle it automatically.
95-
# Will treat it as "not available" and log a warning
96-
log_file_url = None
97-
log.warning(
98-
"Failed to get log-file of %s: %s.",
99-
f"{user_id=}/{project_id=}/{node_id=}",
100-
err,
101-
)
102-
103-
return TaskLogFileGet(
104-
task_id=node_id,
105-
download_link=log_file_url,
106-
)
107-
108-
10982
# ROUTES HANDLERS --------------------------------------------------------------
11083

11184

@@ -133,7 +106,7 @@ async def get_all_tasks_log_files(
133106

134107
tasks_logs_files: list[TaskLogFileGet] = await logged_gather(
135108
*[
136-
_get_task_log_file(user_id, project_id, node_id)
109+
dask_utils.get_task_log_file(user_id, project_id, node_id)
137110
for node_id in iter_task_ids
138111
],
139112
reraise=True,
@@ -165,7 +138,7 @@ async def get_task_log_file(
165138
detail=[f"No task_id={node_uuid} found under computation {project_id}"],
166139
)
167140

168-
return await _get_task_log_file(user_id, project_id, node_uuid)
141+
return await dask_utils.get_task_log_file(user_id, project_id, node_uuid)
169142

170143

171144
@router.post(

services/director-v2/src/simcore_service_director_v2/api/rpc/_computations.py

Lines changed: 59 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -2,16 +2,22 @@
22
from fastapi import FastAPI
33
from models_library.api_schemas_directorv2.comp_runs import (
44
ComputationRunRpcGetPage,
5+
ComputationTaskRpcGet,
56
ComputationTaskRpcGetPage,
67
)
8+
from models_library.api_schemas_directorv2.computations import TaskLogFileGet
79
from models_library.products import ProductName
810
from models_library.projects import ProjectID
911
from models_library.rest_ordering import OrderBy
12+
from models_library.services_types import ServiceRunID
1013
from models_library.users import UserID
1114
from servicelib.rabbitmq import RPCRouter
15+
from servicelib.utils import limited_gather
16+
from simcore_service_director_v2.models.comp_tasks import ComputationTaskForRpcDBGet
1217

1318
from ...modules.db.repositories.comp_runs import CompRunsRepository
1419
from ...modules.db.repositories.comp_tasks import CompTasksRepository
20+
from ...utils import dask as dask_utils
1521

1622
router = RPCRouter()
1723

@@ -29,19 +35,33 @@ async def list_computations_latest_iteration_page(
2935
order_by: OrderBy | None = None,
3036
) -> ComputationRunRpcGetPage:
3137
comp_runs_repo = CompRunsRepository.instance(db_engine=app.state.engine)
32-
total, comp_runs = await comp_runs_repo.list_for_user__only_latest_iterations(
33-
product_name=product_name,
34-
user_id=user_id,
35-
offset=offset,
36-
limit=limit,
37-
order_by=order_by,
38+
total, comp_runs_output = (
39+
await comp_runs_repo.list_for_user__only_latest_iterations(
40+
product_name=product_name,
41+
user_id=user_id,
42+
offset=offset,
43+
limit=limit,
44+
order_by=order_by,
45+
)
3846
)
3947
return ComputationRunRpcGetPage(
40-
items=comp_runs,
48+
items=comp_runs_output,
4149
total=total,
4250
)
4351

4452

53+
async def _fetch_task_log(
54+
user_id: UserID, project_id: ProjectID, task: ComputationTaskForRpcDBGet
55+
) -> TaskLogFileGet | None:
56+
if not task.state.is_running():
57+
return await dask_utils.get_task_log_file(
58+
user_id=user_id,
59+
project_id=project_id,
60+
node_id=task.node_id,
61+
)
62+
return None
63+
64+
4565
@router.expose(reraise_if_error_type=())
4666
async def list_computations_latest_iteration_tasks_page(
4767
app: FastAPI,
@@ -59,13 +79,43 @@ async def list_computations_latest_iteration_tasks_page(
5979
assert user_id # nosec NOTE: Whether user_id has access to the project was checked in the webserver
6080

6181
comp_tasks_repo = CompTasksRepository.instance(db_engine=app.state.engine)
62-
total, comp_runs = await comp_tasks_repo.list_computational_tasks_rpc_domain(
82+
comp_runs_repo = CompRunsRepository.instance(db_engine=app.state.engine)
83+
84+
comp_latest_run = await comp_runs_repo.get(
85+
user_id=user_id, project_id=project_id, iteration=None # Returns last iteration
86+
)
87+
88+
total, comp_tasks = await comp_tasks_repo.list_computational_tasks_rpc_domain(
6389
project_id=project_id,
6490
offset=offset,
6591
limit=limit,
6692
order_by=order_by,
6793
)
94+
95+
# Run all log fetches concurrently
96+
log_files = await limited_gather(
97+
*[_fetch_task_log(user_id, project_id, task) for task in comp_tasks],
98+
limit=20,
99+
)
100+
101+
comp_tasks_output = [
102+
ComputationTaskRpcGet(
103+
project_uuid=task.project_uuid,
104+
node_id=task.node_id,
105+
state=task.state,
106+
progress=task.progress,
107+
image=task.image,
108+
started_at=task.started_at,
109+
ended_at=task.ended_at,
110+
log_download_link=log_file.download_link if log_file else None,
111+
service_run_id=ServiceRunID.get_resource_tracking_run_id_for_computational(
112+
user_id, project_id, task.node_id, comp_latest_run.iteration
113+
),
114+
)
115+
for task, log_file in zip(comp_tasks, log_files, strict=True)
116+
]
117+
68118
return ComputationTaskRpcGetPage(
69-
items=comp_runs,
119+
items=comp_tasks_output,
70120
total=total,
71121
)

services/director-v2/src/simcore_service_director_v2/models/comp_tasks.py

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
from models_library.services_resources import BootMode
1717
from pydantic import (
1818
BaseModel,
19+
BeforeValidator,
1920
ByteSize,
2021
ConfigDict,
2122
Field,
@@ -257,3 +258,19 @@ def to_db_model(self, **exclusion_rules) -> dict[str, Any]:
257258
]
258259
},
259260
)
261+
262+
263+
def _none_to_zero_float_pre_validator(value: Any):
264+
if value is None:
265+
return 0.0
266+
return value
267+
268+
269+
class ComputationTaskForRpcDBGet(BaseModel):
270+
project_uuid: ProjectID
271+
node_id: NodeID
272+
state: RunningState
273+
progress: Annotated[float, BeforeValidator(_none_to_zero_float_pre_validator)]
274+
image: dict[str, Any]
275+
started_at: dt.datetime | None
276+
ended_at: dt.datetime | None

services/director-v2/src/simcore_service_director_v2/modules/db/repositories/comp_tasks/_core.py

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@
44

55
import arrow
66
import sqlalchemy as sa
7-
from models_library.api_schemas_directorv2.comp_runs import ComputationTaskRpcGet
87
from models_library.basic_types import IDStr
98
from models_library.errors import ErrorDict
109
from models_library.projects import ProjectAtDB, ProjectID
@@ -20,7 +19,7 @@
2019
from sqlalchemy.dialects.postgresql import insert
2120

2221
from .....core.errors import ComputationalTaskNotFoundError
23-
from .....models.comp_tasks import CompTaskAtDB
22+
from .....models.comp_tasks import CompTaskAtDB, ComputationTaskForRpcDBGet
2423
from .....modules.resource_usage_tracker_client import ResourceUsageTrackerClient
2524
from .....utils.computations import to_node_class
2625
from .....utils.db import DB_TO_RUNNING_STATE, RUNNING_STATE_TO_DB
@@ -85,7 +84,7 @@ async def list_computational_tasks_rpc_domain(
8584
limit: int = 20,
8685
# ordering
8786
order_by: OrderBy | None = None,
88-
) -> tuple[int, list[ComputationTaskRpcGet]]:
87+
) -> tuple[int, list[ComputationTaskForRpcDBGet]]:
8988
if order_by is None:
9089
order_by = OrderBy(field=IDStr("task_id")) # default ordering
9190

@@ -126,7 +125,7 @@ async def list_computational_tasks_rpc_domain(
126125
total_count = await conn.scalar(count_query)
127126

128127
items = [
129-
ComputationTaskRpcGet.model_validate(
128+
ComputationTaskForRpcDBGet.model_validate(
130129
{
131130
**row,
132131
"state": DB_TO_RUNNING_STATE[row["state"]], # Convert the state

0 commit comments

Comments
 (0)