Skip to content
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@
* [ENHANCEMENT] API: add request ID injection to context to enable tracking requests across downstream services. #6895
* [ENHANCEMENT] gRPC: Add gRPC Channelz monitoring. #6950
* [ENHANCEMENT] Upgrade build image and Go version to 1.24.6. #6970 #6976
* [ENHANCEMENT] Implement versioned transactions for writes to DynamoDB ring. #6986
* [BUGFIX] Ingester: Avoid error or early throttling when READONLY ingesters are present in the ring #6517
* [BUGFIX] Ingester: Fix labelset data race condition. #6573
* [BUGFIX] Compactor: Cleaner should not put deletion marker for blocks with no-compact marker. #6576
Expand Down
31 changes: 23 additions & 8 deletions pkg/ring/kv/dynamodb/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -185,9 +185,16 @@ func (c *Client) CAS(ctx context.Context, key string, f func(in interface{}) (ou
continue
}

putRequests := map[dynamodbKey][]byte{}
putRequests := map[dynamodbKey]dynamodbItem{}
for childKey, bytes := range buf {
putRequests[dynamodbKey{primaryKey: key, sortKey: childKey}] = bytes
version := int64(0)
if ddbItem, ok := resp[childKey]; ok {
version = ddbItem.version
}
putRequests[dynamodbKey{primaryKey: key, sortKey: childKey}] = dynamodbItem{
data: bytes,
version: version,
}
}

deleteRequests := make([]dynamodbKey, 0, len(toDelete))
Expand All @@ -196,9 +203,13 @@ func (c *Client) CAS(ctx context.Context, key string, f func(in interface{}) (ou
}

if len(putRequests) > 0 || len(deleteRequests) > 0 {
err = c.kv.Batch(ctx, putRequests, deleteRequests)
retry, err := c.kv.Batch(ctx, putRequests, deleteRequests)
if err != nil {
return err
if !retry {
return err
}
bo.Wait()
continue
}
c.updateStaleData(key, r, time.Now().UTC())
return nil
Expand Down Expand Up @@ -273,8 +284,8 @@ func (c *Client) WatchPrefix(ctx context.Context, prefix string, f func(string,
continue
}

for key, bytes := range out {
decoded, err := c.codec.Decode(bytes)
for key, ddbItem := range out {
decoded, err := c.codec.Decode(ddbItem.data)
if err != nil {
level.Error(c.logger).Log("msg", "error decoding key", "key", key, "err", err)
continue
Expand All @@ -293,8 +304,12 @@ func (c *Client) WatchPrefix(ctx context.Context, prefix string, f func(string,
}
}

func (c *Client) decodeMultikey(data map[string][]byte) (codec.MultiKey, error) {
res, err := c.codec.DecodeMultiKey(data)
func (c *Client) decodeMultikey(data map[string]dynamodbItem) (codec.MultiKey, error) {
multiKeyData := make(map[string][]byte, len(data))
for key, ddbItem := range data {
multiKeyData[key] = ddbItem.data
}
res, err := c.codec.DecodeMultiKey(multiKeyData)
if err != nil {
return nil, err
}
Expand Down
143 changes: 91 additions & 52 deletions pkg/ring/kv/dynamodb/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ func Test_CAS_ErrorNoRetry(t *testing.T) {
c := NewClientMock(ddbMock, codecMock, TestLogger{}, prometheus.NewPedanticRegistry(), defaultPullTime, defaultBackoff)
expectedErr := errors.Errorf("test")

ddbMock.On("Query").Return(map[string][]byte{}, nil).Once()
ddbMock.On("Query").Return(map[string]dynamodbItem{}, nil).Once()
codecMock.On("DecodeMultiKey").Return(descMock, nil).Twice()
descMock.On("Clone").Return(descMock).Once()

Expand All @@ -46,25 +46,60 @@ func Test_CAS_ErrorNoRetry(t *testing.T) {
}

func Test_CAS_Backoff(t *testing.T) {
ddbMock := NewDynamodbClientMock()
codecMock := &CodecMock{}
descMock := &DescMock{}
c := NewClientMock(ddbMock, codecMock, TestLogger{}, prometheus.NewPedanticRegistry(), defaultPullTime, defaultBackoff)
expectedErr := errors.Errorf("test")
testCases := []struct {
name string
setupMocks func(*MockDynamodbClient, *CodecMock, *DescMock, map[dynamodbKey]dynamodbItem, []dynamodbKey)
expectedQueryCalls int
expectedBatchCalls int
}{
{
name: "query_fails_and_backs_off",
setupMocks: func(ddbMock *MockDynamodbClient, codecMock *CodecMock, descMock *DescMock, expectedBatch map[dynamodbKey]dynamodbItem, expectedDelete []dynamodbKey) {
ddbMock.On("Query").Return(map[string]dynamodbItem{}, errors.Errorf("query failed")).Once()
ddbMock.On("Query").Return(map[string]dynamodbItem{}, nil).Once()
ddbMock.On("Batch", context.TODO(), expectedBatch, expectedDelete).Return(false, nil).Once()
},
expectedQueryCalls: 2,
expectedBatchCalls: 1,
},
{
name: "batch_fails_and_backs_off",
setupMocks: func(ddbMock *MockDynamodbClient, codecMock *CodecMock, descMock *DescMock, expectedBatch map[dynamodbKey]dynamodbItem, expectedDelete []dynamodbKey) {
ddbMock.On("Query").Return(map[string]dynamodbItem{}, nil).Twice()
ddbMock.On("Batch", context.TODO(), expectedBatch, expectedDelete).Return(true, errors.Errorf("batch failed")).Once()
ddbMock.On("Batch", context.TODO(), expectedBatch, expectedDelete).Return(false, nil).Once()
},
expectedQueryCalls: 2,
expectedBatchCalls: 2,
},
}

ddbMock.On("Query").Return(map[string][]byte{}, expectedErr).Once()
ddbMock.On("Query").Return(map[string][]byte{}, nil).Once()
ddbMock.On("Batch", context.TODO(), map[dynamodbKey][]byte{}, []dynamodbKey{{primaryKey: "test", sortKey: "childkey"}}).Once()
codecMock.On("DecodeMultiKey").Return(descMock, nil).Twice()
descMock.On("Clone").Return(descMock).Once()
descMock.On("FindDifference", descMock).Return(descMock, []string{"childkey"}, nil).Once()
codecMock.On("EncodeMultiKey").Return(map[string][]byte{}, nil).Twice()
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
ddbMock := NewDynamodbClientMock()
codecMock := &CodecMock{}
descMock := &DescMock{}
c := NewClientMock(ddbMock, codecMock, TestLogger{}, prometheus.NewPedanticRegistry(), defaultPullTime, defaultBackoff)

err := c.CAS(context.TODO(), key, func(in interface{}) (out interface{}, retry bool, err error) {
return descMock, true, nil
})
expectedBatch := map[dynamodbKey]dynamodbItem{}
expectedDelete := []dynamodbKey{{primaryKey: "test", sortKey: "childkey"}}

require.NoError(t, err)
tc.setupMocks(ddbMock, codecMock, descMock, expectedBatch, expectedDelete)

codecMock.On("DecodeMultiKey").Return(descMock, nil).Times(tc.expectedQueryCalls)
descMock.On("Clone").Return(descMock).Times(tc.expectedQueryCalls)
descMock.On("FindDifference", descMock).Return(descMock, []string{"childkey"}, nil).Times(tc.expectedBatchCalls)
codecMock.On("EncodeMultiKey").Return(map[string][]byte{}, nil).Times(tc.expectedBatchCalls)

err := c.CAS(context.TODO(), key, func(in interface{}) (out interface{}, retry bool, err error) {
return descMock, true, nil
})

require.NoError(t, err)
ddbMock.AssertNumberOfCalls(t, "Query", tc.expectedQueryCalls)
ddbMock.AssertNumberOfCalls(t, "Batch", tc.expectedBatchCalls)
})
}
}

func Test_CAS_Failed(t *testing.T) {
Expand All @@ -78,7 +113,7 @@ func Test_CAS_Failed(t *testing.T) {
descMock := &DescMock{}
c := NewClientMock(ddbMock, codecMock, TestLogger{}, prometheus.NewPedanticRegistry(), defaultPullTime, config)

ddbMock.On("Query").Return(map[string][]byte{}, errors.Errorf("test"))
ddbMock.On("Query").Return(map[string]dynamodbItem{}, errors.Errorf("test"))

err := c.CAS(context.TODO(), key, func(in interface{}) (out interface{}, retry bool, err error) {
return descMock, true, nil
Expand All @@ -98,17 +133,17 @@ func Test_CAS_Update(t *testing.T) {
expectedUpdatedKeys[0]: []byte(expectedUpdatedKeys[0]),
expectedUpdatedKeys[1]: []byte(expectedUpdatedKeys[1]),
}
expectedBatch := map[dynamodbKey][]byte{
{primaryKey: key, sortKey: expectedUpdatedKeys[0]}: []byte(expectedUpdatedKeys[0]),
{primaryKey: key, sortKey: expectedUpdatedKeys[1]}: []byte(expectedUpdatedKeys[1]),
expectedBatch := map[dynamodbKey]dynamodbItem{
{primaryKey: key, sortKey: expectedUpdatedKeys[0]}: {data: []byte(expectedUpdatedKeys[0])},
{primaryKey: key, sortKey: expectedUpdatedKeys[1]}: {data: []byte(expectedUpdatedKeys[1])},
}

ddbMock.On("Query").Return(map[string][]byte{}, nil).Once()
ddbMock.On("Query").Return(map[string]dynamodbItem{}, nil).Once()
codecMock.On("DecodeMultiKey").Return(descMock, nil).Once()
descMock.On("Clone").Return(descMock).Once()
descMock.On("FindDifference", descMock).Return(descMock, []string{}, nil).Once()
codecMock.On("EncodeMultiKey").Return(expectedUpdated, nil).Once()
ddbMock.On("Batch", context.TODO(), expectedBatch, []dynamodbKey{}).Once()
ddbMock.On("Batch", context.TODO(), expectedBatch, []dynamodbKey{}).Return(false, nil).Once()

err := c.CAS(context.TODO(), key, func(in interface{}) (out interface{}, retry bool, err error) {
return descMock, true, nil
Expand All @@ -130,20 +165,20 @@ func Test_CAS_Delete(t *testing.T) {
{primaryKey: key, sortKey: expectedToDelete[1]},
}

ddbMock.On("Query").Return(map[string][]byte{}, nil).Once()
ddbMock.On("Query").Return(map[string]dynamodbItem{}, nil).Once()
codecMock.On("DecodeMultiKey").Return(descMock, nil).Once()
descMock.On("Clone").Return(descMock).Once()
descMock.On("FindDifference", descMock).Return(descMock, expectedToDelete, nil).Once()
codecMock.On("EncodeMultiKey").Return(map[string][]byte{}, nil).Once()
ddbMock.On("Batch", context.TODO(), map[dynamodbKey][]byte{}, expectedBatch)
ddbMock.On("Batch", context.TODO(), map[dynamodbKey]dynamodbItem{}, expectedBatch).Return(false, nil).Once()

err := c.CAS(context.TODO(), key, func(in interface{}) (out interface{}, retry bool, err error) {
return descMock, true, nil
})

require.NoError(t, err)
ddbMock.AssertNumberOfCalls(t, "Batch", 1)
ddbMock.AssertCalled(t, "Batch", context.TODO(), map[dynamodbKey][]byte{}, expectedBatch)
ddbMock.AssertCalled(t, "Batch", context.TODO(), map[dynamodbKey]dynamodbItem{}, expectedBatch)
}

func Test_CAS_Update_Delete(t *testing.T) {
Expand All @@ -156,22 +191,22 @@ func Test_CAS_Update_Delete(t *testing.T) {
expectedUpdatedKeys[0]: []byte(expectedUpdatedKeys[0]),
expectedUpdatedKeys[1]: []byte(expectedUpdatedKeys[1]),
}
expectedUpdateBatch := map[dynamodbKey][]byte{
{primaryKey: key, sortKey: expectedUpdatedKeys[0]}: []byte(expectedUpdatedKeys[0]),
{primaryKey: key, sortKey: expectedUpdatedKeys[1]}: []byte(expectedUpdatedKeys[1]),
expectedUpdateBatch := map[dynamodbKey]dynamodbItem{
{primaryKey: key, sortKey: expectedUpdatedKeys[0]}: {data: []byte(expectedUpdatedKeys[0])},
{primaryKey: key, sortKey: expectedUpdatedKeys[1]}: {data: []byte(expectedUpdatedKeys[1])},
}
expectedToDelete := []string{"test", "test2"}
expectedDeleteBatch := []dynamodbKey{
{primaryKey: key, sortKey: expectedToDelete[0]},
{primaryKey: key, sortKey: expectedToDelete[1]},
}

ddbMock.On("Query").Return(map[string][]byte{}, nil).Once()
ddbMock.On("Query").Return(map[string]dynamodbItem{}, nil).Once()
codecMock.On("DecodeMultiKey").Return(descMock, nil).Once()
descMock.On("Clone").Return(descMock).Once()
descMock.On("FindDifference", descMock).Return(descMock, expectedToDelete, nil).Once()
codecMock.On("EncodeMultiKey").Return(expectedUpdated, nil).Once()
ddbMock.On("Batch", context.TODO(), expectedUpdateBatch, expectedDeleteBatch)
ddbMock.On("Batch", context.TODO(), expectedUpdateBatch, expectedDeleteBatch).Return(false, nil).Once()

err := c.CAS(context.TODO(), key, func(in interface{}) (out interface{}, retry bool, err error) {
return descMock, true, nil
Expand All @@ -189,7 +224,7 @@ func Test_WatchKey(t *testing.T) {
c := NewClientMock(ddbMock, codecMock, TestLogger{}, prometheus.NewPedanticRegistry(), 1*time.Second, defaultBackoff)
timesCalled := 0

ddbMock.On("Query").Return(map[string][]byte{}, nil)
ddbMock.On("Query").Return(map[string]dynamodbItem{}, nil)
codecMock.On("DecodeMultiKey").Return(descMock, nil)

c.WatchKey(context.TODO(), key, func(i interface{}) bool {
Expand All @@ -207,7 +242,7 @@ func Test_WatchKey_UpdateStale(t *testing.T) {
c := NewClientMock(ddbMock, codecMock, TestLogger{}, prometheus.NewPedanticRegistry(), defaultPullTime, defaultBackoff)
staleData := &DescMock{}

ddbMock.On("Query").Return(map[string][]byte{}, nil).Once()
ddbMock.On("Query").Return(map[string]dynamodbItem{}, nil).Once()
codecMock.On("DecodeMultiKey").Return(staleData, nil)

c.WatchKey(context.TODO(), key, func(i interface{}) bool {
Expand All @@ -217,7 +252,7 @@ func Test_WatchKey_UpdateStale(t *testing.T) {
return false
})

ddbMock.On("Query").Return(map[string][]byte{}, errors.Errorf("failed"))
ddbMock.On("Query").Return(map[string]dynamodbItem{}, errors.Errorf("failed"))
staleData.On("Clone").Return(staleData).Once()

c.WatchKey(context.TODO(), key, func(i interface{}) bool {
Expand All @@ -241,17 +276,17 @@ func Test_CAS_UpdateStale(t *testing.T) {
expectedUpdatedKeys[0]: []byte(expectedUpdatedKeys[0]),
expectedUpdatedKeys[1]: []byte(expectedUpdatedKeys[1]),
}
expectedBatch := map[dynamodbKey][]byte{
{primaryKey: key, sortKey: expectedUpdatedKeys[0]}: []byte(expectedUpdatedKeys[0]),
{primaryKey: key, sortKey: expectedUpdatedKeys[1]}: []byte(expectedUpdatedKeys[1]),
expectedBatch := map[dynamodbKey]dynamodbItem{
{primaryKey: key, sortKey: expectedUpdatedKeys[0]}: {data: []byte(expectedUpdatedKeys[0])},
{primaryKey: key, sortKey: expectedUpdatedKeys[1]}: {data: []byte(expectedUpdatedKeys[1])},
}

ddbMock.On("Query").Return(map[string][]byte{}, nil).Once()
ddbMock.On("Query").Return(map[string]dynamodbItem{}, nil).Once()
codecMock.On("DecodeMultiKey").Return(descMock, nil).Once()
descMock.On("Clone").Return(descMock).Once()
descMock.On("FindDifference", descMockResult).Return(descMockResult, []string{}, nil).Once()
codecMock.On("EncodeMultiKey").Return(expectedUpdated, nil).Once()
ddbMock.On("Batch", context.TODO(), expectedBatch, []dynamodbKey{}).Once()
ddbMock.On("Batch", context.TODO(), expectedBatch, []dynamodbKey{}).Return(false, nil).Once()

err := c.CAS(context.TODO(), key, func(in interface{}) (out interface{}, retry bool, err error) {
return descMockResult, true, nil
Expand All @@ -266,17 +301,17 @@ func Test_WatchPrefix(t *testing.T) {
ddbMock := NewDynamodbClientMock()
codecMock := &CodecMock{}
c := NewClientMock(ddbMock, codecMock, TestLogger{}, prometheus.NewPedanticRegistry(), defaultPullTime, defaultBackoff)
data := map[string][]byte{}
data := map[string]dynamodbItem{}
dataKey := []string{"t1", "t2"}
data[dataKey[0]] = []byte(dataKey[0])
data[dataKey[1]] = []byte(dataKey[1])
data[dataKey[0]] = dynamodbItem{data: []byte(dataKey[0])}
data[dataKey[1]] = dynamodbItem{data: []byte(dataKey[1])}
calls := 0

ddbMock.On("Query").Return(data, nil)
codecMock.On("Decode").Twice()

c.WatchPrefix(context.TODO(), key, func(key string, i interface{}) bool {
require.EqualValues(t, string(data[key]), i)
require.EqualValues(t, string(data[key].data), i)
delete(data, key)
calls++
return calls < 2
Expand Down Expand Up @@ -321,7 +356,7 @@ func Test_DynamodbKVWithTimeout(t *testing.T) {
err = dbWithTimeout.Put(ctx, dynamodbKey{primaryKey: key}, []byte{})
require.True(t, errors.Is(err, context.DeadlineExceeded))

err = dbWithTimeout.Batch(ctx, nil, nil)
_, err = dbWithTimeout.Batch(ctx, nil, nil)
require.True(t, errors.Is(err, context.DeadlineExceeded))
}

Expand Down Expand Up @@ -358,13 +393,13 @@ func (m *MockDynamodbClient) List(context.Context, dynamodbKey) ([]string, float
}
return args.Get(0).([]string), 0, err
}
func (m *MockDynamodbClient) Query(context.Context, dynamodbKey, bool) (map[string][]byte, float64, error) {
func (m *MockDynamodbClient) Query(context.Context, dynamodbKey, bool) (map[string]dynamodbItem, float64, error) {
args := m.Called()
var err error
if args.Get(1) != nil {
err = args.Get(1).(error)
}
return args.Get(0).(map[string][]byte), 0, err
return args.Get(0).(map[string]dynamodbItem), 0, err
}
func (m *MockDynamodbClient) Delete(ctx context.Context, key dynamodbKey) error {
m.Called(ctx, key)
Expand All @@ -374,9 +409,13 @@ func (m *MockDynamodbClient) Put(ctx context.Context, key dynamodbKey, data []by
m.Called(ctx, key, data)
return nil
}
func (m *MockDynamodbClient) Batch(ctx context.Context, put map[dynamodbKey][]byte, delete []dynamodbKey) error {
m.Called(ctx, put, delete)
return nil
func (m *MockDynamodbClient) Batch(ctx context.Context, put map[dynamodbKey]dynamodbItem, delete []dynamodbKey) (bool, error) {
args := m.Called(ctx, put, delete)
var err error
if args.Get(1) != nil {
err = args.Get(1).(error)
}
return args.Get(0).(bool), err
}

type TestLogger struct {
Expand Down Expand Up @@ -471,7 +510,7 @@ func (d *dynamodbKVWithDelayAndContextCheck) List(ctx context.Context, key dynam
}
}

func (d *dynamodbKVWithDelayAndContextCheck) Query(ctx context.Context, key dynamodbKey, isPrefix bool) (map[string][]byte, float64, error) {
func (d *dynamodbKVWithDelayAndContextCheck) Query(ctx context.Context, key dynamodbKey, isPrefix bool) (map[string]dynamodbItem, float64, error) {
select {
case <-ctx.Done():
return nil, 0, ctx.Err()
Expand All @@ -498,10 +537,10 @@ func (d *dynamodbKVWithDelayAndContextCheck) Put(ctx context.Context, key dynamo
}
}

func (d *dynamodbKVWithDelayAndContextCheck) Batch(ctx context.Context, put map[dynamodbKey][]byte, delete []dynamodbKey) error {
func (d *dynamodbKVWithDelayAndContextCheck) Batch(ctx context.Context, put map[dynamodbKey]dynamodbItem, delete []dynamodbKey) (bool, error) {
select {
case <-ctx.Done():
return ctx.Err()
return false, ctx.Err()
case <-time.After(d.delay):
return d.ddbClient.Batch(ctx, put, delete)
}
Expand Down
Loading
Loading