Skip to content

Commit 1a07a14

Browse files
Update ring DynamoDB Client to write in a transaction when batching (#6987)
* Update ring DynamoDB Client to write in a transaction when batching Signed-off-by: Anna Tran <[email protected]> * Retry on DDB KV batch error for conditional check failed Signed-off-by: Anna Tran <[email protected]> * Use int64 as type for DDB KV item version Signed-off-by: Anna Tran <[email protected]> * Default version 0 if instance joining DDB ring for the first time Signed-off-by: Anna Tran <[email protected]> * Add test verifying retry on DDB ConditionalCheckFailedException Signed-off-by: Anna Tran <[email protected]> * Add metrics and logs to track conditional check failures Signed-off-by: Anna Tran <[email protected]> --------- Signed-off-by: Anna Tran <[email protected]> Signed-off-by: Daniel Blando <[email protected]> Co-authored-by: Daniel Blando <[email protected]>
1 parent a022332 commit 1a07a14

File tree

6 files changed

+276
-172
lines changed

6 files changed

+276
-172
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,7 @@
7373
* [ENHANCEMENT] API: add request ID injection to context to enable tracking requests across downstream services. #6895
7474
* [ENHANCEMENT] gRPC: Add gRPC Channelz monitoring. #6950
7575
* [ENHANCEMENT] Upgrade build image and Go version to 1.24.6. #6970 #6976
76+
* [ENHANCEMENT] Implement versioned transactions for writes to DynamoDB ring. #6986
7677
* [ENHANCEMENT] Add source metadata to requests(api vs ruler) #6947
7778
* [BUGFIX] Ingester: Avoid error or early throttling when READONLY ingesters are present in the ring #6517
7879
* [BUGFIX] Ingester: Fix labelset data race condition. #6573

pkg/ring/kv/dynamodb/client.go

Lines changed: 23 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -185,9 +185,16 @@ func (c *Client) CAS(ctx context.Context, key string, f func(in interface{}) (ou
185185
continue
186186
}
187187

188-
putRequests := map[dynamodbKey][]byte{}
188+
putRequests := map[dynamodbKey]dynamodbItem{}
189189
for childKey, bytes := range buf {
190-
putRequests[dynamodbKey{primaryKey: key, sortKey: childKey}] = bytes
190+
version := int64(0)
191+
if ddbItem, ok := resp[childKey]; ok {
192+
version = ddbItem.version
193+
}
194+
putRequests[dynamodbKey{primaryKey: key, sortKey: childKey}] = dynamodbItem{
195+
data: bytes,
196+
version: version,
197+
}
191198
}
192199

193200
deleteRequests := make([]dynamodbKey, 0, len(toDelete))
@@ -196,9 +203,13 @@ func (c *Client) CAS(ctx context.Context, key string, f func(in interface{}) (ou
196203
}
197204

198205
if len(putRequests) > 0 || len(deleteRequests) > 0 {
199-
err = c.kv.Batch(ctx, putRequests, deleteRequests)
206+
retry, err := c.kv.Batch(ctx, putRequests, deleteRequests)
200207
if err != nil {
201-
return err
208+
if !retry {
209+
return err
210+
}
211+
bo.Wait()
212+
continue
202213
}
203214
c.updateStaleData(key, r, time.Now().UTC())
204215
return nil
@@ -273,8 +284,8 @@ func (c *Client) WatchPrefix(ctx context.Context, prefix string, f func(string,
273284
continue
274285
}
275286

276-
for key, bytes := range out {
277-
decoded, err := c.codec.Decode(bytes)
287+
for key, ddbItem := range out {
288+
decoded, err := c.codec.Decode(ddbItem.data)
278289
if err != nil {
279290
level.Error(c.logger).Log("msg", "error decoding key", "key", key, "err", err)
280291
continue
@@ -293,8 +304,12 @@ func (c *Client) WatchPrefix(ctx context.Context, prefix string, f func(string,
293304
}
294305
}
295306

296-
func (c *Client) decodeMultikey(data map[string][]byte) (codec.MultiKey, error) {
297-
res, err := c.codec.DecodeMultiKey(data)
307+
func (c *Client) decodeMultikey(data map[string]dynamodbItem) (codec.MultiKey, error) {
308+
multiKeyData := make(map[string][]byte, len(data))
309+
for key, ddbItem := range data {
310+
multiKeyData[key] = ddbItem.data
311+
}
312+
res, err := c.codec.DecodeMultiKey(multiKeyData)
298313
if err != nil {
299314
return nil, err
300315
}

pkg/ring/kv/dynamodb/client_test.go

Lines changed: 91 additions & 52 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ func Test_CAS_ErrorNoRetry(t *testing.T) {
3434
c := NewClientMock(ddbMock, codecMock, TestLogger{}, prometheus.NewPedanticRegistry(), defaultPullTime, defaultBackoff)
3535
expectedErr := errors.Errorf("test")
3636

37-
ddbMock.On("Query").Return(map[string][]byte{}, nil).Once()
37+
ddbMock.On("Query").Return(map[string]dynamodbItem{}, nil).Once()
3838
codecMock.On("DecodeMultiKey").Return(descMock, nil).Twice()
3939
descMock.On("Clone").Return(descMock).Once()
4040

@@ -46,25 +46,60 @@ func Test_CAS_ErrorNoRetry(t *testing.T) {
4646
}
4747

4848
func Test_CAS_Backoff(t *testing.T) {
49-
ddbMock := NewDynamodbClientMock()
50-
codecMock := &CodecMock{}
51-
descMock := &DescMock{}
52-
c := NewClientMock(ddbMock, codecMock, TestLogger{}, prometheus.NewPedanticRegistry(), defaultPullTime, defaultBackoff)
53-
expectedErr := errors.Errorf("test")
49+
testCases := []struct {
50+
name string
51+
setupMocks func(*MockDynamodbClient, *CodecMock, *DescMock, map[dynamodbKey]dynamodbItem, []dynamodbKey)
52+
expectedQueryCalls int
53+
expectedBatchCalls int
54+
}{
55+
{
56+
name: "query_fails_and_backs_off",
57+
setupMocks: func(ddbMock *MockDynamodbClient, codecMock *CodecMock, descMock *DescMock, expectedBatch map[dynamodbKey]dynamodbItem, expectedDelete []dynamodbKey) {
58+
ddbMock.On("Query").Return(map[string]dynamodbItem{}, errors.Errorf("query failed")).Once()
59+
ddbMock.On("Query").Return(map[string]dynamodbItem{}, nil).Once()
60+
ddbMock.On("Batch", context.TODO(), expectedBatch, expectedDelete).Return(false, nil).Once()
61+
},
62+
expectedQueryCalls: 2,
63+
expectedBatchCalls: 1,
64+
},
65+
{
66+
name: "batch_fails_and_backs_off",
67+
setupMocks: func(ddbMock *MockDynamodbClient, codecMock *CodecMock, descMock *DescMock, expectedBatch map[dynamodbKey]dynamodbItem, expectedDelete []dynamodbKey) {
68+
ddbMock.On("Query").Return(map[string]dynamodbItem{}, nil).Twice()
69+
ddbMock.On("Batch", context.TODO(), expectedBatch, expectedDelete).Return(true, errors.Errorf("batch failed")).Once()
70+
ddbMock.On("Batch", context.TODO(), expectedBatch, expectedDelete).Return(false, nil).Once()
71+
},
72+
expectedQueryCalls: 2,
73+
expectedBatchCalls: 2,
74+
},
75+
}
5476

55-
ddbMock.On("Query").Return(map[string][]byte{}, expectedErr).Once()
56-
ddbMock.On("Query").Return(map[string][]byte{}, nil).Once()
57-
ddbMock.On("Batch", context.TODO(), map[dynamodbKey][]byte{}, []dynamodbKey{{primaryKey: "test", sortKey: "childkey"}}).Once()
58-
codecMock.On("DecodeMultiKey").Return(descMock, nil).Twice()
59-
descMock.On("Clone").Return(descMock).Once()
60-
descMock.On("FindDifference", descMock).Return(descMock, []string{"childkey"}, nil).Once()
61-
codecMock.On("EncodeMultiKey").Return(map[string][]byte{}, nil).Twice()
77+
for _, tc := range testCases {
78+
t.Run(tc.name, func(t *testing.T) {
79+
ddbMock := NewDynamodbClientMock()
80+
codecMock := &CodecMock{}
81+
descMock := &DescMock{}
82+
c := NewClientMock(ddbMock, codecMock, TestLogger{}, prometheus.NewPedanticRegistry(), defaultPullTime, defaultBackoff)
6283

63-
err := c.CAS(context.TODO(), key, func(in interface{}) (out interface{}, retry bool, err error) {
64-
return descMock, true, nil
65-
})
84+
expectedBatch := map[dynamodbKey]dynamodbItem{}
85+
expectedDelete := []dynamodbKey{{primaryKey: "test", sortKey: "childkey"}}
6686

67-
require.NoError(t, err)
87+
tc.setupMocks(ddbMock, codecMock, descMock, expectedBatch, expectedDelete)
88+
89+
codecMock.On("DecodeMultiKey").Return(descMock, nil).Times(tc.expectedQueryCalls)
90+
descMock.On("Clone").Return(descMock).Times(tc.expectedQueryCalls)
91+
descMock.On("FindDifference", descMock).Return(descMock, []string{"childkey"}, nil).Times(tc.expectedBatchCalls)
92+
codecMock.On("EncodeMultiKey").Return(map[string][]byte{}, nil).Times(tc.expectedBatchCalls)
93+
94+
err := c.CAS(context.TODO(), key, func(in interface{}) (out interface{}, retry bool, err error) {
95+
return descMock, true, nil
96+
})
97+
98+
require.NoError(t, err)
99+
ddbMock.AssertNumberOfCalls(t, "Query", tc.expectedQueryCalls)
100+
ddbMock.AssertNumberOfCalls(t, "Batch", tc.expectedBatchCalls)
101+
})
102+
}
68103
}
69104

70105
func Test_CAS_Failed(t *testing.T) {
@@ -78,7 +113,7 @@ func Test_CAS_Failed(t *testing.T) {
78113
descMock := &DescMock{}
79114
c := NewClientMock(ddbMock, codecMock, TestLogger{}, prometheus.NewPedanticRegistry(), defaultPullTime, config)
80115

81-
ddbMock.On("Query").Return(map[string][]byte{}, errors.Errorf("test"))
116+
ddbMock.On("Query").Return(map[string]dynamodbItem{}, errors.Errorf("test"))
82117

83118
err := c.CAS(context.TODO(), key, func(in interface{}) (out interface{}, retry bool, err error) {
84119
return descMock, true, nil
@@ -98,17 +133,17 @@ func Test_CAS_Update(t *testing.T) {
98133
expectedUpdatedKeys[0]: []byte(expectedUpdatedKeys[0]),
99134
expectedUpdatedKeys[1]: []byte(expectedUpdatedKeys[1]),
100135
}
101-
expectedBatch := map[dynamodbKey][]byte{
102-
{primaryKey: key, sortKey: expectedUpdatedKeys[0]}: []byte(expectedUpdatedKeys[0]),
103-
{primaryKey: key, sortKey: expectedUpdatedKeys[1]}: []byte(expectedUpdatedKeys[1]),
136+
expectedBatch := map[dynamodbKey]dynamodbItem{
137+
{primaryKey: key, sortKey: expectedUpdatedKeys[0]}: {data: []byte(expectedUpdatedKeys[0])},
138+
{primaryKey: key, sortKey: expectedUpdatedKeys[1]}: {data: []byte(expectedUpdatedKeys[1])},
104139
}
105140

106-
ddbMock.On("Query").Return(map[string][]byte{}, nil).Once()
141+
ddbMock.On("Query").Return(map[string]dynamodbItem{}, nil).Once()
107142
codecMock.On("DecodeMultiKey").Return(descMock, nil).Once()
108143
descMock.On("Clone").Return(descMock).Once()
109144
descMock.On("FindDifference", descMock).Return(descMock, []string{}, nil).Once()
110145
codecMock.On("EncodeMultiKey").Return(expectedUpdated, nil).Once()
111-
ddbMock.On("Batch", context.TODO(), expectedBatch, []dynamodbKey{}).Once()
146+
ddbMock.On("Batch", context.TODO(), expectedBatch, []dynamodbKey{}).Return(false, nil).Once()
112147

113148
err := c.CAS(context.TODO(), key, func(in interface{}) (out interface{}, retry bool, err error) {
114149
return descMock, true, nil
@@ -130,20 +165,20 @@ func Test_CAS_Delete(t *testing.T) {
130165
{primaryKey: key, sortKey: expectedToDelete[1]},
131166
}
132167

133-
ddbMock.On("Query").Return(map[string][]byte{}, nil).Once()
168+
ddbMock.On("Query").Return(map[string]dynamodbItem{}, nil).Once()
134169
codecMock.On("DecodeMultiKey").Return(descMock, nil).Once()
135170
descMock.On("Clone").Return(descMock).Once()
136171
descMock.On("FindDifference", descMock).Return(descMock, expectedToDelete, nil).Once()
137172
codecMock.On("EncodeMultiKey").Return(map[string][]byte{}, nil).Once()
138-
ddbMock.On("Batch", context.TODO(), map[dynamodbKey][]byte{}, expectedBatch)
173+
ddbMock.On("Batch", context.TODO(), map[dynamodbKey]dynamodbItem{}, expectedBatch).Return(false, nil).Once()
139174

140175
err := c.CAS(context.TODO(), key, func(in interface{}) (out interface{}, retry bool, err error) {
141176
return descMock, true, nil
142177
})
143178

144179
require.NoError(t, err)
145180
ddbMock.AssertNumberOfCalls(t, "Batch", 1)
146-
ddbMock.AssertCalled(t, "Batch", context.TODO(), map[dynamodbKey][]byte{}, expectedBatch)
181+
ddbMock.AssertCalled(t, "Batch", context.TODO(), map[dynamodbKey]dynamodbItem{}, expectedBatch)
147182
}
148183

149184
func Test_CAS_Update_Delete(t *testing.T) {
@@ -156,22 +191,22 @@ func Test_CAS_Update_Delete(t *testing.T) {
156191
expectedUpdatedKeys[0]: []byte(expectedUpdatedKeys[0]),
157192
expectedUpdatedKeys[1]: []byte(expectedUpdatedKeys[1]),
158193
}
159-
expectedUpdateBatch := map[dynamodbKey][]byte{
160-
{primaryKey: key, sortKey: expectedUpdatedKeys[0]}: []byte(expectedUpdatedKeys[0]),
161-
{primaryKey: key, sortKey: expectedUpdatedKeys[1]}: []byte(expectedUpdatedKeys[1]),
194+
expectedUpdateBatch := map[dynamodbKey]dynamodbItem{
195+
{primaryKey: key, sortKey: expectedUpdatedKeys[0]}: {data: []byte(expectedUpdatedKeys[0])},
196+
{primaryKey: key, sortKey: expectedUpdatedKeys[1]}: {data: []byte(expectedUpdatedKeys[1])},
162197
}
163198
expectedToDelete := []string{"test", "test2"}
164199
expectedDeleteBatch := []dynamodbKey{
165200
{primaryKey: key, sortKey: expectedToDelete[0]},
166201
{primaryKey: key, sortKey: expectedToDelete[1]},
167202
}
168203

169-
ddbMock.On("Query").Return(map[string][]byte{}, nil).Once()
204+
ddbMock.On("Query").Return(map[string]dynamodbItem{}, nil).Once()
170205
codecMock.On("DecodeMultiKey").Return(descMock, nil).Once()
171206
descMock.On("Clone").Return(descMock).Once()
172207
descMock.On("FindDifference", descMock).Return(descMock, expectedToDelete, nil).Once()
173208
codecMock.On("EncodeMultiKey").Return(expectedUpdated, nil).Once()
174-
ddbMock.On("Batch", context.TODO(), expectedUpdateBatch, expectedDeleteBatch)
209+
ddbMock.On("Batch", context.TODO(), expectedUpdateBatch, expectedDeleteBatch).Return(false, nil).Once()
175210

176211
err := c.CAS(context.TODO(), key, func(in interface{}) (out interface{}, retry bool, err error) {
177212
return descMock, true, nil
@@ -189,7 +224,7 @@ func Test_WatchKey(t *testing.T) {
189224
c := NewClientMock(ddbMock, codecMock, TestLogger{}, prometheus.NewPedanticRegistry(), 1*time.Second, defaultBackoff)
190225
timesCalled := 0
191226

192-
ddbMock.On("Query").Return(map[string][]byte{}, nil)
227+
ddbMock.On("Query").Return(map[string]dynamodbItem{}, nil)
193228
codecMock.On("DecodeMultiKey").Return(descMock, nil)
194229

195230
c.WatchKey(context.TODO(), key, func(i interface{}) bool {
@@ -207,7 +242,7 @@ func Test_WatchKey_UpdateStale(t *testing.T) {
207242
c := NewClientMock(ddbMock, codecMock, TestLogger{}, prometheus.NewPedanticRegistry(), defaultPullTime, defaultBackoff)
208243
staleData := &DescMock{}
209244

210-
ddbMock.On("Query").Return(map[string][]byte{}, nil).Once()
245+
ddbMock.On("Query").Return(map[string]dynamodbItem{}, nil).Once()
211246
codecMock.On("DecodeMultiKey").Return(staleData, nil)
212247

213248
c.WatchKey(context.TODO(), key, func(i interface{}) bool {
@@ -217,7 +252,7 @@ func Test_WatchKey_UpdateStale(t *testing.T) {
217252
return false
218253
})
219254

220-
ddbMock.On("Query").Return(map[string][]byte{}, errors.Errorf("failed"))
255+
ddbMock.On("Query").Return(map[string]dynamodbItem{}, errors.Errorf("failed"))
221256
staleData.On("Clone").Return(staleData).Once()
222257

223258
c.WatchKey(context.TODO(), key, func(i interface{}) bool {
@@ -241,17 +276,17 @@ func Test_CAS_UpdateStale(t *testing.T) {
241276
expectedUpdatedKeys[0]: []byte(expectedUpdatedKeys[0]),
242277
expectedUpdatedKeys[1]: []byte(expectedUpdatedKeys[1]),
243278
}
244-
expectedBatch := map[dynamodbKey][]byte{
245-
{primaryKey: key, sortKey: expectedUpdatedKeys[0]}: []byte(expectedUpdatedKeys[0]),
246-
{primaryKey: key, sortKey: expectedUpdatedKeys[1]}: []byte(expectedUpdatedKeys[1]),
279+
expectedBatch := map[dynamodbKey]dynamodbItem{
280+
{primaryKey: key, sortKey: expectedUpdatedKeys[0]}: {data: []byte(expectedUpdatedKeys[0])},
281+
{primaryKey: key, sortKey: expectedUpdatedKeys[1]}: {data: []byte(expectedUpdatedKeys[1])},
247282
}
248283

249-
ddbMock.On("Query").Return(map[string][]byte{}, nil).Once()
284+
ddbMock.On("Query").Return(map[string]dynamodbItem{}, nil).Once()
250285
codecMock.On("DecodeMultiKey").Return(descMock, nil).Once()
251286
descMock.On("Clone").Return(descMock).Once()
252287
descMock.On("FindDifference", descMockResult).Return(descMockResult, []string{}, nil).Once()
253288
codecMock.On("EncodeMultiKey").Return(expectedUpdated, nil).Once()
254-
ddbMock.On("Batch", context.TODO(), expectedBatch, []dynamodbKey{}).Once()
289+
ddbMock.On("Batch", context.TODO(), expectedBatch, []dynamodbKey{}).Return(false, nil).Once()
255290

256291
err := c.CAS(context.TODO(), key, func(in interface{}) (out interface{}, retry bool, err error) {
257292
return descMockResult, true, nil
@@ -266,17 +301,17 @@ func Test_WatchPrefix(t *testing.T) {
266301
ddbMock := NewDynamodbClientMock()
267302
codecMock := &CodecMock{}
268303
c := NewClientMock(ddbMock, codecMock, TestLogger{}, prometheus.NewPedanticRegistry(), defaultPullTime, defaultBackoff)
269-
data := map[string][]byte{}
304+
data := map[string]dynamodbItem{}
270305
dataKey := []string{"t1", "t2"}
271-
data[dataKey[0]] = []byte(dataKey[0])
272-
data[dataKey[1]] = []byte(dataKey[1])
306+
data[dataKey[0]] = dynamodbItem{data: []byte(dataKey[0])}
307+
data[dataKey[1]] = dynamodbItem{data: []byte(dataKey[1])}
273308
calls := 0
274309

275310
ddbMock.On("Query").Return(data, nil)
276311
codecMock.On("Decode").Twice()
277312

278313
c.WatchPrefix(context.TODO(), key, func(key string, i interface{}) bool {
279-
require.EqualValues(t, string(data[key]), i)
314+
require.EqualValues(t, string(data[key].data), i)
280315
delete(data, key)
281316
calls++
282317
return calls < 2
@@ -321,7 +356,7 @@ func Test_DynamodbKVWithTimeout(t *testing.T) {
321356
err = dbWithTimeout.Put(ctx, dynamodbKey{primaryKey: key}, []byte{})
322357
require.True(t, errors.Is(err, context.DeadlineExceeded))
323358

324-
err = dbWithTimeout.Batch(ctx, nil, nil)
359+
_, err = dbWithTimeout.Batch(ctx, nil, nil)
325360
require.True(t, errors.Is(err, context.DeadlineExceeded))
326361
}
327362

@@ -358,13 +393,13 @@ func (m *MockDynamodbClient) List(context.Context, dynamodbKey) ([]string, float
358393
}
359394
return args.Get(0).([]string), 0, err
360395
}
361-
func (m *MockDynamodbClient) Query(context.Context, dynamodbKey, bool) (map[string][]byte, float64, error) {
396+
func (m *MockDynamodbClient) Query(context.Context, dynamodbKey, bool) (map[string]dynamodbItem, float64, error) {
362397
args := m.Called()
363398
var err error
364399
if args.Get(1) != nil {
365400
err = args.Get(1).(error)
366401
}
367-
return args.Get(0).(map[string][]byte), 0, err
402+
return args.Get(0).(map[string]dynamodbItem), 0, err
368403
}
369404
func (m *MockDynamodbClient) Delete(ctx context.Context, key dynamodbKey) error {
370405
m.Called(ctx, key)
@@ -374,9 +409,13 @@ func (m *MockDynamodbClient) Put(ctx context.Context, key dynamodbKey, data []by
374409
m.Called(ctx, key, data)
375410
return nil
376411
}
377-
func (m *MockDynamodbClient) Batch(ctx context.Context, put map[dynamodbKey][]byte, delete []dynamodbKey) error {
378-
m.Called(ctx, put, delete)
379-
return nil
412+
func (m *MockDynamodbClient) Batch(ctx context.Context, put map[dynamodbKey]dynamodbItem, delete []dynamodbKey) (bool, error) {
413+
args := m.Called(ctx, put, delete)
414+
var err error
415+
if args.Get(1) != nil {
416+
err = args.Get(1).(error)
417+
}
418+
return args.Get(0).(bool), err
380419
}
381420

382421
type TestLogger struct {
@@ -471,7 +510,7 @@ func (d *dynamodbKVWithDelayAndContextCheck) List(ctx context.Context, key dynam
471510
}
472511
}
473512

474-
func (d *dynamodbKVWithDelayAndContextCheck) Query(ctx context.Context, key dynamodbKey, isPrefix bool) (map[string][]byte, float64, error) {
513+
func (d *dynamodbKVWithDelayAndContextCheck) Query(ctx context.Context, key dynamodbKey, isPrefix bool) (map[string]dynamodbItem, float64, error) {
475514
select {
476515
case <-ctx.Done():
477516
return nil, 0, ctx.Err()
@@ -498,10 +537,10 @@ func (d *dynamodbKVWithDelayAndContextCheck) Put(ctx context.Context, key dynamo
498537
}
499538
}
500539

501-
func (d *dynamodbKVWithDelayAndContextCheck) Batch(ctx context.Context, put map[dynamodbKey][]byte, delete []dynamodbKey) error {
540+
func (d *dynamodbKVWithDelayAndContextCheck) Batch(ctx context.Context, put map[dynamodbKey]dynamodbItem, delete []dynamodbKey) (bool, error) {
502541
select {
503542
case <-ctx.Done():
504-
return ctx.Err()
543+
return false, ctx.Err()
505544
case <-time.After(d.delay):
506545
return d.ddbClient.Batch(ctx, put, delete)
507546
}

0 commit comments

Comments
 (0)