Skip to content

Commit 300dae0

Browse files
authored
✨Comp backend: Clusters-keeper can create a cluster with no wallet ID defined (#4746)
1 parent 8a99304 commit 300dae0

File tree

7 files changed

+25
-19
lines changed

7 files changed

+25
-19
lines changed

packages/models-library/src/models_library/rpc_schemas_clusters_keeper/clusters.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,6 @@ class OnDemandCluster(BaseModel):
2020
authentication: ClusterAuthentication
2121
state: ClusterState
2222
user_id: UserID
23-
wallet_id: WalletID
23+
wallet_id: WalletID | None
2424
gateway_ready: bool
2525
eta: datetime.timedelta

services/clusters-keeper/src/simcore_service_clusters_keeper/modules/clusters.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@
2424

2525

2626
async def create_cluster(
27-
app: FastAPI, *, user_id: UserID, wallet_id: WalletID
27+
app: FastAPI, *, user_id: UserID, wallet_id: WalletID | None
2828
) -> list[EC2InstanceData]:
2929
ec2_client = get_ec2_client(app)
3030
app_settings = get_application_settings(app)
@@ -56,7 +56,7 @@ async def get_all_clusters(app: FastAPI) -> list[EC2InstanceData]:
5656

5757

5858
async def get_cluster(
59-
app: FastAPI, *, user_id: UserID, wallet_id: WalletID
59+
app: FastAPI, *, user_id: UserID, wallet_id: WalletID | None
6060
) -> EC2InstanceData:
6161
app_settings = get_application_settings(app)
6262
assert app_settings.CLUSTERS_KEEPER_EC2_INSTANCES # nosec
@@ -70,7 +70,7 @@ async def get_cluster(
7070

7171

7272
async def cluster_heartbeat(
73-
app: FastAPI, *, user_id: UserID, wallet_id: WalletID
73+
app: FastAPI, *, user_id: UserID, wallet_id: WalletID | None
7474
) -> None:
7575
app_settings = get_application_settings(app)
7676
assert app_settings.CLUSTERS_KEEPER_EC2_INSTANCES # nosec

services/clusters-keeper/src/simcore_service_clusters_keeper/rpc/clusters.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717

1818
@router.expose()
1919
async def get_or_create_cluster(
20-
app: FastAPI, *, user_id: UserID, wallet_id: WalletID
20+
app: FastAPI, *, user_id: UserID, wallet_id: WalletID | None
2121
) -> OnDemandCluster:
2222
"""Get or create cluster for user_id and wallet_id
2323
This function will create a new instance on AWS if needed or return the already running one.

services/clusters-keeper/src/simcore_service_clusters_keeper/utils/clusters.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,7 @@ def _create_eta(
6666
def create_cluster_from_ec2_instance(
6767
instance: EC2InstanceData,
6868
user_id: UserID,
69-
wallet_id: WalletID,
69+
wallet_id: WalletID | None,
7070
gateway_password: SecretStr,
7171
*,
7272
gateway_ready: bool,

services/clusters-keeper/src/simcore_service_clusters_keeper/utils/ec2.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717

1818

1919
def creation_ec2_tags(
20-
app_settings: ApplicationSettings, *, user_id: UserID, wallet_id: WalletID
20+
app_settings: ApplicationSettings, *, user_id: UserID, wallet_id: WalletID | None
2121
) -> EC2Tags:
2222
assert app_settings.CLUSTERS_KEEPER_EC2_INSTANCES # nosec
2323
return _DEFAULT_CLUSTERS_KEEPER_TAGS | {
@@ -38,7 +38,7 @@ def get_user_id_from_tags(tags: EC2Tags) -> UserID:
3838

3939

4040
def ec2_instances_for_user_wallet_filter(
41-
user_id: UserID, wallet_id: WalletID
41+
user_id: UserID, wallet_id: WalletID | None
4242
) -> EC2Tags:
4343
return (
4444
_DEFAULT_CLUSTERS_KEEPER_TAGS

services/clusters-keeper/tests/unit/test_rpc_clusters.py

Lines changed: 16 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -71,7 +71,7 @@ def _base_configuration(
7171
async def _assert_cluster_instance_created(
7272
ec2_client: EC2Client,
7373
user_id: UserID,
74-
wallet_id: WalletID,
74+
wallet_id: WalletID | None,
7575
) -> None:
7676
instances = await ec2_client.describe_instances()
7777
assert len(instances["Reservations"]) == 1
@@ -92,11 +92,16 @@ async def _assert_cluster_instance_created(
9292
assert instances["Reservations"][0]["Instances"][0]["Tags"][1]["Key"] == "Name"
9393
assert "Value" in instances["Reservations"][0]["Instances"][0]["Tags"][1]
9494
instance_name = instances["Reservations"][0]["Instances"][0]["Tags"][1]["Value"]
95-
96-
parse_result = search("user_id:{user_id:d}-wallet_id:{wallet_id:d}", instance_name)
95+
search_str = (
96+
"user_id:{user_id:d}-wallet_id:{wallet_id:d}"
97+
if wallet_id
98+
else "user_id:{user_id:d}-wallet_id:None"
99+
)
100+
parse_result = search(search_str, instance_name)
97101
assert isinstance(parse_result, Result)
98102
assert parse_result["user_id"] == user_id
99-
assert parse_result["wallet_id"] == wallet_id
103+
if wallet_id:
104+
assert parse_result["wallet_id"] == wallet_id
100105

101106

102107
async def _assert_cluster_heartbeat_on_instance(
@@ -136,26 +141,30 @@ def mocked_dask_ping_gateway(mocker: MockerFixture) -> MockedDaskModule:
136141
)
137142

138143

144+
@pytest.mark.parametrize("use_wallet_id", [True, False])
139145
async def test_get_or_create_cluster(
140146
_base_configuration: None,
141147
clusters_keeper_rabbitmq_rpc_client: RabbitMQRPCClient,
142148
ec2_client: EC2Client,
143149
user_id: UserID,
144150
wallet_id: WalletID,
151+
use_wallet_id: bool,
145152
mocked_dask_ping_gateway: MockedDaskModule,
146153
):
147154
# send rabbitmq rpc to create_cluster
148155
rpc_response = await clusters_keeper_rabbitmq_rpc_client.request(
149156
CLUSTERS_KEEPER_NAMESPACE,
150157
RPCMethodName("get_or_create_cluster"),
151158
user_id=user_id,
152-
wallet_id=wallet_id,
159+
wallet_id=wallet_id if use_wallet_id else None,
153160
)
154161
assert rpc_response
155162
assert isinstance(rpc_response, OnDemandCluster)
156163
created_cluster = rpc_response
157164
# check we do have a new machine in AWS
158-
await _assert_cluster_instance_created(ec2_client, user_id, wallet_id)
165+
await _assert_cluster_instance_created(
166+
ec2_client, user_id, wallet_id if use_wallet_id else None
167+
)
159168
# it is called once as moto server creates instances instantly
160169
mocked_dask_ping_gateway.ping_gateway.assert_called_once()
161170
mocked_dask_ping_gateway.ping_gateway.reset_mock()
@@ -165,7 +174,7 @@ async def test_get_or_create_cluster(
165174
CLUSTERS_KEEPER_NAMESPACE,
166175
RPCMethodName("get_or_create_cluster"),
167176
user_id=user_id,
168-
wallet_id=wallet_id,
177+
wallet_id=wallet_id if use_wallet_id else None,
169178
)
170179
assert rpc_response
171180
assert isinstance(rpc_response, OnDemandCluster)

services/director-v2/src/simcore_service_director_v2/modules/clusters_keeper.py

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,12 @@
11
import datetime
22
import logging
3-
from typing import Final
43

54
from models_library.clusters import BaseCluster, ClusterTypeInModel
65
from models_library.rpc_schemas_clusters_keeper.clusters import (
76
ClusterState,
87
OnDemandCluster,
98
)
109
from models_library.users import UserID
11-
from models_library.wallets import WalletID
1210
from servicelib.rabbitmq import (
1311
RabbitMQRPCClient,
1412
RemoteMethodNotRegisteredError,
@@ -23,7 +21,6 @@
2321

2422
_logger = logging.getLogger(__name__)
2523

26-
_TEMPORARY_DEFAULT_WALLET_ID: Final[WalletID] = 43
2724
_TIME_FORMAT = "{:02d}:{:02d}" # format for minutes:seconds
2825

2926

@@ -40,7 +37,7 @@ async def get_or_create_on_demand_cluster(
4037
RPCMethodName("get_or_create_cluster"),
4138
timeout_s=300,
4239
user_id=user_id,
43-
wallet_id=_TEMPORARY_DEFAULT_WALLET_ID,
40+
wallet_id=None, # NOTE: --> MD this will need to be replaced by the real walletID
4441
)
4542
_logger.info("received cluster: %s", returned_cluster)
4643
if returned_cluster.state is not ClusterState.RUNNING:

0 commit comments

Comments
 (0)