Skip to content
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions .changeset/dull-mugs-carry.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
---
'@powersync/react-native': patch
'@powersync/diagnostics-app': patch
'@powersync/common': patch
'@powersync/node': patch
'@powersync/op-sqlite': patch
'@powersync/web': patch
---

Improve websocket keepalive logic to reduce keepalive errors.
Original file line number Diff line number Diff line change
Expand Up @@ -106,5 +106,5 @@ export interface BucketStorageAdapter extends BaseObserver<BucketStorageListener
/**
* Invokes the `powersync_control` function for the sync client.
*/
control(op: PowerSyncControlCommand, payload: string | ArrayBuffer | null): Promise<string>;
control(op: PowerSyncControlCommand, payload: string | Uint8Array | null): Promise<string>;
}
Original file line number Diff line number Diff line change
Expand Up @@ -364,7 +364,7 @@ export class SqliteBucketStorage extends BaseObserver<BucketStorageListener> imp
// No-op for now
}

async control(op: PowerSyncControlCommand, payload: string | ArrayBuffer | null): Promise<string> {
async control(op: PowerSyncControlCommand, payload: string | Uint8Array | ArrayBuffer | null): Promise<string> {
return await this.writeTransaction(async (tx) => {
const [[raw]] = await tx.executeRaw('SELECT powersync_control(?, ?)', [op, payload]);
return raw;
Expand Down
52 changes: 39 additions & 13 deletions packages/common/src/client/sync/stream/AbstractRemote.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,14 @@ const SYNC_QUEUE_REQUEST_LOW_WATER = 5;

// Keep alive message is sent every period
const KEEP_ALIVE_MS = 20_000;
// The ACK must be received in this period
const KEEP_ALIVE_LIFETIME_MS = 30_000;

// One message of any type must be received in this period.
const SOCKET_TIMEOUT_MS = 30_000;

// One keepalive message must be received in this period.
// If there is a backlog of messages (for example on slow connections), keepalive messages could be delayed
// significantly. Therefore this is longer than the socket timeout.
const KEEP_ALIVE_LIFETIME_MS = 90_000;

export const DEFAULT_REMOTE_LOGGER = Logger.get('PowerSyncRemote');

Expand Down Expand Up @@ -267,7 +273,7 @@ export abstract class AbstractRemote {
*/
async socketStream(options: SocketSyncStreamOptions): Promise<DataStream<StreamingSyncLine>> {
const bson = await this.getBSON();
return await this.socketStreamRaw(options, (data) => bson.deserialize(data), bson);
return await this.socketStreamRaw(options, (data) => bson.deserialize(data) as StreamingSyncLine, bson);
}

/**
Expand All @@ -279,9 +285,9 @@ export abstract class AbstractRemote {
*/
async socketStreamRaw<T>(
options: SocketSyncStreamOptions,
map: (buffer: Buffer) => T,
map: (buffer: Uint8Array) => T,
bson?: typeof BSON
): Promise<DataStream> {
): Promise<DataStream<T>> {
const { path, fetchStrategy = FetchStrategy.Buffered } = options;
const mimeType = bson == null ? 'application/json' : 'application/bson';

Expand All @@ -304,12 +310,26 @@ export abstract class AbstractRemote {
// automatically as a header.
const userAgent = this.getUserAgent();

let keepAliveTimeout: any;
const resetTimeout = () => {
clearTimeout(keepAliveTimeout);
keepAliveTimeout = setTimeout(() => {
this.logger.error(`No data received on WebSocket in ${SOCKET_TIMEOUT_MS}ms, closing connection.`);
stream.close();
}, SOCKET_TIMEOUT_MS);
};
resetTimeout();

const url = this.options.socketUrlTransformer(request.url);
const connector = new RSocketConnector({
transport: new WebsocketClientTransport({
url,
wsCreator: (url) => {
return this.createSocket(url);
const socket = this.createSocket(url);
socket.addEventListener('message', (event) => {
resetTimeout();
});
return socket;
}
}),
setup: {
Expand All @@ -332,18 +352,23 @@ export abstract class AbstractRemote {
rsocket = await connector.connect();
} catch (ex) {
this.logger.error(`Failed to connect WebSocket`, ex);
clearTimeout(keepAliveTimeout);
throw ex;
}

const stream = new DataStream({
resetTimeout();

const stream = new DataStream<T, Uint8Array>({
logger: this.logger,
pressure: {
lowWaterMark: SYNC_QUEUE_REQUEST_LOW_WATER
}
},
mapLine: map
});

let socketIsClosed = false;
const closeSocket = () => {
clearTimeout(keepAliveTimeout);
if (socketIsClosed) {
return;
}
Expand Down Expand Up @@ -411,7 +436,7 @@ export abstract class AbstractRemote {
return;
}

stream.enqueueData(map(data));
stream.enqueueData(data);
},
onComplete: () => {
stream.close();
Expand Down Expand Up @@ -537,8 +562,9 @@ export abstract class AbstractRemote {
const decoder = new TextDecoder();
let buffer = '';

const stream = new DataStream<T>({
logger: this.logger
const stream = new DataStream<T, string>({
logger: this.logger,
mapLine: mapLine
});

const l = stream.registerListener({
Expand All @@ -550,7 +576,7 @@ export abstract class AbstractRemote {
if (done) {
const remaining = buffer.trim();
if (remaining.length != 0) {
stream.enqueueData(mapLine(remaining));
stream.enqueueData(remaining);
}

stream.close();
Expand All @@ -565,7 +591,7 @@ export abstract class AbstractRemote {
for (var i = 0; i < lines.length - 1; i++) {
var l = lines[i].trim();
if (l.length > 0) {
stream.enqueueData(mapLine(l));
stream.enqueueData(l);
didCompleteLine = true;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -847,10 +847,7 @@ The next upload iteration will be delayed.`);
// Pending sync lines received from the service, as well as local events that trigger a powersync_control
// invocation (local events include refreshed tokens and completed uploads).
// This is a single data stream so that we can handle all control calls from a single place.
let controlInvocations: DataStream<{
command: PowerSyncControlCommand;
payload?: ArrayBuffer | string;
}> | null = null;
let controlInvocations: DataStream<EnqueuedCommand, Uint8Array | EnqueuedCommand> | null = null;

async function connect(instr: EstablishSyncStream) {
const syncOptions: SyncStreamOptions = {
Expand All @@ -860,20 +857,34 @@ The next upload iteration will be delayed.`);
};

if (resolvedOptions.connectionMethod == SyncStreamConnectionMethod.HTTP) {
controlInvocations = await remote.postStreamRaw(syncOptions, (line) => ({
command: PowerSyncControlCommand.PROCESS_TEXT_LINE,
payload: line
}));
controlInvocations = await remote.postStreamRaw(syncOptions, (line: string | EnqueuedCommand) => {
if (typeof line == 'string') {
return {
command: PowerSyncControlCommand.PROCESS_TEXT_LINE,
payload: line
};
} else {
// Directly enqueued by us
return line;
}
});
} else {
controlInvocations = await remote.socketStreamRaw(
{
...syncOptions,
fetchStrategy: resolvedOptions.fetchStrategy
},
(buffer) => ({
command: PowerSyncControlCommand.PROCESS_BSON_LINE,
payload: buffer
})
(payload: Uint8Array | EnqueuedCommand) => {
if (payload instanceof Uint8Array) {
return {
command: PowerSyncControlCommand.PROCESS_BSON_LINE,
payload: payload
};
} else {
// Directly enqueued by us
return payload;
}
}
);
}

Expand All @@ -900,7 +911,7 @@ The next upload iteration will be delayed.`);
await control(PowerSyncControlCommand.STOP);
}

async function control(op: PowerSyncControlCommand, payload?: ArrayBuffer | string) {
async function control(op: PowerSyncControlCommand, payload?: Uint8Array | string) {
const rawResponse = await adapter.control(op, payload ?? null);
await handleInstructions(JSON.parse(rawResponse));
}
Expand Down Expand Up @@ -1139,3 +1150,8 @@ The next upload iteration will be delayed.`);
});
}
}

interface EnqueuedCommand {
command: PowerSyncControlCommand;
payload?: Uint8Array | string;
}
44 changes: 16 additions & 28 deletions packages/common/src/utils/DataStream.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
import Logger, { ILogger } from 'js-logger';
import { BaseListener, BaseObserver } from './BaseObserver.js';

export type DataStreamOptions = {
export type DataStreamOptions<ParsedData, SourceData> = {
mapLine?: (line: SourceData) => ParsedData;

/**
* Close the stream if any consumer throws an error
*/
Expand Down Expand Up @@ -33,20 +35,23 @@ export const DEFAULT_PRESSURE_LIMITS = {
* native JS streams or async iterators.
* This is handy for environments such as React Native which need polyfills for the above.
*/
export class DataStream<Data extends any = any> extends BaseObserver<DataStreamListener<Data>> {
dataQueue: Data[];
export class DataStream<ParsedData, SourceData = any> extends BaseObserver<DataStreamListener<ParsedData>> {
dataQueue: SourceData[];

protected isClosed: boolean;

protected processingPromise: Promise<void> | null;

protected logger: ILogger;

constructor(protected options?: DataStreamOptions) {
protected mapLine: (line: SourceData) => ParsedData;

constructor(protected options?: DataStreamOptions<ParsedData, SourceData>) {
super();
this.processingPromise = null;
this.isClosed = false;
this.dataQueue = [];
this.mapLine = options?.mapLine ?? ((line) => line as any);

this.logger = options?.logger ?? Logger.get('DataStream');

Expand Down Expand Up @@ -84,7 +89,7 @@ export class DataStream<Data extends any = any> extends BaseObserver<DataStreamL
/**
* Enqueues data for the consumers to read
*/
enqueueData(data: Data) {
enqueueData(data: SourceData) {
if (this.isClosed) {
throw new Error('Cannot enqueue data into closed stream.');
}
Expand All @@ -98,7 +103,7 @@ export class DataStream<Data extends any = any> extends BaseObserver<DataStreamL
* Reads data once from the data stream
* @returns a Data payload or Null if the stream closed.
*/
async read(): Promise<Data | null> {
async read(): Promise<ParsedData | null> {
if (this.closed) {
return null;
}
Expand Down Expand Up @@ -127,7 +132,7 @@ export class DataStream<Data extends any = any> extends BaseObserver<DataStreamL
/**
* Executes a callback for each data item in the stream
*/
forEach(callback: DataStreamCallback<Data>) {
forEach(callback: DataStreamCallback<ParsedData>) {
if (this.dataQueue.length <= this.lowWatermark) {
this.iterateAsyncErrored(async (l) => l.lowWater?.());
}
Expand All @@ -154,24 +159,6 @@ export class DataStream<Data extends any = any> extends BaseObserver<DataStreamL
return (this.processingPromise = this._processQueue());
}

/**
* Creates a new data stream which is a map of the original
*/
map<ReturnData>(callback: (data: Data) => ReturnData): DataStream<ReturnData> {
const stream = new DataStream(this.options);
const l = this.registerListener({
data: async (data) => {
stream.enqueueData(callback(data));
},
closed: () => {
stream.close();
l?.();
}
});

return stream;
}

protected hasDataReader() {
return Array.from(this.listeners.values()).some((l) => !!l.data);
}
Expand All @@ -184,7 +171,8 @@ export class DataStream<Data extends any = any> extends BaseObserver<DataStreamL

if (this.dataQueue.length) {
const data = this.dataQueue.shift()!;
await this.iterateAsyncErrored(async (l) => l.data?.(data));
const mapped = this.mapLine(data);
await this.iterateAsyncErrored(async (l) => l.data?.(mapped));
}

if (this.dataQueue.length <= this.lowWatermark) {
Expand All @@ -199,8 +187,8 @@ export class DataStream<Data extends any = any> extends BaseObserver<DataStreamL
}
}

protected async iterateAsyncErrored(cb: (l: BaseListener) => Promise<void>) {
for (let i of Array.from(this.listeners.values())) {
protected async iterateAsyncErrored(cb: (l: Partial<DataStreamListener<ParsedData>>) => Promise<void>) {
for (let i of this.listeners.values()) {
try {
await cb(i);
} catch (ex) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,13 +1,24 @@
import { PowerSyncControlCommand, SqliteBucketStorage } from '@powersync/common';

export class ReactNativeBucketStorageAdapter extends SqliteBucketStorage {
control(op: PowerSyncControlCommand, payload: string | ArrayBuffer | null): Promise<string> {
if (payload != null && typeof payload != 'string') {
// For some reason, we need to copy array buffers for RNQS to recognize them. We're doing that here because we
// don't want to pay the cost of a copy on platforms where it's not necessary.
payload = new Uint8Array(payload).buffer;
control(op: PowerSyncControlCommand, payload: string | Uint8Array | ArrayBuffer | null): Promise<string> {
if (payload instanceof Uint8Array) {
// RNQS doesn't accept Uint8Array arguments - convert to ArrayBuffer first.
payload = uint8ArrayToArrayBuffer(payload);
}

return super.control(op, payload);
}
}

function uint8ArrayToArrayBuffer(array: Uint8Array): ArrayBuffer {
// SharedArrayBuffer isn't defined on ReactNative, so don't need to cater for that.
const arrayBuffer = array.buffer as ArrayBuffer;
if (array.byteOffset == 0 && array.byteLength == arrayBuffer.byteLength) {
// No copying needed - can use ArrayBuffer as-is
return arrayBuffer;
} else {
// Need to make a copy
return arrayBuffer.slice(array.byteOffset, array.byteOffset + array.byteLength);
}
}
Loading