
Commit 86807d0

Compression for sync streams (#329)
* Enable gzip, zstd compression for sync streams.
* Actually enable compression.
* Fix error handling in compressed streams.
* [WIP] stream tests.
* Log encoding used.
* Semi-functional test.
* Add metric for actual bytes sent, powersync_data_sent_bytes_total.
* Changeset.
* Minor refactoring.
1 parent df9fa63 commit 86807d0

File tree

10 files changed: +271 −10 lines

.changeset/spicy-vans-matter.md

Lines changed: 7 additions & 0 deletions

@@ -0,0 +1,7 @@
+---
+'@powersync/service-core': minor
+'@powersync/service-types': minor
+'@powersync/service-image': minor
+---
+
+Support gzip and zstd compression in http streams.
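To illustrate the client side, a hedged sketch (not part of this commit): the host, token, and request body are placeholders, and note that browsers set Accept-Encoding themselves, so this applies to non-browser clients.

declare const token: string; // assumed: a valid PowerSync JWT

const response = await fetch('https://powersync.example.com/sync/stream', {
  method: 'POST',
  headers: {
    'content-type': 'application/json',
    // Offer both encodings; the service prefers zstd when its runtime supports it.
    'accept-encoding': 'zstd, gzip',
    authorization: `Bearer ${token}`
  },
  body: JSON.stringify({}) // sync request parameters omitted
});

// 'zstd', 'gzip', or null for an identity (uncompressed) response.
console.log(response.headers.get('content-encoding'));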

packages/service-core/src/api/api-metrics.ts

Lines changed: 6 additions & 0 deletions

@@ -12,6 +12,12 @@ export function createCoreAPIMetrics(engine: MetricsEngine): void {
     unit: 'bytes'
   });

+  engine.createCounter({
+    name: APIMetric.DATA_SENT_BYTES,
+    description: 'Size of data sent to clients, after compression if applicable',
+    unit: 'bytes'
+  });
+
   engine.createCounter({
     name: APIMetric.OPERATIONS_SYNCED,
     description: 'Number of operations synced'
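Together with the existing powersync_data_synced_bytes_total counter, the new counter makes the effective compression ratio observable. A hypothetical helper, purely to illustrate the relationship between the two counters:

// Hypothetical helper - not part of this commit.
// For uncompressed streams the two counters advance in lockstep, so the ratio is 1.
function compressionRatio(dataSyncedBytes: number, dataSentBytes: number): number {
  return dataSentBytes === 0 ? 1 : dataSyncedBytes / dataSentBytes;
}

// e.g. 10 MiB of plaintext sync lines sent as 2 MiB of compressed frames:
console.log(compressionRatio(10 * 1024 * 1024, 2 * 1024 * 1024)); // 5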
packages/service-core/src/routes/compression.ts (new file; path inferred from the `../compression.js` import in sync-stream.ts below)

Lines changed: 75 additions & 0 deletions

@@ -0,0 +1,75 @@
+import type Negotiator from 'negotiator';
+import { PassThrough, pipeline, Readable, Transform } from 'node:stream';
+import * as zlib from 'node:zlib';
+import { RequestTracker } from '../sync/RequestTracker.js';
+
+/**
+ * Compress a streamed response.
+ *
+ * `@fastify/compress` can do something similar, but does not appear to work as well on streamed responses.
+ * The manual implementation is simple enough, and gives us more control over the low-level details.
+ *
+ * @param negotiator Negotiator from the request, to negotiate response encoding
+ * @param stream plain-text stream
+ * @returns
+ */
+export function maybeCompressResponseStream(
+  negotiator: Negotiator,
+  stream: Readable,
+  tracker: RequestTracker
+): { stream: Readable; encodingHeaders: { 'content-encoding'?: string } } {
+  const encoding = (negotiator as any).encoding(['identity', 'gzip', 'zstd'], { preferred: 'zstd' });
+  const transform = createCompressionTransform(encoding);
+  if (transform == null) {
+    // No matching compression supported - leave stream as-is
+    return {
+      stream,
+      encodingHeaders: {}
+    };
+  } else {
+    tracker.setCompressed(encoding);
+    return {
+      stream: transformStream(stream, transform, tracker),
+      encodingHeaders: { 'content-encoding': encoding }
+    };
+  }
+}
+
+function createCompressionTransform(encoding: string | undefined): Transform | null {
+  if (encoding == 'zstd') {
+    // Available since Node v23.8.0, v22.15.0
+    // This does the actual compression in a background thread pool.
+    return zlib.createZstdCompress({
+      // We need to flush the frame after every new input chunk, to avoid delaying data
+      // in the output stream.
+      flush: zlib.constants.ZSTD_e_flush,
+      params: {
+        // Default compression level is 3. We reduce this slightly to limit CPU overhead
+        [zlib.constants.ZSTD_c_compressionLevel]: 2
+      }
+    });
+  } else if (encoding == 'gzip') {
+    return zlib.createGzip({
+      // We need to flush the frame after every new input chunk, to avoid delaying data
+      // in the output stream.
+      flush: zlib.constants.Z_SYNC_FLUSH
+    });
+  }
+  return null;
+}
+
+function transformStream(source: Readable, transform: Transform, tracker: RequestTracker) {
+  // pipe does not forward error events automatically, resulting in unhandled error
+  // events. This forwards it.
+  const out = new PassThrough();
+  const trackingTransform = new Transform({
+    transform(chunk, _encoding, callback) {
+      tracker.addCompressedDataSent(chunk.length);
+      callback(null, chunk);
+    }
+  });
+  pipeline(source, transform, trackingTransform, out, (err) => {
+    if (err) out.destroy(err);
+  });
+  return out;
+}
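The flush settings above are the subtle part: by default zlib buffers input until it has a full block of output, which on a long-lived stream could withhold a sync line (including keepalives) indefinitely. A standalone sketch, independent of this commit, showing the effect of Z_SYNC_FLUSH:

import { createGzip, createGunzip, constants } from 'node:zlib';

// With Z_SYNC_FLUSH, every written chunk is flushed to a byte boundary, so the
// receiving side can decode it immediately instead of waiting for a full block.
const gzip = createGzip({ flush: constants.Z_SYNC_FLUSH });
const gunzip = createGunzip();
gzip.pipe(gunzip);

gunzip.on('data', (chunk) => console.log('decoded:', chunk.toString()));

// This keepalive-style line is decodable as soon as it is written.
gzip.write('{"token_expires_in":3600}\n');
// end() only matters for stream teardown here, not for data visibility.
gzip.end();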

packages/service-core/src/routes/endpoints/socket-route.ts

Lines changed: 1 addition & 1 deletion

@@ -113,7 +113,7 @@ export const syncStreamReactive: SocketRouteGenerator = (router) =>
         const serialized = sync.syncLineToBson(data);
         responder.onNext({ data: serialized }, false);
         requestedN--;
-        tracker.addDataSynced(serialized.length);
+        tracker.addPlaintextDataSynced(serialized.length);
       }

       if (requestedN <= 0 && !signal.aborted) {

packages/service-core/src/routes/endpoints/sync-stream.ts

Lines changed: 9 additions & 6 deletions

@@ -1,7 +1,6 @@
-import { ErrorCode, errors, logger, router, schema } from '@powersync/lib-services-framework';
-import { RequestParameters } from '@powersync/service-sync-rules';
-import { Readable } from 'stream';
+import { ErrorCode, errors, router, schema } from '@powersync/lib-services-framework';
 import Negotiator from 'negotiator';
+import { Readable } from 'stream';

 import * as sync from '../../sync/sync-index.js';
 import * as util from '../../util/util-index.js';
@@ -10,6 +9,7 @@ import { authUser } from '../auth.js';
 import { routeDefinition } from '../router.js';

 import { APIMetric } from '@powersync/service-types';
+import { maybeCompressResponseStream } from '../compression.js';

 export enum SyncRoutes {
   STREAM = '/sync/stream'
@@ -31,9 +31,10 @@ export const syncStreamed = routeDefinition({
     const userAgent = headers['x-user-agent'] ?? headers['user-agent'];
     const clientId = payload.params.client_id;
     const streamStart = Date.now();
+    const negotiator = new Negotiator(payload.request);
     // This falls back to JSON unless there's preference for the bson-stream in the Accept header.
     const useBson = payload.request.headers.accept
-      ? new Negotiator(payload.request).mediaType(supportedContentTypes) == concatenatedBsonContentType
+      ? negotiator.mediaType(supportedContentTypes) == concatenatedBsonContentType
       : false;

     logger.defaultMeta = {
@@ -81,10 +82,11 @@
     });

     const byteContents = useBson ? sync.bsonLines(syncLines) : sync.ndjson(syncLines);
-    const stream = Readable.from(sync.transformToBytesTracked(byteContents, tracker), {
+    const plainStream = Readable.from(sync.transformToBytesTracked(byteContents, tracker), {
       objectMode: false,
       highWaterMark: 16 * 1024
     });
+    const { stream, encodingHeaders } = maybeCompressResponseStream(negotiator, plainStream, tracker);

     // Best effort guess on why the stream was closed.
     // We use the `??=` operator everywhere, so that we catch the first relevant
@@ -119,7 +121,8 @@
     return new router.RouterResponse({
       status: 200,
       headers: {
-        'Content-Type': useBson ? concatenatedBsonContentType : ndJsonContentType
+        'Content-Type': useBson ? concatenatedBsonContentType : ndJsonContentType,
+        ...encodingHeaders
       },
       data: stream,
       afterSend: async (details) => {
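For reference, the negotiation step in isolation: a minimal sketch assuming the negotiator package's encoding() API (the `preferred` option is newer than the bundled typings, which is why the route casts to `any`).

import Negotiator from 'negotiator';

// Only the headers matter to Negotiator; this request object is illustrative.
const negotiator = new Negotiator({ headers: { 'accept-encoding': 'gzip, zstd;q=0.9' } });

// Choose among the encodings the service can produce. The client's q-values
// decide by default; `preferred` only breaks ties in favour of zstd.
const encoding = (negotiator as any).encoding(['identity', 'gzip', 'zstd'], { preferred: 'zstd' });
console.log(encoding); // 'gzip' - this client weighted it above zstd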

packages/service-core/src/sync/RequestTracker.ts

Lines changed: 27 additions & 2 deletions

@@ -2,16 +2,20 @@ import { MetricsEngine } from '../metrics/MetricsEngine.js';

 import { APIMetric } from '@powersync/service-types';
 import { SyncBucketData } from '../util/protocol-types.js';
+import { ServiceAssertionError } from '@powersync/lib-services-framework';

 /**
  * Record sync stats per request stream.
  */
 export class RequestTracker {
   operationsSynced = 0;
   dataSyncedBytes = 0;
+  dataSentBytes = 0;
   operationCounts: OperationCounts = { put: 0, remove: 0, move: 0, clear: 0 };
   largeBuckets: Record<string, number> = {};

+  private encoding: string | undefined = undefined;
+
   constructor(private metrics: MetricsEngine) {
     this.metrics = metrics;
   }
@@ -29,18 +33,39 @@
     this.metrics.getCounter(APIMetric.OPERATIONS_SYNCED).add(operations.total);
   }

-  addDataSynced(bytes: number) {
+  setCompressed(encoding: string) {
+    this.encoding = encoding;
+  }
+
+  addPlaintextDataSynced(bytes: number) {
     this.dataSyncedBytes += bytes;

     this.metrics.getCounter(APIMetric.DATA_SYNCED_BYTES).add(bytes);
+
+    if (this.encoding == null) {
+      // This avoids having to create a separate stream just to track this
+      this.dataSentBytes += bytes;
+
+      this.metrics.getCounter(APIMetric.DATA_SENT_BYTES).add(bytes);
+    }
+  }
+
+  addCompressedDataSent(bytes: number) {
+    if (this.encoding == null) {
+      throw new ServiceAssertionError('No compression encoding set');
+    }
+    this.dataSentBytes += bytes;
+    this.metrics.getCounter(APIMetric.DATA_SENT_BYTES).add(bytes);
   }

   getLogMeta() {
     return {
       operations_synced: this.operationsSynced,
       data_synced_bytes: this.dataSyncedBytes,
+      data_sent_bytes: this.dataSentBytes,
       operation_counts: this.operationCounts,
-      large_buckets: this.largeBuckets
+      large_buckets: this.largeBuckets,
+      encoding: this.encoding
     };
   }
 }
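A short illustration of the tracker's two paths (hypothetical usage, not from this commit): compressed streams report sent bytes via the compression transform, while uncompressed streams reuse the plaintext count for both counters.

import { MetricsEngine } from '../metrics/MetricsEngine.js';
import { RequestTracker } from './RequestTracker.js';

declare const metrics: MetricsEngine; // assumed: an initialized MetricsEngine

const tracker = new RequestTracker(metrics);
tracker.setCompressed('gzip');        // set by maybeCompressResponseStream on negotiation
tracker.addPlaintextDataSynced(1024); // advances data_synced_bytes only
tracker.addCompressedDataSent(300);   // advances data_sent_bytes

// Without setCompressed(), addPlaintextDataSynced() advances both counters,
// and addCompressedDataSent() throws a ServiceAssertionError.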

packages/service-core/src/sync/util.ts

Lines changed: 1 addition & 1 deletion

@@ -125,7 +125,7 @@ export async function* transformToBytesTracked(
       encoded = data;
     }

-    tracker.addDataSynced(encoded.length);
+    tracker.addPlaintextDataSynced(encoded.length);
     yield encoded;
   }
 }
mocks.ts (new test helper; its directory is not shown here, but the stream-route test below imports it as `./mocks.js`)

Lines changed: 59 additions & 0 deletions

@@ -0,0 +1,59 @@
+import {
+  BucketStorageFactory,
+  createCoreAPIMetrics,
+  MetricsEngine,
+  OpenTelemetryMetricsFactory,
+  RouteAPI,
+  RouterEngine,
+  ServiceContext,
+  StorageEngine,
+  SyncContext,
+  SyncRulesBucketStorage
+} from '@/index.js';
+import { MeterProvider } from '@opentelemetry/sdk-metrics';
+
+export function mockServiceContext(storage: Partial<SyncRulesBucketStorage> | null) {
+  // This is very incomplete - just enough to get the current tests passing.
+
+  const storageEngine: StorageEngine = {
+    activeBucketStorage: {
+      async getActiveStorage() {
+        return storage;
+      }
+    } as Partial<BucketStorageFactory>
+  } as any;
+
+  const meterProvider = new MeterProvider({
+    readers: []
+  });
+  const meter = meterProvider.getMeter('powersync-tests');
+  const metricsEngine = new MetricsEngine({
+    disable_telemetry_sharing: true,
+    factory: new OpenTelemetryMetricsFactory(meter)
+  });
+  createCoreAPIMetrics(metricsEngine);
+  const service_context: Partial<ServiceContext> = {
+    syncContext: new SyncContext({ maxBuckets: 1, maxDataFetchConcurrency: 1, maxParameterQueryResults: 1 }),
+    routerEngine: {
+      getAPI() {
+        return {
+          getParseSyncRulesOptions() {
+            return { defaultSchema: 'public' };
+          }
+        } as Partial<RouteAPI>;
+      },
+      addStopHandler() {
+        return () => {};
+      }
+    } as Partial<RouterEngine> as any,
+    storageEngine,
+    metricsEngine: metricsEngine,
+    // Not used
+    configuration: null as any,
+    lifeCycleEngine: null as any,
+    migrations: null as any,
+    replicationEngine: null as any,
+    serviceMode: null as any
+  };
+  return service_context as ServiceContext;
+}
Stream route test (new file; path not shown in this view)

Lines changed: 84 additions & 0 deletions

@@ -0,0 +1,84 @@
+import { BasicRouterRequest, Context, SyncRulesBucketStorage } from '@/index.js';
+import { logger, RouterResponse, ServiceError } from '@powersync/lib-services-framework';
+import { SqlSyncRules } from '@powersync/service-sync-rules';
+import { Readable, Writable } from 'stream';
+import { pipeline } from 'stream/promises';
+import { beforeEach, describe, expect, it, vi } from 'vitest';
+import { syncStreamed } from '../../../src/routes/endpoints/sync-stream.js';
+import { mockServiceContext } from './mocks.js';
+
+describe('Stream Route', () => {
+  describe('compressed stream', () => {
+    it('handles missing sync rules', async () => {
+      const context: Context = {
+        logger: logger,
+        service_context: mockServiceContext(null)
+      };
+
+      const request: BasicRouterRequest = {
+        headers: {},
+        hostname: '',
+        protocol: 'http'
+      };
+
+      const error = (await (syncStreamed.handler({ context, params: {}, request }) as Promise<RouterResponse>).catch(
+        (e) => e
+      )) as ServiceError;
+
+      expect(error.errorData.status).toEqual(500);
+      expect(error.errorData.code).toEqual('PSYNC_S2302');
+    });
+
+    it('handles a stream error with compression', async () => {
+      // This primarily tests that an underlying storage error doesn't result in an uncaught error
+      // when compressing the stream.
+
+      const storage = {
+        getParsedSyncRules() {
+          return new SqlSyncRules('bucket_definitions: {}');
+        },
+        watchCheckpointChanges: async function* (options) {
+          throw new Error('Simulated storage error');
+        }
+      } as Partial<SyncRulesBucketStorage>;
+      const serviceContext = mockServiceContext(storage);
+
+      const context: Context = {
+        logger: logger,
+        service_context: serviceContext,
+        token_payload: {
+          exp: new Date().getTime() / 1000 + 10000,
+          iat: new Date().getTime() / 1000 - 10000,
+          sub: 'test-user'
+        }
+      };
+
+      // It may be worth eventually doing this via Fastify to test the full stack
+
+      const request: BasicRouterRequest = {
+        headers: {
+          'accept-encoding': 'gzip'
+        },
+        hostname: '',
+        protocol: 'http'
+      };
+
+      const response = await (syncStreamed.handler({ context, params: {}, request }) as Promise<RouterResponse>);
+      expect(response.status).toEqual(200);
+      const stream = response.data as Readable;
+      const r = await drainWithTimeout(stream).catch((error) => error);
+      expect(r.message).toContain('Simulated storage error');
+    });
+  });
+});
+
+export async function drainWithTimeout(readable: Readable, ms = 2_000) {
+  const devNull = new Writable({
+    write(_chunk, _enc, cb) {
+      cb();
+    } // discard everything
+  });
+
+  // Throws AbortError if it takes longer than ms, and destroys the stream
+  await pipeline(readable, devNull, { signal: AbortSignal.timeout(ms) });
+}

packages/types/src/metrics.ts

Lines changed: 2 additions & 0 deletions

@@ -1,6 +1,8 @@
 export enum APIMetric {
   // Uncompressed size of synced data from PowerSync to Clients
   DATA_SYNCED_BYTES = 'powersync_data_synced_bytes_total',
+  // Potentially-compressed size of data sent from PowerSync to Clients
+  DATA_SENT_BYTES = 'powersync_data_sent_bytes_total',
   // Number of operations synced
   OPERATIONS_SYNCED = 'powersync_operations_synced_total',
   // Number of concurrent sync connections
