Skip to content
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions .changeset/bright-yaks-pump.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
---
'@powersync/common': patch
'@powersync/react-native': patch
'@powersync/web': patch
---

Fixed bug where a WebSocket connection timeout could cause an uncaught exception.
52 changes: 43 additions & 9 deletions demos/react-supabase-todolist/src/app/views/layout.tsx
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
import { LOGIN_ROUTE, SQL_CONSOLE_ROUTE, TODO_LISTS_ROUTE } from '@/app/router';
import { useNavigationPanel } from '@/components/navigation/NavigationPanelContext';
import { useSupabase } from '@/components/providers/SystemProvider';
import ChecklistRtlIcon from '@mui/icons-material/ChecklistRtl';
import ExitToAppIcon from '@mui/icons-material/ExitToApp';
import MenuIcon from '@mui/icons-material/Menu';
Expand All @@ -17,16 +20,15 @@ import {
ListItemButton,
ListItemIcon,
ListItemText,
Menu,
MenuItem,
Toolbar,
Typography,
styled
} from '@mui/material';
import React from 'react';
import { usePowerSync, useStatus } from '@powersync/react';
import React from 'react';
import { useNavigate } from 'react-router-dom';
import { useSupabase } from '@/components/providers/SystemProvider';
import { useNavigationPanel } from '@/components/navigation/NavigationPanelContext';
import { LOGIN_ROUTE, SQL_CONSOLE_ROUTE, TODO_LISTS_ROUTE } from '@/app/router';

export default function ViewsLayout({ children }: { children: React.ReactNode }) {
const powerSync = usePowerSync();
Expand All @@ -37,6 +39,8 @@ export default function ViewsLayout({ children }: { children: React.ReactNode })
const [openDrawer, setOpenDrawer] = React.useState(false);
const { title } = useNavigationPanel();

const [connectionAnchor, setConnectionAnchor] = React.useState<null | HTMLElement>(null);

const NAVIGATION_ITEMS = React.useMemo(
() => [
{
Expand Down Expand Up @@ -72,16 +76,47 @@ export default function ViewsLayout({ children }: { children: React.ReactNode })
color="inherit"
aria-label="menu"
sx={{ mr: 2 }}
onClick={() => setOpenDrawer(!openDrawer)}
>
onClick={() => setOpenDrawer(!openDrawer)}>
<MenuIcon />
</IconButton>
<Box sx={{ flexGrow: 1 }}>
<Typography>{title}</Typography>
</Box>
<NorthIcon sx={{ marginRight: '-10px' }} color={status?.dataFlowStatus.uploading ? 'primary' : 'inherit'} />
<SouthIcon color={status?.dataFlowStatus.downloading ? 'primary' : 'inherit'} />
{status?.connected ? <WifiIcon /> : <SignalWifiOffIcon />}
<Box
sx={{ cursor: 'pointer' }}
onClick={(event) => {
setConnectionAnchor(event.currentTarget);
}}>
{status?.connected ? <WifiIcon /> : <SignalWifiOffIcon />}
{/* Allows for manual connection and disconnect for testing purposes */}
<Menu
id="connection-menu"
anchorEl={connectionAnchor}
open={Boolean(connectionAnchor)}
onClose={() => setConnectionAnchor(null)}>
{status?.connected ? (
<MenuItem
onClick={(event) => {
event.stopPropagation();
setConnectionAnchor(null);
powerSync.disconnect();
}}>
Disconnect
</MenuItem>
) : supabase ? (
<MenuItem
onClick={(event) => {
event.stopPropagation();
setConnectionAnchor(null);
powerSync.connect(supabase);
}}>
Connect
</MenuItem>
) : null}
</Menu>
</Box>
</Toolbar>
</S.TopBar>
<Drawer anchor={'left'} open={openDrawer} onClose={() => setOpenDrawer(false)}>
Expand All @@ -95,8 +130,7 @@ export default function ViewsLayout({ children }: { children: React.ReactNode })
await item.beforeNavigate?.();
navigate(item.path);
setOpenDrawer(false);
}}
>
}}>
<ListItemIcon>{item.icon()}</ListItemIcon>
<ListItemText primary={item.title} />
</ListItemButton>
Expand Down
65 changes: 40 additions & 25 deletions packages/common/src/client/sync/stream/AbstractRemote.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,7 @@ import PACKAGE from '../../../../package.json' with { type: 'json' };
import { AbortOperation } from '../../../utils/AbortOperation.js';
import { DataStream } from '../../../utils/DataStream.js';
import { PowerSyncCredentials } from '../../connection/PowerSyncCredentials.js';
import {
StreamingSyncLine,
StreamingSyncLineOrCrudUploadComplete,
StreamingSyncRequest
} from './streaming-sync-types.js';
import { StreamingSyncRequest } from './streaming-sync-types.js';
import { WebsocketClientTransport } from './WebsocketClientTransport.js';

export type BSONImplementation = typeof BSON;
Expand Down Expand Up @@ -305,6 +301,27 @@ export abstract class AbstractRemote {
// automatically as a header.
const userAgent = this.getUserAgent();

const stream = new DataStream<T, Uint8Array>({
logger: this.logger,
pressure: {
lowWaterMark: SYNC_QUEUE_REQUEST_LOW_WATER
},
mapLine: map
});

// Handle upstream abort
if (options.abortSignal?.aborted) {
throw new AbortOperation('Connection request aborted');
} else {
options.abortSignal?.addEventListener(
'abort',
() => {
stream.close();
},
{ once: true }
);
}

let keepAliveTimeout: any;
const resetTimeout = () => {
clearTimeout(keepAliveTimeout);
Expand All @@ -315,15 +332,28 @@ export abstract class AbstractRemote {
};
resetTimeout();

// Typescript complains about this being `never` if it's not assigned here.
// This is assigned in `wsCreator`.
let disposeSocketConnectionTimeout = () => {};

const url = this.options.socketUrlTransformer(request.url);
const connector = new RSocketConnector({
transport: new WebsocketClientTransport({
url,
wsCreator: (url) => {
const socket = this.createSocket(url);
disposeSocketConnectionTimeout = stream.registerListener({
closed: () => {
// Allow closing the underlying WebSocket if the stream was closed before the
// RSocket connect completed. This should effectively abort the request.
socket.close();
}
});

socket.addEventListener('message', (event) => {
resetTimeout();
});

return socket;
}
}),
Expand All @@ -345,22 +375,19 @@ export abstract class AbstractRemote {
let rsocket: RSocket;
try {
rsocket = await connector.connect();
// The connection is established, we no longer need to monitor the initial timeout
disposeSocketConnectionTimeout();
} catch (ex) {
this.logger.error(`Failed to connect WebSocket`, ex);
clearTimeout(keepAliveTimeout);
if (!stream.closed) {
await stream.close();
}
throw ex;
}

resetTimeout();

const stream = new DataStream<T, Uint8Array>({
logger: this.logger,
pressure: {
lowWaterMark: SYNC_QUEUE_REQUEST_LOW_WATER
},
mapLine: map
});

let socketIsClosed = false;
const closeSocket = () => {
clearTimeout(keepAliveTimeout);
Expand Down Expand Up @@ -455,18 +482,6 @@ export abstract class AbstractRemote {
}
});

/**
* Handle abort operations here.
* Unfortunately cannot insert them into the connection.
*/
if (options.abortSignal?.aborted) {
stream.close();
} else {
options.abortSignal?.addEventListener('abort', () => {
stream.close();
});
}

return stream;
}

Expand Down