Skip to content

Commit 8e25763

Browse files
committed
feat: add reconnect for mq producer
1 parent 63b82f0 commit 8e25763

File tree

1 file changed

+80
-1
lines changed

1 file changed

+80
-1
lines changed

mq/producer.go

Lines changed: 80 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,15 +3,23 @@ package mq
33
import (
44
"context"
55
"crypto/md5"
6+
"errors"
67
"fmt"
8+
"math"
9+
"time"
10+
711
amqp "github.com/rabbitmq/amqp091-go"
812
"go.opentelemetry.io/otel"
13+
"golang.org/x/sync/singleflight"
914
)
1015

1116
type Producer struct {
1217
Conn *amqp.Connection
1318
Channel *amqp.Channel
1419

20+
amqpURI string // AMQP URI for RabbitMQ reconnection
21+
sf *singleflight.Group
22+
1523
appId string
1624
}
1725

@@ -21,7 +29,9 @@ func NewProducer(appId string, amqpURI string) (*Producer, error) {
2129
}
2230

2331
p := &Producer{
24-
appId: appId,
32+
appId: appId,
33+
amqpURI: amqpURI,
34+
sf: new(singleflight.Group),
2535
}
2636

2737
var err error
@@ -34,6 +44,53 @@ func NewProducer(appId string, amqpURI string) (*Producer, error) {
3444
return p, nil
3545
}
3646

47+
func (p *Producer) isConnected() bool {
48+
return !p.Conn.IsClosed() && !p.Channel.IsClosed()
49+
}
50+
51+
func (p *Producer) connectFn() error {
52+
if p.isConnected() {
53+
return nil
54+
}
55+
56+
_, err, _ := p.sf.Do("reconnect", func() (interface{}, error) {
57+
var lastErr error
58+
for i := 0; i < 3; i++ {
59+
if p.isConnected() {
60+
return nil, nil
61+
}
62+
63+
if i > 0 {
64+
time.Sleep(time.Second * time.Duration(math.Pow(2, float64(i-1))))
65+
}
66+
67+
conn, channel, err := initConnection(p.amqpURI)
68+
if err != nil {
69+
lastErr = fmt.Errorf("reconnect attempt %d failed: %s", i+1, err)
70+
continue
71+
}
72+
73+
oldConn := p.Conn
74+
oldChannel := p.Channel
75+
p.Conn = conn
76+
p.Channel = channel
77+
78+
if oldChannel != nil {
79+
_ = oldChannel.Close()
80+
}
81+
if oldConn != nil {
82+
_ = oldConn.Close()
83+
}
84+
85+
return nil, nil
86+
}
87+
88+
return nil, lastErr
89+
})
90+
91+
return err
92+
}
93+
3794
func (p *Producer) PublishNotice(ctx context.Context, data *NoticeTemplate, options ...string) error {
3895

3996
if data == nil {
@@ -107,6 +164,28 @@ func (p *Producer) publish(ctx context.Context, key string, msg []byte, opts map
107164
Headers: headers,
108165
})
109166

167+
if err != nil && errors.Is(err, amqp.ErrClosed) {
168+
if err = p.connectFn(); err != nil {
169+
return err
170+
}
171+
172+
err = p.Channel.PublishWithContext(
173+
ctx,
174+
exchangeName,
175+
key,
176+
false,
177+
false,
178+
amqp.Publishing{
179+
ContentType: "application/json",
180+
DeliveryMode: amqp.Persistent,
181+
Body: msg,
182+
AppId: p.appId,
183+
UserId: opts[UserIdKey],
184+
MessageId: fmt.Sprintf("%x", md5.Sum(msg)),
185+
Headers: headers,
186+
})
187+
}
188+
110189
return err
111190

112191
}

0 commit comments

Comments
 (0)