Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/happy-taxis-lick.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
'@powersync/service-image': minor
---

Support WebSocket requests to be encoded as JSON, which will enable more SDKs to use WebSockets as a transport protocol when receiving sync lines.
7 changes: 7 additions & 0 deletions .changeset/wild-maps-brake.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
---
'@powersync/service-rsocket-router': patch
'@powersync/service-errors': patch
'@powersync/service-core': minor
---

Allow RSocket request payload to be encoded as JSON
54 changes: 39 additions & 15 deletions packages/rsocket-router/src/router/ReactiveSocketRouter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,14 @@ import {
SocketResponder
} from './types.js';

export interface ReactiveStreamRequest {
payload: Payload;
metadataMimeType: string;
dataMimeType: string;
initialN: number;
responder: SocketResponder;
}

export class ReactiveSocketRouter<C> {
constructor(protected options?: ReactiveSocketRouterOptions<C>) {}

Expand Down Expand Up @@ -75,7 +83,13 @@ export class ReactiveSocketRouter<C> {
throw new errors.AuthorizationError(ErrorCode.PSYNC_S2101, 'No context meta data provided');
}

const context = await params.contextProvider(payload.metadata!);
const metadataMimeType = payload.metadataMimeType;
const dataMimeType = payload.dataMimeType;

const context = await params.contextProvider({
mimeType: payload.metadataMimeType,
contents: payload.metadata!
});

return {
// RequestStream is currently the only supported connection type
Expand All @@ -84,13 +98,17 @@ export class ReactiveSocketRouter<C> {
const abortController = new AbortController();

// TODO: Consider limiting the number of active streams per connection to prevent abuse
handleReactiveStream(context, { payload, initialN, responder }, observer, abortController, params).catch(
(ex) => {
logger.error(ex);
responder.onError(ex);
responder.onComplete();
}
);
handleReactiveStream(
context,
{ payload, initialN, responder, dataMimeType, metadataMimeType },
observer,
abortController,
params
).catch((ex) => {
logger.error(ex);
responder.onError(ex);
responder.onComplete();
});
return {
cancel: () => {
abortController.abort();
Expand All @@ -115,11 +133,7 @@ export class ReactiveSocketRouter<C> {

export async function handleReactiveStream<Context>(
context: Context,
request: {
payload: Payload;
initialN: number;
responder: SocketResponder;
},
request: ReactiveStreamRequest,
observer: SocketRouterObserver,
abortController: AbortController,
params: CommonParams<Context>
Expand All @@ -137,7 +151,10 @@ export async function handleReactiveStream<Context>(
return exitWithError(new errors.ValidationError('Metadata is not provided'));
}

const meta = await params.metaDecoder(metadata);
const meta = await params.metaDecoder({
mimeType: request.metadataMimeType,
contents: metadata
});

const { path } = meta;

Expand All @@ -148,7 +165,14 @@ export async function handleReactiveStream<Context>(
}

const { handler, authorize, validator, decoder = params.payloadDecoder } = route;
const requestPayload = await decoder(payload.data || undefined);
const requestPayload = await decoder(
payload.data
? {
contents: payload.data,
mimeType: request.dataMimeType
}
: undefined
);

if (validator) {
const isValid = validator.validate(requestPayload);
Expand Down
16 changes: 12 additions & 4 deletions packages/rsocket-router/src/router/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -47,16 +47,24 @@ export type IReactiveStream<I = any, O = any, C = any> = Omit<
* Decodes raw payload buffer to [I].
* Falls back to router level decoder if not specified.
*/
decoder?: (rawData?: Buffer) => Promise<I>;
decoder?: (rawData?: TypedBuffer) => Promise<I>;
};

/**
* A {@link Buffer} with an associated mimeType inferred from the RSocket `SETUP` frame.
*/
export interface TypedBuffer {
mimeType: string;
contents: Buffer;
}

export type IReactiveStreamInput<I, O, C> = Omit<IReactiveStream<I, O, C>, 'path' | 'type' | 'method'>;

export type ReactiveEndpoint = IReactiveStream;

export type CommonParams<C> = {
endpoints: Array<ReactiveEndpoint>;
contextProvider: (metaData: Buffer) => Promise<C>;
metaDecoder: (meta: Buffer) => Promise<RequestMeta>;
payloadDecoder: (rawData?: Buffer) => Promise<any>;
contextProvider: (metaData: TypedBuffer) => Promise<C>;
metaDecoder: (meta: TypedBuffer) => Promise<RequestMeta>;
payloadDecoder: (rawData?: TypedBuffer) => Promise<any>;
};
69 changes: 63 additions & 6 deletions packages/rsocket-router/tests/src/requests.test.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
import { describe, expect, it, vi } from 'vitest';
import { createMockObserver, createMockResponder } from './utils/mock-responder.js';
import { handleReactiveStream } from '../../src/router/ReactiveSocketRouter.js';
import { handleReactiveStream, ReactiveStreamRequest } from '../../src/router/ReactiveSocketRouter.js';
import { deserialize, serialize } from 'bson';
import { RS_ENDPOINT_TYPE, ReactiveEndpoint, RequestMeta, SocketResponder } from '../../src/router/types.js';
import { ErrorCode } from '@powersync/lib-services-framework';
import { EndpointHandlerPayload, ErrorCode } from '@powersync/lib-services-framework';

/**
* Mocks the process of handling reactive routes
Expand All @@ -12,7 +12,12 @@ import { ErrorCode } from '@powersync/lib-services-framework';
* @param responder a mock responder
* @returns
*/
async function handleRoute(path: string, endpoints: ReactiveEndpoint[], responder: SocketResponder) {
async function handleRoute(
path: string,
endpoints: ReactiveEndpoint[],
responder: SocketResponder,
request?: Partial<ReactiveStreamRequest>
) {
return handleReactiveStream<{}>(
{},
{
Expand All @@ -21,15 +26,18 @@ async function handleRoute(path: string, endpoints: ReactiveEndpoint[], responde
metadata: Buffer.from(serialize({ path }))
},
initialN: 1,
responder
dataMimeType: 'application/bson',
metadataMimeType: 'application/bson',
responder,
...request
},
createMockObserver(),
new AbortController(),
{
contextProvider: async () => ({}),
endpoints,
metaDecoder: async (buffer) => deserialize(buffer) as RequestMeta,
payloadDecoder: async (buffer) => buffer && deserialize(buffer)
metaDecoder: async (buffer) => deserialize(buffer.contents) as RequestMeta,
payloadDecoder: async (buffer) => buffer && deserialize(buffer.contents)
}
);
}
Expand Down Expand Up @@ -133,4 +141,53 @@ describe('Requests', () => {
// Should be a validation error
expect(JSON.stringify(spy.mock.calls[0])).includes(ErrorCode.PSYNC_S2002);
});

it('should forward mime types', async () => {
const encoder = new TextEncoder();
const decoder = new TextDecoder();
const responder = createMockResponder();
const encodeJson = (value: any) => encoder.encode(JSON.stringify(value));
const path = '/test-route';

const fn = vi.fn(async (p: EndpointHandlerPayload<any, any>) => {
expect(p.params).toStrictEqual({ hello: 'world' });
return undefined;
});

await handleReactiveStream<{}>(
{},
{
payload: {
data: Buffer.from(encodeJson({ hello: 'world' })),
metadata: Buffer.from(encodeJson({ path }))
},
metadataMimeType: 'application/json',
dataMimeType: 'application/json',
initialN: 1,
responder
},
createMockObserver(),
new AbortController(),
{
contextProvider: async () => ({}),
endpoints: [
{
path,
type: RS_ENDPOINT_TYPE.STREAM,
handler: fn
}
],
metaDecoder: async (buffer) => {
expect(buffer.mimeType, 'application/json');
return JSON.parse(decoder.decode(buffer.contents));
},
payloadDecoder: async (buffer) => {
expect(buffer!.mimeType, 'application/json');
return JSON.parse(decoder.decode(buffer!.contents));
}
}
);

expect(fn).toHaveBeenCalled();
});
});
24 changes: 18 additions & 6 deletions packages/service-core/src/routes/configure-rsocket.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import { deserialize } from 'bson';
import * as http from 'http';

import { ErrorCode, errors, logger } from '@powersync/lib-services-framework';
import { ReactiveSocketRouter, RSocketRequestMeta } from '@powersync/service-rsocket-router';
import { ReactiveSocketRouter, RSocketRequestMeta, TypedBuffer } from '@powersync/service-rsocket-router';

import { ServiceContext } from '../system/ServiceContext.js';
import { generateContext, getTokenFromHeader } from './auth.js';
Expand All @@ -22,8 +22,8 @@ export function configureRSocket(router: ReactiveSocketRouter<Context>, options:
const { route_generators = DEFAULT_SOCKET_ROUTES, server, service_context } = options;

router.applyWebSocketEndpoints(server, {
contextProvider: async (data: Buffer): Promise<Context & { token: string }> => {
const { token, user_agent } = RSocketContextMeta.decode(deserialize(data) as any);
contextProvider: async (data: TypedBuffer): Promise<Context & { token: string }> => {
const { token, user_agent } = RSocketContextMeta.decode(decodeTyped(data) as any);

if (!token) {
throw new errors.AuthorizationError(ErrorCode.PSYNC_S2106, 'No token provided');
Expand Down Expand Up @@ -58,9 +58,21 @@ export function configureRSocket(router: ReactiveSocketRouter<Context>, options:
}
},
endpoints: route_generators.map((generator) => generator(router)),
metaDecoder: async (meta: Buffer) => {
return RSocketRequestMeta.decode(deserialize(meta) as any);
metaDecoder: async (meta: TypedBuffer) => {
return RSocketRequestMeta.decode(decodeTyped(meta) as any);
},
payloadDecoder: async (rawData?: Buffer) => rawData && deserialize(rawData)
payloadDecoder: async (rawData?: TypedBuffer) => rawData && decodeTyped(rawData)
});
}

function decodeTyped(data: TypedBuffer) {
switch (data.mimeType) {
case 'application/json':
const decoder = new TextDecoder();
return JSON.parse(decoder.decode(data.contents));
case 'application/bson':
return deserialize(data.contents);
}

throw new errors.UnsupportedMediaType(`Expected JSON or BSON request, got ${data.mimeType}`);
}
7 changes: 7 additions & 0 deletions packages/service-errors/src/codes.ts
Original file line number Diff line number Diff line change
Expand Up @@ -297,6 +297,13 @@ export enum ErrorCode {
*/
PSYNC_S2003 = 'PSYNC_S2003',

/**
* 415 unsupported media type.
*
* This code always indicates an issue with the client.
*/
PSYNC_S2004 = 'PSYNC_S2004',

// ## PSYNC_S21xx: Auth errors originating on the client.
//
// This does not include auth configuration errors on the service.
Expand Down
13 changes: 13 additions & 0 deletions packages/service-errors/src/errors.ts
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,19 @@ export class ReplicationAbortedError extends ServiceError {
}
}

export class UnsupportedMediaType extends ServiceError {
static readonly CODE = ErrorCode.PSYNC_S2004;

constructor(errors: any) {
super({
code: UnsupportedMediaType.CODE,
status: 415,
description: 'Unsupported Media Type',
details: errors
});
}
}

export class AuthorizationError extends ServiceError {
/**
* String describing the token. Does not contain the full token, but may help with debugging.
Expand Down