Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
37 commits
Select commit Hold shift + click to select a range
7324968
initial sync rules syntax for events
stevensJourney Sep 3, 2024
eae63d0
initial implementation of replication events
stevensJourney Sep 4, 2024
d6ad8c1
evaluate parameter queries
stevensJourney Sep 4, 2024
744c8f6
add event batching
stevensJourney Sep 10, 2024
fda1a24
add sync rules id to checkpoints
stevensJourney Sep 10, 2024
527a7cd
wip: cleanup event emission
stevensJourney Sep 12, 2024
9cb58f8
wip
stevensJourney Sep 16, 2024
d336ebe
update
stevensJourney Sep 16, 2024
a05dea6
move replication events to storage. Separate implementations for cust…
stevensJourney Sep 16, 2024
00393d5
cleanup
stevensJourney Sep 17, 2024
6e749c2
use event source definitions for event sync rules
stevensJourney Sep 17, 2024
f174f14
support events for tables which aren't used in buckets
stevensJourney Sep 18, 2024
8b8905d
Added batching for events
stevensJourney Sep 23, 2024
3451ef4
move base sql data query
stevensJourney Sep 23, 2024
4a21205
splitting data query types
stevensJourney Sep 23, 2024
684c3f1
rename abstract sql data query
stevensJourney Sep 23, 2024
249b5df
cleanup data query classes
stevensJourney Sep 23, 2024
349fbd3
update from public main
stevensJourney Sep 23, 2024
a5af72b
update from modular architecture branch
stevensJourney Sep 23, 2024
ba13a86
fix lsn undefined error
stevensJourney Sep 23, 2024
d32421c
Added batch checkpoint method
stevensJourney Sep 23, 2024
0fc61f8
rename basesql query
stevensJourney Sep 23, 2024
27c8b08
rename base data query
stevensJourney Sep 23, 2024
7a584e3
cleanup
stevensJourney Sep 23, 2024
a65749d
add index for custom write checkpoints
stevensJourney Sep 23, 2024
e9a6374
cleanup
stevensJourney Sep 23, 2024
7a6f08b
wip: Fire individual replication events. Spread Write Checkpoint API.…
stevensJourney Sep 26, 2024
bd35dcd
Merge remote-tracking branch 'public/feat/modular-replication-archite…
stevensJourney Oct 8, 2024
d7db921
Update index to be unique. Cleanup.
stevensJourney Oct 8, 2024
6b42be2
update custom write checkpoint types
stevensJourney Oct 8, 2024
89b16f7
cleanup
stevensJourney Oct 8, 2024
0d45657
fix tests. work around circular dependencies in sync rules package
stevensJourney Oct 8, 2024
b10c664
throw if incorrect type of checkpoint is created
stevensJourney Oct 8, 2024
85e1d4a
Replace Replication Event manager with Listeners
stevensJourney Oct 9, 2024
056de1a
Add functionality for nested disposed listeners
stevensJourney Oct 10, 2024
c2283a1
Add tests
stevensJourney Oct 10, 2024
d2524a0
added changesets
stevensJourney Oct 10, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/orange-eagles-tap.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
'@powersync/lib-services-framework': minor
---

Added disposable listeners and observers
6 changes: 6 additions & 0 deletions .changeset/popular-snails-cough.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
---
'@powersync/service-core': minor
'@powersync/service-sync-rules': minor
---

Added ability to emit data replication events
7 changes: 7 additions & 0 deletions .changeset/sour-turkeys-collect.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
---
'@powersync/service-module-postgres': patch
'@powersync/service-rsocket-router': patch
'@powersync/service-types': patch
---

Updates resulting from the replication events changes
6 changes: 5 additions & 1 deletion libs/lib-services/src/utils/BaseObserver.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
import { v4 as uuid } from 'uuid';

export class BaseObserver<T> {
export interface ObserverClient<T> {
  /**
   * Registers a (possibly partial) listener.
   *
   * @returns a disposer function that unregisters the listener when called.
   */
  registerListener(listener: Partial<T>): () => void;
}

export class BaseObserver<T> implements ObserverClient<T> {
protected listeners: { [id: string]: Partial<T> };

constructor() {
Expand Down
37 changes: 37 additions & 0 deletions libs/lib-services/src/utils/DisposableObserver.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
import { BaseObserver, ObserverClient } from './BaseObserver.js';

export interface DisposableListener {
  /**
   * Event which is fired when the `[Symbol.dispose]` method is called.
   */
  disposed: () => void;
}

export interface DisposableObserverClient<T extends DisposableListener> extends ObserverClient<T>, Disposable {
  /**
   * Registers a listener that is automatically disposed when the parent is disposed.
   * This is useful for disposing nested listeners.
   *
   * @param parent - observer whose disposal should also remove `cb` from this observer.
   * @param cb - the (possibly partial) listener to register on this observer.
   * @returns a disposer function that unregisters `cb` when called.
   */
  registerManagedListener: (parent: DisposableObserverClient<DisposableListener>, cb: Partial<T>) => () => void;
}

export class DisposableObserver<T extends DisposableListener>
  extends BaseObserver<T>
  implements DisposableObserverClient<T>
{
  /**
   * Registers `cb` on this observer and ties its lifetime to `parent`:
   * when `parent` fires its `disposed` event, `cb` is removed automatically.
   *
   * @param parent - observer whose disposal should also remove `cb`.
   * @param cb - the (possibly partial) listener to register on this observer.
   * @returns a disposer which removes `cb` from this observer AND removes the
   *   bookkeeping `disposed` hook placed on `parent`. (Previously the parent
   *   hook was never removed when the disposer was invoked manually, leaking
   *   one listener entry on `parent` per manual disposal.)
   */
  registerManagedListener(parent: DisposableObserverClient<DisposableListener>, cb: Partial<T>) {
    const disposeListener = this.registerListener(cb);
    // Hook into the parent's disposal so the nested listener is cleaned up
    // together with the parent.
    const disposeParentHook = parent.registerListener({
      disposed: () => {
        disposeListener();
      }
    });
    return () => {
      // Remove both registrations so no stale `disposed` hook remains on
      // `parent` after a manual disposal. Both disposers are safe to call
      // even if the parent has already been disposed.
      disposeParentHook();
      disposeListener();
    };
  }

  [Symbol.dispose]() {
    // Notify every listener that this observer is going away.
    this.iterateListeners((cb) => cb.disposed?.());
    // Delete all callbacks so none outlive the observer.
    Object.keys(this.listeners).forEach((key) => delete this.listeners[key]);
  }
}
1 change: 1 addition & 0 deletions libs/lib-services/src/utils/utils-index.ts
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
export * from './BaseObserver.js';
export * from './DisposableObserver.js';
export * from './environment-variables.js';
58 changes: 58 additions & 0 deletions libs/lib-services/test/src/DisposeableObserver.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
import { describe, expect, test } from 'vitest';

import { DisposableListener, DisposableObserver } from '../../src/utils/DisposableObserver.js';

describe('DisposableObserver', () => {
  test('it should dispose all listeners on dispose', () => {
    const observer = new DisposableObserver();

    // Track whether the `disposed` callback fired.
    let disposedFired = false;
    observer.registerListener({
      disposed: () => {
        disposedFired = true;
      }
    });

    observer[Symbol.dispose]();

    // The callback must have run and the registry must be empty afterwards.
    expect(disposedFired).equals(true);
    expect(Object.keys(observer['listeners']).length).equals(0);
  });

  test('it should dispose nested listeners for managed listeners', () => {
    interface ParentListener extends DisposableListener {
      childCreated: (child: DisposableObserver<any>) => void;
    }
    class ParentObserver extends DisposableObserver<ParentListener> {
      createChild() {
        const child = new DisposableObserver();
        this.iterateListeners((cb) => cb.childCreated?.(child));
      }
    }

    const parent = new ParentObserver();
    let observedChild: DisposableObserver<any> | null = null;

    parent.registerListener({
      childCreated: (child) => {
        observedChild = child;
        child.registerManagedListener(parent, {
          test: () => {
            // this does nothing
          }
        });
      }
    });

    parent.createChild();

    // The managed listener should add a `disposed` listener
    expect(Object.keys(parent['listeners']).length).equals(2);
    expect(Object.keys(observedChild!['listeners']).length).equals(1);

    parent[Symbol.dispose]();
    expect(Object.keys(parent['listeners']).length).equals(0);
    // The listener attached to the child should be disposed when the parent was disposed
    expect(Object.keys(observedChild!['listeners']).length).equals(0);
  });
});
17 changes: 5 additions & 12 deletions modules/module-postgres/src/module/PostgresModule.ts
Original file line number Diff line number Diff line change
@@ -1,20 +1,13 @@
import {
api,
auth,
ConfigurationFileSyncRulesProvider,
replication,
system,
TearDownOptions
} from '@powersync/service-core';
import { api, auth, ConfigurationFileSyncRulesProvider, modules, replication, system } from '@powersync/service-core';
import * as jpgwire from '@powersync/service-jpgwire';
import * as types from '../types/types.js';
import { PostgresRouteAPIAdapter } from '../api/PostgresRouteAPIAdapter.js';
import { SupabaseKeyCollector } from '../auth/SupabaseKeyCollector.js';
import { WalStreamReplicator } from '../replication/WalStreamReplicator.js';
import { ConnectionManagerFactory } from '../replication/ConnectionManagerFactory.js';
import { PgManager } from '../replication/PgManager.js';
import { PostgresErrorRateLimiter } from '../replication/PostgresErrorRateLimiter.js';
import { cleanUpReplicationSlot } from '../replication/replication-utils.js';
import { PgManager } from '../replication/PgManager.js';
import { WalStreamReplicator } from '../replication/WalStreamReplicator.js';
import * as types from '../types/types.js';

export class PostgresModule extends replication.ReplicationModule<types.PostgresConnectionConfig> {
constructor() {
Expand Down Expand Up @@ -70,7 +63,7 @@ export class PostgresModule extends replication.ReplicationModule<types.Postgres
};
}

async teardown(options: TearDownOptions): Promise<void> {
async teardown(options: modules.TearDownOptions): Promise<void> {
const normalisedConfig = this.resolveConfig(this.decodedConfig!);
const connectionManager = new PgManager(normalisedConfig, {
idleTimeout: 30_000,
Expand Down
25 changes: 12 additions & 13 deletions modules/module-postgres/src/replication/WalStream.ts
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
import * as pgwire from '@powersync/service-jpgwire';
import * as util from '../utils/pgwire_utils.js';
import { container, errors, logger } from '@powersync/lib-services-framework';
import { getUuidReplicaIdentityBson, Metrics, SourceEntityDescriptor, storage } from '@powersync/service-core';
import * as pgwire from '@powersync/service-jpgwire';
import { DatabaseInputRow, SqliteRow, SqlSyncRules, TablePattern, toSyncRulesRow } from '@powersync/service-sync-rules';
import * as pg_utils from '../utils/pgwire_utils.js';
import { PgManager } from './PgManager.js';
import { getPgOutputRelation, getRelId } from './PgRelation.js';
import { getUuidReplicaIdentityBson, Metrics, SourceEntityDescriptor, storage } from '@powersync/service-core';
import { checkSourceConfiguration, getReplicationIdentityColumns } from './replication-utils.js';
import { PgManager } from './PgManager.js';

export const ZERO_LSN = '00000000/00000000';
export const PUBLICATION_NAME = 'powersync';
Expand Down Expand Up @@ -60,7 +60,7 @@ export class WalStream {
// Ping to speed up cancellation of streaming replication
// We're not using pg_snapshot here, since it could be in the middle of
// an initial replication transaction.
const promise = util.retriedQuery(
const promise = pg_utils.retriedQuery(
this.connections.pool,
`SELECT * FROM pg_logical_emit_message(false, 'powersync', 'ping')`
);
Expand Down Expand Up @@ -347,7 +347,6 @@ WHERE oid = $1::regclass`,
for (let table of tables) {
await this.snapshotTable(batch, db, table);
await batch.markSnapshotDone([table], lsn);

await touch();
}
}
Expand Down Expand Up @@ -395,7 +394,7 @@ WHERE oid = $1::regclass`,
throw new Error(`Aborted initial replication of ${this.slot_name}`);
}

for (let record of WalStream.getQueryData(rows)) {
for (const record of WalStream.getQueryData(rows)) {
// This auto-flushes when the batch reaches its size limit
await batch.save({
tag: 'insert',
Expand All @@ -406,6 +405,7 @@ WHERE oid = $1::regclass`,
afterReplicaId: getUuidReplicaIdentityBson(record, table.replicaIdColumns)
});
}

at += rows.length;
Metrics.getInstance().rows_replicated_total.add(rows.length);

Expand Down Expand Up @@ -495,7 +495,7 @@ WHERE oid = $1::regclass`,

if (msg.tag == 'insert') {
Metrics.getInstance().rows_replicated_total.add(1);
const baseRecord = util.constructAfterRecord(msg);
const baseRecord = pg_utils.constructAfterRecord(msg);
return await batch.save({
tag: 'insert',
sourceTable: table,
Expand All @@ -508,8 +508,8 @@ WHERE oid = $1::regclass`,
Metrics.getInstance().rows_replicated_total.add(1);
// "before" may be null if the replica id columns are unchanged
// It's fine to treat that the same as an insert.
const before = util.constructBeforeRecord(msg);
const after = util.constructAfterRecord(msg);
const before = pg_utils.constructBeforeRecord(msg);
const after = pg_utils.constructAfterRecord(msg);
return await batch.save({
tag: 'update',
sourceTable: table,
Expand All @@ -520,7 +520,7 @@ WHERE oid = $1::regclass`,
});
} else if (msg.tag == 'delete') {
Metrics.getInstance().rows_replicated_total.add(1);
const before = util.constructBeforeRecord(msg)!;
const before = pg_utils.constructBeforeRecord(msg)!;

return await batch.save({
tag: 'delete',
Expand Down Expand Up @@ -592,7 +592,6 @@ WHERE oid = $1::regclass`,
// chunkLastLsn may come from normal messages in the chunk,
// or from a PrimaryKeepalive message.
const { messages, lastLsn: chunkLastLsn } = chunk;

for (const msg of messages) {
if (msg.tag == 'relation') {
await this.handleRelation(batch, getPgOutputRelation(msg), true);
Expand All @@ -609,7 +608,7 @@ WHERE oid = $1::regclass`,
}

count += 1;
const result = await this.writeChange(batch, msg);
await this.writeChange(batch, msg);
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import { MissingReplicationSlotError, WalStream } from './WalStream.js';
import { container } from '@powersync/lib-services-framework';
import { PgManager } from './PgManager.js';
import { MissingReplicationSlotError, WalStream } from './WalStream.js';

import { replication } from '@powersync/service-core';
import { ConnectionManagerFactory } from './ConnectionManagerFactory.js';
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import { storage, replication } from '@powersync/service-core';
import { WalStreamReplicationJob } from './WalStreamReplicationJob.js';
import { replication, storage } from '@powersync/service-core';
import { ConnectionManagerFactory } from './ConnectionManagerFactory.js';
import { cleanUpReplicationSlot } from './replication-utils.js';
import { WalStreamReplicationJob } from './WalStreamReplicationJob.js';

export interface WalStreamReplicatorOptions extends replication.AbstractReplicatorOptions {
connectionFactory: ConnectionManagerFactory;
Expand Down
10 changes: 5 additions & 5 deletions modules/module-postgres/test/src/slow_tests.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,16 @@ import * as bson from 'bson';
import { afterEach, describe, expect, test } from 'vitest';
import { WalStream, WalStreamOptions } from '../../src/replication/WalStream.js';
import { env } from './env.js';
import { TEST_CONNECTION_OPTIONS, clearTestDb, connectPgPool, getClientCheckpoint } from './util.js';
import { clearTestDb, connectPgPool, getClientCheckpoint, TEST_CONNECTION_OPTIONS } from './util.js';

import * as pgwire from '@powersync/service-jpgwire';
import { SqliteRow } from '@powersync/service-sync-rules';

import { mapOpEntry, MongoBucketStorage } from '@/storage/storage-index.js';
import * as timers from 'node:timers/promises';
import { reduceBucket, validateCompactedBucket } from '@core-tests/bucket_validation.js';
import { MONGO_STORAGE_FACTORY, StorageFactory } from '@core-tests/util.js';
import { PgManager } from '@module/replication/PgManager.js';
import { reduceBucket, validateCompactedBucket } from '@core-tests/bucket_validation.js';
import * as timers from 'node:timers/promises';

describe('slow tests - mongodb', function () {
// These are slow, inconsistent tests.
Expand Down Expand Up @@ -82,7 +82,7 @@ bucket_definitions:
- SELECT * FROM "test_data"
`;
const syncRules = await f.updateSyncRules({ content: syncRuleContent });
const storage = f.getInstance(syncRules);
using storage = f.getInstance(syncRules);
abortController = new AbortController();
const options: WalStreamOptions = {
abort_signal: abortController.signal,
Expand Down Expand Up @@ -234,7 +234,7 @@ bucket_definitions:
- SELECT id, description FROM "test_data"
`;
const syncRules = await f.updateSyncRules({ content: syncRuleContent });
const storage = f.getInstance(syncRules);
using storage = f.getInstance(syncRules);

// 1. Setup some base data that will be replicated in initial replication
await pool.query(`CREATE TABLE test_data(id uuid primary key default uuid_generate_v4(), description text)`);
Expand Down
10 changes: 6 additions & 4 deletions modules/module-postgres/test/src/util.ts
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
import { connectMongo } from '@core-tests/util.js';
import * as types from '@module/types/types.js';
import * as pg_utils from '@module/utils/pgwire_utils.js';
import { logger } from '@powersync/lib-services-framework';
import { BucketStorageFactory, Metrics, MongoBucketStorage, OpId } from '@powersync/service-core';
import * as pgwire from '@powersync/service-jpgwire';
import { env } from './env.js';
import { pgwireRows } from '@powersync/service-jpgwire';
import { logger } from '@powersync/lib-services-framework';
import { connectMongo } from '@core-tests/util.js';
import { env } from './env.js';

// The metrics need to be initialized before they can be used
await Metrics.initialise({
Expand Down Expand Up @@ -35,7 +35,9 @@ export const INITIALIZED_MONGO_STORAGE_FACTORY: StorageFactory = async () => {

await db.clear();

return new MongoBucketStorage(db, { slot_name_prefix: 'test_' });
return new MongoBucketStorage(db, {
slot_name_prefix: 'test_'
});
};

export async function clearTestDb(db: pgwire.PgClient) {
Expand Down
21 changes: 9 additions & 12 deletions modules/module-postgres/test/src/wal_stream_utils.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
import { BucketStorageFactory, SyncRulesBucketStorage } from '@powersync/service-core';
import * as pgwire from '@powersync/service-jpgwire';
import { TEST_CONNECTION_OPTIONS, clearTestDb, getClientCheckpoint } from './util.js';
import { WalStream, WalStreamOptions, PUBLICATION_NAME } from '@module/replication/WalStream.js';
import { fromAsync } from '@core-tests/stream_utils.js';
import { PgManager } from '@module/replication/PgManager.js';
import { PUBLICATION_NAME, WalStream, WalStreamOptions } from '@module/replication/WalStream.js';
import { BucketStorageFactory, SyncRulesBucketStorage } from '@powersync/service-core';
import * as pgwire from '@powersync/service-jpgwire';
import { clearTestDb, getClientCheckpoint, TEST_CONNECTION_OPTIONS } from './util.js';

/**
* Tests operating on the wal stream need to configure the stream and manage asynchronous
Expand All @@ -20,16 +20,12 @@ export function walStreamTest(
const connectionManager = new PgManager(TEST_CONNECTION_OPTIONS, {});

await clearTestDb(connectionManager.pool);
const context = new WalStreamTestContext(f, connectionManager);
try {
await test(context);
} finally {
await context.dispose();
}
await using context = new WalStreamTestContext(f, connectionManager);
await test(context);
};
}

export class WalStreamTestContext {
export class WalStreamTestContext implements AsyncDisposable {
private _walStream?: WalStream;
private abortController = new AbortController();
private streamPromise?: Promise<void>;
Expand All @@ -41,10 +37,11 @@ export class WalStreamTestContext {
public connectionManager: PgManager
) {}

async dispose() {
async [Symbol.asyncDispose]() {
this.abortController.abort();
await this.streamPromise;
await this.connectionManager.destroy();
this.storage?.[Symbol.dispose]();
}

get pool() {
Expand Down
Loading
Loading