Skip to content

Commit 91d91ee

Browse files
committed
Fix for #2687
1 parent 4847922 commit 91d91ee

File tree

2 files changed

+240
-0
lines changed

2 files changed

+240
-0
lines changed

server/rpc_events.go

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -127,6 +127,27 @@ func (s *rpcServer) reSubscribe(config Options) {
127127
if s.subscribers[sb] != nil {
128128
continue
129129
}
130+
// If we've already created a broker subscription for this topic
131+
// (from a different Subscriber entry) then don't create another
132+
// broker.Subscribe. We still need to register the subscriber with
133+
// the router so it receives dispatched messages.
134+
var already bool
135+
for other, subs := range s.subscribers {
136+
if other.Topic() == sb.Topic() && subs != nil {
137+
already = true
138+
break
139+
}
140+
}
141+
if already {
142+
// register with router only
143+
if err := s.router.Subscribe(sb); err != nil {
144+
config.Logger.Logf(log.WarnLevel, "Unable to subscribing to topic: %s, error: %s", sb.Topic(), err)
145+
continue
146+
}
147+
// mark this subscriber as having no broker subscription
148+
s.subscribers[sb] = nil
149+
continue
150+
}
130151
var opts []broker.SubscribeOption
131152
if queue := sb.Options().Queue; len(queue) > 0 {
132153
opts = append(opts, broker.Queue(queue))

server/rpc_events_test.go

Lines changed: 219 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,219 @@
1+
package server
2+
3+
import (
4+
"context"
5+
"sync"
6+
"sync/atomic"
7+
"testing"
8+
"time"
9+
10+
"go-micro.dev/v5/broker"
11+
"go-micro.dev/v5/registry"
12+
)
13+
14+
// TestSubscriberNoDuplicates verifies that when multiple subscribers are registered
15+
// for the same topic with different queues, each handler is called exactly once
16+
// per published message (no duplicate deliveries).
17+
func TestSubscriberNoDuplicates(t *testing.T) {
18+
// Create a memory broker
19+
memBroker := broker.NewMemoryBroker()
20+
if err := memBroker.Connect(); err != nil {
21+
t.Fatalf("Failed to connect broker: %v", err)
22+
}
23+
defer memBroker.Disconnect()
24+
25+
// Create a memory registry
26+
memRegistry := registry.NewMemoryRegistry()
27+
28+
// Create server with memory broker and registry
29+
srv := NewRPCServer(
30+
Broker(memBroker),
31+
Registry(memRegistry),
32+
Name("test.service"),
33+
Id("test-1"),
34+
Address("127.0.0.1:0"),
35+
)
36+
37+
// Track handler invocations
38+
var countA, countB, countC int32
39+
40+
// Handler functions
41+
handlerA := func(ctx context.Context, msg *TestMessage) error {
42+
atomic.AddInt32(&countA, 1)
43+
return nil
44+
}
45+
46+
handlerB := func(ctx context.Context, msg *TestMessage) error {
47+
atomic.AddInt32(&countB, 1)
48+
return nil
49+
}
50+
51+
handlerC := func(ctx context.Context, msg *TestMessage) error {
52+
atomic.AddInt32(&countC, 1)
53+
return nil
54+
}
55+
56+
// Register three subscribers with same topic but different queues
57+
topic := "EVENT_1"
58+
59+
subA := srv.NewSubscriber(topic, handlerA, SubscriberQueue("A"))
60+
if err := srv.Subscribe(subA); err != nil {
61+
t.Fatalf("Failed to subscribe A: %v", err)
62+
}
63+
64+
subB := srv.NewSubscriber(topic, handlerB, SubscriberQueue("B"))
65+
if err := srv.Subscribe(subB); err != nil {
66+
t.Fatalf("Failed to subscribe B: %v", err)
67+
}
68+
69+
subC := srv.NewSubscriber(topic, handlerC, SubscriberQueue("C"))
70+
if err := srv.Subscribe(subC); err != nil {
71+
t.Fatalf("Failed to subscribe C: %v", err)
72+
}
73+
74+
// Start the server (this will trigger reSubscribe)
75+
if err := srv.Start(); err != nil {
76+
t.Fatalf("Failed to start server: %v", err)
77+
}
78+
defer srv.Stop()
79+
80+
// Give server time to establish subscriptions
81+
time.Sleep(100 * time.Millisecond)
82+
83+
// Publish a message to the topic
84+
if err := memBroker.Publish(topic, &broker.Message{
85+
Header: map[string]string{
86+
"Micro-Topic": topic,
87+
"Content-Type": "application/json",
88+
},
89+
Body: []byte(`{"value":"test"}`),
90+
}); err != nil {
91+
t.Fatalf("Failed to publish message: %v", err)
92+
}
93+
94+
// Give handlers time to process
95+
time.Sleep(200 * time.Millisecond)
96+
97+
// Verify each handler was called exactly once
98+
if got := atomic.LoadInt32(&countA); got != 1 {
99+
t.Errorf("Handler A called %d times, expected 1", got)
100+
}
101+
if got := atomic.LoadInt32(&countB); got != 1 {
102+
t.Errorf("Handler B called %d times, expected 1", got)
103+
}
104+
if got := atomic.LoadInt32(&countC); got != 1 {
105+
t.Errorf("Handler C called %d times, expected 1", got)
106+
}
107+
}
108+
109+
// TestSubscriberMultipleTopics verifies that subscribers for different topics
110+
// each receive their respective messages correctly.
111+
func TestSubscriberMultipleTopics(t *testing.T) {
112+
// Create a memory broker
113+
memBroker := broker.NewMemoryBroker()
114+
if err := memBroker.Connect(); err != nil {
115+
t.Fatalf("Failed to connect broker: %v", err)
116+
}
117+
defer memBroker.Disconnect()
118+
119+
// Create a memory registry
120+
memRegistry := registry.NewMemoryRegistry()
121+
122+
// Create server
123+
srv := NewRPCServer(
124+
Broker(memBroker),
125+
Registry(memRegistry),
126+
Name("test.service"),
127+
Id("test-2"),
128+
Address("127.0.0.1:0"),
129+
)
130+
131+
// Track handler invocations
132+
var count1, count2 int32
133+
var wg sync.WaitGroup
134+
wg.Add(2)
135+
136+
// Handler functions
137+
handler1 := func(ctx context.Context, msg *TestMessage) error {
138+
atomic.AddInt32(&count1, 1)
139+
wg.Done()
140+
return nil
141+
}
142+
143+
handler2 := func(ctx context.Context, msg *TestMessage) error {
144+
atomic.AddInt32(&count2, 1)
145+
wg.Done()
146+
return nil
147+
}
148+
149+
// Register subscribers for different topics
150+
topic1 := "TOPIC_1"
151+
topic2 := "TOPIC_2"
152+
153+
sub1 := srv.NewSubscriber(topic1, handler1)
154+
if err := srv.Subscribe(sub1); err != nil {
155+
t.Fatalf("Failed to subscribe to topic1: %v", err)
156+
}
157+
158+
sub2 := srv.NewSubscriber(topic2, handler2)
159+
if err := srv.Subscribe(sub2); err != nil {
160+
t.Fatalf("Failed to subscribe to topic2: %v", err)
161+
}
162+
163+
// Start the server
164+
if err := srv.Start(); err != nil {
165+
t.Fatalf("Failed to start server: %v", err)
166+
}
167+
defer srv.Stop()
168+
169+
// Give server time to establish subscriptions
170+
time.Sleep(100 * time.Millisecond)
171+
172+
// Publish messages to different topics
173+
if err := memBroker.Publish(topic1, &broker.Message{
174+
Header: map[string]string{
175+
"Micro-Topic": topic1,
176+
"Content-Type": "application/json",
177+
},
178+
Body: []byte(`{"value":"test1"}`),
179+
}); err != nil {
180+
t.Fatalf("Failed to publish to topic1: %v", err)
181+
}
182+
183+
if err := memBroker.Publish(topic2, &broker.Message{
184+
Header: map[string]string{
185+
"Micro-Topic": topic2,
186+
"Content-Type": "application/json",
187+
},
188+
Body: []byte(`{"value":"test2"}`),
189+
}); err != nil {
190+
t.Fatalf("Failed to publish to topic2: %v", err)
191+
}
192+
193+
// Wait for handlers to be called
194+
done := make(chan struct{})
195+
go func() {
196+
wg.Wait()
197+
close(done)
198+
}()
199+
200+
select {
201+
case <-done:
202+
// Success
203+
case <-time.After(2 * time.Second):
204+
t.Fatal("Timeout waiting for handlers to be called")
205+
}
206+
207+
// Verify each handler was called exactly once
208+
if got := atomic.LoadInt32(&count1); got != 1 {
209+
t.Errorf("Handler 1 called %d times, expected 1", got)
210+
}
211+
if got := atomic.LoadInt32(&count2); got != 1 {
212+
t.Errorf("Handler 2 called %d times, expected 1", got)
213+
}
214+
}
215+
216+
// TestMessage is a test message type
217+
type TestMessage struct {
218+
Value string `json:"value"`
219+
}

0 commit comments

Comments
 (0)