Skip to content
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions .changeset/nice-dragons-smile.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
---
'@powersync/common': minor
'@powersync/node': minor
'@powersync/react-native': minor
'@powersync/web': minor
---

Add `getCrudTransactions()`, returning an async iterator of transactions. This can be used to batch transactions when uploading CRUD data.
99 changes: 72 additions & 27 deletions packages/common/src/client/AbstractPowerSyncDatabase.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ import { UploadQueueStats } from '../db/crud/UploadQueueStatus.js';
import { Schema } from '../db/schema/Schema.js';
import { BaseObserver } from '../utils/BaseObserver.js';
import { ControlledExecutor } from '../utils/ControlledExecutor.js';
import { throttleTrailing } from '../utils/async.js';
import { symbolAsyncIterator, throttleTrailing } from '../utils/async.js';
import { ConnectionManager } from './ConnectionManager.js';
import { CustomQuery } from './CustomQuery.js';
import { ArrayQueryDefinition, Query } from './Query.js';
Expand Down Expand Up @@ -632,35 +632,80 @@ export abstract class AbstractPowerSyncDatabase extends BaseObserver<PowerSyncDB
* @returns A transaction of CRUD operations to upload, or null if there are none
*/
async getNextCrudTransaction(): Promise<CrudTransaction | null> {
return await this.readTransaction(async (tx) => {
const first = await tx.getOptional<CrudEntryJSON>(
`SELECT id, tx_id, data FROM ${PSInternalTable.CRUD} ORDER BY id ASC LIMIT 1`
);
const iterator = this.getCrudTransactions()[symbolAsyncIterator]();
return (await iterator.next()).value;
}

if (!first) {
return null;
}
const txId = first.tx_id;
/**
* Returns an async iterator of completed transactions with local writes against the database.
*
* This is typically used from the {@link PowerSyncBackendConnector.uploadData} callback. Each entry emitted by the
* returned iterator is a full transaction containing all local writes made while that transaction was active.
*
* Unlike {@link getNextCrudTransaction}, which always returns the oldest transaction that hasn't been
* {@link CrudTransaction.complete}d yet, this iterator can be used to receive multiple transactions. Calling
* {@link CrudTransaction.complete} will mark that and all prior transactions emitted by the iterator as completed.
*
* This can be used to upload multiple transactions in a single batch, e.g with:
*
* ```JavaScript
* let lastTransaction = null;
* let batch = [];
*
* for await (const transaction of database.getCrudTransactions()) {
* batch.push(...transaction.crud);
* lastTransaction = transaction;
*
* if (batch.length > 10) {
* break;
* }
* }
* ```
*
* If there is no local data to upload, the async iterator complete without emitting any items.
*
* Note that iterating over async iterables requires a [polyfill](https://github.com/powersync-ja/powersync-js/tree/main/packages/react-native#babel-plugins-watched-queries)
* for React Native.
*/
getCrudTransactions(): AsyncIterable<CrudTransaction, null> {
return {
[symbolAsyncIterator]: () => {
let lastCrudItemId = -1;
const sql = `
WITH RECURSIVE crud_entries AS (
SELECT id, tx_id, data FROM ps_crud WHERE id = (SELECT min(id) FROM ps_crud WHERE id > ?)
UNION ALL
SELECT ps_crud.id, ps_crud.tx_id, ps_crud.data FROM ps_crud
INNER JOIN crud_entries ON crud_entries.id + 1 = rowid
WHERE crud_entries.tx_id = ps_crud.tx_id
)
SELECT * FROM crud_entries;
`;

return {
next: async () => {
const nextTransaction = await this.database.getAll<CrudEntryJSON>(sql, [lastCrudItemId]);
if (nextTransaction.length == 0) {
return { done: true, value: null };
}

let all: CrudEntry[];
if (!txId) {
all = [CrudEntry.fromRow(first)];
} else {
const result = await tx.getAll<CrudEntryJSON>(
`SELECT id, tx_id, data FROM ${PSInternalTable.CRUD} WHERE tx_id = ? ORDER BY id ASC`,
[txId]
);
all = result.map((row) => CrudEntry.fromRow(row));
const items = nextTransaction.map((row) => CrudEntry.fromRow(row));
const last = items[items.length - 1];
const txId = last.transactionId;
lastCrudItemId = last.clientId;

return {
done: false,
value: new CrudTransaction(
items,
async (writeCheckpoint?: string) => this.handleCrudCheckpoint(last.clientId, writeCheckpoint),
txId
)
};
}
};
}

const last = all[all.length - 1];

return new CrudTransaction(
all,
async (writeCheckpoint?: string) => this.handleCrudCheckpoint(last.clientId, writeCheckpoint),
txId
);
});
};
}

/**
Expand Down
11 changes: 11 additions & 0 deletions packages/common/src/utils/async.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,14 @@
/**
* A ponyfill for `Symbol.asyncIterator` that is compatible with the
* [recommended polyfill](https://github.com/Azure/azure-sdk-for-js/blob/%40azure/core-asynciterator-polyfill_1.0.2/sdk/core/core-asynciterator-polyfill/src/index.ts#L4-L6)
* we recommend for React Native.
*
* As long as we use this symbol (instead of `for await` and `async *`) in this package, we can be compatible with async
* iterators without requiring them.
*/
export const symbolAsyncIterator: typeof Symbol.asyncIterator =
Symbol.asyncIterator ?? Symbol.for('Symbol.asyncIterator');

/**
* Throttle a function to be called at most once every "wait" milliseconds,
* on the trailing edge.
Expand Down
38 changes: 37 additions & 1 deletion packages/node/tests/PowerSyncDatabase.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ import { Worker } from 'node:worker_threads';

import { vi, expect, test } from 'vitest';
import { AppSchema, databaseTest, tempDirectoryTest } from './utils';
import { PowerSyncDatabase } from '../lib';
import { CrudEntry, CrudTransaction, PowerSyncDatabase } from '../lib';
import { WorkerOpener } from '../lib/db/options';

test('validates options', async () => {
Expand Down Expand Up @@ -131,3 +131,39 @@ databaseTest.skip('can watch queries', async ({ database }) => {
await database.execute('INSERT INTO todos (id, content) VALUES (uuid(), ?)', ['fourth']);
expect((await query.next()).value.rows).toHaveLength(4);
});

databaseTest('getCrudTransactions', async ({ database }) => {
async function createTransaction(amount: number) {
await database.writeTransaction(async (tx) => {
for (let i = 0; i < amount; i++) {
await tx.execute('insert into todos (id) values (uuid())');
}
});
}

let iterator = database.getCrudTransactions()[Symbol.asyncIterator]();
expect(await iterator.next()).toMatchObject({ done: true });

await createTransaction(5);
await createTransaction(10);
await createTransaction(15);

let lastTransaction: CrudTransaction | null = null;
let batch: CrudEntry[] = [];

// Take the first two transactions via the async generator.
for await (const transaction of database.getCrudTransactions()) {
batch.push(...transaction.crud);
lastTransaction = transaction;

if (batch.length > 10) {
break;
}
}

expect(batch).toHaveLength(15);
await lastTransaction!.complete();

const remainingTransaction = await database.getNextCrudTransaction();
expect(remainingTransaction?.crud).toHaveLength(15);
});
1 change: 1 addition & 0 deletions packages/react-native/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ npx expo install @journeyapps/react-native-quick-sqlite
Watched queries can be used with either a callback response or Async Iterator response.

Watched queries using the Async Iterator response format require support for Async Iterators.
`PowerSyncDatabase.getCrudTransactions()` also returns an Async Iterator and requires this workaround.

Expo apps currently require polyfill and Babel plugins in order to use this functionality.

Expand Down
Loading