Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions pkg/i18n/en_base_config_descriptions.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ var (

ConfigGlobalWsConnectionTimeout = ffc("config.global.ws.connectionTimeout", "The amount of time to wait while establishing a connection (or auto-reconnection)", TimeDurationType)
ConfigGlobalWsHeartbeatInterval = ffc("config.global.ws.heartbeatInterval", "The amount of time to wait between heartbeat signals on the WebSocket connection", TimeDurationType)
ConfigGlobalWsBackgroundConnect = ffc("config.global.ws.backgroundConnect", "When true the connection is established in the background with infinite reconnect (makes initialConnectAttempts redundant when set)", BooleanType)
ConfigGlobalWsInitialConnectAttempts = ffc("config.global.ws.initialConnectAttempts", "The number of attempts FireFly will make to connect to the WebSocket when starting up, before failing", IntType)
ConfigGlobalWsPath = ffc("config.global.ws.path", "The WebSocket sever URL to which FireFly should connect", "WebSocket URL "+StringType)
ConfigGlobalWsReadBufferSize = ffc("config.global.ws.readBufferSize", "The size in bytes of the read buffer for the WebSocket connection", ByteSizeType)
Expand Down
37 changes: 32 additions & 5 deletions pkg/wsclient/wsclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,8 @@ type WSConfig struct {
InitialDelay time.Duration `json:"initialDelay,omitempty"`
MaximumDelay time.Duration `json:"maximumDelay,omitempty"`
DelayFactor float64 `json:"delayFactor,omitempty"`
InitialConnectAttempts int `json:"initialConnectAttempts,omitempty"`
BackgroundConnect bool `json:"backgroundConnect,omitempty"`
InitialConnectAttempts int `json:"initialConnectAttempts,omitempty"` // recommend backgroundConnect instead
DisableReconnect bool `json:"disableReconnect"`
AuthUsername string `json:"authUsername,omitempty"`
AuthPassword string `json:"authPassword,omitempty"`
Expand Down Expand Up @@ -104,6 +105,7 @@ type wsClient struct {
ctx context.Context
headers http.Header
url string
backgroundConnect bool
initialRetryAttempts int
wsdialer *websocket.Dialer
wsconn *websocket.Conn
Expand All @@ -114,6 +116,8 @@ type wsClient struct {
receiveExt chan *WSPayload
send chan []byte
sendDone chan []byte
bgConnCancelCtx context.CancelFunc
bgConnDone chan struct{}
closing chan struct{}
beforeConnect WSPreConnectHandler
afterConnect WSPostConnectHandler
Expand Down Expand Up @@ -153,6 +157,7 @@ func New(ctx context.Context, config *WSConfig, beforeConnect WSPreConnectHandle
MaximumDelay: config.MaximumDelay,
Factor: config.DelayFactor,
},
backgroundConnect: config.BackgroundConnect,
initialRetryAttempts: config.InitialConnectAttempts,
headers: make(http.Header),
send: make(chan []byte),
Expand Down Expand Up @@ -225,12 +230,28 @@ func (w *wsClient) setupReceiveChannel() {

func (w *wsClient) Connect() error {

if w.backgroundConnect && w.bgConnDone == nil {
w.bgConnDone = make(chan struct{})
w.ctx, w.bgConnCancelCtx = context.WithCancel(w.ctx)
go func() {
defer close(w.bgConnDone)
err := w.initialConnect()
if err != nil {
// Retry means we only reach here if context closes
log.L(w.ctx).Errorf("Connection to WebSocket %s was never established before shutdown: %s", w.url, err)
}
}()
return nil
}

return w.initialConnect()
}

func (w *wsClient) initialConnect() error {
if err := w.connect(true); err != nil {
return err
}

go w.receiveReconnectLoop()

return nil
}

Expand All @@ -242,6 +263,12 @@ func (w *wsClient) Close() {
if c != nil {
_ = c.Close()
}
bgc := w.bgConnDone
if bgc != nil {
w.bgConnCancelCtx()
<-w.bgConnDone
w.bgConnDone = nil
}
}
}

Expand Down Expand Up @@ -337,7 +364,7 @@ func (w *wsClient) connect(initial bool) error {
return false, i18n.NewError(w.ctx, i18n.MsgWSClosing)
}

retry = !initial || attempt < w.initialRetryAttempts
retry = w.backgroundConnect || !initial || attempt < w.initialRetryAttempts
if w.beforeConnect != nil {
if err = w.beforeConnect(w.ctx, w); err != nil {
l.Warnf("WS %s connect attempt %d failed in beforeConnect", w.url, attempt)
Expand All @@ -346,7 +373,7 @@ func (w *wsClient) connect(initial bool) error {
}

var res *http.Response
w.wsconn, res, err = w.wsdialer.Dial(w.url, w.headers)
w.wsconn, res, err = w.wsdialer.DialContext(w.ctx, w.url, w.headers)
if err != nil {
var b []byte
var status = -1
Expand Down
19 changes: 17 additions & 2 deletions pkg/wsclient/wsclient_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ func TestWSClientE2ETLS(t *testing.T) {

}

func TestWSClientE2E(t *testing.T) {
func TestWSClientE2EBG(t *testing.T) {

toServer, fromServer, url, close := NewTestWSServer(func(req *http.Request) {
assert.Equal(t, "/test/updated", req.URL.Path)
Expand All @@ -155,7 +155,7 @@ func TestWSClientE2E(t *testing.T) {
wsConfig.HTTPURL = url
wsConfig.WSKeyPath = "/test"
wsConfig.HeartbeatInterval = 50 * time.Millisecond
wsConfig.InitialConnectAttempts = 2
wsConfig.BackgroundConnect = true

wsc, err := New(context.Background(), wsConfig, beforeConnect, afterConnect)
assert.NoError(t, err)
Expand Down Expand Up @@ -261,6 +261,21 @@ func TestWSClientE2EReceiveExt(t *testing.T) {

}

func TestWSNeverConnectBG(t *testing.T) {
closedSvr := httptest.NewServer(&http.ServeMux{})
closedSvr.Close()

wsc, err := New(context.Background(), &WSConfig{
HTTPURL: closedSvr.URL,
BackgroundConnect: true,
}, nil, nil)
assert.NoError(t, err)
err = wsc.Connect()
assert.NoError(t, err)

wsc.Close()
}

func TestWSClientBadWSURL(t *testing.T) {
wsConfig := generateConfig()
wsConfig.WebSocketURL = ":::"
Expand Down
11 changes: 11 additions & 0 deletions pkg/wsclient/wsconfig.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@ const (
WSConfigKeyReadBufferSize = "ws.readBufferSize"
// WSConfigKeyInitialConnectAttempts sets how many times the websocket should attempt to connect on startup, before failing (after initial connection, retry is indefinite)
WSConfigKeyInitialConnectAttempts = "ws.initialConnectAttempts"
// WSConfigKeyBackgroundConnect is recommended instead of initialConnectAttempts for new uses of this library, and makes initial connection and reconnection identical in behavior
WSConfigKeyBackgroundConnect = "ws.backgroundConnect"
// WSConfigKeyPath if set will define the path to connect to - allows sharing of the same URL between HTTP and WebSocket connection info
WSConfigKeyPath = "ws.path"
// WSConfigURL if set will be a completely separate URL for WebSockets (must be a ws: or wss: scheme)
Expand All @@ -60,7 +62,15 @@ func InitConfig(conf config.Section) {
ffresty.InitConfig(conf)
conf.AddKnownKey(WSConfigKeyWriteBufferSize, defaultBufferSize)
conf.AddKnownKey(WSConfigKeyReadBufferSize, defaultBufferSize)

// Note that conf.SetDefault(WSConfigKeyBackgroundConnect, true) is recommended for implementations
// that embed this library, which will cause continual exponential backoff retry connection
// even on the initial connection.
conf.AddKnownKey(WSConfigKeyBackgroundConnect, false)

// Ignored if WSConfigKeyBackgroundConnect is true
conf.AddKnownKey(WSConfigKeyInitialConnectAttempts, defaultInitialConnectAttempts)

conf.AddKnownKey(WSConfigKeyPath)
conf.AddKnownKey(WSConfigURL)
conf.AddKnownKey(WSConfigKeyHeartbeatInterval, defaultHeartbeatInterval)
Expand All @@ -84,6 +94,7 @@ func GenerateConfig(ctx context.Context, conf config.Section) (*WSConfig, error)
MaximumDelay: conf.GetDuration(ffresty.HTTPConfigRetryMaxDelay),
DelayFactor: conf.GetFloat64(WSConfigDelayFactor),
InitialConnectAttempts: conf.GetInt(WSConfigKeyInitialConnectAttempts),
BackgroundConnect: conf.GetBool(WSConfigKeyBackgroundConnect),
HTTPHeaders: conf.GetObject(ffresty.HTTPConfigHeaders),
AuthUsername: conf.GetString(ffresty.HTTPConfigAuthUsername),
AuthPassword: conf.GetString(ffresty.HTTPConfigAuthPassword),
Expand Down
16 changes: 16 additions & 0 deletions pkg/wsclient/wsconfig_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ func TestWSConfigGeneration(t *testing.T) {
utConf.Set(WSConfigKeyWriteBufferSize, 1024)
utConf.Set(WSConfigKeyInitialConnectAttempts, 1)
utConf.Set(WSConfigKeyPath, "/websocket")
utConf.Set(WSConfigKeyBackgroundConnect, true)
utConf.Set(WSConfigKeyHeartbeatInterval, "42ms")

ctx := context.Background()
wsConfig, err := GenerateConfig(ctx, utConf)
Expand All @@ -48,6 +50,20 @@ func TestWSConfigGeneration(t *testing.T) {
assert.Equal(t, "custom value", wsConfig.HTTPHeaders.GetString("custom-header"))
assert.Equal(t, 1024, wsConfig.ReadBufferSize)
assert.Equal(t, 1024, wsConfig.WriteBufferSize)
assert.True(t, wsConfig.BackgroundConnect)
assert.Equal(t, 42*time.Millisecond, wsConfig.HeartbeatInterval)
}

func TestWSConfigGenerationDefaults(t *testing.T) {
resetConf()

ctx := context.Background()
wsConfig, err := GenerateConfig(ctx, utConf)
assert.NoError(t, err)

assert.Equal(t, defaultInitialConnectAttempts, wsConfig.InitialConnectAttempts)
assert.False(t, wsConfig.BackgroundConnect)
assert.Equal(t, 30*time.Second, wsConfig.HeartbeatInterval)
}

func TestWSConfigTLSGenerationFail(t *testing.T) {
Expand Down