Skip to content

Commit 4e4350f

Browse files
cleanup connection manager
1 parent e561dda commit 4e4350f

File tree

3 files changed

+43
-30
lines changed

3 files changed

+43
-30
lines changed

packages/common/src/client/AbstractPowerSyncDatabase.ts

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -478,7 +478,6 @@ export abstract class AbstractPowerSyncDatabase extends BaseObserver<PowerSyncDB
478478
* Use {@link connect} to connect again.
479479
*/
480480
async disconnect() {
481-
await this.waitForReady();
482481
return this.connectionManager.disconnect();
483482
}
484483

packages/common/src/client/ConnectionManager.ts

Lines changed: 26 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -100,7 +100,8 @@ export class ConnectionManager extends BaseObserver<ConnectionManagerListener> {
100100
// We only need to trigger a disconnect here if we have already reached the point of connecting.
101101
// If we do already have pending options, a disconnect has already been performed.
102102
// The connectInternal method also does a sanity disconnect to prevent straggler connections.
103-
if (!hadPendingOptions) {
103+
// We should also disconnect if we have already completed a connection attempt.
104+
if (!hadPendingOptions || !this.connectingPromise) {
104105
await this.disconnectInternal();
105106
}
106107

@@ -136,46 +137,47 @@ export class ConnectionManager extends BaseObserver<ConnectionManagerListener> {
136137
* This is protected in an exclusive lock.
137138
* The promise tracks the creation which is used to synchronize disconnect attempts.
138139
*/
139-
this.syncStreamInitPromise = (async () => {
140-
// Always await this if present since we will be populating a new sync implementation shortly
141-
await this.disconnectingPromise;
142-
143-
if (!this.pendingConnectionOptions) {
144-
// A disconnect could have cleared this.
145-
return;
140+
this.syncStreamInitPromise = new Promise(async (resolve, reject) => {
141+
try {
142+
await this.disconnectingPromise;
143+
144+
if (!this.pendingConnectionOptions) {
145+
this.logger.debug('No pending connection options found, not creating sync stream implementation');
146+
// A disconnect could have cleared this.
147+
return;
148+
}
149+
150+
const { connector, options } = this.pendingConnectionOptions;
151+
appliedOptions = options;
152+
153+
this.pendingConnectionOptions = null;
154+
155+
const { sync, onDispose } = await this.options.createSyncImplementation(connector, options);
156+
this.iterateListeners((l) => l.syncStreamCreated?.(sync));
157+
this.syncStreamImplementation = sync;
158+
this.syncDisposer = onDispose;
159+
await this.syncStreamImplementation.waitForReady();
160+
resolve();
161+
} catch (error) {
162+
reject(error);
146163
}
147-
148-
const { connector, options } = this.pendingConnectionOptions;
149-
appliedOptions = options;
150-
151-
this.pendingConnectionOptions = null;
152-
153-
const { sync, onDispose } = await this.options.createSyncImplementation(connector, options);
154-
this.iterateListeners((l) => l.syncStreamCreated?.(sync));
155-
this.syncStreamImplementation = sync;
156-
this.syncDisposer = onDispose;
157-
await this.syncStreamImplementation.waitForReady();
158-
})();
164+
});
159165

160166
await this.syncStreamInitPromise;
161167
this.syncStreamInitPromise = null;
162168

163169
if (!appliedOptions) {
164-
this.logger.debug('No pending connection options found, not connecting');
165170
// A disconnect could have cleared the options which did not create a syncStreamImplementation
166171
return;
167172
}
168173

169174
// It might be possible that a disconnect triggered between the last check
170175
// and this point. Awaiting here allows the sync stream to be cleared if disconnected.
171-
this.logger.debug('Waiting for disconnect to complete before connecting', this.disconnectingPromise);
172176
await this.disconnectingPromise;
173-
this.logger.debug('Disconnect completed, proceeding to connect');
174177

175178
this.syncStreamImplementation?.triggerCrudUpload();
176179
this.logger.debug('Attempting to connect to PowerSync instance');
177180
await this.syncStreamImplementation?.connect(appliedOptions!);
178-
this.logger.debug('connected to sync stream implementation');
179181
}
180182

181183
/**

packages/web/tests/stream.test.ts

Lines changed: 17 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
import {
22
BucketChecksum,
3+
createBaseLogger,
34
DataStream,
45
PowerSyncConnectionOptions,
56
WASQLiteOpenFactory,
@@ -11,16 +12,25 @@ import { ConnectedDatabaseUtils, generateConnectedDatabase } from './utils/gener
1112

1213
const UPLOAD_TIMEOUT_MS = 3000;
1314

15+
const logger = createBaseLogger();
16+
logger.useDefaults();
17+
1418
describe('Streaming', { sequential: true }, () => {
1519
describe(
1620
'Streaming - With Web Workers',
1721
{
1822
sequential: true
1923
},
20-
describeStreamingTests(() => generateConnectedDatabase())
24+
describeStreamingTests(() =>
25+
generateConnectedDatabase({
26+
powerSyncOptions: {
27+
logger
28+
}
29+
})
30+
)
2131
);
2232

23-
describe(
33+
describe.only(
2434
'Streaming - Without Web Workers',
2535
{
2636
sequential: true
@@ -30,7 +40,8 @@ describe('Streaming', { sequential: true }, () => {
3040
powerSyncOptions: {
3141
flags: {
3242
useWebWorker: false
33-
}
43+
},
44+
logger
3445
}
3546
})
3647
)
@@ -47,7 +58,8 @@ describe('Streaming', { sequential: true }, () => {
4758
database: new WASQLiteOpenFactory({
4859
dbFilename: 'streaming-opfs.sqlite',
4960
vfs: WASQLiteVFS.OPFSCoopSyncVFS
50-
})
61+
}),
62+
logger
5163
}
5264
})
5365
)
@@ -213,7 +225,7 @@ function describeStreamingTests(createConnectedDatabase: () => Promise<Connected
213225
const call = spy.mock.lastCall![1] as PowerSyncConnectionOptions;
214226
expect(call.params!['count']).eq(0);
215227
},
216-
{ timeout: 2000, interval: 100 }
228+
{ timeout: 4000, interval: 100 }
217229
);
218230

219231
expect(

0 commit comments

Comments
 (0)