Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 20 additions & 5 deletions pkg/monitor/cluster/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,8 @@ type Monitor struct {

// Limit for items per pagination query
queryLimit int
// Limit for goroutines per cluster Monitor instance
parallelism int
}

func NewMonitor(log *logrus.Entry, restConfig *rest.Config, oc *api.OpenShiftCluster, env env.Interface, tenantID string, m metrics.Emitter, hourlyRun bool) (monitoring.Monitor, error) {
Expand Down Expand Up @@ -160,6 +162,7 @@ func NewMonitor(log *logrus.Entry, restConfig *rest.Config, oc *api.OpenShiftClu
ocpclientset: clienthelper.NewWithClient(log, ocpclientset),
namespacesToMonitor: []string{},
queryLimit: 50,
parallelism: MONITOR_GOROUTINES_PER_CLUSTER,
}
mon.collectors = []collectorFunc{
mon.emitAroOperatorHeartbeat,
Expand Down Expand Up @@ -196,8 +199,16 @@ func NewMonitor(log *logrus.Entry, restConfig *rest.Config, oc *api.OpenShiftClu
}

func (mon *Monitor) timeCall(ctx context.Context, f func(context.Context) error) (err error) {
innerNow := time.Now()
collectorName := steps.ShortName(f)

// Don't run collectors if we have already timed out
if ctx.Err() != nil {
mon.log.Debugf("skipping %s because %s:", collectorName, ctx.Err())
mon.emitMonitorCollectorSkipped(collectorName)
return &failureToRunClusterCollector{collectorName: collectorName, inner: ctx.Err()}
}

innerNow := time.Now()
mon.log.Debugf("running %s", collectorName)

// If the collector panics we should return the error (so that it bubbles
Expand Down Expand Up @@ -272,10 +283,10 @@ func (mon *Monitor) Monitor(ctx context.Context) (_err error) {
return errors.Join(errs...)
}

// Run up to MONITOR_GOROUTINES_PER_CLUSTER goroutines for collecting
// metrics
// Run up to mon.parallelism (default: MONITOR_GOROUTINES_PER_CLUSTER)
// goroutines for collecting metrics
wg := new(errgroup.Group)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I find this misleading: errgroup and waitgroup are different things that should probably be named and managed differently, WDYT?

Also, I have the impression that here we're losing a critical opportunity to pass down the current context to the spawned goroutines, to have safe and deterministic behavior when something happens to the parent context, and to ensure we don't leave orphan goroutines around.
The way I would address this could be similar to what I've used in a previous commit, by using errgroup.WithContext(ctx)

wg.SetLimit(MONITOR_GOROUTINES_PER_CLUSTER)
wg.SetLimit(mon.parallelism)

// Create a channel capable of buffering one error from every collector
errChan := make(chan error, len(mon.collectors))
Expand Down Expand Up @@ -305,7 +316,7 @@ func (mon *Monitor) Monitor(ctx context.Context) (_err error) {
}

// emit a metric with how long we took when we have no errors
if len(errs) == 0 {
if len(errs) == 0 && ctx.Err() == nil {
mon.emitFloat("monitor.cluster.duration", time.Since(now).Seconds(), map[string]string{})
}

Expand All @@ -316,6 +327,10 @@ func (mon *Monitor) emitMonitorCollectorError(collectorName string) {
emitter.EmitGauge(mon.m, "monitor.cluster.collector.error", 1, mon.dims, map[string]string{"collector": collectorName})
}

// emitMonitorCollectorSkipped emits a gauge metric
// ("monitor.cluster.collector.skipped", value 1) tagged with the collector's
// name, recording that a collector did not run. Per timeCall above, this
// happens when the monitoring context has already expired before the
// collector starts.
func (mon *Monitor) emitMonitorCollectorSkipped(collectorName string) {
emitter.EmitGauge(mon.m, "monitor.cluster.collector.skipped", 1, mon.dims, map[string]string{"collector": collectorName})
}

// emitMonitorCollectionTiming emits a float metric
// ("monitor.cluster.collector.duration") tagged with the collector's name,
// recording how long the collector took. The duration is supplied by the
// caller; units are presumably seconds, matching the other duration metrics
// in this file — confirm against the timeCall call site.
func (mon *Monitor) emitMonitorCollectionTiming(collectorName string, duration float64) {
emitter.EmitFloat(mon.m, "monitor.cluster.collector.duration", duration, mon.dims, map[string]string{"collector": collectorName})
}
Expand Down
74 changes: 72 additions & 2 deletions pkg/monitor/cluster/cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,8 @@ type expectedMetric struct {
}

func TestMonitor(t *testing.T) {
ctx := context.Background()
var _ctx context.Context
var _cancel context.CancelFunc

innerFailure := errors.New("failure inside")

Expand Down Expand Up @@ -392,6 +393,71 @@ func TestMonitor(t *testing.T) {
},
},
},
{
name: "timeout during collector means other collectors are skipped",
healthzCall: func(r *http.Request) (*http.Response, error) { return &http.Response{StatusCode: http.StatusOK}, nil },
collectors: func(m *Monitor) []collectorFunc {
return []collectorFunc{
func(ctx context.Context) error {
_cancel()
return nil
},
func(ctx context.Context) error {
return nil
},
}
},
expectedErrors: []error{
&failureToRunClusterCollector{collectorName: "2"},
context.Canceled,
},
expectedGauges: []expectedMetric{
{
name: "apiserver.healthz.code",
value: int64(1),
labels: map[string]string{
"code": "200",
},
},
{
name: "monitor.cluster.collector.skipped",
value: int64(1),
labels: map[string]string{
"collector": "2",
},
},
},
expectedFloats: []expectedMetric{
{
name: "monitor.cluster.collector.duration",
value: gomock.Any(),
labels: map[string]string{
"collector": "emitAPIServerHealthzCode",
},
},
{
name: "monitor.cluster.collector.duration",
value: gomock.Any(),
labels: map[string]string{
"collector": "prefetchClusterVersion",
},
},
{
name: "monitor.cluster.collector.duration",
value: gomock.Any(),
labels: map[string]string{
"collector": "fetchManagedNamespaces",
},
},
{
name: "monitor.cluster.collector.duration",
value: gomock.Any(),
labels: map[string]string{
"collector": "1",
},
},
},
},
} {
t.Run(tt.name, func(t *testing.T) {
objects := []client.Object{
Expand Down Expand Up @@ -449,6 +515,9 @@ func TestMonitor(t *testing.T) {
},
}

_ctx, _cancel = context.WithCancel(t.Context())
defer _cancel()

_, log := testlog.New()
controller := gomock.NewController(t)
m := mock_metrics.NewMockEmitter(controller)
Expand All @@ -475,6 +544,7 @@ func TestMonitor(t *testing.T) {
ocpclientset: ocpclientset,
m: m,
queryLimit: 1,
parallelism: 1,
}

if tt.collectors != nil {
Expand All @@ -493,7 +563,7 @@ func TestMonitor(t *testing.T) {
m.EXPECT().EmitFloat("monitor.cluster.duration", gomock.Any(), gomock.Any()).Times(1)
}

err := mon.Monitor(ctx)
err := mon.Monitor(_ctx)
utilerror.AssertErrorMatchesAll(t, err, tt.expectedErrors)
})
}
Expand Down
Loading