Skip to content

Commit ba5dcb1

Browse files
committed
feat: add pub/sub message filtering to push notification processor
- Add isPubSubMessage() function to identify pub/sub message types - Filter out pub/sub messages in ProcessPendingNotifications - Allow pub/sub system to handle its own messages without interference - Process only cluster/system push notifications (MOVING, MIGRATING, etc.) - Add comprehensive test coverage for filtering logic Pub/sub message types filtered: - message (regular pub/sub) - pmessage (pattern pub/sub) - subscribe/unsubscribe (subscription management) - psubscribe/punsubscribe (pattern subscription management) - smessage (sharded pub/sub, Redis 7.0+) Benefits: - Clear separation of concerns between pub/sub and push notifications - Prevents interference between the two messaging systems - Ensures pub/sub messages reach their intended handlers - Eliminates message loss due to incorrect interception - Improved system reliability and performance - Better resource utilization and message flow Implementation: - Efficient O(1) switch statement for message type lookup - Case-sensitive matching for precise filtering - Early return to skip unnecessary processing - Maintains processing of other notifications in same batch - Applied to all processing points (WithReader, Pool.Put, isHealthyConn) Test coverage: - TestIsPubSubMessage - Function correctness and edge cases - TestPubSubFiltering - End-to-end integration testing - Mixed message scenarios and handler verification
1 parent b6e712b commit ba5dcb1

File tree

2 files changed

+135
-0
lines changed

2 files changed

+135
-0
lines changed

internal/pushnotif/processor.go

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -82,6 +82,11 @@ func (p *Processor) ProcessPendingNotifications(ctx context.Context, rd *proto.R
8282
if len(notification) > 0 {
8383
// Extract the notification type (first element)
8484
if notificationType, ok := notification[0].(string); ok {
85+
// Skip pub/sub messages - they should be handled by the pub/sub system
86+
if isPubSubMessage(notificationType) {
87+
continue
88+
}
89+
8590
// Get the handler for this notification type
8691
if handler := p.registry.GetHandler(notificationType); handler != nil {
8792
// Handle the notification
@@ -94,6 +99,23 @@ func (p *Processor) ProcessPendingNotifications(ctx context.Context, rd *proto.R
9499
return nil
95100
}
96101

102+
// isPubSubMessage checks if a notification type is a pub/sub message that should be ignored
103+
// by the push notification processor and handled by the pub/sub system instead.
104+
func isPubSubMessage(notificationType string) bool {
105+
switch notificationType {
106+
case "message", // Regular pub/sub message
107+
"pmessage", // Pattern pub/sub message
108+
"subscribe", // Subscription confirmation
109+
"unsubscribe", // Unsubscription confirmation
110+
"psubscribe", // Pattern subscription confirmation
111+
"punsubscribe", // Pattern unsubscription confirmation
112+
"smessage": // Sharded pub/sub message (Redis 7.0+)
113+
return true
114+
default:
115+
return false
116+
}
117+
}
118+
97119
// VoidProcessor discards all push notifications without processing them.
98120
type VoidProcessor struct{}
99121

internal/pushnotif/pushnotif_test.go

Lines changed: 113 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -136,6 +136,11 @@ func testProcessPendingNotifications(processor *Processor, ctx context.Context,
136136
if len(notification) > 0 {
137137
// Extract the notification type (first element)
138138
if notificationType, ok := notification[0].(string); ok {
139+
// Skip pub/sub messages - they should be handled by the pub/sub system
140+
if isPubSubMessage(notificationType) {
141+
continue
142+
}
143+
139144
// Get the handler for this notification type
140145
if handler := processor.registry.GetHandler(notificationType); handler != nil {
141146
// Handle the notification
@@ -620,4 +625,112 @@ func TestVoidProcessor(t *testing.T) {
620625
t.Errorf("VoidProcessor ProcessPendingNotifications should never error, got: %v", err)
621626
}
622627
})
628+
}
629+
630+
// TestIsPubSubMessage tests the isPubSubMessage function
631+
func TestIsPubSubMessage(t *testing.T) {
632+
t.Run("PubSubMessages", func(t *testing.T) {
633+
pubSubMessages := []string{
634+
"message", // Regular pub/sub message
635+
"pmessage", // Pattern pub/sub message
636+
"subscribe", // Subscription confirmation
637+
"unsubscribe", // Unsubscription confirmation
638+
"psubscribe", // Pattern subscription confirmation
639+
"punsubscribe", // Pattern unsubscription confirmation
640+
"smessage", // Sharded pub/sub message (Redis 7.0+)
641+
}
642+
643+
for _, msgType := range pubSubMessages {
644+
if !isPubSubMessage(msgType) {
645+
t.Errorf("isPubSubMessage(%q) should return true", msgType)
646+
}
647+
}
648+
})
649+
650+
t.Run("NonPubSubMessages", func(t *testing.T) {
651+
nonPubSubMessages := []string{
652+
"MOVING", // Cluster slot migration
653+
"MIGRATING", // Cluster slot migration
654+
"MIGRATED", // Cluster slot migration
655+
"FAILING_OVER", // Cluster failover
656+
"FAILED_OVER", // Cluster failover
657+
"unknown", // Unknown message type
658+
"", // Empty string
659+
"MESSAGE", // Case sensitive - should not match
660+
"PMESSAGE", // Case sensitive - should not match
661+
}
662+
663+
for _, msgType := range nonPubSubMessages {
664+
if isPubSubMessage(msgType) {
665+
t.Errorf("isPubSubMessage(%q) should return false", msgType)
666+
}
667+
}
668+
})
669+
}
670+
671+
// TestPubSubFiltering tests that pub/sub messages are filtered out during processing
672+
func TestPubSubFiltering(t *testing.T) {
673+
t.Run("PubSubMessagesIgnored", func(t *testing.T) {
674+
processor := NewProcessor()
675+
handler := NewTestHandler("test", true)
676+
ctx := context.Background()
677+
678+
// Register a handler for a non-pub/sub notification
679+
err := processor.RegisterHandler("MOVING", handler, false)
680+
if err != nil {
681+
t.Fatalf("Failed to register handler: %v", err)
682+
}
683+
684+
// Test with mock reader - pub/sub message should be ignored
685+
mockReader := NewMockReader()
686+
mockReader.AddPeekReplyType(proto.RespPush, nil)
687+
pubSubNotification := []interface{}{"message", "channel", "data"}
688+
mockReader.AddReadReply(pubSubNotification, nil)
689+
mockReader.AddPeekReplyType(proto.RespString, io.EOF) // No more push notifications
690+
691+
handler.Reset()
692+
err = testProcessPendingNotifications(processor, ctx, mockReader)
693+
if err != nil {
694+
t.Errorf("ProcessPendingNotifications should handle pub/sub messages gracefully, got: %v", err)
695+
}
696+
697+
// Check that handler was NOT called for pub/sub message
698+
handled := handler.GetHandledNotifications()
699+
if len(handled) != 0 {
700+
t.Errorf("Expected 0 handled notifications for pub/sub message, got: %d", len(handled))
701+
}
702+
})
703+
704+
t.Run("NonPubSubMessagesProcessed", func(t *testing.T) {
705+
processor := NewProcessor()
706+
handler := NewTestHandler("test", true)
707+
ctx := context.Background()
708+
709+
// Register a handler for a non-pub/sub notification
710+
err := processor.RegisterHandler("MOVING", handler, false)
711+
if err != nil {
712+
t.Fatalf("Failed to register handler: %v", err)
713+
}
714+
715+
// Test with mock reader - non-pub/sub message should be processed
716+
mockReader := NewMockReader()
717+
mockReader.AddPeekReplyType(proto.RespPush, nil)
718+
clusterNotification := []interface{}{"MOVING", "slot", "12345"}
719+
mockReader.AddReadReply(clusterNotification, nil)
720+
mockReader.AddPeekReplyType(proto.RespString, io.EOF) // No more push notifications
721+
722+
handler.Reset()
723+
err = testProcessPendingNotifications(processor, ctx, mockReader)
724+
if err != nil {
725+
t.Errorf("ProcessPendingNotifications should handle cluster notifications, got: %v", err)
726+
}
727+
728+
// Check that handler WAS called for cluster notification
729+
handled := handler.GetHandledNotifications()
730+
if len(handled) != 1 {
731+
t.Errorf("Expected 1 handled notification for cluster message, got: %d", len(handled))
732+
} else if len(handled[0]) != 3 || handled[0][0] != "MOVING" {
733+
t.Errorf("Expected MOVING notification, got: %v", handled[0])
734+
}
735+
})
623736
}

0 commit comments

Comments
 (0)