Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions .changeset/bright-yaks-pump.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
---
'@powersync/common': patch
'@powersync/react-native': patch
'@powersync/web': patch
---

Fixed bug where a WebSocket connection timeout could cause a an uncaught exception.
65 changes: 40 additions & 25 deletions packages/common/src/client/sync/stream/AbstractRemote.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,7 @@ import PACKAGE from '../../../../package.json' with { type: 'json' };
import { AbortOperation } from '../../../utils/AbortOperation.js';
import { DataStream } from '../../../utils/DataStream.js';
import { PowerSyncCredentials } from '../../connection/PowerSyncCredentials.js';
import {
StreamingSyncLine,
StreamingSyncLineOrCrudUploadComplete,
StreamingSyncRequest
} from './streaming-sync-types.js';
import { StreamingSyncRequest } from './streaming-sync-types.js';
import { WebsocketClientTransport } from './WebsocketClientTransport.js';

export type BSONImplementation = typeof BSON;
Expand Down Expand Up @@ -305,6 +301,27 @@ export abstract class AbstractRemote {
// automatically as a header.
const userAgent = this.getUserAgent();

const stream = new DataStream<T, Uint8Array>({
logger: this.logger,
pressure: {
lowWaterMark: SYNC_QUEUE_REQUEST_LOW_WATER
},
mapLine: map
});

// Handle upstream abort
if (options.abortSignal?.aborted) {
throw new AbortOperation('Connection request aborted');
} else {
options.abortSignal?.addEventListener(
'abort',
() => {
stream.close();
},
{ once: true }
);
}

let keepAliveTimeout: any;
const resetTimeout = () => {
clearTimeout(keepAliveTimeout);
Expand All @@ -315,15 +332,28 @@ export abstract class AbstractRemote {
};
resetTimeout();

// Typescript complains about this being `never` if it's not assigned here.
// This is assigned in `wsCreator`.
let disposeSocketConnectionTimeout = () => {};

const url = this.options.socketUrlTransformer(request.url);
const connector = new RSocketConnector({
transport: new WebsocketClientTransport({
url,
wsCreator: (url) => {
const socket = this.createSocket(url);
disposeSocketConnectionTimeout = stream.registerListener({
closed: () => {
// Allow closing the underlying WebSocket if the stream was closed before the
// RSocket connect completed. This should effectively abort the request.
socket.close();
}
});

socket.addEventListener('message', (event) => {
resetTimeout();
});

return socket;
}
}),
Expand All @@ -345,22 +375,19 @@ export abstract class AbstractRemote {
let rsocket: RSocket;
try {
rsocket = await connector.connect();
// The connection is established, we no longer need to monitor the initial timeout
disposeSocketConnectionTimeout();
} catch (ex) {
this.logger.error(`Failed to connect WebSocket`, ex);
clearTimeout(keepAliveTimeout);
if (!stream.closed) {
await stream.close();
}
throw ex;
}

resetTimeout();

const stream = new DataStream<T, Uint8Array>({
logger: this.logger,
pressure: {
lowWaterMark: SYNC_QUEUE_REQUEST_LOW_WATER
},
mapLine: map
});

let socketIsClosed = false;
const closeSocket = () => {
clearTimeout(keepAliveTimeout);
Expand Down Expand Up @@ -455,18 +482,6 @@ export abstract class AbstractRemote {
}
});

/**
* Handle abort operations here.
* Unfortunately cannot insert them into the connection.
*/
if (options.abortSignal?.aborted) {
stream.close();
} else {
options.abortSignal?.addEventListener('abort', () => {
stream.close();
});
}

return stream;
}

Expand Down