Skip to content

Commit 021b92d

Browse files
authored
Allow passing a custom item action (#231)
When manipulating profiles data, we need to be able to use the Update action rather than Create all the time. So this change allows passing custom actions rather than hardcoding create. If no actions is provided, the default value will be create.
1 parent 6cb502c commit 021b92d

File tree

2 files changed

+90
-3
lines changed

2 files changed

+90
-3
lines changed

bulk_indexer.go

Lines changed: 27 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,15 @@ import (
4949
// of concurrent bulk requests. This way we can ensure bulk requests have the
5050
// maximum possible size, based on configuration and throughput.
5151

52+
const (
53+
// Actions are all the actions that can be used when indexing data.
54+
// `create` will be used by default.
55+
ActionCreate = "create"
56+
ActionDelete = "delete"
57+
ActionIndex = "index"
58+
ActionUpdate = "update"
59+
)
60+
5261
// BulkIndexerConfig holds configuration for BulkIndexer.
5362
type BulkIndexerConfig struct {
5463
// Client holds the Elasticsearch client.
@@ -280,13 +289,25 @@ type BulkIndexerItem struct {
280289
Index string
281290
DocumentID string
282291
Pipeline string
292+
Action string
283293
Body io.WriterTo
284294
DynamicTemplates map[string]string
285295
}
286296

287297
// Add encodes an item in the buffer.
288298
func (b *BulkIndexer) Add(item BulkIndexerItem) error {
289-
b.writeMeta(item.Index, item.DocumentID, item.Pipeline, item.DynamicTemplates)
299+
action := item.Action
300+
if action == "" {
301+
action = ActionCreate
302+
}
303+
304+
switch action {
305+
case ActionCreate, ActionDelete, ActionIndex, ActionUpdate:
306+
default:
307+
return fmt.Errorf("%s is not a valid action", action)
308+
}
309+
310+
b.writeMeta(item.Index, item.DocumentID, item.Pipeline, action, item.DynamicTemplates)
290311
if _, err := item.Body.WriteTo(b.writer); err != nil {
291312
return fmt.Errorf("failed to write bulk indexer item: %w", err)
292313
}
@@ -297,8 +318,11 @@ func (b *BulkIndexer) Add(item BulkIndexerItem) error {
297318
return nil
298319
}
299320

300-
func (b *BulkIndexer) writeMeta(index, documentID, pipeline string, dynamicTemplates map[string]string) {
301-
b.jsonw.RawString(`{"create":{`)
321+
func (b *BulkIndexer) writeMeta(index, documentID, pipeline, action string, dynamicTemplates map[string]string) {
322+
b.jsonw.RawString(`{"`)
323+
b.jsonw.RawString(action)
324+
b.jsonw.RawString(`":{`)
325+
302326
first := true
303327
if documentID != "" {
304328
b.jsonw.RawString(`"_id":`)

bulk_indexer_test.go

Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ import (
2929

3030
"github.com/elastic/go-docappender/v2"
3131
"github.com/elastic/go-docappender/v2/docappendertest"
32+
"github.com/stretchr/testify/assert"
3233
"github.com/stretchr/testify/require"
3334
)
3435

@@ -203,3 +204,65 @@ func TestPipeline(t *testing.T) {
203204
require.NoError(t, err)
204205
require.Equal(t, int64(2), stat.Indexed)
205206
}
207+
208+
func TestAction(t *testing.T) {
209+
client := docappendertest.NewMockElasticsearchClient(t, func(w http.ResponseWriter, r *http.Request) {
210+
_, result := docappendertest.DecodeBulkRequest(r)
211+
err := json.NewEncoder(w).Encode(result)
212+
require.NoError(t, err)
213+
214+
actions := []string{}
215+
for _, itemsMap := range result.Items {
216+
for a := range itemsMap {
217+
actions = append(actions, a)
218+
}
219+
}
220+
221+
require.Equal(t, []string{"create", "update", "delete"}, actions)
222+
})
223+
indexer, err := docappender.NewBulkIndexer(docappender.BulkIndexerConfig{
224+
Client: client,
225+
})
226+
require.NoError(t, err)
227+
228+
err = indexer.Add(docappender.BulkIndexerItem{
229+
Index: "testidx",
230+
Body: newJSONReader(map[string]any{
231+
"@timestamp": time.Now().Format(docappendertest.TimestampFormat),
232+
}),
233+
})
234+
require.NoError(t, err)
235+
236+
err = indexer.Add(docappender.BulkIndexerItem{
237+
Index: "testidx",
238+
Action: "update",
239+
Body: newJSONReader(map[string]any{
240+
"@timestamp": time.Now().Format(docappendertest.TimestampFormat),
241+
}),
242+
})
243+
require.NoError(t, err)
244+
245+
err = indexer.Add(docappender.BulkIndexerItem{
246+
Index: "testidx",
247+
Action: "delete",
248+
Pipeline: "test-pipeline2",
249+
Body: newJSONReader(map[string]any{
250+
"@timestamp": time.Now().Format(docappendertest.TimestampFormat),
251+
}),
252+
})
253+
require.NoError(t, err)
254+
255+
err = indexer.Add(docappender.BulkIndexerItem{
256+
Index: "testidx",
257+
Action: "foobar",
258+
Pipeline: "test-pipeline2",
259+
Body: newJSONReader(map[string]any{
260+
"@timestamp": time.Now().Format(docappendertest.TimestampFormat),
261+
}),
262+
})
263+
assert.Error(t, err)
264+
265+
stat, err := indexer.Flush(context.Background())
266+
require.NoError(t, err)
267+
require.Equal(t, int64(3), stat.Indexed)
268+
}

0 commit comments

Comments
 (0)