Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/happy-seals-change.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
'@powersync/service-core': patch
---

Fix inconsistencies between the data encoding requested by the client (JSON vs. BSON) and the encoding actually sent by the server.
45 changes: 30 additions & 15 deletions packages/service-core-tests/src/tests/register-sync-tests.ts
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,8 @@ export function registerSyncTests(factory: storage.TestStorageFactory) {
raw_data: true
},
tracker,
token: { sub: '', exp: Date.now() / 1000 + 10 } as any
token: { sub: '', exp: Date.now() / 1000 + 10 } as any,
isEncodingAsBson: false
});

const lines = await consumeCheckpointLines(stream);
Expand Down Expand Up @@ -149,7 +150,8 @@ bucket_definitions:
raw_data: true
},
tracker,
token: { sub: '', exp: Date.now() / 1000 + 10 } as any
token: { sub: '', exp: Date.now() / 1000 + 10 } as any,
isEncodingAsBson: false
});

const lines = await consumeCheckpointLines(stream);
Expand Down Expand Up @@ -211,7 +213,8 @@ bucket_definitions:
raw_data: true
},
tracker,
token: { sub: '', exp: Date.now() / 1000 + 10 } as any
token: { sub: '', exp: Date.now() / 1000 + 10 } as any,
isEncodingAsBson: false
});

let sentCheckpoints = 0;
Expand Down Expand Up @@ -320,7 +323,8 @@ bucket_definitions:
raw_data: true
},
tracker,
token: { sub: 'user_one', exp: Date.now() / 1000 + 100000 } as any
token: { sub: 'user_one', exp: Date.now() / 1000 + 100000 } as any,
isEncodingAsBson: false
});

let sentCheckpoints = 0;
Expand Down Expand Up @@ -460,7 +464,8 @@ bucket_definitions:
raw_data: true
},
tracker,
token: { sub: '', exp: Date.now() / 1000 + 10 } as any
token: { sub: '', exp: Date.now() / 1000 + 10 } as any,
isEncodingAsBson: false
});

let sentRows = 0;
Expand Down Expand Up @@ -575,7 +580,8 @@ bucket_definitions:
raw_data: true
},
tracker,
token: { sub: '', exp: Date.now() / 1000 + 100000 } as any
token: { sub: '', exp: Date.now() / 1000 + 100000 } as any,
isEncodingAsBson: false
});

const lines: any[] = [];
Expand Down Expand Up @@ -640,7 +646,8 @@ bucket_definitions:
raw_data: false
},
tracker,
token: { sub: '', exp: Date.now() / 1000 + 10 } as any
token: { sub: '', exp: Date.now() / 1000 + 10 } as any,
isEncodingAsBson: false
});

const lines = await consumeCheckpointLines(stream);
Expand Down Expand Up @@ -668,7 +675,8 @@ bucket_definitions:
raw_data: true
},
tracker,
token: { sub: '', exp: 0 } as any
token: { sub: '', exp: 0 } as any,
isEncodingAsBson: false
});

const lines = await consumeCheckpointLines(stream);
Expand Down Expand Up @@ -698,7 +706,8 @@ bucket_definitions:
raw_data: true
},
tracker,
token: { sub: '', exp: Date.now() / 1000 + 10 } as any
token: { sub: '', exp: Date.now() / 1000 + 10 } as any,
isEncodingAsBson: false
});
const iter = stream[Symbol.asyncIterator]();
context.onTestFinished(() => {
Expand Down Expand Up @@ -771,7 +780,8 @@ bucket_definitions:
raw_data: true
},
tracker,
token: { sub: 'user1', exp: Date.now() / 1000 + 100 } as any
token: { sub: 'user1', exp: Date.now() / 1000 + 100 } as any,
isEncodingAsBson: false
});
const iter = stream[Symbol.asyncIterator]();
context.onTestFinished(() => {
Expand Down Expand Up @@ -846,7 +856,8 @@ bucket_definitions:
raw_data: true
},
tracker,
token: { sub: 'user1', exp: Date.now() / 1000 + 100 } as any
token: { sub: 'user1', exp: Date.now() / 1000 + 100 } as any,
isEncodingAsBson: false
});
const iter = stream[Symbol.asyncIterator]();
context.onTestFinished(() => {
Expand Down Expand Up @@ -912,7 +923,8 @@ bucket_definitions:
raw_data: true
},
tracker,
token: { sub: 'user1', exp: Date.now() / 1000 + 100 } as any
token: { sub: 'user1', exp: Date.now() / 1000 + 100 } as any,
isEncodingAsBson: false
});
const iter = stream[Symbol.asyncIterator]();
context.onTestFinished(() => {
Expand Down Expand Up @@ -979,7 +991,8 @@ bucket_definitions:
raw_data: true
},
tracker,
token: { sub: '', exp: exp } as any
token: { sub: '', exp: exp } as any,
isEncodingAsBson: false
});
const iter = stream[Symbol.asyncIterator]();
context.onTestFinished(() => {
Expand Down Expand Up @@ -1041,7 +1054,8 @@ bucket_definitions:
raw_data: true
},
tracker,
token: { sub: '', exp: Date.now() / 1000 + 10 } as any
token: { sub: '', exp: Date.now() / 1000 + 10 } as any,
isEncodingAsBson: false
});

const iter = stream[Symbol.asyncIterator]();
Expand Down Expand Up @@ -1166,7 +1180,8 @@ bucket_definitions:
raw_data: true
},
tracker,
token: { sub: 'test', exp: Date.now() / 1000 + 10 } as any
token: { sub: 'test', exp: Date.now() / 1000 + 10 } as any,
isEncodingAsBson: false
};
const stream1 = sync.streamResponse(params);
const lines1 = await consumeCheckpointLines(stream1);
Expand Down
6 changes: 3 additions & 3 deletions packages/service-core/src/routes/endpoints/socket-route.ts
Original file line number Diff line number Diff line change
Expand Up @@ -90,8 +90,7 @@ export const syncStreamReactive: SocketRouteGenerator = (router) =>
bucketStorage: bucketStorage,
syncRules: syncRules,
params: {
...params,
binary_data: true // always true for web sockets
...params
},
token: context!.token_payload!,
tokenStreamOptions: {
Expand All @@ -100,7 +99,8 @@ export const syncStreamReactive: SocketRouteGenerator = (router) =>
},
tracker,
signal,
logger
logger,
isEncodingAsBson: true
})) {
if (signal.aborted) {
break;
Expand Down
9 changes: 5 additions & 4 deletions packages/service-core/src/routes/endpoints/sync-stream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,9 @@ export const syncStreamed = routeDefinition({
const clientId = payload.params.client_id;
const streamStart = Date.now();
// This falls back to JSON unless there's preference for the bson-stream in the Accept header.
const useBson =
payload.request.headers.accept &&
new Negotiator(payload.request).mediaType(supportedContentTypes) == concatenatedBsonContentType;
const useBson = payload.request.headers.accept
? new Negotiator(payload.request).mediaType(supportedContentTypes) == concatenatedBsonContentType
: false;

logger.defaultMeta = {
...logger.defaultMeta,
Expand Down Expand Up @@ -76,7 +76,8 @@ export const syncStreamed = routeDefinition({
token: payload.context.token_payload!,
tracker,
signal: controller.signal,
logger
logger,
isEncodingAsBson: useBson
});

const byteContents = useBson ? sync.bsonLines(syncLines) : sync.ndjson(syncLines);
Expand Down
23 changes: 18 additions & 5 deletions packages/service-core/src/sync/sync.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ export interface SyncStreamParameters {
params: util.StreamingSyncRequest;
token: auth.JwtPayload;
logger?: Logger;
isEncodingAsBson: boolean;
/**
* If this signal is aborted, the stream response ends as soon as possible, without error.
*/
Expand All @@ -39,7 +40,17 @@ export interface SyncStreamParameters {
export async function* streamResponse(
options: SyncStreamParameters
): AsyncIterable<util.StreamingSyncLine | string | null> {
const { syncContext, bucketStorage, syncRules, params, token, tokenStreamOptions, tracker, signal } = options;
const {
syncContext,
bucketStorage,
syncRules,
params,
token,
tokenStreamOptions,
tracker,
signal,
isEncodingAsBson
} = options;
const logger = options.logger ?? defaultLogger;

// We also need to be able to abort, so we create our own controller.
Expand All @@ -65,7 +76,8 @@ export async function* streamResponse(
token,
tracker,
controller.signal,
logger
logger,
isEncodingAsBson
);
// Merge the two streams, and abort as soon as one of the streams end.
const merged = mergeAsyncIterables([stream, ki], controller.signal);
Expand Down Expand Up @@ -93,9 +105,10 @@ async function* streamResponseInner(
tokenPayload: RequestJwtPayload,
tracker: RequestTracker,
signal: AbortSignal,
logger: Logger
logger: Logger,
isEncodingAsBson: boolean
): AsyncGenerator<util.StreamingSyncLine | string | null> {
const { raw_data, binary_data } = params;
const { raw_data } = params;

const userId = tokenPayload.sub;
const checkpointUserId = util.checkpointUserId(userId as string, params.client_id);
Expand Down Expand Up @@ -226,7 +239,7 @@ async function* streamResponseInner(
bucketsToFetch: buckets,
checkpointLine: line,
raw_data,
binary_data,
binary_data: isEncodingAsBson,
onRowsSent: markOperationsSent,
abort_connection: signal,
abort_batch: abortCheckpointSignal,
Expand Down
5 changes: 0 additions & 5 deletions packages/service-core/src/util/protocol-types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -76,11 +76,6 @@ export const StreamingSyncRequest = t.object({
*/
raw_data: t.boolean.optional(),

/**
* Data is received in a serialized BSON Buffer
*/
binary_data: t.boolean.optional(),

/**
* Client parameters to be passed to the sync rules.
*/
Expand Down