Skip to content

Commit a958762

Browse files
kiivihalkarmi
authored andcommitted
Util: Fix for writeMeta in BulkIndexer helper
This fix adds '_index' to meta when BulkIndexerItem.Index is not empty. Currently, this value was ignored which leads to index errors when the default index in BulkIndexerConfig.Index is empty. Closes #146 (cherry picked from commit fc7f75e)
1 parent fa22e79 commit a958762

File tree

3 files changed

+136
-1
lines changed

3 files changed

+136
-1
lines changed

esutil/bulk_indexer.go

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -393,6 +393,15 @@ func (w *worker) writeMeta(item BulkIndexerItem) error {
393393
w.buf.Write(w.aux)
394394
w.aux = w.aux[:0]
395395
}
396+
if item.Index != "" {
397+
if item.DocumentID != "" {
398+
w.buf.WriteRune(',')
399+
}
400+
w.buf.WriteString(`"_index":`)
401+
w.aux = strconv.AppendQuote(w.aux, item.Index)
402+
w.buf.Write(w.aux)
403+
w.aux = w.aux[:0]
404+
}
396405
w.buf.WriteRune('}')
397406
w.buf.WriteRune('}')
398407
w.buf.WriteRune('\n')

esutil/bulk_indexer_integration_test.go

Lines changed: 67 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,8 @@ import (
2222
)
2323

2424
func TestBulkIndexerIntegration(t *testing.T) {
25+
body := `{"body":"Lorem ipsum dolor sit amet, consectetur adipisicing elit, sed do eiusmod tempor incididunt ut labore et dolore magna aliqua. Ut enim ad minim veniam, quis nostrud exercitation ullamco laboris nisi ut aliquip ex ea commodo consequat."}`
26+
2527
t.Run("Default", func(t *testing.T) {
2628
var countSuccessful uint64
2729
indexName := "test-bulk-integration"
@@ -43,7 +45,6 @@ func TestBulkIndexerIntegration(t *testing.T) {
4345
})
4446

4547
numItems := 100000
46-
body := `{"body":"Lorem ipsum dolor sit amet, consectetur adipisicing elit, sed do eiusmod tempor incididunt ut labore et dolore magna aliqua. Ut enim ad minim veniam, quis nostrud exercitation ullamco laboris nisi ut aliquip ex ea commodo consequat."}`
4748
start := time.Now().UTC()
4849

4950
for i := 1; i <= numItems; i++ {
@@ -90,4 +91,69 @@ func TestBulkIndexerIntegration(t *testing.T) {
9091
time.Since(start).Truncate(time.Millisecond),
9192
1000.0/float64(time.Since(start)/time.Millisecond)*float64(stats.NumFlushed))
9293
})
94+
95+
t.Run("Multiple indices", func(t *testing.T) {
96+
es, _ := elasticsearch.NewClient(elasticsearch.Config{
97+
Logger: &estransport.ColorLogger{Output: os.Stdout},
98+
})
99+
100+
bi, _ := esutil.NewBulkIndexer(esutil.BulkIndexerConfig{
101+
Index: "test-index-a",
102+
Client: es,
103+
})
104+
105+
// Default index
106+
for i := 1; i <= 10; i++ {
107+
err := bi.Add(context.Background(), esutil.BulkIndexerItem{
108+
Action: "index",
109+
DocumentID: strconv.Itoa(i),
110+
Body: strings.NewReader(body),
111+
})
112+
if err != nil {
113+
t.Fatalf("Unexpected error: %s", err)
114+
}
115+
}
116+
117+
// Index 1
118+
for i := 1; i <= 10; i++ {
119+
err := bi.Add(context.Background(), esutil.BulkIndexerItem{
120+
Action: "index",
121+
Index: "test-index-b",
122+
Body: strings.NewReader(body),
123+
})
124+
if err != nil {
125+
t.Fatalf("Unexpected error: %s", err)
126+
}
127+
}
128+
129+
// Index 2
130+
for i := 1; i <= 10; i++ {
131+
err := bi.Add(context.Background(), esutil.BulkIndexerItem{
132+
Action: "index",
133+
Index: "test-index-c",
134+
Body: strings.NewReader(body),
135+
})
136+
if err != nil {
137+
t.Fatalf("Unexpected error: %s", err)
138+
}
139+
}
140+
141+
if err := bi.Close(context.Background()); err != nil {
142+
t.Errorf("Unexpected error: %s", err)
143+
}
144+
stats := bi.Stats()
145+
146+
expectedIndexed := 10 + 10 + 10
147+
if stats.NumIndexed != uint64(expectedIndexed) {
148+
t.Errorf("Unexpected NumIndexed: want=%d, got=%d", expectedIndexed, stats.NumIndexed)
149+
}
150+
151+
res, err := es.Indices.Exists([]string{"test-index-a", "test-index-b", "test-index-c"})
152+
if err != nil {
153+
t.Fatalf("Unexpected error: %s", err)
154+
}
155+
if res.StatusCode != 200 {
156+
t.Errorf("Expected indices to exist, but got a [%s] response", res.Status())
157+
}
158+
})
93159
}

esutil/bulk_indexer_internal_test.go

Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -544,6 +544,66 @@ func TestBulkIndexer(t *testing.T) {
544544
t.Errorf("Unexpected NumAdded: %d", stats.NumAdded)
545545
}
546546
})
547+
548+
t.Run("Worker.writeMeta()", func(t *testing.T) {
549+
type args struct {
550+
item BulkIndexerItem
551+
}
552+
tests := []struct {
553+
name string
554+
args args
555+
want string
556+
}{
557+
{
558+
"without _index and _id",
559+
args{BulkIndexerItem{Action: "index"}},
560+
`{"index":{}}` + "\n",
561+
},
562+
{
563+
"with _id",
564+
args{BulkIndexerItem{
565+
Action: "index",
566+
DocumentID: "42",
567+
}},
568+
`{"index":{"_id":"42"}}` + "\n",
569+
},
570+
{
571+
"with _index",
572+
args{BulkIndexerItem{
573+
Action: "index",
574+
Index: "test",
575+
}},
576+
`{"index":{"_index":"test"}}` + "\n",
577+
},
578+
{
579+
"with _index and _id",
580+
args{BulkIndexerItem{
581+
Action: "index",
582+
DocumentID: "42",
583+
Index: "test",
584+
}},
585+
`{"index":{"_id":"42","_index":"test"}}` + "\n",
586+
},
587+
}
588+
for _, tt := range tests {
589+
tt := tt
590+
591+
t.Run(tt.name, func(t *testing.T) {
592+
w := &worker{
593+
buf: bytes.NewBuffer(make([]byte, 0, 5e+6)),
594+
aux: make([]byte, 0, 512),
595+
}
596+
if err := w.writeMeta(tt.args.item); err != nil {
597+
t.Errorf("Unexpected error: %v", err)
598+
}
599+
600+
if w.buf.String() != tt.want {
601+
t.Errorf("worker.writeMeta() %s = got [%s], want [%s]", tt.name, w.buf.String(), tt.want)
602+
}
603+
604+
})
605+
}
606+
})
547607
}
548608

549609
type customJSONDecoder struct{}

0 commit comments

Comments
 (0)