Skip to content

Clean failing timeout implementation #3472

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Aug 12, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions options.go
Original file line number Diff line number Diff line change
Expand Up @@ -231,6 +231,11 @@ type Options struct {
// UnstableResp3 enables Unstable mode for Redis Search module with RESP3.
// When unstable mode is enabled, the client will use RESP3 protocol and only be able to use RawResult
UnstableResp3 bool

// FailingTimeoutSeconds is the timeout in seconds for marking a cluster node as failing.
// When a node is marked as failing, it will be avoided for this duration.
// Default is 15 seconds.
FailingTimeoutSeconds int
}

func (opt *Options) init() {
Expand Down
41 changes: 26 additions & 15 deletions osscluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,11 @@ type ClusterOptions struct {

// UnstableResp3 enables Unstable mode for Redis Search module with RESP3.
UnstableResp3 bool

// FailingTimeoutSeconds is the timeout in seconds for marking a cluster node as failing.
// When a node is marked as failing, it will be avoided for this duration.
// Default is 15 seconds.
FailingTimeoutSeconds int
}

func (opt *ClusterOptions) init() {
Expand Down Expand Up @@ -180,6 +185,10 @@ func (opt *ClusterOptions) init() {
if opt.NewClient == nil {
opt.NewClient = NewClient
}

if opt.FailingTimeoutSeconds == 0 {
opt.FailingTimeoutSeconds = 15
}
}

// ParseClusterURL parses a URL into ClusterOptions that can be used to connect to Redis.
Expand Down Expand Up @@ -284,6 +293,7 @@ func setupClusterQueryParams(u *url.URL, o *ClusterOptions) (*ClusterOptions, er
o.PoolTimeout = q.duration("pool_timeout")
o.ConnMaxLifetime = q.duration("conn_max_lifetime")
o.ConnMaxIdleTime = q.duration("conn_max_idle_time")
o.FailingTimeoutSeconds = q.int("failing_timeout_seconds")

if q.err != nil {
return nil, q.err
Expand Down Expand Up @@ -330,20 +340,21 @@ func (opt *ClusterOptions) clientOptions() *Options {
WriteTimeout: opt.WriteTimeout,
ContextTimeoutEnabled: opt.ContextTimeoutEnabled,

PoolFIFO: opt.PoolFIFO,
PoolSize: opt.PoolSize,
PoolTimeout: opt.PoolTimeout,
MinIdleConns: opt.MinIdleConns,
MaxIdleConns: opt.MaxIdleConns,
MaxActiveConns: opt.MaxActiveConns,
ConnMaxIdleTime: opt.ConnMaxIdleTime,
ConnMaxLifetime: opt.ConnMaxLifetime,
ReadBufferSize: opt.ReadBufferSize,
WriteBufferSize: opt.WriteBufferSize,
DisableIdentity: opt.DisableIdentity,
DisableIndentity: opt.DisableIdentity,
IdentitySuffix: opt.IdentitySuffix,
TLSConfig: opt.TLSConfig,
PoolFIFO: opt.PoolFIFO,
PoolSize: opt.PoolSize,
PoolTimeout: opt.PoolTimeout,
MinIdleConns: opt.MinIdleConns,
MaxIdleConns: opt.MaxIdleConns,
MaxActiveConns: opt.MaxActiveConns,
ConnMaxIdleTime: opt.ConnMaxIdleTime,
ConnMaxLifetime: opt.ConnMaxLifetime,
ReadBufferSize: opt.ReadBufferSize,
WriteBufferSize: opt.WriteBufferSize,
DisableIdentity: opt.DisableIdentity,
DisableIndentity: opt.DisableIdentity,
IdentitySuffix: opt.IdentitySuffix,
FailingTimeoutSeconds: opt.FailingTimeoutSeconds,
TLSConfig: opt.TLSConfig,
// If ClusterSlots is populated, then we probably have an artificial
// cluster whose nodes are not in clustering mode (otherwise there isn't
// much use for ClusterSlots config). This means we cannot execute the
Expand Down Expand Up @@ -432,7 +443,7 @@ func (n *clusterNode) MarkAsFailing() {
}

func (n *clusterNode) Failing() bool {
const timeout = 15 // 15 seconds
timeout := int64(n.Client.opt.FailingTimeoutSeconds)

failing := atomic.LoadUint32(&n.failing)
if failing == 0 {
Expand Down
76 changes: 76 additions & 0 deletions osscluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1665,6 +1665,10 @@ var _ = Describe("ClusterClient ParseURL", func() {
test: "UseDefault",
url: "redis://localhost:123?conn_max_idle_time=",
o: &redis.ClusterOptions{Addrs: []string{"localhost:123"}, ConnMaxIdleTime: 0},
}, {
test: "FailingTimeoutSeconds",
url: "redis://localhost:123?failing_timeout_seconds=25",
o: &redis.ClusterOptions{Addrs: []string{"localhost:123"}, FailingTimeoutSeconds: 25},
}, {
test: "Protocol",
url: "redis://localhost:123?protocol=2",
Expand Down Expand Up @@ -1729,7 +1733,79 @@ var _ = Describe("ClusterClient ParseURL", func() {
Expect(tc.o.ConnMaxLifetime).To(Equal(actual.ConnMaxLifetime))
Expect(tc.o.ConnMaxIdleTime).To(Equal(actual.ConnMaxIdleTime))
Expect(tc.o.PoolTimeout).To(Equal(actual.PoolTimeout))
Expect(tc.o.FailingTimeoutSeconds).To(Equal(actual.FailingTimeoutSeconds))
}
}
})
})

var _ = Describe("ClusterClient FailingTimeoutSeconds", func() {
var client *redis.ClusterClient

AfterEach(func() {
if client != nil {
_ = client.Close()
}
})

It("should use default failing timeout of 15 seconds", func() {
opt := redisClusterOptions()
client = cluster.newClusterClient(ctx, opt)

// Default should be 15 seconds
Expect(opt.FailingTimeoutSeconds).To(Equal(15))
})

It("should use custom failing timeout", func() {
opt := redisClusterOptions()
opt.FailingTimeoutSeconds = 30
client = cluster.newClusterClient(ctx, opt)

// Should use custom value
Expect(opt.FailingTimeoutSeconds).To(Equal(30))
})

It("should parse failing_timeout_seconds from URL", func() {
url := "redis://localhost:16600?failing_timeout_seconds=25"
opt, err := redis.ParseClusterURL(url)
Expect(err).NotTo(HaveOccurred())
Expect(opt.FailingTimeoutSeconds).To(Equal(25))
})

It("should handle node failing timeout correctly", func() {
opt := redisClusterOptions()
opt.FailingTimeoutSeconds = 2 // Short timeout for testing
client = cluster.newClusterClient(ctx, opt)

// Get a node and mark it as failing
nodes, err := client.Nodes(ctx, "A")
Expect(err).NotTo(HaveOccurred())
Expect(len(nodes)).To(BeNumerically(">", 0))

node := nodes[0]

// Initially not failing
Expect(node.Failing()).To(BeFalse())

// Mark as failing
node.MarkAsFailing()
Expect(node.Failing()).To(BeTrue())

// Should still be failing after 1 second (less than timeout)
time.Sleep(1 * time.Second)
Expect(node.Failing()).To(BeTrue())

// Should not be failing after timeout expires
time.Sleep(2 * time.Second) // Total 3 seconds > 2 second timeout
Expect(node.Failing()).To(BeFalse())
})

It("should handle zero timeout by using default", func() {
opt := redisClusterOptions()
opt.FailingTimeoutSeconds = 0 // Should use default
client = cluster.newClusterClient(ctx, opt)

// After initialization, should be set to default
Expect(opt.FailingTimeoutSeconds).To(Equal(15))
})
})
16 changes: 11 additions & 5 deletions sentinel.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,13 @@ type FailoverOptions struct {
DisableIdentity bool

IdentitySuffix string
UnstableResp3 bool

// FailingTimeoutSeconds is the timeout in seconds for marking a cluster node as failing.
// When a node is marked as failing, it will be avoided for this duration.
// Only applies to failover cluster clients. Default is 15 seconds.
FailingTimeoutSeconds int

UnstableResp3 bool
}

func (opt *FailoverOptions) clientOptions() *Options {
Expand Down Expand Up @@ -263,10 +269,10 @@ func (opt *FailoverOptions) clusterOptions() *ClusterOptions {

TLSConfig: opt.TLSConfig,

DisableIdentity: opt.DisableIdentity,
DisableIndentity: opt.DisableIndentity,

IdentitySuffix: opt.IdentitySuffix,
DisableIdentity: opt.DisableIdentity,
DisableIndentity: opt.DisableIndentity,
IdentitySuffix: opt.IdentitySuffix,
FailingTimeoutSeconds: opt.FailingTimeoutSeconds,
}
}

Expand Down
17 changes: 12 additions & 5 deletions universal.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,13 @@ type UniversalOptions struct {
DisableIdentity bool

IdentitySuffix string
UnstableResp3 bool

// FailingTimeoutSeconds is the timeout in seconds for marking a cluster node as failing.
// When a node is marked as failing, it will be avoided for this duration.
// Only applies to cluster clients. Default is 15 seconds.
FailingTimeoutSeconds int

UnstableResp3 bool

// IsClusterMode can be used when only one Addrs is provided (e.g. Elasticache supports setting up cluster mode with configuration endpoint).
IsClusterMode bool
Expand Down Expand Up @@ -149,10 +155,11 @@ func (o *UniversalOptions) Cluster() *ClusterOptions {

TLSConfig: o.TLSConfig,

DisableIdentity: o.DisableIdentity,
DisableIndentity: o.DisableIndentity,
IdentitySuffix: o.IdentitySuffix,
UnstableResp3: o.UnstableResp3,
DisableIdentity: o.DisableIdentity,
DisableIndentity: o.DisableIndentity,
IdentitySuffix: o.IdentitySuffix,
FailingTimeoutSeconds: o.FailingTimeoutSeconds,
UnstableResp3: o.UnstableResp3,
}
}

Expand Down
Loading