Skip to content

Commit 4f1507d

Browse files
Reduce CPU usage (#575) (#580)
* Compute length using Seek() * Reduce heap allocations in marshallMeta * Remove outdated comment Co-authored-by: Laurent Saint-Félix <[email protected]> Co-authored-by: Tim Rühsen <[email protected]>
1 parent 03041fc commit 4f1507d

File tree

1 file changed

+21
-31
lines changed

1 file changed

+21
-31
lines changed

esutil/bulk_indexer.go

Lines changed: 21 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@ import (
2121
"bytes"
2222
"context"
2323
"encoding/json"
24-
"errors"
2524
"fmt"
2625
"io"
2726
"io/ioutil"
@@ -117,18 +116,19 @@ type BulkIndexerItem struct {
117116

118117
// marshallMeta format as JSON the item metadata.
119118
func (item *BulkIndexerItem) marshallMeta() {
120-
var aux []byte
119+
// Pre-allocate a buffer large enough for most use cases.
120+
// 'aux = aux[:0]' resets the length without changing the capacity.
121+
aux := make([]byte, 0, 256)
122+
121123
item.meta.WriteRune('{')
122-
aux = strconv.AppendQuote(aux, item.Action)
123-
item.meta.Write(aux)
124-
aux = nil
124+
item.meta.Write(strconv.AppendQuote(aux, item.Action))
125+
aux = aux[:0]
125126
item.meta.WriteRune(':')
126127
item.meta.WriteRune('{')
127128
if item.DocumentID != "" {
128129
item.meta.WriteString(`"_id":`)
129-
aux = strconv.AppendQuote(aux, item.DocumentID)
130-
item.meta.Write(aux)
131-
aux = nil
130+
item.meta.Write(strconv.AppendQuote(aux, item.DocumentID))
131+
aux = aux[:0]
132132
}
133133

134134
if item.DocumentID != "" && item.Version != nil {
@@ -140,37 +140,33 @@ func (item *BulkIndexerItem) marshallMeta() {
140140
if item.DocumentID != "" && item.VersionType != "" {
141141
item.meta.WriteRune(',')
142142
item.meta.WriteString(`"version_type":`)
143-
aux = strconv.AppendQuote(aux, item.VersionType)
144-
item.meta.Write(aux)
145-
aux = nil
143+
item.meta.Write(strconv.AppendQuote(aux, item.VersionType))
144+
aux = aux[:0]
146145
}
147146

148147
if item.Routing != "" {
149148
if item.DocumentID != "" {
150149
item.meta.WriteRune(',')
151150
}
152151
item.meta.WriteString(`"routing":`)
153-
aux = strconv.AppendQuote(aux, item.Routing)
154-
item.meta.Write(aux)
155-
aux = nil
152+
item.meta.Write(strconv.AppendQuote(aux, item.Routing))
153+
aux = aux[:0]
156154
}
157155
if item.Index != "" {
158156
if item.DocumentID != "" || item.Routing != "" {
159157
item.meta.WriteRune(',')
160158
}
161159
item.meta.WriteString(`"_index":`)
162-
aux = strconv.AppendQuote(aux, item.Index)
163-
item.meta.Write(aux)
164-
aux = nil
160+
item.meta.Write(strconv.AppendQuote(aux, item.Index))
161+
aux = aux[:0]
165162
}
166163
if item.RetryOnConflict != nil && item.Action == "update" {
167164
if item.DocumentID != "" || item.Routing != "" || item.Index != "" {
168165
item.meta.WriteString(",")
169166
}
170167
item.meta.WriteString(`"retry_on_conflict":`)
171-
aux = strconv.AppendInt(aux, int64(*item.RetryOnConflict), 10)
172-
item.meta.Write(aux)
173-
aux = nil
168+
item.meta.Write(strconv.AppendInt(aux, int64(*item.RetryOnConflict), 10))
169+
aux = aux[:0]
174170
}
175171
item.meta.WriteRune('}')
176172
item.meta.WriteRune('}')
@@ -180,18 +176,12 @@ func (item *BulkIndexerItem) marshallMeta() {
180176
// computeLength calculate the size of the body and the metadata.
181177
func (item *BulkIndexerItem) computeLength() error {
182178
if item.Body != nil {
183-
// TODO propagate buf len to config to allow for performance gains.
184-
var buf = make([]byte, 1<<4)
185-
for {
186-
n, err := item.Body.Read(buf)
187-
if errors.Is(err, io.EOF) {
188-
break
189-
} else if err != nil {
190-
return err
191-
}
192-
item.payloadLength += n
179+
n, err := item.Body.Seek(0, io.SeekEnd)
180+
if err != nil {
181+
return err
193182
}
194-
_, err := item.Body.Seek(0, io.SeekStart)
183+
item.payloadLength += int(n)
184+
_, err = item.Body.Seek(0, io.SeekStart)
195185
if err != nil {
196186
return err
197187
}

0 commit comments

Comments
 (0)