Skip to content

Commit 8b14e41

Browse files
authored
Surface partial scraper errors from the metrics receiver. (#32)
Handle partial errors consistently: if we fail to fetch or parse a metric, log a warning, and return a partial error at the end of the scrape. This allows us to return all available metrics while letting the user discover scrape errors through the collector's built-in metrics. We also drop our existing custom metrics for scraper errors now that we're exposing partial errors; the existing scraper_errored_metric_points_total metric, along with warnings in the logs, are now sufficient for debugging, and more idiomatic.
1 parent 959f88c commit 8b14e41

File tree

1 file changed

+55
-41
lines changed

1 file changed

+55
-41
lines changed

receiver/oxidemetricsreceiver/scraper.go

Lines changed: 55 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ import (
1111
"go.opentelemetry.io/collector/component"
1212
"go.opentelemetry.io/collector/pdata/pcommon"
1313
"go.opentelemetry.io/collector/pdata/pmetric"
14+
"go.opentelemetry.io/collector/scraper/scrapererror"
1415
"go.opentelemetry.io/otel/attribute"
1516
"go.opentelemetry.io/otel/metric"
1617
"go.uber.org/zap"
@@ -28,7 +29,6 @@ type oxideScraper struct {
2829
apiRequestDuration metric.Float64Gauge
2930
scrapeCount metric.Int64Counter
3031
scrapeDuration metric.Float64Gauge
31-
metricParseErrors metric.Int64Counter
3232
}
3333

3434
func newOxideScraper(
@@ -105,15 +105,6 @@ func (s *oxideScraper) Start(ctx context.Context, _ component.Host) error {
105105
return fmt.Errorf("failed to create scrapeDuration gauge: %w", err)
106106
}
107107

108-
s.metricParseErrors, err = meter.Int64Counter(
109-
"oxide_receiver.metric.parse_errors",
110-
metric.WithDescription("Number of errors encountered while parsing individual metrics"),
111-
metric.WithUnit("{error}"),
112-
)
113-
if err != nil {
114-
return fmt.Errorf("failed to create metricParseErrors counter: %w", err)
115-
}
116-
117108
return nil
118109
}
119110

@@ -127,10 +118,14 @@ func (s *oxideScraper) Scrape(ctx context.Context) (pmetric.Metrics, error) {
127118
var group errgroup.Group
128119
group.SetLimit(s.cfg.ScrapeConcurrency)
129120

130-
startTime := time.Now()
131-
results := make([]*oxide.OxqlQueryResult, len(s.metricNames))
121+
type queryResult struct {
122+
response *oxide.OxqlQueryResult
123+
latency time.Duration
124+
err error
125+
}
126+
results := make([]queryResult, len(s.metricNames))
132127

133-
latencies := make([]time.Duration, len(s.metricNames))
128+
startTime := time.Now()
134129

135130
for idx, metricName := range s.metricNames {
136131
query := fmt.Sprintf(
@@ -139,36 +134,58 @@ func (s *oxideScraper) Scrape(ctx context.Context) (pmetric.Metrics, error) {
139134
s.cfg.QueryLookback,
140135
)
141136
group.Go(func() error {
142-
goroStartTime := time.Now()
137+
queryStartTime := time.Now()
143138
result, err := s.client.SystemTimeseriesQuery(ctx, oxide.SystemTimeseriesQueryParams{
144139
Body: &oxide.TimeseriesQuery{
145140
Query: query,
146141
},
147142
})
148-
elapsed := time.Since(goroStartTime)
149-
latencies[idx] = elapsed
143+
elapsed := time.Since(queryStartTime)
144+
results[idx] = queryResult{
145+
response: result,
146+
latency: elapsed,
147+
err: err,
148+
}
150149
s.logger.Info(
151150
"scrape query finished",
152151
zap.String("metric", metricName),
153152
zap.String("query", query),
154153
zap.Float64("latency", elapsed.Seconds()),
155154
)
156155
if err != nil {
157-
return err
156+
s.logger.Warn(
157+
"failed to query metric",
158+
zap.String("metric", metricName),
159+
zap.Error(err),
160+
)
161+
} else {
162+
s.apiRequestDuration.Record(
163+
ctx,
164+
elapsed.Seconds(),
165+
metric.WithAttributes(attribute.String("request_name", metricName)),
166+
)
158167
}
159-
results[idx] = result
168+
160169
return nil
161170
})
162171
}
163-
if err := group.Wait(); err != nil {
164-
s.scrapeCount.Add(ctx, 1, metric.WithAttributes(attribute.String("status", "failure")))
165-
return metrics, err
166-
}
172+
// We don't check the return value of Wait(). Instead, we accumulate error counts in the
173+
// goroutine, and return a PartialScrapeError below if we observe >0 errors. Errors will be
174+
// surfaced to users via the `scraper_errored_metric_points_total` metric, and collector logs
175+
// contain the full details of failed scrapes.
176+
_ = group.Wait()
167177
elapsed := time.Since(startTime)
168178
s.logger.Info("scrape finished", zap.Float64("latency", elapsed.Seconds()))
169179

170180
s.scrapeDuration.Record(ctx, elapsed.Seconds())
171-
s.scrapeCount.Add(ctx, 1, metric.WithAttributes(attribute.String("status", "success")))
181+
s.scrapeCount.Add(ctx, 1)
182+
183+
var queryErrors int
184+
for _, result := range results {
185+
if result.err != nil {
186+
queryErrors++
187+
}
188+
}
172189

173190
// Cache mappings from resource UUIDs to human-readable names. Note: we can also add mappings
174191
// for higher-cardinality resources like instances and disks, but this would add more latency to
@@ -198,8 +215,12 @@ func (s *oxideScraper) Scrape(ctx context.Context) (pmetric.Metrics, error) {
198215
}
199216
}
200217

218+
var parseErrors int
201219
for _, result := range results {
202-
for _, table := range result.Tables {
220+
if result.err != nil {
221+
continue
222+
}
223+
for _, table := range result.response.Tables {
203224
for _, series := range table.Timeseries {
204225
rm := metrics.ResourceMetrics().AppendEmpty()
205226
resource := rm.Resource()
@@ -253,9 +274,7 @@ func (s *oxideScraper) Scrape(ctx context.Context) (pmetric.Metrics, error) {
253274
zap.String("metric", table.Name),
254275
zap.Error(err),
255276
)
256-
s.metricParseErrors.Add(ctx, 1, metric.WithAttributes(
257-
attribute.String("metric_name", table.Name),
258-
))
277+
parseErrors++
259278
}
260279
// Handle scalar gauge.
261280
case v0.MetricType == oxide.MetricTypeGauge:
@@ -266,9 +285,7 @@ func (s *oxideScraper) Scrape(ctx context.Context) (pmetric.Metrics, error) {
266285
zap.String("metric", table.Name),
267286
zap.Error(err),
268287
)
269-
s.metricParseErrors.Add(ctx, 1, metric.WithAttributes(
270-
attribute.String("metric_name", table.Name),
271-
))
288+
parseErrors++
272289
}
273290

274291
// Handle scalar counter.
@@ -287,30 +304,27 @@ func (s *oxideScraper) Scrape(ctx context.Context) (pmetric.Metrics, error) {
287304
zap.String("metric", table.Name),
288305
zap.Error(err),
289306
)
290-
s.metricParseErrors.Add(ctx, 1, metric.WithAttributes(
291-
attribute.String("metric_name", table.Name),
292-
))
307+
parseErrors++
293308
}
294309
}
295310

296311
}
297312
}
298313
}
299314

300-
for idx, metricName := range s.metricNames {
301-
s.apiRequestDuration.Record(
302-
ctx,
303-
latencies[idx].Seconds(),
304-
metric.WithAttributes(attribute.String("request_name", metricName)),
305-
)
306-
}
307-
308315
if s.cfg.AddUtilizationMetrics {
309316
if err := s.addSiloUtilization(ctx, metrics); err != nil {
310317
return metrics, fmt.Errorf("adding silo utilization metrics: %w", err)
311318
}
312319
}
313320

321+
// Propagate partial errors to the collector machinery.
322+
if queryErrors > 0 || parseErrors > 0 {
323+
return metrics, scrapererror.NewPartialScrapeError(
324+
fmt.Errorf("%d query errors, %d parse errors", queryErrors, parseErrors),
325+
queryErrors+parseErrors,
326+
)
327+
}
314328
return metrics, nil
315329
}
316330

0 commit comments

Comments
 (0)