Skip to content

Commit 631b10d

Browse files
Improve memory consumption for length calculation (#564) (#565)
Add tests Update tests that relies on a specific set of request Co-authored-by: Laurent Saint-Félix <[email protected]>
1 parent ef641c1 commit 631b10d

File tree

2 files changed

+46
-12
lines changed

2 files changed

+46
-12
lines changed

esutil/bulk_indexer.go

Lines changed: 14 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ import (
2121
"bytes"
2222
"context"
2323
"encoding/json"
24+
"errors"
2425
"fmt"
2526
"io"
2627
"io/ioutil"
@@ -179,22 +180,25 @@ func (item *BulkIndexerItem) marshallMeta() {
179180
// computeLength calculate the size of the body and the metadata.
180181
func (item *BulkIndexerItem) computeLength() error {
181182
if item.Body != nil {
182-
var buf bytes.Buffer
183-
_, err := io.Copy(&buf, item.Body)
184-
if err != nil {
185-
return err
183+
// TODO propagate buf len to config to allow for performance gains.
184+
var buf = make([]byte, 1<<4)
185+
for {
186+
n, err := item.Body.Read(buf)
187+
if errors.Is(err, io.EOF) {
188+
break
189+
} else if err != nil {
190+
return err
191+
}
192+
item.payloadLength += n
186193
}
187-
188-
_, err = item.Body.Seek(0, io.SeekStart)
194+
_, err := item.Body.Seek(0, io.SeekStart)
189195
if err != nil {
190196
return err
191197
}
192-
item.payloadLength = buf.Len()
193-
return nil
194198
}
195199
item.payloadLength += len(item.meta.Bytes())
196-
// Add two bytes to account for newlines.
197-
item.payloadLength += 2
200+
// Add one byte to account for newline at the end of payload.
201+
item.payloadLength++
198202

199203
return nil
200204
}

esutil/bulk_indexer_internal_test.go

Lines changed: 32 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -99,7 +99,7 @@ func TestBulkIndexer(t *testing.T) {
9999

100100
cfg := BulkIndexerConfig{
101101
NumWorkers: 1,
102-
FlushBytes: 38 * 2, // 38 bytes header + body, times 2 to match 2 responses per file in testdata
102+
FlushBytes: 39 * 2, // 38 bytes header + body, times 2 to match 2 responses per file in testdata
103103
FlushInterval: time.Hour, // Disable auto-flushing, because response doesn't match number of items
104104
Client: es}
105105
if os.Getenv("DEBUG") != "" {
@@ -576,7 +576,7 @@ func TestBulkIndexer(t *testing.T) {
576576
}
577577
es, _ := elasticsearch.NewClient(esCfg)
578578

579-
biCfg := BulkIndexerConfig{NumWorkers: 1, FlushBytes: 50, Client: es}
579+
biCfg := BulkIndexerConfig{NumWorkers: 1, FlushBytes: 28*2, Client: es}
580580
if os.Getenv("DEBUG") != "" {
581581
biCfg.DebugLogger = log.New(os.Stdout, "", 0)
582582
}
@@ -955,6 +955,36 @@ func TestBulkIndexer(t *testing.T) {
955955
})
956956
}
957957

958+
func TestBulkIndexerItem(t *testing.T) {
959+
body := `{"body":"Lorem ipsum dolor sit amet, consectetur adipisicing elit, sed do eiusmod tempor incididunt ut labore et dolore magna aliqua. Ut enim ad minim veniam, quis nostrud exercitation ullamco laboris nisi ut aliquip ex ea commodo consequat."}`
960+
t.Run("correct computeLength size", func(t *testing.T) {
961+
expectedLength := 266
962+
bi := BulkIndexerItem{
963+
Action: "index",
964+
DocumentID: strconv.Itoa(1),
965+
Body: strings.NewReader(body),
966+
}
967+
bi.marshallMeta()
968+
bi.computeLength()
969+
if bi.payloadLength != expectedLength {
970+
t.Fatalf("invalid length, expected %d, got %d", expectedLength, bi.payloadLength)
971+
}
972+
})
973+
t.Run("empty reader length should be meta length plus newlines", func(t *testing.T) {
974+
expectedLength := 23
975+
bi := BulkIndexerItem{
976+
Action: "index",
977+
DocumentID: strconv.Itoa(1),
978+
Body: strings.NewReader(""),
979+
}
980+
bi.marshallMeta()
981+
bi.computeLength()
982+
if bi.payloadLength != expectedLength {
983+
t.Fatalf("invalid length, expected %d, got %d", expectedLength, bi.payloadLength)
984+
}
985+
})
986+
}
987+
958988
type customJSONDecoder struct{}
959989

960990
func (d customJSONDecoder) UnmarshalFromReader(r io.Reader, blk *BulkIndexerResponse) error {

0 commit comments

Comments
 (0)