Skip to content

Commit 95f67c5

Browse files
authored
Add per event pipeline (#227)
Adds pipeline to the BulkIndexItem. This will be necessary to implement open-telemetry/opentelemetry-collector-contrib#37419
1 parent 0f0f9b9 commit 95f67c5

File tree

3 files changed

+65
-3
lines changed

3 files changed

+65
-3
lines changed

bulk_indexer.go

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -279,13 +279,14 @@ func (b *BulkIndexer) BytesUncompressedFlushed() int {
279279
type BulkIndexerItem struct {
280280
Index string
281281
DocumentID string
282+
Pipeline string
282283
Body io.WriterTo
283284
DynamicTemplates map[string]string
284285
}
285286

286287
// Add encodes an item in the buffer.
287288
func (b *BulkIndexer) Add(item BulkIndexerItem) error {
288-
b.writeMeta(item.Index, item.DocumentID, item.DynamicTemplates)
289+
b.writeMeta(item.Index, item.DocumentID, item.Pipeline, item.DynamicTemplates)
289290
if _, err := item.Body.WriteTo(b.writer); err != nil {
290291
return fmt.Errorf("failed to write bulk indexer item: %w", err)
291292
}
@@ -296,7 +297,7 @@ func (b *BulkIndexer) Add(item BulkIndexerItem) error {
296297
return nil
297298
}
298299

299-
func (b *BulkIndexer) writeMeta(index, documentID string, dynamicTemplates map[string]string) {
300+
func (b *BulkIndexer) writeMeta(index, documentID, pipeline string, dynamicTemplates map[string]string) {
300301
b.jsonw.RawString(`{"create":{`)
301302
first := true
302303
if documentID != "" {
@@ -312,6 +313,14 @@ func (b *BulkIndexer) writeMeta(index, documentID string, dynamicTemplates map[s
312313
b.jsonw.String(index)
313314
first = false
314315
}
316+
if pipeline != "" {
317+
if !first {
318+
b.jsonw.RawByte(',')
319+
}
320+
b.jsonw.RawString(`"pipeline":`)
321+
b.jsonw.String(pipeline)
322+
first = false
323+
}
315324
if len(dynamicTemplates) > 0 {
316325
if !first {
317326
b.jsonw.RawByte(',')

bulk_indexer_test.go

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -165,3 +165,41 @@ func TestDynamicTemplates(t *testing.T) {
165165
require.NoError(t, err)
166166
require.Equal(t, int64(2), stat.Indexed)
167167
}
168+
169+
func TestPipeline(t *testing.T) {
170+
client := docappendertest.NewMockElasticsearchClient(t, func(w http.ResponseWriter, r *http.Request) {
171+
_, result, _, _, pipelines := docappendertest.DecodeBulkRequestWithStatsAndDynamicTemplatesAndPipelines(r)
172+
err := json.NewEncoder(w).Encode(result)
173+
require.NoError(t, err)
174+
for _, p := range pipelines {
175+
require.Contains(t, p, "test-pipeline", "test-pipeline should have been present")
176+
}
177+
require.Equal(t, 2, len(pipelines), "2 pipelines should have been returned")
178+
})
179+
indexer, err := docappender.NewBulkIndexer(docappender.BulkIndexerConfig{
180+
Client: client,
181+
})
182+
require.NoError(t, err)
183+
184+
err = indexer.Add(docappender.BulkIndexerItem{
185+
Index: "testidx",
186+
Pipeline: "test-pipeline1",
187+
Body: newJSONReader(map[string]any{
188+
"@timestamp": time.Now().Format(docappendertest.TimestampFormat),
189+
}),
190+
})
191+
require.NoError(t, err)
192+
193+
err = indexer.Add(docappender.BulkIndexerItem{
194+
Index: "testidx",
195+
Pipeline: "test-pipeline2",
196+
Body: newJSONReader(map[string]any{
197+
"@timestamp": time.Now().Format(docappendertest.TimestampFormat),
198+
}),
199+
})
200+
require.NoError(t, err)
201+
202+
stat, err := indexer.Flush(context.Background())
203+
require.NoError(t, err)
204+
require.Equal(t, int64(2), stat.Indexed)
205+
}

docappendertest/docappendertest.go

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,19 @@ func DecodeBulkRequestWithStatsAndDynamicTemplates(r *http.Request) (
6767
res esutil.BulkIndexerResponse,
6868
stats RequestStats,
6969
dynamicTemplates []map[string]string) {
70+
71+
indexed, result, stats, dynamicTemplates, _ := DecodeBulkRequestWithStatsAndDynamicTemplatesAndPipelines(r)
72+
return indexed, result, stats, dynamicTemplates
73+
}
74+
75+
// DecodeBulkRequestWithStatsAndDynamicTemplatesAndPipelines decodes a /_bulk request's body,
76+
// returning the decoded documents and a response body and stats about request, per-request dynamic templates and pipelines specified in the event.
77+
func DecodeBulkRequestWithStatsAndDynamicTemplatesAndPipelines(r *http.Request) (
78+
docs [][]byte,
79+
res esutil.BulkIndexerResponse,
80+
stats RequestStats,
81+
dynamicTemplates []map[string]string,
82+
pipelines []string) {
7083
body := r.Body
7184
switch r.Header.Get("Content-Encoding") {
7285
case "gzip":
@@ -89,6 +102,7 @@ func DecodeBulkRequestWithStatsAndDynamicTemplates(r *http.Request) (
89102
action := make(map[string]struct {
90103
Index string `json:"_index"`
91104
DynamicTemplates map[string]string `json:"dynamic_templates"`
105+
Pipeline string `json:"pipeline"`
92106
})
93107
if err := json.NewDecoder(strings.NewReader(scanner.Text())).Decode(&action); err != nil {
94108
panic(err)
@@ -109,8 +123,9 @@ func DecodeBulkRequestWithStatsAndDynamicTemplates(r *http.Request) (
109123
item := esutil.BulkIndexerResponseItem{Status: http.StatusCreated, Index: action[actionType].Index}
110124
result.Items = append(result.Items, map[string]esutil.BulkIndexerResponseItem{actionType: item})
111125
dynamicTemplates = append(dynamicTemplates, action[actionType].DynamicTemplates)
126+
pipelines = append(pipelines, action[actionType].Pipeline)
112127
}
113-
return indexed, result, RequestStats{int64(cr.bytesRead)}, dynamicTemplates
128+
return indexed, result, RequestStats{int64(cr.bytesRead)}, dynamicTemplates, pipelines
114129
}
115130

116131
// NewMockElasticsearchClient returns an elasticsearch.Client which sends /_bulk requests to bulkHandler.

0 commit comments

Comments
 (0)