Skip to content

Commit e46101c

Browse files
Add background connect option to common utility - superseding initialConnectAttempts
Signed-off-by: Peter Broadhurst <peter.broadhurst@kaleido.io>
1 parent 8eb7508 commit e46101c

File tree

4 files changed

+65
-8
lines changed

4 files changed

+65
-8
lines changed

pkg/config/config.go

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -89,10 +89,15 @@ type sectionParent interface {
8989
AddChild(key string, defValue ...interface{})
9090
}
9191

92+
type SectionArrayCommon interface {
93+
KeySet
94+
SubSection(name string) Section
95+
}
96+
9297
// Section represents a section of the global configuration, at a nested point in the config hierarchy.
9398
// Note that all keys are added to a GLOBAL map, so this cannot be used for per-instance customization.
9499
type Section interface {
95-
KeySet
100+
SectionArrayCommon
96101
SetDefault(key string, defValue interface{})
97102
SubSection(name string) Section
98103
SubArray(name string) ArraySection

pkg/wsclient/wsclient.go

Lines changed: 32 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,8 @@ type WSConfig struct {
4646
InitialDelay time.Duration `json:"initialDelay,omitempty"`
4747
MaximumDelay time.Duration `json:"maximumDelay,omitempty"`
4848
DelayFactor float64 `json:"delayFactor,omitempty"`
49-
InitialConnectAttempts int `json:"initialConnectAttempts,omitempty"`
49+
BackgroundConnect bool `json:"backgroundConnect,omitempty"`
50+
InitialConnectAttempts int `json:"initialConnectAttempts,omitempty"` // recommend backgroundConnect instead
5051
DisableReconnect bool `json:"disableReconnect"`
5152
AuthUsername string `json:"authUsername,omitempty"`
5253
AuthPassword string `json:"authPassword,omitempty"`
@@ -104,6 +105,7 @@ type wsClient struct {
104105
ctx context.Context
105106
headers http.Header
106107
url string
108+
backgroundConnect bool
107109
initialRetryAttempts int
108110
wsdialer *websocket.Dialer
109111
wsconn *websocket.Conn
@@ -114,6 +116,8 @@ type wsClient struct {
114116
receiveExt chan *WSPayload
115117
send chan []byte
116118
sendDone chan []byte
119+
bgConnCancelCtx context.CancelFunc
120+
bgConnDone chan struct{}
117121
closing chan struct{}
118122
beforeConnect WSPreConnectHandler
119123
afterConnect WSPostConnectHandler
@@ -153,6 +157,7 @@ func New(ctx context.Context, config *WSConfig, beforeConnect WSPreConnectHandle
153157
MaximumDelay: config.MaximumDelay,
154158
Factor: config.DelayFactor,
155159
},
160+
backgroundConnect: config.BackgroundConnect,
156161
initialRetryAttempts: config.InitialConnectAttempts,
157162
headers: make(http.Header),
158163
send: make(chan []byte),
@@ -225,12 +230,28 @@ func (w *wsClient) setupReceiveChannel() {
225230

226231
func (w *wsClient) Connect() error {
227232

233+
if w.backgroundConnect && w.bgConnDone == nil {
234+
w.bgConnDone = make(chan struct{})
235+
w.ctx, w.bgConnCancelCtx = context.WithCancel(w.ctx)
236+
go func() {
237+
defer close(w.bgConnDone)
238+
err := w.initialConnect()
239+
if err != nil {
240+
// Retry means we only reach here if context closes
241+
log.L(w.ctx).Errorf("Connection to WebSocket %s was never established before shutdown: %s", w.url, err)
242+
}
243+
}()
244+
return nil
245+
}
246+
247+
return w.initialConnect()
248+
}
249+
250+
func (w *wsClient) initialConnect() error {
228251
if err := w.connect(true); err != nil {
229252
return err
230253
}
231-
232254
go w.receiveReconnectLoop()
233-
234255
return nil
235256
}
236257

@@ -242,6 +263,12 @@ func (w *wsClient) Close() {
242263
if c != nil {
243264
_ = c.Close()
244265
}
266+
bgc := w.bgConnDone
267+
if bgc != nil {
268+
w.bgConnCancelCtx()
269+
<-w.bgConnDone
270+
w.bgConnDone = nil
271+
}
245272
}
246273
}
247274

@@ -337,7 +364,7 @@ func (w *wsClient) connect(initial bool) error {
337364
return false, i18n.NewError(w.ctx, i18n.MsgWSClosing)
338365
}
339366

340-
retry = !initial || attempt < w.initialRetryAttempts
367+
retry = w.backgroundConnect || !initial || attempt < w.initialRetryAttempts
341368
if w.beforeConnect != nil {
342369
if err = w.beforeConnect(w.ctx, w); err != nil {
343370
l.Warnf("WS %s connect attempt %d failed in beforeConnect", w.url, attempt)
@@ -346,7 +373,7 @@ func (w *wsClient) connect(initial bool) error {
346373
}
347374

348375
var res *http.Response
349-
w.wsconn, res, err = w.wsdialer.Dial(w.url, w.headers)
376+
w.wsconn, res, err = w.wsdialer.DialContext(w.ctx, w.url, w.headers)
350377
if err != nil {
351378
var b []byte
352379
var status = -1

pkg/wsclient/wsclient_test.go

Lines changed: 17 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -130,7 +130,7 @@ func TestWSClientE2ETLS(t *testing.T) {
130130

131131
}
132132

133-
func TestWSClientE2E(t *testing.T) {
133+
func TestWSClientE2EBG(t *testing.T) {
134134

135135
toServer, fromServer, url, close := NewTestWSServer(func(req *http.Request) {
136136
assert.Equal(t, "/test/updated", req.URL.Path)
@@ -155,7 +155,7 @@ func TestWSClientE2E(t *testing.T) {
155155
wsConfig.HTTPURL = url
156156
wsConfig.WSKeyPath = "/test"
157157
wsConfig.HeartbeatInterval = 50 * time.Millisecond
158-
wsConfig.InitialConnectAttempts = 2
158+
wsConfig.BackgroundConnect = true
159159

160160
wsc, err := New(context.Background(), wsConfig, beforeConnect, afterConnect)
161161
assert.NoError(t, err)
@@ -261,6 +261,21 @@ func TestWSClientE2EReceiveExt(t *testing.T) {
261261

262262
}
263263

264+
func TestWSNeverConnectBG(t *testing.T) {
265+
closedSvr := httptest.NewServer(&http.ServeMux{})
266+
closedSvr.Close()
267+
268+
wsc, err := New(context.Background(), &WSConfig{
269+
HTTPURL: closedSvr.URL,
270+
BackgroundConnect: true,
271+
}, nil, nil)
272+
assert.NoError(t, err)
273+
err = wsc.Connect()
274+
assert.NoError(t, err)
275+
276+
wsc.Close()
277+
}
278+
264279
func TestWSClientBadWSURL(t *testing.T) {
265280
wsConfig := generateConfig()
266281
wsConfig.WebSocketURL = ":::"

pkg/wsclient/wsconfig.go

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,8 @@ const (
4242
WSConfigKeyReadBufferSize = "ws.readBufferSize"
4343
// WSConfigKeyInitialConnectAttempts sets how many times the websocket should attempt to connect on startup, before failing (after initial connection, retry is indefinite)
4444
WSConfigKeyInitialConnectAttempts = "ws.initialConnectAttempts"
45+
// WSConfigKeyBackgroundConnect is recommended instead of initialConnectAttempts for new uses of this library, and makes initial connection and reconnection identical in behavior
46+
WSConfigKeyBackgroundConnect = "ws.backgroundConnect"
4547
// WSConfigKeyPath if set will define the path to connect to - allows sharing of the same URL between HTTP and WebSocket connection info
4648
WSConfigKeyPath = "ws.path"
4749
// WSConfigURL if set will be a completely separate URL for WebSockets (must be a ws: or wss: scheme)
@@ -60,7 +62,15 @@ func InitConfig(conf config.Section) {
6062
ffresty.InitConfig(conf)
6163
conf.AddKnownKey(WSConfigKeyWriteBufferSize, defaultBufferSize)
6264
conf.AddKnownKey(WSConfigKeyReadBufferSize, defaultBufferSize)
65+
66+
// Note that conf.SetDefault(WSConfigKeyBackgroundConnect, true) is recommended for implementations
67+
// that embed this library, which will cause continual exponential backoff retry connection
68+
// even on the initial connection.
69+
conf.AddKnownKey(WSConfigKeyBackgroundConnect, false)
70+
71+
// Ignored if WSConfigKeyBackgroundConnect is true
6372
conf.AddKnownKey(WSConfigKeyInitialConnectAttempts, defaultInitialConnectAttempts)
73+
6474
conf.AddKnownKey(WSConfigKeyPath)
6575
conf.AddKnownKey(WSConfigURL)
6676
conf.AddKnownKey(WSConfigKeyHeartbeatInterval, defaultHeartbeatInterval)

0 commit comments

Comments
 (0)