Skip to content
Merged
Show file tree
Hide file tree
Changes from 19 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 24 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -227,6 +227,30 @@ It also supports an additional `rewriteRequestHeaders(headers, request)` functio
opening the WebSocket connection. This function should return an object with the given headers.
The default implementation forwards the `cookie` header.

## `wsReconnect`

**Experimental.** (default: `disabled`)

Reconnection feature detects and closes broken connections and reconnects automatically, see [how to detect and close broken connections](https://github.com/websockets/ws#how-to-detect-and-close-broken-connections).
The connection is considered broken if the target does not respond to the ping messages or no data is received from the target.

The `wsReconnect` option contains the configuration for the WebSocket reconnection feature.
To enable the feature, set the `wsReconnect` option to an object with the following properties:

- `pingInterval`: The interval between ping messages in ms (default: `30_000`).
- `maxReconnectionRetries`: The maximum number of reconnection retries (`1` to `Infinity`, default: `Infinity`).
- `reconnectInterval`: The interval between reconnection attempts in ms (default: `1_000`).
- `reconnectDecay`: The decay factor for the reconnection interval (default: `1.5`).
- `connectionTimeout`: The timeout for establishing the connection in ms (default: `5_000`).
- `reconnectOnClose`: Whether to reconnect on close, as long as the connection from the related client to the proxy is active (default: `false`).
- `logs`: Whether to log the reconnection process (default: `false`).

## wsHooks

- `onTargetRequest`: A hook function that is called when the request is received from the client `async onTargetRequest({ data, binary })` (default: `undefined`).
- `onTargetResponse`: A hook function that is called when the response is received from the target `async onTargetResponse({ data, binary })` (default: `undefined`).
- `onReconnect`: A hook function that is called when the connection is reconnected `async onReconnect(source, target)` (default: `undefined`).

## Benchmarks

The following benchmarks were generated on a dedicated server with an Intel(R) Core(TM) i7-7700 CPU @ 3.60GHz and 64GB of RAM:
Expand Down
240 changes: 230 additions & 10 deletions index.js
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
'use strict'
const { setTimeout: wait } = require('node:timers/promises')
const From = require('@fastify/reply-from')
const { ServerResponse } = require('node:http')
const WebSocket = require('ws')
const { convertUrlToWebSocket } = require('./utils')
const fp = require('fastify-plugin')
const qs = require('fast-querystring')
const { validateOptions } = require('./src/options')

const httpMethods = ['DELETE', 'GET', 'HEAD', 'PATCH', 'POST', 'PUT', 'OPTIONS']
const urlPattern = /^https?:\/\//
Expand All @@ -27,6 +29,7 @@ function liftErrorCode (code) {
}

function closeWebSocket (socket, code, reason) {
socket.isAlive = false
if (socket.readyState === WebSocket.OPEN) {
socket.close(liftErrorCode(code), reason)
}
Expand All @@ -40,19 +43,61 @@ function waitConnection (socket, write) {
}
}

function waitForConnection (target, timeout) {
return new Promise((resolve, reject) => {
const timeoutId = setTimeout(() => {
/* c8 ignore start */
reject(new Error('WebSocket connection timeout'))
/* c8 ignore stop */
}, timeout)

/* c8 ignore start */
if (target.readyState === WebSocket.OPEN) {
clearTimeout(timeoutId)
return resolve()
}
/* c8 ignore stop */

if (target.readyState === WebSocket.CONNECTING) {
target.once('open', () => {
clearTimeout(timeoutId)
resolve()
})
target.once('error', (err) => {
clearTimeout(timeoutId)
reject(err)
})
/* c8 ignore start */
} else {
clearTimeout(timeoutId)
reject(new Error('WebSocket is closed'))
}
/* c8 ignore stop */
})
}

function isExternalUrl (url) {
return urlPattern.test(url)
}

function noop () {}
function noop () { }

function proxyWebSockets (source, target) {
function proxyWebSockets (logger, source, target, hooks) {
function close (code, reason) {
closeWebSocket(source, code, reason)
closeWebSocket(target, code, reason)
}

source.on('message', (data, binary) => waitConnection(target, () => target.send(data, { binary })))
source.on('message', async (data, binary) => {
if (hooks.onTargetRequest) {
try {
await hooks.onTargetRequest({ data, binary })
} catch (err) {
logger.error({ err }, 'proxy ws error from onTargetRequest hook')
}
}
waitConnection(target, () => target.send(data, { binary }))
})
/* c8 ignore start */
source.on('ping', data => waitConnection(target, () => target.ping(data)))
source.on('pong', data => waitConnection(target, () => target.pong(data)))
Expand All @@ -64,7 +109,16 @@ function proxyWebSockets (source, target) {
/* c8 ignore stop */

// source WebSocket is already connected because it is created by ws server
target.on('message', (data, binary) => source.send(data, { binary }))
target.on('message', async (data, binary) => {
if (hooks.onTargetResponse) {
try {
await hooks.onTargetResponse({ data, binary })
} catch (err) {
logger.error({ err }, 'proxy ws error from onTargetResponse hook')
}
}
source.send(data, { binary })
})
/* c8 ignore start */
target.on('ping', data => source.ping(data))
/* c8 ignore stop */
Expand All @@ -76,6 +130,167 @@ function proxyWebSockets (source, target) {
/* c8 ignore stop */
}

async function reconnect (logger, source, reconnectOptions, hooks, targetParams) {
const { url, subprotocols, optionsWs } = targetParams

let attempts = 0
let target
do {
const reconnectWait = reconnectOptions.reconnectInterval * (reconnectOptions.reconnectDecay * attempts || 1)
reconnectOptions.logs && logger.warn({ target: targetParams.url }, `proxy ws reconnect in ${reconnectWait} ms`)
await wait(reconnectWait)

try {
target = new WebSocket(url, subprotocols, optionsWs)
await waitForConnection(target, reconnectOptions.connectionTimeout)
} catch (err) {
reconnectOptions.logs && logger.error({ target: targetParams.url, err, attempts }, 'proxy ws reconnect error')
attempts++
target = undefined
}
} while (!target && attempts < reconnectOptions.maxReconnectionRetries)

if (!target) {
logger.error({ target: targetParams.url, attempts }, 'proxy ws failed to reconnect! No more retries')
return
}

reconnectOptions.logs && logger.info({ target: targetParams.url, attempts }, 'proxy ws reconnected')
if (hooks.onReconnect) {
try {
await hooks.onReconnect(source, target)
} catch (err) {
reconnectOptions.logs && logger.error({ target: targetParams.url, err }, 'proxy ws error from onReconnect hook')
}
}
proxyWebSocketsWithReconnection(logger, source, target, reconnectOptions, hooks, targetParams)
}

function proxyWebSocketsWithReconnection (logger, source, target, options, hooks, targetParams) {
function close (code, reason) {
target.pingTimer && clearTimeout(source.pingTimer)
target.pingTimer = undefined

// reconnect target as long as the source connection is active
if (source.isAlive && (target.broken || options.reconnectOnClose)) {
// clean up the target and related source listeners
target.isAlive = false
target.removeAllListeners()
// need to specify the listeners to remove
removeSourceListeners(source)

reconnect(logger, source, options, hooks, targetParams)
return
}

options.logs && logger.info({ msg: 'proxy ws close link' })
closeWebSocket(source, code, reason)
closeWebSocket(target, code, reason)
}

function removeSourceListeners (source) {
source.off('message', sourceOnMessage)
source.off('ping', sourceOnPing)
source.off('pong', sourceOnPong)
source.off('close', sourceOnClose)
source.off('error', sourceOnError)
source.off('unexpected-response', sourceOnUnexpectedResponse)
}

/* c8 ignore start */
async function sourceOnMessage (data, binary) {
source.isAlive = true
if (hooks.onTargetRequest) {
try {
await hooks.onTargetRequest({ data, binary })
} catch (err) {
logger.error({ target: targetParams.url, err }, 'proxy ws error from onTargetRequest hook')
}
}
waitConnection(target, () => target.send(data, { binary }))
}
function sourceOnPing (data) {
waitConnection(target, () => target.ping(data))
}
function sourceOnPong (data) {
source.isAlive = true
waitConnection(target, () => target.pong(data))
}
function sourceOnClose (code, reason) {
options.logs && logger.warn({ target: targetParams.url, code, reason }, 'proxy ws source close event')
close(code, reason)
}
function sourceOnError (error) {
options.logs && logger.warn({ target: targetParams.url, error: error.message }, 'proxy ws source error event')
close(1011, error.message)
}
function sourceOnUnexpectedResponse () {
options.logs && logger.warn({ target: targetParams.url }, 'proxy ws source unexpected-response event')
close(1011, 'unexpected response')
}
/* c8 ignore stop */

// source is alive since it is created by the proxy service
// the pinger is not set since we can't reconnect from here
source.isAlive = true
source.on('message', sourceOnMessage)
source.on('ping', sourceOnPing)
source.on('pong', sourceOnPong)
source.on('close', sourceOnClose)
source.on('error', sourceOnError)
source.on('unexpected-response', sourceOnUnexpectedResponse)

// source WebSocket is already connected because it is created by ws server
/* c8 ignore start */
target.on('message', async (data, binary) => {
target.isAlive = true
if (hooks.onTargetResponse) {
try {
await hooks.onTargetResponse({ data, binary })
} catch (err) {
logger.error({ target: targetParams.url, err }, 'proxy ws error from onTargetResponse hook')
}
}
source.send(data, { binary })
})
target.on('ping', data => {
target.isAlive = true
source.ping(data)
})
target.on('pong', data => {
target.isAlive = true
source.pong(data)
})
/* c8 ignore stop */
target.on('close', (code, reason) => {
options.logs && logger.warn({ target: targetParams.url, code, reason }, 'proxy ws target close event')
close(code, reason)
})
/* c8 ignore start */
target.on('error', error => {
options.logs && logger.warn({ target: targetParams.url, error: error.message }, 'proxy ws target error event')
close(1011, error.message)
})
target.on('unexpected-response', () => {
options.logs && logger.warn({ target: targetParams.url }, 'proxy ws target unexpected-response event')
close(1011, 'unexpected response')
})
/* c8 ignore stop */

target.isAlive = true
target.pingTimer = setInterval(() => {
if (target.isAlive === false) {
target.broken = true
options.logs && logger.warn({ target: targetParams.url }, 'proxy ws connection is broken')
target.pingTimer && clearInterval(target.pingTimer)
target.pingTimer = undefined
return target.terminate()
}
target.isAlive = false
target.ping()
}, options.pingInterval).unref()
}

function handleUpgrade (fastify, rawRequest, socket, head) {
// Save a reference to the socket and then dispatch the request through the normal fastify router so that it will invoke hooks and then eventually a route handler that might upgrade the socket.
rawRequest[kWs] = socket
Expand All @@ -91,7 +306,7 @@ function handleUpgrade (fastify, rawRequest, socket, head) {
}

class WebSocketProxy {
constructor (fastify, { wsServerOptions, wsClientOptions, upstream, wsUpstream, replyOptions: { getUpstream } = {} }) {
constructor (fastify, { wsReconnect, wsHooks, wsServerOptions, wsClientOptions, upstream, wsUpstream, replyOptions: { getUpstream } = {} }) {
this.logger = fastify.log
this.wsClientOptions = {
rewriteRequestHeaders: defaultWsHeadersRewrite,
Expand All @@ -101,7 +316,8 @@ class WebSocketProxy {
this.upstream = upstream ? convertUrlToWebSocket(upstream) : ''
this.wsUpstream = wsUpstream ? convertUrlToWebSocket(wsUpstream) : ''
this.getUpstream = getUpstream

this.wsReconnect = wsReconnect
this.wsHooks = wsHooks
const wss = new WebSocket.Server({
noServer: true,
...wsServerOptions
Expand Down Expand Up @@ -190,7 +406,13 @@ class WebSocketProxy {

const target = new WebSocket(url, subprotocols, optionsWs)
this.logger.debug({ url: url.href }, 'proxy websocket')
proxyWebSockets(source, target)

if (this.wsReconnect) {
const targetParams = { url, subprotocols, optionsWs }
proxyWebSocketsWithReconnection(this.logger, source, target, this.wsReconnect, this.wsHooks, targetParams)
} else {
proxyWebSockets(this.logger, source, target, this.wsHooks)
}
}
}

Expand Down Expand Up @@ -228,9 +450,7 @@ function generateRewritePrefix (prefix, opts) {
}

async function fastifyHttpProxy (fastify, opts) {
if (!opts.upstream && !opts.websocket && !((opts.upstream === '' || opts.wsUpstream === '') && opts.replyOptions && typeof opts.replyOptions.getUpstream === 'function')) {
throw new Error('upstream must be specified')
}
opts = validateOptions(opts)

const preHandler = opts.preHandler || opts.beforeHandler
const rewritePrefix = generateRewritePrefix(fastify.prefix, opts)
Expand Down
2 changes: 2 additions & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,8 @@
"http-errors": "^2.0.0",
"http-proxy": "^1.18.1",
"neostandard": "^0.12.0",
"pino": "^9.6.0",
"pino-test": "^1.1.0",
"simple-get": "^4.0.1",
"socket.io": "^4.7.5",
"socket.io-client": "^4.7.5",
Expand Down
Loading
Loading