Skip to content

Commit 3e17944

Browse files
authored
Add dynamic templates to bulk indexer item (#189)
Make bulk request accept dynamic templates by adding DynamicTemplates field to bulk indexer item. This is only added to bulk indexer, not appender.
1 parent b180c1b commit 3e17944

File tree

3 files changed

+80
-8
lines changed

3 files changed

+80
-8
lines changed

bulk_indexer.go

Lines changed: 28 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -270,14 +270,15 @@ func (b *BulkIndexer) BytesUncompressedFlushed() int {
270270
}
271271

272272
type BulkIndexerItem struct {
273-
Index string
274-
DocumentID string
275-
Body io.WriterTo
273+
Index string
274+
DocumentID string
275+
Body io.WriterTo
276+
DynamicTemplates map[string]string
276277
}
277278

278279
// Add encodes an item in the buffer.
279280
func (b *BulkIndexer) Add(item BulkIndexerItem) error {
280-
b.writeMeta(item.Index, item.DocumentID)
281+
b.writeMeta(item.Index, item.DocumentID, item.DynamicTemplates)
281282
if _, err := item.Body.WriteTo(b.writer); err != nil {
282283
return fmt.Errorf("failed to write bulk indexer item: %w", err)
283284
}
@@ -288,18 +289,39 @@ func (b *BulkIndexer) Add(item BulkIndexerItem) error {
288289
return nil
289290
}
290291

291-
func (b *BulkIndexer) writeMeta(index, documentID string) {
292+
func (b *BulkIndexer) writeMeta(index, documentID string, dynamicTemplates map[string]string) {
292293
b.jsonw.RawString(`{"create":{`)
294+
first := true
293295
if documentID != "" {
294296
b.jsonw.RawString(`"_id":`)
295297
b.jsonw.String(documentID)
298+
first = false
296299
}
297300
if index != "" {
298-
if documentID != "" {
301+
if !first {
299302
b.jsonw.RawByte(',')
300303
}
301304
b.jsonw.RawString(`"_index":`)
302305
b.jsonw.String(index)
306+
first = false
307+
}
308+
if len(dynamicTemplates) > 0 {
309+
if !first {
310+
b.jsonw.RawByte(',')
311+
}
312+
b.jsonw.RawString(`"dynamic_templates":{`)
313+
firstDynamicTemplate := true
314+
for k, v := range dynamicTemplates {
315+
if !firstDynamicTemplate {
316+
b.jsonw.RawByte(',')
317+
}
318+
b.jsonw.String(k)
319+
b.jsonw.RawByte(':')
320+
b.jsonw.String(v)
321+
firstDynamicTemplate = false
322+
}
323+
b.jsonw.RawByte('}')
324+
first = false
303325
}
304326
b.jsonw.RawString("}}\n")
305327
b.writer.Write(b.jsonw.Bytes())

bulk_indexer_test.go

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -128,3 +128,40 @@ func TestBulkIndexer(t *testing.T) {
128128
})
129129
}
130130
}
131+
132+
func TestDynamicTemplates(t *testing.T) {
133+
client := docappendertest.NewMockElasticsearchClient(t, func(w http.ResponseWriter, r *http.Request) {
134+
_, result, _, dynamicTemplates := docappendertest.DecodeBulkRequestWithStatsAndDynamicTemplates(r)
135+
require.Equal(t, []map[string]string{
136+
{"one": "two", "three": "four"},
137+
{"five": "six", "seven": "eight"},
138+
}, dynamicTemplates)
139+
json.NewEncoder(w).Encode(result)
140+
})
141+
indexer, err := docappender.NewBulkIndexer(docappender.BulkIndexerConfig{
142+
Client: client,
143+
})
144+
require.NoError(t, err)
145+
146+
err = indexer.Add(docappender.BulkIndexerItem{
147+
Index: "testidx",
148+
Body: newJSONReader(map[string]any{
149+
"@timestamp": time.Now().Format(docappendertest.TimestampFormat),
150+
}),
151+
DynamicTemplates: map[string]string{"one": "two", "three": "four"},
152+
})
153+
require.NoError(t, err)
154+
155+
err = indexer.Add(docappender.BulkIndexerItem{
156+
Index: "testidx",
157+
Body: newJSONReader(map[string]any{
158+
"@timestamp": time.Now().Format(docappendertest.TimestampFormat),
159+
}),
160+
DynamicTemplates: map[string]string{"five": "six", "seven": "eight"},
161+
})
162+
require.NoError(t, err)
163+
164+
stat, err := indexer.Flush(context.Background())
165+
require.NoError(t, err)
166+
require.Equal(t, int64(2), stat.Indexed)
167+
}

docappendertest/docappendertest.go

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,17 @@ func DecodeBulkRequestWithStats(r *http.Request) (
5656
docs [][]byte,
5757
res esutil.BulkIndexerResponse,
5858
stats RequestStats) {
59+
indexed, result, stats, _ := DecodeBulkRequestWithStatsAndDynamicTemplates(r)
60+
return indexed, result, stats
61+
}
62+
63+
// DecodeBulkRequestWithStatsAndDynamicTemplates decodes a /_bulk request's body,
64+
// returning the decoded documents and a response body and stats about request, and per-request dynamic templates.
65+
func DecodeBulkRequestWithStatsAndDynamicTemplates(r *http.Request) (
66+
docs [][]byte,
67+
res esutil.BulkIndexerResponse,
68+
stats RequestStats,
69+
dynamicTemplates []map[string]string) {
5970
body := r.Body
6071
switch r.Header.Get("Content-Encoding") {
6172
case "gzip":
@@ -76,7 +87,8 @@ func DecodeBulkRequestWithStats(r *http.Request) (
7687
var result esutil.BulkIndexerResponse
7788
for scanner.Scan() {
7889
action := make(map[string]struct {
79-
Index string `json:"_index"`
90+
Index string `json:"_index"`
91+
DynamicTemplates map[string]string `json:"dynamic_templates"`
8092
})
8193
if err := json.NewDecoder(strings.NewReader(scanner.Text())).Decode(&action); err != nil {
8294
panic(err)
@@ -96,8 +108,9 @@ func DecodeBulkRequestWithStats(r *http.Request) (
96108

97109
item := esutil.BulkIndexerResponseItem{Status: http.StatusCreated, Index: action[actionType].Index}
98110
result.Items = append(result.Items, map[string]esutil.BulkIndexerResponseItem{actionType: item})
111+
dynamicTemplates = append(dynamicTemplates, action[actionType].DynamicTemplates)
99112
}
100-
return indexed, result, RequestStats{int64(cr.bytesRead)}
113+
return indexed, result, RequestStats{int64(cr.bytesRead)}, dynamicTemplates
101114
}
102115

103116
// NewMockElasticsearchClient returns an elasticsearch.Client which sends /_bulk requests to bulkHandler.

0 commit comments

Comments
 (0)