Skip to content
Merged
Show file tree
Hide file tree
Changes from 14 commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
d3506a0
failure store: update bulk indexer to support fs stats
1pkg Jan 25, 2025
2cdab68
Merge branch 'main' into response_failure_store_support
1pkg Jan 29, 2025
94581be
test: fix racy test
1pkg Jan 29, 2025
38095d1
failure store: appender stats update
1pkg Jan 29, 2025
bc81099
failure store: update stats too
1pkg Jan 29, 2025
6e8684c
failure store: add to response filter
1pkg Jan 30, 2025
8b691cc
failure store: ignore implicit not_applicable_or_unknown status
1pkg Jan 30, 2025
2f4f9f3
failure store: add relevant tests
1pkg Jan 31, 2025
f2332ac
failure store: add status const
1pkg Jan 31, 2025
323b9ba
failure store: fix typo
1pkg Jan 31, 2025
ee7973c
failure store: fix metric label test
1pkg Jan 31, 2025
442690a
failure store: use main branch from go-elasticsearch
1pkg Jan 31, 2025
18c7530
failure store: track "not_enabled" status
1pkg Jan 31, 2025
0250223
failure store: fix formatting
1pkg Jan 31, 2025
9423aa3
failure store: use latest elasticsearch client release
1pkg Feb 12, 2025
c0d492c
Merge branch 'main' into response_failure_store_support
1pkg Feb 12, 2025
1f7e409
Merge branch 'main' into response_failure_store_support
1pkg Feb 12, 2025
22f8a12
Merge branch 'main' into response_failure_store_support
1pkg Feb 13, 2025
48f9d85
failure store: do not export legacy metrics
1pkg Feb 13, 2025
19d6d05
Merge branch 'response_failure_store_support' of github.com:1pkg/go-d…
1pkg Feb 13, 2025
d2fbedf
Merge branch 'main' into response_failure_store_support
1pkg Feb 14, 2025
80a3199
failure store: use consistent docs naming for the stats
1pkg Feb 19, 2025
2f8eaad
Merge branch 'main' into response_failure_store_support
1pkg Feb 20, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
82 changes: 65 additions & 17 deletions appender.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,20 +63,23 @@ var (
// server to make progress encoding while Elasticsearch is busy servicing flushed bulk requests.
type Appender struct {
// legacy metrics for Stats()
bulkRequests int64
docsAdded int64
docsActive int64
docsFailed int64
docsFailedClient int64
docsFailedServer int64
docsIndexed int64
tooManyRequests int64
bytesTotal int64
bytesUncompressedTotal int64
availableBulkRequests int64
activeCreated int64
activeDestroyed int64
blockedAdd int64
bulkRequests int64
docsAdded int64
docsActive int64
docsFailed int64
docsFailedClient int64
docsFailedServer int64
docsIndexed int64
docsFailureStoreUsed int64
docsFailureStoreFailed int64
docsFailureStoreNotEnabled int64
tooManyRequests int64
bytesTotal int64
bytesUncompressedTotal int64
availableBulkRequests int64
activeCreated int64
activeDestroyed int64
blockedAdd int64

scalingInfo atomic.Value

Expand Down Expand Up @@ -255,6 +258,9 @@ func (a *Appender) Stats() Stats {
IndexersActive: a.scalingInformation().activeIndexers,
IndexersCreated: atomic.LoadInt64(&a.activeCreated),
IndexersDestroyed: atomic.LoadInt64(&a.activeDestroyed),
FailureStoreUsed: atomic.LoadInt64(&a.docsFailureStoreUsed),
FailureStoreFailed: atomic.LoadInt64(&a.docsFailureStoreFailed),
FailureStoreNotEnabled: atomic.LoadInt64(&a.docsFailureStoreNotEnabled),
}
}

Expand Down Expand Up @@ -409,13 +415,13 @@ func (a *Appender) flush(ctx context.Context, bulkIndexer *BulkIndexer) error {
}
return err
}
var (
docsFailed, docsIndexed,
var docsFailed, docsIndexed,
// breakdown of failed docs:
tooManyRequests, // failed after document retries (if it applies) and final status is 429
clientFailed, // failed after document retries (if it applies) and final status is 400s excluding 429
serverFailed int64 // failed after document retries (if it applies) and final status is 500s
)

failureStore := resp.FailureStore
docsIndexed = resp.Indexed
var failedCount map[BulkIndexerResponseItem]int
if len(resp.FailedDocs) > 0 {
Expand Down Expand Up @@ -483,11 +489,41 @@ func (a *Appender) flush(ctx context.Context, bulkIndexer *BulkIndexer) error {
metric.WithAttributes(attribute.String("status", "FailedServer")),
)
}
if failureStore.Used > 0 {
a.addCount(failureStore.Used, &a.docsFailureStoreUsed,
a.metrics.docsIndexed,
metric.WithAttributes(
attribute.String("status", "FailureStore"),
attribute.String("failure_store", string(FailureStoreStatusUsed)),
),
)
}
if failureStore.Failed > 0 {
a.addCount(failureStore.Failed, &a.docsFailureStoreFailed,
a.metrics.docsIndexed,
metric.WithAttributes(
attribute.String("status", "FailureStore"),
attribute.String("failure_store", string(FailureStoreStatusFailed)),
),
)
}
if failureStore.NotEnabled > 0 {
a.addCount(failureStore.NotEnabled, &a.docsFailureStoreNotEnabled,
a.metrics.docsIndexed,
metric.WithAttributes(
attribute.String("status", "FailureStore"),
attribute.String("failure_store", string(FailureStoreStatusNotEnabled)),
),
)
}
logger.Debug(
"bulk request completed",
zap.Int64("docs_indexed", docsIndexed),
zap.Int64("docs_failed", docsFailed),
zap.Int64("docs_rate_limited", tooManyRequests),
zap.Int64("docs_failure_store_used", failureStore.Used),
zap.Int64("docs_failure_store_failed", failureStore.Failed),
zap.Int64("docs_failure_store_not_enabled", failureStore.NotEnabled),
)
if a.otelTracingEnabled() && span.IsRecording() {
span.SetStatus(codes.Ok, "")
Expand Down Expand Up @@ -826,6 +862,18 @@ type Stats struct {

// IndexersDestroyed represents the number of times an active indexer was destroyed.
IndexersDestroyed int64

// FailureStoreUsed represents the number of indexing operations that have resulted
// in indexing to the failure store.
FailureStoreUsed int64

// FailureStoreFailed represents the number of indexing operations that have failed
// while indexing to failure store.
FailureStoreFailed int64

// FailureStoreNotEnabled represents the number of indexing operations that could have been
// processed by failure store if it was enabled.
FailureStoreNotEnabled int64
}

func timeFunc(f func()) time.Duration {
Expand Down
47 changes: 34 additions & 13 deletions appender_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,18 +67,24 @@ func TestAppender(t *testing.T) {
// "too many requests". These will be recorded as failures in indexing
// stats.
for i := range result.Items {
if i > 2 {
if i > 5 {
break
}
status := http.StatusInternalServerError
switch i {
case 1:
status = http.StatusTooManyRequests
case 2:
status = http.StatusUnauthorized
}
for action, item := range result.Items[i] {
item.Status = status
switch i {
case 0:
item.Status = http.StatusInternalServerError
case 1:
item.Status = http.StatusTooManyRequests
case 2:
item.Status = http.StatusUnauthorized
case 3:
item.FailureStore = string(docappender.FailureStoreStatusUsed)
case 4:
item.FailureStore = string(docappender.FailureStoreStatusFailed)
case 5:
item.FailureStore = string(docappender.FailureStoreStatusNotEnabled)
}
result.Items[i][action] = item
}
}
Expand Down Expand Up @@ -145,6 +151,9 @@ loop:
AvailableBulkRequests: 10,
BytesTotal: bytesTotal,
BytesUncompressedTotal: bytesUncompressed,
FailureStoreUsed: 1,
FailureStoreFailed: 1,
FailureStoreNotEnabled: 1,
}, stats)

var rm metricdata.ResourceMetrics
Expand Down Expand Up @@ -173,6 +182,18 @@ loop:
case "TooMany":
processedAsserted++
assert.Equal(t, stats.TooManyRequests, dp.Value)
case "FailureStore":
processedAsserted++
fs, exist := dp.Attributes.Value(attribute.Key("failure_store"))
assert.True(t, exist)
switch docappender.FailureStoreStatus(fs.AsString()) {
case docappender.FailureStoreStatusUsed:
assert.Equal(t, stats.FailureStoreUsed, dp.Value)
case docappender.FailureStoreStatusFailed:
assert.Equal(t, stats.FailureStoreFailed, dp.Value)
case docappender.FailureStoreStatusNotEnabled:
assert.Equal(t, stats.FailureStoreNotEnabled, dp.Value)
}
default:
assert.FailNow(t, "Unexpected metric with status: "+status.AsString())
}
Expand Down Expand Up @@ -206,7 +227,7 @@ loop:

assert.Empty(t, unexpectedMetrics)
assert.Equal(t, int64(7), asserted.Load())
assert.Equal(t, 4, processedAsserted)
assert.Equal(t, 7, processedAsserted)
}

func TestAppenderRetry(t *testing.T) {
Expand Down Expand Up @@ -753,7 +774,6 @@ func TestAppenderFlushRequestError(t *testing.T) {
}
})
assert.Equal(t, int64(1), asserted.Load())

}
t.Run("400", func(t *testing.T) {
test(t, http.StatusBadRequest, "flush failed (400): {\"error\":{\"type\":\"x_content_parse_exception\",\"caused_by\":{\"type\":\"json_parse_exception\"}}}")
Expand Down Expand Up @@ -806,7 +826,7 @@ func TestAppenderIndexFailedLogging(t *testing.T) {

core, observed := observer.New(zap.NewAtomicLevelAt(zapcore.DebugLevel))
indexer, err := docappender.New(client, docappender.Config{
FlushBytes: 500,
FlushBytes: 5000,
Logger: zap.New(core),
})
require.NoError(t, err)
Expand Down Expand Up @@ -1372,7 +1392,8 @@ func TestAppenderCloseBusyIndexer(t *testing.T) {
BytesTotal: bytesTotal,
BytesUncompressedTotal: bytesUncompressedTotal,
AvailableBulkRequests: 10,
IndexersActive: 0}, indexer.Stats())
IndexersActive: 0,
}, indexer.Stats())
}

func TestAppenderPipeline(t *testing.T) {
Expand Down
38 changes: 37 additions & 1 deletion bulk_indexer.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,16 @@ type BulkIndexerResponseStat struct {
Indexed int64
// RetriedDocs contains the total number of retried documents.
RetriedDocs int64
// FailureStore contains failure store specific stats.
FailureStore struct {
// Used contains the total number of documents indexed to failure store.
Used int64
// Failed contains the total number of documents which failed when indexed to failure store.
Failed int64
// NotEnabled contains the total number of documents which could have been indexed to failure store
// if it was enabled.
NotEnabled int64
}
// GreatestRetry contains the greatest observed retry count in the entire
// bulk request.
GreatestRetry int
Expand All @@ -125,6 +135,22 @@ type BulkIndexerResponseItem struct {
} `json:"error,omitempty"`
}

// FailureStoreStatus enumerates the failure store statuses that can appear
// on an item in an Elasticsearch bulk response.
type FailureStoreStatus string

const (
	// FailureStoreStatusUnknown is the implicit status reported when the
	// response carries no failure store information for the document, or
	// when the failure store is not applicable to it.
	FailureStoreStatusUnknown FailureStoreStatus = "not_applicable_or_unknown"
	// FailureStoreStatusUsed reports that the document was successfully
	// stored in the failure store.
	FailureStoreStatusUsed FailureStoreStatus = "used"
	// FailureStoreStatusFailed reports that the failure store rejected the
	// document.
	FailureStoreStatusFailed FailureStoreStatus = "failed"
	// FailureStoreStatusNotEnabled reports that the document was rejected,
	// but would have ended up in the failure store had it been enabled.
	FailureStoreStatusNotEnabled FailureStoreStatus = "not_enabled"
)

func init() {
jsoniter.RegisterTypeDecoderFunc("docappender.BulkIndexerResponseStat", func(ptr unsafe.Pointer, iter *jsoniter.Iterator) {
iter.ReadObjectCB(func(i *jsoniter.Iterator, s string) bool {
Expand All @@ -140,6 +166,16 @@ func init() {
item.Index = i.ReadString()
case "status":
item.Status = i.ReadInt()
case "failure_store":
// For the stats track only actionable explicit failure store statuses "used", "failed" and "not_enabled".
switch fs := i.ReadString(); FailureStoreStatus(fs) {
case FailureStoreStatusUsed:
(*((*BulkIndexerResponseStat)(ptr))).FailureStore.Used++
case FailureStoreStatusFailed:
(*((*BulkIndexerResponseStat)(ptr))).FailureStore.Failed++
case FailureStoreStatusNotEnabled:
(*((*BulkIndexerResponseStat)(ptr))).FailureStore.NotEnabled++
}
case "error":
i.ReadObjectCB(func(i *jsoniter.Iterator, s string) bool {
switch s {
Expand Down Expand Up @@ -376,7 +412,7 @@ func (b *BulkIndexer) Flush(ctx context.Context) (BulkIndexerResponseStat, error
// See: https://github.com/golang/go/issues/51907
Body: bytes.NewReader(b.buf.Bytes()),
Header: make(http.Header),
FilterPath: []string{"items.*._index", "items.*.status", "items.*.error.type", "items.*.error.reason"},
FilterPath: []string{"items.*._index", "items.*.status", "items.*.failure_store", "items.*.error.type", "items.*.error.reason"},
Pipeline: b.config.Pipeline,
}
if b.requireDataStream {
Expand Down
46 changes: 46 additions & 0 deletions bulk_indexer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -203,3 +203,49 @@ func TestPipeline(t *testing.T) {
require.NoError(t, err)
require.Equal(t, int64(2), stat.Indexed)
}

// TestBulkIndexer_FailureStore verifies that the bulk indexer tallies the
// per-status failure store counters from the bulk response. The mock server
// cycles each indexed document through the four known statuses; only the
// three explicit, actionable statuses ("used", "failed", "not_enabled") are
// expected to be counted — "not_applicable_or_unknown" is ignored.
func TestBulkIndexer_FailureStore(t *testing.T) {
	statuses := []docappender.FailureStoreStatus{
		docappender.FailureStoreStatusUsed,
		docappender.FailureStoreStatusFailed,
		docappender.FailureStoreStatusUnknown,
		docappender.FailureStoreStatusNotEnabled,
	}
	client := docappendertest.NewMockElasticsearchClient(t, func(w http.ResponseWriter, r *http.Request) {
		_, result := docappendertest.DecodeBulkRequest(r)
		// Assign statuses round-robin across all response items.
		i := 0
		for _, itemsMap := range result.Items {
			for k, item := range itemsMap {
				item.FailureStore = string(statuses[i%len(statuses)])
				itemsMap[k] = item
				i++
			}
		}
		err := json.NewEncoder(w).Encode(result)
		require.NoError(t, err)
	})
	indexer, err := docappender.NewBulkIndexer(docappender.BulkIndexerConfig{
		Client: client,
	})
	require.NoError(t, err)

	// One document per status in the cycle.
	for range 4 {
		err = indexer.Add(docappender.BulkIndexerItem{
			Index: "testidx",
			Body: newJSONReader(map[string]any{
				"@timestamp": time.Now().Format(docappendertest.TimestampFormat),
			}),
		})
		require.NoError(t, err)
	}

	stat, err := indexer.Flush(context.Background())
	require.NoError(t, err)
	require.Equal(t, int64(4), stat.Indexed)
	require.Equal(t, int64(1), stat.FailureStore.Used)
	require.Equal(t, int64(1), stat.FailureStore.Failed)
	require.Equal(t, int64(1), stat.FailureStore.NotEnabled)
}
7 changes: 5 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
module github.com/elastic/go-docappender/v2

go 1.22
go 1.22.0

require (
github.com/elastic/go-elasticsearch/v8 v8.17.0
github.com/elastic/go-elasticsearch/v8 v8.4.0-alpha.1.0.20250131174650-1dda5df4f11f
github.com/json-iterator/go v1.1.12
github.com/klauspost/compress v1.17.11
github.com/stretchr/testify v1.10.0
Expand Down Expand Up @@ -31,14 +31,17 @@ require (
github.com/google/go-cmp v0.6.0 // indirect
github.com/google/uuid v1.6.0 // indirect
github.com/joeshaw/multierror v0.0.0-20140124173710-69b34d4ec901 // indirect
github.com/kr/pretty v0.3.1 // indirect
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
github.com/modern-go/reflect2 v1.0.2 // indirect
github.com/pkg/errors v0.9.1 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/prometheus/procfs v0.7.3 // indirect
github.com/rogpeppe/go-internal v1.13.1 // indirect
go.elastic.co/apm/module/apmhttp/v2 v2.6.3 // indirect
go.uber.org/multierr v1.10.0 // indirect
golang.org/x/sys v0.27.0 // indirect
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
howett.net/plist v1.0.0 // indirect
)
Loading