Skip to content

Commit 34fd1d1

Browse files
authored
Fix super stream reconnection (#456)
* Fixes: #453 * Fixes: #458 --------- Signed-off-by: Gabriele Santomaggio <G.santomaggio@gmail.com>
1 parent 62f5fa1 commit 34fd1d1

File tree

6 files changed

+106
-54
lines changed

6 files changed

+106
-54
lines changed

examples/reliable_super_stream_getting_started/reliable_super_stream_getting_started.go

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package main
33
import (
44
"errors"
55
"fmt"
6+
"time"
67

78
"github.com/rabbitmq/rabbitmq-stream-go-client/pkg/amqp"
89
"github.com/rabbitmq/rabbitmq-stream-go-client/pkg/ha"
@@ -27,7 +28,7 @@ func main() {
2728
// Create a super stream
2829
streamName := "my-super-stream"
2930
// It is highly recommended to define the stream retention policy
30-
err = env.DeclareSuperStream(streamName, stream.NewPartitionsOptions(3).
31+
err = env.DeclareSuperStream(streamName, stream.NewPartitionsOptions(1).
3132
SetMaxLengthBytes(stream.ByteCapacity{}.GB(2)))
3233

3334
// ignore the error if the stream already exists
@@ -75,17 +76,22 @@ func main() {
7576
return
7677
}
7778

78-
// Send a message
79-
for i := range 10 {
79+
// Send messages
80+
for i := range 10000 {
8081
msg := amqp.NewMessage([]byte(fmt.Sprintf("Hello stream:%d", i)))
8182
msg.Properties = &amqp.MessageProperties{
8283
MessageID: fmt.Sprintf("msg-%d", i),
8384
}
8485
err = producer.Send(msg)
8586
if err != nil {
8687
fmt.Printf("Error sending message: %v\n", err)
87-
return
88+
time.Sleep(1 * time.Second)
8889
}
90+
if i%1000 == 0 {
91+
fmt.Printf("Sent %d messages\n", i)
92+
}
93+
// add a small delay in case you want to kill the connection to see the reliable reconnection in action
94+
time.Sleep(200 * time.Millisecond)
8995
}
9096

9197
// press any key to exit

pkg/ha/ha_super_stream_consumer.go

Lines changed: 22 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -64,25 +64,29 @@ func NewReliableSuperStreamConsumer(env *stream.Environment, superStream string,
6464

6565
func (r *ReliableSuperStreamConsumer) handleNotifyClose(channelClose chan stream.CPartitionClose) {
6666
go func() {
67-
cPartitionClose := <-channelClose
68-
if strings.EqualFold(cPartitionClose.Event.Reason, stream.SocketClosed) || strings.EqualFold(cPartitionClose.Event.Reason, stream.MetaDataUpdate) || strings.EqualFold(cPartitionClose.Event.Reason, stream.ZombieConsumer) {
69-
r.setStatus(StatusReconnecting)
70-
logs.LogWarn("[Reliable] - %s closed unexpectedly %s.. Reconnecting..", r.getInfo(), cPartitionClose.Event.Reason)
71-
r.bootstrap = false
72-
err, reconnected := retry(1, r, cPartitionClose.Partition)
73-
if err != nil {
74-
logs.LogInfo(""+
75-
"[Reliable] - %s won't be reconnected. Error: %s", r.getInfo(), err)
76-
}
77-
if reconnected {
78-
r.setStatus(StatusOpen)
67+
// read from channelClose until the channel is closed
68+
for cPartitionClose := range channelClose {
69+
if strings.EqualFold(cPartitionClose.Event.Reason, stream.SocketClosed) || strings.EqualFold(cPartitionClose.Event.Reason, stream.MetaDataUpdate) || strings.EqualFold(cPartitionClose.Event.Reason, stream.ZombieConsumer) {
70+
r.setStatus(StatusReconnecting)
71+
logs.LogWarn("[Reliable] - %s closed unexpectedly %s.. Reconnecting..", r.getInfo(), cPartitionClose.Event.Reason)
72+
r.bootstrap = false
73+
err, reconnected := retry(1, r, cPartitionClose.Partition)
74+
if err != nil {
75+
logs.LogInfo(""+
76+
"[Reliable] - %s won't be reconnected. Error: %s", r.getInfo(), err)
77+
}
78+
if reconnected {
79+
r.setStatus(StatusOpen)
80+
} else {
81+
r.setStatus(StatusClosed)
82+
}
7983
} else {
84+
logs.LogInfo("[Reliable] - %s closed normally. Reason: %s", r.getInfo(), cPartitionClose.Event.Reason)
8085
r.setStatus(StatusClosed)
86+
break
8187
}
82-
} else {
83-
logs.LogInfo("[Reliable] - %s closed normally. Reason: %s", r.getInfo(), cPartitionClose.Event.Reason)
84-
r.setStatus(StatusClosed)
8588
}
89+
logs.LogDebug("[ReliableSuperStreamConsumer] - cPartitionClose closed %s", r.getInfo())
8690
}()
8791
}
8892

@@ -103,6 +107,7 @@ func (r *ReliableSuperStreamConsumer) getEnv() *stream.Environment {
103107

104108
func (r *ReliableSuperStreamConsumer) getNewInstance(partition string) newEntityInstance {
105109
return func() error {
110+
c := r.consumer.Load()
106111
// by default the consumer will start from the consumerOptions.Offset
107112
off := r.consumerOptions.Offset
108113
var restartOffset int64
@@ -113,7 +118,8 @@ func (r *ReliableSuperStreamConsumer) getNewInstance(partition string) newEntity
113118
restartOffset = v.(int64)
114119
off = stream.OffsetSpecification{}.Offset(restartOffset + 1)
115120
}
116-
return r.consumer.Load().ConnectPartition(partition, off)
121+
122+
return c.ConnectPartition(partition, off)
117123
}
118124
}
119125

pkg/ha/ha_super_stream_publisher.go

Lines changed: 22 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -71,27 +71,30 @@ func (r *ReliableSuperStreamProducer) handlePublishConfirm(confirm chan stream.P
7171

7272
func (r *ReliableSuperStreamProducer) handleNotifyClose(channelClose chan stream.PPartitionClose) {
7373
go func() {
74-
cPartitionClose := <-channelClose
75-
if strings.EqualFold(cPartitionClose.Event.Reason, stream.SocketClosed) || strings.EqualFold(cPartitionClose.Event.Reason, stream.MetaDataUpdate) || strings.EqualFold(cPartitionClose.Event.Reason, stream.ZombieConsumer) {
76-
r.setStatus(StatusReconnecting)
77-
logs.LogWarn("[Reliable] - %s closed unexpectedly %s.. Reconnecting..", r.getInfo(), cPartitionClose.Event.Reason)
78-
err, reconnected := retry(1, r, cPartitionClose.Partition)
79-
if err != nil {
80-
logs.LogInfo(""+
81-
"[Reliable] - %s won't be reconnected. Error: %s", r.getInfo(), err)
82-
}
83-
if reconnected {
84-
r.setStatus(StatusOpen)
74+
for cPartitionClose := range channelClose {
75+
if strings.EqualFold(cPartitionClose.Event.Reason, stream.SocketClosed) || strings.EqualFold(cPartitionClose.Event.Reason, stream.MetaDataUpdate) || strings.EqualFold(cPartitionClose.Event.Reason, stream.ZombieConsumer) {
76+
r.setStatus(StatusReconnecting)
77+
logs.LogWarn("[Reliable] - %s closed unexpectedly %s.. Reconnecting..", r.getInfo(), cPartitionClose.Event.Reason)
78+
err, reconnected := retry(1, r, cPartitionClose.Partition)
79+
if err != nil {
80+
logs.LogInfo(""+
81+
"[Reliable] - %s won't be reconnected. Error: %s", r.getInfo(), err)
82+
}
83+
if reconnected {
84+
r.setStatus(StatusOpen)
85+
} else {
86+
r.setStatus(StatusClosed)
87+
}
8588
} else {
89+
logs.LogInfo("[Reliable] - %s closed normally. Reason: %s", r.getInfo(), cPartitionClose.Event.Reason)
8690
r.setStatus(StatusClosed)
91+
break
8792
}
88-
} else {
89-
logs.LogInfo("[Reliable] - %s closed normally. Reason: %s", r.getInfo(), cPartitionClose.Event.Reason)
90-
r.setStatus(StatusClosed)
93+
r.reconnectionSignal.L.Lock()
94+
r.reconnectionSignal.Broadcast()
95+
r.reconnectionSignal.L.Unlock()
9196
}
92-
r.reconnectionSignal.L.Lock()
93-
r.reconnectionSignal.Broadcast()
94-
r.reconnectionSignal.L.Unlock()
97+
logs.LogDebug("[ReliableSuperStreamProducer] - closed %s", r.getInfo())
9598
}()
9699
}
97100

@@ -112,7 +115,8 @@ func (r *ReliableSuperStreamProducer) getEnv() *stream.Environment {
112115

113116
func (r *ReliableSuperStreamProducer) getNewInstance(streamName string) newEntityInstance {
114117
return func() error {
115-
return r.producer.Load().ConnectPartition(streamName)
118+
p := r.producer.Load()
119+
return p.ConnectPartition(streamName)
116120
}
117121
}
118122

pkg/stream/environment.go

Lines changed: 14 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -502,8 +502,13 @@ func (cc *environmentCoordinator) isConsumerListFull(clientsPerContextId int) bo
502502
}
503503

504504
func (cc *environmentCoordinator) maybeCleanClients() {
505-
cc.mutex.Lock()
506-
defer cc.mutex.Unlock()
505+
// Note: Mutex is not needed here because:
506+
// 1. sync.Map operations (Range, Delete) are thread-safe and can be called concurrently
507+
// 2. Deleting the current entry during Range iteration is safe per Go's sync.Map documentation
508+
// 3. This function is called from cleanup callbacks which may run concurrently, but
509+
// sync.Map handles concurrent access safely without requiring external synchronization
510+
// 4. We only delete entries for clients that are already closed (socket.isOpen() == false),
511+
// so there's no risk of deleting active clients that are being used elsewhere
507512

508513
cc.clientsPerContext.Range(func(key, value any) bool {
509514
client := value.(*Client)
@@ -620,7 +625,13 @@ func (cc *environmentCoordinator) validateBrokerConnection(client *Client, broke
620625
logs.LogDebug("connectionProperties host %s doesn't match with the advertised_host %s, advertised_port %s .. retry",
621626
client.connectionProperties.host,
622627
broker.advHost, broker.advPort)
623-
client.Close()
628+
// Safety check: Only close the client if there are no active consumers or producers.
629+
// This prevents premature disconnection during active operations, which could cause
630+
// message loss or connection errors. If there are active producers/consumers, we
631+
// create a new client without closing the old one, allowing graceful migration.
632+
if client.coordinator.ConsumersCount() == 0 && client.coordinator.ProducersCount() == 0 {
633+
client.Close()
634+
}
624635
client = newClientFunc()
625636
err := client.connect()
626637
if err != nil {

pkg/stream/super_stream_consumer.go

Lines changed: 9 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -87,9 +87,11 @@ type SuperStreamConsumer struct {
8787
// in a normal situation len(partitions) == len(consumers)
8888
// but in case of disconnection the len(partitions) can be > len(consumers)
8989
// since the consumer is in reconnection
90-
partitions []string
91-
env *Environment
92-
mutex sync.Mutex
90+
partitions []string
91+
env *Environment
92+
mutex sync.Mutex
93+
94+
chSuperStreamPartitionMutex sync.Mutex
9395
chSuperStreamPartitionClose chan CPartitionClose
9496

9597
SuperStream string
@@ -225,15 +227,15 @@ func (s *SuperStreamConsumer) ConnectPartition(partition string, offset OffsetSp
225227
}
226228
}
227229
s.mutex.Unlock()
230+
s.chSuperStreamPartitionMutex.Lock()
228231
if s.chSuperStreamPartitionClose != nil {
229-
s.mutex.Lock()
230232
s.chSuperStreamPartitionClose <- CPartitionClose{
231233
Partition: gpartion,
232234
Event: event,
233235
Context: s,
234236
}
235-
s.mutex.Unlock()
236237
}
238+
s.chSuperStreamPartitionMutex.Unlock()
237239
logs.LogDebug("[SuperStreamConsumer] chSuperStreamPartitionClose for partition: %s", gpartion)
238240
}(partition, closedEvent)
239241

@@ -255,11 +257,11 @@ func (s *SuperStreamConsumer) Close() error {
255257
// give the time to raise the close event
256258
go func() {
257259
time.Sleep(2 * time.Second)
258-
s.mutex.Lock()
260+
s.chSuperStreamPartitionMutex.Lock()
259261
if s.chSuperStreamPartitionClose != nil {
260262
close(s.chSuperStreamPartitionClose)
261263
}
262-
s.mutex.Unlock()
264+
s.chSuperStreamPartitionMutex.Unlock()
263265
}()
264266

265267
logs.LogDebug("[SuperStreamConsumer] Closed SuperStreamConsumer for: %s", s.SuperStream)

pkg/stream/super_stream_producer.go

Lines changed: 29 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -172,9 +172,12 @@ type SuperStreamProducer struct {
172172
// since the producer is in reconnection
173173
partitions []string
174174

175-
env *Environment
176-
mutex sync.Mutex
175+
env *Environment
176+
mutex sync.Mutex
177+
177178
chNotifyPublishConfirmation chan PartitionPublishConfirm
179+
180+
chSuperStreamPartitionMutex sync.Mutex
178181
chSuperStreamPartitionClose chan PPartitionClose
179182

180183
// public
@@ -220,8 +223,10 @@ func (s *SuperStreamProducer) init() error {
220223
return err
221224
}
222225
for _, p := range partitions {
226+
logs.LogDebug("Producing partition: %s", p)
223227
err = s.ConnectPartition(p)
224228
if err != nil {
229+
logs.LogError("Failed to connect partition: %s", p)
225230
return err
226231
}
227232
}
@@ -233,6 +238,10 @@ func (s *SuperStreamProducer) init() error {
233238
// that are hidden to the user.
234239
// with the ConnectPartition the user can re-connect a partition to the SuperStreamProducer
235240
// that should be used only in case of disconnection
241+
//
242+
// Note on mutex usage: The mutex is held while checking partition validity and activeProducers,
243+
// but is released before calling NewProducer() to avoid potential deadlocks. The mutex is
244+
// re-acquired after NewProducer() returns to safely update the activeProducers slice.
236245
func (s *SuperStreamProducer) ConnectPartition(partition string) error {
237246
logs.LogDebug("[SuperStreamProducer] ConnectPartition for partition: %s", partition)
238247

@@ -255,17 +264,25 @@ func (s *SuperStreamProducer) ConnectPartition(partition string) error {
255264
}
256265
}
257266

258-
s.mutex.Unlock()
259267
var options = NewProducerOptions()
260268
if s.SuperStreamProducerOptions.ClientProvidedName != "" {
261269
options.ClientProvidedName = s.SuperStreamProducerOptions.ClientProvidedName
262270
}
263271
options = options.SetFilter(s.SuperStreamProducerOptions.Filter)
264272

273+
// Unlock mutex before calling NewProducer() to avoid potential deadlocks.
274+
// NewProducer() may perform network I/O and could block, and it may also
275+
// need to acquire other locks internally. Holding this mutex during that
276+
// call could cause deadlocks if other goroutines are waiting on this mutex
277+
// or if NewProducer() needs to acquire locks that conflict with this one.
278+
s.mutex.Unlock()
279+
265280
producer, err := s.env.NewProducer(partition, options)
266281
if err != nil {
267282
return err
268283
}
284+
285+
// Re-acquire mutex to safely update activeProducers slice
269286
s.mutex.Lock()
270287
s.activeProducers = append(s.activeProducers, producer)
271288
chSingleStreamPublishConfirmation := producer.NotifyPublishConfirmation()
@@ -277,21 +294,23 @@ func (s *SuperStreamProducer) ConnectPartition(partition string) error {
277294
event := <-_closedEvent
278295

279296
s.mutex.Lock()
280-
defer s.mutex.Unlock()
281297
for i := range s.activeProducers {
282298
if s.activeProducers[i].GetStreamName() == gpartion {
283299
s.activeProducers = append(s.activeProducers[:i], s.activeProducers[i+1:]...)
284300
break
285301
}
286302
}
303+
s.mutex.Unlock()
287304

305+
s.chSuperStreamPartitionMutex.Lock()
288306
if s.chSuperStreamPartitionClose != nil {
289307
s.chSuperStreamPartitionClose <- PPartitionClose{
290308
Partition: gpartion,
291309
Event: event,
292310
Context: s,
293311
}
294312
}
313+
s.chSuperStreamPartitionMutex.Unlock()
295314
logs.LogDebug("[SuperStreamProducer] chSuperStreamPartitionClose for partition: %s", gpartion)
296315
}(partition, closedEvent)
297316

@@ -368,11 +387,12 @@ func (s *SuperStreamProducer) Send(message message.StreamMessage) error {
368387
for _, p := range ps {
369388
producer := s.getProducer(p)
370389
if producer == nil {
371-
// the producer is not. It can happen if the tcp connection for the partition is dropped
390+
// the producer is not in the list. It can happen if the tcp connection for the partition is dropped
372391
// the user can reconnect the partition using the ConnectPartition
373392
// The client returns an error. Even there could be other partitions where the message can be sent.
374393
// but won't to that to break the expectation of the user. The routing should be always the same
375394
// for the same message. The user has to handle the error and decide to send the message again
395+
376396
return ErrProducerNotFound
377397
}
378398

@@ -404,11 +424,14 @@ func (s *SuperStreamProducer) Close() error {
404424
close(s.chNotifyPublishConfirmation)
405425
s.chNotifyPublishConfirmation = nil
406426
}
427+
s.mutex.Unlock()
428+
429+
s.chSuperStreamPartitionMutex.Lock()
407430
if s.chSuperStreamPartitionClose != nil {
408431
close(s.chSuperStreamPartitionClose)
409432
s.chSuperStreamPartitionClose = nil
410433
}
411-
s.mutex.Unlock()
434+
s.chSuperStreamPartitionMutex.Unlock()
412435
}()
413436
logs.LogDebug("[SuperStreamProducer] Closed SuperStreamProducer for: %s", s.SuperStream)
414437
return nil

0 commit comments

Comments
 (0)