Skip to content

Commit ac4681f

Browse files
authored
Add BulkIndexerItem.RequireDataStream (#235)
If set to true, require_data_stream=true will be added to the bulk request item action.
1 parent 4402475 commit ac4681f

File tree

3 files changed

+101
-20
lines changed

3 files changed

+101
-20
lines changed

bulk_indexer.go

Lines changed: 27 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -286,12 +286,13 @@ func (b *BulkIndexer) BytesUncompressedFlushed() int {
286286
}
287287

288288
type BulkIndexerItem struct {
289-
Index string
290-
DocumentID string
291-
Pipeline string
292-
Action string
293-
Body io.WriterTo
294-
DynamicTemplates map[string]string
289+
Index string
290+
DocumentID string
291+
Pipeline string
292+
Action string
293+
Body io.WriterTo
294+
DynamicTemplates map[string]string
295+
RequireDataStream bool
295296
}
296297

297298
// Add encodes an item in the buffer.
@@ -307,7 +308,14 @@ func (b *BulkIndexer) Add(item BulkIndexerItem) error {
307308
return fmt.Errorf("%s is not a valid action", action)
308309
}
309310

310-
b.writeMeta(item.Index, item.DocumentID, item.Pipeline, action, item.DynamicTemplates)
311+
b.writeMeta(
312+
item.Index,
313+
item.DocumentID,
314+
item.Pipeline,
315+
action,
316+
item.DynamicTemplates,
317+
item.RequireDataStream,
318+
)
311319
if _, err := item.Body.WriteTo(b.writer); err != nil {
312320
return fmt.Errorf("failed to write bulk indexer item: %w", err)
313321
}
@@ -318,7 +326,11 @@ func (b *BulkIndexer) Add(item BulkIndexerItem) error {
318326
return nil
319327
}
320328

321-
func (b *BulkIndexer) writeMeta(index, documentID, pipeline, action string, dynamicTemplates map[string]string) {
329+
func (b *BulkIndexer) writeMeta(
330+
index, documentID, pipeline, action string,
331+
dynamicTemplates map[string]string,
332+
requireDataStream bool,
333+
) {
322334
b.jsonw.RawString(`{"`)
323335
b.jsonw.RawString(action)
324336
b.jsonw.RawString(`":{`)
@@ -363,6 +375,13 @@ func (b *BulkIndexer) writeMeta(index, documentID, pipeline, action string, dyna
363375
b.jsonw.RawByte('}')
364376
first = false
365377
}
378+
if requireDataStream {
379+
if !first {
380+
b.jsonw.RawByte(',')
381+
}
382+
b.jsonw.RawString(`"require_data_stream":true`)
383+
first = false
384+
}
366385
b.jsonw.RawString("}}\n")
367386
b.writer.Write(b.jsonw.Bytes())
368387
b.jsonw.Reset()

bulk_indexer_test.go

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,8 @@ import (
2121
"context"
2222
"encoding/json"
2323
"net/http"
24+
"strconv"
25+
"strings"
2426
"sync/atomic"
2527
"testing"
2628
"time"
@@ -266,3 +268,30 @@ func TestAction(t *testing.T) {
266268
require.NoError(t, err)
267269
require.Equal(t, int64(3), stat.Indexed)
268270
}
271+
272+
func TestItemRequireDataStream(t *testing.T) {
273+
client := docappendertest.NewMockElasticsearchClient(t, func(w http.ResponseWriter, r *http.Request) {
274+
_, meta, result, _ := docappendertest.DecodeBulkRequestWithStatsAndMeta(r)
275+
require.Len(t, meta, 2)
276+
assert.False(t, meta[0].RequireDataStream)
277+
assert.True(t, meta[1].RequireDataStream)
278+
json.NewEncoder(w).Encode(result)
279+
})
280+
indexer, err := docappender.NewBulkIndexer(docappender.BulkIndexerConfig{
281+
Client: client,
282+
})
283+
require.NoError(t, err)
284+
285+
for _, required := range []bool{false, true} {
286+
err := indexer.Add(docappender.BulkIndexerItem{
287+
Index: strconv.FormatBool(required),
288+
Body: strings.NewReader(`{}`),
289+
RequireDataStream: required,
290+
})
291+
require.NoError(t, err)
292+
}
293+
294+
stat, err := indexer.Flush(context.Background())
295+
require.NoError(t, err)
296+
require.Equal(t, int64(2), stat.Indexed)
297+
}

docappendertest/docappendertest.go

Lines changed: 45 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,15 @@ import (
3939
"github.com/elastic/go-elasticsearch/v8/esutil"
4040
)
4141

42+
type BulkRequestItemMeta struct {
43+
Action string `json:"-"`
44+
Index string `json:"_index"`
45+
DocumentID string `json:"_id"`
46+
Pipeline string `json:"pipeline"`
47+
DynamicTemplates map[string]string `json:"dynamic_templates"`
48+
RequireDataStream bool `json:"require_data_stream"`
49+
}
50+
4251
// TimestampFormat holds the time format for formatting timestamps according to
4352
// Elasticsearch's strict_date_optional_time date format, which includes a fractional
4453
// seconds component.
@@ -60,6 +69,18 @@ func DecodeBulkRequestWithStats(r *http.Request) (
6069
return indexed, result, stats
6170
}
6271

72+
// DecodeBulkRequestWithStatsAndMeta decodes a /_bulk request's body,
73+
// returning the decoded bulk request action/meta and documents,
74+
// and a response body and stats about the request.
75+
func DecodeBulkRequestWithStatsAndMeta(r *http.Request) (
76+
docs [][]byte,
77+
meta []BulkRequestItemMeta,
78+
res esutil.BulkIndexerResponse,
79+
stats RequestStats,
80+
) {
81+
return decodeBulkRequest(r)
82+
}
83+
6384
// DecodeBulkRequestWithStatsAndDynamicTemplates decodes a /_bulk request's body,
6485
// returning the decoded documents, a response body, stats about the request, and per-request dynamic templates.
6586
func DecodeBulkRequestWithStatsAndDynamicTemplates(r *http.Request) (
@@ -79,7 +100,22 @@ func DecodeBulkRequestWithStatsAndDynamicTemplatesAndPipelines(r *http.Request)
79100
res esutil.BulkIndexerResponse,
80101
stats RequestStats,
81102
dynamicTemplates []map[string]string,
82-
pipelines []string) {
103+
pipelines []string,
104+
) {
105+
docs, meta, res, stats := decodeBulkRequest(r)
106+
for _, meta := range meta {
107+
dynamicTemplates = append(dynamicTemplates, meta.DynamicTemplates)
108+
pipelines = append(pipelines, meta.Pipeline)
109+
}
110+
return docs, res, stats, dynamicTemplates, pipelines
111+
}
112+
113+
func decodeBulkRequest(r *http.Request) (
114+
docs [][]byte,
115+
meta []BulkRequestItemMeta,
116+
result esutil.BulkIndexerResponse,
117+
stats RequestStats,
118+
) {
83119
body := r.Body
84120
switch r.Header.Get("Content-Encoding") {
85121
case "gzip":
@@ -95,15 +131,10 @@ func DecodeBulkRequestWithStatsAndDynamicTemplatesAndPipelines(r *http.Request)
95131
}
96132
body = cr
97133
defer cr.Close()
134+
98135
scanner := bufio.NewScanner(body)
99-
var indexed [][]byte
100-
var result esutil.BulkIndexerResponse
101136
for scanner.Scan() {
102-
action := make(map[string]struct {
103-
Index string `json:"_index"`
104-
DynamicTemplates map[string]string `json:"dynamic_templates"`
105-
Pipeline string `json:"pipeline"`
106-
})
137+
action := make(map[string]BulkRequestItemMeta)
107138
if err := json.NewDecoder(strings.NewReader(scanner.Text())).Decode(&action); err != nil {
108139
panic(err)
109140
}
@@ -118,14 +149,16 @@ func DecodeBulkRequestWithStatsAndDynamicTemplatesAndPipelines(r *http.Request)
118149
if !json.Valid(doc) {
119150
panic(fmt.Errorf("invalid JSON: %s", doc))
120151
}
121-
indexed = append(indexed, doc)
152+
docs = append(docs, doc)
122153

123154
item := esutil.BulkIndexerResponseItem{Status: http.StatusCreated, Index: action[actionType].Index}
124155
result.Items = append(result.Items, map[string]esutil.BulkIndexerResponseItem{actionType: item})
125-
dynamicTemplates = append(dynamicTemplates, action[actionType].DynamicTemplates)
126-
pipelines = append(pipelines, action[actionType].Pipeline)
156+
157+
itemMeta := action[actionType]
158+
itemMeta.Action = actionType
159+
meta = append(meta, itemMeta)
127160
}
128-
return indexed, result, RequestStats{int64(cr.bytesRead)}, dynamicTemplates, pipelines
161+
return docs, meta, result, RequestStats{int64(cr.bytesRead)}
129162
}
130163

131164
// NewMockElasticsearchClient returns an elasticsearch.Client which sends /_bulk requests to bulkHandler.

0 commit comments

Comments
 (0)