Skip to content

Commit 391cee9

Browse files
committed
process once per client / seqid
1 parent d7a246b commit 391cee9

File tree

3 files changed

+81
-0
lines changed

3 files changed

+81
-0
lines changed

maintnotifications/manager.go

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,10 @@ type Manager struct {
6969
// MOVING operation tracking - using sync.Map for better concurrent performance
7070
activeMovingOps sync.Map // map[MovingOperationKey]*MovingOperation
7171

72+
// SMIGRATED notification deduplication - tracks processed SeqIDs
73+
// Multiple connections may receive the same SMIGRATED notification
74+
processedSMigratedSeqIDs sync.Map // map[int64]bool
75+
7276
// Atomic state tracking - no locks needed for state queries
7377
activeOperationCount atomic.Int64 // Number of active operations
7478
closed atomic.Bool // Manager closed state
@@ -238,6 +242,15 @@ func (hm *Manager) GetActiveOperationCount() int64 {
238242
return hm.activeOperationCount.Load()
239243
}
240244

245+
// MarkSMigratedSeqIDProcessed attempts to mark a SMIGRATED SeqID as processed.
246+
// Returns true if this is the first time processing this SeqID (should process),
247+
// false if it was already processed (should skip).
248+
// This prevents duplicate processing when multiple connections receive the same notification.
249+
func (hm *Manager) MarkSMigratedSeqIDProcessed(seqID int64) bool {
250+
_, alreadyProcessed := hm.processedSMigratedSeqIDs.LoadOrStore(seqID, true)
251+
return !alreadyProcessed // Return true if NOT already processed
252+
}
253+
241254
// Close closes the manager.
242255
func (hm *Manager) Close() error {
243256
// Use atomic operation for thread-safe close check

maintnotifications/push_notification_handler.go

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -341,6 +341,7 @@ func (snh *NotificationHandler) handleSMigrating(ctx context.Context, handlerCtx
341341
// SMIGRATED indicates that a cluster slot has finished migrating to a different node.
342342
// This is a cluster-level notification that triggers cluster state reload.
343343
// Expected format: ["SMIGRATED", SeqID, host:port, slot1/range1-range2, ...]
344+
// Note: Multiple connections may receive the same notification, so we deduplicate by SeqID.
344345
func (snh *NotificationHandler) handleSMigrated(ctx context.Context, handlerCtx push.NotificationHandlerContext, notification []interface{}) error {
345346
if len(notification) < 4 {
346347
internal.Logger.Printf(ctx, logs.InvalidNotification("SMIGRATED", notification))
@@ -354,6 +355,15 @@ func (snh *NotificationHandler) handleSMigrated(ctx context.Context, handlerCtx
354355
return ErrInvalidNotification
355356
}
356357

358+
// Deduplicate by SeqID - multiple connections may receive the same notification
359+
if !snh.manager.MarkSMigratedSeqIDProcessed(seqID) {
360+
// Already processed this SeqID, skip
361+
if internal.LogLevel.DebugOrAbove() {
362+
internal.Logger.Printf(ctx, "cluster: SMIGRATED notification with SeqID %d already processed, skipping", seqID)
363+
}
364+
return nil
365+
}
366+
357367
// Extract host:port (position 2)
358368
hostPort, ok := notification[2].(string)
359369
if !ok {

maintnotifications/smigrating_test.go

Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -201,6 +201,64 @@ func TestSMigratedNotificationHandler(t *testing.T) {
201201
}
202202
})
203203

204+
t.Run("SMigratedNotification_Deduplication", func(t *testing.T) {
205+
// Track callback invocations
206+
var callbackCount atomic.Int32
207+
208+
// Create a mock manager with callback
209+
manager := &Manager{
210+
clusterStateReloadCallback: func(ctx context.Context, hostPort string, slotRanges []string) {
211+
callbackCount.Add(1)
212+
},
213+
}
214+
215+
// Create notification handler
216+
handler := &NotificationHandler{
217+
manager: manager,
218+
operationsManager: manager,
219+
}
220+
221+
// Create SMIGRATED notification with SeqID 456
222+
notification := []interface{}{"SMIGRATED", int64(456), "127.0.0.1:6379", "1234"}
223+
224+
ctx := context.Background()
225+
handlerCtx := push.NotificationHandlerContext{}
226+
227+
// Handle the notification first time
228+
err := handler.handleSMigrated(ctx, handlerCtx, notification)
229+
if err != nil {
230+
t.Errorf("handleSMigrated should not error on first call: %v", err)
231+
}
232+
233+
// Verify callback was called once
234+
if callbackCount.Load() != 1 {
235+
t.Errorf("Expected callback to be called once, got %d", callbackCount.Load())
236+
}
237+
238+
// Handle the same notification again (simulating multiple connections)
239+
err = handler.handleSMigrated(ctx, handlerCtx, notification)
240+
if err != nil {
241+
t.Errorf("handleSMigrated should not error on second call: %v", err)
242+
}
243+
244+
// Verify callback was NOT called again (still 1)
245+
if callbackCount.Load() != 1 {
246+
t.Errorf("Expected callback to be called only once (deduplication), got %d", callbackCount.Load())
247+
}
248+
249+
// Handle a different notification with different SeqID
250+
notification2 := []interface{}{"SMIGRATED", int64(789), "127.0.0.1:6380", "5678"}
251+
err = handler.handleSMigrated(ctx, handlerCtx, notification2)
252+
if err != nil {
253+
t.Errorf("handleSMigrated should not error on third call: %v", err)
254+
}
255+
256+
// Verify callback was called again (now 2)
257+
if callbackCount.Load() != 2 {
258+
t.Errorf("Expected callback to be called twice (different SeqID), got %d", callbackCount.Load())
259+
}
260+
})
261+
204262
t.Run("InvalidSMigratedNotification_TooShort", func(t *testing.T) {
205263
manager := &Manager{}
206264
handler := &NotificationHandler{

0 commit comments

Comments
 (0)