Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions .changeset/purple-socks-smash.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
---
'@powersync/web': minor
---

Introduced functionality for releasing the navigator lock.
This resolves an issue related to sequential `connect()` calls breaking all syncing and never reaching a `connected` state. A typical error case was React's StrictMode which could trigger an `useEffect` which was calling `connect()` twice.
1 change: 1 addition & 0 deletions packages/web/src/db/adapters/WebDBAdapter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import { ResolvedWebSQLOpenOptions } from './web-sql-flags';
export type SharedConnectionWorker = {
identifier: string;
port: MessagePort;
release: () => void;
};

export interface WebDBAdapter extends DBAdapter {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import { ResolvedWebSQLOpenOptions } from './web-sql-flags';
export type SharedConnectionWorker = {
identifier: string;
port: MessagePort;
release: () => void;
};

export type WrappedWorkerConnectionOptions<Config extends ResolvedWebSQLOpenOptions = ResolvedWebSQLOpenOptions> = {
Expand Down Expand Up @@ -64,7 +65,6 @@ export class WorkerWrappedAsyncDatabaseConnection<Config extends ResolvedWebSQLO
},
async () => {
resolve();

// Free the lock when the connection is already closed.
if (this.lockAbortController.signal.aborted) {
return;
Expand All @@ -89,7 +89,7 @@ export class WorkerWrappedAsyncDatabaseConnection<Config extends ResolvedWebSQLO
);

const newPort = await remote[Comlink.createEndpoint]();
return { port: newPort, identifier };
return { port: newPort, identifier, release: () => this.lockAbortController.abort() };
}

/**
Expand Down
18 changes: 13 additions & 5 deletions packages/web/src/db/sync/SharedWebStreamingSyncImplementation.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import { PowerSyncConnectionOptions, PowerSyncCredentials, SyncStatus, SyncStatusOptions } from '@powersync/common';
import * as Comlink from 'comlink';
import { AbstractSharedSyncClientProvider } from '../../worker/sync/AbstractSharedSyncClientProvider';
import { SharedSyncClientProvider } from '../../worker/sync/SharedSyncClientProvider';
import {
ManualSharedSyncPayload,
SharedSyncClientEvent,
Expand All @@ -17,20 +17,28 @@ import {
* The shared worker will trigger methods on this side of the message port
* via this client provider.
*/
class SharedSyncClientProvider extends AbstractSharedSyncClientProvider {
class SharedSyncClientProviderImplementation implements SharedSyncClientProvider {
protected release: (() => void) | null;

constructor(
protected options: WebStreamingSyncImplementationOptions,
public statusChanged: (status: SyncStatusOptions) => void,
protected webDB: WebDBAdapter
) {
super();
this.release = null;
}

async getDBWorkerPort(): Promise<MessagePort> {
const { port } = await this.webDB.shareConnection();
const { port, release } = await this.webDB.shareConnection();
this.release = release;
return Comlink.transfer(port, [port]);
}

async releaseSharedConnection() {
this.release?.();
this.release = null;
}

async fetchCredentials(): Promise<PowerSyncCredentials | null> {
const credentials = await this.options.remote.getCredentials();
if (credentials == null) {
Expand Down Expand Up @@ -159,7 +167,7 @@ export class SharedWebStreamingSyncImplementation extends WebStreamingSyncImplem
/**
* Pass along any sync status updates to this listener
*/
this.clientProvider = new SharedSyncClientProvider(
this.clientProvider = new SharedSyncClientProviderImplementation(
this.webOptions,
(status) => {
this.iterateListeners((l) => this.updateSyncStatus(status));
Expand Down
20 changes: 0 additions & 20 deletions packages/web/src/worker/sync/AbstractSharedSyncClientProvider.ts

This file was deleted.

21 changes: 21 additions & 0 deletions packages/web/src/worker/sync/SharedSyncClientProvider.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
import type { PowerSyncCredentials, SyncStatusOptions } from '@powersync/common';

/**
* The client side port should provide these methods.
*/
export interface SharedSyncClientProvider {
fetchCredentials(): Promise<PowerSyncCredentials | null>;
uploadCrud(): Promise<void>;
statusChanged(status: SyncStatusOptions): void;
getDBWorkerPort(): Promise<MessagePort>;
releaseSharedConnection(): void;

trace(...x: any[]): void;
debug(...x: any[]): void;
info(...x: any[]): void;
log(...x: any[]): void;
warn(...x: any[]): void;
error(...x: any[]): void;
time(label: string): void;
timeEnd(label: string): void;
}
7 changes: 4 additions & 3 deletions packages/web/src/worker/sync/SharedSyncImplementation.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ import { LockedAsyncDatabaseAdapter } from '../../db/adapters/LockedAsyncDatabas
import { ResolvedWebSQLOpenOptions } from '../../db/adapters/web-sql-flags';
import { WorkerWrappedAsyncDatabaseConnection } from '../../db/adapters/WorkerWrappedAsyncDatabaseConnection';
import { getNavigatorLocks } from '../../shared/navigator';
import { AbstractSharedSyncClientProvider } from './AbstractSharedSyncClientProvider';
import { SharedSyncClientProvider } from './SharedSyncClientProvider';
import { BroadcastLogger } from './BroadcastLogger';

/**
Expand Down Expand Up @@ -64,7 +64,7 @@ export interface SharedSyncImplementationListener extends StreamingSyncImplement
*/
export type WrappedSyncPort = {
port: MessagePort;
clientProvider: Comlink.Remote<AbstractSharedSyncClientProvider>;
clientProvider: Comlink.Remote<SharedSyncClientProvider>;
db?: DBAdapter;
};

Expand Down Expand Up @@ -216,7 +216,7 @@ export class SharedSyncImplementation
addPort(port: MessagePort) {
const portProvider = {
port,
clientProvider: Comlink.wrap<AbstractSharedSyncClientProvider>(port)
clientProvider: Comlink.wrap<SharedSyncClientProvider>(port)
};
this.ports.push(portProvider);

Expand Down Expand Up @@ -259,6 +259,7 @@ export class SharedSyncImplementation
}

// Clearing the adapter will result in a new one being opened in connect
await trackedPort.clientProvider.releaseSharedConnection();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For my own understanding: Where would we release the lock otherwise? Through trackedPort.db.close()?

I don't have a good mental model of how we're sharing connections here, but I'm worried about whether it's possible for us to release the shared connection here and someone else then re-obtaining the lock on a different connection? (so we'd have two of them when it sounds like we probably only want to have one).

If the lock is only held to avoid the tab being put to sleep as the comment suggests, why aren't we just requesting a randomly-named lock?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If the lock is only held to avoid the tab being put to sleep as the comment suggests, why aren't we just requesting a randomly-named lock?

You are the second person to suggest this. I think you're spot on here.

this.dbAdapter = null;

if (shouldReconnect) {
Expand Down