Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
* [ENHANCEMENT] Distributor: Add count, spans, and buckets validations for native histogram. #7072
* [BUGFIX] Compactor: Avoid race condition which allow a grouper to not compact all partitions. #7082
* [BUGFIX] Fix bug where validating metric names uses the wrong validation logic. #7086
* [BUGFIX] Ring: Change DynamoDB KV to retry indefinitely for WatchKey. #7088

## 1.20.0 in progress

Expand Down
8 changes: 6 additions & 2 deletions pkg/ring/kv/dynamodb/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -230,7 +230,9 @@ func (c *Client) CAS(ctx context.Context, key string, f func(in any) (out any, r
}

func (c *Client) WatchKey(ctx context.Context, key string, f func(any) bool) {
bo := backoff.New(ctx, c.backoffConfig)
watchBackoffConfig := c.backoffConfig
watchBackoffConfig.MaxRetries = 0
bo := backoff.New(ctx, watchBackoffConfig)

for bo.Ongoing() {
out, _, err := c.kv.Query(ctx, dynamodbKey{
Expand Down Expand Up @@ -272,7 +274,9 @@ func (c *Client) WatchKey(ctx context.Context, key string, f func(any) bool) {
}

func (c *Client) WatchPrefix(ctx context.Context, prefix string, f func(string, any) bool) {
bo := backoff.New(ctx, c.backoffConfig)
watchBackoffConfig := c.backoffConfig
watchBackoffConfig.MaxRetries = 0
bo := backoff.New(ctx, watchBackoffConfig)

for bo.Ongoing() {
out, _, err := c.kv.Query(ctx, dynamodbKey{
Expand Down
36 changes: 36 additions & 0 deletions pkg/ring/kv/dynamodb/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -263,6 +263,42 @@ func Test_WatchKey_UpdateStale(t *testing.T) {
})
}

// Test_WatchKey_AlwaysRetry checks that WatchKey keeps retrying failed queries
// past the MaxRetries value configured on the client, and that it hands the
// previously stored stale data to the callback while the queries keep failing.
func Test_WatchKey_AlwaysRetry(t *testing.T) {
	// Number of Query calls expected before the callback receives stale data:
	// 1 initial attempt plus 10 retries.
	// NOTE(review): the 10-retry threshold is implemented inside WatchKey, which
	// is not part of this test; keep this constant in sync if that limit changes.
	const expectedQueryCalls = 11

	// MaxRetries is deliberately smaller than expectedQueryCalls. If WatchKey
	// honored it, the watch loop would stop before reaching the stale-data
	// fallback and the assertions below would fail.
	clientBackoffConfig := backoff.Config{
		MinBackoff: 1 * time.Millisecond,
		MaxBackoff: 1 * time.Millisecond,
		MaxRetries: 5,
	}

	ddbMock := NewDynamodbClientMock()
	codecMock := &CodecMock{}
	c := NewClientMock(ddbMock, codecMock, TestLogger{}, prometheus.NewPedanticRegistry(), defaultPullTime, clientBackoffConfig)

	// Every Query fails, so WatchKey never obtains fresh data.
	ddbMock.On("Query").Return(map[string]dynamodbItem{}, errors.Errorf("query failed"))

	// Seed stale data for WatchKey to fall back to once the queries keep failing.
	staleData := &DescMock{}
	staleData.On("Clone").Return(staleData).Once()
	c.updateStaleData(key, staleData, time.Now())

	callCount := 0
	c.WatchKey(context.TODO(), key, func(i any) bool {
		callCount++
		// The only value the callback can receive is the seeded stale data.
		require.EqualValues(t, staleData, i)
		return false // Stop watching so the otherwise endless retry loop exits.
	})

	// More Query calls than the configured MaxRetries shows that WatchKey's
	// retries are not bounded by the client's backoff configuration.
	ddbMock.AssertNumberOfCalls(t, "Query", expectedQueryCalls)
	require.Equal(t, 1, callCount, "Callback should be called once with stale data")
}

func Test_CAS_UpdateStale(t *testing.T) {
ddbMock := NewDynamodbClientMock()
codecMock := &CodecMock{}
Expand Down
Loading