Skip to content
54 changes: 54 additions & 0 deletions packages/service-core/src/routes/compression.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
import { Readable } from 'node:stream';
import type Negotiator from 'negotiator';
import * as zlib from 'node:zlib';

/**
* Compress a streamed response.
*
* `@fastify/compress` can do something similar, but does not appear to work as well on streamed responses.
* The manual implementation is simple enough, and gives us more control over the low-level details.
*
* @param negotiator Negotiator from the request, to negotiate response encoding
* @param stream plain-text stream
* @returns
*/
export function maybeCompressResponseStream(
negotiator: Negotiator,
stream: Readable
): { stream: Readable; encodingHeaders: Record<string, string> } {
const encoding = (negotiator as any).encoding(['identity', 'gzip', 'zstd'], { preferred: 'identity' });
if (encoding == 'zstd') {
return {
stream: stream.pipe(
// Available since Node v23.8.0, v22.15.0
// This does the actual compression in a background thread pool.
zlib.createZstdCompress({
// We need to flush the frame after every new input chunk, to avoid delaying data
// in the output stream.
flush: zlib.constants.ZSTD_e_flush,
params: {
// Default compression level is 3. We reduce this slightly to limit CPU overhead
[zlib.constants.ZSTD_c_compressionLevel]: 2
}
})
),
encodingHeaders: { 'Content-Encoding': 'zstd' }
};
} else if (encoding == 'gzip') {
return {
stream: stream.pipe(
zlib.createGzip({
// We need to flush the frame after every new input chunk, to avoid delaying data
// in the output stream.
flush: zlib.constants.Z_SYNC_FLUSH
})
),
encodingHeaders: { 'Content-Encoding': 'gzip' }
};
} else {
return {
stream: stream,
encodingHeaders: {}
};
}
}
16 changes: 9 additions & 7 deletions packages/service-core/src/routes/endpoints/sync-stream.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
import { ErrorCode, errors, logger, router, schema } from '@powersync/lib-services-framework';
import { RequestParameters } from '@powersync/service-sync-rules';
import { Readable } from 'stream';
import { ErrorCode, errors, router, schema } from '@powersync/lib-services-framework';
import Negotiator from 'negotiator';
import { Readable } from 'stream';

import * as sync from '../../sync/sync-index.js';
import * as util from '../../util/util-index.js';
Expand All @@ -10,6 +9,7 @@ import { authUser } from '../auth.js';
import { routeDefinition } from '../router.js';

import { APIMetric } from '@powersync/service-types';
import { maybeCompressResponseStream } from '../compression.js';

export enum SyncRoutes {
STREAM = '/sync/stream'
Expand All @@ -31,10 +31,10 @@ export const syncStreamed = routeDefinition({
const userAgent = headers['x-user-agent'] ?? headers['user-agent'];
const clientId = payload.params.client_id;
const streamStart = Date.now();
const negotiator = new Negotiator(payload.request);
// This falls back to JSON unless there's preference for the bson-stream in the Accept header.
const useBson =
payload.request.headers.accept &&
new Negotiator(payload.request).mediaType(supportedContentTypes) == concatenatedBsonContentType;
payload.request.headers.accept && negotiator.mediaType(supportedContentTypes) == concatenatedBsonContentType;

logger.defaultMeta = {
...logger.defaultMeta,
Expand Down Expand Up @@ -80,10 +80,11 @@ export const syncStreamed = routeDefinition({
});

const byteContents = useBson ? sync.bsonLines(syncLines) : sync.ndjson(syncLines);
const stream = Readable.from(sync.transformToBytesTracked(byteContents, tracker), {
const plainStream = Readable.from(sync.transformToBytesTracked(byteContents, tracker), {
objectMode: false,
highWaterMark: 16 * 1024
});
const { stream, encodingHeaders } = maybeCompressResponseStream(negotiator, plainStream);

// Best effort guess on why the stream was closed.
// We use the `??=` operator everywhere, so that we catch the first relevant
Expand Down Expand Up @@ -118,7 +119,8 @@ export const syncStreamed = routeDefinition({
return new router.RouterResponse({
status: 200,
headers: {
'Content-Type': useBson ? concatenatedBsonContentType : ndJsonContentType
'Content-Type': useBson ? concatenatedBsonContentType : ndJsonContentType,
...encodingHeaders
},
data: stream,
afterSend: async (details) => {
Expand Down