Skip to content

Commit 608a1b0

Browse files
inishchithyaron2
andauthored
feat: Add Redis Sentinel failover support to Redis lock component (#3973)
Signed-off-by: inishchith <[email protected]> Co-authored-by: Yaron Schneider <[email protected]>
1 parent 506118f commit 608a1b0

File tree

4 files changed

+228
-7
lines changed

4 files changed

+228
-7
lines changed

common/component/redis/v8client.go

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -360,9 +360,12 @@ func newV8FailoverClient(s *Settings) (RedisClient, error) {
360360
}
361361
}
362362

363-
if s.RedisType == ClusterType {
363+
// If multiple sentinel addresses are provided, split them regardless of RedisType.
364+
if strings.Contains(s.Host, ",") {
364365
opts.SentinelAddrs = strings.Split(s.Host, ",")
366+
}
365367

368+
if s.RedisType == ClusterType {
366369
return v8Client{
367370
client: v8.NewFailoverClusterClient(opts),
368371
readTimeout: s.ReadTimeout,

common/component/redis/v9client.go

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -361,9 +361,12 @@ func newV9FailoverClient(s *Settings) (RedisClient, error) {
361361
}
362362
}
363363

364-
if s.RedisType == ClusterType {
364+
// If multiple sentinel addresses are provided, split them regardless of RedisType.
365+
if strings.Contains(s.Host, ",") {
365366
opts.SentinelAddrs = strings.Split(s.Host, ",")
367+
}
366368

369+
if s.RedisType == ClusterType {
367370
return v9Client{
368371
client: v9.NewFailoverClusterClient(opts),
369372
readTimeout: s.ReadTimeout,

lock/redis/standalone.go

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -60,20 +60,21 @@ func (r *StandaloneRedisLock) InitLockStore(ctx context.Context, metadata lock.M
6060
return errors.New("metadata property redisHost is empty")
6161
}
6262

63-
// We do not support failover or having replicas
64-
if r.clientSettings.Failover {
65-
return errors.New("this component does not support connecting to Redis with failover")
63+
// Failover (Sentinel) is supported via the shared redis client.
64+
// Redis Cluster remains unsupported for distributed locks.
65+
if r.clientSettings.RedisType == rediscomponent.ClusterType {
66+
return errors.New("this component does not support connecting to Redis Cluster")
6667
}
6768

6869
// Ping Redis to ensure the connection is uo
6970
if _, err = r.client.PingResult(ctx); err != nil {
7071
return fmt.Errorf("error connecting to Redis: %v", err)
7172
}
7273

73-
// Ensure there are no replicas
74+
// Ensure there are no replicas unless we are using failover (Sentinel)
7475
// Pass the validation if error occurs, since some Redis versions such as miniredis do not recognize the `INFO` command.
7576
replicas, err := rediscomponent.GetConnectedSlaves(ctx, r.client)
76-
if err == nil && replicas > 0 {
77+
if err == nil && replicas > 0 && !r.clientSettings.Failover {
7778
return errors.New("replication is not supported")
7879
}
7980
return nil

lock/redis/standalone_test.go

Lines changed: 214 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -79,6 +79,124 @@ func TestStandaloneRedisLock_InitError(t *testing.T) {
7979
})
8080
}
8181

82+
func TestStandaloneRedisLock_InitFailoverAndReplication(t *testing.T) {
83+
t.Run("error when cluster type is specified", func(t *testing.T) {
84+
// construct component
85+
comp := NewStandaloneRedisLock(logger.NewLogger("test")).(*StandaloneRedisLock)
86+
defer comp.Close()
87+
88+
cfg := lock.Metadata{Base: metadata.Base{
89+
Properties: make(map[string]string),
90+
}}
91+
cfg.Properties["redisHost"] = "127.0.0.1:6379"
92+
cfg.Properties["redisType"] = "cluster"
93+
94+
// init should fail for cluster type
95+
err := comp.InitLockStore(t.Context(), cfg)
96+
require.Error(t, err)
97+
assert.Contains(t, err.Error(), "does not support connecting to Redis Cluster")
98+
})
99+
100+
t.Run("failover configuration", func(t *testing.T) {
101+
// Note: miniredis doesn't support Sentinel, so this test verifies
102+
// that failover configuration is properly parsed and fails appropriately
103+
// when Sentinel is not available
104+
105+
// construct component
106+
comp := NewStandaloneRedisLock(logger.NewLogger("test")).(*StandaloneRedisLock)
107+
defer comp.Close()
108+
109+
cfg := lock.Metadata{Base: metadata.Base{
110+
Properties: make(map[string]string),
111+
}}
112+
cfg.Properties["redisHost"] = "127.0.0.1:26379" // Standard Sentinel port
113+
cfg.Properties["failover"] = "true"
114+
cfg.Properties["sentinelMasterName"] = "mymaster"
115+
116+
// init should fail due to no Sentinel available, but this validates
117+
// that failover configuration is properly handled
118+
err := comp.InitLockStore(t.Context(), cfg)
119+
require.Error(t, err)
120+
// The error should be related to connection, not configuration parsing
121+
assert.Contains(t, err.Error(), "error connecting to Redis")
122+
})
123+
124+
t.Run("success when no replicas detected", func(t *testing.T) {
125+
// start redis
126+
s, err := miniredis.Run()
127+
require.NoError(t, err)
128+
defer s.Close()
129+
130+
// construct component
131+
comp := NewStandaloneRedisLock(logger.NewLogger("test")).(*StandaloneRedisLock)
132+
defer comp.Close()
133+
134+
cfg := lock.Metadata{Base: metadata.Base{
135+
Properties: make(map[string]string),
136+
}}
137+
cfg.Properties["redisHost"] = s.Addr()
138+
cfg.Properties["failover"] = "false"
139+
140+
// init should succeed when no replicas are present
141+
err = comp.InitLockStore(t.Context(), cfg)
142+
require.NoError(t, err)
143+
})
144+
145+
t.Run("error when replication detected without failover", func(t *testing.T) {
146+
// Note: This test would require a more complex setup with actual Redis replicas
147+
// For now, we test the validation logic path
148+
// In a real scenario, you would set up a Redis master with replicas
149+
// and ensure the component rejects it when failover=false
150+
151+
// This is a conceptual test - actual implementation would require:
152+
// 1. Setting up Redis with replication
153+
// 2. Ensuring GetConnectedSlaves returns > 0
154+
// 3. Verifying the error is returned
155+
156+
t.Skip("Requires complex Redis replication setup - implement when needed")
157+
})
158+
}
159+
160+
func TestStandaloneRedisLock_ConfigurationValidation(t *testing.T) {
161+
t.Run("error when invalid failover settings", func(t *testing.T) {
162+
// construct component
163+
comp := NewStandaloneRedisLock(logger.NewLogger("test")).(*StandaloneRedisLock)
164+
defer comp.Close()
165+
166+
cfg := lock.Metadata{Base: metadata.Base{
167+
Properties: make(map[string]string),
168+
}}
169+
cfg.Properties["redisHost"] = "127.0.0.1:6379"
170+
cfg.Properties["failover"] = "true"
171+
// Missing sentinelMasterName should cause connection issues
172+
173+
// init should fail due to invalid failover configuration
174+
err := comp.InitLockStore(t.Context(), cfg)
175+
require.Error(t, err)
176+
})
177+
178+
t.Run("success with valid node type", func(t *testing.T) {
179+
// start redis
180+
s, err := miniredis.Run()
181+
require.NoError(t, err)
182+
defer s.Close()
183+
184+
// construct component
185+
comp := NewStandaloneRedisLock(logger.NewLogger("test")).(*StandaloneRedisLock)
186+
defer comp.Close()
187+
188+
cfg := lock.Metadata{Base: metadata.Base{
189+
Properties: make(map[string]string),
190+
}}
191+
cfg.Properties["redisHost"] = s.Addr()
192+
cfg.Properties["redisType"] = "node"
193+
194+
// init should succeed with node type
195+
err = comp.InitLockStore(t.Context(), cfg)
196+
require.NoError(t, err)
197+
})
198+
}
199+
82200
func TestStandaloneRedisLock_TryLock(t *testing.T) {
83201
// 0. prepare
84202
// start redis
@@ -146,3 +264,99 @@ func TestStandaloneRedisLock_TryLock(t *testing.T) {
146264
require.NoError(t, err)
147265
assert.EqualValues(t, 0, unlockResp.Status, "client2 failed to unlock!")
148266
}
267+
268+
func TestStandaloneRedisLock_ErrorScenarios(t *testing.T) {
269+
t.Run("error when connection ping fails", func(t *testing.T) {
270+
// construct component
271+
comp := NewStandaloneRedisLock(logger.NewLogger("test")).(*StandaloneRedisLock)
272+
defer comp.Close()
273+
274+
cfg := lock.Metadata{Base: metadata.Base{
275+
Properties: make(map[string]string),
276+
}}
277+
cfg.Properties["redisHost"] = "127.0.0.1:9999" // Non-existent Redis port
278+
cfg.Properties["redisPassword"] = ""
279+
280+
// init should fail due to connection error
281+
err := comp.InitLockStore(t.Context(), cfg)
282+
require.Error(t, err)
283+
assert.Contains(t, err.Error(), "error connecting to Redis")
284+
})
285+
286+
t.Run("error with empty host", func(t *testing.T) {
287+
// construct component
288+
comp := NewStandaloneRedisLock(logger.NewLogger("test")).(*StandaloneRedisLock)
289+
defer comp.Close()
290+
291+
cfg := lock.Metadata{Base: metadata.Base{
292+
Properties: make(map[string]string),
293+
}}
294+
cfg.Properties["redisHost"] = ""
295+
296+
// init should fail due to empty host
297+
err := comp.InitLockStore(t.Context(), cfg)
298+
require.Error(t, err)
299+
assert.Contains(t, err.Error(), "redisHost is empty")
300+
})
301+
302+
t.Run("unlock scenarios", func(t *testing.T) {
303+
// start redis
304+
s, err := miniredis.Run()
305+
require.NoError(t, err)
306+
defer s.Close()
307+
308+
// construct component
309+
comp := NewStandaloneRedisLock(logger.NewLogger("test")).(*StandaloneRedisLock)
310+
defer comp.Close()
311+
312+
cfg := lock.Metadata{Base: metadata.Base{
313+
Properties: make(map[string]string),
314+
}}
315+
cfg.Properties["redisHost"] = s.Addr()
316+
317+
err = comp.InitLockStore(t.Context(), cfg)
318+
require.NoError(t, err)
319+
320+
// Test unlock non-existent lock
321+
unlockResp, err := comp.Unlock(t.Context(), &lock.UnlockRequest{
322+
ResourceID: "non-existent-resource",
323+
LockOwner: "some-owner",
324+
})
325+
require.NoError(t, err)
326+
assert.Equal(t, lock.LockDoesNotExist, unlockResp.Status)
327+
328+
// Create a lock first
329+
ownerID := uuid.New().String()
330+
_, err = comp.TryLock(t.Context(), &lock.TryLockRequest{
331+
ResourceID: resourceID,
332+
LockOwner: ownerID,
333+
ExpiryInSeconds: 10,
334+
})
335+
require.NoError(t, err)
336+
337+
// Test unlock with wrong owner
338+
unlockResp, err = comp.Unlock(t.Context(), &lock.UnlockRequest{
339+
ResourceID: resourceID,
340+
LockOwner: "wrong-owner",
341+
})
342+
require.NoError(t, err)
343+
assert.Equal(t, lock.LockBelongsToOthers, unlockResp.Status)
344+
345+
// Test successful unlock
346+
unlockResp, err = comp.Unlock(t.Context(), &lock.UnlockRequest{
347+
ResourceID: resourceID,
348+
LockOwner: ownerID,
349+
})
350+
require.NoError(t, err)
351+
assert.Equal(t, lock.Success, unlockResp.Status)
352+
})
353+
}
354+
355+
func TestStandaloneRedisLock_ComponentMetadata(t *testing.T) {
356+
comp := NewStandaloneRedisLock(logger.NewLogger("test")).(*StandaloneRedisLock)
357+
defer comp.Close()
358+
359+
metadata := comp.GetComponentMetadata()
360+
assert.NotNil(t, metadata)
361+
assert.NotEmpty(t, metadata)
362+
}

0 commit comments

Comments
 (0)