diff --git a/bulk_indexer.go b/bulk_indexer.go
index c7444b0..22d9902 100644
--- a/bulk_indexer.go
+++ b/bulk_indexer.go
@@ -18,9 +18,12 @@ package docappender
 
 import (
+	"bufio"
 	"bytes"
 	"context"
+	"encoding/binary"
 	"encoding/json"
+	"errors"
 	"fmt"
 	"io"
 	"net/http"
@@ -254,6 +257,21 @@ func (b *BulkIndexer) resetBuf() {
 	}
 }
 
+// Size returns the size of the buffer used by the bulk indexer as per
+// the specified sizer type.
+//
+// EXPERIMENTAL: This is an experimental API and can be removed or
+// modified with breaking changes.
+func (b *BulkIndexer) Size(sizerType SizerType) int {
+	switch sizerType {
+	case ItemsCountSizer:
+		return b.Items()
+	case BytesSizer:
+		return b.Len()
+	}
+	return b.Items()
+}
+
 // Items returns the number of buffered items.
 func (b *BulkIndexer) Items() int {
 	return b.itemsAdded
@@ -381,6 +399,125 @@ func (b *BulkIndexer) writeMeta(
 	b.jsonw.Reset()
 }
 
+// Merge merges another bulk indexer into the current one.
+// The merged bulk indexer should not be used after the method returns.
+//
+// EXPERIMENTAL: This is an experimental API and can be removed or
+// modified with breaking changes.
+func (b *BulkIndexer) Merge(other *BulkIndexer) error {
+	if other == nil {
+		return nil
+	}
+	if b.config.CompressionLevel != other.config.CompressionLevel {
+		return errors.New("failed to merge bulk indexers, only same compression level merge is supported")
+	}
+
+	switch b.config.CompressionLevel {
+	case gzip.NoCompression:
+		if _, err := other.buf.WriteTo(b.writer); err != nil {
+			return fmt.Errorf("failed to merge uncompressed bulk indexers: %w", err)
+		}
+	default:
+		// All compression levels
+		if other.gzipw != nil {
+			if err := other.gzipw.Close(); err != nil {
+				return fmt.Errorf("failed to merge compressed bulk indexers: %w", err)
+			}
+		}
+		othergzip, err := gzip.NewReader(bytes.NewReader(other.buf.Bytes()))
+		if err != nil {
+			return fmt.Errorf("failed to merge compressed bulk indexers: %w", err)
+		}
+		defer othergzip.Close()
+		if _, err := othergzip.WriteTo(b.writer); err != nil {
+			return fmt.Errorf("failed to merge compressed bulk indexers: %w", err)
+		}
+	}
+	b.itemsAdded += other.itemsAdded
+	return nil
+}
+
+// Split splits the data in the current bulk indexer into multiple
+// bulk indexers based on the max size and the sizer type specified.
+// Do not use the original bulk indexer after the method returns.
+//
+// EXPERIMENTAL: This is an experimental API and can be removed or
+// modified with breaking changes.
+func (b *BulkIndexer) Split(maxSize int, sizerType SizerType) ([]*BulkIndexer, error) {
+	size := b.Size(sizerType)
+	if size == 0 || size <= maxSize {
+		return []*BulkIndexer{b}, nil
+	}
+
+	// A split of `b` is needed. If a gzip writer is in use, close it before splitting.
+	if b.gzipw != nil {
+		if err := b.gzipw.Close(); err != nil {
+			return nil, fmt.Errorf("failed to split bulk request, failed to close gzip writer: %w", err)
+		}
+	}
+
+	var (
+		result []*BulkIndexer
+		currBi *BulkIndexer
+	)
+
+	// The logic below calculates the size of the new data being added without
+	// considering compression. Since the max size will generally be far larger
+	// than a single item, the difference should be acceptable in practice.
+	var reader *bufio.Reader
+	if b.config.CompressionLevel != gzip.NoCompression {
+		gzipReader, err := gzip.NewReader(&b.buf)
+		if err != nil {
+			return nil, fmt.Errorf("failed to split bulk requests, failed to read compressed data: %w", err)
+		}
+		defer gzipReader.Close()
+		reader = bufio.NewReader(gzipReader)
+	} else {
+		reader = bufio.NewReader(&b.buf)
+	}
+	var tmpBuffer bytes.Buffer
+	for {
+		meta, err := reader.ReadSlice('\n')
+		if err != nil {
+			if err == io.EOF {
+				// EOF reached, metadata should not cause EOF so we can safely discard any read data here
+				break
+			}
+			return nil, fmt.Errorf("failed to split bulk requests, failed to read metadata: %w, %v", err, meta)
+		}
+		if _, err := tmpBuffer.Write(meta); err != nil {
+			return nil, fmt.Errorf("failed to split bulk requests, failed to write metadata: %w", err)
+		}
+
+		data, err := reader.ReadSlice('\n')
+		if err != nil && err != io.EOF {
+			return nil, fmt.Errorf("failed to split bulk requests, failed to read item: %w", err)
+		}
+		if _, err := tmpBuffer.Write(data); err != nil {
+			return nil, fmt.Errorf("failed to split bulk requests, failed to write item: %w", err)
+		}
+
+		newDataSize := getSizeForByteBuffer(tmpBuffer, sizerType)
+		if newDataSize > maxSize {
+			return nil, errors.New("failed to split bulk request buffer, smallest bulk is greater than configured max size")
+		}
+
+		// compression is not considered for calculating the size of the data
+		// to be added to the new bulk indexer
+		if currBi == nil || currBi.Size(sizerType)+newDataSize > maxSize {
+			currBi = newBulkIndexer(b.config)
+			result = append(result, currBi)
+		}
+
+		if _, err := io.Copy(currBi.writer, &tmpBuffer); err != nil {
+			return nil, fmt.Errorf("failed to split bulk requests: %w", err)
+		}
+		currBi.itemsAdded++
+		tmpBuffer.Reset()
+	}
+	return result, nil
+}
+
 func (b *BulkIndexer) newBulkIndexRequest(ctx context.Context) (*http.Request, error) {
 	// We should not pass the original b.buf bytes.Buffer down to the client/http layer because
 	// the indexer will reuse the buffer. The underlying http client/transport implementation may keep
@@ -692,6 +829,50 @@ func (b *BulkIndexer) Flush(ctx context.Context) (BulkIndexerResponseStat, error
 	return resp, nil
 }
 
+func (b *BulkIndexer) AppendBinary(data []byte) ([]byte, error) {
+	if b.itemsAdded == 0 {
+		return data, nil
+	}
+
+	if b.gzipw != nil {
+		if err := b.gzipw.Close(); err != nil {
+			return nil, fmt.Errorf("failed closing the gzip writer: %w", err)
+		}
+	}
+
+	data = binary.AppendVarint(data, int64(b.itemsAdded))
+	data = binary.AppendVarint(data, int64(b.bytesFlushed))
+	data = binary.AppendVarint(data, int64(b.bytesUncompFlushed))
+	data = binary.AppendVarint(data, int64(b.buf.Len()))
+	data = append(data, b.buf.Bytes()...)
+	return data, nil
+}
+
+func (b *BulkIndexer) UnmarshalBinary(data []byte) (int, error) {
+	var read int
+
+	itemsAdded, n := binary.Varint(data)
+	b.itemsAdded = int(itemsAdded)
+	data = data[n:]
+	read += n
+
+	bytesFlushed, n := binary.Varint(data)
+	b.bytesFlushed = int(bytesFlushed)
+	data = data[n:]
+	read += n
+
+	bytesUncompFlushed, n := binary.Varint(data)
+	b.bytesUncompFlushed = int(bytesUncompFlushed)
+	data = data[n:]
+	read += n
+
+	bufLen, n := binary.Varint(data)
+	endIdx := n + int(bufLen)
+	b.buf = *bytes.NewBuffer(data[n:endIdx])
+
+	return read + endIdx, nil
+}
+
 func (b *BulkIndexer) shouldRetryOnStatus(docStatus int) bool {
 	for _, status := range b.config.RetryOnDocumentStatus {
 		if docStatus == status {
@@ -736,3 +917,18 @@ func (e ErrorFlushFailed) ResponseBody() string {
 func (e ErrorFlushFailed) Error() string {
 	return fmt.Sprintf("flush failed (%d): %s", e.statusCode, e.resp)
 }
+
+type SizerType int
+
+const (
+	ItemsCountSizer SizerType = iota
+	BytesSizer
+)
+
+func getSizeForByteBuffer(b bytes.Buffer, sizerType SizerType) int {
+	if sizerType == ItemsCountSizer {
+		return 1
+	}
+	// Compression is not considered
+	return b.Len()
+}
diff --git a/bulk_indexer_test.go b/bulk_indexer_test.go
index e902bbf..d5a422e 100644
--- a/bulk_indexer_test.go
+++ b/bulk_indexer_test.go
@@ -175,6 +175,270 @@ func TestBulkIndexer(t *testing.T) {
 	}
 }
 
+func TestBulkIndexer_Codec(t *testing.T) {
+	for _, tc := range []struct {
+		name string
+
+		compressionLevel int
+		itemsCount       int
+	}{
+		{
+			name:             "no_items_uncompressed",
+			compressionLevel: gzip.NoCompression,
+			itemsCount:       0,
+		},
+		{
+			name:             "no_items_compressed",
+			compressionLevel: gzip.BestCompression,
+			itemsCount:       0,
+		},
+		{
+			name:             "uncompressed",
+			compressionLevel: gzip.NoCompression,
+			itemsCount:       101,
+		},
+		{
+			name:             "compressed",
+			compressionLevel: gzip.BestCompression,
+			itemsCount:       1001,
+		},
+	} {
+		t.Run(tc.name, func(t *testing.T) {
+			client := docappendertest.NewMockElasticsearchClient(t, func(w http.ResponseWriter, r *http.Request) {
+				_, result, _ := docappendertest.DecodeBulkRequestWithStats(r)
+				json.NewEncoder(w).Encode(result)
+			})
+			idx1, err := docappender.NewBulkIndexer(docappender.BulkIndexerConfig{
+				Client:             client,
+				MaxDocumentRetries: 0, // disable retry
+				CompressionLevel:   tc.compressionLevel,
+			})
+			require.NoError(t, err)
+			for i := 0; i < tc.itemsCount; i++ {
+				require.NoError(t, idx1.Add(docappender.BulkIndexerItem{
+					Index: "testidx",
+					Body: newJSONReader(map[string]any{
+						"@timestamp": time.Now().Format(docappendertest.TimestampFormat),
+					}),
+				}))
+			}
+			b, err := idx1.AppendBinary(nil)
+			require.NoError(t, err)
+			expectedSize := idx1.Size(docappender.BytesSizer)
+
+			idx2, err := docappender.NewBulkIndexer(docappender.BulkIndexerConfig{
+				Client:             client,
+				MaxDocumentRetries: 0, // disable retry
+				CompressionLevel:   tc.compressionLevel,
+			})
+			require.NoError(t, err)
+			_, err = idx2.UnmarshalBinary(b)
+			require.NoError(t, err)
+			assert.Equal(t, tc.itemsCount, idx2.Items())
+			assert.Equal(t, expectedSize, idx2.Size(docappender.BytesSizer))
+
+			// Add another item to the new indexer
+			require.NoError(t, idx2.Add(docappender.BulkIndexerItem{
+				Index: "testidx",
+				Body: newJSONReader(map[string]any{
+					"@timestamp": time.Now().Format(docappendertest.TimestampFormat),
+				}),
+			}))
+			stat, err := idx2.Flush(context.Background())
+			require.NoError(t, err)
+			assert.Equal(t, int64(tc.itemsCount)+1, stat.Indexed)
+		})
+	}
+}
+
+func TestBulkIndexer_Merge(t *testing.T) {
+	for _, tc := range []struct {
+		name string
+
+		sourceCompressionLevel int
+		sourceIndexerItems     int
+
+		targetCompressionLevel int
+		targetIndexerItems     int
+
+		expectedErr string
+	}{
+		{
+			name:                   "src_target_empty",
+			sourceCompressionLevel: gzip.NoCompression,
+			sourceIndexerItems:     0,
+			targetCompressionLevel: gzip.NoCompression,
+			targetIndexerItems:     0,
+		},
+		{
+			name:                   "src_empty",
+			sourceCompressionLevel: gzip.NoCompression,
+			sourceIndexerItems:     0,
+			targetCompressionLevel: gzip.NoCompression,
+			targetIndexerItems:     100,
+		},
+		{
+			name:                   "target_empty",
+			sourceCompressionLevel: gzip.NoCompression,
+			sourceIndexerItems:     100,
+			targetCompressionLevel: gzip.NoCompression,
+			targetIndexerItems:     0,
+		},
+		{
+			name:                   "merge_with_both_data",
+			sourceCompressionLevel: gzip.NoCompression,
+			sourceIndexerItems:     100,
+			targetCompressionLevel: gzip.NoCompression,
+			targetIndexerItems:     20,
+		},
+		{
+			name:                   "compressed",
+			sourceCompressionLevel: gzip.BestCompression,
+			sourceIndexerItems:     100,
+			targetCompressionLevel: gzip.BestCompression,
+			targetIndexerItems:     20,
+		},
+		{
+			name:                   "different_compression",
+			sourceCompressionLevel: gzip.BestCompression,
+			sourceIndexerItems:     100,
+			targetCompressionLevel: gzip.BestSpeed,
+			targetIndexerItems:     20,
+			expectedErr:            "only same compression level merge is supported",
+		},
+	} {
+		t.Run(tc.name, func(t *testing.T) {
+			client := docappendertest.NewMockElasticsearchClient(t, func(w http.ResponseWriter, r *http.Request) {
+				_, result, _ := docappendertest.DecodeBulkRequestWithStats(r)
+				json.NewEncoder(w).Encode(result)
+			})
+			sourceIndexer, err := docappender.NewBulkIndexer(docappender.BulkIndexerConfig{
+				Client:             client,
+				MaxDocumentRetries: 0, // disable retry
+				CompressionLevel:   tc.sourceCompressionLevel,
+			})
+			require.NoError(t, err)
+			targetIndexer, err := docappender.NewBulkIndexer(docappender.BulkIndexerConfig{
+				Client:             client,
+				MaxDocumentRetries: 0, // disable retry
+				CompressionLevel:   tc.targetCompressionLevel,
+			})
+			require.NoError(t, err)
+			generateLoad := func(items int, indexer *docappender.BulkIndexer) {
+				for indexer.Items() != items {
+					require.NoError(t, indexer.Add(docappender.BulkIndexerItem{
+						Index: "testidx",
+						Body: newJSONReader(map[string]any{
+							"@timestamp": time.Now().Format(docappendertest.TimestampFormat),
+						}),
+					}))
+				}
+			}
+
+			generateLoad(tc.sourceIndexerItems, sourceIndexer)
+			generateLoad(tc.targetIndexerItems, targetIndexer)
+
+			totalItems := tc.sourceIndexerItems + tc.targetIndexerItems
+			err = targetIndexer.Merge(sourceIndexer)
+			if tc.expectedErr == "" {
+				require.NoError(t, err)
+				assert.Equal(t, totalItems, targetIndexer.Items())
+				stat, err := targetIndexer.Flush(context.Background())
+				require.NoError(t, err)
+				assert.Equal(t, totalItems, int(stat.Indexed))
+			} else {
+				require.ErrorContains(t, err, tc.expectedErr)
+			}
+		})
+	}
+}
+
+func TestBulkIndexer_Split(t *testing.T) {
+	for _, tc := range []struct {
+		name string
+
+		sourceIndexerMinSize   int // used to populate test data
+		sourceCompressionLevel int
+
+		maxSize   int
+		sizerType docappender.SizerType
+
+		expectedErr string
+	}{
+		{
+			name:                   "empty",
+			maxSize:                10,
+			sizerType:              docappender.ItemsCountSizer,
+			sourceIndexerMinSize:   0,
+			sourceCompressionLevel: gzip.BestCompression,
+		},
+		{
+			name:                   "uncompressed_items_split",
+			maxSize:                100,
+			sizerType:              docappender.ItemsCountSizer,
+			sourceIndexerMinSize:   205,
+			sourceCompressionLevel: gzip.NoCompression,
+		},
+		{
+			name:                   "uncompressed_bytes_split",
+			maxSize:                10_000,
+			sizerType:              docappender.BytesSizer,
+			sourceIndexerMinSize:   1_000_005,
+			sourceCompressionLevel: gzip.NoCompression,
+		},
+		{
+			name:                   "compressed_items_split",
+			maxSize:                100,
+			sizerType:              docappender.ItemsCountSizer,
+			sourceIndexerMinSize:   205,
+			sourceCompressionLevel: gzip.BestCompression,
+		},
+		{
+			name:                   "compressed_bytes_split",
+			maxSize:                10_000,
+			sizerType:              docappender.BytesSizer,
+			sourceIndexerMinSize:   1_000_005,
+			sourceCompressionLevel: gzip.BestCompression,
+		},
+	} {
+		t.Run(tc.name, func(t *testing.T) {
+			client := docappendertest.NewMockElasticsearchClient(t, func(w http.ResponseWriter, r *http.Request) {
+				_, result, _ := docappendertest.DecodeBulkRequestWithStats(r)
+				json.NewEncoder(w).Encode(result)
+			})
+			indexer, err := docappender.NewBulkIndexer(docappender.BulkIndexerConfig{
+				Client:             client,
+				MaxDocumentRetries: 0, // disable retry
+				CompressionLevel:   tc.sourceCompressionLevel,
+			})
+			require.NoError(t, err)
+
+			// Populate the indexer to the required size
+			for indexer.Size(tc.sizerType) < tc.sourceIndexerMinSize {
+				require.NoError(t, indexer.Add(docappender.BulkIndexerItem{
+					Index: "testidx",
+					Body: newJSONReader(map[string]any{
+						"@timestamp": time.Now().Format(docappendertest.TimestampFormat),
+					}),
+				}))
+			}
+			totalItems := indexer.Items()
+
+			splitBulkIndexers, err := indexer.Split(tc.maxSize, tc.sizerType)
+			require.NoError(t, err)
+
+			var indexedItems int
+			for _, bi := range splitBulkIndexers {
+				stat, err := bi.Flush(context.Background())
+				require.NoError(t, err)
+				assert.LessOrEqual(t, bi.Size(tc.sizerType), tc.maxSize)
+				indexedItems += int(stat.Indexed)
+			}
+			assert.Equal(t, totalItems, indexedItems)
+		})
+	}
+}
+
 func TestDynamicTemplates(t *testing.T) {
 	client := docappendertest.NewMockElasticsearchClient(t, func(w http.ResponseWriter, r *http.Request) {
 		_, result, _, dynamicTemplates := docappendertest.DecodeBulkRequestWithStatsAndDynamicTemplates(r)
@@ -819,3 +1083,108 @@ func TestPopulateFailedDocsInput(t *testing.T) {
 		})
 	}
 }
+
+func BenchmarkAppendBinary(b *testing.B) {
+	itemsCount := 100000
+	for _, bc := range []struct {
+		name        string
+		compression int
+	}{
+		{
+			name:        "uncompressed",
+			compression: gzip.NoCompression,
+		},
+		{
+			name:        "compressed",
+			compression: gzip.BestCompression,
+		},
+	} {
+		b.Run(bc.name, func(b *testing.B) {
+			client := docappendertest.NewMockElasticsearchClient(
+				b, func(w http.ResponseWriter, r *http.Request) {
+					w.WriteHeader(http.StatusOK)
+				},
+			)
+			indexer, err := docappender.NewBulkIndexer(docappender.BulkIndexerConfig{
+				Client:             client,
+				MaxDocumentRetries: 0,
+				CompressionLevel:   bc.compression,
+			})
+			require.NoError(b, err)
+
+			for i := 0; i < itemsCount; i++ {
+				require.NoError(b, indexer.Add(docappender.BulkIndexerItem{
+					Index: "testidx",
+					Body: newJSONReader(map[string]any{
+						"@timestamp": time.Now().Format(docappendertest.TimestampFormat),
+					}),
+				}))
+			}
+			b.ResetTimer()
+			b.ReportAllocs()
+			for i := 0; i < b.N; i++ {
+				_, err = indexer.AppendBinary(nil)
+				if err != nil {
+					// Error check to avoid benchmarking `require` overhead
+					require.NoError(b, err)
+				}
+			}
+		})
+	}
+}
+
+func BenchmarkMerge(b *testing.B) {
+	client := docappendertest.NewMockElasticsearchClient(
+		b, func(w http.ResponseWriter, r *http.Request) {
+			w.WriteHeader(http.StatusOK)
+		},
+	)
+	newIndexerCfg := func(compression int) docappender.BulkIndexerConfig {
+		return docappender.BulkIndexerConfig{
+			Client:             client,
+			MaxDocumentRetries: 0,
+			CompressionLevel:   compression,
+		}
+	}
+	for _, bc := range []struct {
+		name        string
+		compression int
+	}{
+		{
+			name:        "uncompressed",
"uncompressed", + compression: gzip.NoCompression, + }, + { + name: "compressed", + compression: gzip.BestCompression, + }, + } { + b.Run(bc.name, func(b *testing.B) { + // prepare bulk indexers for benchmarking merge + bulkIndexers := make([]*docappender.BulkIndexer, 0, b.N) + for i := 0; i < b.N; i++ { + indexer, err := docappender.NewBulkIndexer(newIndexerCfg(bc.compression)) + require.NoError(b, err) + + for j := 0; j < 100; j++ { + require.NoError(b, indexer.Add(docappender.BulkIndexerItem{ + Index: "testidx", + Body: newJSONReader(map[string]any{ + "@timestamp": time.Now().Format(docappendertest.TimestampFormat), + }), + })) + } + bulkIndexers = append(bulkIndexers, indexer) + } + // create final indexer that will merge data from all other indexers + final, err := docappender.NewBulkIndexer(newIndexerCfg(bc.compression)) + require.NoError(b, err) + + b.ResetTimer() + b.ReportAllocs() + for i := 0; i < b.N; i++ { + assert.NoError(b, final.Merge(bulkIndexers[i])) + } + }) + } +} diff --git a/docappendertest/docappendertest.go b/docappendertest/docappendertest.go index a06bf5a..d892c5a 100644 --- a/docappendertest/docappendertest.go +++ b/docappendertest/docappendertest.go @@ -96,7 +96,8 @@ func DecodeBulkRequest(r *http.Request) ([][]byte, BulkIndexerResponse) { func DecodeBulkRequestWithStats(r *http.Request) ( docs [][]byte, res BulkIndexerResponse, - stats RequestStats) { + stats RequestStats, +) { indexed, result, stats, _ := DecodeBulkRequestWithStatsAndDynamicTemplates(r) return indexed, result, stats } @@ -119,8 +120,8 @@ func DecodeBulkRequestWithStatsAndDynamicTemplates(r *http.Request) ( docs [][]byte, res BulkIndexerResponse, stats RequestStats, - dynamicTemplates []map[string]string) { - + dynamicTemplates []map[string]string, +) { indexed, result, stats, dynamicTemplates, _ := DecodeBulkRequestWithStatsAndDynamicTemplatesAndPipelines(r) return indexed, result, stats, dynamicTemplates }