3 changes: 3 additions & 0 deletions azimuth_capi/models/v1alpha1/cluster.py
@@ -358,6 +358,9 @@ class ClusterStatus(schema.BaseModel, extra="allow"):
last_updated: schema.Optional[dt.datetime] = Field(
default=None, description="Used to trigger the timeout of pending states"
)
finished: bool = Field(
True, description="Semaphore indicating whether the service monitor has finished writing service updates"
)


class Cluster(
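
Taken together with the operator changes below, the new finished field acts as a coarse semaphore between the service monitor and the services-updated handler: the monitor flips it to False before rewriting status.services and back to True once it is done, while the handler defers until it reads True again. A minimal standalone sketch of the consumer side, not part of this diff (the resource name clusters.example.org and the bare kopf decorator are illustrative assumptions; the real handler is registered through model_handler in azimuth_capi/operator.py):

import kopf

# Illustrative sketch only -- not part of this diff. The resource name is
# a placeholder for the Azimuth Cluster CRD.
@kopf.on.update("clusters.example.org", field="status.services")
async def on_services_updated(status, **kwargs):
    # While the monitor is mid-write, back off; kopf reschedules the
    # handler after `delay` seconds and re-reads the patched status.
    if not status.get("finished", True):
        raise kopf.TemporaryError("services still being written", delay=30)
    # Once finished is True it is safe to reconcile the platform object
    # from status.services.
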
59 changes: 54 additions & 5 deletions azimuth_capi/operator.py
@@ -593,12 +593,18 @@ async def ensure_platform(instance: api.Cluster, realm):
"realmName": realm.metadata.name,
},
}
print("dbg known services ")
print(instance.status.services.items())
print("dbg zenith or default")
print(platform["spec"].setdefault("zenithServices", {}))
for name, service in instance.status.services.items():
platform["spec"].setdefault("zenithServices", {})[name] = {
"subdomain": service.subdomain,
"fqdn": service.fqdn,
}
kopf.adopt(platform, instance.model_dump())
print("dbg platform")
print(platform)
return await ekclient.apply_object(platform, force=True)


@@ -1123,13 +1129,20 @@ async def on_cluster_secret_event(cluster, type, body, name, **kwargs): # noqa:
status.kubeconfig_secret_updated(cluster, body)


@model_handler(api.Cluster, kopf.on.resume)
@model_handler(api.Cluster, kopf.on.update, field="status.services")
async def on_cluster_services_updated(instance: api.Cluster, **kwargs):
@model_handler(api.Cluster, kopf.on.resume, param="resume")
@model_handler(api.Cluster, kopf.on.update, param="serviceupdate", field="status.services")
async def on_cluster_services_updated(instance: api.Cluster, param, **kwargs):
"""
Executed whenever the cluster services change.
"""
if not instance.status.finished:
raise kopf.TemporaryError("services still being written")
print("dbg run "+param)
print("dbg cluster before realm")
print(instance)
realm = await find_realm(instance)
print("dbg after realm")
print(instance)
await ensure_platform(instance, realm)
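
For reference, the double registration of on_cluster_services_updated above uses kopf's parametrized handlers: the same function is attached to both the resume and the update cause, and each registration passes its own param value through to the handler, which is what the "dbg run" print reports. A minimal standalone sketch outside of model_handler, with an assumed resource name:

import kopf

# Illustrative sketch only: one function, two registrations, each tagged
# with a different param so the handler can tell which cause fired it.
@kopf.on.resume("clusters.example.org", param="resume")
@kopf.on.update("clusters.example.org", param="serviceupdate", field="status.services")
async def on_services_updated(param, status, **kwargs):
    # param is "resume" or "serviceupdate" depending on the registration.
    print(f"triggered via {param}; finished={status.get('finished', True)}")
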


@@ -1294,6 +1307,8 @@ async def monitor_cluster_services(name, namespace, **kwargs):
kubeconfig_data, json_encoder=pydantic_encoder
).async_client(default_field_manager=settings.easykube_field_manager)
ekclusterstatus = await ekresource_for_model(api.Cluster, "status")
print("dbg got cluster status")
print(ekclusterstatus)
async with ekclient_target:
try:
ekzenithapi = ekclient_target.api(settings.zenith.api_version)
@@ -1315,7 +1330,9 @@
"labels", {}
):
cluster_services[service_name] = service_status
await ekclusterstatus.json_patch(
print("dbg replace apply services")
print(cluster_services)
patchedclus = await ekclusterstatus.json_patch(
name,
[
{
@@ -1326,23 +1343,40 @@
],
namespace=namespace,
)
print("dbg after replace apply")
print(patchedclus)
# For subsequent events, we just need to patch the state of the specified
# service
async for event in events:
event_type, reservation = event["type"], event.get("object")
if not reservation:
continue
print("dbg started")
await ekclusterstatus.json_patch(
name,
[
{
"op": "replace",
"path": "/status/finished",
"value": False,
},
],
namespace=namespace,
)
service_name = get_service_name(reservation)
if event_type in {"ADDED", "MODIFIED"}:
if reservation.get("status", {}).get("phase", "Unknown") == "Ready":
service_status = get_service_status(reservation)
addon = await annotate_addon_for_reservation(
name, namespace, reservation, service_name, service_status
)
print("dbg patch applying")
print(service_name)
print(service_status)
if addon and "capi.stackhpc.com/cluster" in addon.metadata.get(
"labels", {}
):
await ekclusterstatus.patch(
otherpatched = await ekclusterstatus.patch(
name,
{
"status": {
@@ -1353,8 +1387,11 @@ async def monitor_cluster_services(name, namespace, **kwargs):
},
namespace=namespace,
)
print("dbg after patch apply")
print(otherpatched)
elif event_type == "DELETED":
service_name = get_service_name(reservation)
print("dbg deleted "+service_name)
await ekclusterstatus.json_patch(
name,
[
@@ -1368,6 +1405,18 @@ async def monitor_cluster_services(name, namespace, **kwargs):
await annotate_addon_for_reservation(
name, namespace, reservation, service_name
)
print("dbg finished")
await ekclusterstatus.json_patch(
name,
[
{
"op": "replace",
"path": "/status/finished",
"value": True,
},
],
namespace=namespace,
)
except (ApiError, ssl.SSLCertVerificationError) as exc:
# These are expected, recoverable errors that we can retry
raise kopf.TemporaryError(str(exc))
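
The two /status/finished patches in the monitor loop above bracket the per-event service updates, so any handler reading status.finished observes False for the duration of a rewrite. A condensed standalone sketch of that bracket, not a literal excerpt from the diff (the try/finally and the apply_updates callable are illustrative; it assumes an easykube status resource client like the ekclusterstatus used above):

# Illustrative sketch only -- condensed from the monitor loop above.
async def with_finished_flag(ekclusterstatus, name, namespace, apply_updates):
    # Mark service updates as in flight for any watching handlers.
    await ekclusterstatus.json_patch(
        name,
        [{"op": "replace", "path": "/status/finished", "value": False}],
        namespace=namespace,
    )
    try:
        # Apply the actual per-service status patches (caller-supplied coroutine).
        await apply_updates()
    finally:
        # Release the semaphore so deferred handlers can proceed.
        await ekclusterstatus.json_patch(
            name,
            [{"op": "replace", "path": "/status/finished", "value": True}],
            namespace=namespace,
        )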