Skip to content

Commit e75e698

Browse files
committed
do not continue spawning goroutines if we've already timed out, and emit metrics when we have timed out
1 parent a8959fc commit e75e698

File tree

2 files changed

+101
-7
lines changed

2 files changed

+101
-7
lines changed

pkg/monitor/cluster/cluster.go

Lines changed: 20 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,8 @@ type Monitor struct {
7272

7373
// Limit for items per pagination query
7474
queryLimit int
75+
// Limit for goroutines per cluster Monitor instance
76+
parallelism int
7577
}
7678

7779
func NewMonitor(log *logrus.Entry, restConfig *rest.Config, oc *api.OpenShiftCluster, env env.Interface, tenantID string, m metrics.Emitter, hourlyRun bool) (monitoring.Monitor, error) {
@@ -160,6 +162,7 @@ func NewMonitor(log *logrus.Entry, restConfig *rest.Config, oc *api.OpenShiftClu
160162
ocpclientset: clienthelper.NewWithClient(log, ocpclientset),
161163
namespacesToMonitor: []string{},
162164
queryLimit: 50,
165+
parallelism: MONITOR_GOROUTINES_PER_CLUSTER,
163166
}
164167
mon.collectors = []collectorFunc{
165168
mon.emitAroOperatorHeartbeat,
@@ -196,8 +199,16 @@ func NewMonitor(log *logrus.Entry, restConfig *rest.Config, oc *api.OpenShiftClu
196199
}
197200

198201
func (mon *Monitor) timeCall(ctx context.Context, f func(context.Context) error) (err error) {
199-
innerNow := time.Now()
200202
collectorName := steps.ShortName(f)
203+
204+
// Don't run collectors if we have already timed out
205+
if ctx.Err() != nil {
206+
mon.log.Debugf("skipping %s because %s:", collectorName, ctx.Err())
207+
mon.emitMonitorCollectorSkipped(collectorName)
208+
return &failureToRunClusterCollector{collectorName: collectorName, inner: ctx.Err()}
209+
}
210+
211+
innerNow := time.Now()
201212
mon.log.Debugf("running %s", collectorName)
202213

203214
// If the collector panics we should return the error (so that it bubbles
@@ -272,10 +283,10 @@ func (mon *Monitor) Monitor(ctx context.Context) (_err error) {
272283
return errors.Join(errs...)
273284
}
274285

275-
// Run up to MONITOR_GOROUTINES_PER_CLUSTER goroutines for collecting
276-
// metrics
286+
// Run up to mon.parallelism (default: MONITOR_GOROUTINES_PER_CLUSTER)
287+
// goroutines for collecting metrics
277288
wg := new(errgroup.Group)
278-
wg.SetLimit(MONITOR_GOROUTINES_PER_CLUSTER)
289+
wg.SetLimit(mon.parallelism)
279290

280291
// Create a channel capable of buffering one error from every collector
281292
errChan := make(chan error, len(mon.collectors))
@@ -305,7 +316,7 @@ func (mon *Monitor) Monitor(ctx context.Context) (_err error) {
305316
}
306317

307318
// emit a metric with how long we took when we have no errors
308-
if len(errs) == 0 {
319+
if len(errs) == 0 && ctx.Err() == nil {
309320
mon.emitFloat("monitor.cluster.duration", time.Since(now).Seconds(), map[string]string{})
310321
}
311322

@@ -316,6 +327,10 @@ func (mon *Monitor) emitMonitorCollectorError(collectorName string) {
316327
emitter.EmitGauge(mon.m, "monitor.cluster.collector.error", 1, mon.dims, map[string]string{"collector": collectorName})
317328
}
318329

330+
func (mon *Monitor) emitMonitorCollectorSkipped(collectorName string) {
331+
emitter.EmitGauge(mon.m, "monitor.cluster.collector.skipped", 1, mon.dims, map[string]string{"collector": collectorName})
332+
}
333+
319334
func (mon *Monitor) emitMonitorCollectionTiming(collectorName string, duration float64) {
320335
emitter.EmitFloat(mon.m, "monitor.cluster.collector.duration", duration, mon.dims, map[string]string{"collector": collectorName})
321336
}

pkg/monitor/cluster/cluster_test.go

Lines changed: 81 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,8 @@ type expectedMetric struct {
3636
}
3737

3838
func TestMonitor(t *testing.T) {
39-
ctx := context.Background()
39+
var _ctx context.Context
40+
var _cancel context.CancelFunc
4041

4142
innerFailure := errors.New("failure inside")
4243

@@ -392,6 +393,80 @@ func TestMonitor(t *testing.T) {
392393
},
393394
},
394395
},
396+
{
397+
name: "timeout during collector means other collectors are skipped",
398+
healthzCall: func(r *http.Request) (*http.Response, error) { return &http.Response{StatusCode: http.StatusOK}, nil },
399+
collectors: func(m *Monitor) []collectorFunc {
400+
return []collectorFunc{
401+
func(ctx context.Context) error {
402+
_cancel()
403+
return nil
404+
},
405+
func(ctx context.Context) error {
406+
return nil
407+
},
408+
}
409+
},
410+
hooks: func(hc *testclienthelper.HookingClient) {
411+
hc.WithPreListHook(func(obj client.ObjectList, opts *client.ListOptions) error {
412+
_, ok := obj.(*appsv1.ReplicaSetList)
413+
if ok {
414+
panic(innerFailure)
415+
}
416+
return nil
417+
})
418+
},
419+
expectedErrors: []error{
420+
&failureToRunClusterCollector{collectorName: "2"},
421+
context.Canceled,
422+
},
423+
expectedGauges: []expectedMetric{
424+
{
425+
name: "apiserver.healthz.code",
426+
value: int64(1),
427+
labels: map[string]string{
428+
"code": "200",
429+
},
430+
},
431+
{
432+
name: "monitor.cluster.collector.skipped",
433+
value: int64(1),
434+
labels: map[string]string{
435+
"collector": "2",
436+
},
437+
},
438+
},
439+
expectedFloats: []expectedMetric{
440+
{
441+
name: "monitor.cluster.collector.duration",
442+
value: gomock.Any(),
443+
labels: map[string]string{
444+
"collector": "emitAPIServerHealthzCode",
445+
},
446+
},
447+
{
448+
name: "monitor.cluster.collector.duration",
449+
value: gomock.Any(),
450+
labels: map[string]string{
451+
"collector": "prefetchClusterVersion",
452+
},
453+
},
454+
{
455+
name: "monitor.cluster.collector.duration",
456+
value: gomock.Any(),
457+
labels: map[string]string{
458+
"collector": "fetchManagedNamespaces",
459+
},
460+
},
461+
{
462+
name: "monitor.cluster.collector.duration",
463+
value: gomock.Any(),
464+
labels: map[string]string{
465+
"collector": "1",
466+
},
467+
},
468+
},
469+
},
395470
} {
396471
t.Run(tt.name, func(t *testing.T) {
397472
objects := []client.Object{
@@ -449,6 +524,9 @@ func TestMonitor(t *testing.T) {
449524
},
450525
}
451526

527+
_ctx, _cancel = context.WithCancel(t.Context())
528+
defer _cancel()
529+
452530
_, log := testlog.New()
453531
controller := gomock.NewController(t)
454532
m := mock_metrics.NewMockEmitter(controller)
@@ -475,6 +553,7 @@ func TestMonitor(t *testing.T) {
475553
ocpclientset: ocpclientset,
476554
m: m,
477555
queryLimit: 1,
556+
parallelism: 1,
478557
}
479558

480559
if tt.collectors != nil {
@@ -493,7 +572,7 @@ func TestMonitor(t *testing.T) {
493572
m.EXPECT().EmitFloat("monitor.cluster.duration", gomock.Any(), gomock.Any()).Times(1)
494573
}
495574

496-
err := mon.Monitor(ctx)
575+
err := mon.Monitor(_ctx)
497576
utilerror.AssertErrorMatchesAll(t, err, tt.expectedErrors)
498577
})
499578
}

0 commit comments

Comments (0)