Skip to content

Commit a247360

Browse files
committed
smigrating/smigrated intro
1 parent 55aa026 commit a247360

File tree

8 files changed

+642
-17
lines changed

8 files changed

+642
-17
lines changed

cluster_smigrating_test.go

Lines changed: 156 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,156 @@
1+
package redis
2+
3+
import (
4+
"context"
5+
"sync/atomic"
6+
"testing"
7+
8+
"github.com/redis/go-redis/v9/maintnotifications"
9+
)
10+
11+
// TestClusterClientSMigratedCallback tests that ClusterClient sets up SMIGRATED callback on node clients
12+
func TestClusterClientSMigratedCallback(t *testing.T) {
13+
t.Run("CallbackSetupWithMaintNotifications", func(t *testing.T) {
14+
// Track if state reload was called
15+
var reloadCalled atomic.Bool
16+
17+
// Create cluster options with maintnotifications enabled
18+
opt := &ClusterOptions{
19+
Addrs: []string{"localhost:7000"}, // Dummy address
20+
MaintNotificationsConfig: &maintnotifications.Config{
21+
Mode: maintnotifications.ModeEnabled,
22+
},
23+
// Use custom NewClient to track when nodes are created
24+
NewClient: func(opt *Options) *Client {
25+
client := NewClient(opt)
26+
return client
27+
},
28+
}
29+
30+
// Create cluster client
31+
cluster := NewClusterClient(opt)
32+
defer cluster.Close()
33+
34+
// Manually trigger node creation by calling GetOrCreate
35+
// This simulates what happens during normal cluster operations
36+
node, err := cluster.nodes.GetOrCreate("localhost:7000")
37+
if err != nil {
38+
t.Fatalf("Failed to create node: %v", err)
39+
}
40+
41+
// Get the maintnotifications manager from the node client
42+
manager := node.Client.GetMaintNotificationsManager()
43+
if manager == nil {
44+
t.Skip("MaintNotifications manager not initialized (expected if not connected to real Redis)")
45+
return
46+
}
47+
48+
// Temporarily replace the cluster state reload with our test version
49+
var receivedSlot int
50+
originalCallback := manager
51+
manager.SetClusterStateReloadCallback(func(ctx context.Context, slot int) {
52+
reloadCalled.Store(true)
53+
receivedSlot = slot
54+
})
55+
56+
// Trigger the callback (this is what SMIGRATED notification would do)
57+
ctx := context.Background()
58+
testSlot := 1234
59+
manager.TriggerClusterStateReload(ctx, testSlot)
60+
61+
// Verify callback was called
62+
if !reloadCalled.Load() {
63+
t.Error("Cluster state reload callback should have been called")
64+
}
65+
66+
// Verify slot was passed correctly
67+
if receivedSlot != testSlot {
68+
t.Errorf("Expected slot %d, got %d", testSlot, receivedSlot)
69+
}
70+
_ = originalCallback
71+
})
72+
73+
t.Run("NoCallbackWithoutMaintNotifications", func(t *testing.T) {
74+
// Create cluster options WITHOUT maintnotifications
75+
opt := &ClusterOptions{
76+
Addrs: []string{"localhost:7000"}, // Dummy address
77+
// MaintNotificationsConfig is nil
78+
}
79+
80+
// Create cluster client
81+
cluster := NewClusterClient(opt)
82+
defer cluster.Close()
83+
84+
// The OnNewNode callback should not be registered when MaintNotificationsConfig is nil
85+
// This test just verifies that the cluster client doesn't panic
86+
})
87+
}
88+
89+
// TestClusterClientSMigratedIntegration tests SMIGRATED notification handling in cluster context
90+
func TestClusterClientSMigratedIntegration(t *testing.T) {
91+
t.Run("SMigratedTriggersStateReload", func(t *testing.T) {
92+
// This test verifies the integration between SMIGRATED notification and cluster state reload
93+
// We verify that the callback is properly set up to call cluster.state.LazyReload()
94+
95+
// Create cluster options with maintnotifications enabled
96+
opt := &ClusterOptions{
97+
Addrs: []string{"localhost:7000"},
98+
MaintNotificationsConfig: &maintnotifications.Config{
99+
Mode: maintnotifications.ModeEnabled,
100+
},
101+
}
102+
103+
// Create cluster client
104+
cluster := NewClusterClient(opt)
105+
defer cluster.Close()
106+
107+
// Create a node
108+
node, err := cluster.nodes.GetOrCreate("localhost:7000")
109+
if err != nil {
110+
t.Fatalf("Failed to create node: %v", err)
111+
}
112+
113+
// Get the maintnotifications manager
114+
manager := node.Client.GetMaintNotificationsManager()
115+
if manager == nil {
116+
t.Skip("MaintNotifications manager not initialized (expected if not connected to real Redis)")
117+
return
118+
}
119+
120+
// Verify that the callback is set by checking it's not nil
121+
// We can't directly test LazyReload being called without a real cluster,
122+
// but we can verify the callback mechanism works
123+
var callbackWorks atomic.Bool
124+
var receivedSlot int
125+
manager.SetClusterStateReloadCallback(func(ctx context.Context, slot int) {
126+
callbackWorks.Store(true)
127+
receivedSlot = slot
128+
})
129+
130+
ctx := context.Background()
131+
testSlot := 5678
132+
manager.TriggerClusterStateReload(ctx, testSlot)
133+
134+
if !callbackWorks.Load() {
135+
t.Error("Callback mechanism should work")
136+
}
137+
138+
if receivedSlot != testSlot {
139+
t.Errorf("Expected slot %d, got %d", testSlot, receivedSlot)
140+
}
141+
})
142+
}
143+
144+
// TestSMigratingAndSMigratedConstants verifies the SMIGRATING and SMIGRATED constants are exported
145+
func TestSMigratingAndSMigratedConstants(t *testing.T) {
146+
// This test verifies that the SMIGRATING and SMIGRATED constants are properly defined
147+
// and accessible from the maintnotifications package
148+
if maintnotifications.NotificationSMigrating != "SMIGRATING" {
149+
t.Errorf("Expected NotificationSMigrating to be 'SMIGRATING', got: %s", maintnotifications.NotificationSMigrating)
150+
}
151+
152+
if maintnotifications.NotificationSMigrated != "SMIGRATED" {
153+
t.Errorf("Expected NotificationSMigrated to be 'SMIGRATED', got: %s", maintnotifications.NotificationSMigrated)
154+
}
155+
}
156+

internal/maintnotifications/logs/log_messages.go

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -121,6 +121,10 @@ const (
121121
UnrelaxedTimeoutMessage = "clearing relaxed timeout"
122122
ManagerNotInitializedMessage = "manager not initialized"
123123
FailedToMarkForHandoffMessage = "failed to mark connection for handoff"
124+
InvalidSlotInSMigratingNotificationMessage = "invalid slot in SMIGRATING notification"
125+
InvalidSlotInSMigratedNotificationMessage = "invalid slot in SMIGRATED notification"
126+
SlotMigratingMessage = "slot is migrating, applying relaxed timeout"
127+
SlotMigratedMessage = "slot has migrated, triggering cluster state reload"
124128

125129
// ========================================
126130
// used in pool/conn
@@ -623,3 +627,33 @@ func ExtractDataFromLogMessage(logMessage string) map[string]interface{} {
623627
// If JSON parsing fails, return empty map
624628
return result
625629
}
630+
631+
// Cluster notification functions
632+
func InvalidSlotInSMigratingNotification(slot interface{}) string {
633+
message := fmt.Sprintf("%s: %v", InvalidSlotInSMigratingNotificationMessage, slot)
634+
return appendJSONIfDebug(message, map[string]interface{}{
635+
"slot": fmt.Sprintf("%v", slot),
636+
})
637+
}
638+
639+
func InvalidSlotInSMigratedNotification(slot interface{}) string {
640+
message := fmt.Sprintf("%s: %v", InvalidSlotInSMigratedNotificationMessage, slot)
641+
return appendJSONIfDebug(message, map[string]interface{}{
642+
"slot": fmt.Sprintf("%v", slot),
643+
})
644+
}
645+
646+
func SlotMigrating(connID uint64, slot int64) string {
647+
message := fmt.Sprintf("conn[%d] %s %d", connID, SlotMigratingMessage, slot)
648+
return appendJSONIfDebug(message, map[string]interface{}{
649+
"connID": connID,
650+
"slot": slot,
651+
})
652+
}
653+
654+
func SlotMigrated(slot int64) string {
655+
message := fmt.Sprintf("%s %d", SlotMigratedMessage, slot)
656+
return appendJSONIfDebug(message, map[string]interface{}{
657+
"slot": slot,
658+
})
659+
}

maintnotifications/README.md

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,14 @@
22

33
Seamless Redis connection handoffs during cluster maintenance operations without dropping connections.
44

5-
## ⚠️ **Important Note**
6-
**Maintenance notifications are currently supported only in standalone Redis clients.** Cluster clients (ClusterClient, FailoverClient, etc.) do not yet support this functionality.
5+
## Cluster Support
6+
7+
**Cluster notifications are now supported for ClusterClient!**
8+
9+
- **SMIGRATING**: Relaxes timeouts when a slot is being migrated
10+
- **SMIGRATED**: Reloads cluster state when a slot migration completes
11+
12+
**Note:** Other maintenance notifications (MOVING, MIGRATING, MIGRATED, FAILING_OVER, FAILED_OVER) are supported only in standalone Redis clients. Cluster clients support SMIGRATING and SMIGRATED for cluster-specific slot migration handling.
713

814
## Quick Start
915

maintnotifications/manager.go

Lines changed: 33 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -18,11 +18,13 @@ import (
1818

1919
// Push notification type constants for maintenance
2020
const (
21-
NotificationMoving = "MOVING"
22-
NotificationMigrating = "MIGRATING"
23-
NotificationMigrated = "MIGRATED"
24-
NotificationFailingOver = "FAILING_OVER"
25-
NotificationFailedOver = "FAILED_OVER"
21+
NotificationMoving = "MOVING" // Per-connection handoff notification
22+
NotificationMigrating = "MIGRATING" // Per-connection migration start notification - relaxes timeouts
23+
NotificationMigrated = "MIGRATED" // Per-connection migration complete notification - clears relaxed timeouts
24+
NotificationFailingOver = "FAILING_OVER" // Per-connection failover start notification - relaxes timeouts
25+
NotificationFailedOver = "FAILED_OVER" // Per-connection failover complete notification - clears relaxed timeouts
26+
NotificationSMigrating = "SMIGRATING" // Cluster slot migrating notification - relaxes timeouts
27+
NotificationSMigrated = "SMIGRATED" // Cluster slot migrated notification - triggers cluster state reload
2628
)
2729

2830
// maintenanceNotificationTypes contains all notification types that maintenance handles
@@ -32,6 +34,8 @@ var maintenanceNotificationTypes = []string{
3234
NotificationMigrated,
3335
NotificationFailingOver,
3436
NotificationFailedOver,
37+
NotificationSMigrating,
38+
NotificationSMigrated,
3539
}
3640

3741
// NotificationHook is called before and after notification processing
@@ -73,6 +77,9 @@ type Manager struct {
7377
hooks []NotificationHook
7478
hooksMu sync.RWMutex // Protects hooks slice
7579
poolHooksRef *PoolHook
80+
81+
// Cluster state reload callback for SMIGRATED notifications
82+
clusterStateReloadCallback ClusterStateReloadCallback
7683
}
7784

7885
// MovingOperation tracks an active MOVING operation.
@@ -83,6 +90,13 @@ type MovingOperation struct {
8390
Deadline time.Time
8491
}
8592

93+
// ClusterStateReloadCallback is a callback function that triggers cluster state reload.
94+
// This is used by node clients to notify their parent ClusterClient about SMIGRATED notifications.
95+
// The slot parameter indicates which slot has migrated (0-16383).
96+
// Currently, implementations typically reload the entire cluster state, but in the future
97+
// this could be optimized to reload only the specific slot.
98+
type ClusterStateReloadCallback func(ctx context.Context, slot int)
99+
86100
// NewManager creates a new simplified manager.
87101
func NewManager(client interfaces.ClientInterface, pool pool.Pooler, config *Config) (*Manager, error) {
88102
if client == nil {
@@ -318,3 +332,17 @@ func (hm *Manager) AddNotificationHook(notificationHook NotificationHook) {
318332
defer hm.hooksMu.Unlock()
319333
hm.hooks = append(hm.hooks, notificationHook)
320334
}
335+
336+
// SetClusterStateReloadCallback sets the callback function that will be called when a SMOVED notification is received.
337+
// This allows node clients to notify their parent ClusterClient to reload cluster state.
338+
func (hm *Manager) SetClusterStateReloadCallback(callback ClusterStateReloadCallback) {
339+
hm.clusterStateReloadCallback = callback
340+
}
341+
342+
// TriggerClusterStateReload calls the cluster state reload callback if it's set.
343+
// This is called when a SMOVED notification is received.
344+
func (hm *Manager) TriggerClusterStateReload(ctx context.Context, slot int) {
345+
if hm.clusterStateReloadCallback != nil {
346+
hm.clusterStateReloadCallback(ctx, slot)
347+
}
348+
}

maintnotifications/manager_test.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -217,6 +217,8 @@ func TestManagerRefactoring(t *testing.T) {
217217
NotificationMigrated,
218218
NotificationFailingOver,
219219
NotificationFailedOver,
220+
NotificationSMigrating,
221+
NotificationSMigrated,
220222
}
221223

222224
if len(maintenanceNotificationTypes) != len(expectedTypes) {

0 commit comments

Comments
 (0)