Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/green-queens-float.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
'@powersync/service-core': patch
---

Support BSON lines through the HTTP endpoint via the `application/vnd.powersync.bson-stream` content-type.
2 changes: 2 additions & 0 deletions packages/service-core/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
"jose": "^4.15.1",
"lodash": "^4.17.21",
"lru-cache": "^10.2.2",
"negotiator": "^1.0.0",
"node-fetch": "^3.3.2",
"ts-codec": "^1.3.0",
"uri-js": "^4.4.1",
Expand All @@ -46,6 +47,7 @@
},
"devDependencies": {
"@types/async": "^3.2.24",
"@types/negotiator": "^0.6.4",
"@types/lodash": "^4.17.5",
"fastify": "4.23.2",
"fastify-plugin": "^4.5.1"
Expand Down
8 changes: 1 addition & 7 deletions packages/service-core/src/routes/endpoints/socket-route.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
import { ErrorCode, errors, schema } from '@powersync/lib-services-framework';
import { RequestParameters } from '@powersync/service-sync-rules';
import { serialize } from 'bson';

import * as sync from '../../sync/sync-index.js';
import * as util from '../../util/util-index.js';
Expand Down Expand Up @@ -110,16 +109,11 @@ export const syncStreamReactive: SocketRouteGenerator = (router) =>
break;
}
if (data == null) {
// Empty value just to flush iterator memory
continue;
} else if (typeof data == 'string') {
// Should not happen with binary_data: true
throw new Error(`Unexpected string data: ${data}`);
}

{
// On NodeJS, serialize always returns a Buffer
const serialized = serialize(data) as Buffer;
const serialized = sync.syncLineToBson(data);
responder.onNext({ data: serialized }, false);
requestedN--;
tracker.addDataSynced(serialized.length);
Expand Down
50 changes: 29 additions & 21 deletions packages/service-core/src/routes/endpoints/sync-stream.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import { ErrorCode, errors, logger, router, schema } from '@powersync/lib-services-framework';
import { RequestParameters } from '@powersync/service-sync-rules';
import { Readable } from 'stream';
import Negotiator from 'negotiator';

import * as sync from '../../sync/sync-index.js';
import * as util from '../../util/util-index.js';
Expand All @@ -14,6 +15,10 @@ export enum SyncRoutes {
STREAM = '/sync/stream'
}

const ndJsonContentType = 'application/x-ndjson';
const concatenatedBsonContentType = 'application/vnd.powersync.bson-stream';
const supportedContentTypes = [ndJsonContentType, concatenatedBsonContentType];

export const syncStreamed = routeDefinition({
path: SyncRoutes.STREAM,
method: router.HTTPMethod.POST,
Expand All @@ -26,12 +31,17 @@ export const syncStreamed = routeDefinition({
const userAgent = headers['x-user-agent'] ?? headers['user-agent'];
const clientId = payload.params.client_id;
const streamStart = Date.now();
// This falls back to JSON unless there's preference for the bson-stream in the Accept header.
const useBson =
payload.request.headers.accept &&
new Negotiator(payload.request).mediaType(supportedContentTypes) == concatenatedBsonContentType;

logger.defaultMeta = {
...logger.defaultMeta,
user_agent: userAgent,
client_id: clientId,
user_id: payload.context.user_id
user_id: payload.context.user_id,
bson: useBson
};

if (routerEngine.closed) {
Expand Down Expand Up @@ -61,25 +71,23 @@ export const syncStreamed = routeDefinition({
const tracker = new sync.RequestTracker(metricsEngine);
try {
metricsEngine.getUpDownCounter(APIMetric.CONCURRENT_CONNECTIONS).add(1);
const stream = Readable.from(
sync.transformToBytesTracked(
sync.ndjson(
sync.streamResponse({
syncContext: syncContext,
bucketStorage,
syncRules: syncRules,
params,
syncParams,
token: payload.context.token_payload!,
tracker,
signal: controller.signal,
logger
})
),
tracker
),
{ objectMode: false, highWaterMark: 16 * 1024 }
);
const syncLines = sync.streamResponse({
syncContext: syncContext,
bucketStorage,
syncRules: syncRules,
params,
syncParams,
token: payload.context.token_payload!,
tracker,
signal: controller.signal,
logger
});

const byteContents = useBson ? sync.bsonLines(syncLines) : sync.ndjson(syncLines);
const stream = Readable.from(sync.transformToBytesTracked(byteContents, tracker), {
objectMode: false,
highWaterMark: 16 * 1024
});

// Best effort guess on why the stream was closed.
// We use the `??=` operator everywhere, so that we catch the first relevant
Expand Down Expand Up @@ -114,7 +122,7 @@ export const syncStreamed = routeDefinition({
return new router.RouterResponse({
status: 200,
headers: {
'Content-Type': 'application/x-ndjson'
'Content-Type': useBson ? concatenatedBsonContentType : ndJsonContentType
},
data: stream,
afterSend: async (details) => {
Expand Down
33 changes: 31 additions & 2 deletions packages/service-core/src/sync/util.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import * as timers from 'timers/promises';
import { SemaphoreInterface } from 'async-mutex';
import * as util from '../util/util-index.js';
import { RequestTracker } from './RequestTracker.js';
import { serialize } from 'bson';

export type TokenStreamOptions = {
/**
Expand Down Expand Up @@ -76,6 +77,27 @@ export async function* tokenStream(
}
}

export function syncLineToBson(line: string | Record<string, any>): Buffer {
if (typeof line == 'string') {
// Should not happen with binary_data: true
throw new Error(`Unexpected string data: ${line}`);
} else {
// On NodeJS, serialize always returns a Buffer
return serialize(line) as Buffer;
}
}

export async function* bsonLines(iterator: AsyncIterable<string | null | Record<string, any>>): AsyncGenerator<Buffer> {
for await (let line of iterator) {
if (line == null) {
// Empty value just to flush iterator memory
continue;
} else {
yield syncLineToBson(line);
}
}
}

export async function* ndjson(iterator: AsyncIterable<string | null | Record<string, any>>): AsyncGenerator<string> {
for await (let data of iterator) {
if (data == null) {
Expand All @@ -91,11 +113,18 @@ export async function* ndjson(iterator: AsyncIterable<string | null | Record<str
}

export async function* transformToBytesTracked(
iterator: AsyncIterable<string>,
iterator: AsyncIterable<string | Buffer>,
tracker: RequestTracker
): AsyncGenerator<Buffer> {
for await (let data of iterator) {
const encoded = Buffer.from(data, 'utf8');
let encoded: Buffer;

if (typeof data == 'string') {
encoded = Buffer.from(data, 'utf8');
} else {
encoded = data;
}

tracker.addDataSynced(encoded.length);
yield encoded;
}
Expand Down
17 changes: 17 additions & 0 deletions pnpm-lock.yaml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.