Skip to content

Commit 0fb7103

Browse files
authored
♻️✨ Comp backend task state reporting fixed (#4775)
1 parent 4af4b44 commit 0fb7103

File tree

16 files changed: +512 additions, -276 deletions.

packages/postgres-database/src/simcore_postgres_database/migration/versions/c4245e9e0f72_payment_transactions_states.py

Lines changed: 18 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -46,13 +46,19 @@ def upgrade():
4646
"payments_transactions", sa.Column("state_message", sa.Text(), nullable=True)
4747
)
4848
connection.execute(
49-
"UPDATE payments_transactions SET state = 'SUCCESS' WHERE success = true"
49+
sa.DDL(
50+
"UPDATE payments_transactions SET state = 'SUCCESS' WHERE success = true"
51+
)
5052
)
5153
connection.execute(
52-
"UPDATE payments_transactions SET state = 'FAILED' WHERE success = false"
54+
sa.DDL(
55+
"UPDATE payments_transactions SET state = 'FAILED' WHERE success = false"
56+
)
5357
)
5458
connection.execute(
55-
"UPDATE payments_transactions SET state = 'PENDING' WHERE success IS NULL"
59+
sa.DDL(
60+
"UPDATE payments_transactions SET state = 'PENDING' WHERE success IS NULL"
61+
)
5662
)
5763
connection.execute("UPDATE payments_transactions SET state_message = errors")
5864

@@ -72,13 +78,19 @@ def downgrade():
7278

7379
connection = op.get_bind()
7480
connection.execute(
75-
"UPDATE payments_transactions SET success = true WHERE state = 'SUCCESS'"
81+
sa.DDL(
82+
"UPDATE payments_transactions SET success = true WHERE state = 'SUCCESS'"
83+
)
7684
)
7785
connection.execute(
78-
"UPDATE payments_transactions SET success = false WHERE completed_at IS NOT NULL AND state != 'SUCCESS'"
86+
sa.DDL(
87+
"UPDATE payments_transactions SET success = false WHERE completed_at IS NOT NULL AND state != 'SUCCESS'"
88+
)
7989
)
8090
connection.execute(
81-
"UPDATE payments_transactions SET success = NULL WHERE completed_at IS NULL AND state != 'SUCCESS'"
91+
sa.DDL(
92+
"UPDATE payments_transactions SET success = NULL WHERE completed_at IS NULL AND state != 'SUCCESS'"
93+
)
8294
)
8395

8496
op.drop_column("payments_transactions", "state_message")

packages/service-library/src/servicelib/rabbitmq/_rpc_router.py

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -34,16 +34,18 @@ async def wrapper(*args, **kwargs):
3434
msg=f"calling {func.__name__} with {args}, {kwargs}",
3535
):
3636
try:
37-
result = await func(*args, **kwargs)
38-
return result
37+
return await func(*args, **kwargs)
3938
except asyncio.CancelledError:
4039
_logger.debug("call was cancelled")
4140
raise
4241
except Exception as exc: # pylint: disable=broad-except
4342
_logger.exception("Unhandled exception:")
43+
# NOTE: we do not return internal exceptions over RPC
4444
raise RPCServerError(
45-
method_name=func.__name__, exc_type=type(exc), msg=f"{exc}"
46-
) from exc
45+
method_name=func.__name__,
46+
exc_type=f"{type(exc)}",
47+
msg=f"{exc}",
48+
) from None
4749

4850
self.routes[RPCMethodName(func.__name__)] = wrapper
4951
return func

services/director-v2/src/simcore_service_director_v2/api/routes/computations.py

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -195,13 +195,14 @@ async def create_computation( # noqa: C901, PLR0912
195195
publish=computation.start_pipeline or False,
196196
)
197197
assert computation.product_name # nosec
198+
min_computation_nodes: list[NodeID] = [
199+
NodeID(n) for n in minimal_computational_dag.nodes()
200+
]
198201
inserted_comp_tasks = await comp_tasks_repo.upsert_tasks_from_project(
199202
project,
200203
catalog_client,
201204
director_client,
202-
published_nodes=list(minimal_computational_dag.nodes())
203-
if computation.start_pipeline
204-
else [],
205+
published_nodes=min_computation_nodes if computation.start_pipeline else [],
205206
user_id=computation.user_id,
206207
product_name=computation.product_name,
207208
)

services/director-v2/src/simcore_service_director_v2/core/errors.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -103,6 +103,10 @@ class ComputationalRunNotFoundError(PydanticErrorMixin, DirectorException):
103103
msg_template = "Computational run not found"
104104

105105

106+
class ComputationalTaskNotFoundError(PydanticErrorMixin, DirectorException):
107+
msg_template = "Computational task {node_id} not found"
108+
109+
106110
class NodeRightsAcquireError(PydanticErrorMixin, DirectorException):
107111
msg_template = "Could not acquire a lock for {docker_node_id} since all {slots} slots are used."
108112

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
from enum import Enum
2+
3+
4+
# NOTE: the values are spelled out explicitly instead of deriving from StrAutoEnum, because with StrAutoEnum mypy fails with:
5+
# src/simcore_service_director_v2/modules/dask_client.py:101:5: error: Dict entry 0 has incompatible type "str": "auto"; expected "Any": "DaskClientTaskState" [dict-item]
6+
class DaskClientTaskState(str, Enum):
7+
PENDING = "PENDING"
8+
NO_WORKER = "NO_WORKER"
9+
PENDING_OR_STARTED = "PENDING_OR_STARTED"
10+
LOST = "LOST"
11+
ERRED = "ERRED"
12+
ABORTED = "ABORTED"
13+
SUCCESS = "SUCCESS"

services/director-v2/src/simcore_service_director_v2/modules/clusters_keeper.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
RemoteMethodNotRegisteredError,
1313
RPCMethodName,
1414
RPCNamespace,
15+
RPCServerError,
1516
)
1617

1718
from ..core.errors import (
@@ -59,3 +60,5 @@ async def get_or_create_on_demand_cluster(
5960
except RemoteMethodNotRegisteredError as exc:
6061
# no clusters-keeper, that is not going to work!
6162
raise ComputationalBackendOnDemandClustersKeeperNotReadyError from exc
63+
except RPCServerError as exc:
64+
raise ComputationalBackendOnDemandClustersKeeperNotReadyError from exc

0 commit comments

Comments
 (0)