Skip to content

Commit ac1225a

Browse files
feat: websocket reconnection
1 parent e13f2fd commit ac1225a

File tree

2 files changed

+40
-21
lines changed

2 files changed

+40
-21
lines changed

index.js

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,13 +46,17 @@ function waitConnection (socket, write) {
4646
function waitForConnection (target, timeout) {
4747
return new Promise((resolve, reject) => {
4848
const timeoutId = setTimeout(() => {
49+
/* c8 ignore start */
4950
reject(new Error('WebSocket connection timeout'))
51+
/* c8 ignore stop */
5052
}, timeout)
5153

54+
/* c8 ignore start */
5255
if (target.readyState === WebSocket.OPEN) {
5356
clearTimeout(timeoutId)
5457
return resolve()
5558
}
59+
/* c8 ignore stop */
5660

5761
if (target.readyState === WebSocket.CONNECTING) {
5862
target.once('open', () => {
@@ -63,10 +67,12 @@ function waitForConnection (target, timeout) {
6367
clearTimeout(timeoutId)
6468
reject(err)
6569
})
70+
/* c8 ignore start */
6671
} else {
6772
clearTimeout(timeoutId)
6873
reject(new Error('WebSocket is closed'))
6974
}
75+
/* c8 ignore stop */
7076
})
7177
}
7278

@@ -113,6 +119,7 @@ async function reconnect (logger, source, wsReconnectOptions, targetParams) {
113119
let target
114120
do {
115121
const reconnectWait = wsReconnectOptions.reconnectInterval * (wsReconnectOptions.reconnectDecay * attempts || 1)
122+
wsReconnectOptions.logs && logger.warn({ target: targetParams.url }, `proxy ws reconnect in ${reconnectWait} ms`)
116123
await wait(reconnectWait)
117124

118125
try {
@@ -163,6 +170,7 @@ function proxyWebSocketsWithReconnection (logger, source, target, options, targe
163170
source.off('unexpected-response', sourceOnUnexpectedResponse)
164171
}
165172

173+
/* c8 ignore start */
166174
function sourceOnMessage (data, binary) {
167175
source.isAlive = true
168176
waitConnection(target, () => target.send(data, { binary }))
@@ -186,6 +194,7 @@ function proxyWebSocketsWithReconnection (logger, source, target, options, targe
186194
options.logs && logger.warn({ target: targetParams.url }, 'proxy ws source unexpected-response event')
187195
close(1011, 'unexpected response')
188196
}
197+
/* c8 ignore stop */
189198

190199
// source is alive since it is created by the proxy service
191200
// the pinger is not set since we can't reconnect from here
@@ -198,6 +207,7 @@ function proxyWebSocketsWithReconnection (logger, source, target, options, targe
198207
source.on('unexpected-response', sourceOnUnexpectedResponse)
199208

200209
// source WebSocket is already connected because it is created by ws server
210+
/* c8 ignore start */
201211
target.on('message', (data, binary) => {
202212
target.isAlive = true
203213
source.send(data, { binary })
@@ -210,11 +220,12 @@ function proxyWebSocketsWithReconnection (logger, source, target, options, targe
210220
target.isAlive = true
211221
source.pong(data)
212222
})
223+
/* c8 ignore stop */
213224
target.on('close', (code, reason) => {
214225
options.logs && logger.warn({ target: targetParams.url, code, reason }, 'proxy ws target close event')
215226
close(code, reason)
216227
})
217-
228+
/* c8 ignore start */
218229
target.on('error', error => {
219230
options.logs && logger.warn({ target: targetParams.url, error: error.message }, 'proxy ws target error event')
220231
close(1011, error.message)
@@ -223,6 +234,7 @@ function proxyWebSocketsWithReconnection (logger, source, target, options, targe
223234
options.logs && logger.warn({ target: targetParams.url }, 'proxy ws target unexpected-response event')
224235
close(1011, 'unexpected response')
225236
})
237+
/* c8 ignore stop */
226238

227239
target.isAlive = true
228240
target.pingTimer = setInterval(() => {

test/ws-reconnect.js

Lines changed: 27 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -16,8 +16,6 @@ function waitForLogMessage (loggerSpy, message, max = 100) {
1616
return new Promise((resolve, reject) => {
1717
let count = 0
1818
const fn = (received) => {
19-
console.log(received)
20-
2119
if (received.msg === message) {
2220
loggerSpy.off('data', fn)
2321
resolve()
@@ -32,17 +30,27 @@ function waitForLogMessage (loggerSpy, message, max = 100) {
3230
})
3331
}
3432

35-
async function createServices ({ t, upstream, wsReconnectOptions, wsTargetOptions, wsServerOptions }) {
33+
async function createTargetServer (t, wsTargetOptions, port = 0) {
3634
const targetServer = createServer()
3735
const targetWs = new WebSocket.Server({ server: targetServer, ...wsTargetOptions })
36+
await promisify(targetServer.listen.bind(targetServer))({ port, host: '127.0.0.1' })
37+
38+
t.after(() => {
39+
targetWs.close()
40+
targetServer.close()
41+
})
42+
43+
return { targetServer, targetWs }
44+
}
3845

39-
await promisify(targetServer.listen.bind(targetServer))({ port: 0, host: '127.0.0.1' })
46+
async function createServices ({ t, wsReconnectOptions, wsTargetOptions, wsServerOptions, targetPort = 0 }) {
47+
const { targetServer, targetWs } = await createTargetServer(t, wsTargetOptions, targetPort)
4048

4149
const loggerSpy = pinoTest.sink()
4250
const logger = pino(loggerSpy)
4351
const proxy = Fastify({ loggerInstance: logger })
4452
proxy.register(proxyPlugin, {
45-
upstream: upstream || `ws://127.0.0.1:${targetServer.address().port}`,
53+
upstream: `ws://127.0.0.1:${targetServer.address().port}`,
4654
websocket: true,
4755
wsReconnect: wsReconnectOptions,
4856
wsServerOptions
@@ -55,8 +63,6 @@ async function createServices ({ t, upstream, wsReconnectOptions, wsTargetOption
5563

5664
t.after(async () => {
5765
client.close()
58-
targetWs.close()
59-
targetServer.close()
6066
await proxy.close()
6167
})
6268

@@ -67,8 +73,7 @@ async function createServices ({ t, upstream, wsReconnectOptions, wsTargetOption
6773
},
6874
proxy,
6975
client,
70-
loggerSpy,
71-
upstream
76+
loggerSpy
7277
}
7378
}
7479

@@ -94,13 +99,13 @@ test('should reconnect on broken connection', async (t) => {
9499

95100
const { target, loggerSpy } = await createServices({ t, wsReconnectOptions, wsTargetOptions: { autoPong: false } })
96101

97-
const breakConnection = true
102+
let breakConnection = true
98103
target.ws.on('connection', async (socket) => {
99104
socket.on('ping', async () => {
100105
// add latency to break the connection once
101106
if (breakConnection) {
102107
await wait(wsReconnectOptions.pingInterval * 2)
103-
// breakConnection = false
108+
breakConnection = false
104109
}
105110
socket.pong()
106111
})
@@ -157,12 +162,11 @@ test('should reconnect on regular target connection close', async (t) => {
157162
await waitForLogMessage(loggerSpy, 'proxy ws close link')
158163
})
159164

160-
/*
161-
TODO fix
162-
test('should reconnect with retry', async (t) => {
163-
const wsReconnectOptions = { pingInterval: 150, reconnectInterval: 100, reconnectOnClose: true }
165+
test('should reconnect retrying after a few failures', async (t) => {
166+
const wsReconnectOptions = { pingInterval: 150, reconnectInterval: 100, reconnectDecay: 2, logs: true, maxReconnectionRetries: Infinity }
164167

165-
const { target, loggerSpy, upstream } = await createServices({ t, wsReconnectOptions, wsTargetOptions: { autoPong: false } })
168+
const wsTargetOptions = { autoPong: false }
169+
const { target, loggerSpy } = await createServices({ t, wsReconnectOptions, wsTargetOptions })
166170

167171
let breakConnection = true
168172

@@ -179,14 +183,17 @@ test('should reconnect with retry', async (t) => {
179183

180184
await waitForLogMessage(loggerSpy, 'proxy ws connection is broken')
181185

182-
// recreate a new target with the same upstream
183-
186+
// recreate a new target
187+
const targetPort = target.server.address().port
184188
target.ws.close()
185189
target.server.close()
186-
await createServices({ t, upstream, wsReconnectOptions, wsTargetOptions: { autoPong: false } })
187190

188191
await waitForLogMessage(loggerSpy, 'proxy ws target close event')
192+
// make reconnection fail 2 times
189193
await waitForLogMessage(loggerSpy, 'proxy ws reconnect error')
194+
await waitForLogMessage(loggerSpy, 'proxy ws reconnect in 200 ms')
195+
196+
// recreate the target
197+
await createTargetServer(t, { autoPong: true }, targetPort)
190198
await waitForLogMessage(loggerSpy, 'proxy ws reconnected')
191199
})
192-
*/

0 commit comments

Comments
 (0)