Skip to content

Commit 76a1c93

Browse files
authored
Revert inadequate backports #559 #565 #580 (#613)
1 parent 4f1507d commit 76a1c93

File tree

3 files changed

+85
-193
lines changed

3 files changed

+85
-193
lines changed

esutil/bulk_indexer.go

Lines changed: 80 additions & 115 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@ import (
3737
)
3838

3939
// BulkIndexer represents a parallel, asynchronous, efficient indexer for Elasticsearch.
40+
//
4041
type BulkIndexer interface {
4142
// Add adds an item to the indexer. It returns an error when the item cannot be added.
4243
// Use the OnSuccess and OnFailure callbacks to get the operation result for the item.
@@ -55,6 +56,7 @@ type BulkIndexer interface {
5556
}
5657

5758
// BulkIndexerConfig represents configuration of the indexer.
59+
//
5860
type BulkIndexerConfig struct {
5961
NumWorkers int // The number of workers. Defaults to runtime.NumCPU().
6062
FlushBytes int // The flush threshold in bytes. Defaults to 5MB.
@@ -86,6 +88,7 @@ type BulkIndexerConfig struct {
8688
}
8789

8890
// BulkIndexerStats represents the indexer statistics.
91+
//
8992
type BulkIndexerStats struct {
9093
NumAdded uint64
9194
NumFlushed uint64
@@ -98,6 +101,7 @@ type BulkIndexerStats struct {
98101
}
99102

100103
// BulkIndexerItem represents an indexer item.
104+
//
101105
type BulkIndexerItem struct {
102106
Index string
103107
Action string
@@ -107,100 +111,21 @@ type BulkIndexerItem struct {
107111
VersionType string
108112
Body io.Reader
109113
RetryOnConflict *int
110-
meta bytes.Buffer // Item metadata header
111-
payloadLength int // Item payload total length metadata+newline+body length
112114

113115
OnSuccess func(context.Context, BulkIndexerItem, BulkIndexerResponseItem) // Per item
114116
OnFailure func(context.Context, BulkIndexerItem, BulkIndexerResponseItem, error) // Per item
115117
}
116118

117-
// marshallMeta format as JSON the item metadata.
118-
func (item *BulkIndexerItem) marshallMeta() {
119-
// Pre-allocate a buffer large enough for most use cases.
120-
// 'aux = aux[:0]' resets the length without changing the capacity.
121-
aux := make([]byte, 0, 256)
122-
123-
item.meta.WriteRune('{')
124-
item.meta.Write(strconv.AppendQuote(aux, item.Action))
125-
aux = aux[:0]
126-
item.meta.WriteRune(':')
127-
item.meta.WriteRune('{')
128-
if item.DocumentID != "" {
129-
item.meta.WriteString(`"_id":`)
130-
item.meta.Write(strconv.AppendQuote(aux, item.DocumentID))
131-
aux = aux[:0]
132-
}
133-
134-
if item.DocumentID != "" && item.Version != nil {
135-
item.meta.WriteRune(',')
136-
item.meta.WriteString(`"version":`)
137-
item.meta.WriteString(strconv.FormatInt(*item.Version, 10))
138-
}
139-
140-
if item.DocumentID != "" && item.VersionType != "" {
141-
item.meta.WriteRune(',')
142-
item.meta.WriteString(`"version_type":`)
143-
item.meta.Write(strconv.AppendQuote(aux, item.VersionType))
144-
aux = aux[:0]
145-
}
146-
147-
if item.Routing != "" {
148-
if item.DocumentID != "" {
149-
item.meta.WriteRune(',')
150-
}
151-
item.meta.WriteString(`"routing":`)
152-
item.meta.Write(strconv.AppendQuote(aux, item.Routing))
153-
aux = aux[:0]
154-
}
155-
if item.Index != "" {
156-
if item.DocumentID != "" || item.Routing != "" {
157-
item.meta.WriteRune(',')
158-
}
159-
item.meta.WriteString(`"_index":`)
160-
item.meta.Write(strconv.AppendQuote(aux, item.Index))
161-
aux = aux[:0]
162-
}
163-
if item.RetryOnConflict != nil && item.Action == "update" {
164-
if item.DocumentID != "" || item.Routing != "" || item.Index != "" {
165-
item.meta.WriteString(",")
166-
}
167-
item.meta.WriteString(`"retry_on_conflict":`)
168-
item.meta.Write(strconv.AppendInt(aux, int64(*item.RetryOnConflict), 10))
169-
aux = aux[:0]
170-
}
171-
item.meta.WriteRune('}')
172-
item.meta.WriteRune('}')
173-
item.meta.WriteRune('\n')
174-
}
175-
176-
// computeLength calculate the size of the body and the metadata.
177-
func (item *BulkIndexerItem) computeLength() error {
178-
if item.Body != nil {
179-
n, err := item.Body.Seek(0, io.SeekEnd)
180-
if err != nil {
181-
return err
182-
}
183-
item.payloadLength += int(n)
184-
_, err = item.Body.Seek(0, io.SeekStart)
185-
if err != nil {
186-
return err
187-
}
188-
}
189-
item.payloadLength += len(item.meta.Bytes())
190-
// Add one byte to account for newline at the end of payload.
191-
item.payloadLength++
192-
193-
return nil
194-
}
195-
196119
// BulkIndexerResponse represents the Elasticsearch response.
120+
//
197121
type BulkIndexerResponse struct {
198122
Took int `json:"took"`
199123
HasErrors bool `json:"errors"`
200124
Items []map[string]BulkIndexerResponseItem `json:"items,omitempty"`
201125
}
202126

203127
// BulkIndexerResponseItem represents the Elasticsearch response item.
128+
//
204129
type BulkIndexerResponseItem struct {
205130
Index string `json:"_index"`
206131
DocumentID string `json:"_id"`
@@ -227,11 +152,13 @@ type BulkIndexerResponseItem struct {
227152
}
228153

229154
// BulkResponseJSONDecoder defines the interface for custom JSON decoders.
155+
//
230156
type BulkResponseJSONDecoder interface {
231157
UnmarshalFromReader(io.Reader, *BulkIndexerResponse) error
232158
}
233159

234160
// BulkIndexerDebugLogger defines the interface for a debugging logger.
161+
//
235162
type BulkIndexerDebugLogger interface {
236163
Printf(string, ...interface{})
237164
}
@@ -259,6 +186,7 @@ type bulkIndexerStats struct {
259186
}
260187

261188
// NewBulkIndexer creates a new bulk indexer.
189+
//
262190
func NewBulkIndexer(cfg BulkIndexerConfig) (BulkIndexer, error) {
263191
if cfg.Client == nil {
264192
cfg.Client, _ = elasticsearch.NewDefaultClient()
@@ -294,16 +222,10 @@ func NewBulkIndexer(cfg BulkIndexerConfig) (BulkIndexer, error) {
294222
// Add adds an item to the indexer.
295223
//
296224
// Adding an item after a call to Close() will panic.
225+
//
297226
func (bi *bulkIndexer) Add(ctx context.Context, item BulkIndexerItem) error {
298227
atomic.AddUint64(&bi.stats.numAdded, 1)
299228

300-
// Serialize metadata to JSON
301-
item.marshallMeta()
302-
// Compute length for body & metadata
303-
if err := item.computeLength(); err != nil {
304-
return err
305-
}
306-
307229
select {
308230
case <-ctx.Done():
309231
if bi.config.OnError != nil {
@@ -318,6 +240,7 @@ func (bi *bulkIndexer) Add(ctx context.Context, item BulkIndexerItem) error {
318240

319241
// Close stops the periodic flush, closes the indexer queue channel,
320242
// notifies the done channel and calls flush on all writers.
243+
//
321244
func (bi *bulkIndexer) Close(ctx context.Context) error {
322245
bi.ticker.Stop()
323246
close(bi.queue)
@@ -350,6 +273,7 @@ func (bi *bulkIndexer) Close(ctx context.Context) error {
350273
}
351274

352275
// Stats returns indexer statistics.
276+
//
353277
func (bi *bulkIndexer) Stats() BulkIndexerStats {
354278
return BulkIndexerStats{
355279
NumAdded: atomic.LoadUint64(&bi.stats.numAdded),
@@ -364,6 +288,7 @@ func (bi *bulkIndexer) Stats() BulkIndexerStats {
364288
}
365289

366290
// init initializes the bulk indexer.
291+
//
367292
func (bi *bulkIndexer) init() {
368293
bi.queue = make(chan BulkIndexerItem, bi.config.NumWorkers)
369294

@@ -409,6 +334,7 @@ func (bi *bulkIndexer) init() {
409334
}
410335

411336
// worker represents an indexer worker.
337+
//
412338
type worker struct {
413339
id int
414340
ch <-chan BulkIndexerItem
@@ -420,6 +346,7 @@ type worker struct {
420346
}
421347

422348
// run launches the worker in a goroutine.
349+
//
423350
func (w *worker) run() {
424351
go func() {
425352
ctx := context.Background()
@@ -436,18 +363,7 @@ func (w *worker) run() {
436363
w.bi.config.DebugLogger.Printf("[worker-%03d] Received item [%s:%s]\n", w.id, item.Action, item.DocumentID)
437364
}
438365

439-
oversizePayload := w.bi.config.FlushBytes <= item.payloadLength
440-
if !oversizePayload && w.buf.Len() > 0 && w.buf.Len()+item.payloadLength >= w.bi.config.FlushBytes {
441-
if err := w.flush(ctx); err != nil {
442-
w.mu.Unlock()
443-
if w.bi.config.OnError != nil {
444-
w.bi.config.OnError(ctx, err)
445-
}
446-
continue
447-
}
448-
}
449-
450-
if err := w.writeMeta(&item); err != nil {
366+
if err := w.writeMeta(item); err != nil {
451367
if item.OnFailure != nil {
452368
item.OnFailure(ctx, item, BulkIndexerResponseItem{}, err)
453369
}
@@ -466,11 +382,7 @@ func (w *worker) run() {
466382
}
467383

468384
w.items = append(w.items, item)
469-
// Should the item payload exceed the configured FlushBytes flush happens instantly.
470-
if oversizePayload {
471-
if w.bi.config.DebugLogger != nil {
472-
w.bi.config.DebugLogger.Printf("[worker-%03d] Oversize Payload in item [%s:%s]\n", w.id, item.Action, item.DocumentID)
473-
}
385+
if w.buf.Len() >= w.bi.config.FlushBytes {
474386
if err := w.flush(ctx); err != nil {
475387
w.mu.Unlock()
476388
if w.bi.config.OnError != nil {
@@ -484,15 +396,71 @@ func (w *worker) run() {
484396
}()
485397
}
486398

487-
// writeMeta writes the item metadata to the buffer; it must be called under a lock.
488-
func (w *worker) writeMeta(item *BulkIndexerItem) error {
489-
if _, err := w.buf.Write(item.meta.Bytes()); err != nil {
490-
return err
399+
// writeMeta formats and writes the item metadata to the buffer; it must be called under a lock.
400+
//
401+
func (w *worker) writeMeta(item BulkIndexerItem) error {
402+
w.buf.WriteRune('{')
403+
w.aux = strconv.AppendQuote(w.aux, item.Action)
404+
w.buf.Write(w.aux)
405+
w.aux = w.aux[:0]
406+
w.buf.WriteRune(':')
407+
w.buf.WriteRune('{')
408+
if item.DocumentID != "" {
409+
w.buf.WriteString(`"_id":`)
410+
w.aux = strconv.AppendQuote(w.aux, item.DocumentID)
411+
w.buf.Write(w.aux)
412+
w.aux = w.aux[:0]
413+
}
414+
415+
if item.DocumentID != "" && item.Version != nil {
416+
w.buf.WriteRune(',')
417+
w.buf.WriteString(`"version":`)
418+
w.buf.WriteString(strconv.FormatInt(*item.Version, 10))
419+
}
420+
421+
if item.DocumentID != "" && item.VersionType != "" {
422+
w.buf.WriteRune(',')
423+
w.buf.WriteString(`"version_type":`)
424+
w.aux = strconv.AppendQuote(w.aux, item.VersionType)
425+
w.buf.Write(w.aux)
426+
w.aux = w.aux[:0]
491427
}
428+
429+
if item.Routing != "" {
430+
if item.DocumentID != "" {
431+
w.buf.WriteRune(',')
432+
}
433+
w.buf.WriteString(`"routing":`)
434+
w.aux = strconv.AppendQuote(w.aux, item.Routing)
435+
w.buf.Write(w.aux)
436+
w.aux = w.aux[:0]
437+
}
438+
if item.Index != "" {
439+
if item.DocumentID != "" || item.Routing != "" {
440+
w.buf.WriteRune(',')
441+
}
442+
w.buf.WriteString(`"_index":`)
443+
w.aux = strconv.AppendQuote(w.aux, item.Index)
444+
w.buf.Write(w.aux)
445+
w.aux = w.aux[:0]
446+
}
447+
if item.RetryOnConflict != nil && item.Action == "update" {
448+
if item.DocumentID != "" || item.Routing != "" || item.Index != "" {
449+
w.buf.WriteString(",")
450+
}
451+
w.buf.WriteString(`"retry_on_conflict":`)
452+
w.aux = strconv.AppendInt(w.aux, int64(*item.RetryOnConflict), 10)
453+
w.buf.Write(w.aux)
454+
w.aux = w.aux[:0]
455+
}
456+
w.buf.WriteRune('}')
457+
w.buf.WriteRune('}')
458+
w.buf.WriteRune('\n')
492459
return nil
493460
}
494461

495462
// writeBody writes the item body to the buffer; it must be called under a lock.
463+
//
496464
func (w *worker) writeBody(item *BulkIndexerItem) error {
497465
if item.Body != nil {
498466

@@ -524,6 +492,7 @@ func (w *worker) writeBody(item *BulkIndexerItem) error {
524492
}
525493

526494
// flush writes out the worker buffer; it must be called under a lock.
495+
//
527496
func (w *worker) flush(ctx context.Context) error {
528497
if w.bi.config.OnFlushStart != nil {
529498
ctx = w.bi.config.OnFlushStart(ctx)
@@ -546,12 +515,8 @@ func (w *worker) flush(ctx context.Context) error {
546515
)
547516

548517
defer func() {
549-
w.items = nil
550-
if w.buf.Cap() > w.bi.config.FlushBytes {
551-
w.buf = bytes.NewBuffer(make([]byte, 0, w.bi.config.FlushBytes))
552-
} else {
553-
w.buf.Reset()
554-
}
518+
w.items = w.items[:0]
519+
w.buf.Reset()
555520
}()
556521

557522
if w.bi.config.DebugLogger != nil {

esutil/bulk_indexer_integration_test.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,7 @@ func TestBulkIndexerIntegration(t *testing.T) {
7272
bi, _ := esutil.NewBulkIndexer(esutil.BulkIndexerConfig{
7373
Index: indexName,
7474
Client: es,
75+
// FlushBytes: 3e+6,
7576
})
7677

7778
numItems := 100000

0 commit comments

Comments
 (0)