From 53bb1ec89ec658d9bc9515f320eee33e8d1c925d Mon Sep 17 00:00:00 2001 From: Simon Binder Date: Tue, 22 Jul 2025 18:36:04 +0200 Subject: [PATCH 1/2] Allow binary data in HTTP streams --- .changeset/green-queens-float.md | 5 +++ packages/service-core/package.json | 2 + .../src/routes/endpoints/socket-route.ts | 8 +--- .../src/routes/endpoints/sync-stream.ts | 45 ++++++++++--------- packages/service-core/src/sync/util.ts | 33 +++++++++++++- pnpm-lock.yaml | 17 +++++++ 6 files changed, 81 insertions(+), 29 deletions(-) create mode 100644 .changeset/green-queens-float.md diff --git a/.changeset/green-queens-float.md b/.changeset/green-queens-float.md new file mode 100644 index 000000000..e17a08057 --- /dev/null +++ b/.changeset/green-queens-float.md @@ -0,0 +1,5 @@ +--- +'@powersync/service-core': patch +--- + +Support BSON lines through the HTTP endpoint via the `application/vnd.powersync.bson-stream` content-type. diff --git a/packages/service-core/package.json b/packages/service-core/package.json index 17669c9e9..e40bb0044 100644 --- a/packages/service-core/package.json +++ b/packages/service-core/package.json @@ -37,6 +37,7 @@ "jose": "^4.15.1", "lodash": "^4.17.21", "lru-cache": "^10.2.2", + "negotiator": "^1.0.0", "node-fetch": "^3.3.2", "ts-codec": "^1.3.0", "uri-js": "^4.4.1", @@ -46,6 +47,7 @@ }, "devDependencies": { "@types/async": "^3.2.24", + "@types/negotiator": "^0.6.4", "@types/lodash": "^4.17.5", "fastify": "4.23.2", "fastify-plugin": "^4.5.1" diff --git a/packages/service-core/src/routes/endpoints/socket-route.ts b/packages/service-core/src/routes/endpoints/socket-route.ts index 0533aab00..7f6a86ddf 100644 --- a/packages/service-core/src/routes/endpoints/socket-route.ts +++ b/packages/service-core/src/routes/endpoints/socket-route.ts @@ -1,6 +1,5 @@ import { ErrorCode, errors, schema } from '@powersync/lib-services-framework'; import { RequestParameters } from '@powersync/service-sync-rules'; -import { serialize } from 'bson'; import * as sync from '../../sync/sync-index.js'; import * as util from '../../util/util-index.js'; @@ -110,16 +109,11 @@ export const syncStreamReactive: SocketRouteGenerator = (router) => break; } if (data == null) { - // Empty value just to flush iterator memory continue; - } else if (typeof data == 'string') { - // Should not happen with binary_data: true - throw new Error(`Unexpected string data: ${data}`); } { - // On NodeJS, serialize always returns a Buffer - const serialized = serialize(data) as Buffer; + const serialized = sync.syncLineToBson(data); responder.onNext({ data: serialized }, false); requestedN--; tracker.addDataSynced(serialized.length); diff --git a/packages/service-core/src/routes/endpoints/sync-stream.ts b/packages/service-core/src/routes/endpoints/sync-stream.ts index 644627ede..e5fad17d7 100644 --- a/packages/service-core/src/routes/endpoints/sync-stream.ts +++ b/packages/service-core/src/routes/endpoints/sync-stream.ts @@ -1,6 +1,7 @@ import { ErrorCode, errors, logger, router, schema } from '@powersync/lib-services-framework'; import { RequestParameters } from '@powersync/service-sync-rules'; import { Readable } from 'stream'; +import Negotiator from 'negotiator'; import * as sync from '../../sync/sync-index.js'; import * as util from '../../util/util-index.js'; @@ -14,6 +15,10 @@ export enum SyncRoutes { STREAM = '/sync/stream' } +const ndJsonContentType = 'application/x-ndjson'; +const concatenatedBsonContentType = 'application/vnd.powersync.bson-stream'; +const supportedContentTypes = [ndJsonContentType, concatenatedBsonContentType]; + export const syncStreamed = routeDefinition({ path: SyncRoutes.STREAM, method: router.HTTPMethod.POST, @@ -26,6 +31,8 @@ export const syncStreamed = routeDefinition({ const userAgent = headers['x-user-agent'] ?? headers['user-agent']; const clientId = payload.params.client_id; const streamStart = Date.now(); + // This falls back to JSON unless there's preference for the bson-stream in the Accept header. + const useBson = new Negotiator(payload.request).mediaType(supportedContentTypes) == concatenatedBsonContentType; logger.defaultMeta = { ...logger.defaultMeta, @@ -61,25 +68,23 @@ export const syncStreamed = routeDefinition({ const tracker = new sync.RequestTracker(metricsEngine); try { metricsEngine.getUpDownCounter(APIMetric.CONCURRENT_CONNECTIONS).add(1); - const stream = Readable.from( - sync.transformToBytesTracked( - sync.ndjson( - sync.streamResponse({ - syncContext: syncContext, - bucketStorage, - syncRules: syncRules, - params, - syncParams, - token: payload.context.token_payload!, - tracker, - signal: controller.signal, - logger - }) - ), - tracker - ), - { objectMode: false, highWaterMark: 16 * 1024 } - ); + const syncLines = sync.streamResponse({ + syncContext: syncContext, + bucketStorage, + syncRules: syncRules, + params, + syncParams, + token: payload.context.token_payload!, + tracker, + signal: controller.signal, + logger + }); + + const byteContents = useBson ? sync.bsonLines(syncLines) : sync.ndjson(syncLines); + const stream = Readable.from(sync.transformToBytesTracked(byteContents, tracker), { + objectMode: false, + highWaterMark: 16 * 1024 + }); // Best effort guess on why the stream was closed. // We use the `??=` operator everywhere, so that we catch the first relevant @@ -114,7 +119,7 @@ export const syncStreamed = routeDefinition({ return new router.RouterResponse({ status: 200, headers: { - 'Content-Type': 'application/x-ndjson' + 'Content-Type': useBson ? concatenatedBsonContentType : ndJsonContentType }, data: stream, afterSend: async (details) => { diff --git a/packages/service-core/src/sync/util.ts b/packages/service-core/src/sync/util.ts index d9ef40ce4..6cc0671b0 100644 --- a/packages/service-core/src/sync/util.ts +++ b/packages/service-core/src/sync/util.ts @@ -3,6 +3,7 @@ import * as timers from 'timers/promises'; import { SemaphoreInterface } from 'async-mutex'; import * as util from '../util/util-index.js'; import { RequestTracker } from './RequestTracker.js'; +import { serialize } from 'bson'; export type TokenStreamOptions = { /** @@ -76,6 +77,27 @@ export async function* tokenStream( } } +export function syncLineToBson(line: string | Record): Buffer { + if (typeof line == 'string') { + // Should not happen with binary_data: true + throw new Error(`Unexpected string data: ${line}`); + } else { + // On NodeJS, serialize always returns a Buffer + return serialize(line) as Buffer; + } +} + +export async function* bsonLines(iterator: AsyncIterable>): AsyncGenerator { + for await (let line of iterator) { + if (line == null) { + // Empty value just to flush iterator memory + continue; + } else { + yield syncLineToBson(line); + } + } +} + export async function* ndjson(iterator: AsyncIterable>): AsyncGenerator { for await (let data of iterator) { if (data == null) { @@ -91,11 +113,18 @@ export async function* ndjson(iterator: AsyncIterable, + iterator: AsyncIterable, tracker: RequestTracker ): AsyncGenerator { for await (let data of iterator) { - const encoded = Buffer.from(data, 'utf8'); + let encoded: Buffer; + + if (typeof data == 'string') { + encoded = Buffer.from(data, 'utf8'); + } else { + encoded = data; + } + tracker.addDataSynced(encoded.length); yield encoded; } diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index 97d1c7a2e..280ed376d 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -546,6 +546,9 @@ importers: lru-cache: specifier: ^10.2.2 version: 10.4.3 + negotiator: + specifier: ^1.0.0 + version: 1.0.0 node-fetch: specifier: ^3.3.2 version: 3.3.2 @@ -571,6 +574,9 @@ importers: '@types/lodash': specifier: ^4.17.5 version: 4.17.6 + '@types/negotiator': + specifier: ^0.6.4 + version: 0.6.4 fastify: specifier: 4.23.2 version: 4.23.2 @@ -1517,6 +1523,9 @@ packages: '@types/mysql@2.15.22': resolution: {integrity: sha512-wK1pzsJVVAjYCSZWQoWHziQZbNggXFDUEIGf54g4ZM/ERuP86uGdWeKZWMYlqTPMZfHJJvLPyogXGvCOg87yLQ==} + '@types/negotiator@0.6.4': + resolution: {integrity: sha512-elf6BsTq+AkyNsb2h5cGNst2Mc7dPliVoAPm1fXglC/BM3f2pFA40BaSSv3E5lyHteEawVKLP+8TwiY1DMNb3A==} + '@types/node@12.20.55': resolution: {integrity: sha512-J8xLz7q2OFulZ2cyGTLE1TbbZcjpno7FaN6zdJNrgAdrJ+DZzh/uFR6YrTb4C+nXakvud8Q4+rbhoIWlYQbUFQ==} @@ -2876,6 +2885,10 @@ packages: resolution: {integrity: sha512-+EUsqGPLsM+j/zdChZjsnX51g4XrHFOIXwfnCVPGlQk/k5giakcKsuxCObBRu6DSm9opw/O6slWbJdghQM4bBg==} engines: {node: '>= 0.6'} + negotiator@1.0.0: + resolution: {integrity: sha512-8Ofs/AUQh8MaEcrlq5xOX0CQ9ypTF5dl78mjlMNfOK08fzpgTHQRQPBxcPlEtIw0yRpws+Zo/3r+5WRby7u3Gg==} + engines: {node: '>= 0.6'} + node-cleanup@2.1.2: resolution: {integrity: sha512-qN8v/s2PAJwGUtr1/hYTpNKlD6Y9rc4p8KSmJXyGdYGZsDGKXrGThikLFP9OCHFeLeEpQzPwiAtdIvBLqm//Hw==} @@ -4972,6 +4985,8 @@ snapshots: dependencies: '@types/node': 22.16.2 + '@types/negotiator@0.6.4': {} + '@types/node@12.20.55': {} '@types/node@13.13.52': {} @@ -6355,6 +6370,8 @@ snapshots: negotiator@0.6.3: {} + negotiator@1.0.0: {} + node-cleanup@2.1.2: {} node-domexception@1.0.0: {} From 183787412bca4b058e2f47288d0605ebebc6005c Mon Sep 17 00:00:00 2001 From: Simon Binder Date: Tue, 22 Jul 2025 18:43:05 +0200 Subject: [PATCH 2/2] Add to logs --- packages/service-core/src/routes/endpoints/sync-stream.ts | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/packages/service-core/src/routes/endpoints/sync-stream.ts b/packages/service-core/src/routes/endpoints/sync-stream.ts index e5fad17d7..8ca6306ca 100644 --- a/packages/service-core/src/routes/endpoints/sync-stream.ts +++ b/packages/service-core/src/routes/endpoints/sync-stream.ts @@ -32,13 +32,16 @@ export const syncStreamed = routeDefinition({ const clientId = payload.params.client_id; const streamStart = Date.now(); // This falls back to JSON unless there's preference for the bson-stream in the Accept header. - const useBson = new Negotiator(payload.request).mediaType(supportedContentTypes) == concatenatedBsonContentType; + const useBson = + payload.request.headers.accept && + new Negotiator(payload.request).mediaType(supportedContentTypes) == concatenatedBsonContentType; logger.defaultMeta = { ...logger.defaultMeta, user_agent: userAgent, client_id: clientId, - user_id: payload.context.user_id + user_id: payload.context.user_id, + bson: useBson }; if (routerEngine.closed) {