Skip to content

Commit b5c1ced

Browse files
committed
improve logging
1 parent 7bec972 commit b5c1ced

File tree

7 files changed

+52
-34
lines changed

7 files changed

+52
-34
lines changed

hitless/config.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,13 @@
11
package hitless
22

33
import (
4+
"context"
45
"net"
56
"runtime"
67
"strings"
78
"time"
89

10+
"github.com/redis/go-redis/v9/internal"
911
"github.com/redis/go-redis/v9/internal/util"
1012
)
1113

@@ -279,6 +281,11 @@ func (c *Config) ApplyDefaultsWithPoolSize(poolSize int) *Config {
279281
result.MaxHandoffRetries = c.MaxHandoffRetries
280282
}
281283

284+
if result.LogLevel >= 3 {
285+
result.LogLevel = 3
286+
internal.Logger.Printf(context.Background(), "hitless: debug logging enabled")
287+
internal.Logger.Printf(context.Background(), "hitless: config: %+v", result)
288+
}
282289
return result
283290
}
284291

hitless/hitless_manager.go

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -148,7 +148,9 @@ func (hm *HitlessManager) TrackMovingOperationWithConnID(ctx context.Context, ne
148148
// Use LoadOrStore for atomic check-and-set operation
149149
if _, loaded := hm.activeMovingOps.LoadOrStore(key, movingOp); loaded {
150150
// Duplicate MOVING notification, ignore
151-
internal.Logger.Printf(ctx, "Duplicate MOVING operation ignored: %s", key.String())
151+
if hm.config.LogLevel >= 3 { // Warning level
152+
internal.Logger.Printf(ctx, "hitless: conn[%d] seqID[%d] Duplicate MOVING operation ignored: %s", connID, seqID, key.String())
153+
}
152154
return nil
153155
}
154156

@@ -170,6 +172,10 @@ func (hm *HitlessManager) UntrackOperationWithConnID(seqID int64, connID uint64)
170172
if _, loaded := hm.activeMovingOps.LoadAndDelete(key); loaded {
171173
// Decrement active operation count only if operation existed
172174
hm.activeOperationCount.Add(-1)
175+
} else {
176+
if hm.config.LogLevel >= 3 { // Warning level
177+
internal.Logger.Printf(context.Background(), "hitless: conn[%d] seqID[%d] Operation not found for untracking: %s", connID, seqID, key.String())
178+
}
173179
}
174180
}
175181

hitless/notification_handler.go

Lines changed: 28 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,6 @@ import (
66
"time"
77

88
"github.com/redis/go-redis/v9/internal"
9-
"github.com/redis/go-redis/v9/internal/interfaces"
109
"github.com/redis/go-redis/v9/internal/pool"
1110
"github.com/redis/go-redis/v9/push"
1211
)
@@ -107,6 +106,10 @@ func (snh *NotificationHandler) handleMoving(ctx context.Context, handlerCtx pus
107106
deadline := time.Now().Add(time.Duration(timeS) * time.Second)
108107
// If newEndpoint is empty, we should schedule a handoff to the current endpoint in timeS/2 seconds
109108
if newEndpoint == "" || newEndpoint == internal.RedisNull {
109+
if snh.manager.config.LogLevel >= 3 { // Debug level
110+
internal.Logger.Printf(ctx, "hitless: conn[%d] scheduling handoff to current endpoint in %v seconds",
111+
poolConn.GetID(), timeS/2)
112+
}
110113
// same as current endpoint
111114
newEndpoint = snh.manager.options.GetAddr()
112115
// delay the handoff for timeS/2 seconds to the same endpoint
@@ -137,7 +140,6 @@ func (snh *NotificationHandler) markConnForHandoff(conn *pool.Conn, newEndpoint
137140
// Optionally track in hitless manager for monitoring/debugging
138141
if snh.manager != nil {
139142
connID := conn.GetID()
140-
141143
// Track the operation (ignore errors since this is optional)
142144
_ = snh.manager.TrackMovingOperationWithConnID(context.Background(), newEndpoint, deadline, seqID, connID)
143145
} else {
@@ -160,14 +162,18 @@ func (snh *NotificationHandler) handleMigrating(ctx context.Context, handlerCtx
160162
return ErrInvalidNotification
161163
}
162164

163-
connAdapter, ok := handlerCtx.Conn.(interfaces.ConnectionWithRelaxedTimeout)
165+
conn, ok := handlerCtx.Conn.(*pool.Conn)
164166
if !ok {
165167
internal.Logger.Printf(ctx, "hitless: invalid connection type in handler context for MIGRATING notification")
166168
return ErrInvalidNotification
167169
}
168170

169171
// Apply relaxed timeout to this specific connection
170-
connAdapter.SetRelaxedTimeout(snh.manager.config.RelaxedTimeout, snh.manager.config.RelaxedTimeout)
172+
if snh.manager.config.LogLevel >= 3 { // Debug level
173+
internal.Logger.Printf(ctx, "hitless: conn[%d] applying relaxed timeout (%v) for MIGRATING notification",
174+
snh.manager.config.RelaxedTimeout, conn.GetID())
175+
}
176+
conn.SetRelaxedTimeout(snh.manager.config.RelaxedTimeout, snh.manager.config.RelaxedTimeout)
171177
return nil
172178
}
173179

@@ -185,14 +191,18 @@ func (snh *NotificationHandler) handleMigrated(ctx context.Context, handlerCtx p
185191
return ErrInvalidNotification
186192
}
187193

188-
connAdapter, ok := handlerCtx.Conn.(interfaces.ConnectionWithRelaxedTimeout)
194+
conn, ok := handlerCtx.Conn.(*pool.Conn)
189195
if !ok {
190196
internal.Logger.Printf(ctx, "hitless: invalid connection type in handler context for MIGRATED notification")
191197
return ErrInvalidNotification
192198
}
193199

194200
// Clear relaxed timeout for this specific connection
195-
connAdapter.ClearRelaxedTimeout()
201+
if snh.manager.config.LogLevel >= 3 { // Debug level
202+
connID := conn.GetID()
203+
internal.Logger.Printf(ctx, "hitless: conn[%d] clearing relaxed timeout for MIGRATED notification", connID)
204+
}
205+
conn.ClearRelaxedTimeout()
196206
return nil
197207
}
198208

@@ -210,14 +220,18 @@ func (snh *NotificationHandler) handleFailingOver(ctx context.Context, handlerCt
210220
return ErrInvalidNotification
211221
}
212222

213-
connAdapter, ok := handlerCtx.Conn.(interfaces.ConnectionWithRelaxedTimeout)
223+
conn, ok := handlerCtx.Conn.(*pool.Conn)
214224
if !ok {
215225
internal.Logger.Printf(ctx, "hitless: invalid connection type in handler context for FAILING_OVER notification")
216226
return ErrInvalidNotification
217227
}
218228

219229
// Apply relaxed timeout to this specific connection
220-
connAdapter.SetRelaxedTimeout(snh.manager.config.RelaxedTimeout, snh.manager.config.RelaxedTimeout)
230+
if snh.manager.config.LogLevel >= 3 { // Debug level
231+
connID := conn.GetID()
232+
internal.Logger.Printf(ctx, "hitless: conn[%d] applying relaxed timeout (%v) for FAILING_OVER notification", connID, snh.manager.config.RelaxedTimeout)
233+
}
234+
conn.SetRelaxedTimeout(snh.manager.config.RelaxedTimeout, snh.manager.config.RelaxedTimeout)
221235
return nil
222236
}
223237

@@ -235,13 +249,17 @@ func (snh *NotificationHandler) handleFailedOver(ctx context.Context, handlerCtx
235249
return ErrInvalidNotification
236250
}
237251

238-
connAdapter, ok := handlerCtx.Conn.(interfaces.ConnectionWithRelaxedTimeout)
252+
conn, ok := handlerCtx.Conn.(*pool.Conn)
239253
if !ok {
240254
internal.Logger.Printf(ctx, "hitless: invalid connection type in handler context for FAILED_OVER notification")
241255
return ErrInvalidNotification
242256
}
243257

244258
// Clear relaxed timeout for this specific connection
245-
connAdapter.ClearRelaxedTimeout()
259+
if snh.manager.config.LogLevel >= 3 { // Debug level
260+
connID := conn.GetID()
261+
internal.Logger.Printf(ctx, "hitless: conn[%d] clearing relaxed timeout for FAILED_OVER notification", connID)
262+
}
263+
conn.ClearRelaxedTimeout()
246264
return nil
247265
}

hitless/pool_hook.go

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -68,8 +68,10 @@ func NewPoolHook(baseDialer func(context.Context, string, string) (net.Conn, err
6868

6969
// NewPoolHookWithPoolSize creates a new pool hook with pool size for better worker defaults
7070
func NewPoolHookWithPoolSize(baseDialer func(context.Context, string, string) (net.Conn, error), network string, config *Config, hitlessManager HitlessManagerInterface, poolSize int) *PoolHook {
71-
// Apply defaults to any missing configuration fields, using pool size for worker calculations
72-
config = config.ApplyDefaultsWithPoolSize(poolSize)
71+
// Apply defaults if config is nil or has zero values
72+
if config == nil {
73+
config = config.ApplyDefaultsWithPoolSize(poolSize)
74+
}
7375

7476
ph := &PoolHook{
7577
// baseDialer is used to create connections to new endpoints during handoffs
@@ -86,9 +88,6 @@ func NewPoolHookWithPoolSize(baseDialer func(context.Context, string, string) (n
8688
// Hitless manager for operation completion tracking
8789
hitlessManager: hitlessManager,
8890
}
89-
90-
// No upfront worker creation - workers are created on demand
91-
9291
return ph
9392
}
9493

hitless/pool_hook_test.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -528,6 +528,7 @@ func TestConnectionHook(t *testing.T) {
528528
config := &Config{
529529
MaxWorkers: 2,
530530
HandoffQueueSize: 10,
531+
MaxHandoffRetries: 3, // Allow retries for successful handoff
531532
PostHandoffRelaxedDuration: 100 * time.Millisecond, // Fast expiration for testing
532533
RelaxedTimeout: 5 * time.Second,
533534
LogLevel: 2,
@@ -805,6 +806,7 @@ func TestConnectionHook(t *testing.T) {
805806
config := &Config{
806807
MaxWorkers: 2,
807808
HandoffQueueSize: 10,
809+
MaxHandoffRetries: 3, // Allow retries for successful handoff
808810
RelaxedTimeout: 5 * time.Second,
809811
PostHandoffRelaxedDuration: 100 * time.Millisecond, // Short for testing
810812
}

internal/interfaces/interfaces.go

Lines changed: 3 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,8 @@ import (
88
"time"
99
)
1010

11-
// Forward declaration to avoid circular imports
11+
// NotificationProcessor is (most probably) a push.NotificationProcessor
12+
// forward declaration to avoid circular imports
1213
type NotificationProcessor interface {
1314
RegisterHandler(pushNotificationName string, handler interface{}, protected bool) error
1415
UnregisterHandler(pushNotificationName string) error
@@ -25,6 +26,7 @@ type ClientInterface interface {
2526
}
2627

2728
// OptionsInterface defines the interface for client options.
29+
// Uses an adapter pattern to avoid circular dependencies.
2830
type OptionsInterface interface {
2931
// GetReadTimeout returns the read timeout.
3032
GetReadTimeout() time.Duration
@@ -50,18 +52,3 @@ type OptionsInterface interface {
5052
// NewDialer returns a new dialer function for the connection.
5153
NewDialer() func(context.Context) (net.Conn, error)
5254
}
53-
54-
// ConnectionWithRelaxedTimeout defines the interface for connections that support relaxed timeout adjustment.
55-
// This is used by the hitless upgrade system for per-connection timeout management.
56-
type ConnectionWithRelaxedTimeout interface {
57-
// SetRelaxedTimeout sets relaxed timeouts for this connection during hitless upgrades.
58-
// These timeouts remain active until explicitly cleared.
59-
SetRelaxedTimeout(readTimeout, writeTimeout time.Duration)
60-
61-
// SetRelaxedTimeoutWithDeadline sets relaxed timeouts with an expiration deadline.
62-
// After the deadline, timeouts automatically revert to normal values.
63-
SetRelaxedTimeoutWithDeadline(readTimeout, writeTimeout time.Duration, deadline time.Time)
64-
65-
// ClearRelaxedTimeout clears relaxed timeouts for this connection.
66-
ClearRelaxedTimeout()
67-
}

pubsub.go

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -485,11 +485,10 @@ func (c *PubSub) ReceiveTimeout(ctx context.Context, timeout time.Duration) (int
485485
if err := c.processPendingPushNotificationWithReader(ctx, cn, rd); err != nil {
486486
// Log the error but don't fail the command execution
487487
// Push notification processing errors shouldn't break normal Redis operations
488-
internal.Logger.Printf(ctx, "push: error processing pending notifications before reading reply: %v", err)
488+
internal.Logger.Printf(ctx, "push: conn[%d] error processing pending notifications before reading reply: %v", cn.GetID(), err)
489489
}
490490
return c.cmd.readReply(rd)
491491
})
492-
493492
c.releaseConnWithLock(ctx, cn, err, timeout > 0)
494493

495494
if err != nil {

0 commit comments

Comments
 (0)