
Commit 4631320: proper notification format
1 parent a247360

7 files changed: +190, -88 lines

cluster_smigrating_test.go
Lines changed: 31 additions & 18 deletions

@@ -45,29 +45,35 @@ func TestClusterClientSMigratedCallback(t *testing.T) {
 			return
 		}
 
-		// Temporarily replace the cluster state reload with our test version
-		var receivedSlot int
-		originalCallback := manager
-		manager.SetClusterStateReloadCallback(func(ctx context.Context, slot int) {
+		// Set up cluster state reload callback for testing
+		var receivedHostPort string
+		var receivedSlotRanges []string
+		manager.SetClusterStateReloadCallback(func(ctx context.Context, hostPort string, slotRanges []string) {
 			reloadCalled.Store(true)
-			receivedSlot = slot
+			receivedHostPort = hostPort
+			receivedSlotRanges = slotRanges
 		})
 
 		// Trigger the callback (this is what SMIGRATED notification would do)
 		ctx := context.Background()
-		testSlot := 1234
-		manager.TriggerClusterStateReload(ctx, testSlot)
+		testHostPort := "127.0.0.1:6379"
+		testSlotRanges := []string{"1234", "5000-6000"}
+		manager.TriggerClusterStateReload(ctx, testHostPort, testSlotRanges)
 
 		// Verify callback was called
 		if !reloadCalled.Load() {
 			t.Error("Cluster state reload callback should have been called")
 		}
 
-		// Verify slot was passed correctly
-		if receivedSlot != testSlot {
-			t.Errorf("Expected slot %d, got %d", testSlot, receivedSlot)
+		// Verify host:port was passed correctly
+		if receivedHostPort != testHostPort {
+			t.Errorf("Expected host:port %s, got %s", testHostPort, receivedHostPort)
+		}
+
+		// Verify slot ranges were passed correctly
+		if len(receivedSlotRanges) != len(testSlotRanges) {
+			t.Errorf("Expected %d slot ranges, got %d", len(testSlotRanges), len(receivedSlotRanges))
 		}
-		_ = originalCallback
 	})
 
 	t.Run("NoCallbackWithoutMaintNotifications", func(t *testing.T) {

@@ -121,22 +127,29 @@ func TestClusterClientSMigratedIntegration(t *testing.T) {
 		// We can't directly test LazyReload being called without a real cluster,
 		// but we can verify the callback mechanism works
 		var callbackWorks atomic.Bool
-		var receivedSlot int
-		manager.SetClusterStateReloadCallback(func(ctx context.Context, slot int) {
+		var receivedHostPort string
+		var receivedSlotRanges []string
+		manager.SetClusterStateReloadCallback(func(ctx context.Context, hostPort string, slotRanges []string) {
 			callbackWorks.Store(true)
-			receivedSlot = slot
+			receivedHostPort = hostPort
+			receivedSlotRanges = slotRanges
 		})
 
 		ctx := context.Background()
-		testSlot := 5678
-		manager.TriggerClusterStateReload(ctx, testSlot)
+		testHostPort := "127.0.0.1:7000"
+		testSlotRanges := []string{"5678"}
+		manager.TriggerClusterStateReload(ctx, testHostPort, testSlotRanges)
 
 		if !callbackWorks.Load() {
 			t.Error("Callback mechanism should work")
 		}
 
-		if receivedSlot != testSlot {
-			t.Errorf("Expected slot %d, got %d", testSlot, receivedSlot)
+		if receivedHostPort != testHostPort {
+			t.Errorf("Expected host:port %s, got %s", testHostPort, receivedHostPort)
+		}
+
+		if len(receivedSlotRanges) != 1 || receivedSlotRanges[0] != "5678" {
+			t.Errorf("Expected slot ranges [5678], got %v", receivedSlotRanges)
 		}
 	})
 }

internal/maintnotifications/logs/log_messages.go
Lines changed: 28 additions & 17 deletions

@@ -121,10 +121,11 @@ const (
 	UnrelaxedTimeoutMessage = "clearing relaxed timeout"
 	ManagerNotInitializedMessage = "manager not initialized"
 	FailedToMarkForHandoffMessage = "failed to mark connection for handoff"
-	InvalidSlotInSMigratingNotificationMessage = "invalid slot in SMIGRATING notification"
-	InvalidSlotInSMigratedNotificationMessage = "invalid slot in SMIGRATED notification"
-	SlotMigratingMessage = "slot is migrating, applying relaxed timeout"
-	SlotMigratedMessage = "slot has migrated, triggering cluster state reload"
+	InvalidSeqIDInSMigratingNotificationMessage = "invalid SeqID in SMIGRATING notification"
+	InvalidSeqIDInSMigratedNotificationMessage = "invalid SeqID in SMIGRATED notification"
+	InvalidHostPortInSMigratedNotificationMessage = "invalid host:port in SMIGRATED notification"
+	SlotMigratingMessage = "slots migrating, applying relaxed timeout"
+	SlotMigratedMessage = "slots migrated, triggering cluster state reload"
 
 	// ========================================
 	// used in pool/conn

@@ -629,31 +630,41 @@ func ExtractDataFromLogMessage(logMessage string) map[string]interface{} {
 }
 
 // Cluster notification functions
-func InvalidSlotInSMigratingNotification(slot interface{}) string {
-	message := fmt.Sprintf("%s: %v", InvalidSlotInSMigratingNotificationMessage, slot)
+func InvalidSeqIDInSMigratingNotification(seqID interface{}) string {
+	message := fmt.Sprintf("%s: %v", InvalidSeqIDInSMigratingNotificationMessage, seqID)
 	return appendJSONIfDebug(message, map[string]interface{}{
-		"slot": fmt.Sprintf("%v", slot),
+		"seqID": fmt.Sprintf("%v", seqID),
 	})
 }
 
-func InvalidSlotInSMigratedNotification(slot interface{}) string {
-	message := fmt.Sprintf("%s: %v", InvalidSlotInSMigratedNotificationMessage, slot)
+func InvalidSeqIDInSMigratedNotification(seqID interface{}) string {
+	message := fmt.Sprintf("%s: %v", InvalidSeqIDInSMigratedNotificationMessage, seqID)
 	return appendJSONIfDebug(message, map[string]interface{}{
-		"slot": fmt.Sprintf("%v", slot),
+		"seqID": fmt.Sprintf("%v", seqID),
 	})
 }
 
-func SlotMigrating(connID uint64, slot int64) string {
-	message := fmt.Sprintf("conn[%d] %s %d", connID, SlotMigratingMessage, slot)
+func InvalidHostPortInSMigratedNotification(hostPort interface{}) string {
+	message := fmt.Sprintf("%s: %v", InvalidHostPortInSMigratedNotificationMessage, hostPort)
 	return appendJSONIfDebug(message, map[string]interface{}{
-		"connID": connID,
-		"slot": slot,
+		"hostPort": fmt.Sprintf("%v", hostPort),
+	})
+}
+
+func SlotMigrating(connID uint64, seqID int64, slotRanges []string) string {
+	message := fmt.Sprintf("conn[%d] %s seqID=%d slots=%v", connID, SlotMigratingMessage, seqID, slotRanges)
+	return appendJSONIfDebug(message, map[string]interface{}{
+		"connID": connID,
+		"seqID": seqID,
+		"slotRanges": slotRanges,
 	})
 }
 
-func SlotMigrated(slot int64) string {
-	message := fmt.Sprintf("%s %d", SlotMigratedMessage, slot)
+func SlotMigrated(seqID int64, hostPort string, slotRanges []string) string {
+	message := fmt.Sprintf("%s seqID=%d host:port=%s slots=%v", SlotMigratedMessage, seqID, hostPort, slotRanges)
 	return appendJSONIfDebug(message, map[string]interface{}{
-		"slot": slot,
+		"seqID": seqID,
+		"hostPort": hostPort,
+		"slotRanges": slotRanges,
 	})
 }
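
For orientation, here is the message the reworked SlotMigrated helper produces. The call below is illustrative only (the seqID, address, and ranges are made up), not code from this commit:

	// Illustrative call into the logs package changed above.
	msg := logs.SlotMigrated(42, "127.0.0.1:6379", []string{"1234", "5000-6000"})
	// msg == "slots migrated, triggering cluster state reload seqID=42 host:port=127.0.0.1:6379 slots=[1234 5000-6000]"
	// appendJSONIfDebug additionally attaches the seqID/hostPort/slotRanges map when debug logging is enabled.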

maintnotifications/README.md
Lines changed: 2 additions & 2 deletions

@@ -6,8 +6,8 @@ Seamless Redis connection handoffs during cluster maintenance operations without
 
 **Cluster notifications are now supported for ClusterClient!**
 
-- **SMIGRATING**: Relaxes timeouts when a slot is being migrated
-- **SMIGRATED**: Reloads cluster state when a slot migration completes
+- **SMIGRATING**: `["SMIGRATING", SeqID, slot/range, ...]` - Relaxes timeouts when slots are being migrated
+- **SMIGRATED**: `["SMIGRATED", SeqID, host:port, slot/range, ...]` - Reloads cluster state when slot migration completes
 
 **Note:** Other maintenance notifications (MOVING, MIGRATING, MIGRATED, FAILING_OVER, FAILED_OVER) are supported only in standalone Redis clients. Cluster clients support SMIGRATING and SMIGRATED for cluster-specific slot migration handling.
 
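
Decoded from the RESP3 push message into Go values, the two formats documented above would look roughly like this (the seqID, address, and ranges are illustrative; the handlers assert int64 for SeqID and strings for the remaining elements):

	// Illustrative decoded push payloads matching the documented formats.
	smigrating := []interface{}{"SMIGRATING", int64(12), "1234", "5000-6000"}
	smigrated := []interface{}{"SMIGRATED", int64(12), "127.0.0.1:6379", "1234", "5000-6000"}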

maintnotifications/manager.go
Lines changed: 7 additions & 6 deletions

@@ -92,10 +92,11 @@ type MovingOperation struct {
 
 // ClusterStateReloadCallback is a callback function that triggers cluster state reload.
 // This is used by node clients to notify their parent ClusterClient about SMIGRATED notifications.
-// The slot parameter indicates which slot has migrated (0-16383).
+// The hostPort parameter indicates the destination node (e.g., "127.0.0.1:6379").
+// The slotRanges parameter contains the migrated slots (e.g., ["1234", "5000-6000"]).
 // Currently, implementations typically reload the entire cluster state, but in the future
-// this could be optimized to reload only the specific slot.
-type ClusterStateReloadCallback func(ctx context.Context, slot int)
+// this could be optimized to reload only the specific slots.
+type ClusterStateReloadCallback func(ctx context.Context, hostPort string, slotRanges []string)
 
 // NewManager creates a new simplified manager.
 func NewManager(client interfaces.ClientInterface, pool pool.Pooler, config *Config) (*Manager, error) {

@@ -340,9 +341,9 @@ func (hm *Manager) SetClusterStateReloadCallback(callback ClusterStateReloadCall
 }
 
 // TriggerClusterStateReload calls the cluster state reload callback if it's set.
-// This is called when a SMOVED notification is received.
-func (hm *Manager) TriggerClusterStateReload(ctx context.Context, slot int) {
+// This is called when a SMIGRATED notification is received.
+func (hm *Manager) TriggerClusterStateReload(ctx context.Context, hostPort string, slotRanges []string) {
 	if hm.clusterStateReloadCallback != nil {
-		hm.clusterStateReloadCallback(ctx, slot)
+		hm.clusterStateReloadCallback(ctx, hostPort, slotRanges)
 	}
 }

maintnotifications/push_notification_handler.go
Lines changed: 46 additions & 13 deletions

@@ -294,19 +294,30 @@ func (snh *NotificationHandler) handleFailedOver(ctx context.Context, handlerCtx
 // handleSMigrating processes SMIGRATING notifications.
 // SMIGRATING indicates that a cluster slot is in the process of migrating to a different node.
 // This is a per-connection notification that applies relaxed timeouts during slot migration.
-// Expected format: ["SMIGRATING", slot, ...]
+// Expected format: ["SMIGRATING", SeqID, slot/range1-range2, ...]
 func (snh *NotificationHandler) handleSMigrating(ctx context.Context, handlerCtx push.NotificationHandlerContext, notification []interface{}) error {
-	if len(notification) < 2 {
+	if len(notification) < 3 {
 		internal.Logger.Printf(ctx, logs.InvalidNotification("SMIGRATING", notification))
 		return ErrInvalidNotification
 	}
 
-	slot, ok := notification[1].(int64)
+	// Extract SeqID (position 1)
+	seqID, ok := notification[1].(int64)
 	if !ok {
-		internal.Logger.Printf(ctx, logs.InvalidSlotInSMigratingNotification(notification[1]))
+		internal.Logger.Printf(ctx, logs.InvalidSeqIDInSMigratingNotification(notification[1]))
 		return ErrInvalidNotification
 	}
 
+	// Extract slot ranges (position 2+)
+	// For now, we just extract them for logging
+	// Format can be: single slot "1234" or range "100-200"
+	var slotRanges []string
+	for i := 2; i < len(notification); i++ {
+		if slotRange, ok := notification[i].(string); ok {
+			slotRanges = append(slotRanges, slotRange)
+		}
+	}
+
 	if handlerCtx.Conn == nil {
 		internal.Logger.Printf(ctx, logs.NoConnectionInHandlerContext("SMIGRATING"))
 		return ErrInvalidNotification

@@ -320,7 +331,7 @@ func (snh *NotificationHandler) handleSMigrating(ctx context.Context, handlerCtx
 
 	// Apply relaxed timeout to this specific connection
 	if internal.LogLevel.InfoOrAbove() {
-		internal.Logger.Printf(ctx, logs.SlotMigrating(conn.GetID(), slot))
+		internal.Logger.Printf(ctx, logs.SlotMigrating(conn.GetID(), seqID, slotRanges))
 	}
 	conn.SetRelaxedTimeout(snh.manager.config.RelaxedTimeout, snh.manager.config.RelaxedTimeout)
 	return nil

@@ -329,26 +340,48 @@
 // handleSMigrated processes SMIGRATED notifications.
 // SMIGRATED indicates that a cluster slot has finished migrating to a different node.
 // This is a cluster-level notification that triggers cluster state reload.
-// Expected format: ["SMIGRATED", slot, ...]
+// Expected format: ["SMIGRATED", SeqID, host:port, slot1/range1-range2, ...]
 func (snh *NotificationHandler) handleSMigrated(ctx context.Context, handlerCtx push.NotificationHandlerContext, notification []interface{}) error {
-	if len(notification) < 2 {
+	if len(notification) < 4 {
 		internal.Logger.Printf(ctx, logs.InvalidNotification("SMIGRATED", notification))
 		return ErrInvalidNotification
 	}
 
-	slot, ok := notification[1].(int64)
+	// Extract SeqID (position 1)
+	seqID, ok := notification[1].(int64)
+	if !ok {
+		internal.Logger.Printf(ctx, logs.InvalidSeqIDInSMigratedNotification(notification[1]))
+		return ErrInvalidNotification
+	}
+
+	// Extract host:port (position 2)
+	hostPort, ok := notification[2].(string)
 	if !ok {
-		internal.Logger.Printf(ctx, logs.InvalidSlotInSMigratedNotification(notification[1]))
+		internal.Logger.Printf(ctx, logs.InvalidHostPortInSMigratedNotification(notification[2]))
 		return ErrInvalidNotification
 	}
 
+	// Extract slot ranges (position 3+)
+	// For now, we just extract them for logging
+	// Format can be: single slot "1234" or range "100-200"
+	var slotRanges []string
+	for i := 3; i < len(notification); i++ {
+		if slotRange, ok := notification[i].(string); ok {
+			slotRanges = append(slotRanges, slotRange)
+		}
+	}
+
 	if internal.LogLevel.InfoOrAbove() {
-		internal.Logger.Printf(ctx, logs.SlotMigrated(slot))
+		internal.Logger.Printf(ctx, logs.SlotMigrated(seqID, hostPort, slotRanges))
 	}
 
-	// Trigger cluster state reload via callback, passing the slot ID
-	// This allows for future optimization of partial slot reloads
-	snh.manager.TriggerClusterStateReload(ctx, int(slot))
+	// Trigger cluster state reload via callback, passing host:port and slot ranges
+	// For now, implementations just log these and trigger a full reload
+	// In the future, this could be optimized to reload only the specific slots
+	snh.manager.TriggerClusterStateReload(ctx, hostPort, slotRanges)
+
+	// TODO: Should we also clear the relaxed timeout here (like MIGRATED does)?
+	// Currently we only trigger state reload, but the timeout stays relaxed
 
 	return nil
 }
