Skip to content

Commit 199985e

Browse files
committed
Defer sync stream parsing until the data is processed.
1 parent a31ddb5 commit 199985e

File tree

6 files changed

+75
-58
lines changed

6 files changed

+75
-58
lines changed

packages/common/src/client/sync/bucket/BucketStorageAdapter.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -106,5 +106,5 @@ export interface BucketStorageAdapter extends BaseObserver<BucketStorageListener
106106
/**
107107
* Invokes the `powersync_control` function for the sync client.
108108
*/
109-
control(op: PowerSyncControlCommand, payload: string | ArrayBuffer | null): Promise<string>;
109+
control(op: PowerSyncControlCommand, payload: string | Uint8Array | null): Promise<string>;
110110
}

packages/common/src/client/sync/bucket/SqliteBucketStorage.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -364,7 +364,7 @@ export class SqliteBucketStorage extends BaseObserver<BucketStorageListener> imp
364364
// No-op for now
365365
}
366366

367-
async control(op: PowerSyncControlCommand, payload: string | ArrayBuffer | null): Promise<string> {
367+
async control(op: PowerSyncControlCommand, payload: string | Uint8Array | ArrayBuffer | null): Promise<string> {
368368
return await this.writeTransaction(async (tx) => {
369369
const [[raw]] = await tx.executeRaw('SELECT powersync_control(?, ?)', [op, payload]);
370370
return raw;

packages/common/src/client/sync/stream/AbstractRemote.ts

Lines changed: 12 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -273,7 +273,7 @@ export abstract class AbstractRemote {
273273
*/
274274
async socketStream(options: SocketSyncStreamOptions): Promise<DataStream<StreamingSyncLine>> {
275275
const bson = await this.getBSON();
276-
return await this.socketStreamRaw(options, (data) => bson.deserialize(data), bson);
276+
return await this.socketStreamRaw(options, (data) => bson.deserialize(data) as StreamingSyncLine, bson);
277277
}
278278

279279
/**
@@ -285,9 +285,9 @@ export abstract class AbstractRemote {
285285
*/
286286
async socketStreamRaw<T>(
287287
options: SocketSyncStreamOptions,
288-
map: (buffer: Buffer) => T,
288+
map: (buffer: Uint8Array) => T,
289289
bson?: typeof BSON
290-
): Promise<DataStream> {
290+
): Promise<DataStream<T>> {
291291
const { path, fetchStrategy = FetchStrategy.Buffered } = options;
292292
const mimeType = bson == null ? 'application/json' : 'application/bson';
293293

@@ -358,11 +358,12 @@ export abstract class AbstractRemote {
358358

359359
resetTimeout();
360360

361-
const stream = new DataStream({
361+
const stream = new DataStream<T, Uint8Array>({
362362
logger: this.logger,
363363
pressure: {
364364
lowWaterMark: SYNC_QUEUE_REQUEST_LOW_WATER
365-
}
365+
},
366+
mapLine: map
366367
});
367368

368369
let socketIsClosed = false;
@@ -435,7 +436,7 @@ export abstract class AbstractRemote {
435436
return;
436437
}
437438

438-
stream.enqueueData(map(data));
439+
stream.enqueueData(data);
439440
},
440441
onComplete: () => {
441442
stream.close();
@@ -561,8 +562,9 @@ export abstract class AbstractRemote {
561562
const decoder = new TextDecoder();
562563
let buffer = '';
563564

564-
const stream = new DataStream<T>({
565-
logger: this.logger
565+
const stream = new DataStream<T, string>({
566+
logger: this.logger,
567+
mapLine: mapLine
566568
});
567569

568570
const l = stream.registerListener({
@@ -574,7 +576,7 @@ export abstract class AbstractRemote {
574576
if (done) {
575577
const remaining = buffer.trim();
576578
if (remaining.length != 0) {
577-
stream.enqueueData(mapLine(remaining));
579+
stream.enqueueData(remaining);
578580
}
579581

580582
stream.close();
@@ -589,7 +591,7 @@ export abstract class AbstractRemote {
589591
for (var i = 0; i < lines.length - 1; i++) {
590592
var l = lines[i].trim();
591593
if (l.length > 0) {
592-
stream.enqueueData(mapLine(l));
594+
stream.enqueueData(l);
593595
didCompleteLine = true;
594596
}
595597
}

packages/common/src/client/sync/stream/AbstractStreamingSyncImplementation.ts

Lines changed: 29 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -847,10 +847,7 @@ The next upload iteration will be delayed.`);
847847
// Pending sync lines received from the service, as well as local events that trigger a powersync_control
848848
// invocation (local events include refreshed tokens and completed uploads).
849849
// This is a single data stream so that we can handle all control calls from a single place.
850-
let controlInvocations: DataStream<{
851-
command: PowerSyncControlCommand;
852-
payload?: ArrayBuffer | string;
853-
}> | null = null;
850+
let controlInvocations: DataStream<EnqueuedCommand, Uint8Array | EnqueuedCommand> | null = null;
854851

855852
async function connect(instr: EstablishSyncStream) {
856853
const syncOptions: SyncStreamOptions = {
@@ -860,20 +857,34 @@ The next upload iteration will be delayed.`);
860857
};
861858

862859
if (resolvedOptions.connectionMethod == SyncStreamConnectionMethod.HTTP) {
863-
controlInvocations = await remote.postStreamRaw(syncOptions, (line) => ({
864-
command: PowerSyncControlCommand.PROCESS_TEXT_LINE,
865-
payload: line
866-
}));
860+
controlInvocations = await remote.postStreamRaw(syncOptions, (line: string | EnqueuedCommand) => {
861+
if (typeof line == 'string') {
862+
return {
863+
command: PowerSyncControlCommand.PROCESS_TEXT_LINE,
864+
payload: line
865+
};
866+
} else {
867+
// Directly enqueued by us
868+
return line;
869+
}
870+
});
867871
} else {
868872
controlInvocations = await remote.socketStreamRaw(
869873
{
870874
...syncOptions,
871875
fetchStrategy: resolvedOptions.fetchStrategy
872876
},
873-
(buffer) => ({
874-
command: PowerSyncControlCommand.PROCESS_BSON_LINE,
875-
payload: buffer
876-
})
877+
(payload: Uint8Array | EnqueuedCommand) => {
878+
if (payload instanceof Uint8Array) {
879+
return {
880+
command: PowerSyncControlCommand.PROCESS_BSON_LINE,
881+
payload: payload
882+
};
883+
} else {
884+
// Directly enqueued by us
885+
return payload;
886+
}
887+
}
877888
);
878889
}
879890

@@ -900,7 +911,7 @@ The next upload iteration will be delayed.`);
900911
await control(PowerSyncControlCommand.STOP);
901912
}
902913

903-
async function control(op: PowerSyncControlCommand, payload?: ArrayBuffer | string) {
914+
async function control(op: PowerSyncControlCommand, payload?: Uint8Array | string) {
904915
const rawResponse = await adapter.control(op, payload ?? null);
905916
await handleInstructions(JSON.parse(rawResponse));
906917
}
@@ -1139,3 +1150,8 @@ The next upload iteration will be delayed.`);
11391150
});
11401151
}
11411152
}
1153+
1154+
interface EnqueuedCommand {
1155+
command: PowerSyncControlCommand;
1156+
payload?: Uint8Array | string;
1157+
}

packages/common/src/utils/DataStream.ts

Lines changed: 16 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,9 @@
11
import Logger, { ILogger } from 'js-logger';
22
import { BaseListener, BaseObserver } from './BaseObserver.js';
33

4-
export type DataStreamOptions = {
4+
export type DataStreamOptions<ParsedData, SourceData> = {
5+
mapLine?: (line: SourceData) => ParsedData;
6+
57
/**
68
* Close the stream if any consumer throws an error
79
*/
@@ -33,20 +35,23 @@ export const DEFAULT_PRESSURE_LIMITS = {
3335
* native JS streams or async iterators.
3436
* This is handy for environments such as React Native which need polyfills for the above.
3537
*/
36-
export class DataStream<Data extends any = any> extends BaseObserver<DataStreamListener<Data>> {
37-
dataQueue: Data[];
38+
export class DataStream<ParsedData, SourceData = any> extends BaseObserver<DataStreamListener<ParsedData>> {
39+
dataQueue: SourceData[];
3840

3941
protected isClosed: boolean;
4042

4143
protected processingPromise: Promise<void> | null;
4244

4345
protected logger: ILogger;
4446

45-
constructor(protected options?: DataStreamOptions) {
47+
protected mapLine: (line: SourceData) => ParsedData;
48+
49+
constructor(protected options?: DataStreamOptions<ParsedData, SourceData>) {
4650
super();
4751
this.processingPromise = null;
4852
this.isClosed = false;
4953
this.dataQueue = [];
54+
this.mapLine = options?.mapLine ?? ((line) => line as any);
5055

5156
this.logger = options?.logger ?? Logger.get('DataStream');
5257

@@ -84,7 +89,7 @@ export class DataStream<Data extends any = any> extends BaseObserver<DataStreamL
8489
/**
8590
* Enqueues data for the consumers to read
8691
*/
87-
enqueueData(data: Data) {
92+
enqueueData(data: SourceData) {
8893
if (this.isClosed) {
8994
throw new Error('Cannot enqueue data into closed stream.');
9095
}
@@ -98,7 +103,7 @@ export class DataStream<Data extends any = any> extends BaseObserver<DataStreamL
98103
* Reads data once from the data stream
99104
* @returns a Data payload or Null if the stream closed.
100105
*/
101-
async read(): Promise<Data | null> {
106+
async read(): Promise<ParsedData | null> {
102107
if (this.closed) {
103108
return null;
104109
}
@@ -127,7 +132,7 @@ export class DataStream<Data extends any = any> extends BaseObserver<DataStreamL
127132
/**
128133
* Executes a callback for each data item in the stream
129134
*/
130-
forEach(callback: DataStreamCallback<Data>) {
135+
forEach(callback: DataStreamCallback<ParsedData>) {
131136
if (this.dataQueue.length <= this.lowWatermark) {
132137
this.iterateAsyncErrored(async (l) => l.lowWater?.());
133138
}
@@ -154,24 +159,6 @@ export class DataStream<Data extends any = any> extends BaseObserver<DataStreamL
154159
return (this.processingPromise = this._processQueue());
155160
}
156161

157-
/**
158-
* Creates a new data stream which is a map of the original
159-
*/
160-
map<ReturnData>(callback: (data: Data) => ReturnData): DataStream<ReturnData> {
161-
const stream = new DataStream(this.options);
162-
const l = this.registerListener({
163-
data: async (data) => {
164-
stream.enqueueData(callback(data));
165-
},
166-
closed: () => {
167-
stream.close();
168-
l?.();
169-
}
170-
});
171-
172-
return stream;
173-
}
174-
175162
protected hasDataReader() {
176163
return Array.from(this.listeners.values()).some((l) => !!l.data);
177164
}
@@ -184,7 +171,8 @@ export class DataStream<Data extends any = any> extends BaseObserver<DataStreamL
184171

185172
if (this.dataQueue.length) {
186173
const data = this.dataQueue.shift()!;
187-
await this.iterateAsyncErrored(async (l) => l.data?.(data));
174+
const mapped = this.mapLine(data);
175+
await this.iterateAsyncErrored(async (l) => l.data?.(mapped));
188176
}
189177

190178
if (this.dataQueue.length <= this.lowWatermark) {
@@ -199,8 +187,8 @@ export class DataStream<Data extends any = any> extends BaseObserver<DataStreamL
199187
}
200188
}
201189

202-
protected async iterateAsyncErrored(cb: (l: BaseListener) => Promise<void>) {
203-
for (let i of Array.from(this.listeners.values())) {
190+
protected async iterateAsyncErrored(cb: (l: Partial<DataStreamListener<ParsedData>>) => Promise<void>) {
191+
for (let i of this.listeners.values()) {
204192
try {
205193
await cb(i);
206194
} catch (ex) {
Lines changed: 16 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,24 @@
11
import { PowerSyncControlCommand, SqliteBucketStorage } from '@powersync/common';
22

33
export class ReactNativeBucketStorageAdapter extends SqliteBucketStorage {
4-
control(op: PowerSyncControlCommand, payload: string | ArrayBuffer | null): Promise<string> {
5-
if (payload != null && typeof payload != 'string') {
6-
// For some reason, we need to copy array buffers for RNQS to recognize them. We're doing that here because we
7-
// don't want to pay the cost of a copy on platforms where it's not necessary.
8-
payload = new Uint8Array(payload).buffer;
4+
control(op: PowerSyncControlCommand, payload: string | Uint8Array | ArrayBuffer | null): Promise<string> {
5+
if (payload instanceof Uint8Array) {
6+
// RNQS doesn't accept Uint8Array arguments - convert to ArrayBuffer first.
7+
payload = uint8ArrayToArrayBuffer(payload);
98
}
109

1110
return super.control(op, payload);
1211
}
1312
}
13+
14+
function uint8ArrayToArrayBuffer(array: Uint8Array): ArrayBuffer {
15+
// SharedArrayBuffer isn't defined on ReactNative, so don't need to cater for that.
16+
const arrayBuffer = array.buffer as ArrayBuffer;
17+
if (array.byteOffset == 0 && array.byteLength == arrayBuffer.byteLength) {
18+
// No copying needed - can use ArrayBuffer as-is
19+
return arrayBuffer;
20+
} else {
21+
// Need to make a copy
22+
return arrayBuffer.slice(array.byteOffset, array.byteOffset + array.byteLength);
23+
}
24+
}

0 commit comments

Comments
 (0)