Skip to content

Commit 01844e6

Browse files
committed
wip
1 parent 6fb87ca commit 01844e6

File tree

4 files changed

+39
-20
lines changed

4 files changed

+39
-20
lines changed

hitless/example_hooks.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ func (mh *MetricsHook) PreHook(ctx context.Context, notificationType string, not
3030

3131
// Store start time in context for duration calculation
3232
startTime := time.Now()
33-
ctx = context.WithValue(ctx, "start_time", startTime)
33+
_ = context.WithValue(ctx, "start_time", startTime) // Context not used further
3434

3535
return notification, true
3636
}

hitless/notification_handler.go

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -85,7 +85,6 @@ func (snh *NotificationHandler) handleMoving(ctx context.Context, handlerCtx pus
8585
}
8686

8787
newEndpoint := ""
88-
ok = false
8988
if len(notification) > 3 {
9089
// Extract new endpoint
9190
newEndpoint, ok = notification[3].(string)
@@ -119,7 +118,10 @@ func (snh *NotificationHandler) handleMoving(ctx context.Context, handlerCtx pus
119118
// do this in a goroutine to avoid blocking the notification handler
120119
go func() {
121120
time.Sleep(time.Duration(timeS/2) * time.Second)
122-
snh.markConnForHandoff(poolConn, newEndpoint, seqID, deadline)
121+
if err := snh.markConnForHandoff(poolConn, newEndpoint, seqID, deadline); err != nil {
122+
// Log error but don't fail the goroutine
123+
internal.Logger.Printf(context.Background(), "hitless: failed to mark connection for handoff: %v", err)
124+
}
123125
}()
124126
return nil
125127
}

hitless/pool_hook.go

Lines changed: 4 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -124,12 +124,6 @@ func (ph *PoolHook) GetScaleLevel() int {
124124
return ph.scaleLevel
125125
}
126126

127-
// log logs a message if the log level is appropriate
128-
func (ph *PoolHook) log(level int, message string) {
129-
if ph.config.LogLevel >= level {
130-
internal.Logger.Printf(context.Background(), message)
131-
}
132-
}
133127

134128
// IsHandoffPending returns true if the given connection has a pending handoff
135129
func (ph *PoolHook) IsHandoffPending(conn *pool.Conn) bool {
@@ -172,7 +166,10 @@ func (ph *PoolHook) OnPut(ctx context.Context, conn *pool.Conn) (shouldPool bool
172166
internal.Logger.Printf(ctx, "Failed to queue handoff: %v", err)
173167
return false, true, nil // Don't pool, remove connection, no error to caller
174168
}
175-
conn.MarkQueuedForHandoff()
169+
if err := conn.MarkQueuedForHandoff(); err != nil {
170+
// If marking fails, remove the connection instead
171+
return false, true, nil
172+
}
176173
return true, false, nil
177174
}
178175
}

redis.go

Lines changed: 30 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -216,16 +216,21 @@ type baseClient struct {
216216
pushProcessor push.NotificationProcessor
217217

218218
// Hitless upgrade manager
219-
hitlessManager *hitless.HitlessManager
219+
hitlessManager *hitless.HitlessManager
220+
hitlessManagerLock sync.RWMutex
220221
}
221222

222223
func (c *baseClient) clone() *baseClient {
224+
c.hitlessManagerLock.RLock()
225+
hitlessManager := c.hitlessManager
226+
c.hitlessManagerLock.RUnlock()
227+
223228
clone := &baseClient{
224229
opt: c.opt,
225230
connPool: c.connPool,
226231
onClose: c.onClose,
227232
pushProcessor: c.pushProcessor,
228-
hitlessManager: c.hitlessManager,
233+
hitlessManager: hitlessManager,
229234
}
230235
return clone
231236
}
@@ -458,7 +463,10 @@ func (c *baseClient) initConn(ctx context.Context, cn *pool.Conn) error {
458463
return fmt.Errorf("failed to enable maintenance notifications: %w", hitlessHandshakeErr)
459464
case hitless.MaintNotificationsAuto:
460465
// auto mode, disable hitless upgrades and continue
461-
c.disableHitlessUpgrades()
466+
if err := c.disableHitlessUpgrades(); err != nil {
467+
// Log error but continue - auto mode should be resilient
468+
internal.Logger.Printf(ctx, "hitless: failed to disable hitless upgrades in auto mode: %v", err)
469+
}
462470
c.optLock.RUnlock()
463471
c.optLock.Lock()
464472
c.opt.HitlessUpgradeConfig.Enabled = hitless.MaintNotificationsDisabled
@@ -663,13 +671,20 @@ func (c *baseClient) enableHitlessUpgrades() error {
663671
if err != nil {
664672
return err
665673
}
666-
// Set the manager reference
674+
// Set the manager reference and initialize pool hook
675+
c.hitlessManagerLock.Lock()
667676
c.hitlessManager = manager
668-
c.hitlessManager.InitPoolHook(c.dialHook)
677+
c.hitlessManagerLock.Unlock()
678+
679+
// Initialize pool hook (safe to call without lock since manager is now set)
680+
manager.InitPoolHook(c.dialHook)
669681
return nil
670682
}
671683

672684
func (c *baseClient) disableHitlessUpgrades() error {
685+
c.hitlessManagerLock.Lock()
686+
defer c.hitlessManagerLock.Unlock()
687+
673688
// Close the hitless manager
674689
if c.hitlessManager != nil {
675690
// Closing the manager will also shutdown the pool hook
@@ -686,8 +701,14 @@ func (c *baseClient) disableHitlessUpgrades() error {
686701
// long-lived and shared between many goroutines.
687702
func (c *baseClient) Close() error {
688703
var firstErr error
704+
705+
// Close hitless manager first
706+
if err := c.disableHitlessUpgrades(); err != nil {
707+
firstErr = err
708+
}
709+
689710
if c.onClose != nil {
690-
if err := c.onClose(); err != nil {
711+
if err := c.onClose(); err != nil && firstErr == nil {
691712
firstErr = err
692713
}
693714
}
@@ -957,6 +978,8 @@ func (c *Client) Options() *Options {
957978
// GetHitlessManager returns the hitless manager instance for monitoring and control.
958979
// Returns nil if hitless upgrades are not enabled.
959980
func (c *Client) GetHitlessManager() *hitless.HitlessManager {
981+
c.hitlessManagerLock.RLock()
982+
defer c.hitlessManagerLock.RUnlock()
960983
return c.hitlessManager
961984
}
962985

@@ -1241,7 +1264,4 @@ func (c *baseClient) pushNotificationHandlerContext(cn *pool.Conn) push.Notifica
12411264
}
12421265
}
12431266

1244-
// initializeHitlessManager initializes hitless upgrade manager for a client.
1245-
func initializeHitlessManager(client *baseClient, config *HitlessUpgradeConfig) (*hitless.HitlessManager, error) {
1246-
return nil, nil // TODO: Implement hitless manager initialization
1247-
}
1267+

0 commit comments

Comments
 (0)