Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions .changeset/nice-dragons-smile.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
---
'@powersync/common': minor
'@powersync/node': minor
'@powersync/react-native': minor
'@powersync/web': minor
---

Add `getCrudTransactions()`, returning an async iterator of transactions. This can be used to batch transactions when uploading CRUD data.
5 changes: 5 additions & 0 deletions .github/workflows/test-simulators.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,11 @@ jobs:
with:
persist-credentials: false

- name: Set up XCode
uses: maxim-lobanov/setup-xcode@v1
with:
xcode-version: latest-stable

- name: CocoaPods Cache
uses: actions/cache@v3
id: cocoapods-cache
Expand Down
99 changes: 72 additions & 27 deletions packages/common/src/client/AbstractPowerSyncDatabase.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ import { UploadQueueStats } from '../db/crud/UploadQueueStatus.js';
import { Schema } from '../db/schema/Schema.js';
import { BaseObserver } from '../utils/BaseObserver.js';
import { ControlledExecutor } from '../utils/ControlledExecutor.js';
import { throttleTrailing } from '../utils/async.js';
import { symbolAsyncIterator, throttleTrailing } from '../utils/async.js';
import { ConnectionManager } from './ConnectionManager.js';
import { CustomQuery } from './CustomQuery.js';
import { ArrayQueryDefinition, Query } from './Query.js';
Expand Down Expand Up @@ -632,35 +632,80 @@ export abstract class AbstractPowerSyncDatabase extends BaseObserver<PowerSyncDB
* @returns A transaction of CRUD operations to upload, or null if there are none
*/
async getNextCrudTransaction(): Promise<CrudTransaction | null> {
return await this.readTransaction(async (tx) => {
const first = await tx.getOptional<CrudEntryJSON>(
`SELECT id, tx_id, data FROM ${PSInternalTable.CRUD} ORDER BY id ASC LIMIT 1`
);
const iterator = this.getCrudTransactions()[symbolAsyncIterator]();
return (await iterator.next()).value;
}

if (!first) {
return null;
}
const txId = first.tx_id;
/**
* Returns an async iterator of completed transactions with local writes against the database.
*
* This is typically used from the {@link PowerSyncBackendConnector.uploadData} callback. Each entry emitted by the
* returned iterator is a full transaction containing all local writes made while that transaction was active.
*
* Unlike {@link getNextCrudTransaction}, which always returns the oldest transaction that hasn't been
* {@link CrudTransaction.complete}d yet, this iterator can be used to receive multiple transactions. Calling
* {@link CrudTransaction.complete} will mark that and all prior transactions emitted by the iterator as completed.
*
* This can be used to upload multiple transactions in a single batch, e.g with:
*
* ```JavaScript
* let lastTransaction = null;
* let batch = [];
*
* for await (const transaction of database.getCrudTransactions()) {
* batch.push(...transaction.crud);
* lastTransaction = transaction;
*
* if (batch.length > 10) {
* break;
* }
* }
* ```
*
* If there is no local data to upload, the async iterator complete without emitting any items.
*
* Note that iterating over async iterables requires a [polyfill](https://github.com/powersync-ja/powersync-js/tree/main/packages/react-native#babel-plugins-watched-queries)
* for React Native.
*/
getCrudTransactions(): AsyncIterable<CrudTransaction, null> {
return {
[symbolAsyncIterator]: () => {
let lastCrudItemId = -1;
const sql = `
WITH RECURSIVE crud_entries AS (
SELECT id, tx_id, data FROM ps_crud WHERE id = (SELECT min(id) FROM ps_crud WHERE id > ?)
UNION ALL
SELECT ps_crud.id, ps_crud.tx_id, ps_crud.data FROM ps_crud
INNER JOIN crud_entries ON crud_entries.id + 1 = rowid
WHERE crud_entries.tx_id = ps_crud.tx_id
)
SELECT * FROM crud_entries;
`;

return {
next: async () => {
const nextTransaction = await this.database.getAll<CrudEntryJSON>(sql, [lastCrudItemId]);
if (nextTransaction.length == 0) {
return { done: true, value: null };
}

let all: CrudEntry[];
if (!txId) {
all = [CrudEntry.fromRow(first)];
} else {
const result = await tx.getAll<CrudEntryJSON>(
`SELECT id, tx_id, data FROM ${PSInternalTable.CRUD} WHERE tx_id = ? ORDER BY id ASC`,
[txId]
);
all = result.map((row) => CrudEntry.fromRow(row));
const items = nextTransaction.map((row) => CrudEntry.fromRow(row));
const last = items[items.length - 1];
const txId = last.transactionId;
lastCrudItemId = last.clientId;

return {
done: false,
value: new CrudTransaction(
items,
async (writeCheckpoint?: string) => this.handleCrudCheckpoint(last.clientId, writeCheckpoint),
txId
)
};
}
};
}

const last = all[all.length - 1];

return new CrudTransaction(
all,
async (writeCheckpoint?: string) => this.handleCrudCheckpoint(last.clientId, writeCheckpoint),
txId
);
});
};
}

/**
Expand Down
11 changes: 11 additions & 0 deletions packages/common/src/utils/async.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,14 @@
/**
* A ponyfill for `Symbol.asyncIterator` that is compatible with the
* [recommended polyfill](https://github.com/Azure/azure-sdk-for-js/blob/%40azure/core-asynciterator-polyfill_1.0.2/sdk/core/core-asynciterator-polyfill/src/index.ts#L4-L6)
* we recommend for React Native.
*
* As long as we use this symbol (instead of `for await` and `async *`) in this package, we can be compatible with async
* iterators without requiring them.
*/
export const symbolAsyncIterator: typeof Symbol.asyncIterator =
Symbol.asyncIterator ?? Symbol.for('Symbol.asyncIterator');

/**
* Throttle a function to be called at most once every "wait" milliseconds,
* on the trailing edge.
Expand Down
38 changes: 37 additions & 1 deletion packages/node/tests/PowerSyncDatabase.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ import { Worker } from 'node:worker_threads';

import { vi, expect, test } from 'vitest';
import { AppSchema, databaseTest, tempDirectoryTest } from './utils';
import { PowerSyncDatabase } from '../lib';
import { CrudEntry, CrudTransaction, PowerSyncDatabase } from '../lib';
import { WorkerOpener } from '../lib/db/options';

test('validates options', async () => {
Expand Down Expand Up @@ -131,3 +131,39 @@ databaseTest.skip('can watch queries', async ({ database }) => {
await database.execute('INSERT INTO todos (id, content) VALUES (uuid(), ?)', ['fourth']);
expect((await query.next()).value.rows).toHaveLength(4);
});

databaseTest('getCrudTransactions', async ({ database }) => {
async function createTransaction(amount: number) {
await database.writeTransaction(async (tx) => {
for (let i = 0; i < amount; i++) {
await tx.execute('insert into todos (id) values (uuid())');
}
});
}

let iterator = database.getCrudTransactions()[Symbol.asyncIterator]();
expect(await iterator.next()).toMatchObject({ done: true });

await createTransaction(5);
await createTransaction(10);
await createTransaction(15);

let lastTransaction: CrudTransaction | null = null;
let batch: CrudEntry[] = [];

// Take the first two transactions via the async generator.
for await (const transaction of database.getCrudTransactions()) {
batch.push(...transaction.crud);
lastTransaction = transaction;

if (batch.length > 10) {
break;
}
}

expect(batch).toHaveLength(15);
await lastTransaction!.complete();

const remainingTransaction = await database.getNextCrudTransaction();
expect(remainingTransaction?.crud).toHaveLength(15);
});
1 change: 1 addition & 0 deletions packages/react-native/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ npx expo install @journeyapps/react-native-quick-sqlite
Watched queries can be used with either a callback response or Async Iterator response.

Watched queries using the Async Iterator response format require support for Async Iterators.
`PowerSyncDatabase.getCrudTransactions()` also returns an Async Iterator and requires this workaround.

Expo apps currently require polyfill and Babel plugins in order to use this functionality.

Expand Down