Skip to content

Commit ac0d424

Browse files
authored
elasticsearch.events.retried: Add retry dimension (#209)
This commit adds a new dimension, `greatest_retry`, to the `elasticsearch.events.retried` metric. It records the greatest retry count observed in a bulk request, which helps show how many times the documents in that request have been retried.
Signed-off-by: Marc Lopez Rubio <[email protected]>
1 parent 786add0 commit ac0d424

File tree

3 files changed

+25
-6
lines changed

3 files changed

+25
-6
lines changed

appender.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -449,7 +449,9 @@ func (a *Appender) flush(ctx context.Context, bulkIndexer *BulkIndexer) error {
449449
}
450450
if resp.RetriedDocs > 0 {
451451
// docs are scheduled to be retried but not yet failed due to retry limit
452-
a.addCount(resp.RetriedDocs, nil, a.metrics.docsRetried)
452+
a.addCount(resp.RetriedDocs, nil, a.metrics.docsRetried,
453+
metric.WithAttributes(attribute.Int("greatest_retry", resp.GreatestRetry)),
454+
)
453455
}
454456
if docsIndexed > 0 {
455457
a.addCount(docsIndexed, &a.docsIndexed,

appender_test.go

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -329,7 +329,11 @@ loop:
329329
case "elasticsearch.events.processed":
330330
assertProcessedCounter(m, indexerAttrs)
331331
case "elasticsearch.events.retried":
332-
assertCounter(m, 1, indexerAttrs)
332+
assertCounter(m, 1, attribute.NewSet(
333+
attribute.String("a", "b"),
334+
attribute.String("c", "d"),
335+
attribute.Int("greatest_retry", 1),
336+
))
333337
case "elasticsearch.bulk_requests.available":
334338
assertCounter(m, stats.AvailableBulkRequests, indexerAttrs)
335339
case "elasticsearch.flushed.bytes":
@@ -1046,7 +1050,9 @@ func TestAppenderRetryDocument(t *testing.T) {
10461050
docappendertest.AssertOTelMetrics(t, rm.ScopeMetrics[0].Metrics, func(m metricdata.Metrics) {
10471051
switch m.Name {
10481052
case "elasticsearch.events.retried":
1049-
assertCounter(m, 5, *attribute.EmptySet())
1053+
assertCounter(m, 5, attribute.NewSet(
1054+
attribute.Int("greatest_retry", 1),
1055+
))
10501056
}
10511057
})
10521058
assert.Equal(t, int64(1), asserted.Load())
@@ -1064,7 +1070,9 @@ func TestAppenderRetryDocument(t *testing.T) {
10641070
docappendertest.AssertOTelMetrics(t, rm.ScopeMetrics[0].Metrics, func(m metricdata.Metrics) {
10651071
switch m.Name {
10661072
case "elasticsearch.events.retried":
1067-
assertCounter(m, 5, *attribute.EmptySet())
1073+
assertCounter(m, 5, attribute.NewSet(
1074+
attribute.Int("greatest_retry", 2),
1075+
))
10681076
}
10691077
})
10701078
assert.Equal(t, int64(2), asserted.Load())

bulk_indexer.go

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -101,9 +101,15 @@ type BulkIndexer struct {
101101
}
102102

103103
type BulkIndexerResponseStat struct {
104-
Indexed int64
104+
// Indexed contains the total number of successfully indexed documents.
105+
Indexed int64
106+
// RetriedDocs contains the total number of retried documents.
105107
RetriedDocs int64
106-
FailedDocs []BulkIndexerResponseItem
108+
// GreatestRetry contains the greatest observed retry count in the entire
109+
// bulk request.
110+
GreatestRetry int
111+
// FailedDocs contains the failed documents.
112+
FailedDocs []BulkIndexerResponseItem
107113
}
108114

109115
// BulkIndexerResponseItem represents the Elasticsearch response item.
@@ -483,6 +489,9 @@ func (b *BulkIndexer) Flush(ctx context.Context) (BulkIndexerResponseStat, error
483489
tmp = append(tmp, res)
484490
continue
485491
}
492+
if resp.GreatestRetry < count {
493+
resp.GreatestRetry = count
494+
}
486495

487496
// Since some items may have succeeded, counter positions need
488497
// to be updated to match the next current buffer position.

0 commit comments

Comments
 (0)