diff --git a/appender.go b/appender.go index f1a31c9..3cb08c7 100644 --- a/appender.go +++ b/appender.go @@ -409,13 +409,13 @@ func (a *Appender) flush(ctx context.Context, bulkIndexer *BulkIndexer) error { } return err } - var ( - docsFailed, docsIndexed, + var docsFailed, docsIndexed, // breakdown of failed docs: tooManyRequests, // failed after document retries (if it applies) and final status is 429 clientFailed, // failed after document retries (if it applies) and final status is 400s excluding 429 serverFailed int64 // failed after document retries (if it applies) and final status is 500s - ) + + failureStoreDocs := resp.FailureStoreDocs docsIndexed = resp.Indexed var failedCount map[BulkIndexerResponseItem]int if len(resp.FailedDocs) > 0 { @@ -483,11 +483,41 @@ func (a *Appender) flush(ctx context.Context, bulkIndexer *BulkIndexer) error { metric.WithAttributes(attribute.String("status", "FailedServer")), ) } + if failureStoreDocs.Used > 0 { + a.addCount(failureStoreDocs.Used, nil, + a.metrics.docsIndexed, + metric.WithAttributes( + attribute.String("status", "FailureStore"), + attribute.String("failure_store", string(FailureStoreStatusUsed)), + ), + ) + } + if failureStoreDocs.Failed > 0 { + a.addCount(failureStoreDocs.Failed, nil, + a.metrics.docsIndexed, + metric.WithAttributes( + attribute.String("status", "FailureStore"), + attribute.String("failure_store", string(FailureStoreStatusFailed)), + ), + ) + } + if failureStoreDocs.NotEnabled > 0 { + a.addCount(failureStoreDocs.NotEnabled, nil, + a.metrics.docsIndexed, + metric.WithAttributes( + attribute.String("status", "FailureStore"), + attribute.String("failure_store", string(FailureStoreStatusNotEnabled)), + ), + ) + } logger.Debug( "bulk request completed", zap.Int64("docs_indexed", docsIndexed), zap.Int64("docs_failed", docsFailed), zap.Int64("docs_rate_limited", tooManyRequests), + zap.Int64("docs_failure_store_used", failureStoreDocs.Used), + zap.Int64("docs_failure_store_failed", failureStoreDocs.Failed), + zap.Int64("docs_failure_store_not_enabled", failureStoreDocs.NotEnabled), ) if a.otelTracingEnabled() && span.IsRecording() { span.SetStatus(codes.Ok, "") diff --git a/appender_test.go b/appender_test.go index 49fb953..8161541 100644 --- a/appender_test.go +++ b/appender_test.go @@ -67,18 +67,24 @@ func TestAppender(t *testing.T) { // "too many requests". These will be recorded as failures in indexing // stats. for i := range result.Items { - if i > 2 { + if i > 5 { break } - status := http.StatusInternalServerError - switch i { - case 1: - status = http.StatusTooManyRequests - case 2: - status = http.StatusUnauthorized - } for action, item := range result.Items[i] { - item.Status = status + switch i { + case 0: + item.Status = http.StatusInternalServerError + case 1: + item.Status = http.StatusTooManyRequests + case 2: + item.Status = http.StatusUnauthorized + case 3: + item.FailureStore = string(docappender.FailureStoreStatusUsed) + case 4: + item.FailureStore = string(docappender.FailureStoreStatusFailed) + case 5: + item.FailureStore = string(docappender.FailureStoreStatusNotEnabled) + } result.Items[i][action] = item } } @@ -157,7 +163,7 @@ loop: asserted.Add(1) counter := metric.Data.(metricdata.Sum[int64]) for _, dp := range counter.DataPoints { - metricdatatest.AssertHasAttributes[metricdata.DataPoint[int64]](t, dp, attrs.ToSlice()...) + metricdatatest.AssertHasAttributes(t, dp, attrs.ToSlice()...) status, exist := dp.Attributes.Value(attribute.Key("status")) assert.True(t, exist) switch status.AsString() { @@ -173,6 +179,19 @@ loop: case "TooMany": processedAsserted++ assert.Equal(t, stats.TooManyRequests, dp.Value) + case "FailureStore": + processedAsserted++ + fs, exist := dp.Attributes.Value(attribute.Key("failure_store")) + assert.True(t, exist) + assert.Contains( + t, + []docappender.FailureStoreStatus{ + docappender.FailureStoreStatusUsed, + docappender.FailureStoreStatusFailed, + docappender.FailureStoreStatusNotEnabled, + }, + docappender.FailureStoreStatus(fs.AsString()), + ) default: assert.FailNow(t, "Unexpected metric with status: "+status.AsString()) } @@ -206,7 +225,7 @@ loop: assert.Empty(t, unexpectedMetrics) assert.Equal(t, int64(7), asserted.Load()) - assert.Equal(t, 4, processedAsserted) + assert.Equal(t, 7, processedAsserted) } func TestAppenderRetry(t *testing.T) { @@ -753,7 +772,6 @@ func TestAppenderFlushRequestError(t *testing.T) { } }) assert.Equal(t, int64(1), asserted.Load()) - } t.Run("400", func(t *testing.T) { test(t, http.StatusBadRequest, "flush failed (400): {\"error\":{\"type\":\"x_content_parse_exception\",\"caused_by\":{\"type\":\"json_parse_exception\"}}}") @@ -806,7 +824,7 @@ func TestAppenderIndexFailedLogging(t *testing.T) { core, observed := observer.New(zap.NewAtomicLevelAt(zapcore.DebugLevel)) indexer, err := docappender.New(client, docappender.Config{ - FlushBytes: 500, + FlushBytes: 5000, Logger: zap.New(core), }) require.NoError(t, err) @@ -1372,7 +1390,8 @@ func TestAppenderCloseBusyIndexer(t *testing.T) { BytesTotal: bytesTotal, BytesUncompressedTotal: bytesUncompressedTotal, AvailableBulkRequests: 10, - IndexersActive: 0}, indexer.Stats()) + IndexersActive: 0, + }, indexer.Stats()) } func TestAppenderPipeline(t *testing.T) { diff --git a/bulk_indexer.go b/bulk_indexer.go index c48be79..8bc00a0 100644 --- a/bulk_indexer.go +++ b/bulk_indexer.go @@ -114,6 +114,16 @@ type BulkIndexerResponseStat struct { Indexed int64 // RetriedDocs contains the total number of retried documents. RetriedDocs int64 + // FailureStoreDocs contains failure store specific document stats. + FailureStoreDocs struct { + // Used contains the total number of documents indexed to failure store. + Used int64 + // Failed contains the total number of documents which failed when indexed to failure store. + Failed int64 + // NotEnabled contains the total number of documents which could have been indexed to failure store + // if it was enabled. + NotEnabled int64 + } // GreatestRetry contains the greatest observed retry count in the entire // bulk request. GreatestRetry int @@ -134,6 +144,22 @@ type BulkIndexerResponseItem struct { } `json:"error,omitempty"` } +// FailureStoreStatus defines enumeration type for all known failure store statuses. +type FailureStoreStatus string + +const ( + // FailureStoreStatusUnknown implicit status which represents that there is no information about + // this response or that the failure store is not applicable. + FailureStoreStatusUnknown FailureStoreStatus = "not_applicable_or_unknown" + // FailureStoreStatusUsed status which represents that this document was stored in the failure store successfully. + FailureStoreStatusUsed FailureStoreStatus = "used" + // FailureStoreStatusFailed status which represents that this document was rejected from the failure store. + FailureStoreStatusFailed FailureStoreStatus = "failed" + // FailureStoreStatusNotEnabled status which represents that this document was rejected, but + // it could have ended up in the failure store if it was enabled. + FailureStoreStatusNotEnabled FailureStoreStatus = "not_enabled" +) + func init() { jsoniter.RegisterTypeDecoderFunc("docappender.BulkIndexerResponseStat", func(ptr unsafe.Pointer, iter *jsoniter.Iterator) { iter.ReadObjectCB(func(i *jsoniter.Iterator, s string) bool { @@ -149,6 +175,16 @@ func init() { item.Index = i.ReadString() case "status": item.Status = i.ReadInt() + case "failure_store": + // For the stats track only actionable explicit failure store statuses "used", "failed" and "not_enabled". + switch fs := i.ReadString(); FailureStoreStatus(fs) { + case FailureStoreStatusUsed: + (*((*BulkIndexerResponseStat)(ptr))).FailureStoreDocs.Used++ + case FailureStoreStatusFailed: + (*((*BulkIndexerResponseStat)(ptr))).FailureStoreDocs.Failed++ + case FailureStoreStatusNotEnabled: + (*((*BulkIndexerResponseStat)(ptr))).FailureStoreDocs.NotEnabled++ + } case "error": i.ReadObjectCB(func(i *jsoniter.Iterator, s string) bool { switch s { @@ -419,7 +455,7 @@ func (b *BulkIndexer) Flush(ctx context.Context) (BulkIndexerResponseStat, error // See: https://github.com/golang/go/issues/51907 Body: bytes.NewReader(b.buf.Bytes()), Header: make(http.Header), - FilterPath: []string{"items.*._index", "items.*.status", "items.*.error.type", "items.*.error.reason"}, + FilterPath: []string{"items.*._index", "items.*.status", "items.*.failure_store", "items.*.error.type", "items.*.error.reason"}, Pipeline: b.config.Pipeline, } if b.requireDataStream { diff --git a/bulk_indexer_test.go b/bulk_indexer_test.go index e2b069f..6788b74 100644 --- a/bulk_indexer_test.go +++ b/bulk_indexer_test.go @@ -295,3 +295,49 @@ func TestItemRequireDataStream(t *testing.T) { require.NoError(t, err) require.Equal(t, int64(2), stat.Indexed) } + +func TestBulkIndexer_FailureStore(t *testing.T) { + client := docappendertest.NewMockElasticsearchClient(t, func(w http.ResponseWriter, r *http.Request) { + _, result := docappendertest.DecodeBulkRequest(r) + var i int + for _, itemsMap := range result.Items { + for k, item := range itemsMap { + switch i % 4 { + case 0: + item.FailureStore = string(docappender.FailureStoreStatusUsed) + case 1: + item.FailureStore = string(docappender.FailureStoreStatusFailed) + case 2: + item.FailureStore = string(docappender.FailureStoreStatusUnknown) + case 3: + item.FailureStore = string(docappender.FailureStoreStatusNotEnabled) + } + itemsMap[k] = item + i++ + } + } + err := json.NewEncoder(w).Encode(result) + require.NoError(t, err) + }) + indexer, err := docappender.NewBulkIndexer(docappender.BulkIndexerConfig{ + Client: client, + }) + require.NoError(t, err) + + for range 4 { + err = indexer.Add(docappender.BulkIndexerItem{ + Index: "testidx", + Body: newJSONReader(map[string]any{ + "@timestamp": time.Now().Format(docappendertest.TimestampFormat), + }), + }) + require.NoError(t, err) + } + + stat, err := indexer.Flush(context.Background()) + require.NoError(t, err) + require.Equal(t, int64(4), stat.Indexed) + require.Equal(t, int64(1), stat.FailureStoreDocs.Used) + require.Equal(t, int64(1), stat.FailureStoreDocs.Failed) + require.Equal(t, int64(1), stat.FailureStoreDocs.NotEnabled) +} diff --git a/go.mod b/go.mod index 0e517b9..0a90c32 100644 --- a/go.mod +++ b/go.mod @@ -1,9 +1,9 @@ module github.com/elastic/go-docappender/v2 -go 1.22 +go 1.22.0 require ( - github.com/elastic/go-elasticsearch/v8 v8.17.0 + github.com/elastic/go-elasticsearch/v8 v8.17.1 github.com/json-iterator/go v1.1.12 github.com/klauspost/compress v1.18.0 github.com/stretchr/testify v1.10.0 @@ -23,7 +23,7 @@ require ( require ( github.com/armon/go-radix v1.0.0 // indirect github.com/davecgh/go-spew v1.1.1 // indirect - github.com/elastic/elastic-transport-go/v8 v8.6.0 // indirect + github.com/elastic/elastic-transport-go/v8 v8.6.1 // indirect github.com/elastic/go-sysinfo v1.7.1 // indirect github.com/elastic/go-windows v1.0.1 // indirect github.com/go-logr/logr v1.4.2 // indirect @@ -31,14 +31,17 @@ require ( github.com/google/go-cmp v0.6.0 // indirect github.com/google/uuid v1.6.0 // indirect github.com/joeshaw/multierror v0.0.0-20140124173710-69b34d4ec901 // indirect + github.com/kr/pretty v0.3.1 // indirect github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect github.com/modern-go/reflect2 v1.0.2 // indirect github.com/pkg/errors v0.9.1 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect github.com/prometheus/procfs v0.7.3 // indirect + github.com/rogpeppe/go-internal v1.13.1 // indirect go.elastic.co/apm/module/apmhttp/v2 v2.6.3 // indirect go.uber.org/multierr v1.10.0 // indirect golang.org/x/sys v0.27.0 // indirect + gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c // indirect gopkg.in/yaml.v3 v3.0.1 // indirect howett.net/plist v1.0.0 // indirect ) diff --git a/go.sum b/go.sum index b912277..f997483 100644 --- a/go.sum +++ b/go.sum @@ -1,12 +1,13 @@ github.com/armon/go-radix v1.0.0 h1:F4z6KzEeeQIMeLFa97iZU6vupzoecKdU5TX24SNppXI= github.com/armon/go-radix v1.0.0/go.mod h1:ufUuZ+zHj4x4TnLV4JWEpy2hxWSpsRywHrMgIH9cCH8= +github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= -github.com/elastic/elastic-transport-go/v8 v8.6.0 h1:Y2S/FBjx1LlCv5m6pWAF2kDJAHoSjSRSJCApolgfthA= -github.com/elastic/elastic-transport-go/v8 v8.6.0/go.mod h1:YLHer5cj0csTzNFXoNQ8qhtGY1GTvSqPnKWKaqQE3Hk= -github.com/elastic/go-elasticsearch/v8 v8.17.0 h1:e9cWksE/Fr7urDRmGPGp47Nsp4/mvNOrU8As1l2HQQ0= -github.com/elastic/go-elasticsearch/v8 v8.17.0/go.mod h1:lGMlgKIbYoRvay3xWBeKahAiJOgmFDsjZC39nmO3H64= +github.com/elastic/elastic-transport-go/v8 v8.6.1 h1:h2jQRqH6eLGiBSN4eZbQnJLtL4bC5b4lfVFRjw2R4e4= +github.com/elastic/elastic-transport-go/v8 v8.6.1/go.mod h1:YLHer5cj0csTzNFXoNQ8qhtGY1GTvSqPnKWKaqQE3Hk= +github.com/elastic/go-elasticsearch/v8 v8.17.1 h1:bOXChDoCMB4TIwwGqKd031U8OXssmWLT3UrAr9EGs3Q= +github.com/elastic/go-elasticsearch/v8 v8.17.1/go.mod h1:MVJCtL+gJJ7x5jFeUmA20O7rvipX8GcQmo5iBcmaJn4= github.com/elastic/go-sysinfo v1.7.1 h1:Wx4DSARcKLllpKT2TnFVdSUJOsybqMYCNQZq1/wO+s0= github.com/elastic/go-sysinfo v1.7.1/go.mod h1:i1ZYdU10oLNfRzq4vq62BEwD2fH8KaWh6eh0ikPT9F0= github.com/elastic/go-windows v1.0.0/go.mod h1:TsU0Nrp7/y3+VwE82FoZF8gC/XFg/Elz6CcloAxnPgU= @@ -30,8 +31,10 @@ github.com/json-iterator/go v1.1.12 h1:PV8peI4a0ysnczrg+LtxykD8LfKY9ML6u2jnxaEnr github.com/json-iterator/go v1.1.12/go.mod h1:e30LSqwooZae/UwlEbR2852Gd8hjQvJoHmT4TnhNGBo= github.com/klauspost/compress v1.18.0 h1:c/Cqfb0r+Yi+JtIEq73FWXVkRonBlf0CRNYc8Zttxdo= github.com/klauspost/compress v1.18.0/go.mod h1:2Pp+KzxcywXVXMr50+X0Q/Lsb43OQHYWRCY2AiWywWQ= -github.com/kr/pretty v0.1.0 h1:L/CwN0zerZDmRFUapSPitk6f+Q3+0za1rQkzVuMiMFI= github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo= +github.com/kr/pretty v0.2.1/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfnI= +github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE= +github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk= github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= @@ -41,6 +44,7 @@ github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd h1:TRLaZ9cD/w github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= github.com/modern-go/reflect2 v1.0.2 h1:xBagoLtFs94CBntxluKeaWgTMpvLxC4ur3nMaC9Gz0M= github.com/modern-go/reflect2 v1.0.2/go.mod h1:yWuevngMOJpCy52FWWMvUC8ws7m/LJsjYzDa0/r8luk= +github.com/pkg/diff v0.0.0-20210226163009-20ebb0f2a09e/go.mod h1:pJLUxLENpZxwdsKMEsNbx1VGcRFpLqf3715MtcvvzbA= github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= @@ -49,6 +53,9 @@ github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZN github.com/prometheus/procfs v0.0.0-20190425082905-87a4384529e0/go.mod h1:TjEm7ze935MbeOT/UhFTIMYKhuLP4wbCsTZCD3I8kEA= github.com/prometheus/procfs v0.7.3 h1:4jVXhlkAyzOScmCkXBTOLRLTz8EeU+eyjrwB/EPq0VU= github.com/prometheus/procfs v0.7.3/go.mod h1:cz+aTbrPOrUb4q7XlbU9ygM+/jj0fzG6c1xBZuNvfVA= +github.com/rogpeppe/go-internal v1.9.0/go.mod h1:WtVeX8xhTBvf0smdhujwtBcq4Qrzq/fJaraNFVN+nFs= +github.com/rogpeppe/go-internal v1.13.1 h1:KvO1DLK/DRN07sQ1LQKScxyZJuNnedQ5/wKSR38lUII= +github.com/rogpeppe/go-internal v1.13.1/go.mod h1:uMEvuHeurkdAXX61udpOXGD/AzZDWNMNyH2VO9fmH0o= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= github.com/stretchr/testify v1.10.0 h1:Xv5erBjTwe/5IxqUQTdXv5kgmIvbHo3QQyRwhJsOfJA= @@ -90,8 +97,9 @@ golang.org/x/sys v0.27.0 h1:wBqf8DvsY9Y/2P8gAfPDEYNuS30J4lPHJxXSb/nJZ+s= golang.org/x/sys v0.27.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= -gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127 h1:qIbj1fsPNlZgppZ+VLlY7N33q108Sa+fhmuc+sWQYwY= gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk= +gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q= gopkg.in/yaml.v1 v1.0.0-20140924161607-9f9df34309c0/go.mod h1:WDnlLJ4WF5VGsH/HVa3CI79GS0ol3YnhVnKP89i0kNg= gopkg.in/yaml.v2 v2.2.1/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=