Skip to content

Commit 668f578

Browse files
committed
still hunting this deadlock
1 parent 69569ea commit 668f578

File tree

4 files changed

+29
-56
lines changed

4 files changed

+29
-56
lines changed

hitless/pool_hook.go

Lines changed: 18 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -388,24 +388,29 @@ func (ph *PoolHook) queueHandoff(conn *pool.Conn) error {
388388
}
389389

390390
select {
391-
case ph.handoffQueue <- request:
392-
// Store in pending map
393-
ph.pending.Store(request.ConnID, request.SeqID)
394-
return nil
391+
// priority to shutdown
395392
case <-ph.shutdown:
396-
ph.pending.Delete(request.ConnID)
397393
return errors.New("shutdown")
398394
default:
399-
// Queue is full - log and attempt scaling
400-
if ph.config != nil && ph.config.LogLevel >= 1 { // Warning level
401-
internal.Logger.Printf(context.Background(),
402-
"hitless: handoff queue is full (%d/%d), attempting timeout queuing and scaling workers",
403-
len(ph.handoffQueue), cap(ph.handoffQueue))
395+
select {
396+
case <-ph.shutdown:
397+
return errors.New("shutdown")
398+
case ph.handoffQueue <- request:
399+
// Store in pending map
400+
ph.pending.Store(request.ConnID, request.SeqID)
401+
return nil
402+
default:
403+
// Queue is full - log and attempt scaling
404+
if ph.config != nil && ph.config.LogLevel >= 1 { // Warning level
405+
internal.Logger.Printf(context.Background(),
406+
"hitless: handoff queue is full (%d/%d), attempting timeout queuing and scaling workers",
407+
len(ph.handoffQueue), cap(ph.handoffQueue))
408+
}
404409
}
405-
406-
// Scale up workers to handle the load
407-
go ph.scaleUpWorkers()
408410
}
411+
412+
// Scale up workers to handle the load
413+
go ph.scaleUpWorkers()
409414
return errors.New("queue full")
410415
}
411416

redis.go

Lines changed: 8 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -932,18 +932,14 @@ func NewClient(opt *Options) *Client {
932932
// Create connection pool with the hooks (nil if hitless upgrades disabled)
933933
c.connPool = newConnPool(opt, c.dialHook)
934934

935-
// Initialize hitless upgrades first if enabled to get the connection processor
936-
if opt.HitlessUpgradeConfig.IsEnabled() {
937-
if opt.Protocol != 3 {
938-
internal.Logger.Printf(context.Background(), "hitless: RESP3 protocol required for hitless upgrades, but Protocol is %d", opt.Protocol)
939-
} else {
940-
err := c.enableHitlessUpgrades()
941-
if err != nil {
942-
internal.Logger.Printf(context.Background(), "hitless: failed to initialize hitless upgrades: %v", err)
943-
if opt.HitlessUpgradeConfig.Enabled == hitless.MaintNotificationsEnabled {
944-
// panic so we fail fast without breaking existing clients api
945-
panic(fmt.Errorf("failed to enable hitless upgrades: %w", err))
946-
}
935+
// Initialize hitless upgrades first if enabled and protocol is RESP3
936+
if opt.HitlessUpgradeConfig.IsEnabled() && opt.Protocol == 3 {
937+
err := c.enableHitlessUpgrades()
938+
if err != nil {
939+
internal.Logger.Printf(context.Background(), "hitless: failed to initialize hitless upgrades: %v", err)
940+
if opt.HitlessUpgradeConfig.Enabled == hitless.MaintNotificationsEnabled {
941+
// panic so we fail fast without breaking existing clients api
942+
panic(fmt.Errorf("failed to enable hitless upgrades: %w", err))
947943
}
948944
}
949945
}

search_test.go

Lines changed: 3 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,6 @@ import (
1313
. "github.com/bsm/gomega"
1414
"github.com/redis/go-redis/v9"
1515
"github.com/redis/go-redis/v9/helper"
16-
"github.com/redis/go-redis/v9/hitless"
1716
)
1817

1918
func WaitForIndexing(c *redis.Client, index string) {
@@ -3334,14 +3333,7 @@ var _ = Describe("RediSearch FT.Config with Resp2 and Resp3", Label("search", "N
33343333
var clientResp3 *redis.Client
33353334
BeforeEach(func() {
33363335
clientResp2 = redis.NewClient(&redis.Options{Addr: ":6379", Protocol: 2})
3337-
clientResp3 = redis.NewClient(&redis.Options{
3338-
Addr: ":6379",
3339-
Protocol: 3,
3340-
UnstableResp3: true,
3341-
HitlessUpgradeConfig: &redis.HitlessUpgradeConfig{
3342-
Enabled: hitless.MaintNotificationsDisabled,
3343-
},
3344-
})
3336+
clientResp3 = redis.NewClient(&redis.Options{Addr: ":6379", Protocol: 3, UnstableResp3: true})
33453337
Expect(clientResp3.FlushDB(ctx).Err()).NotTo(HaveOccurred())
33463338
})
33473339

@@ -3381,21 +3373,8 @@ var _ = Describe("RediSearch commands Resp 3", Label("search"), func() {
33813373
var client2 *redis.Client
33823374

33833375
BeforeEach(func() {
3384-
client = redis.NewClient(&redis.Options{
3385-
Addr: ":6379",
3386-
Protocol: 3,
3387-
UnstableResp3: true,
3388-
HitlessUpgradeConfig: &redis.HitlessUpgradeConfig{
3389-
Enabled: hitless.MaintNotificationsDisabled,
3390-
},
3391-
})
3392-
client2 = redis.NewClient(&redis.Options{
3393-
Addr: ":6379",
3394-
Protocol: 3,
3395-
HitlessUpgradeConfig: &redis.HitlessUpgradeConfig{
3396-
Enabled: hitless.MaintNotificationsDisabled,
3397-
},
3398-
})
3376+
client = redis.NewClient(&redis.Options{Addr: ":6379", Protocol: 3, UnstableResp3: true})
3377+
client2 = redis.NewClient(&redis.Options{Addr: ":6379", Protocol: 3})
33993378
Expect(client.FlushDB(ctx).Err()).NotTo(HaveOccurred())
34003379
})
34013380

universal_test.go

Lines changed: 0 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,6 @@ import (
55
. "github.com/bsm/gomega"
66

77
"github.com/redis/go-redis/v9"
8-
"github.com/redis/go-redis/v9/hitless"
98
)
109

1110
var _ = Describe("UniversalClient", func() {
@@ -54,9 +53,6 @@ var _ = Describe("UniversalClient", func() {
5453
Addrs: cluster.addrs(),
5554
Protocol: 3,
5655
UnstableResp3: true,
57-
HitlessUpgradeConfig: &redis.HitlessUpgradeConfig{
58-
Enabled: hitless.MaintNotificationsDisabled,
59-
},
6056
})
6157
Expect(client.Ping(ctx).Err()).NotTo(HaveOccurred())
6258
a := func() { client.FTInfo(ctx, "all").Result() }
@@ -68,9 +64,6 @@ var _ = Describe("UniversalClient", func() {
6864
Addrs: cluster.addrs(),
6965
Protocol: 3,
7066
UnstableResp3: true,
71-
HitlessUpgradeConfig: &redis.HitlessUpgradeConfig{
72-
Enabled: hitless.MaintNotificationsDisabled,
73-
},
7467
})
7568
Expect(client.Ping(ctx).Err()).NotTo(HaveOccurred())
7669
a := func() { client.FTInfo(ctx, "all").Result() }

0 commit comments

Comments
 (0)