196 changes: 196 additions & 0 deletions bulk_indexer.go
@@ -18,9 +18,12 @@
package docappender

import (
"bufio"
"bytes"
"context"
"encoding/binary"
"encoding/json"
"errors"
"fmt"
"io"
"net/http"
@@ -254,6 +257,21 @@ func (b *BulkIndexer) resetBuf() {
}
}

// Size returns the size of the buffer used by the bulk indexer, measured
// according to the specified sizer type: the number of buffered items for
// ItemsCountSizer, or the buffer length in bytes for BytesSizer.
//
// EXPERIMENTAL: This is an experimental API and can be removed or
// modified with breaking changes.
func (b *BulkIndexer) Size(sizerType SizerType) int {
switch sizerType {
case ItemsCountSizer:
return b.Items()
case BytesSizer:
return b.Len()
}
return b.Items()
}
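
A hypothetical caller-side sketch of how Size unifies the two accounting modes behind one method; overBudget and its maxDocs/maxBytes limits are illustrative names chosen here, not part of this API:

// overBudget reports whether a bulk indexer has exceeded either an item
// budget or a byte budget.
func overBudget(bi *BulkIndexer, maxDocs, maxBytes int) bool {
	return bi.Size(ItemsCountSizer) >= maxDocs ||
		bi.Size(BytesSizer) >= maxBytes
}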

// Items returns the number of buffered items.
func (b *BulkIndexer) Items() int {
return b.itemsAdded
@@ -381,6 +399,125 @@ func (b *BulkIndexer) writeMeta(
b.jsonw.Reset()
}

// Merge merges another bulk indexer into the current one. Both indexers
// must use the same compression level, and the merged (other) bulk
// indexer must not be used after the method returns.
//
// EXPERIMENTAL: This is an experimental API and can be removed or
// modified with breaking changes.
func (b *BulkIndexer) Merge(other *BulkIndexer) error {
if other == nil {
return nil
}
// Check the compression levels before touching other's buffer; merging
// across different compression levels is not supported.
if b.config.CompressionLevel != other.config.CompressionLevel {
return errors.New("failed to merge bulk indexers, only same compression level merge is supported")
}

switch b.config.CompressionLevel {
case gzip.NoCompression:
if _, err := other.buf.WriteTo(b.writer); err != nil {
return fmt.Errorf("failed to merge uncompressed bulk indexers: %w", err)
}
default:
// All compression levels
if other.gzipw != nil {
if err := other.gzipw.Close(); err != nil {
return fmt.Errorf("failed to merge compressed bulk indexers: %w", err)
}
}
othergzip, err := gzip.NewReader(bytes.NewReader(other.buf.Bytes()))
if err != nil {
return fmt.Errorf("failed to merge compressed bulk indexers: %w", err)
}
defer othergzip.Close()
// *gzip.Reader does not implement io.WriterTo, so copy explicitly.
if _, err := io.Copy(b.writer, othergzip); err != nil {
return fmt.Errorf("failed to merge compressed bulk indexers: %w", err)
}
}
b.itemsAdded += other.itemsAdded
return nil
}
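
A minimal sketch of the intended call pattern for Merge, assuming every indexer in the slice was created with the same BulkIndexerConfig (and therefore the same compression level); mergeAll is an illustrative helper, not part of this diff:

// mergeAll drains a set of same-config bulk indexers into the first one
// and returns it.
func mergeAll(indexers []*BulkIndexer) (*BulkIndexer, error) {
	if len(indexers) == 0 {
		return nil, errors.New("no bulk indexers to merge")
	}
	dst := indexers[0]
	for _, src := range indexers[1:] {
		if err := dst.Merge(src); err != nil {
			return nil, err
		}
		// src must not be reused after a successful Merge.
	}
	return dst, nil
}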

// Split splits the data in the current bulk indexer into multiple
// bulk indexers based on the max size and the sizer type specified.
// Do not use the original bulk indexer after the method returns.
//
// EXPERIMENTAL: This is an experimental API and can be removed or
// modified with breaking changes.
func (b *BulkIndexer) Split(maxSize int, sizerType SizerType) ([]*BulkIndexer, error) {
size := b.Size(sizerType)
if size == 0 || size <= maxSize {
return []*BulkIndexer{b}, nil
}

// A split is needed. If a gzip writer is in use, close it first so the
// buffer holds a complete gzip stream before splitting.
if b.gzipw != nil {
if err := b.gzipw.Close(); err != nil {
return nil, fmt.Errorf("failed to split bulk request, failed to close gzip writer: %w", err)
}
}

var (
result []*BulkIndexer
currBi *BulkIndexer
)

// The logic below calculates the size of each new item without accounting
// for compression. Since the configured max size will generally be much
// larger than the size of a single item, the difference should be
// acceptable in practice.
var reader *bufio.Reader
if b.config.CompressionLevel != gzip.NoCompression {
gzipReader, err := gzip.NewReader(&b.buf)
if err != nil {
return nil, fmt.Errorf("failed to split bulk requests, failed to read compressed data: %w", err)
}
defer gzipReader.Close()
reader = bufio.NewReader(gzipReader)
} else {
reader = bufio.NewReader(&b.buf)
}
var tmpBuffer bytes.Buffer
for {
meta, err := reader.ReadSlice('\n')
if err != nil {
if err == io.EOF {
// A clean EOF is only expected at an item boundary: metadata is
// always followed by a data line, so reaching EOF here means the
// buffer is exhausted and any partially read bytes can be discarded.
break
}
return nil, fmt.Errorf("failed to split bulk requests, failed to read metadata: %w, %s", err, meta)
}
if _, err := tmpBuffer.Write(meta); err != nil {
return nil, fmt.Errorf("failed to split bulk requests, failed to write metadata: %w", err)
}

data, err := reader.ReadSlice('\n')
if err != nil && err != io.EOF {
return nil, fmt.Errorf("failed to split bulk requests, failed to read item: %w", err)
}
if _, err := tmpBuffer.Write(data); err != nil {
return nil, fmt.Errorf("failed to split bulk requests, failed to write item: %w", err)
}

newDataSize := getSizeForByteBuffer(&tmpBuffer, sizerType)
if newDataSize > maxSize {
return nil, errors.New("failed to split bulk request buffer, smallest bulk is greater than configured max size")
}

// compression is not considered for calculating the size of the data
// to be added to the new bulk indexer
if currBi == nil || currBi.Size(sizerType)+newDataSize > maxSize {
currBi = newBulkIndexer(b.config)
result = append(result, currBi)
}

if _, err := io.Copy(currBi.writer, &tmpBuffer); err != nil {
return nil, fmt.Errorf("failed to split bulk requests: %w", err)
}
currBi.itemsAdded++
tmpBuffer.Reset()
}
return result, nil
}
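
A caller-side sketch of the intended Split flow: break an oversized indexer into parts capped by item count and flush each part, abandoning the original as the doc comment requires. flushInParts and maxDocs are illustrative names:

// flushInParts splits bi into bulk indexers holding at most maxDocs items
// each and flushes them one by one. bi must not be used after this returns.
func flushInParts(ctx context.Context, bi *BulkIndexer, maxDocs int) error {
	parts, err := bi.Split(maxDocs, ItemsCountSizer)
	if err != nil {
		return err
	}
	for _, part := range parts {
		if _, err := part.Flush(ctx); err != nil {
			return err
		}
	}
	return nil
}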

func (b *BulkIndexer) newBulkIndexRequest(ctx context.Context) (*http.Request, error) {
// We should not pass the original b.buf bytes.Buffer down to the client/http layer because
// the indexer will reuse the buffer. The underlying http client/transport implementation may keep
@@ -692,6 +829,50 @@ func (b *BulkIndexer) Flush(ctx context.Context) (BulkIndexerResponseStat, error
return resp, nil
}

// AppendBinary appends a binary encoding of the bulk indexer's buffered
// state (item count, flushed-byte counters, and the buffer contents) to
// data and returns the extended slice. If no items have been added, data
// is returned unchanged.
func (b *BulkIndexer) AppendBinary(data []byte) ([]byte, error) {
if b.itemsAdded == 0 {
return data, nil
}

if b.gzipw != nil {
if err := b.gzipw.Close(); err != nil {
return nil, fmt.Errorf("failed closing the gzip writer: %w", err)
}
}

data = binary.AppendVarint(data, int64(b.itemsAdded))
data = binary.AppendVarint(data, int64(b.bytesFlushed))
data = binary.AppendVarint(data, int64(b.bytesUncompFlushed))
data = binary.AppendVarint(data, int64(b.buf.Len()))
data = append(data, b.buf.Bytes()...)
return data, nil
}

// UnmarshalBinary restores the bulk indexer's state from data as encoded
// by AppendBinary, returning the number of bytes consumed so that several
// encoded indexers can be decoded from one concatenated buffer. The
// restored buffer aliases data, so data must not be modified afterwards.
func (b *BulkIndexer) UnmarshalBinary(data []byte) (int, error) {
var read int

itemsAdded, n := binary.Varint(data)
b.itemsAdded = int(itemsAdded)
data = data[n:]
read += n

bytesFlushed, n := binary.Varint(data)
b.bytesFlushed = int(bytesFlushed)
data = data[n:]
read += n

bytesUncompFlushed, n := binary.Varint(data)
b.bytesUncompFlushed = int(bytesUncompFlushed)
data = data[n:]
read += n

bufLen, n := binary.Varint(data)
endIdx := n + int(bufLen)
b.buf = *bytes.NewBuffer(data[n:endIdx])

return read + endIdx, nil
}
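
A sketch of the round trip these two methods enable, for example to spill buffered state and restore it later; it assumes dst was created with the same BulkIndexerConfig as src, and transferState is an illustrative helper:

// transferState serializes src's buffered state and restores it into dst.
// Because UnmarshalBinary reports how many bytes it consumed, several
// encoded indexers could also be decoded sequentially from one blob.
func transferState(src, dst *BulkIndexer) error {
	blob, err := src.AppendBinary(nil)
	if err != nil {
		return err
	}
	if _, err := dst.UnmarshalBinary(blob); err != nil {
		return err
	}
	return nil
}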

func (b *BulkIndexer) shouldRetryOnStatus(docStatus int) bool {
for _, status := range b.config.RetryOnDocumentStatus {
if docStatus == status {
@@ -736,3 +917,18 @@ func (e ErrorFlushFailed) ResponseBody() string {
func (e ErrorFlushFailed) Error() string {
return fmt.Sprintf("flush failed (%d): %s", e.statusCode, e.resp)
}

// SizerType determines how the size of a bulk indexer's buffer is
// measured.
type SizerType int

const (
// ItemsCountSizer measures size as the number of buffered items.
ItemsCountSizer SizerType = iota
// BytesSizer measures size as the buffer length in bytes.
BytesSizer
)

func getSizeForByteBuffer(b *bytes.Buffer, sizerType SizerType) int {
if sizerType == ItemsCountSizer {
// Each buffer passed here holds exactly one item (metadata plus data).
return 1
}
// Compression is not considered; the buffer holds uncompressed bytes.
return b.Len()
}