Skip to content

Commit 2c92a45

Browse files
committed
Back-off while reconnecting
1 parent bd40a3b commit 2c92a45

File tree

2 files changed

+30
-13
lines changed

2 files changed

+30
-13
lines changed

extension/amqp/publisher.go

Lines changed: 26 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import (
44
"context"
55
"io"
66
"sync"
7+
"time"
78

89
"github.com/hellofresh/goengine"
910
"github.com/hellofresh/goengine/driver/sql"
@@ -15,9 +16,11 @@ var _ sql.ProjectionTrigger = (&NotificationPublisher{}).Publish
1516

1617
// NotificationPublisher is responsible of publishing a notification to queue
1718
type NotificationPublisher struct {
18-
amqpDSN string
19-
queue string
20-
logger goengine.Logger
19+
amqpDSN string
20+
queue string
21+
minReconnectInterval time.Duration
22+
maxReconnectInterval time.Duration
23+
logger goengine.Logger
2124

2225
connection io.Closer
2326
channel NotificationChannel
@@ -26,7 +29,11 @@ type NotificationPublisher struct {
2629
}
2730

2831
// NewNotificationPublisher returns an instance of NotificationPublisher
29-
func NewNotificationPublisher(amqpDSN, queue string,
32+
func NewNotificationPublisher(
33+
amqpDSN,
34+
queue string,
35+
minReconnectInterval time.Duration,
36+
maxReconnectInterval time.Duration,
3037
logger goengine.Logger,
3138
connection io.Closer,
3239
channel NotificationChannel,
@@ -39,16 +46,19 @@ func NewNotificationPublisher(amqpDSN, queue string,
3946
return nil, goengine.InvalidArgumentError("queue")
4047
}
4148
return &NotificationPublisher{
42-
amqpDSN: amqpDSN,
43-
queue: queue,
44-
logger: logger,
45-
connection: connection,
46-
channel: channel,
49+
amqpDSN: amqpDSN,
50+
queue: queue,
51+
minReconnectInterval: minReconnectInterval,
52+
maxReconnectInterval: maxReconnectInterval,
53+
logger: logger,
54+
connection: connection,
55+
channel: channel,
4756
}, nil
4857
}
4958

5059
// Publish sends a ProjectionNotification to Queue
5160
func (p *NotificationPublisher) Publish(ctx context.Context, notification *sql.ProjectionNotification) error {
61+
reconnectInterval := p.minReconnectInterval
5262
// Ignore nil notifications since this is not supported
5363
if notification == nil {
5464
p.logger.Warn("unable to handle nil notification, skipping", nil)
@@ -79,6 +89,13 @@ func (p *NotificationPublisher) Publish(ctx context.Context, notification *sql.P
7989
entry.Error(err)
8090
})
8191
}
92+
93+
time.Sleep(reconnectInterval)
94+
reconnectInterval *= 2
95+
if reconnectInterval > p.maxReconnectInterval {
96+
reconnectInterval = p.maxReconnectInterval
97+
}
98+
8299
p.connection = nil
83100
p.channel = nil
84101
continue

extension/amqp/publisher_test.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -26,10 +26,10 @@ func TestNotificationPublisher_Publish(t *testing.T) {
2626
t.Run("Invalid arguments", func(t *testing.T) {
2727
logger, _ := getLogger()
2828

29-
_, err := goengineAmqp.NewNotificationPublisher("http://localhost:5672/", "my-queue", logger, connection, channel)
29+
_, err := goengineAmqp.NewNotificationPublisher("http://localhost:5672/", "my-queue", 3, 4, logger, connection, channel)
3030
assert.Equal(t, goengine.InvalidArgumentError("amqpDSN"), err)
3131

32-
_, err = goengineAmqp.NewNotificationPublisher("amqp://localhost:5672/", "", logger, connection, channel)
32+
_, err = goengineAmqp.NewNotificationPublisher("amqp://localhost:5672/", "", 3, 4, logger, connection, channel)
3333
assert.Equal(t, goengine.InvalidArgumentError("queue"), err)
3434

3535
})
@@ -38,7 +38,7 @@ func TestNotificationPublisher_Publish(t *testing.T) {
3838
ensure := require.New(t)
3939
logger, loggerHook := getLogger()
4040

41-
publisher, err := goengineAmqp.NewNotificationPublisher("amqp://localhost:5672/", "my-queue", logger, connection, channel)
41+
publisher, err := goengineAmqp.NewNotificationPublisher("amqp://localhost:5672/", "my-queue", 3, 4, logger, connection, channel)
4242
ensure.NoError(err)
4343
err = publisher.Publish(ctx, nil)
4444
ensure.Nil(err)
@@ -50,7 +50,7 @@ func TestNotificationPublisher_Publish(t *testing.T) {
5050
ensure := require.New(t)
5151
logger, loggerHook := getLogger()
5252

53-
publisher, err := goengineAmqp.NewNotificationPublisher("amqp://localhost:5672/", "my-queue", logger, connection, channel)
53+
publisher, err := goengineAmqp.NewNotificationPublisher("amqp://localhost:5672/", "my-queue", 3, 4, logger, connection, channel)
5454
ensure.NoError(err)
5555

5656
err = publisher.Publish(ctx, &sql.ProjectionNotification{No: 1, AggregateID: "8150276e-34fe-49d9-aeae-a35af0040a4f"})

0 commit comments

Comments
 (0)