8 changes: 7 additions & 1 deletion in modules/module-postgres/src/api/PostgresRouteAPIAdapter.ts

@@ -1,4 +1,4 @@
-import { api } from '@powersync/service-core';
+import { api, ParseSyncRulesOptions } from '@powersync/service-core';
 import * as pgwire from '@powersync/service-jpgwire';

 import * as sync_rules from '@powersync/service-sync-rules';
@@ -23,6 +23,12 @@ export class PostgresRouteAPIAdapter implements api.RouteAPI {
     this.connectionTag = config.tag ?? sync_rules.DEFAULT_TAG;
   }

+  getParseSyncRulesOptions(): ParseSyncRulesOptions {
+    return {
+      defaultSchema: 'public'
+    };
+  }
+
   async shutdown(): Promise<void> {
     await this.pool.end();
   }
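
Note: this is the connector-specific answer to the new RouteAPI.getParseSyncRulesOptions() contract (see packages/service-core/src/api/RouteAPI.ts below); Postgres pins the default schema to 'public'. A connector for a database without Postgres-style schemas would presumably return its database name instead. A minimal sketch, using only the types shown in this diff (the MySQL-flavoured class itself is hypothetical, not part of the PR):

import { ParseSyncRulesOptions } from '@powersync/service-core';

// Hypothetical connector where "schema" maps to the configured database name.
class MySqlParseOptionsProvider {
  constructor(private databaseName: string) {}

  getParseSyncRulesOptions(): ParseSyncRulesOptions {
    // Unqualified table names in sync rules resolve against this database.
    return { defaultSchema: this.databaseName };
  }
}
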
45 changes: 37 additions & 8 deletions in modules/module-postgres/src/replication/WalStream.ts

@@ -3,12 +3,13 @@ import * as util from '../utils/pgwire_utils.js';
 import { container, errors, logger } from '@powersync/lib-services-framework';
 import { DatabaseInputRow, SqliteRow, SqlSyncRules, TablePattern, toSyncRulesRow } from '@powersync/service-sync-rules';
 import { getPgOutputRelation, getRelId } from './PgRelation.js';
-import { Metrics, SourceEntityDescriptor, storage } from '@powersync/service-core';
+import { getUuidReplicaIdentityBson, Metrics, SourceEntityDescriptor, storage } from '@powersync/service-core';
 import { checkSourceConfiguration, getReplicationIdentityColumns } from './replication-utils.js';
 import { PgManager } from './PgManager.js';

 export const ZERO_LSN = '00000000/00000000';
 export const PUBLICATION_NAME = 'powersync';
+export const POSTGRES_DEFAULT_SCHEMA = 'public';

 export interface WalStreamOptions {
   connections: PgManager;
@@ -46,7 +47,7 @@ export class WalStream {
   constructor(options: WalStreamOptions) {
     this.storage = options.storage;
-    this.sync_rules = options.storage.sync_rules;
+    this.sync_rules = options.storage.getParsedSyncRules({ defaultSchema: POSTGRES_DEFAULT_SCHEMA });
     this.group_id = options.storage.group_id;
     this.slot_name = options.storage.slot_name;
     this.connections = options.connections;
@@ -333,7 +334,7 @@ WHERE oid = $1::regclass`,
   async initialReplication(db: pgwire.PgConnection, lsn: string) {
     const sourceTables = this.sync_rules.getSourceTables();
-    await this.storage.startBatch({ zeroLSN: ZERO_LSN }, async (batch) => {
+    await this.storage.startBatch({ zeroLSN: ZERO_LSN, defaultSchema: POSTGRES_DEFAULT_SCHEMA }, async (batch) => {
       for (let tablePattern of sourceTables) {
         const tables = await this.getQualifiedTableNames(batch, db, tablePattern);
         for (let table of tables) {
@@ -389,7 +390,14 @@
       for (let record of WalStream.getQueryData(rows)) {
         // This auto-flushes when the batch reaches its size limit
-        await batch.save({ tag: 'insert', sourceTable: table, before: undefined, after: record });
+        await batch.save({
+          tag: 'insert',
+          sourceTable: table,
+          before: undefined,
+          beforeReplicaId: undefined,
+          after: record,
+          afterReplicaId: getUuidReplicaIdentityBson(record, table.replicaIdColumns)
+        });
       }
       at += rows.length;
       Metrics.getInstance().rows_replicated_total.add(rows.length);
@@ -481,19 +489,40 @@
       if (msg.tag == 'insert') {
         Metrics.getInstance().rows_replicated_total.add(1);
         const baseRecord = util.constructAfterRecord(msg);
-        return await batch.save({ tag: 'insert', sourceTable: table, before: undefined, after: baseRecord });
+        return await batch.save({
+          tag: 'insert',
+          sourceTable: table,
+          before: undefined,
+          beforeReplicaId: undefined,
+          after: baseRecord,
+          afterReplicaId: getUuidReplicaIdentityBson(baseRecord, table.replicaIdColumns)
+        });
       } else if (msg.tag == 'update') {
         Metrics.getInstance().rows_replicated_total.add(1);
         // "before" may be null if the replica id columns are unchanged
         // It's fine to treat that the same as an insert.
         const before = util.constructBeforeRecord(msg);
         const after = util.constructAfterRecord(msg);
-        return await batch.save({ tag: 'update', sourceTable: table, before: before, after: after });
+        return await batch.save({
+          tag: 'update',
+          sourceTable: table,
+          before: before,
+          beforeReplicaId: before ? getUuidReplicaIdentityBson(before, table.replicaIdColumns) : undefined,
+          after: after,
+          afterReplicaId: getUuidReplicaIdentityBson(after, table.replicaIdColumns)
+        });
       } else if (msg.tag == 'delete') {
         Metrics.getInstance().rows_replicated_total.add(1);
         const before = util.constructBeforeRecord(msg)!;

-        return await batch.save({ tag: 'delete', sourceTable: table, before: before, after: undefined });
+        return await batch.save({
+          tag: 'delete',
+          sourceTable: table,
+          before: before,
+          beforeReplicaId: getUuidReplicaIdentityBson(before, table.replicaIdColumns),
+          after: undefined,
+          afterReplicaId: undefined
+        });
       }
     } else if (msg.tag == 'truncate') {
       let tables: storage.SourceTable[] = [];
@@ -541,7 +570,7 @@
     // Auto-activate as soon as initial replication is done
     await this.storage.autoActivate();

-    await this.storage.startBatch({ zeroLSN: ZERO_LSN }, async (batch) => {
+    await this.storage.startBatch({ zeroLSN: ZERO_LSN, defaultSchema: POSTGRES_DEFAULT_SCHEMA }, async (batch) => {
      // Replication never starts in the middle of a transaction
      let inTx = false;
      let count = 0;
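
Note: every batch.save() call now carries explicit replica ids alongside the before/after images. The rules are uniform across the four call sites and could be summarised in one helper; this sketch is for illustration only (the helper does not exist in the PR, and the getUuidReplicaIdentityBson signature and table.replicaIdColumns field are assumed from their use above):

import { getUuidReplicaIdentityBson, storage } from '@powersync/service-core';
import { SqliteRow } from '@powersync/service-sync-rules';

// Replica ids identify a row across changes, derived from the table's
// replica identity columns.
function replicaIds(table: storage.SourceTable, before?: SqliteRow, after?: SqliteRow) {
  return {
    // Inserts have no before-image; updates may also lack one when the
    // replica identity columns are unchanged (treated like an insert).
    beforeReplicaId: before ? getUuidReplicaIdentityBson(before, table.replicaIdColumns) : undefined,
    // Deletes have no after-image.
    afterReplicaId: after ? getUuidReplicaIdentityBson(after, table.replicaIdColumns) : undefined
  };
}
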
4 changes: 2 additions & 2 deletions in modules/module-postgres/test/src/slow_tests.test.ts

@@ -83,7 +83,7 @@ bucket_definitions:
       - SELECT * FROM "test_data"
 `;
   const syncRules = await f.updateSyncRules({ content: syncRuleContent });
-  const storage = f.getInstance(syncRules.parsed());
+  const storage = f.getInstance(syncRules);
   abortController = new AbortController();
   const options: WalStreamOptions = {
     abort_signal: abortController.signal,
@@ -235,7 +235,7 @@ bucket_definitions:
       - SELECT id, description FROM "test_data"
 `;
   const syncRules = await f.updateSyncRules({ content: syncRuleContent });
-  const storage = f.getInstance(syncRules.parsed());
+  const storage = f.getInstance(syncRules);

   // 1. Setup some base data that will be replicated in initial replication
   await pool.query(`CREATE TABLE test_data(id uuid primary key default uuid_generate_v4(), description text)`);
4 changes: 2 additions & 2 deletions in modules/module-postgres/test/src/validation.test.ts

@@ -22,13 +22,13 @@ bucket_definitions:

   const syncRules = await context.factory.updateSyncRules({ content: syncRuleContent });

-  const tablePatterns = syncRules.parsed().sync_rules.getSourceTables();
+  const tablePatterns = syncRules.parsed({ defaultSchema: 'public' }).sync_rules.getSourceTables();
   const tableInfo = await getDebugTablesInfo({
     db: pool,
     publicationName: context.publicationName,
     connectionTag: context.connectionTag,
     tablePatterns: tablePatterns,
-    syncRules: syncRules.parsed().sync_rules
+    syncRules: syncRules.parsed({ defaultSchema: 'public' }).sync_rules
   });
   expect(tableInfo).toEqual([
     {
2 changes: 1 addition & 1 deletion in modules/module-postgres/test/src/wal_stream_utils.ts

@@ -58,7 +58,7 @@ export class WalStreamTestContext {

   async updateSyncRules(content: string) {
     const syncRules = await this.factory.updateSyncRules({ content: content });
-    this.storage = this.factory.getInstance(syncRules.parsed());
+    this.storage = this.factory.getInstance(syncRules);
     return this.storage!;
   }
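
Note: the test helpers now hand the unparsed persisted sync rules straight to getInstance(), leaving parsing (and the defaultSchema decision) to the storage layer. Where a test still needs the parsed rules directly, the options become explicit, as in validation.test.ts above. The resulting pattern, sketched with only calls that appear in this diff:

// Storage instances are created from persisted rules; no parse options needed here.
const syncRules = await factory.updateSyncRules({ content: syncRuleContent });
const storage = factory.getInstance(syncRules);

// Parsing on demand now requires the connector's default schema.
const parsed = syncRules.parsed({ defaultSchema: 'public' });
const tablePatterns = parsed.sync_rules.getSourceTables();
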
6 changes: 6 additions & 0 deletions in packages/service-core/src/api/RouteAPI.ts

@@ -1,5 +1,6 @@
 import { SqlSyncRules, TablePattern } from '@powersync/service-sync-rules';
 import * as types from '@powersync/service-types';
+import { ParseSyncRulesOptions } from '../storage/BucketStorage.js';

 export interface PatternResult {
   schema: string;
@@ -69,4 +70,9 @@ export interface RouteAPI {
    * Close any resources that need graceful termination.
    */
   shutdown(): Promise<void>;
+
+  /**
+   * Get the default schema (or database) to use when only a table name is specified in sync rules.
+   */
+  getParseSyncRulesOptions(): ParseSyncRulesOptions;
 }
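
Note: the ParseSyncRulesOptions declaration itself is not part of this diff; the import points at packages/service-core/src/storage/BucketStorage.ts. From its uses here, the shape is presumably along these lines (an inference, not a quote from the PR):

// Inferred shape; every call site in this diff supplies exactly defaultSchema.
interface ParseSyncRulesOptions {
  /** Schema (or database) assumed for table names with no explicit schema. */
  defaultSchema: string;
}
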
4 changes: 2 additions & 2 deletions in packages/service-core/src/api/diagnostics.ts

@@ -43,7 +43,7 @@ export async function getSyncRulesStatus(
   let rules: SqlSyncRules;
   let persisted: storage.PersistedSyncRules;
   try {
-    persisted = sync_rules.parsed();
+    persisted = sync_rules.parsed(apiHandler.getParseSyncRulesOptions());
     rules = persisted.sync_rules;
   } catch (e) {
     return {
@@ -53,7 +53,7 @@
     };
   }

-  const systemStorage = live_status ? bucketStorage.getInstance(persisted) : undefined;
+  const systemStorage = live_status ? bucketStorage.getInstance(sync_rules) : undefined;
   const status = await systemStorage?.getStatus();
   let replication_lag_bytes: number | undefined = undefined;
2 changes: 1 addition & 1 deletion in packages/service-core/src/entry/commands/compact-action.ts

@@ -37,7 +37,7 @@ export function registerCompactAction(program: Command) {
   await client.connect();
   try {
     const bucketStorage = new storage.MongoBucketStorage(psdb, { slot_name_prefix: configuration.slot_name_prefix });
-    const active = await bucketStorage.getActiveSyncRules();
+    const active = await bucketStorage.getActiveSyncRulesContent();
     if (active == null) {
       logger.info('No active instance to compact');
       return;
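
Note: the compact command runs outside the router, so there is no RouteAPI handler to supply parse options; switching to getActiveSyncRulesContent() sidesteps parsing entirely, since compaction only needs to locate the active instance. The distinction, sketched with the two calls this diff uses:

// Raw persisted rules: no parsing, no parse options required.
const active = await bucketStorage.getActiveSyncRulesContent();

// Parsed rules: only available where a connector can provide options
// (apiHandler is a RouteAPI instance, accessible in route handlers).
const activeParsed = await bucketStorage.getActiveSyncRules(apiHandler.getParseSyncRulesOptions());
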
5 changes: 2 additions & 3 deletions in packages/service-core/src/replication/AbstractReplicator.ts

@@ -166,8 +166,7 @@ export abstract class AbstractReplicator<T extends AbstractReplicationJob = Abst
     } else {
       lock = await syncRules.lock();
     }
-    const parsed = syncRules.parsed();
-    const storage = this.storage.getInstance(parsed);
+    const storage = this.storage.getInstance(syncRules);
     const newJob = this.createJob({
       lock: lock,
       storage: storage
@@ -203,7 +202,7 @@
     const stopped = await this.storage.getStoppedSyncRules();
     for (let syncRules of stopped) {
       try {
-        const syncRuleStorage = this.storage.getInstance(syncRules.parsed());
+        const syncRuleStorage = this.storage.getInstance(syncRules);
         await this.terminateSyncRules(syncRuleStorage);
       } catch (e) {
         this.logger.warn(`Failed clean up replication config for sync rule: ${syncRules.id}`, e);
12 changes: 7 additions & 5 deletions in packages/service-core/src/routes/endpoints/admin.ts

@@ -112,12 +112,13 @@ export const reprocess = routeDefinition({
   const {
     storageEngine: { activeBucketStorage }
   } = service_context;
-  const next = await activeBucketStorage.getNextSyncRules();
+  const apiHandler = service_context.routerEngine!.getAPI();
+  const next = await activeBucketStorage.getNextSyncRules(apiHandler.getParseSyncRulesOptions());
   if (next != null) {
     throw new Error(`Busy processing sync rules - cannot reprocess`);
   }

-  const active = await activeBucketStorage.getActiveSyncRules();
+  const active = await activeBucketStorage.getActiveSyncRules(apiHandler.getParseSyncRulesOptions());
   if (active == null) {
     throw new errors.JourneyError({
       status: 422,
@@ -130,8 +131,6 @@
     content: active.sync_rules.content
   });

-  const apiHandler = service_context.routerEngine!.getAPI();
-
   const baseConfig = await apiHandler.getSourceConfig();

   return internal_routes.ReprocessResponse.encode({
@@ -170,7 +169,7 @@
     parsed() {
       return {
         ...this,
-        sync_rules: SqlSyncRules.fromYaml(content, { throwOnError: false, schema })
+        sync_rules: SqlSyncRules.fromYaml(content, {
+          ...apiHandler.getParseSyncRulesOptions(),
+          schema
+        })
       };
     },
     sync_rules_content: content,
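
Note: in the validate endpoint the spread order is what makes the override work: the connector's defaults come first, so the request-level schema (whatever its value) wins. A minimal illustration of the merge semantics:

// Assumed defaults, matching the Postgres adapter above:
const defaults = { defaultSchema: 'public' };

// Later keys win in an object spread, so `schema` overrides any default.
const schema = undefined; // stand-in for the request-derived schema
const options = { ...defaults, schema };
// => { defaultSchema: 'public', schema: undefined }
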
3 changes: 2 additions & 1 deletion in packages/service-core/src/routes/endpoints/socket-route.ts

@@ -11,7 +11,7 @@ import { SyncRoutes } from './sync-stream.js';
 export const syncStreamReactive: SocketRouteGenerator = (router) =>
   router.reactiveStream<util.StreamingSyncRequest, any>(SyncRoutes.STREAM, {
     validator: schema.createTsCodecValidator(util.StreamingSyncRequest, { allowAdditional: true }),
-    handler: async ({ context, params, responder, observer, initialN, signal: upstreamSignal}) => {
+    handler: async ({ context, params, responder, observer, initialN, signal: upstreamSignal }) => {
       const { service_context } = context;
       const { routerEngine } = service_context;

@@ -72,6 +72,7 @@
       try {
         for await (const data of sync.streamResponse({
           storage: activeBucketStorage,
+          parseOptions: routerEngine!.getAPI().getParseSyncRulesOptions(),
           params: {
             ...params,
             binary_data: true // always true for web sockets
16 changes: 13 additions & 3 deletions in packages/service-core/src/routes/endpoints/sync-rules.ts

@@ -53,7 +53,12 @@ export const deploySyncRules = routeDefinition({
   const content = payload.params.content;

   try {
-    SqlSyncRules.fromYaml(payload.params.content);
+    const apiHandler = service_context.routerEngine!.getAPI();
+    SqlSyncRules.fromYaml(payload.params.content, {
+      ...apiHandler.getParseSyncRulesOptions(),
+      // We don't do any schema-level validation at this point
+      schema: undefined
+    });
   } catch (e) {
     throw new errors.JourneyError({
       status: 422,
@@ -151,7 +156,8 @@ export const reprocessSyncRules = routeDefinition({
   const {
     storageEngine: { activeBucketStorage }
   } = payload.context.service_context;
-  const sync_rules = await activeBucketStorage.getActiveSyncRules();
+  const apiHandler = payload.context.service_context.routerEngine!.getAPI();
+  const sync_rules = await activeBucketStorage.getActiveSyncRules(apiHandler.getParseSyncRulesOptions());
   if (sync_rules == null) {
     throw new errors.JourneyError({
       status: 422,
@@ -181,7 +187,11 @@ function replyPrettyJson(payload: any) {

 async function debugSyncRules(apiHandler: RouteAPI, sync_rules: string) {
   try {
-    const rules = SqlSyncRules.fromYaml(sync_rules);
+    const rules = SqlSyncRules.fromYaml(sync_rules, {
+      ...apiHandler.getParseSyncRulesOptions(),
+      // No schema-based validation at this point
+      schema: undefined
+    });
     const source_table_patterns = rules.getSourceTables();
     const resolved_tables = await apiHandler.getDebugTablesInfo(source_table_patterns, rules);
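
Note: both deploySyncRules and debugSyncRules now parse with the connector's defaults while passing schema: undefined explicitly, so only syntax- and structure-level problems are caught here; references to tables that do not exist are not. A sketch of the validation path, assuming (as the surrounding try/catch implies) that fromYaml throws on parse errors:

import { SqlSyncRules } from '@powersync/service-sync-rules';

// Returns null when the YAML parses cleanly, else the error message
// (which the route surfaces as a 422 response).
function validateSyncRulesYaml(content: string, defaultSchema: string): string | null {
  try {
    // schema: undefined => skip schema-based validation at deploy time.
    SqlSyncRules.fromYaml(content, { defaultSchema, schema: undefined });
    return null;
  } catch (e: any) {
    return e.message;
  }
}
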
1 change: 1 addition & 0 deletions in packages/service-core/src/routes/endpoints/sync-stream.ts

@@ -54,6 +54,7 @@
       sync.ndjson(
         sync.streamResponse({
           storage: storageEngine.activeBucketStorage,
+          parseOptions: routerEngine!.getAPI().getParseSyncRulesOptions(),
           params,
           syncParams,
           token: payload.context.token_payload!,
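
Note: with this line, both transports (the HTTP ndjson stream here and the websocket stream in socket-route.ts above) thread the same connector-derived options into sync.streamResponse, which presumably parses the active sync rules internally. The shared call shape, abbreviated from the two call sites (surrounding wiring elided; parameter names are from this diff):

const stream = sync.streamResponse({
  storage: storageEngine.activeBucketStorage,
  parseOptions: routerEngine!.getAPI().getParseSyncRulesOptions(),
  params,
  syncParams,
  token: payload.context.token_payload!
});
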
2 changes: 1 addition & 1 deletion in packages/service-core/src/runner/teardown.ts

@@ -51,7 +51,7 @@ async function terminateSyncRules(storageFactory: storage.BucketStorageFactory,

   // Mark the sync rules as terminated
   for (let syncRules of combinedSyncRules) {
-    const syncRulesStorage = storageFactory.getInstance(syncRules.parsed());
+    const syncRulesStorage = storageFactory.getInstance(syncRules);
     // The storage will be dropped at the end of the teardown, so we don't need to clear it here
     await syncRulesStorage.terminate({ clearStorage: false });
   }