Skip to content

Commit dd5cee4

Browse files
alovak and sonali.bhavsar authored
Exponential retry (#66)
* Implement exponential backoff for retries
* do not use exp backoff if MaxReconnectWait is set to 0
* solve the deadlock
* add test for MaxReconnectWait and exponential backoff
* update documentation with default values

---------

Co-authored-by: sonali.bhavsar <sonali.bhavsar@espressif.com>
1 parent 0472c40 commit dd5cee4

File tree

7 files changed

+110
-30
lines changed

7 files changed

+110
-30
lines changed

README.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -264,6 +264,7 @@ by the server. It does it using `ConnectionClosedHandler`.
264264
Following options are supported:
265265

266266
* `ReconnectWait` sets the time to wait after first re-connect attempt
267+
* `MaxReconnectWait` specifies the maximum duration to wait between reconnection attempts, serving as the upper bound for exponential backoff; if set to zero, there's no exponential backoff and ReconnectWait is used for each retry.
267268
* `ErrorHandler` is called in a goroutine with the errors that can't be returned to the caller (from other goroutines)
268269
* `MinConnections` is the number of connections required to be established when we connect the pool
269270
* `ConnectionsFilter` is a function to filter connections in the pool for `Get`, `IsDegraded` or `IsUp` methods

go.sum

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -5,12 +5,6 @@ github.com/kr/pretty v0.1.0 h1:L/CwN0zerZDmRFUapSPitk6f+Q3+0za1rQkzVuMiMFI=
55
github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
66
github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE=
77
github.com/mediocregopher/radix.v2 v0.0.0-20181115013041-b67df6e626f9/go.mod h1:fLRUbhbSd5Px2yKUaGYYPltlyxi1guJz1vCmo1RQL50=
8-
github.com/moov-io/iso8583 v0.18.3 h1:RYr/zoNxeRcLpJ1VjJ8e8yC0fRK5+xFqKqpnyMTz3+A=
9-
github.com/moov-io/iso8583 v0.18.3/go.mod h1:QPxDQTxKJPjECpK8vCsh7TpwvKyIrw3tXN/lrdATzCA=
10-
github.com/moov-io/iso8583 v0.19.0 h1:pIPWsBCydjN6SEoFUWuc4YKlY8e+NzKrnRvx7cfxCdo=
11-
github.com/moov-io/iso8583 v0.19.0/go.mod h1:QPxDQTxKJPjECpK8vCsh7TpwvKyIrw3tXN/lrdATzCA=
12-
github.com/moov-io/iso8583 v0.19.1 h1:401h9LFZYxsG/rDc3uHQxZn/rZoAhSMs4WQrnZPoGI8=
13-
github.com/moov-io/iso8583 v0.19.1/go.mod h1:QPxDQTxKJPjECpK8vCsh7TpwvKyIrw3tXN/lrdATzCA=
148
github.com/moov-io/iso8583 v0.19.2 h1:xV5kY0t7UpN/d/polVdNt4Rj6ub4eZuTSTarSqC2NtQ=
159
github.com/moov-io/iso8583 v0.19.2/go.mod h1:QPxDQTxKJPjECpK8vCsh7TpwvKyIrw3tXN/lrdATzCA=
1610
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=

options.go

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -13,17 +13,21 @@ import (
1313

1414
type Options struct {
1515
// ConnectTimeout sets the timeout for establishing new connections.
16+
// The default is 10 seconds.
1617
ConnectTimeout time.Duration
1718

18-
// SendTimeout sets the timeout for a Send operation
19+
// SendTimeout sets the timeout for a Send operation.
20+
// The default is 30 seconds.
1921
SendTimeout time.Duration
2022

2123
// IdleTime is the period at which the client will be sending ping
22-
// message to the server
24+
// message to the server.
25+
// The default is 5 seconds.
2326
IdleTime time.Duration
2427

2528
// ReadTimeout is the maximum time between read events before the
26-
// ReadTimeoutHandler is called
29+
// ReadTimeoutHandler is called.
30+
// The default is 60 seconds.
2731
ReadTimeout time.Duration
2832

2933
// PingHandler is called when no message was sent during idle time

pool.go

Lines changed: 18 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -168,19 +168,23 @@ func (p *Pool) handleClosedConnection(closedConn *Connection) {
168168
func (p *Pool) recreateConnection(closedConn *Connection) {
169169
defer p.wg.Done()
170170

171-
var conn *Connection
172-
var err error
171+
reconnectTime := p.Opts.ReconnectWait
172+
173173
for {
174174
select {
175-
case <-time.After(p.Opts.ReconnectWait):
176-
// Wait before recreating connection
175+
case <-time.After(reconnectTime):
176+
if p.Opts.MaxReconnectWait != 0 {
177+
reconnectTime *= 2
178+
if reconnectTime > p.Opts.MaxReconnectWait {
179+
reconnectTime = p.Opts.MaxReconnectWait
180+
}
181+
}
177182
case <-p.Done():
178-
// If pool is closed, let's get out of here
183+
// if pool is closed, let's get out of here
179184
return
180185
}
181186

182-
// Recreate connection
183-
conn, err = p.Factory(closedConn.addr)
187+
conn, err := p.Factory(closedConn.addr)
184188
if err != nil {
185189
p.handleError(fmt.Errorf("failed to re-create connection for %s: %w", closedConn.addr, err))
186190
return
@@ -190,17 +194,17 @@ func (p *Pool) recreateConnection(closedConn *Connection) {
190194
// recreate goroutine to create new connection for the same address
191195
conn.SetOptions(ConnectionClosedHandler(p.handleClosedConnection))
192196

193-
err = conn.Connect()
194-
if err == nil {
195-
break
197+
// if we successfully reconnected, add connection to the pool and return
198+
if err = conn.Connect(); err == nil {
199+
p.mu.Lock()
200+
p.connections = append(p.connections, conn)
201+
p.mu.Unlock()
202+
203+
return
196204
}
197205

198206
p.handleError(fmt.Errorf("failed to reconnect to %s: %w", conn.addr, err))
199207
}
200-
201-
p.mu.Lock()
202-
p.connections = append(p.connections, conn)
203-
p.mu.Unlock()
204208
}
205209

206210
// Close closes all connections in the pool

pool_options.go

Lines changed: 24 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -8,25 +8,38 @@ type PoolOption func(*PoolOptions) error
88

99
type PoolOptions struct {
1010
// ReconnectWait sets the time to wait after first re-connect attempt
11+
// The default is 5 seconds
1112
ReconnectWait time.Duration
1213

14+
// MaxReconnectWait specifies the maximum duration to wait between
15+
// reconnection attempts, serving as the upper bound for exponential
16+
// backoff.
17+
// A zero value means no exponential backoff and ReconnectWait is used
18+
// for each retry.
19+
// The default is 0.
20+
MaxReconnectWait time.Duration
21+
1322
// ErrorHandler is called in a goroutine with the errors that can't be
1423
// returned to the caller
1524
ErrorHandler func(err error)
1625

1726
// MinConnections is the number of connections required to be established when
18-
// we connect the pool
27+
// we connect the pool.
28+
// A zero value means that Pool will not return error on `Connect` and will
29+
// try to connect to all the addresses in the pool.
30+
// The default is 1.
1931
MinConnections int
2032

21-
// ConntionsFilter is a function to filter connections in the pool
33+
// ConnectionsFilter is a function to filter connections in the pool
2234
// when Get() is called
2335
ConnectionsFilter func(*Connection) bool
2436
}
2537

2638
func GetDefaultPoolOptions() PoolOptions {
2739
return PoolOptions{
28-
ReconnectWait: 5 * time.Second,
29-
MinConnections: 1,
40+
ReconnectWait: 5 * time.Second,
41+
MaxReconnectWait: 0,
42+
MinConnections: 1,
3043
}
3144
}
3245

@@ -37,6 +50,13 @@ func PoolReconnectWait(rw time.Duration) PoolOption {
3750
}
3851
}
3952

53+
func PoolMaxReconnectWait(rw time.Duration) PoolOption {
54+
return func(opts *PoolOptions) error {
55+
opts.MaxReconnectWait = rw
56+
return nil
57+
}
58+
}
59+
4060
func PoolMinConnections(n int) PoolOption {
4161
return func(opts *PoolOptions) error {
4262
opts.MinConnections = n

pool_test.go

Lines changed: 58 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -73,7 +73,11 @@ func TestPool(t *testing.T) {
7373
// And the pool of connections
7474
// one of our tests
7575
reconnectWait := 500 * time.Millisecond
76-
pool, err := connection.NewPool(factory, addrs, connection.PoolReconnectWait(reconnectWait))
76+
pool, err := connection.NewPool(
77+
factory,
78+
addrs,
79+
connection.PoolReconnectWait(reconnectWait),
80+
)
7781
require.NoError(t, err)
7882
defer pool.Close()
7983

@@ -146,6 +150,59 @@ func TestPool(t *testing.T) {
146150
}, 2000*time.Millisecond, 50*time.Millisecond, "expect to have one less connection")
147151
})
148152

153+
t.Run("when MaxReconnectWait is set reconnect wait time will exponentially increase", func(t *testing.T) {
154+
// Context: MaxReconnectWait is set to 400ms and initial
155+
// ReconnectWait is 100ms. The server is offline, so the pool
156+
// will try to reconnect endlessly with exponential backoff
157+
// until we close the pool. We will wait for 850ms before
158+
// checking the number of reconnects. Expected number of
159+
// reconnects within 850ms is 3. The sequence of reconnect
160+
// waits is: 100ms, 200ms, 400ms (total 700ms). Next reconnect
161+
// would come at about 1100ms, which is past the 850ms window. Thus, 3 reconnect
162+
// attempts are expected.
163+
164+
// Counter for connection attempts
165+
var connectAttempts atomic.Int32
166+
167+
// Factory method will be called on each connection attempt
168+
factory := func(addr string) (*connection.Connection, error) {
169+
// Increment connection attempts counter
170+
connectAttempts.Add(1)
171+
172+
c, err := connection.New(
173+
addr,
174+
testSpec,
175+
readMessageLength,
176+
writeMessageLength,
177+
)
178+
if err != nil {
179+
return nil, fmt.Errorf("building iso 8583 connection: %w", err)
180+
}
181+
182+
return c, nil
183+
}
184+
185+
pool, err := connection.NewPool(
186+
factory,
187+
[]string{"no-live-server-address"}, // connect only to the first server
188+
connection.PoolReconnectWait(100*time.Millisecond),
189+
connection.PoolMaxReconnectWait(400*time.Millisecond),
190+
// let pool start even without connections (it will start reconnecting)
191+
connection.PoolMinConnections(0),
192+
)
193+
require.NoError(t, err)
194+
195+
// There should be no error even if we try to connect to a non-existing server
196+
// as the min connections is set to 0
197+
err = pool.Connect()
198+
require.NoError(t, err)
199+
defer pool.Close()
200+
201+
time.Sleep(850 * time.Millisecond)
202+
203+
require.Equal(t, int32(4), connectAttempts.Load(), "expected 4 connection attempts (3 reconnects + 1 initial connect)")
204+
})
205+
149206
t.Run("Close() closes all connections", func(t *testing.T) {
150207
require.NotZero(t, pool.Connections())
151208

server/server.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -181,10 +181,10 @@ func (s *Server) Close() {
181181
s.ln.Close()
182182
}
183183

184-
s.wg.Wait()
185-
186184
s.isClosed = true
187185
s.mu.Unlock()
186+
187+
s.wg.Wait()
188188
}
189189

190190
func (s *Server) handleConnection(conn net.Conn) error {

0 commit comments

Comments
 (0)