diff --git a/pkg/monitor/cluster/cluster.go b/pkg/monitor/cluster/cluster.go index 0837b64d08d..bb06a00901a 100644 --- a/pkg/monitor/cluster/cluster.go +++ b/pkg/monitor/cluster/cluster.go @@ -72,6 +72,8 @@ type Monitor struct { // Limit for items per pagination query queryLimit int + // Limit for goroutines per cluster Monitor instance + parallelism int } func NewMonitor(log *logrus.Entry, restConfig *rest.Config, oc *api.OpenShiftCluster, env env.Interface, tenantID string, m metrics.Emitter, hourlyRun bool) (monitoring.Monitor, error) { @@ -160,6 +162,7 @@ func NewMonitor(log *logrus.Entry, restConfig *rest.Config, oc *api.OpenShiftClu ocpclientset: clienthelper.NewWithClient(log, ocpclientset), namespacesToMonitor: []string{}, queryLimit: 50, + parallelism: MONITOR_GOROUTINES_PER_CLUSTER, } mon.collectors = []collectorFunc{ mon.emitAroOperatorHeartbeat, @@ -196,8 +199,16 @@ func NewMonitor(log *logrus.Entry, restConfig *rest.Config, oc *api.OpenShiftClu } func (mon *Monitor) timeCall(ctx context.Context, f func(context.Context) error) (err error) { - innerNow := time.Now() collectorName := steps.ShortName(f) + + // Don't run collectors if we have already timed out + if ctx.Err() != nil { + mon.log.Debugf("skipping %s because %s:", collectorName, ctx.Err()) + mon.emitMonitorCollectorSkipped(collectorName) + return &failureToRunClusterCollector{collectorName: collectorName, inner: ctx.Err()} + } + + innerNow := time.Now() mon.log.Debugf("running %s", collectorName) // If the collector panics we should return the error (so that it bubbles @@ -272,10 +283,10 @@ func (mon *Monitor) Monitor(ctx context.Context) (_err error) { return errors.Join(errs...) } - // Run up to MONITOR_GOROUTINES_PER_CLUSTER goroutines for collecting - // metrics + // Run up to mon.parallelism (default: MONITOR_GOROUTINES_PER_CLUSTER) + // goroutines for collecting metrics wg := new(errgroup.Group) - wg.SetLimit(MONITOR_GOROUTINES_PER_CLUSTER) + wg.SetLimit(mon.parallelism) // Create a channel capable of buffering one error from every collector errChan := make(chan error, len(mon.collectors)) @@ -305,7 +316,7 @@ func (mon *Monitor) Monitor(ctx context.Context) (_err error) { } // emit a metric with how long we took when we have no errors - if len(errs) == 0 { + if len(errs) == 0 && ctx.Err() == nil { mon.emitFloat("monitor.cluster.duration", time.Since(now).Seconds(), map[string]string{}) } @@ -316,6 +327,10 @@ func (mon *Monitor) emitMonitorCollectorError(collectorName string) { emitter.EmitGauge(mon.m, "monitor.cluster.collector.error", 1, mon.dims, map[string]string{"collector": collectorName}) } +func (mon *Monitor) emitMonitorCollectorSkipped(collectorName string) { + emitter.EmitGauge(mon.m, "monitor.cluster.collector.skipped", 1, mon.dims, map[string]string{"collector": collectorName}) +} + func (mon *Monitor) emitMonitorCollectionTiming(collectorName string, duration float64) { emitter.EmitFloat(mon.m, "monitor.cluster.collector.duration", duration, mon.dims, map[string]string{"collector": collectorName}) } diff --git a/pkg/monitor/cluster/cluster_test.go b/pkg/monitor/cluster/cluster_test.go index ea7c3c0139c..4a14c768b1c 100644 --- a/pkg/monitor/cluster/cluster_test.go +++ b/pkg/monitor/cluster/cluster_test.go @@ -36,7 +36,8 @@ type expectedMetric struct { } func TestMonitor(t *testing.T) { - ctx := context.Background() + var _ctx context.Context + var _cancel context.CancelFunc innerFailure := errors.New("failure inside") @@ -392,6 +393,71 @@ func TestMonitor(t *testing.T) { }, }, }, + { + name: "timeout during collector means other collectors are skipped", + healthzCall: func(r *http.Request) (*http.Response, error) { return &http.Response{StatusCode: http.StatusOK}, nil }, + collectors: func(m *Monitor) []collectorFunc { + return []collectorFunc{ + func(ctx context.Context) error { + _cancel() + return nil + }, + func(ctx context.Context) error { + return nil + }, + } + }, + expectedErrors: []error{ + &failureToRunClusterCollector{collectorName: "2"}, + context.Canceled, + }, + expectedGauges: []expectedMetric{ + { + name: "apiserver.healthz.code", + value: int64(1), + labels: map[string]string{ + "code": "200", + }, + }, + { + name: "monitor.cluster.collector.skipped", + value: int64(1), + labels: map[string]string{ + "collector": "2", + }, + }, + }, + expectedFloats: []expectedMetric{ + { + name: "monitor.cluster.collector.duration", + value: gomock.Any(), + labels: map[string]string{ + "collector": "emitAPIServerHealthzCode", + }, + }, + { + name: "monitor.cluster.collector.duration", + value: gomock.Any(), + labels: map[string]string{ + "collector": "prefetchClusterVersion", + }, + }, + { + name: "monitor.cluster.collector.duration", + value: gomock.Any(), + labels: map[string]string{ + "collector": "fetchManagedNamespaces", + }, + }, + { + name: "monitor.cluster.collector.duration", + value: gomock.Any(), + labels: map[string]string{ + "collector": "1", + }, + }, + }, + }, } { t.Run(tt.name, func(t *testing.T) { objects := []client.Object{ @@ -449,6 +515,9 @@ func TestMonitor(t *testing.T) { }, } + _ctx, _cancel = context.WithCancel(t.Context()) + defer _cancel() + _, log := testlog.New() controller := gomock.NewController(t) m := mock_metrics.NewMockEmitter(controller) @@ -475,6 +544,7 @@ func TestMonitor(t *testing.T) { ocpclientset: ocpclientset, m: m, queryLimit: 1, + parallelism: 1, } if tt.collectors != nil { @@ -493,7 +563,7 @@ func TestMonitor(t *testing.T) { m.EXPECT().EmitFloat("monitor.cluster.duration", gomock.Any(), gomock.Any()).Times(1) } - err := mon.Monitor(ctx) + err := mon.Monitor(_ctx) utilerror.AssertErrorMatchesAll(t, err, tt.expectedErrors) }) }