Skip to content

Commit 530381e

Browse files
authored
feat: add status code label to docs metric (#223)
If a request fails record the status code with the elasticsearch.events.processed metric. Improve debugging and analysis of failed requests
1 parent 322a299 commit 530381e

File tree

2 files changed

+32
-2
lines changed

2 files changed

+32
-2
lines changed

appender.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@ import (
3535
"go.opentelemetry.io/otel/attribute"
3636
"go.opentelemetry.io/otel/codes"
3737
"go.opentelemetry.io/otel/metric"
38+
semconv "go.opentelemetry.io/otel/semconv/v1.26.0"
3839
"go.opentelemetry.io/otel/trace"
3940
"go.uber.org/zap"
4041
"golang.org/x/sync/errgroup"
@@ -402,7 +403,7 @@ func (a *Appender) flush(ctx context.Context, bulkIndexer *BulkIndexer) error {
402403
}
403404
if status != "" {
404405
a.addCount(int64(n), legacy, a.metrics.docsIndexed,
405-
metric.WithAttributes(attribute.String("status", status)),
406+
metric.WithAttributes(attribute.String("status", status), semconv.HTTPResponseStatusCode(errFailed.statusCode)),
406407
)
407408
}
408409
}

appender_test.go

Lines changed: 30 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@ import (
4545
"go.opentelemetry.io/otel/sdk/metric/metricdata/metricdatatest"
4646
sdktrace "go.opentelemetry.io/otel/sdk/trace"
4747
"go.opentelemetry.io/otel/sdk/trace/tracetest"
48+
semconv "go.opentelemetry.io/otel/semconv/v1.26.0"
4849
"go.uber.org/zap"
4950
"go.uber.org/zap/zapcore"
5051
"go.uber.org/zap/zaptest/observer"
@@ -693,7 +694,17 @@ func TestAppenderFlushRequestError(t *testing.T) {
693694
w.WriteHeader(sc)
694695
w.Write([]byte(`{"error": {"root_cause": [{"type": "x_content_parse_exception","reason": "reason"}],"type": "x_content_parse_exception","reason": "reason","caused_by": {"type": "json_parse_exception","reason": "reason"}},"status": 400}`))
695696
})
696-
indexer, err := docappender.New(client, docappender.Config{FlushInterval: time.Minute})
697+
698+
rdr := sdkmetric.NewManualReader(sdkmetric.WithTemporalitySelector(
699+
func(ik sdkmetric.InstrumentKind) metricdata.Temporality {
700+
return metricdata.DeltaTemporality
701+
},
702+
))
703+
704+
indexer, err := docappender.New(client, docappender.Config{
705+
FlushInterval: time.Minute,
706+
MeterProvider: sdkmetric.NewMeterProvider(sdkmetric.WithReader(rdr)),
707+
})
697708
require.NoError(t, err)
698709
defer indexer.Close(context.Background())
699710

@@ -716,15 +727,33 @@ func TestAppenderFlushRequestError(t *testing.T) {
716727
BytesTotal: bytesTotal,
717728
BytesUncompressedTotal: bytesUncompressedTotal,
718729
}
730+
var status string
719731
switch {
720732
case sc == 429:
733+
status = "TooMany"
721734
wantStats.TooManyRequests = int64(docs)
722735
case sc >= 500:
736+
status = "FailedServer"
723737
wantStats.FailedServer = int64(docs)
724738
case sc >= 400 && sc != 429:
739+
status = "FailedClient"
725740
wantStats.FailedClient = int64(docs)
726741
}
727742
assert.Equal(t, wantStats, stats)
743+
744+
var rm metricdata.ResourceMetrics
745+
assert.NoError(t, rdr.Collect(context.Background(), &rm))
746+
747+
var asserted atomic.Int64
748+
assertCounter := docappendertest.NewAssertCounter(t, &asserted)
749+
docappendertest.AssertOTelMetrics(t, rm.ScopeMetrics[0].Metrics, func(m metricdata.Metrics) {
750+
switch m.Name {
751+
case "elasticsearch.events.processed":
752+
assertCounter(m, 3, attribute.NewSet(attribute.String("status", status), semconv.HTTPResponseStatusCode(sc)))
753+
}
754+
})
755+
assert.Equal(t, int64(1), asserted.Load())
756+
728757
}
729758
t.Run("400", func(t *testing.T) {
730759
test(t, http.StatusBadRequest, "flush failed (400): {\"error\":{\"type\":\"x_content_parse_exception\",\"caused_by\":{\"type\":\"json_parse_exception\"}}}")

0 commit comments

Comments
 (0)