Skip to content

Commit 8eb7508

Browse files
authored
Merge pull request #180 from hyperledger/ws-raw
Add support for wrapping an existing WS connection
2 parents f8f957c + d2b1e1a commit 8eb7508

File tree

3 files changed

+110
-8
lines changed

3 files changed

+110
-8
lines changed

pkg/wsclient/wsclient.go

Lines changed: 50 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@ type WSConfig struct {
4545
WriteBufferSize int `json:"writeBufferSize,omitempty"`
4646
InitialDelay time.Duration `json:"initialDelay,omitempty"`
4747
MaximumDelay time.Duration `json:"maximumDelay,omitempty"`
48+
DelayFactor float64 `json:"delayFactor,omitempty"`
4849
InitialConnectAttempts int `json:"initialConnectAttempts,omitempty"`
4950
DisableReconnect bool `json:"disableReconnect"`
5051
AuthUsername string `json:"authUsername,omitempty"`
@@ -59,6 +60,14 @@ type WSConfig struct {
5960
ReceiveExt bool
6061
}
6162

63+
type WSWrapConfig struct {
64+
HeartbeatInterval time.Duration `json:"heartbeatInterval,omitempty"`
65+
ThrottleRequestsPerSecond int `json:"requestsPerSecond,omitempty"`
66+
ThrottleBurst int `json:"burst,omitempty"`
67+
// This one cannot be set in JSON - must be configured on the code interface
68+
ReceiveExt bool
69+
}
70+
6271
// WSPayload allows API consumers of this package to stream data, and inspect the message
6372
// type, rather than just being passed the bytes directly.
6473
type WSPayload struct {
@@ -98,7 +107,7 @@ type wsClient struct {
98107
initialRetryAttempts int
99108
wsdialer *websocket.Dialer
100109
wsconn *websocket.Conn
101-
retry retry.Retry
110+
connRetry retry.Retry
102111
closed bool
103112
useReceiveExt bool
104113
receive chan []byte
@@ -122,6 +131,7 @@ type WSPreConnectHandler func(ctx context.Context, w WSClient) error
122131
// WSPostConnectHandler will be called after every connect/reconnect. Can send data over ws, but must not block listening for data on the ws.
123132
type WSPostConnectHandler func(ctx context.Context, w WSClient) error
124133

134+
// Creates a new outbound client that can be connected to a remote server
125135
func New(ctx context.Context, config *WSConfig, beforeConnect WSPreConnectHandler, afterConnect WSPostConnectHandler) (WSClient, error) {
126136
l := log.L(ctx)
127137
wsURL, err := buildWSUrl(ctx, config)
@@ -138,9 +148,10 @@ func New(ctx context.Context, config *WSConfig, beforeConnect WSPreConnectHandle
138148
TLSClientConfig: config.TLSClientConfig,
139149
HandshakeTimeout: config.ConnectionTimeout,
140150
},
141-
retry: retry.Retry{
151+
connRetry: retry.Retry{
142152
InitialDelay: config.InitialDelay,
143153
MaximumDelay: config.MaximumDelay,
154+
Factor: config.DelayFactor,
144155
},
145156
initialRetryAttempts: config.InitialConnectAttempts,
146157
headers: make(http.Header),
@@ -153,11 +164,7 @@ func New(ctx context.Context, config *WSConfig, beforeConnect WSPreConnectHandle
153164
disableReconnect: config.DisableReconnect,
154165
rateLimiter: ffresty.GetRateLimiter(config.ThrottleRequestsPerSecond, config.ThrottleBurst),
155166
}
156-
if w.useReceiveExt {
157-
w.receiveExt = make(chan *WSPayload)
158-
} else {
159-
w.receive = make(chan []byte)
160-
}
167+
w.setupReceiveChannel()
161168
for k, v := range config.HTTPHeaders {
162169
if vs, ok := v.(string); ok {
163170
w.headers.Set(k, vs)
@@ -182,6 +189,40 @@ func New(ctx context.Context, config *WSConfig, beforeConnect WSPreConnectHandle
182189
return w, nil
183190
}
184191

192+
// Wrap an existing connection (including an inbound server connection) with heartbeating and throttling.
193+
// No reconnect functions are supported when wrapping an existing connection like this, but the supplied
194+
// callback will be invoked when the connection closes (allowing cleanup/tracking).
195+
func Wrap(ctx context.Context, config WSWrapConfig, wsconn *websocket.Conn, onClose func()) WSClient {
196+
w := &wsClient{
197+
ctx: ctx,
198+
url: wsconn.LocalAddr().String(),
199+
wsconn: wsconn,
200+
disableReconnect: true,
201+
heartbeatInterval: config.HeartbeatInterval,
202+
rateLimiter: ffresty.GetRateLimiter(config.ThrottleRequestsPerSecond, config.ThrottleBurst),
203+
useReceiveExt: config.ReceiveExt,
204+
send: make(chan []byte),
205+
closing: make(chan struct{}),
206+
}
207+
w.setupReceiveChannel()
208+
w.pongReceivedOrReset(false)
209+
w.wsconn.SetPongHandler(w.pongHandler)
210+
log.L(ctx).Infof("WS %s wrapped", w.url)
211+
go func() {
212+
w.receiveReconnectLoop()
213+
onClose()
214+
}()
215+
return w
216+
}
217+
218+
func (w *wsClient) setupReceiveChannel() {
219+
if w.useReceiveExt {
220+
w.receiveExt = make(chan *WSPayload)
221+
} else {
222+
w.receive = make(chan []byte)
223+
}
224+
}
225+
185226
func (w *wsClient) Connect() error {
186227

187228
if err := w.connect(true); err != nil {
@@ -291,7 +332,7 @@ func buildWSUrl(ctx context.Context, config *WSConfig) (string, error) {
291332

292333
func (w *wsClient) connect(initial bool) error {
293334
l := log.L(w.ctx)
294-
return w.retry.DoCustomLog(w.ctx, func(attempt int) (retry bool, err error) {
335+
return w.connRetry.DoCustomLog(w.ctx, func(attempt int) (retry bool, err error) {
295336
if w.closed {
296337
return false, i18n.NewError(w.ctx, i18n.MsgWSClosing)
297338
}
@@ -436,6 +477,7 @@ func (w *wsClient) sendLoop(receiverDone chan struct{}) {
436477
l.Errorf("WS %s closing: %s", w.url, err)
437478
disconnecting = true
438479
} else if wsconn != nil {
480+
l.Debugf("WS %s send heartbeat ping", w.url)
439481
if err := wsconn.WriteMessage(websocket.PingMessage, []byte{}); err != nil {
440482
l.Errorf("WS %s heartbeat send failed: %s", w.url, err)
441483
disconnecting = true

pkg/wsclient/wsclient_test.go

Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,9 @@ import (
3030
"time"
3131

3232
"github.com/gorilla/websocket"
33+
"github.com/sirupsen/logrus"
3334
"github.com/stretchr/testify/assert"
35+
"github.com/stretchr/testify/require"
3436
"golang.org/x/time/rate"
3537
)
3638

@@ -838,3 +840,51 @@ func TestRateLimiterFailure(t *testing.T) {
838840
// Close the client
839841
wsc.Close()
840842
}
843+
844+
func TestWSWrap(t *testing.T) {
845+
ctx := context.Background()
846+
logrus.SetLevel(logrus.DebugLevel)
847+
848+
passWS := make(chan (*websocket.Conn))
849+
svr := httptest.NewServer(http.HandlerFunc(func(res http.ResponseWriter, req *http.Request) {
850+
upgrader := &websocket.Upgrader{WriteBufferSize: 1024, ReadBufferSize: 1024}
851+
ws, err := upgrader.Upgrade(res, req, http.Header{})
852+
require.NoError(t, err)
853+
passWS <- ws
854+
}))
855+
defer svr.Close()
856+
857+
clientDone := make(chan struct{})
858+
go func() {
859+
defer close(clientDone)
860+
861+
wsc, err := New(ctx, &WSConfig{HTTPURL: svr.URL}, nil, nil)
862+
require.NoError(t, err)
863+
err = wsc.Connect()
864+
require.NoError(t, err)
865+
866+
wsc.Send(ctx, []byte(`hello`))
867+
msg1 := <-wsc.Receive()
868+
require.Equal(t, `hi`, string(msg1))
869+
870+
wsc.Close()
871+
872+
}()
873+
874+
// Get the conn
875+
rawWSC := <-passWS
876+
877+
// Wrap it
878+
serverDone := make(chan struct{})
879+
wsc := Wrap(ctx, WSWrapConfig{}, rawWSC, func() {
880+
close(serverDone)
881+
})
882+
883+
msg1 := <-wsc.Receive()
884+
require.Equal(t, `hello`, string(msg1))
885+
err := wsc.Send(ctx, []byte(`hi`))
886+
require.NoError(t, err)
887+
888+
<-clientDone
889+
<-serverDone
890+
}

pkg/wsclient/wsconfig.go

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ const (
3030
defaultBufferSize = "16Kb"
3131
defaultHeartbeatInterval = "30s" // up to a minute to detect a dead connection
3232
defaultConnectionTimeout = 45 * time.Second // 45 seconds - the built in default for gorilla/websocket
33+
defaultRetryBackoffFactor = 2.0
3334
)
3435

3536
const (
@@ -49,6 +50,8 @@ const (
4950
WSConfigKeyHeartbeatInterval = "ws.heartbeatInterval"
5051
// WSConnectionTimeout is the amount of time to wait while attempting to establish a connection (or automatic reconnection)
5152
WSConfigKeyConnectionTimeout = "ws.connectionTimeout"
53+
// WSConfigDelayFactor the exponential backoff factor for delay
54+
WSConfigDelayFactor = "retry.factor"
5255
)
5356

5457
// InitConfig ensures the config is initialized for HTTP too, as WS and HTTP
@@ -62,6 +65,12 @@ func InitConfig(conf config.Section) {
6265
conf.AddKnownKey(WSConfigURL)
6366
conf.AddKnownKey(WSConfigKeyHeartbeatInterval, defaultHeartbeatInterval)
6467
conf.AddKnownKey(WSConfigKeyConnectionTimeout, defaultConnectionTimeout)
68+
conf.AddKnownKey(WSConfigDelayFactor, defaultRetryBackoffFactor)
69+
InitConfigWrap(conf)
70+
}
71+
72+
func InitConfigWrap(conf config.Section) {
73+
conf.AddKnownKey(WSConfigKeyHeartbeatInterval, defaultHeartbeatInterval)
6574
}
6675

6776
func GenerateConfig(ctx context.Context, conf config.Section) (*WSConfig, error) {
@@ -73,6 +82,7 @@ func GenerateConfig(ctx context.Context, conf config.Section) (*WSConfig, error)
7382
WriteBufferSize: int(conf.GetByteSize(WSConfigKeyWriteBufferSize)),
7483
InitialDelay: conf.GetDuration(ffresty.HTTPConfigRetryInitDelay),
7584
MaximumDelay: conf.GetDuration(ffresty.HTTPConfigRetryMaxDelay),
85+
DelayFactor: conf.GetFloat64(WSConfigDelayFactor),
7686
InitialConnectAttempts: conf.GetInt(WSConfigKeyInitialConnectAttempts),
7787
HTTPHeaders: conf.GetObject(ffresty.HTTPConfigHeaders),
7888
AuthUsername: conf.GetString(ffresty.HTTPConfigAuthUsername),

0 commit comments

Comments
 (0)