From d2201bf224d8c5c43f290863ff009c6ad977b7d7 Mon Sep 17 00:00:00 2001 From: Amber Brown Date: Tue, 24 Mar 2026 15:52:36 +1100 Subject: [PATCH 1/5] do not continue spawning goroutines if we've already timed out, and emit metrics when we have timed out --- pkg/monitor/cluster/cluster.go | 25 ++++++++-- pkg/monitor/cluster/cluster_test.go | 74 ++++++++++++++++++++++++++++- 2 files changed, 92 insertions(+), 7 deletions(-) diff --git a/pkg/monitor/cluster/cluster.go b/pkg/monitor/cluster/cluster.go index 0837b64d08d..bb06a00901a 100644 --- a/pkg/monitor/cluster/cluster.go +++ b/pkg/monitor/cluster/cluster.go @@ -72,6 +72,8 @@ type Monitor struct { // Limit for items per pagination query queryLimit int + // Limit for goroutines per cluster Monitor instance + parallelism int } func NewMonitor(log *logrus.Entry, restConfig *rest.Config, oc *api.OpenShiftCluster, env env.Interface, tenantID string, m metrics.Emitter, hourlyRun bool) (monitoring.Monitor, error) { @@ -160,6 +162,7 @@ func NewMonitor(log *logrus.Entry, restConfig *rest.Config, oc *api.OpenShiftClu ocpclientset: clienthelper.NewWithClient(log, ocpclientset), namespacesToMonitor: []string{}, queryLimit: 50, + parallelism: MONITOR_GOROUTINES_PER_CLUSTER, } mon.collectors = []collectorFunc{ mon.emitAroOperatorHeartbeat, @@ -196,8 +199,16 @@ func NewMonitor(log *logrus.Entry, restConfig *rest.Config, oc *api.OpenShiftClu } func (mon *Monitor) timeCall(ctx context.Context, f func(context.Context) error) (err error) { - innerNow := time.Now() collectorName := steps.ShortName(f) + + // Don't run collectors if we have already timed out + if ctx.Err() != nil { + mon.log.Debugf("skipping %s because %s:", collectorName, ctx.Err()) + mon.emitMonitorCollectorSkipped(collectorName) + return &failureToRunClusterCollector{collectorName: collectorName, inner: ctx.Err()} + } + + innerNow := time.Now() mon.log.Debugf("running %s", collectorName) // If the collector panics we should return the error (so that it bubbles @@ 
-272,10 +283,10 @@ func (mon *Monitor) Monitor(ctx context.Context) (_err error) { return errors.Join(errs...) } - // Run up to MONITOR_GOROUTINES_PER_CLUSTER goroutines for collecting - // metrics + // Run up to mon.parallelism (default: MONITOR_GOROUTINES_PER_CLUSTER) + // goroutines for collecting metrics wg := new(errgroup.Group) - wg.SetLimit(MONITOR_GOROUTINES_PER_CLUSTER) + wg.SetLimit(mon.parallelism) // Create a channel capable of buffering one error from every collector errChan := make(chan error, len(mon.collectors)) @@ -305,7 +316,7 @@ func (mon *Monitor) Monitor(ctx context.Context) (_err error) { } // emit a metric with how long we took when we have no errors - if len(errs) == 0 { + if len(errs) == 0 && ctx.Err() == nil { mon.emitFloat("monitor.cluster.duration", time.Since(now).Seconds(), map[string]string{}) } @@ -316,6 +327,10 @@ func (mon *Monitor) emitMonitorCollectorError(collectorName string) { emitter.EmitGauge(mon.m, "monitor.cluster.collector.error", 1, mon.dims, map[string]string{"collector": collectorName}) } +func (mon *Monitor) emitMonitorCollectorSkipped(collectorName string) { + emitter.EmitGauge(mon.m, "monitor.cluster.collector.skipped", 1, mon.dims, map[string]string{"collector": collectorName}) +} + func (mon *Monitor) emitMonitorCollectionTiming(collectorName string, duration float64) { emitter.EmitFloat(mon.m, "monitor.cluster.collector.duration", duration, mon.dims, map[string]string{"collector": collectorName}) } diff --git a/pkg/monitor/cluster/cluster_test.go b/pkg/monitor/cluster/cluster_test.go index ea7c3c0139c..4a14c768b1c 100644 --- a/pkg/monitor/cluster/cluster_test.go +++ b/pkg/monitor/cluster/cluster_test.go @@ -36,7 +36,8 @@ type expectedMetric struct { } func TestMonitor(t *testing.T) { - ctx := context.Background() + var _ctx context.Context + var _cancel context.CancelFunc innerFailure := errors.New("failure inside") @@ -392,6 +393,71 @@ func TestMonitor(t *testing.T) { }, }, }, + { + name: "timeout during 
collector means other collectors are skipped", + healthzCall: func(r *http.Request) (*http.Response, error) { return &http.Response{StatusCode: http.StatusOK}, nil }, + collectors: func(m *Monitor) []collectorFunc { + return []collectorFunc{ + func(ctx context.Context) error { + _cancel() + return nil + }, + func(ctx context.Context) error { + return nil + }, + } + }, + expectedErrors: []error{ + &failureToRunClusterCollector{collectorName: "2"}, + context.Canceled, + }, + expectedGauges: []expectedMetric{ + { + name: "apiserver.healthz.code", + value: int64(1), + labels: map[string]string{ + "code": "200", + }, + }, + { + name: "monitor.cluster.collector.skipped", + value: int64(1), + labels: map[string]string{ + "collector": "2", + }, + }, + }, + expectedFloats: []expectedMetric{ + { + name: "monitor.cluster.collector.duration", + value: gomock.Any(), + labels: map[string]string{ + "collector": "emitAPIServerHealthzCode", + }, + }, + { + name: "monitor.cluster.collector.duration", + value: gomock.Any(), + labels: map[string]string{ + "collector": "prefetchClusterVersion", + }, + }, + { + name: "monitor.cluster.collector.duration", + value: gomock.Any(), + labels: map[string]string{ + "collector": "fetchManagedNamespaces", + }, + }, + { + name: "monitor.cluster.collector.duration", + value: gomock.Any(), + labels: map[string]string{ + "collector": "1", + }, + }, + }, + }, } { t.Run(tt.name, func(t *testing.T) { objects := []client.Object{ @@ -449,6 +515,9 @@ func TestMonitor(t *testing.T) { }, } + _ctx, _cancel = context.WithCancel(t.Context()) + defer _cancel() + _, log := testlog.New() controller := gomock.NewController(t) m := mock_metrics.NewMockEmitter(controller) @@ -475,6 +544,7 @@ func TestMonitor(t *testing.T) { ocpclientset: ocpclientset, m: m, queryLimit: 1, + parallelism: 1, } if tt.collectors != nil { @@ -493,7 +563,7 @@ func TestMonitor(t *testing.T) { m.EXPECT().EmitFloat("monitor.cluster.duration", gomock.Any(), gomock.Any()).Times(1) } - err := 
mon.Monitor(ctx) + err := mon.Monitor(_ctx) utilerror.AssertErrorMatchesAll(t, err, tt.expectedErrors) }) } From 0d95d9796d1422b7898d347306c2cc65c20a24b6 Mon Sep 17 00:00:00 2001 From: Amber Brown Date: Fri, 27 Mar 2026 11:40:43 +1100 Subject: [PATCH 2/5] update monitor to use the proper fake metrics for testing --- .../cluster/arooperatorheartbeat_test.go | 31 +- pkg/monitor/cluster/cluster.go | 10 +- pkg/monitor/cluster/cluster_test.go | 300 +++++++++--------- 3 files changed, 169 insertions(+), 172 deletions(-) diff --git a/pkg/monitor/cluster/arooperatorheartbeat_test.go b/pkg/monitor/cluster/arooperatorheartbeat_test.go index b6aae3968ba..1b356e0a54d 100644 --- a/pkg/monitor/cluster/arooperatorheartbeat_test.go +++ b/pkg/monitor/cluster/arooperatorheartbeat_test.go @@ -8,8 +8,6 @@ import ( "errors" "testing" - "go.uber.org/mock/gomock" - appsv1 "k8s.io/api/apps/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -17,10 +15,10 @@ import ( "sigs.k8s.io/controller-runtime/pkg/client/fake" "github.com/Azure/ARO-RP/pkg/util/clienthelper" - mock_metrics "github.com/Azure/ARO-RP/pkg/util/mocks/metrics" "github.com/Azure/ARO-RP/pkg/util/pointerutils" testclienthelper "github.com/Azure/ARO-RP/test/util/clienthelper" testlog "github.com/Azure/ARO-RP/test/util/log" + fakemetrics "github.com/Azure/ARO-RP/test/util/metrics" ) func TestEmitAroOperatorHeartbeat(t *testing.T) { @@ -29,7 +27,7 @@ func TestEmitAroOperatorHeartbeat(t *testing.T) { for _, tt := range []struct { name string objects []client.Object - expectedGauges []expectedMetric + expectedGauges []fakemetrics.MetricsAssertion[int64] hooks func(hc *testclienthelper.HookingClient) wantErr error }{ @@ -90,16 +88,16 @@ func TestEmitAroOperatorHeartbeat(t *testing.T) { }, }, }, - expectedGauges: []expectedMetric{ + expectedGauges: []fakemetrics.MetricsAssertion[int64]{ { - name: "arooperator.heartbeat", - value: int64(0), - labels: map[string]string{"name": "aro-operator-master"}, + MetricName: 
"arooperator.heartbeat", + Value: int64(0), + Dimensions: map[string]string{"name": "aro-operator-master"}, }, { - name: "arooperator.heartbeat", - value: int64(1), - labels: map[string]string{"name": "aro-operator-worker"}, + MetricName: "arooperator.heartbeat", + Value: int64(1), + Dimensions: map[string]string{"name": "aro-operator-worker"}, }, }, }, @@ -114,9 +112,7 @@ func TestEmitAroOperatorHeartbeat(t *testing.T) { }, } { t.Run(tt.name, func(t *testing.T) { - controller := gomock.NewController(t) - m := mock_metrics.NewMockEmitter(controller) - + m := fakemetrics.NewFakeMetricsEmitter(t) _, log := testlog.New() client := testclienthelper.NewHookingClient(fake. NewClientBuilder(). @@ -135,16 +131,15 @@ func TestEmitAroOperatorHeartbeat(t *testing.T) { tt.hooks(client) } - for _, gauge := range tt.expectedGauges { - m.EXPECT().EmitGauge(gauge.name, gauge.value, gauge.labels).Times(1) - } - err := mon.emitAroOperatorHeartbeat(ctx) if tt.wantErr != nil && !errors.Is(err, tt.wantErr) { t.Fatalf("Wanted %v, got %v", err, tt.wantErr) } else if tt.wantErr == nil && err != nil { t.Fatal(err) } + + m.AssertGauges(tt.expectedGauges...) 
+ m.AssertFloats() }) } } diff --git a/pkg/monitor/cluster/cluster.go b/pkg/monitor/cluster/cluster.go index bb06a00901a..e0c9b61b2b8 100644 --- a/pkg/monitor/cluster/cluster.go +++ b/pkg/monitor/cluster/cluster.go @@ -60,6 +60,7 @@ type Monitor struct { env env.Interface rawClient rest.Interface tenantID string + now func() time.Time ocpclientset clienthelper.Interface @@ -155,6 +156,7 @@ func NewMonitor(log *logrus.Entry, restConfig *rest.Config, oc *api.OpenShiftClu operatorcli: operatorcli, arocli: arocli, rawClient: rawClient, + now: time.Now, env: env, tenantID: tenantID, @@ -208,7 +210,7 @@ func (mon *Monitor) timeCall(ctx context.Context, f func(context.Context) error) return &failureToRunClusterCollector{collectorName: collectorName, inner: ctx.Err()} } - innerNow := time.Now() + innerNow := mon.now() mon.log.Debugf("running %s", collectorName) // If the collector panics we should return the error (so that it bubbles @@ -227,7 +229,7 @@ func (mon *Monitor) timeCall(ctx context.Context, f func(context.Context) error) mon.emitMonitorCollectorError(collectorName) return &failureToRunClusterCollector{collectorName: collectorName, inner: innerErr} } else { - timeToComplete := time.Since(innerNow).Seconds() + timeToComplete := mon.now().Sub(innerNow).Seconds() mon.emitMonitorCollectionTiming(collectorName, timeToComplete) mon.log.Debugf("successfully ran cluster collector '%s' in %2f sec", collectorName, timeToComplete) } @@ -245,7 +247,7 @@ func (mon *Monitor) Monitor(ctx context.Context) (_err error) { errs := []error{} - now := time.Now() + monitoringStartTime := mon.now() mon.log.Debug("monitoring") if mon.hourlyRun { @@ -317,7 +319,7 @@ func (mon *Monitor) Monitor(ctx context.Context) (_err error) { // emit a metric with how long we took when we have no errors if len(errs) == 0 && ctx.Err() == nil { - mon.emitFloat("monitor.cluster.duration", time.Since(now).Seconds(), map[string]string{}) + mon.emitFloat("monitor.cluster.duration", 
mon.now().Sub(monitoringStartTime).Seconds(), map[string]string{}) } return errors.Join(errs...) diff --git a/pkg/monitor/cluster/cluster_test.go b/pkg/monitor/cluster/cluster_test.go index 4a14c768b1c..87f2b0d7a3f 100644 --- a/pkg/monitor/cluster/cluster_test.go +++ b/pkg/monitor/cluster/cluster_test.go @@ -8,8 +8,7 @@ import ( "errors" "net/http" "testing" - - "go.uber.org/mock/gomock" + "time" appsv1 "k8s.io/api/apps/v1" corev1 "k8s.io/api/core/v1" @@ -23,18 +22,12 @@ import ( "github.com/Azure/ARO-RP/pkg/operator/clientset/versioned/scheme" "github.com/Azure/ARO-RP/pkg/util/clienthelper" - mock_metrics "github.com/Azure/ARO-RP/pkg/util/mocks/metrics" testclienthelper "github.com/Azure/ARO-RP/test/util/clienthelper" utilerror "github.com/Azure/ARO-RP/test/util/error" testlog "github.com/Azure/ARO-RP/test/util/log" + fakemetrics "github.com/Azure/ARO-RP/test/util/metrics" ) -type expectedMetric struct { - name string - value any - labels map[string]string -} - func TestMonitor(t *testing.T) { var _ctx context.Context var _cancel context.CancelFunc @@ -47,8 +40,8 @@ func TestMonitor(t *testing.T) { hooks func(*testclienthelper.HookingClient) collectors func(*Monitor) []collectorFunc healthzCall func(*http.Request) (*http.Response, error) - expectedGauges []expectedMetric - expectedFloats []expectedMetric + expectedGauges []fakemetrics.MetricsAssertion[int64] + expectedFloats []fakemetrics.MetricsAssertion[float64] }{ { name: "happy path", @@ -56,18 +49,18 @@ func TestMonitor(t *testing.T) { collectors: func(m *Monitor) []collectorFunc { return []collectorFunc{m.emitReplicasetStatuses, m.emitDaemonsetStatuses} }, - expectedGauges: []expectedMetric{ + expectedGauges: []fakemetrics.MetricsAssertion[int64]{ { - name: "apiserver.healthz.code", - value: int64(1), - labels: map[string]string{ + MetricName: "apiserver.healthz.code", + Value: int64(1), + Dimensions: map[string]string{ "code": "200", }, }, { - name: "replicaset.statuses", - value: int64(1), - labels: 
map[string]string{ + MetricName: "replicaset.statuses", + Value: int64(1), + Dimensions: map[string]string{ "availableReplicas": "1", "name": "name1", "namespace": "openshift", @@ -75,9 +68,9 @@ func TestMonitor(t *testing.T) { }, }, { - name: "daemonset.statuses", - value: int64(1), - labels: map[string]string{ + MetricName: "daemonset.statuses", + Value: int64(1), + Dimensions: map[string]string{ "desiredNumberScheduled": "2", "numberAvailable": "1", "namespace": "openshift", @@ -85,39 +78,39 @@ func TestMonitor(t *testing.T) { }, }, }, - expectedFloats: []expectedMetric{ + expectedFloats: []fakemetrics.MetricsAssertion[float64]{ { - name: "monitor.cluster.collector.duration", - value: gomock.Any(), - labels: map[string]string{ + MetricName: "monitor.cluster.collector.duration", + Value: 1.0, + Dimensions: map[string]string{ "collector": "emitAPIServerHealthzCode", }, }, { - name: "monitor.cluster.collector.duration", - value: gomock.Any(), - labels: map[string]string{ + MetricName: "monitor.cluster.collector.duration", + Value: 1.0, + Dimensions: map[string]string{ "collector": "prefetchClusterVersion", }, }, { - name: "monitor.cluster.collector.duration", - value: gomock.Any(), - labels: map[string]string{ + MetricName: "monitor.cluster.collector.duration", + Value: 1.0, + Dimensions: map[string]string{ "collector": "fetchManagedNamespaces", }, }, { - name: "monitor.cluster.collector.duration", - value: gomock.Any(), - labels: map[string]string{ + MetricName: "monitor.cluster.collector.duration", + Value: 1.0, + Dimensions: map[string]string{ "collector": "emitReplicasetStatuses", }, }, { - name: "monitor.cluster.collector.duration", - value: gomock.Any(), - labels: map[string]string{ + MetricName: "monitor.cluster.collector.duration", + Value: 1.0, + Dimensions: map[string]string{ "collector": "emitDaemonsetStatuses", }, }, @@ -139,34 +132,34 @@ func TestMonitor(t *testing.T) { &failureToRunClusterCollector{collectorName: "fetchManagedNamespaces"}, 
errListNamespaces, }, - expectedGauges: []expectedMetric{ + expectedGauges: []fakemetrics.MetricsAssertion[int64]{ { - name: "apiserver.healthz.code", - value: int64(1), - labels: map[string]string{ + MetricName: "apiserver.healthz.code", + Value: int64(1), + Dimensions: map[string]string{ "code": "200", }, }, { - name: "monitor.cluster.collector.error", - value: int64(1), - labels: map[string]string{ + MetricName: "monitor.cluster.collector.error", + Value: int64(1), + Dimensions: map[string]string{ "collector": "fetchManagedNamespaces", }, }, }, - expectedFloats: []expectedMetric{ + expectedFloats: []fakemetrics.MetricsAssertion[float64]{ { - name: "monitor.cluster.collector.duration", - value: gomock.Any(), - labels: map[string]string{ + MetricName: "monitor.cluster.collector.duration", + Value: 1.0, + Dimensions: map[string]string{ "collector": "emitAPIServerHealthzCode", }, }, { - name: "monitor.cluster.collector.duration", - value: gomock.Any(), - labels: map[string]string{ + MetricName: "monitor.cluster.collector.duration", + Value: 1.0, + Dimensions: map[string]string{ "collector": "prefetchClusterVersion", }, }, @@ -192,41 +185,41 @@ func TestMonitor(t *testing.T) { errListReplicaSets, innerFailure, }, - expectedGauges: []expectedMetric{ + expectedGauges: []fakemetrics.MetricsAssertion[int64]{ { - name: "apiserver.healthz.code", - value: int64(1), - labels: map[string]string{ + MetricName: "apiserver.healthz.code", + Value: int64(1), + Dimensions: map[string]string{ "code": "200", }, }, { - name: "monitor.cluster.collector.error", - value: int64(1), - labels: map[string]string{ + MetricName: "monitor.cluster.collector.error", + Value: int64(1), + Dimensions: map[string]string{ "collector": "emitReplicasetStatuses", }, }, }, - expectedFloats: []expectedMetric{ + expectedFloats: []fakemetrics.MetricsAssertion[float64]{ { - name: "monitor.cluster.collector.duration", - value: gomock.Any(), - labels: map[string]string{ + MetricName: 
"monitor.cluster.collector.duration", + Value: 1.0, + Dimensions: map[string]string{ "collector": "emitAPIServerHealthzCode", }, }, { - name: "monitor.cluster.collector.duration", - value: gomock.Any(), - labels: map[string]string{ + MetricName: "monitor.cluster.collector.duration", + Value: 1.0, + Dimensions: map[string]string{ "collector": "prefetchClusterVersion", }, }, { - name: "monitor.cluster.collector.duration", - value: gomock.Any(), - labels: map[string]string{ + MetricName: "monitor.cluster.collector.duration", + Value: 1.0, + Dimensions: map[string]string{ "collector": "fetchManagedNamespaces", }, }, @@ -251,18 +244,18 @@ func TestMonitor(t *testing.T) { &failureToRunClusterCollector{collectorName: "emitReplicasetStatuses"}, &collectorPanic{panicValue: innerFailure}, }, - expectedGauges: []expectedMetric{ + expectedGauges: []fakemetrics.MetricsAssertion[int64]{ { - name: "apiserver.healthz.code", - value: int64(1), - labels: map[string]string{ + MetricName: "apiserver.healthz.code", + Value: int64(1), + Dimensions: map[string]string{ "code": "200", }, }, { - name: "daemonset.statuses", - value: int64(1), - labels: map[string]string{ + MetricName: "daemonset.statuses", + Value: int64(1), + Dimensions: map[string]string{ "desiredNumberScheduled": "2", "numberAvailable": "1", "namespace": "openshift", @@ -270,39 +263,39 @@ func TestMonitor(t *testing.T) { }, }, { - name: "monitor.cluster.collector.error", - value: int64(1), - labels: map[string]string{ + MetricName: "monitor.cluster.collector.error", + Value: int64(1), + Dimensions: map[string]string{ "collector": "emitReplicasetStatuses", }, }, }, - expectedFloats: []expectedMetric{ + expectedFloats: []fakemetrics.MetricsAssertion[float64]{ { - name: "monitor.cluster.collector.duration", - value: gomock.Any(), - labels: map[string]string{ + MetricName: "monitor.cluster.collector.duration", + Value: 1.0, + Dimensions: map[string]string{ "collector": "emitAPIServerHealthzCode", }, }, { - name: 
"monitor.cluster.collector.duration", - value: gomock.Any(), - labels: map[string]string{ + MetricName: "monitor.cluster.collector.duration", + Value: 1.0, + Dimensions: map[string]string{ "collector": "prefetchClusterVersion", }, }, { - name: "monitor.cluster.collector.duration", - value: gomock.Any(), - labels: map[string]string{ + MetricName: "monitor.cluster.collector.duration", + Value: 1.0, + Dimensions: map[string]string{ "collector": "fetchManagedNamespaces", }, }, { - name: "monitor.cluster.collector.duration", - value: gomock.Any(), - labels: map[string]string{ + MetricName: "monitor.cluster.collector.duration", + Value: 1.0, + Dimensions: map[string]string{ "collector": "emitDaemonsetStatuses", }, }, @@ -317,37 +310,37 @@ func TestMonitor(t *testing.T) { errAPIServerHealthzFailure, errAPIServerPingFailure, }, - expectedGauges: []expectedMetric{ + expectedGauges: []fakemetrics.MetricsAssertion[int64]{ { - name: "apiserver.healthz.code", - value: int64(1), - labels: map[string]string{ + MetricName: "apiserver.healthz.code", + Value: int64(1), + Dimensions: map[string]string{ "code": "500", }, }, { - name: "monitor.cluster.collector.error", - value: int64(1), - labels: map[string]string{ + MetricName: "monitor.cluster.collector.error", + Value: int64(1), + Dimensions: map[string]string{ "collector": "emitAPIServerHealthzCode", }, }, { - name: "monitor.cluster.collector.error", - value: int64(1), - labels: map[string]string{ + MetricName: "monitor.cluster.collector.error", + Value: int64(1), + Dimensions: map[string]string{ "collector": "emitAPIServerPingCode", }, }, { - name: "apiserver.healthz.ping.code", - value: int64(1), - labels: map[string]string{ + MetricName: "apiserver.healthz.ping.code", + Value: int64(1), + Dimensions: map[string]string{ "code": "500", }, }, }, - expectedFloats: []expectedMetric{}, + expectedFloats: []fakemetrics.MetricsAssertion[float64]{}, }, { name: "api failure, ping succeeds", @@ -360,34 +353,34 @@ func TestMonitor(t 
*testing.T) { expectedErrors: []error{ errAPIServerHealthzFailure, }, - expectedGauges: []expectedMetric{ + expectedGauges: []fakemetrics.MetricsAssertion[int64]{ { - name: "apiserver.healthz.code", - value: int64(1), - labels: map[string]string{ + MetricName: "apiserver.healthz.code", + Value: int64(1), + Dimensions: map[string]string{ "code": "500", }, }, { - name: "apiserver.healthz.ping.code", - value: int64(1), - labels: map[string]string{ + MetricName: "apiserver.healthz.ping.code", + Value: int64(1), + Dimensions: map[string]string{ "code": "200", }, }, { - name: "monitor.cluster.collector.error", - value: int64(1), - labels: map[string]string{ + MetricName: "monitor.cluster.collector.error", + Value: int64(1), + Dimensions: map[string]string{ "collector": "emitAPIServerHealthzCode", }, }, }, - expectedFloats: []expectedMetric{ + expectedFloats: []fakemetrics.MetricsAssertion[float64]{ { - name: "monitor.cluster.collector.duration", - value: gomock.Any(), - labels: map[string]string{ + MetricName: "monitor.cluster.collector.duration", + Value: 1.0, + Dimensions: map[string]string{ "collector": "emitAPIServerPingCode", }, }, @@ -411,48 +404,48 @@ func TestMonitor(t *testing.T) { &failureToRunClusterCollector{collectorName: "2"}, context.Canceled, }, - expectedGauges: []expectedMetric{ + expectedGauges: []fakemetrics.MetricsAssertion[int64]{ { - name: "apiserver.healthz.code", - value: int64(1), - labels: map[string]string{ + MetricName: "apiserver.healthz.code", + Value: int64(1), + Dimensions: map[string]string{ "code": "200", }, }, { - name: "monitor.cluster.collector.skipped", - value: int64(1), - labels: map[string]string{ + MetricName: "monitor.cluster.collector.skipped", + Value: int64(1), + Dimensions: map[string]string{ "collector": "2", }, }, }, - expectedFloats: []expectedMetric{ + expectedFloats: []fakemetrics.MetricsAssertion[float64]{ { - name: "monitor.cluster.collector.duration", - value: gomock.Any(), - labels: map[string]string{ + MetricName: 
"monitor.cluster.collector.duration", + Value: 1.0, + Dimensions: map[string]string{ "collector": "emitAPIServerHealthzCode", }, }, { - name: "monitor.cluster.collector.duration", - value: gomock.Any(), - labels: map[string]string{ + MetricName: "monitor.cluster.collector.duration", + Value: 1.0, + Dimensions: map[string]string{ "collector": "prefetchClusterVersion", }, }, { - name: "monitor.cluster.collector.duration", - value: gomock.Any(), - labels: map[string]string{ + MetricName: "monitor.cluster.collector.duration", + Value: 1.0, + Dimensions: map[string]string{ "collector": "fetchManagedNamespaces", }, }, { - name: "monitor.cluster.collector.duration", - value: gomock.Any(), - labels: map[string]string{ + MetricName: "monitor.cluster.collector.duration", + Value: 1.0, + Dimensions: map[string]string{ "collector": "1", }, }, @@ -519,8 +512,7 @@ func TestMonitor(t *testing.T) { defer _cancel() _, log := testlog.New() - controller := gomock.NewController(t) - m := mock_metrics.NewMockEmitter(controller) + m := fakemetrics.NewFakeMetricsEmitter(t) // for healthz fakeRawClient := &restfake.RESTClient{ @@ -538,10 +530,17 @@ func TestMonitor(t *testing.T) { tt.hooks(client) } + currTime := time.Date(1970, 1, 1, 0, 0, 0, 0, time.UTC) + now := func() time.Time { + currTime = currTime.Add(1 * time.Second) + return currTime + } + mon := &Monitor{ log: log, rawClient: fakeRawClient, ocpclientset: ocpclientset, + now: now, m: m, queryLimit: 1, parallelism: 1, @@ -551,20 +550,21 @@ func TestMonitor(t *testing.T) { mon.collectors = tt.collectors(mon) } - for _, gauge := range tt.expectedGauges { - m.EXPECT().EmitGauge(gauge.name, gauge.value, gauge.labels).Times(1) - } - for _, gauge := range tt.expectedFloats { - m.EXPECT().EmitFloat(gauge.name, gauge.value, gauge.labels).Times(1) - } + err := mon.Monitor(_ctx) + utilerror.AssertErrorMatchesAll(t, err, tt.expectedErrors) // we only emit duration when no errors + f := tt.expectedFloats if len(tt.expectedErrors) == 0 { - 
m.EXPECT().EmitFloat("monitor.cluster.duration", gomock.Any(), gomock.Any()).Times(1) + f = append(tt.expectedFloats, fakemetrics.MetricsAssertion[float64]{ + MetricName: "monitor.cluster.duration", + Value: currTime.Sub(time.Date(1970, 1, 1, 0, 0, 1, 0, time.UTC)).Seconds(), + Dimensions: map[string]string{}, + }) } - err := mon.Monitor(_ctx) - utilerror.AssertErrorMatchesAll(t, err, tt.expectedErrors) + m.AssertFloats(f...) + m.AssertGauges(tt.expectedGauges...) }) } } From 5074ed8698e8c9886fd523caa88cc1a98e5eeaed Mon Sep 17 00:00:00 2001 From: Amber Brown Date: Fri, 27 Mar 2026 11:52:07 +1100 Subject: [PATCH 3/5] add a test for skipping when the context is cancelled to begin with --- pkg/monitor/cluster/cluster_test.go | 65 +++++++++++++++++++++++++++++ 1 file changed, 65 insertions(+) diff --git a/pkg/monitor/cluster/cluster_test.go b/pkg/monitor/cluster/cluster_test.go index 87f2b0d7a3f..b290284887d 100644 --- a/pkg/monitor/cluster/cluster_test.go +++ b/pkg/monitor/cluster/cluster_test.go @@ -568,3 +568,68 @@ func TestMonitor(t *testing.T) { }) } } + +func TestMonitorAlreadyCancelled(t *testing.T) { + ctx, cancel := context.WithCancel(t.Context()) + _, log := testlog.New() + m := fakemetrics.NewFakeMetricsEmitter(t) + + // for healthz + fakeRawClient := &restfake.RESTClient{ + NegotiatedSerializer: scheme.Codecs.WithoutConversion(), + Client: restfake.CreateHTTPClient( + func(r *http.Request) (*http.Response, error) { + return &http.Response{StatusCode: http.StatusOK}, nil + }), + } + + client := testclienthelper.NewHookingClient(fake. + NewClientBuilder(). 
+ Build()) + ocpclientset := clienthelper.NewWithClient(log, client) + + currTime := time.Date(1970, 1, 1, 0, 0, 0, 0, time.UTC) + now := func() time.Time { + currTime = currTime.Add(1 * time.Second) + return currTime + } + + mon := &Monitor{ + log: log, + rawClient: fakeRawClient, + ocpclientset: ocpclientset, + m: m, + queryLimit: 1, + now: now, + } + + mon.collectors = []collectorFunc{mon.emitDaemonsetStatuses} + + // Cancel context before it hits the monitor + cancel() + + err := mon.Monitor(ctx) + utilerror.AssertErrorMatchesAll(t, err, []error{ + &failureToRunClusterCollector{collectorName: "emitAPIServerHealthzCode"}, + &failureToRunClusterCollector{collectorName: "emitAPIServerPingCode"}, + context.Canceled, + }) + + m.AssertFloats() + m.AssertGauges([]fakemetrics.MetricsAssertion[int64]{ + { + MetricName: "monitor.cluster.collector.skipped", + Value: 1, + Dimensions: map[string]string{ + "collector": "emitAPIServerPingCode", + }, + }, + { + MetricName: "monitor.cluster.collector.skipped", + Value: 1, + Dimensions: map[string]string{ + "collector": "emitAPIServerHealthzCode", + }, + }, + }...) 
+} From 42963cd848548f7c2c32789eb451d2b35546d3a4 Mon Sep 17 00:00:00 2001 From: Amber Brown Date: Fri, 27 Mar 2026 11:53:33 +1100 Subject: [PATCH 4/5] remove stray trailing colon from the collector-skipped debug message --- pkg/monitor/cluster/cluster.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/monitor/cluster/cluster.go b/pkg/monitor/cluster/cluster.go index e0c9b61b2b8..459c0ced3f0 100644 --- a/pkg/monitor/cluster/cluster.go +++ b/pkg/monitor/cluster/cluster.go @@ -205,7 +205,7 @@ func (mon *Monitor) timeCall(ctx context.Context, f func(context.Context) error) // Don't run collectors if we have already timed out if ctx.Err() != nil { - mon.log.Debugf("skipping %s because %s:", collectorName, ctx.Err()) + mon.log.Debugf("skipping %s because %s", collectorName, ctx.Err()) mon.emitMonitorCollectorSkipped(collectorName) return &failureToRunClusterCollector{collectorName: collectorName, inner: ctx.Err()} } From 72654e0f47a3591b960712c99a58065cfca618b7 Mon Sep 17 00:00:00 2001 From: Amber Brown Date: Fri, 27 Mar 2026 12:24:35 +1100 Subject: [PATCH 5/5] fix flake: defer initialPopulationWaitGroup.Done() until OnAllPendingProcessed returns --- pkg/mimo/scheduler/clustercache.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/mimo/scheduler/clustercache.go b/pkg/mimo/scheduler/clustercache.go index 3d86fb0a408..1ab1f1223a9 100644 --- a/pkg/mimo/scheduler/clustercache.go +++ b/pkg/mimo/scheduler/clustercache.go @@ -105,7 +105,7 @@ func (c *openShiftClusterCache) OnAllPendingProcessed() { old := c.lastChangefeed.Swap(time.Now()) // we've done one rotation, unlock the waitgroup if old == nil { - c.initialPopulationWaitGroup.Done() + defer c.initialPopulationWaitGroup.Done() } c.m.EmitGauge("changefeed.caches.size", int64(c.clusters.Size()), map[string]string{ "name": "OpenShiftClusterDocument",