diff --git a/CHANGELOG.md b/CHANGELOG.md
index 2fdbd8a7cac..caea6c0aea7 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -7,6 +7,7 @@
 * [ENHANCEMENT] Distributor: Add count, spans, and buckets validations for native histogram. #7072
 * [BUGFIX] Compactor: Avoid race condition which allow a grouper to not compact all partitions. #7082
 * [BUGFIX] Fix bug where validating metric names uses the wrong validation logic. #7086
+* [BUGFIX] Ring: Change DynamoDB KV to retry indefinitely for WatchKey. #7088
 
 ## 1.20.0 in progress
 
diff --git a/pkg/ring/kv/dynamodb/client.go b/pkg/ring/kv/dynamodb/client.go
index 9c3e45b65b7..7f31484b22f 100644
--- a/pkg/ring/kv/dynamodb/client.go
+++ b/pkg/ring/kv/dynamodb/client.go
@@ -230,7 +230,9 @@ func (c *Client) CAS(ctx context.Context, key string, f func(in any) (out any, r
 }
 
 func (c *Client) WatchKey(ctx context.Context, key string, f func(any) bool) {
-	bo := backoff.New(ctx, c.backoffConfig)
+	watchBackoffConfig := c.backoffConfig
+	watchBackoffConfig.MaxRetries = 0
+	bo := backoff.New(ctx, watchBackoffConfig)
 
 	for bo.Ongoing() {
 		out, _, err := c.kv.Query(ctx, dynamodbKey{
@@ -272,7 +274,9 @@ func (c *Client) WatchKey(ctx context.Context, key string, f func(any) bool) {
 }
 
 func (c *Client) WatchPrefix(ctx context.Context, prefix string, f func(string, any) bool) {
-	bo := backoff.New(ctx, c.backoffConfig)
+	watchBackoffConfig := c.backoffConfig
+	watchBackoffConfig.MaxRetries = 0
+	bo := backoff.New(ctx, watchBackoffConfig)
 
 	for bo.Ongoing() {
 		out, _, err := c.kv.Query(ctx, dynamodbKey{
diff --git a/pkg/ring/kv/dynamodb/client_test.go b/pkg/ring/kv/dynamodb/client_test.go
index 6885998d695..aed34fe9d4c 100644
--- a/pkg/ring/kv/dynamodb/client_test.go
+++ b/pkg/ring/kv/dynamodb/client_test.go
@@ -263,6 +263,42 @@ func Test_WatchKey_UpdateStale(t *testing.T) {
 	})
 }
 
+func Test_WatchKey_AlwaysRetry(t *testing.T) {
+	casBackoffConfig := backoff.Config{
+		MinBackoff: 1 * time.Millisecond,
+		MaxBackoff: 1 * time.Millisecond,
+		MaxRetries: 5, // CAS stops after 5 retries; WatchKey should override this and retry indefinitely
+	}
+
+	ddbMock := NewDynamodbClientMock()
+	codecMock := &CodecMock{}
+	c := NewClientMock(ddbMock, codecMock, TestLogger{}, prometheus.NewPedanticRegistry(), defaultPullTime, casBackoffConfig)
+
+	// Mock Query to always fail
+	ddbMock.On("Query").Return(map[string]dynamodbItem{}, errors.Errorf("query failed"))
+
+	// WatchKey overrides MaxRetries to 0 (retry indefinitely), so it keeps querying
+	// past the configured 5 retries and eventually falls back to stale data
+	staleData := &DescMock{}
+	staleData.On("Clone").Return(staleData).Once()
+
+	// Set up some stale data first
+	c.updateStaleData(key, staleData, time.Now())
+
+	callCount := 0
+	c.WatchKey(context.TODO(), key, func(i any) bool {
+		callCount++
+		// Called with stale data once WatchKey's hardcoded retry threshold is exceeded
+		require.EqualValues(t, staleData, i)
+		return false // Stop watching
+	})
+
+	// Verify that Query was called exactly 11 times (1 initial + 10 retries due to the hardcoded limit in WatchKey).
+	// This confirms WatchKey keeps retrying past the configured MaxRetries of 5.
+	ddbMock.AssertNumberOfCalls(t, "Query", 11)
+	require.Equal(t, 1, callCount, "Callback should be called once with stale data")
+}
+
 func Test_CAS_UpdateStale(t *testing.T) {
 	ddbMock := NewDynamodbClientMock()
 	codecMock := &CodecMock{}
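
Note for reviewers: the fix works because, in the backoff package this client uses, a MaxRetries of 0 disables the retry cap entirely, so bo.Ongoing() stays true until the context is cancelled. Copying c.backoffConfig by value before zeroing MaxRetries leaves the bounded config that CAS uses untouched. The Go sketch below illustrates that behavior in isolation; the import path and the exact Config/Backoff API are assumptions based on the Cortex backoff package, not part of this diff.

// Standalone sketch of the backoff semantics the fix relies on. Assumption:
// in the backoff package used by this client, MaxRetries == 0 means "no retry
// cap", so Ongoing() only returns false once the context is done.
package main

import (
	"context"
	"fmt"
	"time"

	"github.com/cortexproject/cortex/pkg/util/backoff" // assumed import path
)

func main() {
	// Bound the demo with a context deadline; with MaxRetries=0 this is the
	// only thing that stops the loop.
	ctx, cancel := context.WithTimeout(context.Background(), 50*time.Millisecond)
	defer cancel()

	cfg := backoff.Config{
		MinBackoff: time.Millisecond,
		MaxBackoff: time.Millisecond,
		MaxRetries: 5, // the bounded config that CAS keeps using
	}

	watchCfg := cfg         // Config is copied by value, so the CAS config is unaffected
	watchCfg.MaxRetries = 0 // 0 = retry indefinitely, as WatchKey now does

	bo := backoff.New(ctx, watchCfg)
	attempts := 0
	for bo.Ongoing() {
		attempts++
		bo.Wait()
	}

	// With the original MaxRetries=5 this loop would stop after 5 iterations;
	// with MaxRetries=0 it runs until the 50ms context deadline.
	fmt.Println("attempts:", attempts)
}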