diff --git a/.changeset/little-bananas-fetch.md b/.changeset/little-bananas-fetch.md
new file mode 100644
index 000000000..5cfc87b22
--- /dev/null
+++ b/.changeset/little-bananas-fetch.md
@@ -0,0 +1,7 @@
+---
+'@powersync/common': minor
+---
+
+- Added additional listeners for `closing` and `closed` events in `AbstractPowerSyncDatabase`.
+- Added `query` and `customQuery` APIs for enhanced watched queries.
+- Added `triggerImmediate` option to the `onChange` API. This allows emitting an initial event which can be useful for downstream use cases.
diff --git a/.changeset/nine-pens-ring.md b/.changeset/nine-pens-ring.md
new file mode 100644
index 000000000..4697d8141
--- /dev/null
+++ b/.changeset/nine-pens-ring.md
@@ -0,0 +1,43 @@
+---
+'@powersync/vue': minor
+---
+
+[Potentially breaking change] The `useQuery` hook results are now explicitly defined as readonly. These values should not be mutated.
+
+- Added the ability to limit re-renders by specifying a `rowComparator` for query results. The `useQuery` hook will only emit `data` changes when the data has changed.
+
+```javascript
+// The data here will maintain previous object references for unchanged items.
+const { data } = useQuery('SELECT * FROM lists WHERE name = ?', ['aname'], {
+ rowComparator: {
+ keyBy: (item) => item.id,
+ compareBy: (item) => JSON.stringify(item)
+ }
+});
+```
+
+- Added the ability to subscribe to an existing instance of a `WatchedQuery`
+
+```vue
+
+
+
+ Loading...
+ Updating results...
+
+ {{ error }}
+
+
+```
diff --git a/.changeset/plenty-rice-protect.md b/.changeset/plenty-rice-protect.md
new file mode 100644
index 000000000..17c805e90
--- /dev/null
+++ b/.changeset/plenty-rice-protect.md
@@ -0,0 +1,6 @@
+---
+'@powersync/react': minor
+'@powersync/vue': minor
+---
+
+- [Internal] Updated implementation to use shared `WatchedQuery` implementation.
diff --git a/.changeset/stale-dots-jog.md b/.changeset/stale-dots-jog.md
new file mode 100644
index 000000000..e1ed00c4b
--- /dev/null
+++ b/.changeset/stale-dots-jog.md
@@ -0,0 +1,5 @@
+---
+'@powersync/web': minor
+---
+
+Improved query behaviour when client is closed. Pending requests will be aborted, future requests will be rejected with an Error. Fixed read and write lock requests not respecting timeout parameter.
diff --git a/.changeset/swift-guests-explain.md b/.changeset/swift-guests-explain.md
new file mode 100644
index 000000000..e1a1a8ad6
--- /dev/null
+++ b/.changeset/swift-guests-explain.md
@@ -0,0 +1,39 @@
+---
+'@powersync/react': minor
+---
+
+- Added the ability to limit re-renders by specifying a `rowComparator` for query results. The `useQuery` hook will only emit `data` changes when the data has changed.
+
+```javascript
+// The data here will maintain previous object references for unchanged items.
+const { data } = useQuery('SELECT * FROM lists WHERE name = ?', ['aname'], {
+ rowComparator: {
+ keyBy: (item) => item.id,
+ compareBy: (item) => JSON.stringify(item)
+ }
+});
+```
+
+- Added the ability to subscribe to an existing instance of a `WatchedQuery`
+
+```jsx
+import { useWatchedQuerySubscription } from '@powersync/react';
+
+const listsQuery = powerSync
+ .query({
+ sql: `SELECT * FROM lists`
+ })
+ .differentialWatch();
+
+export const ListsWidget = (props) => {
+ const { data: lists } = useWatchedQuerySubscription(listsQuery);
+
+ return (
+
+ {lists.map((list) => (
+
{list.name}
+ ))}
+
+ );
+};
+```
diff --git a/.prettierrc b/.prettierrc
index 85bab2c6a..b92dbc1be 100644
--- a/.prettierrc
+++ b/.prettierrc
@@ -5,5 +5,6 @@
"jsxBracketSameLine": true,
"useTabs": false,
"printWidth": 120,
- "trailingComma": "none"
+ "trailingComma": "none",
+ "plugins": ["prettier-plugin-embed", "prettier-plugin-sql"]
}
diff --git a/demos/react-supabase-todolist/src/app/views/sql-console/page.tsx b/demos/react-supabase-todolist/src/app/views/sql-console/page.tsx
index 6b0a13e1d..4cb0e84fb 100644
--- a/demos/react-supabase-todolist/src/app/views/sql-console/page.tsx
+++ b/demos/react-supabase-todolist/src/app/views/sql-console/page.tsx
@@ -1,24 +1,24 @@
-import React from 'react';
-import { useQuery } from '@powersync/react';
-import { Box, Button, Grid, TextField, styled } from '@mui/material';
-import { DataGrid } from '@mui/x-data-grid';
import { NavigationPage } from '@/components/navigation/NavigationPage';
+import { Alert, Box, Button, Grid, TextField, styled } from '@mui/material';
+import { DataGrid } from '@mui/x-data-grid';
+import { useQuery } from '@powersync/react';
+import React from 'react';
export type LoginFormParams = {
email: string;
password: string;
};
-const DEFAULT_QUERY = 'SELECT * FROM lists';
-
-export default function SQLConsolePage() {
- const inputRef = React.useRef();
- const [query, setQuery] = React.useState(DEFAULT_QUERY);
- const { data: querySQLResult } = useQuery(query);
+const DEFAULT_QUERY = /* sql */ `
+ SELECT
+ *
+ FROM
+ lists
+`;
+const TableDisplay = React.memo(({ data }: { data: ReadonlyArray }) => {
const queryDataGridResult = React.useMemo(() => {
- const firstItem = querySQLResult?.[0];
-
+ const firstItem = data?.[0];
return {
columns: firstItem
? Object.keys(firstItem).map((field) => ({
@@ -26,9 +26,48 @@ export default function SQLConsolePage() {
flex: 1
}))
: [],
- rows: querySQLResult
+ rows: data
};
- }, [querySQLResult]);
+ }, [data]);
+
+ return (
+
+ ({ ...r, id: r.id ?? index })) ?? []}
+ columns={queryDataGridResult.columns}
+ initialState={{
+ pagination: {
+ paginationModel: {
+ pageSize: 20
+ }
+ }
+ }}
+ pageSizeOptions={[20]}
+ disableRowSelectionOnClick
+ />
+
+ );
+});
+
+export default function SQLConsolePage() {
+ const inputRef = React.useRef();
+ const [query, setQuery] = React.useState(DEFAULT_QUERY);
+
+ const { data, error } = useQuery(query, [], {
+ /**
+ * We don't use the isFetching status here, we can avoid re-renders if we don't report on it.
+ */
+ reportFetching: false,
+ /**
+ * The query here will only emit results when the query data set changes.
+ * Result sets are compared by serializing each item to JSON and comparing the strings.
+ */
+ rowComparator: {
+ keyBy: (item: any) => JSON.stringify(item),
+ compareBy: (item: any) => JSON.stringify(item)
+ }
+ });
return (
@@ -57,33 +96,13 @@ export default function SQLConsolePage() {
if (queryInput) {
setQuery(queryInput);
}
- }}
- >
+ }}>
Execute Query
-
- {queryDataGridResult ? (
-
- {queryDataGridResult.columns ? (
- ({ ...r, id: r.id ?? index })) ?? []}
- columns={queryDataGridResult.columns}
- initialState={{
- pagination: {
- paginationModel: {
- pageSize: 20
- }
- }
- }}
- pageSizeOptions={[20]}
- disableRowSelectionOnClick
- />
- ) : null}
-
- ) : null}
+ {error ? {error.message} : null}
+
);
diff --git a/demos/react-supabase-todolist/src/app/views/todo-lists/edit/page.tsx b/demos/react-supabase-todolist/src/app/views/todo-lists/edit/page.tsx
index 0648a5ca5..7a488792f 100644
--- a/demos/react-supabase-todolist/src/app/views/todo-lists/edit/page.tsx
+++ b/demos/react-supabase-todolist/src/app/views/todo-lists/edit/page.tsx
@@ -1,4 +1,7 @@
-import { usePowerSync, useQuery } from '@powersync/react';
+import { NavigationPage } from '@/components/navigation/NavigationPage';
+import { useSupabase } from '@/components/providers/SystemProvider';
+import { TodoItemWidget } from '@/components/widgets/TodoItemWidget';
+import { LISTS_TABLE, TODOS_TABLE, TodoRecord } from '@/library/powersync/AppSchema';
import AddIcon from '@mui/icons-material/Add';
import {
Box,
@@ -15,12 +18,9 @@ import {
styled
} from '@mui/material';
import Fab from '@mui/material/Fab';
+import { usePowerSync, useQuery } from '@powersync/react';
import React, { Suspense } from 'react';
import { useParams } from 'react-router-dom';
-import { useSupabase } from '@/components/providers/SystemProvider';
-import { LISTS_TABLE, TODOS_TABLE, TodoRecord } from '@/library/powersync/AppSchema';
-import { NavigationPage } from '@/components/navigation/NavigationPage';
-import { TodoItemWidget } from '@/components/widgets/TodoItemWidget';
/**
* useSearchParams causes the entire element to fall back to client side rendering
@@ -34,39 +34,36 @@ const TodoEditSection = () => {
const {
data: [listRecord]
- } = useQuery<{ name: string }>(`SELECT name FROM ${LISTS_TABLE} WHERE id = ?`, [listID]);
+ } = useQuery<{ name: string }>(
+ /* sql */ `
+ SELECT
+ name
+ FROM
+ ${LISTS_TABLE}
+ WHERE
+ id = ?
+ `,
+ [listID]
+ );
const { data: todos } = useQuery(
- `SELECT * FROM ${TODOS_TABLE} WHERE list_id=? ORDER BY created_at DESC, id`,
+ /* sql */ `
+ SELECT
+ *
+ FROM
+ ${TODOS_TABLE}
+ WHERE
+ list_id = ?
+ ORDER BY
+ created_at DESC,
+ id
+ `,
[listID]
);
const [showPrompt, setShowPrompt] = React.useState(false);
const nameInputRef = React.createRef();
- const toggleCompletion = async (record: TodoRecord, completed: boolean) => {
- const updatedRecord = { ...record, completed: completed };
- if (completed) {
- const userID = supabase?.currentSession?.user.id;
- if (!userID) {
- throw new Error(`Could not get user ID.`);
- }
- updatedRecord.completed_at = new Date().toISOString();
- updatedRecord.completed_by = userID;
- } else {
- updatedRecord.completed_at = null;
- updatedRecord.completed_by = null;
- }
- await powerSync.execute(
- `UPDATE ${TODOS_TABLE}
- SET completed = ?,
- completed_at = ?,
- completed_by = ?
- WHERE id = ?`,
- [completed, updatedRecord.completed_at, updatedRecord.completed_by, record.id]
- );
- };
-
const createNewTodo = async (description: string) => {
const userID = supabase?.currentSession?.user.id;
if (!userID) {
@@ -74,21 +71,16 @@ const TodoEditSection = () => {
}
await powerSync.execute(
- `INSERT INTO
- ${TODOS_TABLE}
- (id, created_at, created_by, description, list_id)
- VALUES
- (uuid(), datetime(), ?, ?, ?)`,
+ /* sql */ `
+ INSERT INTO
+ ${TODOS_TABLE} (id, created_at, created_by, description, list_id)
+ VALUES
+ (uuid (), datetime (), ?, ?, ?)
+ `,
[userID, description, listID!]
);
};
- const deleteTodo = async (id: string) => {
- await powerSync.writeTransaction(async (tx) => {
- await tx.execute(`DELETE FROM ${TODOS_TABLE} WHERE id = ?`, [id]);
- });
- };
-
if (!listRecord) {
return (
@@ -106,13 +98,7 @@ const TodoEditSection = () => {
{todos.map((r) => (
- deleteTodo(r.id)}
- isComplete={r.completed == 1}
- toggleCompletion={() => toggleCompletion(r, !r.completed)}
- />
+
))}
@@ -129,8 +115,7 @@ const TodoEditSection = () => {
await createNewTodo(nameInputRef.current!.value);
setShowPrompt(false);
}
- }}
- >
+ }}>
{'Create Todo Item'}
Enter a description for a new todo item
diff --git a/demos/react-supabase-todolist/src/app/views/todo-lists/page.tsx b/demos/react-supabase-todolist/src/app/views/todo-lists/page.tsx
index 1f1a686b7..edb168a2b 100644
--- a/demos/react-supabase-todolist/src/app/views/todo-lists/page.tsx
+++ b/demos/react-supabase-todolist/src/app/views/todo-lists/page.tsx
@@ -1,4 +1,9 @@
-import { usePowerSync, useStatus } from '@powersync/react';
+import { NavigationPage } from '@/components/navigation/NavigationPage';
+import { useSupabase } from '@/components/providers/SystemProvider';
+import { GuardBySync } from '@/components/widgets/GuardBySync';
+import { SearchBarWidget } from '@/components/widgets/SearchBarWidget';
+import { TodoListsWidget } from '@/components/widgets/TodoListsWidget';
+import { LISTS_TABLE } from '@/library/powersync/AppSchema';
import AddIcon from '@mui/icons-material/Add';
import {
Box,
@@ -12,18 +17,12 @@ import {
styled
} from '@mui/material';
import Fab from '@mui/material/Fab';
+import { usePowerSync } from '@powersync/react';
import React from 'react';
-import { useSupabase } from '@/components/providers/SystemProvider';
-import { LISTS_TABLE } from '@/library/powersync/AppSchema';
-import { NavigationPage } from '@/components/navigation/NavigationPage';
-import { SearchBarWidget } from '@/components/widgets/SearchBarWidget';
-import { TodoListsWidget } from '@/components/widgets/TodoListsWidget';
-import { GuardBySync } from '@/components/widgets/GuardBySync';
export default function TodoListsPage() {
const powerSync = usePowerSync();
const supabase = useSupabase();
- const status = useStatus();
const [showPrompt, setShowPrompt] = React.useState(false);
const nameInputRef = React.createRef();
@@ -36,7 +35,12 @@ export default function TodoListsPage() {
}
const res = await powerSync.execute(
- `INSERT INTO ${LISTS_TABLE} (id, created_at, name, owner_id) VALUES (uuid(), datetime(), ?, ?) RETURNING *`,
+ /* sql */ `
+ INSERT INTO
+ ${LISTS_TABLE} (id, created_at, name, owner_id)
+ VALUES
+ (uuid (), datetime (), ?, ?) RETURNING *
+ `,
[name, userID]
);
@@ -71,8 +75,7 @@ export default function TodoListsPage() {
}
}}
aria-labelledby="alert-dialog-title"
- aria-describedby="alert-dialog-description"
- >
+ aria-describedby="alert-dialog-description">
{'Create Todo List'}
Enter a name for a new todo list
diff --git a/demos/react-supabase-todolist/src/components/providers/SystemProvider.tsx b/demos/react-supabase-todolist/src/components/providers/SystemProvider.tsx
index 9bbbb4f11..8a3f3c209 100644
--- a/demos/react-supabase-todolist/src/components/providers/SystemProvider.tsx
+++ b/demos/react-supabase-todolist/src/components/providers/SystemProvider.tsx
@@ -1,9 +1,9 @@
import { configureFts } from '@/app/utils/fts_setup';
-import { AppSchema } from '@/library/powersync/AppSchema';
+import { AppSchema, ListRecord, LISTS_TABLE, TODOS_TABLE } from '@/library/powersync/AppSchema';
import { SupabaseConnector } from '@/library/powersync/SupabaseConnector';
import { CircularProgress } from '@mui/material';
import { PowerSyncContext } from '@powersync/react';
-import { createBaseLogger, LogLevel, PowerSyncDatabase } from '@powersync/web';
+import { createBaseLogger, DifferentialWatchedQuery, LogLevel, PowerSyncDatabase } from '@powersync/web';
import React, { Suspense } from 'react';
import { NavigationPanelContextProvider } from '../navigation/NavigationPanelContext';
@@ -17,10 +17,46 @@ export const db = new PowerSyncDatabase({
}
});
+export type EnhancedListRecord = ListRecord & { total_tasks: number; completed_tasks: number };
+
+export type QueryStore = {
+ lists: DifferentialWatchedQuery;
+};
+
+const QueryStore = React.createContext(null);
+export const useQueryStore = () => React.useContext(QueryStore);
+
export const SystemProvider = ({ children }: { children: React.ReactNode }) => {
- const [connector] = React.useState(new SupabaseConnector());
+ const [connector] = React.useState(() => new SupabaseConnector());
const [powerSync] = React.useState(db);
+ const [queryStore] = React.useState(() => {
+ const listsQuery = db
+ .query({
+ sql: /* sql */ `
+ SELECT
+ ${LISTS_TABLE}.*,
+ COUNT(${TODOS_TABLE}.id) AS total_tasks,
+ SUM(
+ CASE
+ WHEN ${TODOS_TABLE}.completed = true THEN 1
+ ELSE 0
+ END
+ ) as completed_tasks
+ FROM
+ ${LISTS_TABLE}
+ LEFT JOIN ${TODOS_TABLE} ON ${LISTS_TABLE}.id = ${TODOS_TABLE}.list_id
+ GROUP BY
+ ${LISTS_TABLE}.id;
+ `
+ })
+ .differentialWatch();
+
+ return {
+ lists: listsQuery
+ };
+ });
+
React.useEffect(() => {
const logger = createBaseLogger();
logger.useDefaults(); // eslint-disable-line
@@ -30,7 +66,7 @@ export const SystemProvider = ({ children }: { children: React.ReactNode }) => {
powerSync.init();
const l = connector.registerListener({
- initialized: () => { },
+ initialized: () => {},
sessionStarted: () => {
powerSync.connect(connector);
}
@@ -47,11 +83,13 @@ export const SystemProvider = ({ children }: { children: React.ReactNode }) => {
return (
}>
-
-
- {children}
-
-
+
+
+
+ {children}
+
+
+
);
};
diff --git a/demos/react-supabase-todolist/src/components/widgets/ListItemWidget.tsx b/demos/react-supabase-todolist/src/components/widgets/ListItemWidget.tsx
index bdb68b82d..e2ac183ef 100644
--- a/demos/react-supabase-todolist/src/components/widgets/ListItemWidget.tsx
+++ b/demos/react-supabase-todolist/src/components/widgets/ListItemWidget.tsx
@@ -1,73 +1,90 @@
-import React from 'react';
import {
- ListItem,
+ Avatar,
+ Box,
IconButton,
+ ListItem,
ListItemAvatar,
- Avatar,
+ ListItemButton,
ListItemText,
- Box,
Paper,
- styled,
- ListItemButton
+ styled
} from '@mui/material';
+import React from 'react';
-import DeleteIcon from '@mui/icons-material/DeleteOutline';
+import { TODO_LISTS_ROUTE } from '@/app/router';
+import { LISTS_TABLE, TODOS_TABLE } from '@/library/powersync/AppSchema';
import RightIcon from '@mui/icons-material/ArrowRightAlt';
+import DeleteIcon from '@mui/icons-material/DeleteOutline';
import ListIcon from '@mui/icons-material/ListAltOutlined';
+import { usePowerSync } from '@powersync/react';
+import { useNavigate } from 'react-router-dom';
export type ListItemWidgetProps = {
+ id: string;
title: string;
description: string;
selected?: boolean;
- onDelete: () => void;
- onPress: () => void;
};
-export const ListItemWidget: React.FC = (props) => {
+export const ListItemWidget: React.FC = React.memo((props) => {
+ const { id, title, description, selected } = props;
+
+ const powerSync = usePowerSync();
+ const navigate = useNavigate();
+
+ const deleteList = React.useCallback(async () => {
+ await powerSync.writeTransaction(async (tx) => {
+ // Delete associated todos
+ await tx.execute(
+ /* sql */ `
+ DELETE FROM ${TODOS_TABLE}
+ WHERE
+ list_id = ?
+ `,
+ [id]
+ );
+ // Delete list record
+ await tx.execute(
+ /* sql */ `
+ DELETE FROM ${LISTS_TABLE}
+ WHERE
+ id = ?
+ `,
+ [id]
+ );
+ });
+ }, [id]);
+
+ const openList = React.useCallback(() => {
+ navigate(TODO_LISTS_ROUTE + '/' + id);
+ }, [id]);
+
return (
- {
- props.onDelete();
- }}
- >
+
- {
- props.onPress();
- }}
- >
+
- }
- >
- {
- props.onPress();
- }}
- selected={props.selected}
- >
+ }>
+
-
+
);
-};
+});
export namespace S {
export const MainPaper = styled(Paper)`
diff --git a/demos/react-supabase-todolist/src/components/widgets/TodoItemWidget.tsx b/demos/react-supabase-todolist/src/components/widgets/TodoItemWidget.tsx
index 8fac060de..d44418c4a 100644
--- a/demos/react-supabase-todolist/src/components/widgets/TodoItemWidget.tsx
+++ b/demos/react-supabase-todolist/src/components/widgets/TodoItemWidget.tsx
@@ -1,51 +1,88 @@
-import React from 'react';
-import { ListItem, IconButton, ListItemAvatar, ListItemText, Box, styled, Paper, ListItemButton } from '@mui/material';
-import DeleteIcon from '@mui/icons-material/DeleteOutline';
-import CheckBoxOutlineBlankIcon from '@mui/icons-material/CheckBoxOutlineBlank';
+import { TODOS_TABLE } from '@/library/powersync/AppSchema';
import CheckBoxIcon from '@mui/icons-material/CheckBox';
+import CheckBoxOutlineBlankIcon from '@mui/icons-material/CheckBoxOutlineBlank';
+import DeleteIcon from '@mui/icons-material/DeleteOutline';
+import { Box, IconButton, ListItem, ListItemAvatar, ListItemButton, ListItemText, Paper, styled } from '@mui/material';
+import { usePowerSync } from '@powersync/react';
+import React from 'react';
+import { useSupabase } from '../providers/SystemProvider';
export type TodoItemWidgetProps = {
+ id: string;
description: string | null;
isComplete: boolean;
- onDelete: () => void;
- toggleCompletion: () => void;
};
-export const TodoItemWidget: React.FC = (props) => {
+export const TodoItemWidget: React.FC = React.memo((props) => {
+ const { id, description, isComplete } = props;
+
+ const powerSync = usePowerSync();
+ const supabase = useSupabase();
+
+ const deleteTodo = React.useCallback(async () => {
+ await powerSync.writeTransaction(async (tx) => {
+ await tx.execute(
+ /* sql */ `
+ DELETE FROM ${TODOS_TABLE}
+ WHERE
+ id = ?
+ `,
+ [id]
+ );
+ });
+ }, [id]);
+
+ const toggleCompletion = React.useCallback(async () => {
+ let completedAt: String | null = null;
+ let completedBy: String | null = null;
+
+ if (!isComplete) {
+ // Need to set to Completed. This requires a session.
+ const userID = supabase?.currentSession?.user.id;
+ if (!userID) {
+ throw new Error(`Could not get user ID.`);
+ }
+ completedAt = new Date().toISOString();
+ completedBy = userID;
+ }
+
+ await powerSync.execute(
+ /* sql */ `
+ UPDATE ${TODOS_TABLE}
+ SET
+ completed = ?,
+ completed_at = ?,
+ completed_by = ?
+ WHERE
+ id = ?
+ `,
+ [!isComplete, completedAt, completedBy, id]
+ );
+ }, [id, isComplete]);
+
return (
- {
- props.onDelete();
- }}
- >
+
- }
- >
- {
- props.toggleCompletion();
- }}
- >
+ }>
+
{props.isComplete ? : }
-
+
);
-};
+});
namespace S {
export const MainPaper = styled(Paper)`
diff --git a/demos/react-supabase-todolist/src/components/widgets/TodoListsWidget.tsx b/demos/react-supabase-todolist/src/components/widgets/TodoListsWidget.tsx
index 9352115dc..d2f12b9fa 100644
--- a/demos/react-supabase-todolist/src/components/widgets/TodoListsWidget.tsx
+++ b/demos/react-supabase-todolist/src/components/widgets/TodoListsWidget.tsx
@@ -1,9 +1,7 @@
-import { usePowerSync, useQuery } from '@powersync/react';
import { List } from '@mui/material';
-import { useNavigate } from 'react-router-dom';
+import { useWatchedQuerySubscription } from '@powersync/react';
+import { useQueryStore } from '../providers/SystemProvider';
import { ListItemWidget } from './ListItemWidget';
-import { LISTS_TABLE, ListRecord, TODOS_TABLE } from '@/library/powersync/AppSchema';
-import { TODO_LISTS_ROUTE } from '@/app/router';
export type TodoListsWidgetProps = {
selectedId?: string;
@@ -14,30 +12,10 @@ const description = (total: number, completed: number = 0) => {
};
export function TodoListsWidget(props: TodoListsWidgetProps) {
- const powerSync = usePowerSync();
- const navigate = useNavigate();
+ const queries = useQueryStore();
+ const { data: listRecords, isLoading } = useWatchedQuerySubscription(queries!.lists);
- const { data: listRecords, isLoading } = useQuery(`
- SELECT
- ${LISTS_TABLE}.*, COUNT(${TODOS_TABLE}.id) AS total_tasks, SUM(CASE WHEN ${TODOS_TABLE}.completed = true THEN 1 ELSE 0 END) as completed_tasks
- FROM
- ${LISTS_TABLE}
- LEFT JOIN ${TODOS_TABLE}
- ON ${LISTS_TABLE}.id = ${TODOS_TABLE}.list_id
- GROUP BY
- ${LISTS_TABLE}.id;
- `);
-
- const deleteList = async (id: string) => {
- await powerSync.writeTransaction(async (tx) => {
- // Delete associated todos
- await tx.execute(`DELETE FROM ${TODOS_TABLE} WHERE list_id = ?`, [id]);
- // Delete list record
- await tx.execute(`DELETE FROM ${LISTS_TABLE} WHERE id = ?`, [id]);
- });
- };
-
- if (isLoading) {
+ if (isLoading && listRecords.length == 0) {
return Loading...
;
}
@@ -46,13 +24,10 @@ export function TodoListsWidget(props: TodoListsWidgetProps) {
{listRecords.map((r) => (
deleteList(r.id)}
- onPress={() => {
- navigate(TODO_LISTS_ROUTE + '/' + r.id);
- }}
/>
))}
diff --git a/demos/yjs-react-supabase-text-collab/.env.local.template b/demos/yjs-react-supabase-text-collab/.env.local.template
index 1282709c8..bcdc041a8 100644
--- a/demos/yjs-react-supabase-text-collab/.env.local.template
+++ b/demos/yjs-react-supabase-text-collab/.env.local.template
@@ -1,3 +1,5 @@
-VITE_SUPABASE_URL=
+VITE_SUPABASE_URL=http://localhost:54321
VITE_SUPABASE_ANON_KEY=
-VITE_POWERSYNC_URL=
+VITE_POWERSYNC_URL=http://localhost:8080
+# Only required for development with a local Supabase instance
+PS_SUPABASE_JWT_SECRET=
\ No newline at end of file
diff --git a/demos/yjs-react-supabase-text-collab/CHANGELOG.md b/demos/yjs-react-supabase-text-collab/CHANGELOG.md
index 908563a2a..a4b16065c 100644
--- a/demos/yjs-react-supabase-text-collab/CHANGELOG.md
+++ b/demos/yjs-react-supabase-text-collab/CHANGELOG.md
@@ -1,5 +1,11 @@
# yjs-react-supabase-text-collab
+## 0.2.0
+
+- Added a local development option with local Supabase and PowerSync services.
+- Updated Sync rules to use client parameters. Each client now only syncs `document` and `document_updates` for the document being edited.
+- Updated `PowerSyncYjsProvider` to use an incremental watched query for `document_updates`.
+
## 0.1.16
### Patch Changes
diff --git a/demos/yjs-react-supabase-text-collab/README.md b/demos/yjs-react-supabase-text-collab/README.md
index 7d6e916b7..34810d4ae 100644
--- a/demos/yjs-react-supabase-text-collab/README.md
+++ b/demos/yjs-react-supabase-text-collab/README.md
@@ -17,12 +17,44 @@ pnpm install
pnpm build:packages
```
+#### Quick Start: Local Development
+
+This demo can be started with local PowerSync and Supabase services.
+
+Follow the [instructions](https://supabase.com/docs/guides/cli/getting-started) for configuring Supabase locally.
+
+Copy the environment variables template file
+
+```bash
+cp .env.template .env.local
+```
+
+Start the Supabase project
+
+```bash
+supabase start
+```
+
+Copy the `anon key` and `JWT secret` into the `.env` file.
+
+Run the PowerSync service with
+
+```bash
+docker run \
+-p 8080:8080 \
+-e POWERSYNC_CONFIG_B64=$(base64 -i ./powersync.yaml) \
+-e POWERSYNC_SYNC_RULES_B64=$(base64 -i ./sync-rules.yaml) \
+--env-file ./.env.local \
+--network supabase_network_yjs-react-supabase-text-collab \
+--name my-powersync journeyapps/powersync-service:latest
+```
+
### 2. Create project on Supabase and set up Postgres
This demo app uses Supabase as its Postgres database and backend:
1. [Create a new project on the Supabase dashboard](https://supabase.com/dashboard/projects).
-2. Go to the Supabase SQL Editor for your new project and execute the SQL statements in [`database.sql`](database.sql) to create the database schema, database functions, and publication needed for PowerSync.
+2. Go to the Supabase SQL Editor for your new project and execute the SQL statements in [`database.sql`](./supabase/migrations/20250618064101_configure_powersync.sql) to create the database schema, database functions, and publication needed for PowerSync.
3. Enable "anonymous sign-ins" for the project [here](https://supabase.com/dashboard/project/_/auth/providers).
### 3. Create new project on PowerSync and connect to Supabase/Postgres
@@ -108,8 +140,8 @@ To-do
- [ ] Add button to the UI allowing the user to merge the Yjs edits i.e. `document_update` rows. Invoke `merge-document-updates` edge function in Supabase.
- [ ] Prepopulate sample text into newly created documents.
- [ ] Improve performance / rework inefficient parts of implementation:
- - [ ] Optimize the 'seen updates' approach to filter the `SELECT` query for updates that have not yet been seen — perhaps based on `created_at` timestamp generated on the Postgres side. For the watch query — watch for certain tables instead of watching a query. This will allow querying `document_updates` with a dynamic parameter.
- - [ ] Flush 'seen updates' when `document_updates` are merged.
+ - [] Optimize the 'seen updates' approach to filter the `SELECT` query for updates that have not yet been seen — perhaps based on `created_at` timestamp generated on the Postgres side. For the watch query — watch for certain tables instead of watching a query. This will allow querying `document_updates` with a dynamic parameter.
+ - [x] Flush 'seen updates' when `document_updates` are merged.
Done
diff --git a/demos/yjs-react-supabase-text-collab/package.json b/demos/yjs-react-supabase-text-collab/package.json
index 64605392c..1f8c469d1 100644
--- a/demos/yjs-react-supabase-text-collab/package.json
+++ b/demos/yjs-react-supabase-text-collab/package.json
@@ -1,6 +1,6 @@
{
"name": "yjs-react-supabase-text-collab",
- "version": "0.1.16",
+ "version": "0.2.0",
"private": true,
"scripts": {
"dev": "vite",
diff --git a/demos/yjs-react-supabase-text-collab/powersync.yaml b/demos/yjs-react-supabase-text-collab/powersync.yaml
new file mode 100644
index 000000000..bedd899f4
--- /dev/null
+++ b/demos/yjs-react-supabase-text-collab/powersync.yaml
@@ -0,0 +1,47 @@
+# yaml-language-server: $schema=https://unpkg.com/@powersync/service-schema@latest/json-schema/powersync-config.json
+
+# This is a local development configuration file for PowerSync.
+
+# Note that this example uses YAML custom tags for environment variable substitution.
+# Using `!env [variable name]` will substitute the value of the environment variable named
+# [variable name].
+#
+# Only environment variables with names starting with `PS_` can be substituted.
+#
+# If using VS Code see the `.vscode/settings.json` definitions which define custom tags.
+
+# Settings for telemetry reporting
+# See https://docs.powersync.com/self-hosting/telemetry
+telemetry:
+ # Opt out of reporting anonymized usage metrics to PowerSync telemetry service
+ disable_telemetry_sharing: false
+
+# Settings for source database replication
+replication:
+ # Specify database connection details
+ # Note only 1 connection is currently supported
+ # Multiple connection support is on the roadmap
+ connections:
+ - type: postgresql
+ uri: postgresql://postgres:postgres@supabase_db_yjs-react-supabase-text-collab:5432/postgres
+
+ # SSL settings
+ sslmode: disable # 'verify-full' (default) or 'verify-ca' or 'disable'
+
+# This is valid if using the `mongo` service defined in `ps-mongo.yaml`
+
+# Connection settings for sync bucket storage
+storage:
+ # This uses Postgres bucket storage for simplicity
+ type: postgresql
+ uri: postgresql://postgres:postgres@supabase_db_yjs-react-supabase-text-collab:5432/postgres
+ # SSL settings
+ sslmode: disable # 'verify-full' (default) or 'verify-ca' or 'disable'
+
+# The port which the PowerSync API server will listen on
+port: 8080
+
+# Client (application end user) authentication settings
+client_auth:
+ supabase: true
+ supabase_jwt_secret: !env PS_SUPABASE_JWT_SECRET
diff --git a/demos/yjs-react-supabase-text-collab/src/app/editor/page.tsx b/demos/yjs-react-supabase-text-collab/src/app/editor/page.tsx
index c7cf44920..c5b808608 100644
--- a/demos/yjs-react-supabase-text-collab/src/app/editor/page.tsx
+++ b/demos/yjs-react-supabase-text-collab/src/app/editor/page.tsx
@@ -1,22 +1,23 @@
-import { usePowerSync, useQuery, useStatus } from '@powersync/react';
-import { Box, Container, FormControlLabel, Switch, Typography } from '@mui/material';
-import { useEffect, useMemo, useState } from 'react';
+import { connector, useSupabase } from '@/components/providers/SystemProvider';
import MenuBar from '@/components/widgets/MenuBar';
import { PowerSyncYjsProvider } from '@/library/powersync/PowerSyncYjsProvider';
+import { Box, Container, FormControlLabel, Switch, Typography } from '@mui/material';
+import { usePowerSync, useQuery, useStatus } from '@powersync/react';
import Collaboration from '@tiptap/extension-collaboration';
import Highlight from '@tiptap/extension-highlight';
import TaskItem from '@tiptap/extension-task-item';
import TaskList from '@tiptap/extension-task-list';
import { EditorContent, useEditor } from '@tiptap/react';
import StarterKit from '@tiptap/starter-kit';
+import { useEffect, useMemo, useState } from 'react';
+import { useParams } from 'react-router-dom';
import * as Y from 'yjs';
import './tiptap-styles.scss';
-import { useParams } from 'react-router-dom';
-import { connector } from '@/components/providers/SystemProvider';
export default function EditorPage() {
const powerSync = usePowerSync();
const status = useStatus();
+ const supabase = useSupabase();
const { id: documentId } = useParams();
// cache the last edited document ID in local storage
@@ -33,6 +34,12 @@ export default function EditorPage() {
useEffect(() => {
const provider = new PowerSyncYjsProvider(ydoc, powerSync, documentId!);
+ // Only sync changes for this document
+ powerSync.connect(supabase!, {
+ params: {
+ document_id: documentId!
+ }
+ });
return () => {
provider.destroy();
};
diff --git a/demos/yjs-react-supabase-text-collab/src/components/providers/SystemProvider.tsx b/demos/yjs-react-supabase-text-collab/src/components/providers/SystemProvider.tsx
index a939d8b40..7cbd439d8 100644
--- a/demos/yjs-react-supabase-text-collab/src/components/providers/SystemProvider.tsx
+++ b/demos/yjs-react-supabase-text-collab/src/components/providers/SystemProvider.tsx
@@ -1,8 +1,8 @@
import { AppSchema } from '@/library/powersync/AppSchema';
import { SupabaseConnector } from '@/library/powersync/SupabaseConnector';
+import { CircularProgress } from '@mui/material';
import { PowerSyncContext } from '@powersync/react';
import { createBaseLogger, LogLevel, PowerSyncDatabase } from '@powersync/web';
-import { CircularProgress } from '@mui/material';
import React, { Suspense } from 'react';
const SupabaseContext = React.createContext(null);
@@ -13,7 +13,6 @@ export const powerSync = new PowerSyncDatabase({
schema: AppSchema
});
export const connector = new SupabaseConnector();
-powerSync.connect(connector);
const logger = createBaseLogger();
logger.useDefaults();
diff --git a/demos/yjs-react-supabase-text-collab/src/library/powersync/PowerSyncYjsProvider.ts b/demos/yjs-react-supabase-text-collab/src/library/powersync/PowerSyncYjsProvider.ts
index 1b0e28447..4e41bc719 100644
--- a/demos/yjs-react-supabase-text-collab/src/library/powersync/PowerSyncYjsProvider.ts
+++ b/demos/yjs-react-supabase-text-collab/src/library/powersync/PowerSyncYjsProvider.ts
@@ -1,9 +1,9 @@
import * as Y from 'yjs';
import { b64ToUint8Array, Uint8ArrayTob64 } from '@/library/binary-utils';
-import { v4 as uuidv4 } from 'uuid';
import { AbstractPowerSyncDatabase } from '@powersync/web';
import { ObservableV2 } from 'lib0/observable';
+import { DocumentUpdates } from './AppSchema';
export interface PowerSyncYjsEvents {
/**
@@ -24,7 +24,6 @@ export interface PowerSyncYjsEvents {
* @param documentId
*/
export class PowerSyncYjsProvider extends ObservableV2 {
- private seenDocUpdates = new Set();
private abortController = new AbortController();
constructor(
@@ -33,40 +32,61 @@ export class PowerSyncYjsProvider extends ObservableV2 {
public readonly documentId: string
) {
super();
+ /**
+ * Watch for changes to the `document_updates` table for this document.
+ * This will be used to apply updates from other editors.
+ * When we received an added item we apply the update to the Yjs document.
+ */
+ const updateQuery = db
+ .query({
+ sql: /* sql */ `
+ SELECT
+ *
+ FROM
+ document_updates
+ WHERE
+ document_id = ?
+ `,
+ parameters: [documentId]
+ })
+ .differentialWatch();
- const updates = db.watch('SELECT * FROM document_updates WHERE document_id = ?', [documentId], {
- signal: this.abortController.signal
- });
+ this.abortController.signal.addEventListener(
+ 'abort',
+ () => {
+ // Stop the watch query when the abort signal is triggered
+ updateQuery.close();
+ },
+ { once: true }
+ );
this._storeUpdate = this._storeUpdate.bind(this);
this.destroy = this.destroy.bind(this);
let synced = false;
-
- const watchLoop = async () => {
- for await (const results of updates) {
- if (this.abortController.signal.aborted) {
- break;
- }
-
- // New data detected in the database
- for (const update of results.rows!._array) {
- // Ignore any updates we've already seen
- if (!this.seenDocUpdates.has(update.id)) {
- this.seenDocUpdates.add(update.id);
- // apply the update from the database to the doc
- const origin = this;
- Y.applyUpdateV2(doc, b64ToUint8Array(update.update_b64), origin);
- }
+ const origin = this;
+ updateQuery.registerListener({
+ onDiff: async (diff) => {
+ for (const added of diff.added) {
+ /**
+ * Local document updates get stored to the database and synced.
+ *
+ * These updates here originate from syncing remote updates.
+ * Applying these updates to YJS should not result in the `_storeUpdate`
+ * handler creating a new `document_update` record since we mark the `origin`
+ * here and check the `origin` in `_storeUpdate`.
+ */
+ Y.applyUpdateV2(doc, b64ToUint8Array(added.update_b64), origin);
}
-
if (!synced) {
synced = true;
this.emit('synced', []);
}
+ },
+ onError: (error) => {
+ console.error('Error in PowerSyncYjsProvider update query:', error);
}
- };
- watchLoop();
+ });
doc.on('updateV2', this._storeUpdate);
doc.on('destroy', this.destroy);
@@ -77,14 +97,16 @@ export class PowerSyncYjsProvider extends ObservableV2 {
// update originated from the database / PowerSync - ignore
return;
}
- // update originated from elsewhere - save to the database
- const docUpdateId = uuidv4();
- this.seenDocUpdates.add(docUpdateId);
- await this.db.execute('INSERT INTO document_updates(id, document_id, update_b64) VALUES(?, ?, ?)', [
- docUpdateId,
- this.documentId,
- Uint8ArrayTob64(update)
- ]);
+
+ await this.db.execute(
+ /* sql */ `
+ INSERT INTO
+ document_updates (id, document_id, update_b64)
+ VALUES
+ (uuid (), ?, ?)
+ `,
+ [this.documentId, Uint8ArrayTob64(update)]
+ );
}
/**
@@ -102,6 +124,13 @@ export class PowerSyncYjsProvider extends ObservableV2 {
* Also call `destroy()` to remove any event listeners and prevent future updates to the database.
*/
async deleteData() {
- await this.db.execute('DELETE FROM document_updates WHERE document_id = ?', [this.documentId]);
+ await this.db.execute(
+ /* sql */ `
+ DELETE FROM document_updates
+ WHERE
+ document_id = ?
+ `,
+ [this.documentId]
+ );
}
}
diff --git a/demos/yjs-react-supabase-text-collab/supabase/config.toml b/demos/yjs-react-supabase-text-collab/supabase/config.toml
index b3bdbbad6..62883d2db 100644
--- a/demos/yjs-react-supabase-text-collab/supabase/config.toml
+++ b/demos/yjs-react-supabase-text-collab/supabase/config.toml
@@ -79,6 +79,8 @@ enable_refresh_token_rotation = true
refresh_token_reuse_interval = 10
# Allow/disallow new user signups to your project.
enable_signup = true
+enable_anonymous_sign_ins = true
+
[auth.email]
# Allow/disallow new user signups via email to your project.
diff --git a/demos/yjs-react-supabase-text-collab/database.sql b/demos/yjs-react-supabase-text-collab/supabase/migrations/20250618064101_configure_powersync.sql
similarity index 100%
rename from demos/yjs-react-supabase-text-collab/database.sql
rename to demos/yjs-react-supabase-text-collab/supabase/migrations/20250618064101_configure_powersync.sql
diff --git a/demos/yjs-react-supabase-text-collab/sync-rules.yaml b/demos/yjs-react-supabase-text-collab/sync-rules.yaml
index d8b77a6e1..236b6bbd3 100644
--- a/demos/yjs-react-supabase-text-collab/sync-rules.yaml
+++ b/demos/yjs-react-supabase-text-collab/sync-rules.yaml
@@ -1,10 +1,11 @@
+# yaml-language-server: $schema=https://unpkg.com/@powersync/service-sync-rules@latest/schema/sync_rules.json
+
# Sync-rule docs: https://docs.powersync.com/usage/sync-rules
bucket_definitions:
documents:
- data:
- - SELECT * FROM documents
- updates:
# Allow remote changes to be synchronized even while there are local changes
priority: 0
+ parameters: SELECT (request.parameters() ->> 'document_id') as document_id
data:
- - SELECT id, document_id, base64(update_data) as update_b64 FROM document_updates
+ - SELECT * FROM documents WHERE id = bucket.document_id
+ - SELECT id, document_id, base64(update_data) as update_b64 FROM document_updates WHERE document_id = bucket.document_id
diff --git a/docs/.gitignore b/docs/.gitignore
index 0814a0d06..c32a3c8c3 100644
--- a/docs/.gitignore
+++ b/docs/.gitignore
@@ -19,6 +19,7 @@ npm-debug.log*
docs/attachments-sdk/
docs/common-sdk/
+docs/node-sdk/
docs/react-native-sdk/
docs/react-sdk/
docs/vue-sdk/
diff --git a/package.json b/package.json
index 5ae008c68..2e82c5855 100644
--- a/package.json
+++ b/package.json
@@ -20,7 +20,7 @@
"format": "prettier --write .",
"lint": "eslint .",
"release": "pnpm build:packages:prod && pnpm changeset publish",
- "test": "pnpm run -r test"
+ "test": "pnpm run -r --workspace-concurrency=0 test"
},
"keywords": [],
"type": "module",
@@ -33,12 +33,14 @@
"@actions/core": "^1.10.1",
"@changesets/cli": "2.27.2",
"@pnpm/workspace.find-packages": "^4.0.2",
- "@vitest/browser": "^3.0.8",
+ "@vitest/browser": "^3.2.4",
"husky": "^9.0.11",
"lint-staged": "^15.2.2",
"playwright": "^1.51.0",
"prettier": "^3.2.5",
+ "prettier-plugin-embed": "^0.4.15",
+ "prettier-plugin-sql": "^0.18.1",
"typescript": "^5.7.2",
- "vitest": "^3.0.8"
+ "vitest": "^3.2.4"
}
}
diff --git a/packages/common/src/client/AbstractPowerSyncDatabase.ts b/packages/common/src/client/AbstractPowerSyncDatabase.ts
index 90bd46010..490d56aa9 100644
--- a/packages/common/src/client/AbstractPowerSyncDatabase.ts
+++ b/packages/common/src/client/AbstractPowerSyncDatabase.ts
@@ -17,9 +17,10 @@ import { BaseObserver } from '../utils/BaseObserver.js';
import { ControlledExecutor } from '../utils/ControlledExecutor.js';
import { throttleTrailing } from '../utils/async.js';
import { ConnectionManager } from './ConnectionManager.js';
+import { CustomQuery } from './CustomQuery.js';
+import { ArrayQueryDefinition, Query } from './Query.js';
import { SQLOpenFactory, SQLOpenOptions, isDBAdapter, isSQLOpenFactory, isSQLOpenOptions } from './SQLOpenFactory.js';
import { PowerSyncBackendConnector } from './connection/PowerSyncBackendConnector.js';
-import { runOnSchemaChange } from './runOnSchemaChange.js';
import { BucketStorageAdapter, PSInternalTable } from './sync/bucket/BucketStorageAdapter.js';
import { CrudBatch } from './sync/bucket/CrudBatch.js';
import { CrudEntry, CrudEntryJSON } from './sync/bucket/CrudEntry.js';
@@ -34,6 +35,9 @@ import {
type PowerSyncConnectionOptions,
type RequiredAdditionalConnectionOptions
} from './sync/stream/AbstractStreamingSyncImplementation.js';
+import { DEFAULT_WATCH_THROTTLE_MS, WatchCompatibleQuery } from './watched/WatchedQuery.js';
+import { OnChangeQueryProcessor } from './watched/processors/OnChangeQueryProcessor.js';
+import { WatchedQueryComparator } from './watched/processors/comparators.js';
export interface DisconnectAndClearOptions {
/** When set to false, data in local-only tables is preserved. */
@@ -71,7 +75,7 @@ export interface PowerSyncDatabaseOptionsWithSettings extends BasePowerSyncDatab
database: SQLOpenOptions;
}
-export interface SQLWatchOptions {
+export interface SQLOnChangeOptions {
signal?: AbortSignal;
tables?: string[];
/** The minimum interval between queries. */
@@ -83,6 +87,18 @@ export interface SQLWatchOptions {
* by not removing PowerSync table name prefixes
*/
rawTableNames?: boolean;
+ /**
+ * Emits an empty result set immediately
+ */
+ triggerImmediate?: boolean;
+}
+
+export interface SQLWatchOptions extends SQLOnChangeOptions {
+ /**
+ * Optional comparator which will be used to compare the results of the query.
+ * The watched query will only yield results if the comparator returns false.
+ */
+ comparator?: WatchedQueryComparator;
}
export interface WatchOnChangeEvent {
@@ -102,6 +118,8 @@ export interface WatchOnChangeHandler {
export interface PowerSyncDBListener extends StreamingSyncImplementationListener {
initialized: () => void;
schemaChanged: (schema: Schema) => void;
+ closing: () => Promise | void;
+ closed: () => Promise | void;
}
export interface PowerSyncCloseOptions {
@@ -123,8 +141,6 @@ export const DEFAULT_POWERSYNC_CLOSE_OPTIONS: PowerSyncCloseOptions = {
disconnect: true
};
-export const DEFAULT_WATCH_THROTTLE_MS = 30;
-
export const DEFAULT_POWERSYNC_DB_OPTIONS = {
retryDelayMs: 5000,
crudUploadThrottleMs: DEFAULT_CRUD_UPLOAD_THROTTLE_MS
@@ -516,6 +532,8 @@ export abstract class AbstractPowerSyncDatabase extends BaseObserver cb.closing?.());
+
const { disconnect } = options;
if (disconnect) {
await this.disconnect();
@@ -524,6 +542,7 @@ export abstract class AbstractPowerSyncDatabase extends BaseObserver cb.closed?.());
}
/**
@@ -862,6 +881,62 @@ export abstract class AbstractPowerSyncDatabase extends BaseObserver ({
+ * ...row,
+ * created_at: new Date(row.created_at as string)
+ * })
+ * })
+ * .watch()
+ * // OR use .differentialWatch() for fine-grained watches.
+ * ```
+ */
+ query(query: ArrayQueryDefinition): Query {
+ const { sql, parameters = [], mapper } = query;
+ const compatibleQuery: WatchCompatibleQuery = {
+ compile: () => ({
+ sql,
+ parameters
+ }),
+ execute: async ({ sql, parameters }) => {
+ const result = await this.getAll(sql, parameters);
+ return mapper ? result.map(mapper) : (result as RowType[]);
+ }
+ };
+ return this.customQuery(compatibleQuery);
+ }
+
+ /**
+ * Allows building a {@link WatchedQuery} using an existing {@link WatchCompatibleQuery}.
+ * The watched query will use the provided {@link WatchCompatibleQuery.execute} method to query results.
+ *
+ * @example
+ * ```javascript
+ *
+ * // Potentially a query from an ORM like Drizzle
+ * const query = db.select().from(lists);
+ *
+ * const watchedTodos = powersync.customQuery(query)
+ * .watch()
+ * // OR use .differentialWatch() for fine-grained watches.
+ * ```
+ */
+ customQuery(query: WatchCompatibleQuery): Query {
+ return new CustomQuery({
+ db: this,
+ query
+ });
+ }
+
/**
* Execute a read query every time the source tables are modified.
* Use {@link SQLWatchOptions.throttleMs} to specify the minimum interval between queries.
@@ -879,39 +954,45 @@ export abstract class AbstractPowerSyncDatabase extends BaseObserver ({
+ sql: sql,
+ parameters: parameters ?? []
+ }),
+ execute: () => this.executeReadOnly(sql, parameters)
+ },
+ reportFetching: false,
+ throttleMs: options?.throttleMs ?? DEFAULT_WATCH_THROTTLE_MS,
+ triggerOnTables: options?.tables
+ }
+ });
- const watchQuery = async (abortSignal: AbortSignal) => {
- try {
- const resolvedTables = await this.resolveTables(sql, parameters, options);
- // Fetch initial data
- const result = await this.executeReadOnly(sql, parameters);
- onResult(result);
-
- this.onChangeWithCallback(
- {
- onChange: async () => {
- try {
- const result = await this.executeReadOnly(sql, parameters);
- onResult(result);
- } catch (error) {
- onError?.(error);
- }
- },
- onError
- },
- {
- ...(options ?? {}),
- tables: resolvedTables,
- // Override the abort signal since we intercept it
- signal: abortSignal
- }
- );
- } catch (error) {
- onError?.(error);
+ const dispose = watchedQuery.registerListener({
+ onData: (data) => {
+ if (!data) {
+ // This should not happen. We only use null for the initial data.
+ return;
+ }
+ onResult(data);
+ },
+ onError: (error) => {
+ onError(error);
}
- };
+ });
- runOnSchemaChange(watchQuery, this, options);
+ options?.signal?.addEventListener('abort', () => {
+ dispose();
+ watchedQuery.close();
+ });
}
/**
@@ -985,7 +1066,7 @@ export abstract class AbstractPowerSyncDatabase extends BaseObserver;
+ onChange(options?: SQLOnChangeOptions): AsyncIterable;
/**
* See {@link onChangeWithCallback}.
*
@@ -1000,11 +1081,11 @@ export abstract class AbstractPowerSyncDatabase extends BaseObserver void;
+ onChange(handler?: WatchOnChangeHandler, options?: SQLOnChangeOptions): () => void;
onChange(
- handlerOrOptions?: WatchOnChangeHandler | SQLWatchOptions,
- maybeOptions?: SQLWatchOptions
+ handlerOrOptions?: WatchOnChangeHandler | SQLOnChangeOptions,
+ maybeOptions?: SQLOnChangeOptions
): (() => void) | AsyncIterable {
if (handlerOrOptions && typeof handlerOrOptions === 'object' && 'onChange' in handlerOrOptions) {
const handler = handlerOrOptions as WatchOnChangeHandler;
@@ -1029,7 +1110,7 @@ export abstract class AbstractPowerSyncDatabase extends BaseObserver void {
+ onChangeWithCallback(handler?: WatchOnChangeHandler, options?: SQLOnChangeOptions): () => void {
const { onChange, onError = (e: Error) => this.logger.error(e) } = handler ?? {};
if (!onChange) {
throw new Error('onChange is required');
@@ -1056,6 +1137,10 @@ export abstract class AbstractPowerSyncDatabase extends BaseObserver {
try {
diff --git a/packages/common/src/client/CustomQuery.ts b/packages/common/src/client/CustomQuery.ts
new file mode 100644
index 000000000..c7dfdd6bb
--- /dev/null
+++ b/packages/common/src/client/CustomQuery.ts
@@ -0,0 +1,56 @@
+import { AbstractPowerSyncDatabase } from './AbstractPowerSyncDatabase.js';
+import { Query, StandardWatchedQueryOptions } from './Query.js';
+import { FalsyComparator } from './watched/processors/comparators.js';
+import {
+ DifferentialQueryProcessor,
+ DifferentialWatchedQueryOptions
+} from './watched/processors/DifferentialQueryProcessor.js';
+import { OnChangeQueryProcessor } from './watched/processors/OnChangeQueryProcessor.js';
+import { DEFAULT_WATCH_QUERY_OPTIONS, WatchCompatibleQuery, WatchedQueryOptions } from './watched/WatchedQuery.js';
+
+/**
+ * @internal
+ */
+export interface CustomQueryOptions {
+ db: AbstractPowerSyncDatabase;
+ query: WatchCompatibleQuery;
+}
+
+/**
+ * @internal
+ */
+export class CustomQuery implements Query {
+ constructor(protected options: CustomQueryOptions) {}
+
+ protected resolveOptions(options: WatchedQueryOptions): WatchedQueryOptions {
+ return {
+ reportFetching: options?.reportFetching ?? DEFAULT_WATCH_QUERY_OPTIONS.reportFetching,
+ throttleMs: options?.throttleMs ?? DEFAULT_WATCH_QUERY_OPTIONS.throttleMs,
+ triggerOnTables: options?.triggerOnTables
+ };
+ }
+
+ watch(watchOptions: StandardWatchedQueryOptions) {
+ return new OnChangeQueryProcessor({
+ db: this.options.db,
+ comparator: watchOptions?.comparator ?? FalsyComparator,
+ placeholderData: watchOptions?.placeholderData ?? [],
+ watchOptions: {
+ ...this.resolveOptions(watchOptions),
+ query: this.options.query
+ }
+ });
+ }
+
+ differentialWatch(differentialWatchOptions: DifferentialWatchedQueryOptions) {
+ return new DifferentialQueryProcessor({
+ db: this.options.db,
+ rowComparator: differentialWatchOptions?.rowComparator,
+ placeholderData: differentialWatchOptions?.placeholderData ?? [],
+ watchOptions: {
+ ...this.resolveOptions(differentialWatchOptions),
+ query: this.options.query
+ }
+ });
+ }
+}
diff --git a/packages/common/src/client/Query.ts b/packages/common/src/client/Query.ts
new file mode 100644
index 000000000..22ddc3278
--- /dev/null
+++ b/packages/common/src/client/Query.ts
@@ -0,0 +1,106 @@
+import { WatchedQueryComparator } from './watched/processors/comparators.js';
+import {
+ DifferentialWatchedQuery,
+ DifferentialWatchedQueryOptions
+} from './watched/processors/DifferentialQueryProcessor.js';
+import { StandardWatchedQuery } from './watched/processors/OnChangeQueryProcessor.js';
+import { WatchedQueryOptions } from './watched/WatchedQuery.js';
+
+/**
+ * Query parameters for {@link ArrayQueryDefinition#parameters}
+ */
+export type QueryParam = string | number | boolean | null | undefined | bigint | Uint8Array;
+
+/**
+ * Options for building a query with {@link AbstractPowerSyncDatabase#query}.
+ * This query will be executed with {@link AbstractPowerSyncDatabase#getAll}.
+ */
+export interface ArrayQueryDefinition {
+ sql: string;
+ parameters?: ReadonlyArray>;
+ /**
+ * Maps the raw SQLite row to a custom typed object.
+ * @example
+ * ```javascript
+ * mapper: (row) => ({
+ * ...row,
+ * created_at: new Date(row.created_at as string),
+ * })
+ * ```
+ */
+ mapper?: (row: Record) => RowType;
+}
+
+/**
+ * Options for {@link Query#watch}.
+ */
+export interface StandardWatchedQueryOptions extends WatchedQueryOptions {
+ /**
+ * The underlying watched query implementation (re)evaluates the query on any SQLite table change.
+ *
+ * Providing this optional comparator can be used to filter duplicate result set emissions when the result set is unchanged.
+ * The comparator compares the previous and current result set.
+ *
+ * For an efficient comparator see {@link ArrayComparator}.
+ *
+ * @example
+ * ```javascript
+ * comparator: new ArrayComparator({
+ * compareBy: (item) => JSON.stringify(item)
+ * })
+ * ```
+ */
+ comparator?: WatchedQueryComparator;
+
+ /**
+ * The initial data state reported while the query is loading for the first time.
+ * @default []
+ */
+ placeholderData?: RowType[];
+}
+
+export interface Query {
+ /**
+ * Creates a {@link WatchedQuery} which watches and emits results of the linked query.
+ *
+ * By default the returned watched query will emit changes whenever a change to the underlying SQLite tables is made.
+ * These changes might not be relevant to the query, but the query will emit a new result set.
+ *
+ * A {@link StandardWatchedQueryOptions#comparator} can be provided to limit the data emissions. The watched query will still
+ * query the underlying DB on underlying table changes, but the result will only be emitted if the comparator detects a change in the results.
+ *
+ * The comparator in this method is optimized and returns early as soon as it detects a change. Each data emission will correlate to a change in the result set,
+ * but note that the result set will not maintain internal object references to the previous result set. If internal object references are needed,
+ * consider using {@link Query#differentialWatch} instead.
+ */
+ watch(options?: StandardWatchedQueryOptions): StandardWatchedQuery>>;
+
+ /**
+ * Creates a {@link WatchedQuery} which watches and emits results of the linked query.
+ *
+ * This query method watches for changes in the underlying SQLite tables and runs the query on each table change.
+ * The difference between the current and previous result set is computed.
+ * The watched query will not emit changes if the result set is identical to the previous result set.
+ *
+ * If the result set is different, the watched query will emit the new result set and emit a detailed diff of the changes via the `onData` and `onDiff` listeners.
+ *
+ * The deep differentiation allows maintaining result set object references between result emissions.
+ * The {@link DifferentialWatchedQuery#state} `data` array will contain the previous row references for unchanged rows.
+ *
+ * @example
+ * ```javascript
+ * const watchedLists = powerSync.query({sql: 'SELECT * FROM lists'})
+ * .differentialWatch();
+ *
+ * const disposeListener = watchedLists.registerListener({
+ * onData: (lists) => {
+ * console.log('The latest result set for the query is', lists);
+ * },
+ * onDiff: (diff) => {
+ * console.log('The lists result set has changed since the last emission', diff.added, diff.removed, diff.updated, diff.all)
+ * }
+ * })
+ * ```
+ */
+ differentialWatch(options?: DifferentialWatchedQueryOptions): DifferentialWatchedQuery;
+}
diff --git a/packages/common/src/client/sync/bucket/BucketStorageAdapter.ts b/packages/common/src/client/sync/bucket/BucketStorageAdapter.ts
index ecd6c7ef4..3c24c059c 100644
--- a/packages/common/src/client/sync/bucket/BucketStorageAdapter.ts
+++ b/packages/common/src/client/sync/bucket/BucketStorageAdapter.ts
@@ -1,4 +1,4 @@
-import { BaseListener, BaseObserver, Disposable } from '../../../utils/BaseObserver.js';
+import { BaseListener, BaseObserverInterface, Disposable } from '../../../utils/BaseObserver.js';
import { CrudBatch } from './CrudBatch.js';
import { CrudEntry, OpId } from './CrudEntry.js';
import { SyncDataBatch } from './SyncDataBatch.js';
@@ -72,7 +72,7 @@ export interface BucketStorageListener extends BaseListener {
crudUpdate: () => void;
}
-export interface BucketStorageAdapter extends BaseObserver, Disposable {
+export interface BucketStorageAdapter extends BaseObserverInterface, Disposable {
init(): Promise;
saveSyncData(batch: SyncDataBatch, fixedKeyFormat?: boolean): Promise;
removeBuckets(buckets: string[]): Promise;
diff --git a/packages/common/src/client/sync/stream/AbstractStreamingSyncImplementation.ts b/packages/common/src/client/sync/stream/AbstractStreamingSyncImplementation.ts
index 31aed88bc..669441df3 100644
--- a/packages/common/src/client/sync/stream/AbstractStreamingSyncImplementation.ts
+++ b/packages/common/src/client/sync/stream/AbstractStreamingSyncImplementation.ts
@@ -1,11 +1,11 @@
import Logger, { ILogger } from 'js-logger';
-import { DataStream } from '../../../utils/DataStream.js';
-import { SyncStatus, SyncStatusOptions } from '../../../db/crud/SyncStatus.js';
import { FULL_SYNC_PRIORITY, InternalProgressInformation } from '../../../db/crud/SyncProgress.js';
import * as sync_status from '../../../db/crud/SyncStatus.js';
+import { SyncStatus, SyncStatusOptions } from '../../../db/crud/SyncStatus.js';
import { AbortOperation } from '../../../utils/AbortOperation.js';
-import { BaseListener, BaseObserver, Disposable } from '../../../utils/BaseObserver.js';
+import { BaseListener, BaseObserver, BaseObserverInterface, Disposable } from '../../../utils/BaseObserver.js';
+import { DataStream } from '../../../utils/DataStream.js';
import { throttleLeadingTrailing } from '../../../utils/async.js';
import {
BucketChecksum,
@@ -17,6 +17,7 @@ import {
import { CrudEntry } from '../bucket/CrudEntry.js';
import { SyncDataBucket } from '../bucket/SyncDataBucket.js';
import { AbstractRemote, FetchStrategy, SyncStreamOptions } from './AbstractRemote.js';
+import { EstablishSyncStream, Instruction, SyncPriorityStatus } from './core-instruction.js';
import {
BucketRequest,
CrudUploadNotification,
@@ -30,7 +31,6 @@ import {
isStreamingSyncCheckpointPartiallyComplete,
isStreamingSyncData
} from './streaming-sync-types.js';
-import { EstablishSyncStream, Instruction, SyncPriorityStatus } from './core-instruction.js';
export enum LockType {
CRUD = 'crud',
@@ -179,7 +179,9 @@ export interface AdditionalConnectionOptions {
/** @internal */
export type RequiredAdditionalConnectionOptions = Required;
-export interface StreamingSyncImplementation extends BaseObserver, Disposable {
+export interface StreamingSyncImplementation
+ extends BaseObserverInterface,
+ Disposable {
/**
* Connects to the sync service
*/
@@ -231,6 +233,9 @@ export abstract class AbstractStreamingSyncImplementation
protected _lastSyncedAt: Date | null;
protected options: AbstractStreamingSyncImplementationOptions;
protected abortController: AbortController | null;
+ // In rare cases, mostly for tests, uploads can be triggered without being properly connected.
+ // This allows ensuring that all upload processes can be aborted.
+ protected uploadAbortController: AbortController | null;
protected crudUpdateListener?: () => void;
protected streamingSyncPromise?: Promise;
protected logger: ILogger;
@@ -320,8 +325,10 @@ export abstract class AbstractStreamingSyncImplementation
}
async dispose() {
+ super.dispose();
this.crudUpdateListener?.();
this.crudUpdateListener = undefined;
+ this.uploadAbortController?.abort();
}
abstract obtainLock(lockOptions: LockOptions): Promise;
@@ -348,7 +355,17 @@ export abstract class AbstractStreamingSyncImplementation
*/
let checkedCrudItem: CrudEntry | undefined;
- while (true) {
+ const controller = new AbortController();
+ this.uploadAbortController = controller;
+ this.abortController?.signal.addEventListener(
+ 'abort',
+ () => {
+ controller.abort();
+ },
+ { once: true }
+ );
+
+ while (!controller.signal.aborted) {
try {
/**
* This is the first item in the FIFO CRUD queue.
@@ -393,7 +410,7 @@ The next upload iteration will be delayed.`);
uploadError: ex
}
});
- await this.delayRetry();
+ await this.delayRetry(controller.signal);
if (!this.isConnected) {
// Exit the upload loop if the sync stream is no longer connected
break;
@@ -409,6 +426,7 @@ The next upload iteration will be delayed.`);
});
}
}
+ this.uploadAbortController = null;
}
});
}
diff --git a/packages/common/src/client/watched/GetAllQuery.ts b/packages/common/src/client/watched/GetAllQuery.ts
new file mode 100644
index 000000000..116fe1fd4
--- /dev/null
+++ b/packages/common/src/client/watched/GetAllQuery.ts
@@ -0,0 +1,46 @@
+import { CompiledQuery } from '../../types/types.js';
+import { AbstractPowerSyncDatabase } from '../AbstractPowerSyncDatabase.js';
+import { WatchCompatibleQuery } from './WatchedQuery.js';
+
+/**
+ * Options for {@link GetAllQuery}.
+ */
+export type GetAllQueryOptions = {
+ sql: string;
+ parameters?: ReadonlyArray;
+ /**
+ * Optional mapper function to convert raw rows into the desired RowType.
+ * @example
+ * ```javascript
+ * (rawRow) => ({
+ * id: rawRow.id,
+ * created_at: new Date(rawRow.created_at),
+ * })
+ * ```
+ */
+ mapper?: (rawRow: Record) => RowType;
+};
+
+/**
+ * Performs a {@link AbstractPowerSyncDatabase.getAll} operation for a watched query.
+ */
+export class GetAllQuery implements WatchCompatibleQuery {
+ constructor(protected options: GetAllQueryOptions) {}
+
+ compile(): CompiledQuery {
+ return {
+ sql: this.options.sql,
+ parameters: this.options.parameters ?? []
+ };
+ }
+
+ async execute(options: { db: AbstractPowerSyncDatabase }): Promise {
+ const { db } = options;
+ const { sql, parameters = [] } = this.compile();
+ const rawResult = await db.getAll(sql, [...parameters]);
+ if (this.options.mapper) {
+ return rawResult.map(this.options.mapper);
+ }
+ return rawResult as RowType[];
+ }
+}
diff --git a/packages/common/src/client/watched/WatchedQuery.ts b/packages/common/src/client/watched/WatchedQuery.ts
new file mode 100644
index 000000000..ad0f0506d
--- /dev/null
+++ b/packages/common/src/client/watched/WatchedQuery.ts
@@ -0,0 +1,119 @@
+import { CompiledQuery } from '../../types/types.js';
+import { BaseListener } from '../../utils/BaseObserver.js';
+import { MetaBaseObserverInterface } from '../../utils/MetaBaseObserver.js';
+import { AbstractPowerSyncDatabase } from '../AbstractPowerSyncDatabase.js';
+
+/**
+ * State for {@link WatchedQuery} instances.
+ */
+export interface WatchedQueryState {
+ /**
+ * Indicates the initial loading state (hard loading).
+ * Loading becomes false once the first set of results from the watched query is available or an error occurs.
+ */
+ readonly isLoading: boolean;
+ /**
+ * Indicates whether the query is currently fetching data, is true during the initial load
+ * and any time when the query is re-evaluating (useful for large queries).
+ */
+ readonly isFetching: boolean;
+ /**
+ * The last error that occurred while executing the query.
+ */
+ readonly error: Error | null;
+ /**
+ * The last time the query was updated.
+ */
+ readonly lastUpdated: Date | null;
+ /**
+ * The last data returned by the query.
+ */
+ readonly data: Data;
+}
+
+/**
+ * Options provided to the `execute` method of a {@link WatchCompatibleQuery}.
+ */
+export interface WatchExecuteOptions {
+ sql: string;
+ parameters: any[];
+ db: AbstractPowerSyncDatabase;
+}
+
+/**
+ * Similar to {@link CompatibleQuery}, except the `execute` method
+ * does not enforce an Array result type.
+ */
+export interface WatchCompatibleQuery {
+ execute(options: WatchExecuteOptions): Promise;
+ compile(): CompiledQuery;
+}
+
+export interface WatchedQueryOptions {
+ /** The minimum interval between queries. */
+ throttleMs?: number;
+ /**
+ * If true (default) the watched query will update its state to report
+ * on the fetching state of the query.
+ * Setting to false reduces the number of state changes if the fetch status
+ * is not relevant to the consumer.
+ */
+ reportFetching?: boolean;
+
+ /**
+ * By default, watched queries requery the database on any change to any dependent table of the query.
+ * Supplying an override here can be used to limit the tables which trigger querying the database.
+ */
+ triggerOnTables?: string[];
+}
+
+export enum WatchedQueryListenerEvent {
+ ON_DATA = 'onData',
+ ON_ERROR = 'onError',
+ ON_STATE_CHANGE = 'onStateChange',
+ CLOSED = 'closed'
+}
+
+export interface WatchedQueryListener extends BaseListener {
+ [WatchedQueryListenerEvent.ON_DATA]?: (data: Data) => void | Promise;
+ [WatchedQueryListenerEvent.ON_ERROR]?: (error: Error) => void | Promise;
+ [WatchedQueryListenerEvent.ON_STATE_CHANGE]?: (state: WatchedQueryState) => void | Promise;
+ [WatchedQueryListenerEvent.CLOSED]?: () => void | Promise;
+}
+
+export const DEFAULT_WATCH_THROTTLE_MS = 30;
+
+export const DEFAULT_WATCH_QUERY_OPTIONS: WatchedQueryOptions = {
+ throttleMs: DEFAULT_WATCH_THROTTLE_MS,
+ reportFetching: true
+};
+
+export interface WatchedQuery<
+ Data = unknown,
+ Settings extends WatchedQueryOptions = WatchedQueryOptions,
+ Listener extends WatchedQueryListener = WatchedQueryListener
+> extends MetaBaseObserverInterface {
+ /**
+ * Current state of the watched query.
+ */
+ readonly state: WatchedQueryState;
+
+ readonly closed: boolean;
+
+ /**
+ * Subscribe to watched query events.
+ * @returns A function to unsubscribe from the events.
+ */
+ registerListener(listener: Listener): () => void;
+
+ /**
+ * Updates the underlying query options.
+ * This will trigger a re-evaluation of the query and update the state.
+ */
+ updateSettings(options: Settings): Promise;
+
+ /**
+ * Close the watched query and end all subscriptions.
+ */
+ close(): Promise;
+}
diff --git a/packages/common/src/client/watched/processors/AbstractQueryProcessor.ts b/packages/common/src/client/watched/processors/AbstractQueryProcessor.ts
new file mode 100644
index 000000000..0e5c1ab8a
--- /dev/null
+++ b/packages/common/src/client/watched/processors/AbstractQueryProcessor.ts
@@ -0,0 +1,201 @@
+import { AbstractPowerSyncDatabase } from '../../../client/AbstractPowerSyncDatabase.js';
+import { MetaBaseObserver } from '../../../utils/MetaBaseObserver.js';
+import { WatchedQuery, WatchedQueryListener, WatchedQueryOptions, WatchedQueryState } from '../WatchedQuery.js';
+
+/**
+ * @internal
+ */
+export interface AbstractQueryProcessorOptions {
+ db: AbstractPowerSyncDatabase;
+ watchOptions: Settings;
+ placeholderData: Data;
+}
+
+/**
+ * @internal
+ */
+export interface LinkQueryOptions {
+ abortSignal: AbortSignal;
+ settings: Settings;
+}
+
+type MutableDeep =
+ T extends ReadonlyArray
+ ? U[] // convert readonly arrays to mutable arrays
+ : T;
+
+/**
+ * @internal Mutable version of {@link WatchedQueryState}.
+ * This is used internally to allow updates to the state.
+ */
+export type MutableWatchedQueryState = {
+ -readonly [P in keyof WatchedQueryState]: MutableDeep[P]>;
+};
+
+type WatchedQueryProcessorListener = WatchedQueryListener;
+
+/**
+ * Performs underlying watching and yields a stream of results.
+ * @internal
+ */
+export abstract class AbstractQueryProcessor<
+ Data = unknown[],
+ Settings extends WatchedQueryOptions = WatchedQueryOptions
+ >
+ extends MetaBaseObserver>
+ implements WatchedQuery
+{
+ readonly state: WatchedQueryState;
+
+ protected abortController: AbortController;
+ protected initialized: Promise;
+ protected _closed: boolean;
+ protected disposeListeners: (() => void) | null;
+
+ get closed() {
+ return this._closed;
+ }
+
+ constructor(protected options: AbstractQueryProcessorOptions) {
+ super();
+ this.abortController = new AbortController();
+ this._closed = false;
+ this.state = this.constructInitialState();
+ this.disposeListeners = null;
+ this.initialized = this.init();
+ }
+
+ protected constructInitialState(): WatchedQueryState {
+ return {
+ isLoading: true,
+ isFetching: this.reportFetching, // Only set to true if we will report updates in future
+ error: null,
+ lastUpdated: null,
+ data: this.options.placeholderData
+ };
+ }
+
+ protected get reportFetching() {
+ return this.options.watchOptions.reportFetching ?? true;
+ }
+
+ /**
+ * Updates the underlying query.
+ */
+ async updateSettings(settings: Settings) {
+ await this.initialized;
+
+ if (!this.state.isFetching && this.reportFetching) {
+ await this.updateState({
+ isFetching: true
+ });
+ }
+
+ this.options.watchOptions = settings;
+ this.abortController.abort();
+ this.abortController = new AbortController();
+ await this.runWithReporting(() =>
+ this.linkQuery({
+ abortSignal: this.abortController.signal,
+ settings
+ })
+ );
+ }
+
+ /**
+ * This method is used to link a query to the subscribers of this listener class.
+ * This method should perform actual query watching and report results via {@link updateState} method.
+ */
+ protected abstract linkQuery(options: LinkQueryOptions): Promise;
+
+ protected async updateState(update: Partial>) {
+ if (typeof update.error !== 'undefined') {
+ await this.iterateAsyncListenersWithError(async (l) => l.onError?.(update.error!));
+ // An error always stops for the current fetching state
+ update.isFetching = false;
+ update.isLoading = false;
+ }
+
+ Object.assign(this.state, { lastUpdated: new Date() } satisfies Partial>, update);
+
+ if (typeof update.data !== 'undefined') {
+ await this.iterateAsyncListenersWithError(async (l) => l.onData?.(this.state.data));
+ }
+
+ await this.iterateAsyncListenersWithError(async (l) => l.onStateChange?.(this.state));
+ }
+
+ /**
+ * Configures base DB listeners and links the query to listeners.
+ */
+ protected async init() {
+ const { db } = this.options;
+
+ const disposeCloseListener = db.registerListener({
+ closing: async () => {
+ await this.close();
+ }
+ });
+
+ // Wait for the schema to be set before listening to changes
+ await db.waitForReady();
+ const disposeSchemaListener = db.registerListener({
+ schemaChanged: async () => {
+ await this.runWithReporting(async () => {
+ await this.updateSettings(this.options.watchOptions);
+ });
+ }
+ });
+
+ this.disposeListeners = () => {
+ disposeCloseListener();
+ disposeSchemaListener();
+ };
+
+ // Initial setup
+ this.runWithReporting(async () => {
+ await this.updateSettings(this.options.watchOptions);
+ });
+ }
+
+ async close() {
+ await this.initialized;
+ this.abortController.abort();
+ this.disposeListeners?.();
+ this.disposeListeners = null;
+ this._closed = true;
+ this.iterateListeners((l) => l.closed?.());
+ this.listeners.clear();
+ }
+
+ /**
+ * Runs a callback and reports errors to the error listeners.
+ */
+ protected async runWithReporting(callback: () => Promise): Promise {
+ try {
+ await callback();
+ } catch (error) {
+ // This will update the error on the state and iterate error listeners
+ await this.updateState({ error });
+ }
+ }
+
+ /**
+ * Iterate listeners and reports errors to onError handlers.
+ */
+ protected async iterateAsyncListenersWithError(
+ callback: (listener: Partial>) => Promise | void
+ ) {
+ try {
+ await this.iterateAsyncListeners(async (l) => callback(l));
+ } catch (error) {
+ try {
+ await this.iterateAsyncListeners(async (l) => l.onError?.(error));
+ } catch (error) {
+ // Errors here are ignored
+ // since we are already in an error state
+ this.options.db.logger.error('Watched query error handler threw an Error', error);
+ }
+ }
+ }
+}
diff --git a/packages/common/src/client/watched/processors/DifferentialQueryProcessor.ts b/packages/common/src/client/watched/processors/DifferentialQueryProcessor.ts
new file mode 100644
index 000000000..48367e552
--- /dev/null
+++ b/packages/common/src/client/watched/processors/DifferentialQueryProcessor.ts
@@ -0,0 +1,297 @@
+import { WatchCompatibleQuery, WatchedQuery, WatchedQueryListener, WatchedQueryOptions } from '../WatchedQuery.js';
+import {
+ AbstractQueryProcessor,
+ AbstractQueryProcessorOptions,
+ LinkQueryOptions,
+ MutableWatchedQueryState
+} from './AbstractQueryProcessor.js';
+
+/**
+ * Represents an updated row in a differential watched query.
+ * It contains both the current and previous state of the row.
+ */
+export interface WatchedQueryRowDifferential {
+ readonly current: RowType;
+ readonly previous: RowType;
+}
+
+/**
+ * Represents the result of a watched query that has been diffed.
+ * {@link DifferentialWatchedQueryState#diff} is of the {@link WatchedQueryDifferential} form.
+ */
+export interface WatchedQueryDifferential {
+ readonly added: ReadonlyArray>;
+ /**
+ * The entire current result set.
+ * Array item object references are preserved between updates if the item is unchanged.
+ *
+ * e.g. In the query
+ * ```sql
+ * SELECT name, make FROM assets ORDER BY make ASC;
+ * ```
+ *
+ * If a previous result set contains an item (A) `{name: 'pc', make: 'Cool PC'}` and
+ * an update has been made which adds another item (B) to the result set (the item A is unchanged) - then
+ * the updated result set will be contain the same object reference, to item A, as the previous result set.
+ * This is regardless of the item A's position in the updated result set.
+ */
+ readonly all: ReadonlyArray>;
+ readonly removed: ReadonlyArray>;
+ readonly updated: ReadonlyArray>>;
+ readonly unchanged: ReadonlyArray>;
+}
+
+/**
+ * Row comparator for differentially watched queries which keys and compares items in the result set.
+ */
+export interface DifferentialWatchedQueryComparator {
+ /**
+ * Generates a unique key for the item.
+ */
+ keyBy: (item: RowType) => string;
+ /**
+ * Generates a token for comparing items with matching keys.
+ */
+ compareBy: (item: RowType) => string;
+}
+
+/**
+ * Options for building a differential watched query with the {@link Query} builder.
+ */
+export interface DifferentialWatchedQueryOptions extends WatchedQueryOptions {
+ /**
+ * Initial result data which is presented while the initial loading is executing.
+ */
+ placeholderData?: RowType[];
+
+ /**
+ * Row comparator used to identify and compare rows in the result set.
+ * If not provided, the default comparator will be used which keys items by their `id` property if available,
+ * otherwise it uses JSON stringification of the entire item for keying and comparison.
+ * @defaultValue {@link DEFAULT_ROW_COMPARATOR}
+ */
+ rowComparator?: DifferentialWatchedQueryComparator;
+}
+
+/**
+ * Settings for differential incremental watched queries using.
+ */
+export interface DifferentialWatchedQuerySettings extends DifferentialWatchedQueryOptions {
+ /**
+ * The query here must return an array of items that can be differentiated.
+ */
+ query: WatchCompatibleQuery;
+}
+
+export interface DifferentialWatchedQueryListener
+ extends WatchedQueryListener>> {
+ onDiff?: (diff: WatchedQueryDifferential) => void | Promise;
+}
+
+export type DifferentialWatchedQuery = WatchedQuery<
+ ReadonlyArray>,
+ DifferentialWatchedQuerySettings,
+ DifferentialWatchedQueryListener
+>;
+
+/**
+ * @internal
+ */
+export interface DifferentialQueryProcessorOptions
+ extends AbstractQueryProcessorOptions> {
+ rowComparator?: DifferentialWatchedQueryComparator;
+}
+
+type DataHashMap = Map;
+
+/**
+ * An empty differential result set.
+ * This is used as the initial state for differential incrementally watched queries.
+ */
+export const EMPTY_DIFFERENTIAL = {
+ added: [],
+ all: [],
+ removed: [],
+ updated: [],
+ unchanged: []
+};
+
+/**
+ * Default implementation of the {@link DifferentialWatchedQueryComparator} for watched queries.
+ * It keys items by their `id` property if available, alternatively it uses JSON stringification
+ * of the entire item for the key and comparison.
+ */
+export const DEFAULT_ROW_COMPARATOR: DifferentialWatchedQueryComparator = {
+ keyBy: (item) => {
+ if (item && typeof item == 'object' && typeof item['id'] == 'string') {
+ return item['id'];
+ }
+ return JSON.stringify(item);
+ },
+ compareBy: (item) => JSON.stringify(item)
+};
+
+/**
+ * Uses the PowerSync onChange event to trigger watched queries.
+ * Results are emitted on every change of the relevant tables.
+ * @internal
+ */
+export class DifferentialQueryProcessor
+ extends AbstractQueryProcessor>, DifferentialWatchedQuerySettings>
+ implements DifferentialWatchedQuery
+{
+ protected comparator: DifferentialWatchedQueryComparator;
+
+ constructor(protected options: DifferentialQueryProcessorOptions) {
+ super(options);
+ this.comparator = options.rowComparator ?? DEFAULT_ROW_COMPARATOR;
+ }
+
+ /*
+ * @returns If the sets are equal
+ */
+ protected differentiate(
+ current: RowType[],
+ previousMap: DataHashMap
+ ): { diff: WatchedQueryDifferential; map: DataHashMap; hasChanged: boolean } {
+ const { keyBy, compareBy } = this.comparator;
+
+ let hasChanged = false;
+ const currentMap = new Map();
+ const removedTracker = new Set(previousMap.keys());
+
+ // Allow mutating to populate the data temporarily.
+ const diff = {
+ all: [] as RowType[],
+ added: [] as RowType[],
+ removed: [] as RowType[],
+ updated: [] as WatchedQueryRowDifferential[],
+ unchanged: [] as RowType[]
+ };
+
+ /**
+ * Looping over the current result set array is important to preserve
+ * the ordering of the result set.
+ * We can replace items in the current array with previous object references if they are equal.
+ */
+ for (const item of current) {
+ const key = keyBy(item);
+ const hash = compareBy(item);
+ currentMap.set(key, { hash, item });
+
+ const previousItem = previousMap.get(key);
+ if (!previousItem) {
+ // New item
+ hasChanged = true;
+ diff.added.push(item);
+ diff.all.push(item);
+ } else {
+ // Existing item
+ if (hash == previousItem.hash) {
+ diff.unchanged.push(previousItem.item);
+ // Use the previous object reference
+ diff.all.push(previousItem.item);
+ // update the map to preserve the reference
+ currentMap.set(key, previousItem);
+ } else {
+ hasChanged = true;
+ diff.updated.push({ current: item, previous: previousItem.item });
+ // Use the new reference
+ diff.all.push(item);
+ }
+ }
+ // The item is present, we don't consider it removed
+ removedTracker.delete(key);
+ }
+
+ diff.removed = Array.from(removedTracker).map((key) => previousMap.get(key)!.item);
+ hasChanged = hasChanged || diff.removed.length > 0;
+
+ return {
+ diff,
+ hasChanged,
+ map: currentMap
+ };
+ }
+
+ protected async linkQuery(options: LinkQueryOptions>): Promise {
+ const { db, watchOptions } = this.options;
+ const { abortSignal } = options;
+
+ const compiledQuery = watchOptions.query.compile();
+ const tables = await db.resolveTables(compiledQuery.sql, compiledQuery.parameters as any[], {
+ tables: options.settings.triggerOnTables
+ });
+
+ let currentMap: DataHashMap = new Map();
+
+ // populate the currentMap from the placeholder data
+ this.state.data.forEach((item) => {
+ currentMap.set(this.comparator.keyBy(item), {
+ hash: this.comparator.compareBy(item),
+ item
+ });
+ });
+
+ db.onChangeWithCallback(
+ {
+ onChange: async () => {
+ if (this.closed) {
+ return;
+ }
+ // This fires for each change of the relevant tables
+ try {
+ if (this.reportFetching && !this.state.isFetching) {
+ await this.updateState({ isFetching: true });
+ }
+
+ const partialStateUpdate: Partial> = {};
+
+ // Always run the query if an underlying table has changed
+ const result = await watchOptions.query.execute({
+ sql: compiledQuery.sql,
+ // Allows casting from ReadOnlyArray[unknown] to Array
+ // This allows simpler compatibility with PowerSync queries
+ parameters: [...compiledQuery.parameters],
+ db: this.options.db
+ });
+
+ if (this.reportFetching) {
+ partialStateUpdate.isFetching = false;
+ }
+
+ if (this.state.isLoading) {
+ partialStateUpdate.isLoading = false;
+ }
+
+ const { diff, hasChanged, map } = this.differentiate(result, currentMap);
+ // Update for future comparisons
+ currentMap = map;
+
+ if (hasChanged) {
+ await this.iterateAsyncListenersWithError((l) => l.onDiff?.(diff));
+ Object.assign(partialStateUpdate, {
+ data: diff.all
+ });
+ }
+
+ if (Object.keys(partialStateUpdate).length > 0) {
+ await this.updateState(partialStateUpdate);
+ }
+ } catch (error) {
+ await this.updateState({ error });
+ }
+ },
+ onError: async (error) => {
+ await this.updateState({ error });
+ }
+ },
+ {
+ signal: abortSignal,
+ tables,
+ throttleMs: watchOptions.throttleMs,
+ triggerImmediate: true // used to emit the initial state
+ }
+ );
+ }
+}
diff --git a/packages/common/src/client/watched/processors/OnChangeQueryProcessor.ts b/packages/common/src/client/watched/processors/OnChangeQueryProcessor.ts
new file mode 100644
index 000000000..0763af4c6
--- /dev/null
+++ b/packages/common/src/client/watched/processors/OnChangeQueryProcessor.ts
@@ -0,0 +1,114 @@
+import { WatchCompatibleQuery, WatchedQuery, WatchedQueryOptions } from '../WatchedQuery.js';
+import {
+ AbstractQueryProcessor,
+ AbstractQueryProcessorOptions,
+ LinkQueryOptions,
+ MutableWatchedQueryState
+} from './AbstractQueryProcessor.js';
+import { WatchedQueryComparator } from './comparators.js';
+
+/**
+ * Settings for {@link WatchedQuery} instances created via {@link Query#watch}.
+ */
+export interface WatchedQuerySettings extends WatchedQueryOptions {
+ query: WatchCompatibleQuery;
+}
+
+/**
+ * {@link WatchedQuery} returned from {@link Query#watch}.
+ */
+export type StandardWatchedQuery = WatchedQuery>;
+
+/**
+ * @internal
+ */
+export interface OnChangeQueryProcessorOptions
+ extends AbstractQueryProcessorOptions> {
+ comparator?: WatchedQueryComparator;
+}
+
+/**
+ * Uses the PowerSync onChange event to trigger watched queries.
+ * Results are emitted on every change of the relevant tables.
+ * @internal
+ */
+export class OnChangeQueryProcessor extends AbstractQueryProcessor> {
+ constructor(protected options: OnChangeQueryProcessorOptions) {
+ super(options);
+ }
+
+ /**
+ * @returns If the sets are equal
+ */
+ protected checkEquality(current: Data, previous: Data): boolean {
+ // Use the provided comparator if available. Assume values are unique if not available.
+ return this.options.comparator?.checkEquality?.(current, previous) ?? false;
+ }
+
+ protected async linkQuery(options: LinkQueryOptions): Promise {
+ const { db, watchOptions } = this.options;
+ const { abortSignal } = options;
+
+ const compiledQuery = watchOptions.query.compile();
+ const tables = await db.resolveTables(compiledQuery.sql, compiledQuery.parameters as any[], {
+ tables: options.settings.triggerOnTables
+ });
+
+ db.onChangeWithCallback(
+ {
+ onChange: async () => {
+ if (this.closed) {
+ return;
+ }
+ // This fires for each change of the relevant tables
+ try {
+ if (this.reportFetching && !this.state.isFetching) {
+ await this.updateState({ isFetching: true });
+ }
+
+ const partialStateUpdate: Partial> & { data?: Data } = {};
+
+ // Always run the query if an underlying table has changed
+ const result = await watchOptions.query.execute({
+ sql: compiledQuery.sql,
+ // Allows casting from ReadOnlyArray[unknown] to Array
+ // This allows simpler compatibility with PowerSync queries
+ parameters: [...compiledQuery.parameters],
+ db: this.options.db
+ });
+
+ if (this.reportFetching) {
+ partialStateUpdate.isFetching = false;
+ }
+
+ if (this.state.isLoading) {
+ partialStateUpdate.isLoading = false;
+ }
+
+ // Check if the result has changed
+ if (!this.checkEquality(result, this.state.data)) {
+ Object.assign(partialStateUpdate, {
+ data: result
+ });
+ }
+
+ if (Object.keys(partialStateUpdate).length > 0) {
+ await this.updateState(partialStateUpdate);
+ }
+ } catch (error) {
+ await this.updateState({ error });
+ }
+ },
+ onError: async (error) => {
+ await this.updateState({ error });
+ }
+ },
+ {
+ signal: abortSignal,
+ tables,
+ throttleMs: watchOptions.throttleMs,
+ triggerImmediate: true // used to emit the initial state
+ }
+ );
+ }
+}
diff --git a/packages/common/src/client/watched/processors/comparators.ts b/packages/common/src/client/watched/processors/comparators.ts
new file mode 100644
index 000000000..038129d08
--- /dev/null
+++ b/packages/common/src/client/watched/processors/comparators.ts
@@ -0,0 +1,57 @@
+/**
+ * A basic comparator for incrementally watched queries. This performs a single comparison which
+ * determines if the result set has changed. The {@link WatchedQuery} will only emit the new result
+ * if a change has been detected.
+ */
+export interface WatchedQueryComparator {
+ checkEquality: (current: Data, previous: Data) => boolean;
+}
+
+/**
+ * Options for {@link ArrayComparator}
+ */
+export type ArrayComparatorOptions = {
+ /**
+ * Returns a string to uniquely identify an item in the array.
+ */
+ compareBy: (item: ItemType) => string;
+};
+
+/**
+ * An efficient comparator for {@link WatchedQuery} created with {@link Query#watch}. This has the ability to determine if a query
+ * result has changes without necessarily processing all items in the result.
+ */
+export class ArrayComparator implements WatchedQueryComparator {
+ constructor(protected options: ArrayComparatorOptions) {}
+
+ checkEquality(current: ItemType[], previous: ItemType[]) {
+ if (current.length === 0 && previous.length === 0) {
+ return true;
+ }
+
+ if (current.length !== previous.length) {
+ return false;
+ }
+
+ const { compareBy } = this.options;
+
+ // At this point the lengths are equal
+ for (let i = 0; i < current.length; i++) {
+ const currentItem = compareBy(current[i]);
+ const previousItem = compareBy(previous[i]);
+
+ if (currentItem !== previousItem) {
+ return false;
+ }
+ }
+
+ return true;
+ }
+}
+
+/**
+ * Watched query comparator that always reports changed result sets.
+ */
+export const FalsyComparator: WatchedQueryComparator = {
+ checkEquality: () => false // Default comparator that always returns false
+};
diff --git a/packages/common/src/index.ts b/packages/common/src/index.ts
index 44203a2b9..40d4f7330 100644
--- a/packages/common/src/index.ts
+++ b/packages/common/src/index.ts
@@ -31,6 +31,14 @@ export * from './db/schema/Schema.js';
export * from './db/schema/Table.js';
export * from './db/schema/TableV2.js';
+export * from './client/Query.js';
+export * from './client/watched/GetAllQuery.js';
+export * from './client/watched/processors/AbstractQueryProcessor.js';
+export * from './client/watched/processors/comparators.js';
+export * from './client/watched/processors/DifferentialQueryProcessor.js';
+export * from './client/watched/processors/OnChangeQueryProcessor.js';
+export * from './client/watched/WatchedQuery.js';
+
export * from './utils/AbortOperation.js';
export * from './utils/BaseObserver.js';
export * from './utils/DataStream.js';
diff --git a/packages/common/src/utils/BaseObserver.ts b/packages/common/src/utils/BaseObserver.ts
index df56e9e61..fa8067226 100644
--- a/packages/common/src/utils/BaseObserver.ts
+++ b/packages/common/src/utils/BaseObserver.ts
@@ -1,20 +1,22 @@
export interface Disposable {
- dispose: () => Promise;
+ dispose: () => Promise | void;
}
+export type BaseListener = Record any) | undefined>;
+
export interface BaseObserverInterface {
registerListener(listener: Partial): () => void;
}
-export type BaseListener = {
- [key: string]: ((...event: any) => any) | undefined;
-};
-
export class BaseObserver implements BaseObserverInterface {
protected listeners = new Set>();
constructor() {}
+ dispose(): void {
+ this.listeners.clear();
+ }
+
/**
* Register a listener for updates to the PowerSync client.
*/
diff --git a/packages/common/src/utils/MetaBaseObserver.ts b/packages/common/src/utils/MetaBaseObserver.ts
new file mode 100644
index 000000000..73bb038f8
--- /dev/null
+++ b/packages/common/src/utils/MetaBaseObserver.ts
@@ -0,0 +1,81 @@
+import { BaseListener, BaseObserver, BaseObserverInterface } from './BaseObserver.js';
+
+/**
+ * Represents the counts of listeners for each event type in a BaseListener.
+ */
+export type ListenerCounts = Partial> & {
+ total: number;
+};
+
+/**
+ * Meta listener which reports the counts of listeners for each event type.
+ */
+export interface MetaListener extends BaseListener {
+ listenersChanged?: (counts: ListenerCounts) => void;
+}
+
+export interface ListenerMetaManager
+ extends BaseObserverInterface> {
+ counts: ListenerCounts;
+}
+
+export interface MetaBaseObserverInterface extends BaseObserverInterface {
+ listenerMeta: ListenerMetaManager;
+}
+
+/**
+ * A BaseObserver that tracks the counts of listeners for each event type.
+ */
+export class MetaBaseObserver
+ extends BaseObserver
+ implements MetaBaseObserverInterface
+{
+ protected get listenerCounts(): ListenerCounts {
+ const counts = {} as Partial>;
+ let total = 0;
+ for (const listener of this.listeners) {
+ for (const key in listener) {
+ if (listener[key]) {
+ counts[key] = (counts[key] ?? 0) + 1;
+ total++;
+ }
+ }
+ }
+ return {
+ ...counts,
+ total
+ };
+ }
+
+ get listenerMeta(): ListenerMetaManager {
+ return {
+ counts: this.listenerCounts,
+ // Allows registering a meta listener that will be notified of changes in listener counts
+ registerListener: (listener: Partial>) => {
+ return this.metaListener.registerListener(listener);
+ }
+ };
+ }
+
+ protected metaListener: BaseObserver>;
+
+ constructor() {
+ super();
+ this.metaListener = new BaseObserver>();
+ }
+
+ registerListener(listener: Partial): () => void {
+ const dispose = super.registerListener(listener);
+ const updatedCount = this.listenerCounts;
+ this.metaListener.iterateListeners((l) => {
+ l.listenersChanged?.(updatedCount);
+ });
+ return () => {
+ dispose();
+ const updatedCount = this.listenerCounts;
+ this.metaListener.iterateListeners((l) => {
+ l.listenersChanged?.(updatedCount);
+ });
+ };
+ }
+}
diff --git a/packages/kysely-driver/tests/sqlite/watch.test.ts b/packages/kysely-driver/tests/sqlite/watch.test.ts
index 1c39a32e3..1a499a5fe 100644
--- a/packages/kysely-driver/tests/sqlite/watch.test.ts
+++ b/packages/kysely-driver/tests/sqlite/watch.test.ts
@@ -90,9 +90,9 @@ describe('Watch Tests', () => {
await db
.insertInto('assets')
.values({
- id: sql`uuid()`,
+ id: sql`uuid ()`,
make: 'test',
- customer_id: sql`uuid()`
+ customer_id: sql`uuid ()`
})
.execute();
@@ -126,9 +126,9 @@ describe('Watch Tests', () => {
await db
.insertInto('assets')
.values({
- id: sql`uuid()`,
+ id: sql`uuid ()`,
make: 'test',
- customer_id: sql`uuid()`
+ customer_id: sql`uuid ()`
})
.execute();
}
@@ -180,9 +180,9 @@ describe('Watch Tests', () => {
await db
.insertInto('assets')
.values({
- id: sql`uuid()`,
+ id: sql`uuid ()`,
make: 'test',
- customer_id: sql`uuid()`
+ customer_id: sql`uuid ()`
})
.execute();
@@ -210,7 +210,7 @@ describe('Watch Tests', () => {
const query = db.selectFrom('assets').select([
() => {
- const fullName = sql`fakeFunction()`; // Simulate an error with invalid function
+ const fullName = sql`fakeFunction ()`; // Simulate an error with invalid function
return fullName.as('full_name');
}
]);
@@ -246,9 +246,9 @@ describe('Watch Tests', () => {
for (let i = 0; i < updatesCount; i++) {
db.insertInto('assets')
.values({
- id: sql`uuid()`,
+ id: sql`uuid ()`,
make: 'test',
- customer_id: sql`uuid()`
+ customer_id: sql`uuid ()`
})
.execute();
}
@@ -261,4 +261,33 @@ describe('Watch Tests', () => {
expect(receivedWithManagedOverflowCount).greaterThan(2);
expect(receivedWithManagedOverflowCount).toBeLessThanOrEqual(4);
});
+
+ it('incremental watch should accept queries', async () => {
+ const query = db.selectFrom('assets').select(db.fn.count('assets.id').as('count'));
+
+ const watch = powerSyncDb.customQuery(query).watch();
+
+ const latestDataPromise = new Promise>>((resolve) => {
+ const dispose = watch.registerListener({
+ onData: (data) => {
+ if (data.length > 0) {
+ resolve([...data]);
+ dispose();
+ }
+ }
+ });
+ });
+
+ await db
+ .insertInto('assets')
+ .values({
+ id: sql`uuid ()`,
+ make: 'test',
+ customer_id: sql`uuid ()`
+ })
+ .execute();
+
+ const data = await latestDataPromise;
+ expect(data.length).equals(1);
+ });
});
diff --git a/packages/node/package.json b/packages/node/package.json
index 2997a5867..a697c7e46 100644
--- a/packages/node/package.json
+++ b/packages/node/package.json
@@ -61,7 +61,7 @@
"drizzle-orm": "^0.35.2",
"rollup": "4.14.3",
"typescript": "^5.5.3",
- "vitest": "^3.0.5"
+ "vitest": "^3.2.4"
},
"keywords": [
"data sync",
diff --git a/packages/react-native/rollup.config.mjs b/packages/react-native/rollup.config.mjs
index d2ee7e7fa..08699165b 100644
--- a/packages/react-native/rollup.config.mjs
+++ b/packages/react-native/rollup.config.mjs
@@ -12,7 +12,7 @@ const __filename = fileURLToPath(import.meta.url);
const __dirname = path.dirname(__filename);
export default (commandLineArgs) => {
- const sourcemap = (commandLineArgs.sourceMap || 'true') == 'true';
+ const sourceMap = (commandLineArgs.sourceMap || 'true') == 'true';
// Clears rollup CLI warning https://github.com/rollup/rollup/issues/2694
delete commandLineArgs.sourceMap;
@@ -22,7 +22,7 @@ export default (commandLineArgs) => {
output: {
file: 'dist/index.js',
format: 'cjs',
- sourcemap: sourcemap
+ sourcemap: sourceMap
},
plugins: [
// We do this so that we can inject on BSON's crypto usage.
@@ -54,7 +54,7 @@ export default (commandLineArgs) => {
}
]
}),
- terser({ sourceMap: sourcemap })
+ terser({ sourceMap })
],
external: [
'@journeyapps/react-native-quick-sqlite',
diff --git a/packages/react/README.md b/packages/react/README.md
index e02344e50..ff10730e1 100644
--- a/packages/react/README.md
+++ b/packages/react/README.md
@@ -69,7 +69,7 @@ const Component = () => {
## Reactive Queries
-The `useQuery` hook allows you to access the results of a watched query. Queries will automatically update when a dependant table is updated unless you set the `runQueryOnce` flag. You are also able to use a compilable query (e.g. [Kysely queries](https://github.com/powersync-ja/powersync-js/tree/main/packages/kysely-driver)) as a query argument in place of a SQL statement string.
+The `useQuery` hook allows you to access the results of a watched query. Queries will automatically update when a dependent table is updated unless you set the `runQueryOnce` flag. You are also able to use a compilable query (e.g. [Kysely queries](https://github.com/powersync-ja/powersync-js/tree/main/packages/kysely-driver)) as a query argument in place of a SQL statement string.
```JSX
// TodoListDisplay.jsx
@@ -271,3 +271,148 @@ export const TodoListDisplaySuspense = () => {
);
};
```
+
+## Preventing Unnecessary Renders
+
+The `useQuery` hook returns a stateful object which contains query fetching/loading state values and the query result set data.
+
+```tsx
+function MyWidget() {
+ // ... Widget code
+ // result is an object which contains `isLoading`, `isFetching`, `data` members.
+ const result = useQuery(...)
+
+ // ... Widget code
+}
+```
+
+### High Order Components
+
+The returned object is a new JS object reference whenever the internal state changes e.g. if the query `isFetching` alternates in value. The parent component which calls `useQuery` will render each time the watched query state changes - this can result in other child widgets re-rendering if they are not memoized. Using the `result` object in child component props will cause those children to re-render on any state change of the watched query. The first step to avoid re-renders is to call `useQuery` in a Higher Order Component which passes query results to memoized children.
+
+```tsx
+function MyWidget() {
+ // ... Widget code
+ // result is an object which contains `isLoading`, `isFetching`, `data` members.
+ const {data, error, isLoading} = useQuery(...)
+
+ // ... Widget code
+
+ return (
+ // ... Other components
+
+ // If MyWatchedWidget is not memoized
+ // - It will rerender on any state change of the watched query. E.g. if isFetching alternates
+ // If MyWatchedWidget is memoized
+ // - It will re-render if the data reference changes. By default the data reference changes after any
+ // change to the query's dependent tables. This can be optimized by using Incremental queries.
+
+ )
+}
+```
+
+### Incremental Queries
+
+By default watched queries are queried whenever a change to the underlying tables has been detected. These changes might not be relevant to the actual query, but will still trigger a query and `data` update.
+
+```tsx
+function MyWidget() {
+ // ... Widget code
+ // This query will update with a new data Array whenever any change is made to the `cats` table
+ // E.g. `INSERT INTO cats(name) VALUES ('silvester')` will return a new Array reference for `data`
+ const { data } = useQuery(`SELECT * FROM cats WHERE name = 'bob'`)
+
+ // ... Widget code
+
+ return (
+ // Other components
+ // This will rerender for any change to the `cats` table
+ // Memoization cannot prevent this component from re-rendering since `data[0]` is always new object reference
+ // whenever a query has been triggered
+
+ )
+}
+```
+
+Incremental watched queries ensure that the `data` member of the `useQuery` result maintains the same Array reference if the result set is unchanged.
+Additionally, the internal array items maintain object references when unchanged.
+
+```tsx
+function MyWidget() {
+ // ... Widget code
+ // This query will be fetched/queried whenever any change is made to the `cats` table.
+ // The `data` reference will only be changed if there have been changes since the previous value.
+ // This method performs a comparison in memory in order to determine changes.
+ // Note that isFetching is set (by default) whenever the query is being fetched/checked.
+ // This will result in `MyWidget` re-rendering for any change to the `cats` table.
+ const { data, isLoading, isFetching } = useQuery(`SELECT * FROM cats WHERE breed = 'tabby'`, [], {
+ rowComparator: {
+ keyBy: (item) => item.id,
+ compareBy: (item) => JSON.stringify(item)
+ }
+ })
+
+ // ... Widget code
+
+ return (
+ // Other components
+ // The data array is the same reference if no changes have occurred between fetches
+ // Note: The array is a new reference is there are any changes in the result set (individual row object references are preserved for unchanged rows)
+ // Note: CatCollection requires memoization in order to prevent re-rendering (due to the parent re-rendering on fetch)
+
+ )
+}
+```
+
+`useQuery` can be configured to disable reporting `isFetching` status. Disabling this setting reduces the number of events emitted from the hook, which can reduce renders in some circumstances.
+
+```tsx
+function MyWidget() {
+ // ... Widget code
+ // This query will be fetched/queried whenever any change is made to the `cats` table.
+ // The `data` reference will only be changed if there have been changes since the previous value.
+ // When reportFetching == false the object returned from useQuery will only be changed when the data, isLoading or error state changes.
+ // This method performs a comparison in memory in order to determine changes.
+ const { data, isLoading } = useQuery(`SELECT * FROM cats WHERE breed = 'tabby'`, [], {
+ rowComparator: {
+ keyBy: (item) => item.id,
+ compareBy: (item) => JSON.stringify(item)
+ }
+ reportFetching: false
+ })
+
+ // ... Widget code
+
+ return (
+ // Other components
+ // The data array is the same reference if no changes have occurred between fetches
+ // Note: The array is a new reference is there are any changes in the result set (individual row object references are not preserved)
+
+ )
+}
+```
+
+## Query Subscriptions
+
+The `useWatchedQuerySubscription` hook lets you access the state of an externally managed `WatchedQuery` instance. Managing a query outside of a component enables in-memory caching and sharing of results between multiple subscribers. This reduces async loading time during component mount (thanks to in-memory caching) and minimizes the number of SQLite queries (by sharing results between multiple components).
+
+```jsx
+// The lifecycle of this query is managed outside of any individual React component.
+// The data is kept up-to-date in the background and can be shared by multiple subscribers.
+const listsQuery = powerSync.query({ sql: 'SELECT * FROM lists' }).differentialWatch();
+
+export const ContentComponent = () => {
+ // Subscribes to the `listsQuery` instance. The subscription is automatically
+ // cleaned up when the component unmounts. The `data` value always reflects
+ // the latest state of the query.
+ const { data: lists } = useWatchedQuerySubscription(listsQuery);
+
+ return (
+
+ {lists.map((l) => (
+ {JSON.stringify(l)}
+ ))}
+
+ );
+};
+```
diff --git a/packages/react/package.json b/packages/react/package.json
index 475b52933..2c7cfcdb8 100644
--- a/packages/react/package.json
+++ b/packages/react/package.json
@@ -34,10 +34,13 @@
},
"devDependencies": {
"@powersync/common": "workspace:*",
+ "@powersync/web": "workspace:*",
"@testing-library/react": "^15.0.2",
"@types/react": "^18.3.1",
+ "chart.js": "^4.5.0",
"jsdom": "^24.0.0",
"react": "18.3.1",
+ "react-dom": "18.3.1",
"react-error-boundary": "^4.1.0"
}
}
diff --git a/packages/react/src/QueryStore.ts b/packages/react/src/QueryStore.ts
index a1954851e..36e709316 100644
--- a/packages/react/src/QueryStore.ts
+++ b/packages/react/src/QueryStore.ts
@@ -1,32 +1,69 @@
-import { AbstractPowerSyncDatabase } from '@powersync/common';
-import { Query, WatchedQuery } from './WatchedQuery';
-import { AdditionalOptions } from './hooks/useQuery';
+import {
+ AbstractPowerSyncDatabase,
+ WatchCompatibleQuery,
+ WatchedQuery,
+ WatchedQueryListenerEvent
+} from '@powersync/common';
+import { DifferentialHookOptions } from './hooks/watched/watch-types';
-export function generateQueryKey(sqlStatement: string, parameters: any[], options: AdditionalOptions): string {
+export function generateQueryKey(
+ sqlStatement: string,
+ parameters: ReadonlyArray,
+ options: DifferentialHookOptions
+): string {
return `${sqlStatement} -- ${JSON.stringify(parameters)} -- ${JSON.stringify(options)}`;
}
export class QueryStore {
- cache = new Map();
+ cache = new Map>();
constructor(private db: AbstractPowerSyncDatabase) {}
- getQuery(key: string, query: Query, options: AdditionalOptions) {
+ getQuery(key: string, query: WatchCompatibleQuery, options: DifferentialHookOptions) {
if (this.cache.has(key)) {
- return this.cache.get(key);
+ return this.cache.get(key) as WatchedQuery;
}
- const q = new WatchedQuery(this.db, query, options);
- const disposer = q.registerListener({
- disposed: () => {
+ const watch = options.rowComparator
+ ? this.db.customQuery(query).differentialWatch({
+ rowComparator: options.rowComparator,
+ reportFetching: options.reportFetching,
+ throttleMs: options.throttleMs
+ })
+ : this.db.customQuery(query).watch({
+ reportFetching: options.reportFetching,
+ throttleMs: options.throttleMs
+ });
+
+ this.cache.set(key, watch);
+
+ const disposer = watch.registerListener({
+ closed: () => {
this.cache.delete(key);
disposer?.();
}
});
- this.cache.set(key, q);
+ watch.listenerMeta.registerListener({
+ listenersChanged: (counts) => {
+ // Dispose this query if there are no subscribers present
+ // We don't use the total here since we don't want to consider `onclose` listeners
+ const relevantCounts = [
+ WatchedQueryListenerEvent.ON_DATA,
+ WatchedQueryListenerEvent.ON_STATE_CHANGE,
+ WatchedQueryListenerEvent.ON_ERROR
+ ].reduce((sum, event) => {
+ return sum + (counts[event] || 0);
+ }, 0);
+
+ if (relevantCounts == 0) {
+ watch.close();
+ this.cache.delete(key);
+ }
+ }
+ });
- return q;
+ return watch;
}
}
diff --git a/packages/react/src/WatchedQuery.ts b/packages/react/src/WatchedQuery.ts
deleted file mode 100644
index 250f91d6d..000000000
--- a/packages/react/src/WatchedQuery.ts
+++ /dev/null
@@ -1,194 +0,0 @@
-import {
- AbstractPowerSyncDatabase,
- BaseListener,
- BaseObserver,
- CompilableQuery,
- Disposable,
- runOnSchemaChange
-} from '@powersync/common';
-import { AdditionalOptions } from './hooks/useQuery';
-
-export class Query {
- rawQuery: string | CompilableQuery;
- sqlStatement: string;
- queryParameters: any[];
-}
-
-export interface WatchedQueryListener extends BaseListener {
- onUpdate: () => void;
- disposed: () => void;
-}
-
-export class WatchedQuery extends BaseObserver implements Disposable {
- readyPromise: Promise;
- isReady: boolean = false;
- currentData: any[] | undefined;
- currentError: any;
- tables: any[] | undefined;
-
- private temporaryHolds = new Set();
- private controller: AbortController | undefined;
- private db: AbstractPowerSyncDatabase;
-
- private resolveReady: undefined | (() => void);
-
- readonly query: Query;
- readonly options: AdditionalOptions;
-
- constructor(db: AbstractPowerSyncDatabase, query: Query, options: AdditionalOptions) {
- super();
- this.db = db;
- this.query = query;
- this.options = options;
-
- this.readyPromise = new Promise((resolve) => {
- this.resolveReady = resolve;
- });
- }
-
- get logger() {
- return this.db.logger ?? console;
- }
-
- addTemporaryHold() {
- const ref = new Object();
- this.temporaryHolds.add(ref);
- this.maybeListen();
-
- let timeout: any;
- const release = () => {
- this.temporaryHolds.delete(ref);
- if (timeout) {
- clearTimeout(timeout);
- }
- this.maybeDispose();
- };
-
- const timeoutRelease = () => {
- if (this.isReady || this.controller == null) {
- release();
- } else {
- // If the query is taking long, keep the temporary hold.
- timeout = setTimeout(timeoutRelease, 5_000);
- }
- };
-
- timeout = setTimeout(timeoutRelease, 5_000);
-
- return release;
- }
-
- registerListener(listener: Partial): () => void {
- const disposer = super.registerListener(listener);
-
- this.maybeListen();
- return () => {
- disposer();
- this.maybeDispose();
- };
- }
-
- private async fetchTables() {
- try {
- this.tables = await this.db.resolveTables(this.query.sqlStatement, this.query.queryParameters, this.options);
- } catch (e) {
- this.logger.error('Failed to fetch tables:', e);
- this.setError(e);
- }
- }
-
- async fetchData() {
- try {
- const result =
- typeof this.query.rawQuery == 'string'
- ? await this.db.getAll(this.query.sqlStatement, this.query.queryParameters)
- : await this.query.rawQuery.execute();
-
- const data = result ?? [];
- this.setData(data);
- } catch (e) {
- this.logger.error('Failed to fetch data:', e);
- this.setError(e);
- }
- }
-
- private maybeListen() {
- if (this.controller != null) {
- return;
- }
-
- if (this.onUpdateListenersCount() == 0 && this.temporaryHolds.size == 0) {
- return;
- }
-
- const controller = new AbortController();
- this.controller = controller;
-
- const onError = (error: Error) => {
- this.setError(error);
- };
-
- const watchQuery = async (abortSignal: AbortSignal) => {
- await this.fetchTables();
- await this.fetchData();
-
- if (!this.options.runQueryOnce) {
- this.db.onChangeWithCallback(
- {
- onChange: async () => {
- await this.fetchData();
- },
- onError
- },
- {
- ...this.options,
- signal: abortSignal,
- tables: this.tables
- }
- );
- }
- };
- runOnSchemaChange(watchQuery, this.db, { signal: this.controller.signal });
- }
-
- private setData(results: any[]) {
- this.isReady = true;
- this.currentData = results;
- this.currentError = undefined;
- this.resolveReady?.();
-
- this.iterateListeners((l) => l.onUpdate?.());
- }
-
- private setError(error: any) {
- this.isReady = true;
- this.currentData = undefined;
- this.currentError = error;
- this.resolveReady?.();
-
- this.iterateListeners((l) => l.onUpdate?.());
- }
-
- private onUpdateListenersCount(): number {
- return Array.from(this.listeners).filter((listener) => listener.onUpdate !== undefined).length;
- }
-
- private maybeDispose() {
- if (this.onUpdateListenersCount() == 0 && this.temporaryHolds.size == 0) {
- this.controller?.abort();
- this.controller = undefined;
- this.isReady = false;
- this.currentData = undefined;
- this.currentError = undefined;
- this.dispose();
-
- this.readyPromise = new Promise((resolve, reject) => {
- this.resolveReady = resolve;
- });
- }
- }
-
- async dispose() {
- this.iterateAsyncListeners(async (l) => l.disposed?.());
- }
-}
diff --git a/packages/react/src/hooks/usePowerSyncQuery.ts b/packages/react/src/hooks/deprecated/usePowerSyncQuery.ts
similarity index 94%
rename from packages/react/src/hooks/usePowerSyncQuery.ts
rename to packages/react/src/hooks/deprecated/usePowerSyncQuery.ts
index 59feb1d65..9f490274c 100644
--- a/packages/react/src/hooks/usePowerSyncQuery.ts
+++ b/packages/react/src/hooks/deprecated/usePowerSyncQuery.ts
@@ -1,5 +1,5 @@
import React from 'react';
-import { usePowerSync } from './PowerSyncContext';
+import { usePowerSync } from '../PowerSyncContext';
/**
* @deprecated use {@link useQuery} instead.
diff --git a/packages/react/src/hooks/usePowerSyncStatus.ts b/packages/react/src/hooks/deprecated/usePowerSyncStatus.ts
similarity index 93%
rename from packages/react/src/hooks/usePowerSyncStatus.ts
rename to packages/react/src/hooks/deprecated/usePowerSyncStatus.ts
index 0798db66a..34a3ee91e 100644
--- a/packages/react/src/hooks/usePowerSyncStatus.ts
+++ b/packages/react/src/hooks/deprecated/usePowerSyncStatus.ts
@@ -1,5 +1,5 @@
import { useContext, useEffect, useState } from 'react';
-import { PowerSyncContext } from './PowerSyncContext';
+import { PowerSyncContext } from '../PowerSyncContext';
/**
* @deprecated Use {@link useStatus} instead.
diff --git a/packages/react/src/hooks/usePowerSyncWatchedQuery.ts b/packages/react/src/hooks/deprecated/usePowerSyncWatchedQuery.ts
similarity index 96%
rename from packages/react/src/hooks/usePowerSyncWatchedQuery.ts
rename to packages/react/src/hooks/deprecated/usePowerSyncWatchedQuery.ts
index 7521f6b8a..581823afc 100644
--- a/packages/react/src/hooks/usePowerSyncWatchedQuery.ts
+++ b/packages/react/src/hooks/deprecated/usePowerSyncWatchedQuery.ts
@@ -1,6 +1,6 @@
import { SQLWatchOptions } from '@powersync/common';
import React from 'react';
-import { usePowerSync } from './PowerSyncContext';
+import { usePowerSync } from '../PowerSyncContext';
/**
* @deprecated use {@link useQuery} instead.
diff --git a/packages/react/src/hooks/suspense/SuspenseQueryResult.ts b/packages/react/src/hooks/suspense/SuspenseQueryResult.ts
new file mode 100644
index 000000000..d8b0f7b7e
--- /dev/null
+++ b/packages/react/src/hooks/suspense/SuspenseQueryResult.ts
@@ -0,0 +1,4 @@
+import { QueryResult, ReadonlyQueryResult } from '../watched/watch-types';
+
+export type SuspenseQueryResult = Pick, 'data' | 'refresh'>;
+export type ReadonlySuspenseQueryResult = Pick, 'data' | 'refresh'>;
diff --git a/packages/react/src/hooks/suspense/suspense-utils.ts b/packages/react/src/hooks/suspense/suspense-utils.ts
new file mode 100644
index 000000000..f80621587
--- /dev/null
+++ b/packages/react/src/hooks/suspense/suspense-utils.ts
@@ -0,0 +1,90 @@
+import { WatchedQuery } from '@powersync/common';
+import React from 'react';
+
+/**
+ * The store will dispose this query if it has no subscribers attached to it.
+ * The suspense promise adds a subscriber to the query, but the promise could resolve
+ * before this component is committed. The promise will release it's listener once the query is no longer loading.
+ * This temporary hold is used to ensure that the query is not disposed in the interim.
+ * Creates a subscription for state change which creates a temporary hold on the query
+ * @returns a function to release the hold
+ */
+export const useTemporaryHold = (watchedQuery?: WatchedQuery) => {
+ const releaseTemporaryHold = React.useRef<(() => void) | undefined>(undefined);
+ const addedHoldTo = React.useRef | undefined>(undefined);
+
+ if (addedHoldTo.current !== watchedQuery) {
+ releaseTemporaryHold.current?.();
+ addedHoldTo.current = watchedQuery;
+
+ if (!watchedQuery || !watchedQuery.state.isLoading) {
+ // No query to hold or no reason to hold, return a no-op
+ return {
+ releaseHold: () => {}
+ };
+ }
+
+ const disposeSubscription = watchedQuery.registerListener({
+ onStateChange: (state) => {}
+ });
+
+ let timeout: ReturnType;
+
+ const disposeClosedListener = watchedQuery.registerListener({
+ closed: () => {
+ if (timeout) {
+ clearTimeout(timeout);
+ }
+ disposeClosedListener();
+ }
+ });
+
+ const releaseHold = () => {
+ disposeSubscription();
+ disposeClosedListener();
+ };
+ releaseTemporaryHold.current = releaseHold;
+
+ const timeoutPollMs = 5_000;
+
+ const checkHold = () => {
+ if (watchedQuery.closed || !watchedQuery.state.isLoading || watchedQuery.state.error) {
+ // No need to keep a temporary hold on this query
+ releaseHold();
+ } else {
+ // Need to keep the hold, check again after timeout
+ setTimeout(checkHold, timeoutPollMs);
+ }
+ };
+
+ // Set a timeout to conditionally remove the temporary hold
+ setTimeout(checkHold, timeoutPollMs);
+ }
+
+ return {
+ releaseHold: releaseTemporaryHold.current
+ };
+};
+
+/**
+ * React suspense relies on a promise that resolves once the initial data has loaded.
+ * This creates a promise which registers a listener on the watched query.
+ * Registering a listener on the watched query will ensure that the query is not disposed
+ * while the component is suspended.
+ */
+export const createSuspendingPromise = (query: WatchedQuery) => {
+ return new Promise((resolve) => {
+ // The listener here will dispose itself once the loading is done
+ // This decreases the number of listeners on the query
+ // even if the component is unmounted
+ const dispose = query.registerListener({
+ onStateChange: (state) => {
+ // Returns to the hook if loading is completed or if loading resulted in an error
+ if (!state.isLoading || state.error) {
+ resolve();
+ dispose();
+ }
+ }
+ });
+ });
+};
diff --git a/packages/react/src/hooks/suspense/useSingleSuspenseQuery.ts b/packages/react/src/hooks/suspense/useSingleSuspenseQuery.ts
new file mode 100644
index 000000000..10adf3e4e
--- /dev/null
+++ b/packages/react/src/hooks/suspense/useSingleSuspenseQuery.ts
@@ -0,0 +1,85 @@
+import { CompilableQuery, WatchedQuery } from '@powersync/common';
+import React from 'react';
+import { generateQueryKey, getQueryStore } from '../../QueryStore';
+import { usePowerSync } from '../PowerSyncContext';
+import { AdditionalOptions } from '../watched/watch-types';
+import { constructCompatibleQuery } from '../watched/watch-utils';
+import { createSuspendingPromise, useTemporaryHold } from './suspense-utils';
+import { SuspenseQueryResult } from './SuspenseQueryResult';
+
+/**
+ * Use a query which is not watched, but suspends until the initial result has loaded.
+ * Internally this uses a WatchedQuery during suspense for state management. The watched
+ * query is potentially disposed, if there are no subscribers attached to it, after the initial load.
+ * The query can be refreshed by calling the `refresh` function after initial load.
+ */
+export const useSingleSuspenseQuery = (
+ query: string | CompilableQuery,
+ parameters: any[] = [],
+ options: AdditionalOptions = {}
+): SuspenseQueryResult => {
+ const powerSync = usePowerSync();
+ if (!powerSync) {
+ throw new Error('PowerSync not configured.');
+ }
+
+ // Manually track data for single queries
+ const [data, setData] = React.useState(null);
+ const [error, setError] = React.useState(null);
+
+ // Note, we don't need to check if the query changed since we fetch the WatchedQuery
+ // from the store given these query params
+ const { parsedQuery } = constructCompatibleQuery(query, parameters, options);
+ const { sql: parsedSql, parameters: parsedParameters } = parsedQuery.compile();
+
+ const key = generateQueryKey(parsedSql, parsedParameters, options);
+ const store = getQueryStore(powerSync);
+
+ // Only use a temporary watched query if we don't have data yet.
+ const watchedQuery = data ? null : (store.getQuery(key, parsedQuery, options) as WatchedQuery);
+ const { releaseHold } = useTemporaryHold(watchedQuery);
+ React.useEffect(() => {
+ // Set the initial yielded data
+ // it should be available once we commit the component
+ if (watchedQuery?.state.error) {
+ setError(watchedQuery.state.error);
+ } else if (watchedQuery?.state.isLoading === false) {
+ setData(watchedQuery.state.data);
+ setError(null);
+ }
+
+ if (!watchedQuery?.state.isLoading) {
+ releaseHold();
+ }
+ }, []);
+
+ if (error != null) {
+ // Report errors - this is caught by an error boundary
+ throw error;
+ } else if (data || watchedQuery?.state.isLoading === false) {
+ // Happy path data return
+ return {
+ data: data ?? watchedQuery?.state.data ?? [],
+ refresh: async (signal) => {
+ try {
+ const compiledQuery = parsedQuery.compile();
+ const result = await parsedQuery.execute({
+ sql: compiledQuery.sql,
+ parameters: [...compiledQuery.parameters],
+ db: powerSync
+ });
+ if (signal.aborted) {
+ return; // Abort if the signal is already aborted
+ }
+ setData(result);
+ setError(null);
+ } catch (e) {
+ setError(e);
+ }
+ }
+ };
+ } else {
+ // Notify suspense is required
+ throw createSuspendingPromise(watchedQuery!);
+ }
+};
diff --git a/packages/react/src/hooks/suspense/useSuspenseQuery.ts b/packages/react/src/hooks/suspense/useSuspenseQuery.ts
new file mode 100644
index 000000000..2a678735b
--- /dev/null
+++ b/packages/react/src/hooks/suspense/useSuspenseQuery.ts
@@ -0,0 +1,76 @@
+import { CompilableQuery } from '@powersync/common';
+import { AdditionalOptions, DifferentialHookOptions } from '../watched/watch-types';
+import { ReadonlySuspenseQueryResult, SuspenseQueryResult } from './SuspenseQueryResult';
+import { useSingleSuspenseQuery } from './useSingleSuspenseQuery';
+import { useWatchedSuspenseQuery } from './useWatchedSuspenseQuery';
+
+/**
+ * A hook to access the results of a watched query that suspends until the initial result has loaded.
+ * @example
+ * export const ContentComponent = () => {
+ * // The lists array here will be a new Array reference whenever a change to the
+ * // lists table is made.
+ * const { data: lists } = useSuspenseQuery('SELECT * from lists');
+ *
+ * return
+ * {lists.map((l) => (
+ * {JSON.stringify(l)}
+ * ))}
+ * ;
+ * }
+ *
+ * export const DisplayComponent = () => {
+ * return (
+ * Loading content...}>
+ *
+ *
+ * );
+ * }
+ *
+ * export const DiffContentComponent = () => {
+ * // A differential query will emit results when a change to the result set occurs.
+ * // The internal array object references are maintained for unchanged rows.
+ * // The returned lists array is read only when a `rowComparator` is provided.
+ * const { data: lists } = useSuspenseQuery('SELECT * from lists', [], {
+ * rowComparator: {
+ * keyBy: (item) => item.id,
+ * compareBy: (item) => JSON.stringify(item)
+ * }
+ * });
+ * return
+ * {lists.map((l) => (
+ * {JSON.stringify(l)}
+ * ))}
+ * ;
+ * }
+ *
+ * export const DisplayComponent = () => {
+ * return (
+ * Loading content...}>
+ *
+ *
+ * );
+ * }
+ */
+export function useSuspenseQuery(
+ query: string | CompilableQuery,
+ parameters?: any[],
+ options?: AdditionalOptions
+): SuspenseQueryResult;
+export function useSuspenseQuery(
+ query: string | CompilableQuery,
+ paramerers?: any[],
+ options?: DifferentialHookOptions
+): ReadonlySuspenseQueryResult;
+export function useSuspenseQuery(
+ query: string | CompilableQuery,
+ parameters: any[] = [],
+ options: AdditionalOptions & DifferentialHookOptions