Skip to content

Commit 7bec972

Browse files
committed
update config
1 parent 5f608ca commit 7bec972

File tree

7 files changed

+108
-52
lines changed

7 files changed

+108
-52
lines changed

.github/copilot-instructions.md

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -307,6 +307,19 @@ func (opt *Options) init() {
307307
}
308308
```
309309

310+
### Hitless Upgrade Scaling
311+
312+
**MaxWorkers**: `min(10, max(1, PoolSize/3))`
313+
- Scales with pool size but caps at 10 workers
314+
- Minimum 1 worker for very small pools
315+
- Minimum 10 when explicitly set
316+
317+
**HandoffQueueSize**: `max(8×MaxWorkers, max(50, PoolSize/2))` capped by `2×PoolSize`
318+
- Hybrid scaling: worker-based (8 per worker) and pool-based (PoolSize/2)
319+
- Takes the larger value for optimal burst handling
320+
- Minimum 50 when explicitly set
321+
- Memory-efficient cap at 2× pool size
322+
310323
### Push Notification Handling
311324

312325
```go

hitless/README.md

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -46,8 +46,17 @@ Config: &hitless.Config{
4646
- **On-demand**: Workers created when needed, cleaned up when idle
4747

4848
### Queue Sizing
49-
- **Auto-calculated**: `10 × MaxWorkers`, capped by pool size
50-
- **Always capped**: Queue size never exceeds pool size
49+
- **Auto-calculated**: `max(8 × MaxWorkers, max(50, PoolSize/2))` - hybrid scaling
50+
- Worker-based: 8 handoffs per worker for burst processing
51+
- Pool-based: Scales with pool size (minimum 50, up to PoolSize/2)
52+
- Takes the larger of the two for optimal performance
53+
- **Explicit values**: `max(50, set_value)` - enforces minimum 50 when set
54+
- **Always capped**: Queue size never exceeds `2 × PoolSize` for memory efficiency
55+
56+
**Examples:**
57+
- Pool 10: Queue 50 (max(8×3, max(50, 5)) = max(24, 50) = 50)
58+
- Pool 100: Queue 80 (max(8×10, max(50, 50)) = max(80, 50) = 80)
59+
- Pool 200: Queue 100 (max(8×10, max(50, 100)) = max(80, 100) = 100)
5160

5261
## Notification Hooks
5362

hitless/config.go

Lines changed: 13 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -97,9 +97,10 @@ type Config struct {
9797

9898
// HandoffQueueSize is the size of the buffered channel used to queue handoff requests.
9999
// If the queue is full, new handoff requests will be rejected.
100-
// Always capped by pool size since you can't handoff more connections than exist.
100+
// Scales with both worker count and pool size for better burst handling.
101101
//
102-
// Default: 10x max workers, capped by pool size, min 2
102+
// Default: max(8x workers, max(50, PoolSize/2)), capped by 2x pool size
103+
// When set: min 50, capped by 2x pool size
103104
HandoffQueueSize int
104105

105106
// PostHandoffRelaxedDuration is how long to keep relaxed timeouts on the new connection
@@ -230,32 +231,27 @@ func (c *Config) ApplyDefaultsWithPoolSize(poolSize int) *Config {
230231
result.HandoffTimeout = c.HandoffTimeout
231232
}
232233

233-
// Apply defaults for integer fields (zero means not set)
234-
if c.HandoffQueueSize <= 0 {
235-
result.HandoffQueueSize = defaults.HandoffQueueSize
236-
} else {
237-
result.HandoffQueueSize = c.HandoffQueueSize
238-
}
239-
240234
// Copy worker configuration
241235
result.MaxWorkers = c.MaxWorkers
242236

243237
// Apply worker defaults based on pool size
244238
result.applyWorkerDefaults(poolSize)
245239

246-
// Apply queue size defaults based on max workers, capped by pool size
240+
// Apply queue size defaults with hybrid scaling approach
247241
if c.HandoffQueueSize <= 0 {
248-
// Queue size: 10x max workers, but never more than pool size
249-
workerBasedSize := result.MaxWorkers * 10
250-
result.HandoffQueueSize = util.Min(workerBasedSize, poolSize)
242+
// Default: max(8x workers, max(50, PoolSize/2)), capped by 2x pool size
243+
workerBasedSize := result.MaxWorkers * 8
244+
poolBasedSize := util.Max(50, poolSize/2)
245+
result.HandoffQueueSize = util.Max(workerBasedSize, poolBasedSize)
251246
} else {
252-
result.HandoffQueueSize = c.HandoffQueueSize
247+
// When explicitly set: enforce minimum of 50
248+
result.HandoffQueueSize = util.Max(50, c.HandoffQueueSize)
253249
}
254250

255-
// Always cap queue size by pool size - no point having more queue slots than connections
256-
result.HandoffQueueSize = util.Min(result.HandoffQueueSize, poolSize)
251+
// Always cap queue size by 2x pool size - balances burst capacity with memory efficiency
252+
result.HandoffQueueSize = util.Min(result.HandoffQueueSize, poolSize*2)
257253

258-
// Ensure minimum queue size of 2
254+
// Ensure minimum queue size of 2 (fallback for very small pools)
259255
if result.HandoffQueueSize < 2 {
260256
result.HandoffQueueSize = 2
261257
}

hitless/config_test.go

Lines changed: 60 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -116,13 +116,15 @@ func TestApplyDefaults(t *testing.T) {
116116
t.Errorf("Expected MaxWorkers to be > 0 after applying defaults, got %d", result.MaxWorkers)
117117
}
118118

119-
// HandoffQueueSize should be auto-calculated (10 * MaxWorkers, capped by pool size)
120-
workerBasedSize := result.MaxWorkers * 10
119+
// HandoffQueueSize should be auto-calculated with hybrid scaling
120+
workerBasedSize := result.MaxWorkers * 8
121121
poolSize := 100 // Default pool size used in ApplyDefaults
122-
expectedQueueSize := util.Min(workerBasedSize, poolSize)
122+
poolBasedSize := util.Max(50, poolSize/2)
123+
expectedQueueSize := util.Max(workerBasedSize, poolBasedSize)
124+
expectedQueueSize = util.Min(expectedQueueSize, poolSize*2) // Cap by 2x pool size
123125
if result.HandoffQueueSize != expectedQueueSize {
124-
t.Errorf("Expected HandoffQueueSize to be %d (util.Min(10*MaxWorkers=%d, poolSize=%d)), got %d",
125-
expectedQueueSize, workerBasedSize, poolSize, result.HandoffQueueSize)
126+
t.Errorf("Expected HandoffQueueSize to be %d (max(8*MaxWorkers=%d, max(50, poolSize/2=%d)) capped by 2*poolSize=%d), got %d",
127+
expectedQueueSize, workerBasedSize, poolBasedSize, poolSize*2, result.HandoffQueueSize)
126128
}
127129
})
128130

@@ -139,24 +141,50 @@ func TestApplyDefaults(t *testing.T) {
139141
t.Errorf("Expected MaxWorkers to be 12 (explicitly set), got %d", result.MaxWorkers)
140142
}
141143

142-
// Should apply default for unset fields (auto-calculated queue size, capped by pool size)
143-
workerBasedSize := result.MaxWorkers * 10
144+
// Should apply default for unset fields (auto-calculated queue size with hybrid scaling)
145+
workerBasedSize := result.MaxWorkers * 8
144146
poolSize := 100 // Default pool size used in ApplyDefaults
145-
expectedQueueSize := util.Min(workerBasedSize, poolSize)
147+
poolBasedSize := util.Max(50, poolSize/2)
148+
expectedQueueSize := util.Max(workerBasedSize, poolBasedSize)
149+
expectedQueueSize = util.Min(expectedQueueSize, poolSize*2) // Cap by 2x pool size
146150
if result.HandoffQueueSize != expectedQueueSize {
147-
t.Errorf("Expected HandoffQueueSize to be %d (util.Min(10*MaxWorkers=%d, poolSize=%d)), got %d",
148-
expectedQueueSize, workerBasedSize, poolSize, result.HandoffQueueSize)
151+
t.Errorf("Expected HandoffQueueSize to be %d (max(8*MaxWorkers=%d, max(50, poolSize/2=%d)) capped by 2*poolSize=%d), got %d",
152+
expectedQueueSize, workerBasedSize, poolBasedSize, poolSize*2, result.HandoffQueueSize)
149153
}
150154

151-
// Test explicit queue size capping by pool size
155+
// Test explicit queue size capping by 2x pool size
152156
configWithLargeQueue := &Config{
153157
MaxWorkers: 5,
154-
HandoffQueueSize: 1000, // Much larger than pool size
158+
HandoffQueueSize: 1000, // Much larger than 2x pool size
155159
}
156160

157161
resultCapped := configWithLargeQueue.ApplyDefaultsWithPoolSize(20) // Small pool size
158-
if resultCapped.HandoffQueueSize != 20 {
159-
t.Errorf("Expected HandoffQueueSize to be capped by pool size (20), got %d", resultCapped.HandoffQueueSize)
162+
expectedCap := 20 * 2 // 2x pool size = 40
163+
if resultCapped.HandoffQueueSize != expectedCap {
164+
t.Errorf("Expected HandoffQueueSize to be capped by 2x pool size (%d), got %d", expectedCap, resultCapped.HandoffQueueSize)
165+
}
166+
167+
// Test explicit queue size minimum enforcement
168+
configWithSmallQueue := &Config{
169+
MaxWorkers: 5,
170+
HandoffQueueSize: 10, // Below minimum of 50
171+
}
172+
173+
resultMinimum := configWithSmallQueue.ApplyDefaultsWithPoolSize(100) // Large pool size
174+
if resultMinimum.HandoffQueueSize != 50 {
175+
t.Errorf("Expected HandoffQueueSize to be enforced minimum (50), got %d", resultMinimum.HandoffQueueSize)
176+
}
177+
178+
// Test that large explicit values are capped by 2x pool size
179+
configWithVeryLargeQueue := &Config{
180+
MaxWorkers: 5,
181+
HandoffQueueSize: 500, // Much larger than 2x pool size
182+
}
183+
184+
resultVeryLarge := configWithVeryLargeQueue.ApplyDefaultsWithPoolSize(100) // Pool size 100
185+
expectedVeryLargeCap := 100 * 2 // 2x pool size = 200
186+
if resultVeryLarge.HandoffQueueSize != expectedVeryLargeCap {
187+
t.Errorf("Expected very large HandoffQueueSize to be capped by 2x pool size (%d), got %d", expectedVeryLargeCap, resultVeryLarge.HandoffQueueSize)
160188
}
161189

162190
if result.RelaxedTimeout != 10*time.Second {
@@ -183,13 +211,15 @@ func TestApplyDefaults(t *testing.T) {
183211
t.Errorf("Expected MaxWorkers to be > 0 (auto-calculated), got %d", result.MaxWorkers)
184212
}
185213

186-
// HandoffQueueSize should be auto-calculated (10 * MaxWorkers, capped by pool size)
187-
workerBasedSize := result.MaxWorkers * 10
214+
// HandoffQueueSize should be auto-calculated with hybrid scaling
215+
workerBasedSize := result.MaxWorkers * 8
188216
poolSize := 100 // Default pool size used in ApplyDefaults
189-
expectedQueueSize := util.Min(workerBasedSize, poolSize)
217+
poolBasedSize := util.Max(50, poolSize/2)
218+
expectedQueueSize := util.Max(workerBasedSize, poolBasedSize)
219+
expectedQueueSize = util.Min(expectedQueueSize, poolSize*2) // Cap by 2x pool size
190220
if result.HandoffQueueSize != expectedQueueSize {
191-
t.Errorf("Expected HandoffQueueSize to be %d (util.Min(10*MaxWorkers=%d, poolSize=%d)), got %d",
192-
expectedQueueSize, workerBasedSize, poolSize, result.HandoffQueueSize)
221+
t.Errorf("Expected HandoffQueueSize to be %d (max(8*MaxWorkers=%d, max(50, poolSize/2=%d)) capped by 2*poolSize=%d), got %d",
222+
expectedQueueSize, workerBasedSize, poolBasedSize, poolSize*2, result.HandoffQueueSize)
193223
}
194224

195225
if result.RelaxedTimeout != 10*time.Second {
@@ -294,19 +324,21 @@ func TestIntegrationWithApplyDefaults(t *testing.T) {
294324
t.Errorf("Expected LogLevel to be 2, got %d", expectedConfig.LogLevel)
295325
}
296326

297-
// Should apply defaults for missing fields (auto-calculated queue size, capped by pool size)
298-
workerBasedSize := expectedConfig.MaxWorkers * 10
327+
// Should apply defaults for missing fields (auto-calculated queue size with hybrid scaling)
328+
workerBasedSize := expectedConfig.MaxWorkers * 8
299329
poolSize := 100 // Default pool size used in ApplyDefaults
300-
expectedQueueSize := util.Min(workerBasedSize, poolSize)
330+
poolBasedSize := util.Max(50, poolSize/2)
331+
expectedQueueSize := util.Max(workerBasedSize, poolBasedSize)
332+
expectedQueueSize = util.Min(expectedQueueSize, poolSize*2) // Cap by 2x pool size
301333
if expectedConfig.HandoffQueueSize != expectedQueueSize {
302-
t.Errorf("Expected HandoffQueueSize to be %d (util.Min(10*MaxWorkers=%d, poolSize=%d)), got %d",
303-
expectedQueueSize, workerBasedSize, poolSize, expectedConfig.HandoffQueueSize)
334+
t.Errorf("Expected HandoffQueueSize to be %d (max(8*MaxWorkers=%d, max(50, poolSize/2=%d)) capped by 2*poolSize=%d), got %d",
335+
expectedQueueSize, workerBasedSize, poolBasedSize, poolSize*2, expectedConfig.HandoffQueueSize)
304336
}
305337

306-
// Test that queue size is always capped by pool size
307-
if expectedConfig.HandoffQueueSize > poolSize {
308-
t.Errorf("HandoffQueueSize (%d) should never exceed pool size (%d)",
309-
expectedConfig.HandoffQueueSize, poolSize)
338+
// Test that queue size is always capped by 2x pool size
339+
if expectedConfig.HandoffQueueSize > poolSize*2 {
340+
t.Errorf("HandoffQueueSize (%d) should never exceed 2x pool size (%d)",
341+
expectedConfig.HandoffQueueSize, poolSize*2)
310342
}
311343

312344
if expectedConfig.RelaxedTimeout != 10*time.Second {

hitless/pool_hook.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -273,7 +273,7 @@ func (ph *PoolHook) processHandoffRequest(request HandoffRequest) {
273273
afterTime = handoffTimeout / 2
274274
}
275275

276-
internal.Logger.Printf(context.Background(), "Handoff failed for connection WILL RETRY After %v: %v", afterTime, err)
276+
internal.Logger.Printf(context.Background(), "Handoff failed for conn[%d] WILL RETRY After %v: %v", request.ConnID, afterTime, err)
277277
time.AfterFunc(afterTime, func() {
278278
if err := ph.queueHandoff(request.Conn); err != nil {
279279
internal.Logger.Printf(context.Background(), "can't queue handoff for retry: %v", err)

internal/pool/pool.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -289,7 +289,7 @@ func (p *ConnPool) newConn(ctx context.Context, pooled bool) (*Conn, error) {
289289
if pooled {
290290
// If pool is full remove the cn on next Put.
291291
currentPoolSize := p.poolSize.Load()
292-
if currentPoolSize >= int32(p.cfg.PoolSize) {
292+
if currentPoolSize >= p.cfg.PoolSize {
293293
cn.pooled = false
294294
} else {
295295
p.poolSize.Add(1)

osscluster.go

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ import (
2020
"github.com/redis/go-redis/v9/internal/pool"
2121
"github.com/redis/go-redis/v9/internal/proto"
2222
"github.com/redis/go-redis/v9/internal/rand"
23+
"github.com/redis/go-redis/v9/push"
2324
)
2425

2526
const (
@@ -126,6 +127,10 @@ type ClusterOptions struct {
126127
// UnstableResp3 enables Unstable mode for Redis Search module with RESP3.
127128
UnstableResp3 bool
128129

130+
// PushNotificationProcessor is the processor for handling push notifications.
131+
// If nil, a default processor will be created for RESP3 connections.
132+
PushNotificationProcessor push.NotificationProcessor
133+
129134
// FailingTimeoutSeconds is the timeout in seconds for marking a cluster node as failing.
130135
// When a node is marked as failing, it will be avoided for this duration.
131136
// Default is 15 seconds.
@@ -376,9 +381,10 @@ func (opt *ClusterOptions) clientOptions() *Options {
376381
// much use for ClusterSlots config). This means we cannot execute the
377382
// READONLY command against that node -- setting readOnly to false in such
378383
// situations in the options below will prevent that from happening.
379-
readOnly: opt.ReadOnly && opt.ClusterSlots == nil,
380-
UnstableResp3: opt.UnstableResp3,
381-
HitlessUpgradeConfig: hitlessConfig,
384+
readOnly: opt.ReadOnly && opt.ClusterSlots == nil,
385+
UnstableResp3: opt.UnstableResp3,
386+
HitlessUpgradeConfig: hitlessConfig,
387+
PushNotificationProcessor: opt.PushNotificationProcessor,
382388
}
383389
}
384390

0 commit comments

Comments
 (0)