From c9ca193a900375b7bf1b1c0805df3d29f4e7e687 Mon Sep 17 00:00:00 2001
From: wtripp180901
Date: Tue, 19 Aug 2025 11:07:08 +0100
Subject: [PATCH] PoC fix for cluster services race

---
 azimuth_capi/models/v1alpha1/cluster.py |  3 ++
 azimuth_capi/operator.py                | 59 ++++++++++++++++++++++---
 2 files changed, 57 insertions(+), 5 deletions(-)

diff --git a/azimuth_capi/models/v1alpha1/cluster.py b/azimuth_capi/models/v1alpha1/cluster.py
index 2b92a694..a712d821 100644
--- a/azimuth_capi/models/v1alpha1/cluster.py
+++ b/azimuth_capi/models/v1alpha1/cluster.py
@@ -358,6 +358,9 @@ class ClusterStatus(schema.BaseModel, extra="allow"):
     last_updated: schema.Optional[dt.datetime] = Field(
         default=None, description="Used to trigger the timeout of pending states"
     )
+    finished: bool = Field(
+        True, description="Whether the services monitor has finished writing services"
+    )
 
 
 class Cluster(
diff --git a/azimuth_capi/operator.py b/azimuth_capi/operator.py
index a5a838bb..4555e95b 100644
--- a/azimuth_capi/operator.py
+++ b/azimuth_capi/operator.py
@@ -593,12 +593,18 @@ async def ensure_platform(instance: api.Cluster, realm):
             "realmName": realm.metadata.name,
         },
     }
+    print("dbg known services ")
+    print(instance.status.services.items())
+    print("dbg zenith or default")
+    print(platform["spec"].setdefault("zenithServices", {}))
     for name, service in instance.status.services.items():
         platform["spec"].setdefault("zenithServices", {})[name] = {
             "subdomain": service.subdomain,
             "fqdn": service.fqdn,
         }
     kopf.adopt(platform, instance.model_dump())
+    print("dbg platform")
+    print(platform)
     return await ekclient.apply_object(platform, force=True)
 
 
@@ -1123,13 +1129,20 @@ async def on_cluster_secret_event(cluster, type, body, name, **kwargs):  # noqa:
         status.kubeconfig_secret_updated(cluster, body)
 
 
-@model_handler(api.Cluster, kopf.on.resume)
-@model_handler(api.Cluster, kopf.on.update, field="status.services")
-async def on_cluster_services_updated(instance: api.Cluster, **kwargs):
+@model_handler(api.Cluster, kopf.on.resume, param="resume")
+@model_handler(api.Cluster, kopf.on.update, param="serviceupdate", field="status.services")
+async def on_cluster_services_updated(instance: api.Cluster, param, **kwargs):
     """
     Executed whenever the cluster services change.
""" + if not instance.status.finished: + raise kopf.TemporaryError("services still being written") + print("dbg run "+param) + print("dbg cluster before realm") + print(instance) realm = await find_realm(instance) + print("dbg after realm") + print(instance) await ensure_platform(instance, realm) @@ -1294,6 +1307,8 @@ async def monitor_cluster_services(name, namespace, **kwargs): kubeconfig_data, json_encoder=pydantic_encoder ).async_client(default_field_manager=settings.easykube_field_manager) ekclusterstatus = await ekresource_for_model(api.Cluster, "status") + print("dbg got cluster status") + print(ekclusterstatus) async with ekclient_target: try: ekzenithapi = ekclient_target.api(settings.zenith.api_version) @@ -1315,7 +1330,9 @@ async def monitor_cluster_services(name, namespace, **kwargs): "labels", {} ): cluster_services[service_name] = service_status - await ekclusterstatus.json_patch( + print("dbg replace apply services") + print(cluster_services) + patchedclus = await ekclusterstatus.json_patch( name, [ { @@ -1326,12 +1343,26 @@ async def monitor_cluster_services(name, namespace, **kwargs): ], namespace=namespace, ) + print("dbg after replace apply") + print(patchedclus) # For subsequent events, we just need to patch the state of the specified # service async for event in events: event_type, reservation = event["type"], event.get("object") if not reservation: continue + print("dbg started") + await ekclusterstatus.json_patch( + name, + [ + { + "op": "replace", + "path": "/status/finished", + "value": False, + }, + ], + namespace=namespace, + ) service_name = get_service_name(reservation) if event_type in {"ADDED", "MODIFIED"}: if reservation.get("status", {}).get("phase", "Unknown") == "Ready": @@ -1339,10 +1370,13 @@ async def monitor_cluster_services(name, namespace, **kwargs): addon = await annotate_addon_for_reservation( name, namespace, reservation, service_name, service_status ) + print("dbg patch applying") + print(service_name) + print(service_status) if addon and "capi.stackhpc.com/cluster" in addon.metadata.get( "labels", {} ): - await ekclusterstatus.patch( + otherpatched = await ekclusterstatus.patch( name, { "status": { @@ -1353,8 +1387,11 @@ async def monitor_cluster_services(name, namespace, **kwargs): }, namespace=namespace, ) + print("dbg after patch apply") + print(otherpatched) elif event_type == "DELETED": service_name = get_service_name(reservation) + print("dbg deleted "+service_name) await ekclusterstatus.json_patch( name, [ @@ -1368,6 +1405,18 @@ async def monitor_cluster_services(name, namespace, **kwargs): await annotate_addon_for_reservation( name, namespace, reservation, service_name ) + print("dbg finished") + await ekclusterstatus.json_patch( + name, + [ + { + "op": "replace", + "path": "/status/finished", + "value": True, + }, + ], + namespace=namespace, + ) except (ApiError, ssl.SSLCertVerificationError) as exc: # These are expected, recoverable errors that we can retry raise kopf.TemporaryError(str(exc))