diff --git a/.changeset/nice-dragons-smile.md b/.changeset/nice-dragons-smile.md new file mode 100644 index 000000000..89f9bfd61 --- /dev/null +++ b/.changeset/nice-dragons-smile.md @@ -0,0 +1,8 @@ +--- +'@powersync/common': minor +'@powersync/node': minor +'@powersync/react-native': minor +'@powersync/web': minor +--- + +Add `getCrudTransactions()`, returning an async iterator of transactions. This can be used to batch transactions when uploading CRUD data. diff --git a/.github/workflows/test-simulators.yaml b/.github/workflows/test-simulators.yaml index dc96496e6..bf838b828 100644 --- a/.github/workflows/test-simulators.yaml +++ b/.github/workflows/test-simulators.yaml @@ -138,6 +138,11 @@ jobs: with: persist-credentials: false + - name: Set up XCode + uses: maxim-lobanov/setup-xcode@v1 + with: + xcode-version: latest-stable + - name: CocoaPods Cache uses: actions/cache@v3 id: cocoapods-cache diff --git a/packages/common/src/client/AbstractPowerSyncDatabase.ts b/packages/common/src/client/AbstractPowerSyncDatabase.ts index 6050057f8..f5b0397da 100644 --- a/packages/common/src/client/AbstractPowerSyncDatabase.ts +++ b/packages/common/src/client/AbstractPowerSyncDatabase.ts @@ -15,7 +15,7 @@ import { UploadQueueStats } from '../db/crud/UploadQueueStatus.js'; import { Schema } from '../db/schema/Schema.js'; import { BaseObserver } from '../utils/BaseObserver.js'; import { ControlledExecutor } from '../utils/ControlledExecutor.js'; -import { throttleTrailing } from '../utils/async.js'; +import { symbolAsyncIterator, throttleTrailing } from '../utils/async.js'; import { ConnectionManager } from './ConnectionManager.js'; import { CustomQuery } from './CustomQuery.js'; import { ArrayQueryDefinition, Query } from './Query.js'; @@ -632,35 +632,80 @@ export abstract class AbstractPowerSyncDatabase extends BaseObserver { - return await this.readTransaction(async (tx) => { - const first = await tx.getOptional( - `SELECT id, tx_id, data FROM ${PSInternalTable.CRUD} ORDER BY id ASC LIMIT 1` - ); + const iterator = this.getCrudTransactions()[symbolAsyncIterator](); + return (await iterator.next()).value; + } - if (!first) { - return null; - } - const txId = first.tx_id; + /** + * Returns an async iterator of completed transactions with local writes against the database. + * + * This is typically used from the {@link PowerSyncBackendConnector.uploadData} callback. Each entry emitted by the + * returned iterator is a full transaction containing all local writes made while that transaction was active. + * + * Unlike {@link getNextCrudTransaction}, which always returns the oldest transaction that hasn't been + * {@link CrudTransaction.complete}d yet, this iterator can be used to receive multiple transactions. Calling + * {@link CrudTransaction.complete} will mark that and all prior transactions emitted by the iterator as completed. + * + * This can be used to upload multiple transactions in a single batch, e.g with: + * + * ```JavaScript + * let lastTransaction = null; + * let batch = []; + * + * for await (const transaction of database.getCrudTransactions()) { + * batch.push(...transaction.crud); + * lastTransaction = transaction; + * + * if (batch.length > 10) { + * break; + * } + * } + * ``` + * + * If there is no local data to upload, the async iterator complete without emitting any items. + * + * Note that iterating over async iterables requires a [polyfill](https://github.com/powersync-ja/powersync-js/tree/main/packages/react-native#babel-plugins-watched-queries) + * for React Native. + */ + getCrudTransactions(): AsyncIterable { + return { + [symbolAsyncIterator]: () => { + let lastCrudItemId = -1; + const sql = ` +WITH RECURSIVE crud_entries AS ( + SELECT id, tx_id, data FROM ps_crud WHERE id = (SELECT min(id) FROM ps_crud WHERE id > ?) + UNION ALL + SELECT ps_crud.id, ps_crud.tx_id, ps_crud.data FROM ps_crud + INNER JOIN crud_entries ON crud_entries.id + 1 = rowid + WHERE crud_entries.tx_id = ps_crud.tx_id +) +SELECT * FROM crud_entries; + `; + + return { + next: async () => { + const nextTransaction = await this.database.getAll(sql, [lastCrudItemId]); + if (nextTransaction.length == 0) { + return { done: true, value: null }; + } - let all: CrudEntry[]; - if (!txId) { - all = [CrudEntry.fromRow(first)]; - } else { - const result = await tx.getAll( - `SELECT id, tx_id, data FROM ${PSInternalTable.CRUD} WHERE tx_id = ? ORDER BY id ASC`, - [txId] - ); - all = result.map((row) => CrudEntry.fromRow(row)); + const items = nextTransaction.map((row) => CrudEntry.fromRow(row)); + const last = items[items.length - 1]; + const txId = last.transactionId; + lastCrudItemId = last.clientId; + + return { + done: false, + value: new CrudTransaction( + items, + async (writeCheckpoint?: string) => this.handleCrudCheckpoint(last.clientId, writeCheckpoint), + txId + ) + }; + } + }; } - - const last = all[all.length - 1]; - - return new CrudTransaction( - all, - async (writeCheckpoint?: string) => this.handleCrudCheckpoint(last.clientId, writeCheckpoint), - txId - ); - }); + }; } /** diff --git a/packages/common/src/utils/async.ts b/packages/common/src/utils/async.ts index d869f8c36..64f639516 100644 --- a/packages/common/src/utils/async.ts +++ b/packages/common/src/utils/async.ts @@ -1,3 +1,14 @@ +/** + * A ponyfill for `Symbol.asyncIterator` that is compatible with the + * [recommended polyfill](https://github.com/Azure/azure-sdk-for-js/blob/%40azure/core-asynciterator-polyfill_1.0.2/sdk/core/core-asynciterator-polyfill/src/index.ts#L4-L6) + * we recommend for React Native. + * + * As long as we use this symbol (instead of `for await` and `async *`) in this package, we can be compatible with async + * iterators without requiring them. + */ +export const symbolAsyncIterator: typeof Symbol.asyncIterator = + Symbol.asyncIterator ?? Symbol.for('Symbol.asyncIterator'); + /** * Throttle a function to be called at most once every "wait" milliseconds, * on the trailing edge. diff --git a/packages/node/tests/PowerSyncDatabase.test.ts b/packages/node/tests/PowerSyncDatabase.test.ts index 28ab4b31f..88732522f 100644 --- a/packages/node/tests/PowerSyncDatabase.test.ts +++ b/packages/node/tests/PowerSyncDatabase.test.ts @@ -3,7 +3,7 @@ import { Worker } from 'node:worker_threads'; import { vi, expect, test } from 'vitest'; import { AppSchema, databaseTest, tempDirectoryTest } from './utils'; -import { PowerSyncDatabase } from '../lib'; +import { CrudEntry, CrudTransaction, PowerSyncDatabase } from '../lib'; import { WorkerOpener } from '../lib/db/options'; test('validates options', async () => { @@ -131,3 +131,39 @@ databaseTest.skip('can watch queries', async ({ database }) => { await database.execute('INSERT INTO todos (id, content) VALUES (uuid(), ?)', ['fourth']); expect((await query.next()).value.rows).toHaveLength(4); }); + +databaseTest('getCrudTransactions', async ({ database }) => { + async function createTransaction(amount: number) { + await database.writeTransaction(async (tx) => { + for (let i = 0; i < amount; i++) { + await tx.execute('insert into todos (id) values (uuid())'); + } + }); + } + + let iterator = database.getCrudTransactions()[Symbol.asyncIterator](); + expect(await iterator.next()).toMatchObject({ done: true }); + + await createTransaction(5); + await createTransaction(10); + await createTransaction(15); + + let lastTransaction: CrudTransaction | null = null; + let batch: CrudEntry[] = []; + + // Take the first two transactions via the async generator. + for await (const transaction of database.getCrudTransactions()) { + batch.push(...transaction.crud); + lastTransaction = transaction; + + if (batch.length > 10) { + break; + } + } + + expect(batch).toHaveLength(15); + await lastTransaction!.complete(); + + const remainingTransaction = await database.getNextCrudTransaction(); + expect(remainingTransaction?.crud).toHaveLength(15); +}); diff --git a/packages/react-native/README.md b/packages/react-native/README.md index 090bdf219..df5cfadb2 100644 --- a/packages/react-native/README.md +++ b/packages/react-native/README.md @@ -37,6 +37,7 @@ npx expo install @journeyapps/react-native-quick-sqlite Watched queries can be used with either a callback response or Async Iterator response. Watched queries using the Async Iterator response format require support for Async Iterators. +`PowerSyncDatabase.getCrudTransactions()` also returns an Async Iterator and requires this workaround. Expo apps currently require polyfill and Babel plugins in order to use this functionality.