Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions .changeset/rich-fans-care.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
---
'@powersync/service-rsocket-router': minor
'@powersync/service-core': minor
'@powersync/service-image': minor
'@powersync/service-types': patch
---

Enable permessage-deflate for websockets.
37 changes: 33 additions & 4 deletions packages/rsocket-router/src/router/ReactiveSocketRouter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,14 @@ import * as http from 'http';
import { Payload, RSocketServer } from 'rsocket-core';
import * as ws from 'ws';
import { SocketRouterObserver } from './SocketRouterListener.js';
import { WebsocketDuplexConnection } from './transport/WebsocketDuplexConnection.js';
import { WebsocketServerTransport } from './transport/WebSocketServerTransport.js';
import {
CommonParams,
IReactiveStream,
IReactiveStreamInput,
RS_ENDPOINT_TYPE,
ReactiveSocketRouterOptions,
RS_ENDPOINT_TYPE,
SocketResponder
} from './types.js';

Expand All @@ -24,6 +25,7 @@ export interface ReactiveStreamRequest {
dataMimeType: string;
initialN: number;
responder: SocketResponder;
connection: WebsocketDuplexConnection;
}

export interface SocketBaseContext {
Expand Down Expand Up @@ -51,7 +53,30 @@ export class ReactiveSocketRouter<C extends SocketBaseContext> {
* This follows a similar pattern to the Journey Micro
* web sockets router.
*/
const wss = new ws.WebSocketServer({ noServer: true });
const wss = new ws.WebSocketServer({
noServer: true,
perMessageDeflate: {
zlibDeflateOptions: {
chunkSize: 128 * 1024, // default is 16kb - increased for better efficiency
memLevel: 7, // default is 8
level: 3
},
zlibInflateOptions: {
// for decompressing messages from the client
chunkSize: 32 * 1024
},
// don't keep client context between messages
clientNoContextTakeover: true,
// keep context between messages from the server
serverNoContextTakeover: false,
// bigger window uses more memory and potentially more cpu. 10-15 is a good range.
serverMaxWindowBits: 12,
// Limit concurrent compression threads
concurrencyLimit: 8,
// Size (in bytes) below which messages should not be compressed _if context takeover is disabled_.
threshold: 1024
}
});
server.on('upgrade', (request, socket, head) => {
wss.handleUpgrade(request, socket as any, head, (ws) => {
wss.emit('connection', ws, request);
Expand All @@ -66,7 +91,9 @@ export class ReactiveSocketRouter<C extends SocketBaseContext> {
const rSocketServer = new RSocketServer({
transport,
acceptor: {
accept: async (payload) => {
accept: async (payload, rsocket) => {
const connection = (rsocket as any).connection as WebsocketDuplexConnection;

const { max_concurrent_connections } = this.options ?? {};
logger.info(`Currently have ${wss.clients.size} active WebSocket connection(s)`);
// wss.clients.size includes this connection, so we check for greater than
Expand Down Expand Up @@ -104,7 +131,7 @@ export class ReactiveSocketRouter<C extends SocketBaseContext> {
// TODO: Consider limiting the number of active streams per connection to prevent abuse
handleReactiveStream(
context,
{ payload, initialN, responder, dataMimeType, metadataMimeType },
{ payload, initialN, responder, dataMimeType, metadataMimeType, connection },
observer,
abortController,
params
Expand Down Expand Up @@ -191,6 +218,7 @@ export async function handleReactiveStream<Context extends SocketBaseContext>(
context,
observer,
signal: abortController.signal,
connection: request.connection,
responder
});
if (!isAuthorized.authorized) {
Expand All @@ -207,6 +235,7 @@ export async function handleReactiveStream<Context extends SocketBaseContext>(
observer,
signal: abortController.signal,
responder,
connection: request.connection,
initialN
});
} catch (ex) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
*/

import { logger } from '@powersync/lib-services-framework';
import { Socket } from 'net';
import {
Closeable,
Deferred,
Expand All @@ -33,6 +34,7 @@ import WebSocket from 'ws';

export class WebsocketDuplexConnection extends Deferred implements DuplexConnection, Outbound {
readonly multiplexerDemultiplexer: Multiplexer & Demultiplexer & FrameHandler;
readonly tracker: WebSocketTracker;

constructor(
private websocketDuplex: Duplex,
Expand All @@ -50,6 +52,7 @@ export class WebsocketDuplexConnection extends Deferred implements DuplexConnect
websocketDuplex.on('data', this.handleMessage);

this.multiplexerDemultiplexer = multiplexerDemultiplexerFactory(frame, this);
this.tracker = new WebSocketTracker(this.rawSocket);
}

get availability(): number {
Expand Down Expand Up @@ -97,7 +100,9 @@ export class WebsocketDuplexConnection extends Deferred implements DuplexConnect
};

private handleError = (e: WebSocket.ErrorEvent): void => {
logger.error(`Error in WebSocket duplex connection: ${e}`);
// Example:
// Error: The socket was closed while data was being compressed
logger.warn(`Error in WebSocket duplex connection: ${e}`);
if (!this.done) {
this.close(e.error);
}
Expand Down Expand Up @@ -149,3 +154,33 @@ export class WebsocketDuplexConnection extends Deferred implements DuplexConnect
});
}
}

/**
* Tracks encoding and bytes written on a websocket connection, catering for compressed data.
*/
export class WebSocketTracker {
private lastBytesWritten: number;
private socket: Socket;
readonly encoding: 'permessage-deflate' | undefined;

constructor(ws: WebSocket) {
this.socket = (ws as any)._socket;
this.lastBytesWritten = this.socket.bytesWritten;

// Crude check, but this is the only extension that would actually be used
if (ws.extensions.includes('permessage-deflate')) {
this.encoding = 'permessage-deflate';
} else {
this.encoding = undefined;
}
}

/**
* Consumes and returns the number of bytes sent.
*/
getBytesWritten(): number {
const written = this.socket.bytesWritten - this.lastBytesWritten;
this.lastBytesWritten = this.socket.bytesWritten;
return written;
}
}
5 changes: 5 additions & 0 deletions packages/rsocket-router/src/router/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import { router } from '@powersync/lib-services-framework';
import { OnExtensionSubscriber, OnNextSubscriber, OnTerminalSubscriber } from 'rsocket-core';

import { SocketRouterObserver } from './SocketRouterListener.js';
import { WebsocketDuplexConnection } from './transport/WebsocketDuplexConnection.js';

export enum RS_ENDPOINT_TYPE {
// Other methods are supported by RSocket, but are not yet mapped here
Expand All @@ -26,6 +27,10 @@ export type CommonStreamPayload = {
observer: SocketRouterObserver;
responder: SocketResponder;
signal: AbortSignal;
/**
* The underlying websocket connection. Should not be used directly apart from tracking metadata.
*/
connection: WebsocketDuplexConnection;
};

export type ReactiveStreamPayload<O> = CommonStreamPayload & {
Expand Down
17 changes: 16 additions & 1 deletion packages/service-core/src/routes/endpoints/socket-route.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import { APIMetric } from '@powersync/service-types';
export const syncStreamReactive: SocketRouteGenerator = (router) =>
router.reactiveStream<util.StreamingSyncRequest, any>(SyncRoutes.STREAM, {
validator: schema.createTsCodecValidator(util.StreamingSyncRequest, { allowAdditional: true }),
handler: async ({ context, params, responder, observer, initialN, signal: upstreamSignal }) => {
handler: async ({ context, params, responder, observer, initialN, signal: upstreamSignal, connection }) => {
const { service_context, logger } = context;
const { routerEngine, metricsEngine, syncContext } = service_context;

Expand Down Expand Up @@ -84,6 +84,10 @@ export const syncStreamReactive: SocketRouteGenerator = (router) =>

metricsEngine.getUpDownCounter(APIMetric.CONCURRENT_CONNECTIONS).add(1);
const tracker = new sync.RequestTracker(metricsEngine);
if (connection.tracker.encoding) {
// Must be set before we start the stream
tracker.setCompressed(connection.tracker.encoding);
}
try {
for await (const data of sync.streamResponse({
syncContext: syncContext,
Expand Down Expand Up @@ -150,6 +154,17 @@ export const syncStreamReactive: SocketRouteGenerator = (router) =>
responder.onComplete();
removeStopHandler();
disposer();
if (connection.tracker.encoding) {
// Technically, this may not be unique to this specific stream, since there could be multiple
// rsocket streams on the same websocket connection. We don't have a way to track compressed bytes
// on individual streams, and we generally expect 1 stream per connection, so this is a reasonable
// approximation.
// If there are multiple streams, bytes written would be split arbitrarily across them, but the
// total should be correct.
// For non-compressed cases, this is tracked by the stream itself.
const socketBytes = connection.tracker.getBytesWritten();
tracker.addCompressedDataSent(socketBytes);
}
logger.info(`Sync stream complete`, {
...tracker.getLogMeta(),
stream_ms: Date.now() - streamStart,
Expand Down