Skip to content

Commit 060ecf0

Browse files
[release-1.11] Added hard reset conditions to RabbitMQ connections in the Adapter and Dispatcher (#1222)
* implemented rabbitmq reconnection pod hard reset that kill the pods after 5 soft retries, now the state of the whole thing is clear to the user * refactore to improve testing --------- Co-authored-by: Gabriel Freites <[email protected]>
1 parent 3704acb commit 060ecf0

File tree

6 files changed

+33
-24
lines changed

6 files changed

+33
-24
lines changed

cmd/dispatcher/main.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -104,7 +104,7 @@ func main() {
104104
}
105105

106106
var err error
107-
rmqHelper := rabbit.NewRabbitMQConnectionHandler(1000, logger)
107+
rmqHelper := rabbit.NewRabbitMQConnectionHandler(5, 1000, logger)
108108
rmqHelper.Setup(ctx, rabbit.VHostHandler(
109109
env.RabbitURL,
110110
env.RabbitMQVhost),

cmd/ingress/main.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -91,7 +91,7 @@ func main() {
9191
logger.Errorw("failed to create the metrics exporter", zap.Error(err))
9292
}
9393

94-
env.rmqHelper = rabbit.NewRabbitMQConnectionHandler(1000, logger)
94+
env.rmqHelper = rabbit.NewRabbitMQConnectionHandler(5, 1000, logger)
9595
env.rmqHelper.Setup(ctx, rabbit.VHostHandler(env.BrokerURL, env.RabbitMQVhost), rabbit.ChannelConfirm, rabbit.DialWrapper)
9696

9797
env.reporter = ingress.NewStatsReporter(env.ContainerName, kmeta.ChildName(env.PodName, uuid.New().String()))

pkg/adapter/adapter.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -98,7 +98,7 @@ func (a *Adapter) start(stopCh <-chan struct{}) error {
9898
zap.String("SinkURI", a.config.Sink))
9999

100100
if a.rmqHelper == nil {
101-
a.rmqHelper = rabbit.NewRabbitMQConnectionHandler(1000, logger)
101+
a.rmqHelper = rabbit.NewRabbitMQConnectionHandler(5, 1000, logger)
102102
a.rmqHelper.Setup(a.context, rabbit.VHostHandler(a.config.RabbitURL, a.config.Vhost), rabbit.ChannelQoS, rabbit.DialWrapper)
103103
}
104104
return a.PollForMessages(stopCh)

pkg/adapter/adapter_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -250,7 +250,7 @@ func TestAdapter_PollForMessages(t *testing.T) {
250250
context: ctx,
251251
logger: zap.NewNop(),
252252
reporter: statsReporter,
253-
rmqHelper: rabbit.NewRabbitMQConnectionHandler(500, zap.NewNop().Sugar()),
253+
rmqHelper: rabbit.NewRabbitMQConnectionHandler(5, 500, zap.NewNop().Sugar()),
254254
}
255255
a.rmqHelper.Setup(ctx, "", nil, rabbit.ValidDial)
256256

pkg/rabbit/connections_handler.go

Lines changed: 22 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -55,10 +55,11 @@ type RabbitMQChannelInterface interface {
5555
}
5656

5757
type RabbitMQConnectionHandler struct {
58-
firstSetup bool
59-
cycleDuration time.Duration
60-
Connection RabbitMQConnectionWrapperInterface
61-
Channel RabbitMQChannelInterface
58+
firstSetup bool
59+
reconnTries, reconnectionTriesThreshold int
60+
cycleDuration time.Duration
61+
Connection RabbitMQConnectionWrapperInterface
62+
Channel RabbitMQChannelInterface
6263

6364
logger *zap.SugaredLogger
6465
}
@@ -102,11 +103,12 @@ func (r *RabbitMQConnection) IsClosed() bool {
102103
return true
103104
}
104105

105-
func NewRabbitMQConnectionHandler(cycleDuration time.Duration, logger *zap.SugaredLogger) RabbitMQConnectionsHandlerInterface {
106+
func NewRabbitMQConnectionHandler(reconnectionTriesThreshold int, cycleDuration time.Duration, logger *zap.SugaredLogger) RabbitMQConnectionsHandlerInterface {
106107
return &RabbitMQConnectionHandler{
107-
firstSetup: true,
108-
cycleDuration: cycleDuration,
109-
logger: logger,
108+
reconnectionTriesThreshold: reconnectionTriesThreshold,
109+
firstSetup: true,
110+
cycleDuration: cycleDuration,
111+
logger: logger,
110112
}
111113
}
112114

@@ -127,8 +129,7 @@ func (r *RabbitMQConnectionHandler) createConnectionAndChannel(
127129
ctx context.Context,
128130
rabbitMQURL string,
129131
configFunction func(RabbitMQConnectionInterface, RabbitMQChannelInterface) error,
130-
dialFunc func(string) (RabbitMQConnectionWrapperInterface, error)) {
131-
var err error
132+
dialFunc func(string) (RabbitMQConnectionWrapperInterface, error)) (err error) {
132133
for {
133134
// create the connection
134135
r.Connection, err = r.createConnection(rabbitMQURL, dialFunc)
@@ -142,10 +143,17 @@ func (r *RabbitMQConnectionHandler) createConnectionAndChannel(
142143
}
143144
// setup completed successfully
144145
if err == nil {
145-
break
146+
r.reconnTries = 0
147+
return
146148
}
147149
r.logger.Error(err)
148150
r.closeRabbitMQConnections()
151+
152+
r.reconnTries += 1
153+
if r.reconnTries > r.reconnectionTriesThreshold {
154+
err = errors.New("could not communicate to rabbitmq, restarting pods...")
155+
return
156+
}
149157
}
150158
}
151159

@@ -164,7 +172,9 @@ func (r *RabbitMQConnectionHandler) watchRabbitMQConnections(
164172
case <-r.GetChannel().NotifyClose(make(chan *amqp091.Error)):
165173
}
166174
r.closeRabbitMQConnections()
167-
r.createConnectionAndChannel(ctx, rabbitMQURL, configFunction, dialFunc)
175+
if err := r.createConnectionAndChannel(ctx, rabbitMQURL, configFunction, dialFunc); err != nil {
176+
r.logger.Fatal(err)
177+
}
168178
}
169179
}
170180

pkg/rabbit/connections_handler_test.go

Lines changed: 7 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ import (
2929
func Test_ValidSetupRabbitMQ(t *testing.T) {
3030
logger := zap.NewNop().Sugar()
3131
ctx, cancelFunc := context.WithCancel(context.Background())
32-
rabbitMQHelper := NewRabbitMQConnectionHandler(100, logger).(*RabbitMQConnectionHandler)
32+
rabbitMQHelper := NewRabbitMQConnectionHandler(1, 100, logger).(*RabbitMQConnectionHandler)
3333
rabbitMQHelper.Setup(ctx, "amqp://localhost:5672/%2f", ConfigTest, ValidDial)
3434
if rabbitMQHelper.Connection == nil || rabbitMQHelper.Channel == nil {
3535
t.Errorf("rabbitMQHelper connection and channel should be set %s %s", rabbitMQHelper.Connection, rabbitMQHelper.Channel)
@@ -41,15 +41,15 @@ func Test_InvalidSetupRabbitMQ(t *testing.T) {
4141
var err error
4242
logger := zap.NewNop().Sugar()
4343
// test invalid connection setup
44-
rabbitMQHelper := NewRabbitMQConnectionHandler(100, logger).(*RabbitMQConnectionHandler)
45-
rabbitMQHelper.Connection, err = rabbitMQHelper.createConnection("amqp://localhost:5672/%2f", BadConnectionDial)
44+
rabbitMQHelper := NewRabbitMQConnectionHandler(1, 100, logger).(*RabbitMQConnectionHandler)
45+
err = rabbitMQHelper.createConnectionAndChannel(context.TODO(), "amqp://localhost:5672/%2f", nil, BadConnectionDial)
4646
if err == nil || rabbitMQHelper.GetConnection() != nil {
4747
t.Errorf("unexpected error == nil when setting up invalid connection %s %s %s", rabbitMQHelper.GetConnection(), rabbitMQHelper.GetChannel(), err)
4848
}
4949
rabbitMQHelper.Connection = nil
5050
rabbitMQHelper.closeRabbitMQConnections()
5151
// test invalid channel setup
52-
rabbitMQHelper = NewRabbitMQConnectionHandler(100, logger).(*RabbitMQConnectionHandler)
52+
rabbitMQHelper = NewRabbitMQConnectionHandler(1, 100, logger).(*RabbitMQConnectionHandler)
5353
rabbitMQHelper.Connection, _ = rabbitMQHelper.createConnection("amqp://localhost:5672/%2f", BadChannelDial)
5454
rabbitMQHelper.Channel, err = rabbitMQHelper.createChannel()
5555
if err == nil || rabbitMQHelper.GetChannel() != nil {
@@ -58,7 +58,7 @@ func Test_InvalidSetupRabbitMQ(t *testing.T) {
5858
rabbitMQHelper.closeRabbitMQConnections()
5959
rabbitMQHelper.Connection, rabbitMQHelper.Channel = nil, nil
6060
// test invalid config setup
61-
rabbitMQHelper = NewRabbitMQConnectionHandler(100, logger).(*RabbitMQConnectionHandler)
61+
rabbitMQHelper = NewRabbitMQConnectionHandler(1, 100, logger).(*RabbitMQConnectionHandler)
6262

6363
rabbitMQHelper.Connection, _ = rabbitMQHelper.createConnection("amqp://localhost:5672/%2f", ValidDial)
6464
rabbitMQHelper.Channel, _ = rabbitMQHelper.createChannel()
@@ -104,9 +104,8 @@ func Test_WatchConnectionsRabbitMQ(t *testing.T) {
104104
ctx, cancelFunc := context.WithCancel(context.Background())
105105
logger := zap.NewNop().Sugar()
106106
// test invalid connection setup
107-
rabbitMQHelper := NewRabbitMQConnectionHandler(100, logger).(*RabbitMQConnectionHandler)
108-
rabbitMQHelper.Connection, _ = rabbitMQHelper.createConnection("amqp://localhost:5672/%2f", ValidDial)
109-
rabbitMQHelper.Channel, _ = rabbitMQHelper.createChannel()
107+
rabbitMQHelper := NewRabbitMQConnectionHandler(1, 100, logger).(*RabbitMQConnectionHandler)
108+
rabbitMQHelper.createConnectionAndChannel(context.TODO(), "amqp://localhost:5672/%2f", nil, ValidDial)
110109
go func() {
111110
time.Sleep(time.Millisecond * 200)
112111
if tt.endFunc == "connection" {

0 commit comments

Comments
 (0)