Skip to content

Commit 48256dc

Browse files
Fix socket issue
1 parent 97da283 commit 48256dc

File tree

2 files changed

+47
-25
lines changed

2 files changed

+47
-25
lines changed

.changeset/bright-yaks-pump.md

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
---
2+
'@powersync/common': patch
3+
'@powersync/react-native': patch
4+
'@powersync/web': patch
5+
---
6+
7+
Fixed bug where a WebSocket connection timeout could cause a an uncaught exception.

packages/common/src/client/sync/stream/AbstractRemote.ts

Lines changed: 40 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -7,11 +7,7 @@ import PACKAGE from '../../../../package.json' with { type: 'json' };
77
import { AbortOperation } from '../../../utils/AbortOperation.js';
88
import { DataStream } from '../../../utils/DataStream.js';
99
import { PowerSyncCredentials } from '../../connection/PowerSyncCredentials.js';
10-
import {
11-
StreamingSyncLine,
12-
StreamingSyncLineOrCrudUploadComplete,
13-
StreamingSyncRequest
14-
} from './streaming-sync-types.js';
10+
import { StreamingSyncRequest } from './streaming-sync-types.js';
1511
import { WebsocketClientTransport } from './WebsocketClientTransport.js';
1612

1713
export type BSONImplementation = typeof BSON;
@@ -305,6 +301,27 @@ export abstract class AbstractRemote {
305301
// automatically as a header.
306302
const userAgent = this.getUserAgent();
307303

304+
const stream = new DataStream<T, Uint8Array>({
305+
logger: this.logger,
306+
pressure: {
307+
lowWaterMark: SYNC_QUEUE_REQUEST_LOW_WATER
308+
},
309+
mapLine: map
310+
});
311+
312+
// Handle upstream abort
313+
if (options.abortSignal?.aborted) {
314+
throw new AbortOperation('Connection request aborted');
315+
} else {
316+
options.abortSignal?.addEventListener(
317+
'abort',
318+
() => {
319+
stream.close();
320+
},
321+
{ once: true }
322+
);
323+
}
324+
308325
let keepAliveTimeout: any;
309326
const resetTimeout = () => {
310327
clearTimeout(keepAliveTimeout);
@@ -315,15 +332,28 @@ export abstract class AbstractRemote {
315332
};
316333
resetTimeout();
317334

335+
// Typescript complains about this being `never` if it's not assigned here.
336+
// This is assigned in `wsCreator`.
337+
let disposeSocketConnectionTimeout = () => {};
338+
318339
const url = this.options.socketUrlTransformer(request.url);
319340
const connector = new RSocketConnector({
320341
transport: new WebsocketClientTransport({
321342
url,
322343
wsCreator: (url) => {
323344
const socket = this.createSocket(url);
345+
disposeSocketConnectionTimeout = stream.registerListener({
346+
closed: () => {
347+
// Allow closing the underlying WebSocket if the stream was closed before the
348+
// RSocket connect completed. This should effectively abort the request.
349+
socket.close();
350+
}
351+
});
352+
324353
socket.addEventListener('message', (event) => {
325354
resetTimeout();
326355
});
356+
327357
return socket;
328358
}
329359
}),
@@ -345,22 +375,19 @@ export abstract class AbstractRemote {
345375
let rsocket: RSocket;
346376
try {
347377
rsocket = await connector.connect();
378+
// The connection is established, we no longer need to monitor the initial timeout
379+
disposeSocketConnectionTimeout();
348380
} catch (ex) {
349381
this.logger.error(`Failed to connect WebSocket`, ex);
350382
clearTimeout(keepAliveTimeout);
383+
if (!stream.closed) {
384+
await stream.close();
385+
}
351386
throw ex;
352387
}
353388

354389
resetTimeout();
355390

356-
const stream = new DataStream<T, Uint8Array>({
357-
logger: this.logger,
358-
pressure: {
359-
lowWaterMark: SYNC_QUEUE_REQUEST_LOW_WATER
360-
},
361-
mapLine: map
362-
});
363-
364391
let socketIsClosed = false;
365392
const closeSocket = () => {
366393
clearTimeout(keepAliveTimeout);
@@ -455,18 +482,6 @@ export abstract class AbstractRemote {
455482
}
456483
});
457484

458-
/**
459-
* Handle abort operations here.
460-
* Unfortunately cannot insert them into the connection.
461-
*/
462-
if (options.abortSignal?.aborted) {
463-
stream.close();
464-
} else {
465-
options.abortSignal?.addEventListener('abort', () => {
466-
stream.close();
467-
});
468-
}
469-
470485
return stream;
471486
}
472487

0 commit comments

Comments
 (0)