Skip to content
Merged
Show file tree
Hide file tree
Changes from 16 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 19 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -227,6 +227,25 @@ It also supports an additional `rewriteRequestHeaders(headers, request)` functio
opening the WebSocket connection. This function should return an object with the given headers.
The default implementation forwards the `cookie` header.

## `wsReconnect`

**Experimental.** (default: `disabled`)

Reconnection feature detects and closes broken connections and reconnects automatically, see [how to detect and close broken connections](https://github.com/websockets/ws#how-to-detect-and-close-broken-connections).
The connection is considered broken if the target does not respond to the ping messages or no data is received from the target.

The `wsReconnect` option contains the configuration for the WebSocket reconnection feature.
To enable the feature, set the `wsReconnect` option to an object with the following properties:

- `pingInterval`: The interval between ping messages in ms (default: `30_000`).
- `maxReconnectionRetries`: The maximum number of reconnection retries (`1` to `Infinity`, default: `Infinity`).
- `reconnectInterval`: The interval between reconnection attempts in ms (default: `1_000`).
- `reconnectDecay`: The decay factor for the reconnection interval (default: `1.5`).
- `connectionTimeout`: The timeout for establishing the connection in ms (default: `5_000`).
- `reconnectOnClose`: Whether to reconnect on close, as long as the connection from the related client to the proxy is active (default: `false`).
- `logs`: Whether to log the reconnection process (default: `false`).
- `onReconnect`: A hook function that is called when the connection is reconnected `async onReconnect(oldSocket, newSocket)` (default: `undefined`).

## Benchmarks

The following benchmarks were generated on a dedicated server with an Intel(R) Core(TM) i7-7700 CPU @ 3.60GHz and 64GB of RAM:
Expand Down
194 changes: 188 additions & 6 deletions index.js
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
'use strict'
const { setTimeout: wait } = require('node:timers/promises')
const From = require('@fastify/reply-from')
const { ServerResponse } = require('node:http')
const WebSocket = require('ws')
const { convertUrlToWebSocket } = require('./utils')
const fp = require('fastify-plugin')
const qs = require('fast-querystring')
const { validateOptions } = require('./src/options')

const httpMethods = ['DELETE', 'GET', 'HEAD', 'PATCH', 'POST', 'PUT', 'OPTIONS']
const urlPattern = /^https?:\/\//
Expand All @@ -27,6 +29,7 @@ function liftErrorCode (code) {
}

function closeWebSocket (socket, code, reason) {
socket.isAlive = false
if (socket.readyState === WebSocket.OPEN) {
socket.close(liftErrorCode(code), reason)
}
Expand All @@ -40,11 +43,44 @@ function waitConnection (socket, write) {
}
}

function waitForConnection (target, timeout) {
return new Promise((resolve, reject) => {
const timeoutId = setTimeout(() => {
/* c8 ignore start */
reject(new Error('WebSocket connection timeout'))
/* c8 ignore stop */
}, timeout)

/* c8 ignore start */
if (target.readyState === WebSocket.OPEN) {
clearTimeout(timeoutId)
return resolve()
}
/* c8 ignore stop */

if (target.readyState === WebSocket.CONNECTING) {
target.once('open', () => {
clearTimeout(timeoutId)
resolve()
})
target.once('error', (err) => {
clearTimeout(timeoutId)
reject(err)
})
/* c8 ignore start */
} else {
clearTimeout(timeoutId)
reject(new Error('WebSocket is closed'))
}
/* c8 ignore stop */
})
}

function isExternalUrl (url) {
return urlPattern.test(url)
}

function noop () {}
function noop () { }

function proxyWebSockets (source, target) {
function close (code, reason) {
Expand Down Expand Up @@ -76,6 +112,147 @@ function proxyWebSockets (source, target) {
/* c8 ignore stop */
}

async function reconnect (logger, source, wsReconnectOptions, oldTarget, targetParams) {
const { url, subprotocols, optionsWs } = targetParams

let attempts = 0
let target
do {
const reconnectWait = wsReconnectOptions.reconnectInterval * (wsReconnectOptions.reconnectDecay * attempts || 1)
wsReconnectOptions.logs && logger.warn({ target: targetParams.url }, `proxy ws reconnect in ${reconnectWait} ms`)
await wait(reconnectWait)

try {
target = new WebSocket(url, subprotocols, optionsWs)
await waitForConnection(target, wsReconnectOptions.connectionTimeout)
} catch (err) {
wsReconnectOptions.logs && logger.error({ target: targetParams.url, err, attempts }, 'proxy ws reconnect error')
attempts++
target = undefined
}
} while (!target && attempts < wsReconnectOptions.maxReconnectionRetries)

if (!target) {
logger.error({ target: targetParams.url, attempts }, 'proxy ws failed to reconnect! No more retries')
return
}

wsReconnectOptions.logs && logger.info({ target: targetParams.url, attempts }, 'proxy ws reconnected')
await wsReconnectOptions.onReconnect(oldTarget, target)
proxyWebSocketsWithReconnection(logger, source, target, wsReconnectOptions, targetParams)
}

function proxyWebSocketsWithReconnection (logger, source, target, options, targetParams) {
function close (code, reason) {
target.pingTimer && clearTimeout(source.pingTimer)
target.pingTimer = undefined

// reconnect target as long as the source connection is active
if (source.isAlive && (target.broken || options.reconnectOnClose)) {
// clean up the target and related source listeners
target.isAlive = false
target.removeAllListeners()
// need to specify the listeners to remove
removeSourceListeners(source)

reconnect(logger, source, options, target, targetParams)
return
}

options.logs && logger.info({ msg: 'proxy ws close link' })
closeWebSocket(source, code, reason)
closeWebSocket(target, code, reason)
}

function removeSourceListeners (source) {
source.off('message', sourceOnMessage)
source.off('ping', sourceOnPing)
source.off('pong', sourceOnPong)
source.off('close', sourceOnClose)
source.off('error', sourceOnError)
source.off('unexpected-response', sourceOnUnexpectedResponse)
}

/* c8 ignore start */
function sourceOnMessage (data, binary) {
source.isAlive = true
waitConnection(target, () => target.send(data, { binary }))
}
function sourceOnPing (data) {
waitConnection(target, () => target.ping(data))
}
function sourceOnPong (data) {
source.isAlive = true
waitConnection(target, () => target.pong(data))
}
function sourceOnClose (code, reason) {
options.logs && logger.warn({ target: targetParams.url, code, reason }, 'proxy ws source close event')
close(code, reason)
}
function sourceOnError (error) {
options.logs && logger.warn({ target: targetParams.url, error: error.message }, 'proxy ws source error event')
close(1011, error.message)
}
function sourceOnUnexpectedResponse () {
options.logs && logger.warn({ target: targetParams.url }, 'proxy ws source unexpected-response event')
close(1011, 'unexpected response')
}
/* c8 ignore stop */

// source is alive since it is created by the proxy service
// the pinger is not set since we can't reconnect from here
source.isAlive = true
source.on('message', sourceOnMessage)
source.on('ping', sourceOnPing)
source.on('pong', sourceOnPong)
source.on('close', sourceOnClose)
source.on('error', sourceOnError)
source.on('unexpected-response', sourceOnUnexpectedResponse)

// source WebSocket is already connected because it is created by ws server
/* c8 ignore start */
target.on('message', (data, binary) => {
target.isAlive = true
source.send(data, { binary })
})
target.on('ping', data => {
target.isAlive = true
source.ping(data)
})
target.on('pong', data => {
target.isAlive = true
source.pong(data)
})
/* c8 ignore stop */
target.on('close', (code, reason) => {
options.logs && logger.warn({ target: targetParams.url, code, reason }, 'proxy ws target close event')
close(code, reason)
})
/* c8 ignore start */
target.on('error', error => {
options.logs && logger.warn({ target: targetParams.url, error: error.message }, 'proxy ws target error event')
close(1011, error.message)
})
target.on('unexpected-response', () => {
options.logs && logger.warn({ target: targetParams.url }, 'proxy ws target unexpected-response event')
close(1011, 'unexpected response')
})
/* c8 ignore stop */

target.isAlive = true
target.pingTimer = setInterval(() => {
if (target.isAlive === false) {
target.broken = true
options.logs && logger.warn({ target: targetParams.url }, 'proxy ws connection is broken')
target.pingTimer && clearInterval(target.pingTimer)
target.pingTimer = undefined
return target.terminate()
}
target.isAlive = false
target.ping()
}, options.pingInterval).unref()
}

function handleUpgrade (fastify, rawRequest, socket, head) {
// Save a reference to the socket and then dispatch the request through the normal fastify router so that it will invoke hooks and then eventually a route handler that might upgrade the socket.
rawRequest[kWs] = socket
Expand All @@ -91,7 +268,7 @@ function handleUpgrade (fastify, rawRequest, socket, head) {
}

class WebSocketProxy {
constructor (fastify, { wsServerOptions, wsClientOptions, upstream, wsUpstream, replyOptions: { getUpstream } = {} }) {
constructor (fastify, { wsReconnect, wsServerOptions, wsClientOptions, upstream, wsUpstream, replyOptions: { getUpstream } = {} }) {
this.logger = fastify.log
this.wsClientOptions = {
rewriteRequestHeaders: defaultWsHeadersRewrite,
Expand All @@ -101,6 +278,7 @@ class WebSocketProxy {
this.upstream = upstream ? convertUrlToWebSocket(upstream) : ''
this.wsUpstream = wsUpstream ? convertUrlToWebSocket(wsUpstream) : ''
this.getUpstream = getUpstream
this.wsReconnect = wsReconnect

const wss = new WebSocket.Server({
noServer: true,
Expand Down Expand Up @@ -190,7 +368,13 @@ class WebSocketProxy {

const target = new WebSocket(url, subprotocols, optionsWs)
this.logger.debug({ url: url.href }, 'proxy websocket')
proxyWebSockets(source, target)

if (this.wsReconnect) {
const targetParams = { url, subprotocols, optionsWs }
proxyWebSocketsWithReconnection(this.logger, source, target, this.wsReconnect, targetParams)
} else {
proxyWebSockets(source, target)
}
}
}

Expand Down Expand Up @@ -228,9 +412,7 @@ function generateRewritePrefix (prefix, opts) {
}

async function fastifyHttpProxy (fastify, opts) {
if (!opts.upstream && !opts.websocket && !((opts.upstream === '' || opts.wsUpstream === '') && opts.replyOptions && typeof opts.replyOptions.getUpstream === 'function')) {
throw new Error('upstream must be specified')
}
opts = validateOptions(opts)

const preHandler = opts.preHandler || opts.beforeHandler
const rewritePrefix = generateRewritePrefix(fastify.prefix, opts)
Expand Down
2 changes: 2 additions & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,8 @@
"http-errors": "^2.0.0",
"http-proxy": "^1.18.1",
"neostandard": "^0.12.0",
"pino": "^9.6.0",
"pino-test": "^1.1.0",
"simple-get": "^4.0.1",
"socket.io": "^4.7.5",
"socket.io-client": "^4.7.5",
Expand Down
76 changes: 76 additions & 0 deletions src/options.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
'use strict'

const DEFAULT_PING_INTERVAL = 30_000
const DEFAULT_MAX_RECONNECTION_RETRIES = Infinity
Copy link
Member

@climba03003 climba03003 Feb 14, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Max retries should be limited by default.
There is no reason to reconnected unlimited since in reconnecting period the client may continue sending data. The data will buffered in stream unlimited.

Copy link
Contributor Author

@simone-sanfratello simone-sanfratello Feb 14, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The intention is to keep the connection alive, assuming the error messages are being monitored, that's why I set Infinity

We can remove the default value as Infinity, but in that case we can't set an arbitrary default value, it really depends by the context, ping interval, reconnection delay and so on

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree. Those should be limited, possibly to 30 seconds.

const DEFAULT_RECONNECT_INTERVAL = 1_000
const DEFAULT_RECONNECT_DECAY = 1.5
const DEFAULT_CONNECTION_TIMEOUT = 5_000
const DEFAULT_RECONNECT_ON_CLOSE = false
const DEFAULT_LOGS = false
const DEFAULT_ON_RECONNECT = noop

function noop () {}

function validateOptions (options) {
if (!options.upstream && !options.websocket && !((options.upstream === '' || options.wsUpstream === '') && options.replyOptions && typeof options.replyOptions.getUpstream === 'function')) {
throw new Error('upstream must be specified')
}

if (options.wsReconnect) {
const wsReconnect = options.wsReconnect

if (wsReconnect.pingInterval !== undefined && (typeof wsReconnect.pingInterval !== 'number' || wsReconnect.pingInterval < 0)) {
throw new Error('wsReconnect.pingInterval must be a non-negative number')
}
wsReconnect.pingInterval = wsReconnect.pingInterval ?? DEFAULT_PING_INTERVAL

if (wsReconnect.maxReconnectionRetries !== undefined && (typeof wsReconnect.maxReconnectionRetries !== 'number' || wsReconnect.maxReconnectionRetries < 1)) {
throw new Error('wsReconnect.maxReconnectionRetries must be a number greater than or equal to 1')
}
wsReconnect.maxReconnectionRetries = wsReconnect.maxReconnectionRetries ?? DEFAULT_MAX_RECONNECTION_RETRIES

if (wsReconnect.reconnectInterval !== undefined && (typeof wsReconnect.reconnectInterval !== 'number' || wsReconnect.reconnectInterval < 100)) {
throw new Error('wsReconnect.reconnectInterval (ms) must be a number greater than or equal to 100')
}
wsReconnect.reconnectInterval = wsReconnect.reconnectInterval ?? DEFAULT_RECONNECT_INTERVAL

if (wsReconnect.reconnectDecay !== undefined && (typeof wsReconnect.reconnectDecay !== 'number' || wsReconnect.reconnectDecay < 1)) {
throw new Error('wsReconnect.reconnectDecay must be a number greater than or equal to 1')
}
wsReconnect.reconnectDecay = wsReconnect.reconnectDecay ?? DEFAULT_RECONNECT_DECAY

if (wsReconnect.connectionTimeout !== undefined && (typeof wsReconnect.connectionTimeout !== 'number' || wsReconnect.connectionTimeout < 0)) {
throw new Error('wsReconnect.connectionTimeout must be a non-negative number')
}
wsReconnect.connectionTimeout = wsReconnect.connectionTimeout ?? DEFAULT_CONNECTION_TIMEOUT

if (wsReconnect.reconnectOnClose !== undefined && typeof wsReconnect.reconnectOnClose !== 'boolean') {
throw new Error('wsReconnect.reconnectOnClose must be a boolean')
}
wsReconnect.reconnectOnClose = wsReconnect.reconnectOnClose ?? DEFAULT_RECONNECT_ON_CLOSE

if (wsReconnect.logs !== undefined && typeof wsReconnect.logs !== 'boolean') {
throw new Error('wsReconnect.logs must be a boolean')
}
wsReconnect.logs = wsReconnect.logs ?? DEFAULT_LOGS

if (wsReconnect.onReconnect !== undefined && typeof wsReconnect.onReconnect !== 'function') {
throw new Error('wsReconnect.onReconnect must be a function')
}
wsReconnect.onReconnect = wsReconnect.onReconnect ?? DEFAULT_ON_RECONNECT
}

return options
}

module.exports = {
validateOptions,
DEFAULT_PING_INTERVAL,
DEFAULT_MAX_RECONNECTION_RETRIES,
DEFAULT_RECONNECT_INTERVAL,
DEFAULT_RECONNECT_DECAY,
DEFAULT_CONNECTION_TIMEOUT,
DEFAULT_RECONNECT_ON_CLOSE,
DEFAULT_LOGS,
DEFAULT_ON_RECONNECT
}
Loading
Loading