Skip to content

Commit 0ae73ad

Browse files
AnaethelionZhuo Chen
andauthored
[BulkIndexer] fix concurrent map error caused when assigned header (#475) (#476)
Co-authored-by: Zhuo Chen <[email protected]>
1 parent 0d2bcf3 commit 0ae73ad

File tree

2 files changed

+44
-2
lines changed

2 files changed

+44
-2
lines changed

esutil/bulk_indexer.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -523,7 +523,7 @@ func (w *worker) flush(ctx context.Context) error {
523523
Human: w.bi.config.Human,
524524
ErrorTrace: w.bi.config.ErrorTrace,
525525
FilterPath: w.bi.config.FilterPath,
526-
Header: w.bi.config.Header,
526+
Header: w.bi.config.Header.Clone(),
527527
}
528528

529529
// Add Header and MetaHeader to config if not already set

esutil/bulk_indexer_internal_test.go

Lines changed: 43 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,6 @@ import (
2525
"context"
2626
"encoding/json"
2727
"fmt"
28-
"github.com/elastic/go-elasticsearch/v8/esapi"
2928
"io"
3029
"io/ioutil"
3130
"log"
@@ -42,6 +41,7 @@ import (
4241

4342
"github.com/elastic/elastic-transport-go/v8/elastictransport"
4443
"github.com/elastic/go-elasticsearch/v8"
44+
"github.com/elastic/go-elasticsearch/v8/esapi"
4545
)
4646

4747
var defaultRoundTripFunc = func(*http.Request) (*http.Response, error) {
@@ -808,6 +808,48 @@ func TestBulkIndexer(t *testing.T) {
808808
})
809809
}
810810
})
811+
812+
t.Run("Concurrent Flushing", func(t *testing.T) {
813+
esConfig := elasticsearch.Config{
814+
Transport: &mockTransport{
815+
RoundTripFunc: func(request *http.Request) (*http.Response, error) {
816+
return &http.Response{
817+
StatusCode: http.StatusOK,
818+
Status: "200 OK",
819+
Body: io.NopCloser(bytes.NewBuffer(nil)),
820+
}, nil
821+
},
822+
},
823+
}
824+
825+
client, err := elasticsearch.NewClient(esConfig)
826+
if err != nil {
827+
log.Fatal(err)
828+
}
829+
830+
cfg := BulkIndexerConfig{
831+
NumWorkers: 10,
832+
Client: client,
833+
Header: http.Header{"X-Test": []string{"TestValue"}},
834+
FlushBytes: 1,
835+
}
836+
bi, err := NewBulkIndexer(cfg)
837+
if err != nil {
838+
log.Fatal(err)
839+
}
840+
841+
for i := 0; i < 100; i++ {
842+
err = bi.Add(context.Background(), BulkIndexerItem{
843+
Action: "foo",
844+
DocumentID: strconv.Itoa(1),
845+
Body: strings.NewReader(`{"title":"foo"}`),
846+
})
847+
}
848+
if err != nil {
849+
log.Fatal(err)
850+
}
851+
bi.Close(context.Background())
852+
})
811853
}
812854

813855
type customJSONDecoder struct{}

0 commit comments

Comments
 (0)