Skip to content

Commit 66fb134

Browse files
fix(webhook,observer): validate resource limits and fix observer bugs
The overnight exercise run found that the webhook accepts resource specs where limits < requests, causing pods to fail at creation and clusters to get stuck in Progressing. The observer also reported false-positive errors for multi-cell primary counts and failed to discover external etcd endpoints.

- Add a validateResourceRequirements helper in pkg/resolver/validation.go that checks limits >= requests for all component resource specs (etcd, multiadmin, multiadmin-web, multigateway, multiorch, pool postgres, pool multipooler).
- Add a countPrimariesForPool helper in observer status.go to count primaries across all cells when len(poolSpec.Cells) > 1, falling back to per-cell checks for single-cell pools.
- Update findEtcdAddress in observer topology.go to check GlobalTopoServer.External.Endpoints before the managed etcd lookup; extract a checkEtcdHealth helper for reuse.
- Extend probeTopoServerServices in observer connectivity.go to probe external etcd endpoints parsed from cluster specs.

This catches invalid resource specs at admission time instead of at pod creation, and eliminates false-positive observer findings for multi-cell and external-etcd clusters.
1 parent b7ef800 commit 66fb134

File tree

7 files changed

+229
-75
lines changed

7 files changed

+229
-75
lines changed

pkg/resolver/validation.go

Lines changed: 80 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -263,6 +263,42 @@ func (r *Resolver) ValidateClusterLogic(
263263
))
264264
}
265265

266+
// ------------------------------------------------------------------
267+
// 0c. Resource Limits Validation (Top-Level Components)
268+
// ------------------------------------------------------------------
269+
if cluster.Spec.GlobalTopoServer != nil &&
270+
cluster.Spec.GlobalTopoServer.Etcd != nil {
271+
if err := validateResourceRequirements(
272+
cluster.Spec.GlobalTopoServer.Etcd.Resources, "etcd",
273+
); err != nil {
274+
return nil, err
275+
}
276+
}
277+
if cluster.Spec.MultiAdmin != nil && cluster.Spec.MultiAdmin.Spec != nil {
278+
if err := validateResourceRequirements(
279+
cluster.Spec.MultiAdmin.Spec.Resources, "multiadmin",
280+
); err != nil {
281+
return nil, err
282+
}
283+
}
284+
if cluster.Spec.MultiAdminWeb != nil && cluster.Spec.MultiAdminWeb.Spec != nil {
285+
if err := validateResourceRequirements(
286+
cluster.Spec.MultiAdminWeb.Spec.Resources, "multiadmin-web",
287+
); err != nil {
288+
return nil, err
289+
}
290+
}
291+
for _, cell := range cluster.Spec.Cells {
292+
if cell.Spec != nil {
293+
if err := validateResourceRequirements(
294+
cell.Spec.MultiGateway.Resources,
295+
fmt.Sprintf("cell '%s' multigateway", cell.Name),
296+
); err != nil {
297+
return nil, err
298+
}
299+
}
300+
}
301+
266302
// Iterate through every Shard and "Simulate" Resolution
267303
for _, db := range cluster.Spec.Databases {
268304
dbBackup := multigresv1alpha1.MergeBackupConfig(db.Backup, cluster.Spec.Backup)
@@ -383,6 +419,30 @@ func (r *Resolver) ValidateClusterLogic(
383419
}
384420
}
385421

422+
// ------------------------------------------------------------------
423+
// 3b. Resource Limits Validation (Resolved Shard Components)
424+
// ------------------------------------------------------------------
425+
if err := validateResourceRequirements(
426+
orch.Resources,
427+
fmt.Sprintf("shard '%s' multiorch", shard.Name),
428+
); err != nil {
429+
return nil, err
430+
}
431+
for poolName, pool := range pools {
432+
if err := validateResourceRequirements(
433+
pool.Postgres.Resources,
434+
fmt.Sprintf("shard '%s' pool '%s' postgres", shard.Name, poolName),
435+
); err != nil {
436+
return nil, err
437+
}
438+
if err := validateResourceRequirements(
439+
pool.Multipooler.Resources,
440+
fmt.Sprintf("shard '%s' pool '%s' multipooler", shard.Name, poolName),
441+
); err != nil {
442+
return nil, err
443+
}
444+
}
445+
386446
// ------------------------------------------------------------------
387447
// 4. Backup Validation
388448
// ------------------------------------------------------------------
@@ -519,6 +579,26 @@ func getEffectiveEtcdReplicas(cluster *multigresv1alpha1.MultigresCluster) int32
519579
return DefaultEtcdReplicas
520580
}
521581

582+
// validateResourceRequirements checks that for every resource type present in
583+
// both Limits and Requests, the limit is >= the request. Kubernetes enforces
584+
// this on Pods but NOT on CRDs, so we catch it early in the webhook.
585+
func validateResourceRequirements(resources corev1.ResourceRequirements, component string) error {
586+
for resourceName, limit := range resources.Limits {
587+
if request, ok := resources.Requests[resourceName]; ok {
588+
if limit.Cmp(request) < 0 {
589+
return fmt.Errorf(
590+
"%s: resource %s limit (%s) must be >= request (%s)",
591+
component,
592+
resourceName,
593+
limit.String(),
594+
request.String(),
595+
)
596+
}
597+
}
598+
}
599+
return nil
600+
}
601+
522602
// validateCellTopology checks if nodes exist in the cluster matching each cell's
523603
// zone or region topology label. Returns warnings (not errors) for any cells
524604
// whose topology labels don't match any nodes — pods targeting those cells will

tools/observer/Dockerfile

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,9 @@ RUN CGO_ENABLED=0 \
2424
-a -o multigres-observer \
2525
cmd/multigres-observer/main.go
2626

27-
FROM gcr.io/distroless/static:nonroot
27+
FROM alpine:3.23
28+
29+
RUN apk add --no-cache curl
2830

2931
LABEL org.opencontainers.image.source="https://github.com/numtide/multigres-operator"
3032
WORKDIR /

tools/observer/pkg/observer/connectivity.go

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import (
77
"io"
88
"net"
99
"net/http"
10+
"net/url"
1011
"strings"
1112
"time"
1213

@@ -100,6 +101,7 @@ func (o *Observer) probeMultiOrchServices(ctx context.Context) {
100101
}
101102

102103
func (o *Observer) probeTopoServerServices(ctx context.Context) {
104+
// Probe managed etcd services.
103105
var svcs corev1.ServiceList
104106
if err := o.client.List(ctx, &svcs,
105107
o.listOpts(client.MatchingLabels{
@@ -115,6 +117,34 @@ func (o *Observer) probeTopoServerServices(ctx context.Context) {
115117
addr := fmt.Sprintf("%s.%s.svc", svc.Name, svc.Namespace)
116118
o.probeHTTP(ctx, addr, common.PortEtcdClient, "/health", "etcd-health", svc.Name)
117119
}
120+
121+
// Probe external etcd endpoints from clusters using external topo servers.
122+
var clusters multigresv1alpha1.MultigresClusterList
123+
if err := o.client.List(ctx, &clusters, o.listOpts()...); err != nil {
124+
return
125+
}
126+
for i := range clusters.Items {
127+
cluster := &clusters.Items[i]
128+
if cluster.Spec.GlobalTopoServer == nil ||
129+
cluster.Spec.GlobalTopoServer.External == nil {
130+
continue
131+
}
132+
for _, ep := range cluster.Spec.GlobalTopoServer.External.Endpoints {
133+
parsed, err := url.Parse(string(ep))
134+
if err != nil {
135+
continue
136+
}
137+
host := parsed.Hostname()
138+
port := common.PortEtcdClient
139+
if parsed.Port() != "" {
140+
if p, err := net.LookupPort("tcp", parsed.Port()); err == nil {
141+
port = p
142+
}
143+
}
144+
component := fmt.Sprintf("external-etcd/%s/%s", cluster.Name, host)
145+
o.probeHTTP(ctx, host, port, "/health", "etcd-health", component)
146+
}
147+
}
118148
}
119149

120150
func (o *Observer) probePoolPodHealth(ctx context.Context) {

tools/observer/pkg/observer/status.go

Lines changed: 69 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -334,15 +334,14 @@ func (o *Observer) checkShardPodRoles(ctx context.Context, shard *multigresv1alp
334334
}
335335

336336
for poolName, poolSpec := range shard.Spec.Pools {
337-
for _, cellName := range poolSpec.Cells {
338-
violationKey := fmt.Sprintf("%s/%s-%s", comp, poolName, cellName)
339-
primaryCount := countPrimariesForPoolCell(
337+
if len(poolSpec.Cells) > 1 {
338+
// Multi-cell: expect exactly 1 primary across ALL cells for this pool.
339+
violationKey := fmt.Sprintf("%s/%s-global", comp, poolName)
340+
primaryCount := countPrimariesForPool(
340341
shard.Status.PodRoles,
341342
podLabels,
342343
string(poolName),
343-
string(cellName),
344344
)
345-
346345
if primaryCount != 1 {
347346
since, tracked := o.primaryViolationSince[violationKey]
348347
if !tracked {
@@ -355,21 +354,59 @@ func (o *Observer) checkShardPodRoles(ctx context.Context, shard *multigresv1alp
355354
Check: "crd-status",
356355
Component: comp,
357356
Message: fmt.Sprintf(
358-
"Pool %s cell %s has %d primaries (expected 1)",
357+
"Pool %s has %d primaries across all cells (expected 1)",
359358
poolName,
360-
cellName,
361359
primaryCount,
362360
),
363361
Details: map[string]any{
364362
"pool": string(poolName),
365-
"cell": string(cellName),
366363
"primaryCount": primaryCount,
364+
"multiCell": true,
367365
},
368366
})
369367
}
370368
} else {
371369
delete(o.primaryViolationSince, violationKey)
372370
}
371+
} else {
372+
// Single-cell: expect exactly 1 primary per cell.
373+
for _, cellName := range poolSpec.Cells {
374+
violationKey := fmt.Sprintf("%s/%s-%s", comp, poolName, cellName)
375+
primaryCount := countPrimariesForPoolCell(
376+
shard.Status.PodRoles,
377+
podLabels,
378+
string(poolName),
379+
string(cellName),
380+
)
381+
382+
if primaryCount != 1 {
383+
since, tracked := o.primaryViolationSince[violationKey]
384+
if !tracked {
385+
o.primaryViolationSince[violationKey] = now
386+
continue
387+
}
388+
if now.Sub(since) > common.PrimaryGracePeriod {
389+
o.reporter.Report(report.Finding{
390+
Severity: report.SeverityError,
391+
Check: "crd-status",
392+
Component: comp,
393+
Message: fmt.Sprintf(
394+
"Pool %s cell %s has %d primaries (expected 1)",
395+
poolName,
396+
cellName,
397+
primaryCount,
398+
),
399+
Details: map[string]any{
400+
"pool": string(poolName),
401+
"cell": string(cellName),
402+
"primaryCount": primaryCount,
403+
},
404+
})
405+
}
406+
} else {
407+
delete(o.primaryViolationSince, violationKey)
408+
}
409+
}
373410
}
374411
}
375412
}
@@ -380,6 +417,30 @@ type podPoolCell struct {
380417
cell string
381418
}
382419

420+
// countPrimariesForPool counts pods with a "primary" role across all cells
421+
// for the specified pool. Used for multi-cell durability policies where
422+
// exactly one primary exists across all cells.
423+
func countPrimariesForPool(
424+
podRoles map[string]string,
425+
podLabels map[string]podPoolCell,
426+
poolName string,
427+
) int {
428+
count := 0
429+
for podName, role := range podRoles {
430+
if role != "primary" && role != "PRIMARY" {
431+
continue
432+
}
433+
info, ok := podLabels[podName]
434+
if !ok {
435+
continue
436+
}
437+
if info.pool == poolName {
438+
count++
439+
}
440+
}
441+
return count
442+
}
443+
383444
// countPrimariesForPoolCell counts pods with a "primary" role that belong to
384445
// the specified pool and cell, using pod labels for accurate filtering.
385446
func countPrimariesForPoolCell(

tools/observer/pkg/observer/topology.go

Lines changed: 26 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ func (o *Observer) checkTopology(ctx context.Context) {
2929
for i := range clusters.Items {
3030
cluster := &clusters.Items[i]
3131

32-
etcdAddr := o.findEtcdAddress(ctx, cluster.Name)
32+
etcdAddr := o.findEtcdAddress(ctx, cluster)
3333
if etcdAddr == "" {
3434
o.reporter.Report(report.Finding{
3535
Severity: report.SeverityWarn,
@@ -66,36 +66,51 @@ func (o *Observer) checkTopology(ctx context.Context) {
6666
}
6767
}
6868

69-
func (o *Observer) findEtcdAddress(ctx context.Context, clusterName string) string {
69+
func (o *Observer) findEtcdAddress(ctx context.Context, cluster *multigresv1alpha1.MultigresCluster) string {
70+
// Check external etcd endpoints first.
71+
if cluster.Spec.GlobalTopoServer != nil &&
72+
cluster.Spec.GlobalTopoServer.External != nil &&
73+
len(cluster.Spec.GlobalTopoServer.External.Endpoints) > 0 {
74+
for _, ep := range cluster.Spec.GlobalTopoServer.External.Endpoints {
75+
addr := strings.TrimRight(string(ep), "/")
76+
if o.checkEtcdHealth(addr) {
77+
return addr
78+
}
79+
}
80+
return ""
81+
}
82+
83+
// Fall through to managed etcd service lookup.
7084
var svcs corev1.ServiceList
7185
if err := o.client.List(ctx, &svcs,
7286
o.listOpts(client.MatchingLabels{
7387
common.LabelAppManagedBy: common.ManagedByMultigres,
7488
common.LabelAppComponent: common.ComponentGlobalTopo,
75-
common.LabelMultigresCluster: clusterName,
89+
common.LabelMultigresCluster: cluster.Name,
7690
})...,
7791
); err != nil || len(svcs.Items) == 0 {
7892
return ""
7993
}
8094

81-
// Use the first topo service found.
8295
svc := &svcs.Items[0]
8396
addr := fmt.Sprintf("http://%s.%s.svc:%d", svc.Name, svc.Namespace, common.PortEtcdClient)
97+
if o.checkEtcdHealth(addr) {
98+
return addr
99+
}
100+
return ""
101+
}
84102

85-
// Verify connectivity with a health check.
103+
// checkEtcdHealth verifies etcd is reachable at the given address.
104+
func (o *Observer) checkEtcdHealth(addr string) bool {
86105
resp, err := o.httpClient.Get(addr + "/health")
87106
if err != nil {
88-
return ""
107+
return false
89108
}
90109
defer func() {
91110
_, _ = io.Copy(io.Discard, resp.Body)
92111
_ = resp.Body.Close()
93112
}()
94-
95-
if resp.StatusCode != http.StatusOK {
96-
return ""
97-
}
98-
return addr
113+
return resp.StatusCode == http.StatusOK
99114
}
100115

101116
// etcdRangeResponse is the minimal JSON response from the etcd v3 gRPC-gateway range API.

0 commit comments

Comments
 (0)