diff --git a/options.go b/options.go index 6887b602e..9b09b7d7d 100644 --- a/options.go +++ b/options.go @@ -231,6 +231,11 @@ type Options struct { // UnstableResp3 enables Unstable mode for Redis Search module with RESP3. // When unstable mode is enabled, the client will use RESP3 protocol and only be able to use RawResult UnstableResp3 bool + + // FailingTimeoutSeconds is the timeout in seconds for marking a cluster node as failing. + // When a node is marked as failing, it will be avoided for this duration. + // Default is 15 seconds. + FailingTimeoutSeconds int } func (opt *Options) init() { diff --git a/osscluster.go b/osscluster.go index 0f678e602..8d839a0a6 100644 --- a/osscluster.go +++ b/osscluster.go @@ -124,6 +124,11 @@ type ClusterOptions struct { // UnstableResp3 enables Unstable mode for Redis Search module with RESP3. UnstableResp3 bool + + // FailingTimeoutSeconds is the timeout in seconds for marking a cluster node as failing. + // When a node is marked as failing, it will be avoided for this duration. + // Default is 15 seconds. + FailingTimeoutSeconds int } func (opt *ClusterOptions) init() { @@ -180,6 +185,10 @@ func (opt *ClusterOptions) init() { if opt.NewClient == nil { opt.NewClient = NewClient } + + if opt.FailingTimeoutSeconds == 0 { + opt.FailingTimeoutSeconds = 15 + } } // ParseClusterURL parses a URL into ClusterOptions that can be used to connect to Redis. @@ -284,6 +293,7 @@ func setupClusterQueryParams(u *url.URL, o *ClusterOptions) (*ClusterOptions, er o.PoolTimeout = q.duration("pool_timeout") o.ConnMaxLifetime = q.duration("conn_max_lifetime") o.ConnMaxIdleTime = q.duration("conn_max_idle_time") + o.FailingTimeoutSeconds = q.int("failing_timeout_seconds") if q.err != nil { return nil, q.err @@ -330,20 +340,21 @@ func (opt *ClusterOptions) clientOptions() *Options { WriteTimeout: opt.WriteTimeout, ContextTimeoutEnabled: opt.ContextTimeoutEnabled, - PoolFIFO: opt.PoolFIFO, - PoolSize: opt.PoolSize, - PoolTimeout: opt.PoolTimeout, - MinIdleConns: opt.MinIdleConns, - MaxIdleConns: opt.MaxIdleConns, - MaxActiveConns: opt.MaxActiveConns, - ConnMaxIdleTime: opt.ConnMaxIdleTime, - ConnMaxLifetime: opt.ConnMaxLifetime, - ReadBufferSize: opt.ReadBufferSize, - WriteBufferSize: opt.WriteBufferSize, - DisableIdentity: opt.DisableIdentity, - DisableIndentity: opt.DisableIdentity, - IdentitySuffix: opt.IdentitySuffix, - TLSConfig: opt.TLSConfig, + PoolFIFO: opt.PoolFIFO, + PoolSize: opt.PoolSize, + PoolTimeout: opt.PoolTimeout, + MinIdleConns: opt.MinIdleConns, + MaxIdleConns: opt.MaxIdleConns, + MaxActiveConns: opt.MaxActiveConns, + ConnMaxIdleTime: opt.ConnMaxIdleTime, + ConnMaxLifetime: opt.ConnMaxLifetime, + ReadBufferSize: opt.ReadBufferSize, + WriteBufferSize: opt.WriteBufferSize, + DisableIdentity: opt.DisableIdentity, + DisableIndentity: opt.DisableIdentity, + IdentitySuffix: opt.IdentitySuffix, + FailingTimeoutSeconds: opt.FailingTimeoutSeconds, + TLSConfig: opt.TLSConfig, // If ClusterSlots is populated, then we probably have an artificial // cluster whose nodes are not in clustering mode (otherwise there isn't // much use for ClusterSlots config). This means we cannot execute the @@ -432,7 +443,7 @@ func (n *clusterNode) MarkAsFailing() { } func (n *clusterNode) Failing() bool { - const timeout = 15 // 15 seconds + timeout := int64(n.Client.opt.FailingTimeoutSeconds) failing := atomic.LoadUint32(&n.failing) if failing == 0 { diff --git a/osscluster_test.go b/osscluster_test.go index 2c7f40a5f..09c6d362c 100644 --- a/osscluster_test.go +++ b/osscluster_test.go @@ -1665,6 +1665,10 @@ var _ = Describe("ClusterClient ParseURL", func() { test: "UseDefault", url: "redis://localhost:123?conn_max_idle_time=", o: &redis.ClusterOptions{Addrs: []string{"localhost:123"}, ConnMaxIdleTime: 0}, + }, { + test: "FailingTimeoutSeconds", + url: "redis://localhost:123?failing_timeout_seconds=25", + o: &redis.ClusterOptions{Addrs: []string{"localhost:123"}, FailingTimeoutSeconds: 25}, }, { test: "Protocol", url: "redis://localhost:123?protocol=2", @@ -1729,7 +1733,79 @@ var _ = Describe("ClusterClient ParseURL", func() { Expect(tc.o.ConnMaxLifetime).To(Equal(actual.ConnMaxLifetime)) Expect(tc.o.ConnMaxIdleTime).To(Equal(actual.ConnMaxIdleTime)) Expect(tc.o.PoolTimeout).To(Equal(actual.PoolTimeout)) + Expect(tc.o.FailingTimeoutSeconds).To(Equal(actual.FailingTimeoutSeconds)) } } }) }) + +var _ = Describe("ClusterClient FailingTimeoutSeconds", func() { + var client *redis.ClusterClient + + AfterEach(func() { + if client != nil { + _ = client.Close() + } + }) + + It("should use default failing timeout of 15 seconds", func() { + opt := redisClusterOptions() + client = cluster.newClusterClient(ctx, opt) + + // Default should be 15 seconds + Expect(opt.FailingTimeoutSeconds).To(Equal(15)) + }) + + It("should use custom failing timeout", func() { + opt := redisClusterOptions() + opt.FailingTimeoutSeconds = 30 + client = cluster.newClusterClient(ctx, opt) + + // Should use custom value + Expect(opt.FailingTimeoutSeconds).To(Equal(30)) + }) + + It("should parse failing_timeout_seconds from URL", func() { + url := "redis://localhost:16600?failing_timeout_seconds=25" + opt, err := redis.ParseClusterURL(url) + Expect(err).NotTo(HaveOccurred()) + Expect(opt.FailingTimeoutSeconds).To(Equal(25)) + }) + + It("should handle node failing timeout correctly", func() { + opt := redisClusterOptions() + opt.FailingTimeoutSeconds = 2 // Short timeout for testing + client = cluster.newClusterClient(ctx, opt) + + // Get a node and mark it as failing + nodes, err := client.Nodes(ctx, "A") + Expect(err).NotTo(HaveOccurred()) + Expect(len(nodes)).To(BeNumerically(">", 0)) + + node := nodes[0] + + // Initially not failing + Expect(node.Failing()).To(BeFalse()) + + // Mark as failing + node.MarkAsFailing() + Expect(node.Failing()).To(BeTrue()) + + // Should still be failing after 1 second (less than timeout) + time.Sleep(1 * time.Second) + Expect(node.Failing()).To(BeTrue()) + + // Should not be failing after timeout expires + time.Sleep(2 * time.Second) // Total 3 seconds > 2 second timeout + Expect(node.Failing()).To(BeFalse()) + }) + + It("should handle zero timeout by using default", func() { + opt := redisClusterOptions() + opt.FailingTimeoutSeconds = 0 // Should use default + client = cluster.newClusterClient(ctx, opt) + + // After initialization, should be set to default + Expect(opt.FailingTimeoutSeconds).To(Equal(15)) + }) +}) diff --git a/sentinel.go b/sentinel.go index a858b0876..7963a0699 100644 --- a/sentinel.go +++ b/sentinel.go @@ -129,7 +129,13 @@ type FailoverOptions struct { DisableIdentity bool IdentitySuffix string - UnstableResp3 bool + + // FailingTimeoutSeconds is the timeout in seconds for marking a cluster node as failing. + // When a node is marked as failing, it will be avoided for this duration. + // Only applies to failover cluster clients. Default is 15 seconds. + FailingTimeoutSeconds int + + UnstableResp3 bool } func (opt *FailoverOptions) clientOptions() *Options { @@ -263,10 +269,10 @@ func (opt *FailoverOptions) clusterOptions() *ClusterOptions { TLSConfig: opt.TLSConfig, - DisableIdentity: opt.DisableIdentity, - DisableIndentity: opt.DisableIndentity, - - IdentitySuffix: opt.IdentitySuffix, + DisableIdentity: opt.DisableIdentity, + DisableIndentity: opt.DisableIndentity, + IdentitySuffix: opt.IdentitySuffix, + FailingTimeoutSeconds: opt.FailingTimeoutSeconds, } } diff --git a/universal.go b/universal.go index 9d51b928e..9b150d7d4 100644 --- a/universal.go +++ b/universal.go @@ -98,7 +98,13 @@ type UniversalOptions struct { DisableIdentity bool IdentitySuffix string - UnstableResp3 bool + + // FailingTimeoutSeconds is the timeout in seconds for marking a cluster node as failing. + // When a node is marked as failing, it will be avoided for this duration. + // Only applies to cluster clients. Default is 15 seconds. + FailingTimeoutSeconds int + + UnstableResp3 bool // IsClusterMode can be used when only one Addrs is provided (e.g. Elasticache supports setting up cluster mode with configuration endpoint). IsClusterMode bool @@ -149,10 +155,11 @@ func (o *UniversalOptions) Cluster() *ClusterOptions { TLSConfig: o.TLSConfig, - DisableIdentity: o.DisableIdentity, - DisableIndentity: o.DisableIndentity, - IdentitySuffix: o.IdentitySuffix, - UnstableResp3: o.UnstableResp3, + DisableIdentity: o.DisableIdentity, + DisableIndentity: o.DisableIndentity, + IdentitySuffix: o.IdentitySuffix, + FailingTimeoutSeconds: o.FailingTimeoutSeconds, + UnstableResp3: o.UnstableResp3, } }