From c7354761092664f58f10d5712b65d02cb81ebdfa Mon Sep 17 00:00:00 2001 From: Ralf Kistner Date: Mon, 10 Feb 2025 11:30:01 +0200 Subject: [PATCH 1/4] Allow updating sync rules with errors. --- .../src/storage/MongoBucketStorage.ts | 9 ++------- 1 file changed, 2 insertions(+), 7 deletions(-) diff --git a/modules/module-mongodb-storage/src/storage/MongoBucketStorage.ts b/modules/module-mongodb-storage/src/storage/MongoBucketStorage.ts index ada8922eb..8ce01dd91 100644 --- a/modules/module-mongodb-storage/src/storage/MongoBucketStorage.ts +++ b/modules/module-mongodb-storage/src/storage/MongoBucketStorage.ts @@ -166,13 +166,8 @@ export class MongoBucketStorage } async updateSyncRules(options: storage.UpdateSyncRulesOptions): Promise { - // Parse and validate before applying any changes - const parsed = SqlSyncRules.fromYaml(options.content, { - // No schema-based validation at this point - schema: undefined, - defaultSchema: 'not_applicable', // Not needed for validation - throwOnError: true - }); + // We do not validate sync rules at this point. + // That is done when using the sync rules, so that the diagnostics API can report the errors. let rules: MongoPersistedSyncRulesContent | undefined = undefined; From 7429939df3ce33025a0b4d774902c9f48921d655 Mon Sep 17 00:00:00 2001 From: Ralf Kistner Date: Mon, 10 Feb 2025 11:31:53 +0200 Subject: [PATCH 2/4] Tweak log message. --- .../src/replication/ChangeStreamReplicationJob.ts | 2 +- modules/module-mysql/src/replication/BinLogReplicationJob.ts | 2 +- .../module-postgres/src/replication/WalStreamReplicationJob.ts | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/modules/module-mongodb/src/replication/ChangeStreamReplicationJob.ts b/modules/module-mongodb/src/replication/ChangeStreamReplicationJob.ts index cc6f8e11f..3245f0f20 100644 --- a/modules/module-mongodb/src/replication/ChangeStreamReplicationJob.ts +++ b/modules/module-mongodb/src/replication/ChangeStreamReplicationJob.ts @@ -78,7 +78,7 @@ export class ChangeStreamReplicationJob extends replication.AbstractReplicationJ if (this.abortController.signal.aborted) { return; } - this.logger.error(`Replication error`, e); + this.logger.error(`${this.slotName} Replication error`, e); if (e.cause != null) { // Without this additional log, the cause may not be visible in the logs. this.logger.error(`cause`, e.cause); diff --git a/modules/module-mysql/src/replication/BinLogReplicationJob.ts b/modules/module-mysql/src/replication/BinLogReplicationJob.ts index aa1a838b2..900ca23f6 100644 --- a/modules/module-mysql/src/replication/BinLogReplicationJob.ts +++ b/modules/module-mysql/src/replication/BinLogReplicationJob.ts @@ -70,7 +70,7 @@ export class BinLogReplicationJob extends replication.AbstractReplicationJob { if (this.abortController.signal.aborted) { return; } - this.logger.error(`Replication error`, e); + this.logger.error(`Sync rules ${this.id} Replication error`, e); if (e.cause != null) { this.logger.error(`cause`, e.cause); } diff --git a/modules/module-postgres/src/replication/WalStreamReplicationJob.ts b/modules/module-postgres/src/replication/WalStreamReplicationJob.ts index e7db0e820..c487c5757 100644 --- a/modules/module-postgres/src/replication/WalStreamReplicationJob.ts +++ b/modules/module-postgres/src/replication/WalStreamReplicationJob.ts @@ -99,7 +99,7 @@ export class WalStreamReplicationJob extends replication.AbstractReplicationJob }); await stream.replicate(); } catch (e) { - this.logger.error(`Replication error`, e); + this.logger.error(`${this.slotName} Replication error`, e); if (e.cause != null) { // Example: // PgError.conn_ended: Unable to do postgres query on ended connection From fc3ecc8363115f22d62b952d058c21e2cf3dbc91 Mon Sep 17 00:00:00 2001 From: Ralf Kistner Date: Mon, 10 Feb 2025 11:53:08 +0200 Subject: [PATCH 3/4] Configurable exit_on_error for sync rule validation errors. --- .../src/storage/MongoBucketStorage.ts | 31 +++++++++----- .../test/src/change_stream_utils.ts | 2 +- .../test/src/BinlogStreamUtils.ts | 2 +- .../storage/PostgresBucketStorageFactory.ts | 40 +++++++++---------- .../test/src/wal_stream_utils.ts | 2 +- .../src/tests/register-data-storage-tests.ts | 2 +- .../src/replication/AbstractReplicator.ts | 12 ++++-- .../src/routes/endpoints/admin.ts | 5 ++- .../src/routes/endpoints/sync-rules.ts | 9 ++++- .../service-core/src/storage/BucketStorage.ts | 6 ++- .../util/config/compound-config-collector.ts | 3 +- .../impl/base64-sync-rules-collector.ts | 1 + .../impl/filesystem-sync-rules-collector.ts | 1 + .../impl/inline-sync-rules-collector.ts | 1 + .../config/sync-rules/sync-rules-provider.ts | 6 +++ .../service-core/src/util/config/types.ts | 1 + packages/types/src/config/PowerSyncConfig.ts | 3 +- 17 files changed, 82 insertions(+), 45 deletions(-) diff --git a/modules/module-mongodb-storage/src/storage/MongoBucketStorage.ts b/modules/module-mongodb-storage/src/storage/MongoBucketStorage.ts index 8ce01dd91..c164dc4d7 100644 --- a/modules/module-mongodb-storage/src/storage/MongoBucketStorage.ts +++ b/modules/module-mongodb-storage/src/storage/MongoBucketStorage.ts @@ -101,22 +101,19 @@ export class MongoBucketStorage }; } - async configureSyncRules(sync_rules: string, options?: { lock?: boolean }) { + async configureSyncRules(options: storage.UpdateSyncRulesOptions) { const next = await this.getNextSyncRulesContent(); const active = await this.getActiveSyncRulesContent(); - if (next?.sync_rules_content == sync_rules) { + if (next?.sync_rules_content == options.content) { logger.info('Sync rules from configuration unchanged'); return { updated: false }; - } else if (next == null && active?.sync_rules_content == sync_rules) { + } else if (next == null && active?.sync_rules_content == options.content) { logger.info('Sync rules from configuration unchanged'); return { updated: false }; } else { logger.info('Sync rules updated from configuration'); - const persisted_sync_rules = await this.updateSyncRules({ - content: sync_rules, - lock: options?.lock - }); + const persisted_sync_rules = await this.updateSyncRules(options); return { updated: true, persisted_sync_rules, lock: persisted_sync_rules.current_lock ?? undefined }; } } @@ -130,7 +127,8 @@ export class MongoBucketStorage if (next != null && next.slot_name == slot_name) { // We need to redo the "next" sync rules await this.updateSyncRules({ - content: next.sync_rules_content + content: next.sync_rules_content, + validate: false }); // Pro-actively stop replicating await this.db.sync_rules.updateOne( @@ -147,7 +145,8 @@ export class MongoBucketStorage } else if (next == null && active?.slot_name == slot_name) { // Slot removed for "active" sync rules, while there is no "next" one. await this.updateSyncRules({ - content: active.sync_rules_content + content: active.sync_rules_content, + validate: false }); // Pro-actively stop replicating @@ -166,8 +165,18 @@ export class MongoBucketStorage } async updateSyncRules(options: storage.UpdateSyncRulesOptions): Promise { - // We do not validate sync rules at this point. - // That is done when using the sync rules, so that the diagnostics API can report the errors. + if (options.validate) { + // Parse and validate before applying any changes + SqlSyncRules.fromYaml(options.content, { + // No schema-based validation at this point + schema: undefined, + defaultSchema: 'not_applicable', // Not needed for validation + throwOnError: true + }); + } else { + // We do not validate sync rules at this point. + // That is done when using the sync rules, so that the diagnostics API can report the errors. + } let rules: MongoPersistedSyncRulesContent | undefined = undefined; diff --git a/modules/module-mongodb/test/src/change_stream_utils.ts b/modules/module-mongodb/test/src/change_stream_utils.ts index 6d21ee817..bf7abbcdf 100644 --- a/modules/module-mongodb/test/src/change_stream_utils.ts +++ b/modules/module-mongodb/test/src/change_stream_utils.ts @@ -58,7 +58,7 @@ export class ChangeStreamTestContext { } async updateSyncRules(content: string) { - const syncRules = await this.factory.updateSyncRules({ content: content }); + const syncRules = await this.factory.updateSyncRules({ content: content, validate: true }); this.storage = this.factory.getInstance(syncRules); return this.storage!; } diff --git a/modules/module-mysql/test/src/BinlogStreamUtils.ts b/modules/module-mysql/test/src/BinlogStreamUtils.ts index 5f7062960..8b4c331c0 100644 --- a/modules/module-mysql/test/src/BinlogStreamUtils.ts +++ b/modules/module-mysql/test/src/BinlogStreamUtils.ts @@ -61,7 +61,7 @@ export class BinlogStreamTestContext { } async updateSyncRules(content: string): Promise { - const syncRules = await this.factory.updateSyncRules({ content: content }); + const syncRules = await this.factory.updateSyncRules({ content: content, validate: true }); this.storage = this.factory.getInstance(syncRules); return this.storage!; } diff --git a/modules/module-postgres-storage/src/storage/PostgresBucketStorageFactory.ts b/modules/module-postgres-storage/src/storage/PostgresBucketStorageFactory.ts index d78a0a5de..b9a70e6c5 100644 --- a/modules/module-postgres-storage/src/storage/PostgresBucketStorageFactory.ts +++ b/modules/module-postgres-storage/src/storage/PostgresBucketStorageFactory.ts @@ -1,5 +1,5 @@ import * as framework from '@powersync/lib-services-framework'; -import { storage, sync, utils } from '@powersync/service-core'; +import { storage, sync, UpdateSyncRulesOptions, utils } from '@powersync/service-core'; import * as pg_wire from '@powersync/service-jpgwire'; import * as sync_rules from '@powersync/service-sync-rules'; import crypto from 'crypto'; @@ -169,10 +169,7 @@ export class PostgresBucketStorageFactory } // TODO possibly share implementation in abstract class - async configureSyncRules( - sync_rules: string, - options?: { lock?: boolean } - ): Promise<{ + async configureSyncRules(options: UpdateSyncRulesOptions): Promise<{ updated: boolean; persisted_sync_rules?: storage.PersistedSyncRulesContent; lock?: storage.ReplicationLock; @@ -180,31 +177,32 @@ export class PostgresBucketStorageFactory const next = await this.getNextSyncRulesContent(); const active = await this.getActiveSyncRulesContent(); - if (next?.sync_rules_content == sync_rules) { + if (next?.sync_rules_content == options.content) { framework.logger.info('Sync rules from configuration unchanged'); return { updated: false }; - } else if (next == null && active?.sync_rules_content == sync_rules) { + } else if (next == null && active?.sync_rules_content == options.content) { framework.logger.info('Sync rules from configuration unchanged'); return { updated: false }; } else { framework.logger.info('Sync rules updated from configuration'); - const persisted_sync_rules = await this.updateSyncRules({ - content: sync_rules, - lock: options?.lock - }); + const persisted_sync_rules = await this.updateSyncRules(options); return { updated: true, persisted_sync_rules, lock: persisted_sync_rules.current_lock ?? undefined }; } } async updateSyncRules(options: storage.UpdateSyncRulesOptions): Promise { // TODO some shared implementation for this might be nice - // Parse and validate before applying any changes - sync_rules.SqlSyncRules.fromYaml(options.content, { - // No schema-based validation at this point - schema: undefined, - defaultSchema: 'not_applicable', // Not needed for validation - throwOnError: true - }); + if (options.validate) { + // Parse and validate before applying any changes + sync_rules.SqlSyncRules.fromYaml(options.content, { + // No schema-based validation at this point + schema: undefined, + defaultSchema: 'not_applicable', // Not needed for validation + throwOnError: true + }); + } else { + // Apply unconditionally. Any errors will be reported via the diagnostics API. + } return this.db.transaction(async (db) => { await db.sql` @@ -266,7 +264,8 @@ export class PostgresBucketStorageFactory if (next != null && next.slot_name == slot_name) { // We need to redo the "next" sync rules await this.updateSyncRules({ - content: next.sync_rules_content + content: next.sync_rules_content, + validate: false }); // Pro-actively stop replicating await this.db.sql` @@ -280,7 +279,8 @@ export class PostgresBucketStorageFactory } else if (next == null && active?.slot_name == slot_name) { // Slot removed for "active" sync rules, while there is no "next" one. await this.updateSyncRules({ - content: active.sync_rules_content + content: active.sync_rules_content, + validate: false }); // Pro-actively stop replicating diff --git a/modules/module-postgres/test/src/wal_stream_utils.ts b/modules/module-postgres/test/src/wal_stream_utils.ts index f25d6d083..25af4347f 100644 --- a/modules/module-postgres/test/src/wal_stream_utils.ts +++ b/modules/module-postgres/test/src/wal_stream_utils.ts @@ -62,7 +62,7 @@ export class WalStreamTestContext implements AsyncDisposable { } async updateSyncRules(content: string) { - const syncRules = await this.factory.updateSyncRules({ content: content }); + const syncRules = await this.factory.updateSyncRules({ content: content, validate: true }); this.storage = this.factory.getInstance(syncRules); return this.storage!; } diff --git a/packages/service-core-tests/src/tests/register-data-storage-tests.ts b/packages/service-core-tests/src/tests/register-data-storage-tests.ts index 01e337be7..fdfde9b66 100644 --- a/packages/service-core-tests/src/tests/register-data-storage-tests.ts +++ b/packages/service-core-tests/src/tests/register-data-storage-tests.ts @@ -1502,7 +1502,7 @@ bucket_definitions: replication_size_bytes: 0 }); - const r = await f.configureSyncRules('bucket_definitions: {}'); + const r = await f.configureSyncRules({ content: 'bucket_definitions: {}', validate: false }); const storage = f.getInstance(r.persisted_sync_rules!); await storage.autoActivate(); diff --git a/packages/service-core/src/replication/AbstractReplicator.ts b/packages/service-core/src/replication/AbstractReplicator.ts index 0b12d9983..dd505da7b 100644 --- a/packages/service-core/src/replication/AbstractReplicator.ts +++ b/packages/service-core/src/replication/AbstractReplicator.ts @@ -93,21 +93,27 @@ export abstract class AbstractReplicator; /** @@ -90,6 +89,8 @@ export interface BucketStorageFactory extends AsyncDisposableObserverClient; @@ -232,6 +233,7 @@ export interface PersistedSyncRules { export interface UpdateSyncRulesOptions { content: string; lock?: boolean; + validate?: boolean; } export interface SyncRulesBucketStorageOptions { diff --git a/packages/service-core/src/util/config/compound-config-collector.ts b/packages/service-core/src/util/config/compound-config-collector.ts index 48f32e1da..8305ad855 100644 --- a/packages/service-core/src/util/config/compound-config-collector.ts +++ b/packages/service-core/src/util/config/compound-config-collector.ts @@ -196,7 +196,8 @@ export class CompoundConfigCollector { } } return { - present: false + present: false, + exit_on_error: true }; } } diff --git a/packages/service-core/src/util/config/sync-rules/impl/base64-sync-rules-collector.ts b/packages/service-core/src/util/config/sync-rules/impl/base64-sync-rules-collector.ts index 7f4b53f73..eb2abe117 100644 --- a/packages/service-core/src/util/config/sync-rules/impl/base64-sync-rules-collector.ts +++ b/packages/service-core/src/util/config/sync-rules/impl/base64-sync-rules-collector.ts @@ -15,6 +15,7 @@ export class Base64SyncRulesCollector extends SyncRulesCollector { return { present: true, + exit_on_error: baseConfig.sync_rules?.exit_on_error ?? true, content: Buffer.from(sync_rules_base64, 'base64').toString() }; } diff --git a/packages/service-core/src/util/config/sync-rules/impl/filesystem-sync-rules-collector.ts b/packages/service-core/src/util/config/sync-rules/impl/filesystem-sync-rules-collector.ts index e2bf02e99..f28fdcecd 100644 --- a/packages/service-core/src/util/config/sync-rules/impl/filesystem-sync-rules-collector.ts +++ b/packages/service-core/src/util/config/sync-rules/impl/filesystem-sync-rules-collector.ts @@ -20,6 +20,7 @@ export class FileSystemSyncRulesCollector extends SyncRulesCollector { // Only persist the path here, and load on demand using `loadSyncRules()`. return { present: true, + exit_on_error: baseConfig.sync_rules?.exit_on_error ?? true, path: config_path ? path.resolve(path.dirname(config_path), sync_path) : sync_path }; } diff --git a/packages/service-core/src/util/config/sync-rules/impl/inline-sync-rules-collector.ts b/packages/service-core/src/util/config/sync-rules/impl/inline-sync-rules-collector.ts index bff349c76..44e0c132b 100644 --- a/packages/service-core/src/util/config/sync-rules/impl/inline-sync-rules-collector.ts +++ b/packages/service-core/src/util/config/sync-rules/impl/inline-sync-rules-collector.ts @@ -15,6 +15,7 @@ export class InlineSyncRulesCollector extends SyncRulesCollector { return { present: true, + exit_on_error: true, ...baseConfig.sync_rules }; } diff --git a/packages/service-core/src/util/config/sync-rules/sync-rules-provider.ts b/packages/service-core/src/util/config/sync-rules/sync-rules-provider.ts index 150a1cd1b..78eda8d85 100644 --- a/packages/service-core/src/util/config/sync-rules/sync-rules-provider.ts +++ b/packages/service-core/src/util/config/sync-rules/sync-rules-provider.ts @@ -3,6 +3,8 @@ import fs from 'fs/promises'; export interface SyncRulesProvider { get(): Promise; + + readonly exitOnError: boolean; } export class ConfigurationFileSyncRulesProvider implements SyncRulesProvider { @@ -15,4 +17,8 @@ export class ConfigurationFileSyncRulesProvider implements SyncRulesProvider { return await fs.readFile(this.config.path, 'utf-8'); } } + + get exitOnError() { + return this.config.exit_on_error; + } } diff --git a/packages/service-core/src/util/config/types.ts b/packages/service-core/src/util/config/types.ts index d1d208bd8..915d960cd 100644 --- a/packages/service-core/src/util/config/types.ts +++ b/packages/service-core/src/util/config/types.ts @@ -26,6 +26,7 @@ export type SyncRulesConfig = { present: boolean; content?: string; path?: string; + exit_on_error: boolean; }; export type ResolvedPowerSyncConfig = { diff --git a/packages/types/src/config/PowerSyncConfig.ts b/packages/types/src/config/PowerSyncConfig.ts index 0bafebe65..add2b5c57 100644 --- a/packages/types/src/config/PowerSyncConfig.ts +++ b/packages/types/src/config/PowerSyncConfig.ts @@ -161,7 +161,8 @@ export const powerSyncConfig = t.object({ sync_rules: t .object({ path: t.string.optional(), - content: t.string.optional() + content: t.string.optional(), + exit_on_error: t.boolean.optional() }) .optional(), From 688d3382a0018042bc3d85a0e8280cac96e02e48 Mon Sep 17 00:00:00 2001 From: Ralf Kistner Date: Mon, 10 Feb 2025 12:01:35 +0200 Subject: [PATCH 4/4] Add changeset. --- .changeset/tough-lamps-beam.md | 13 +++++++++++++ 1 file changed, 13 insertions(+) create mode 100644 .changeset/tough-lamps-beam.md diff --git a/.changeset/tough-lamps-beam.md b/.changeset/tough-lamps-beam.md new file mode 100644 index 000000000..72014a7b5 --- /dev/null +++ b/.changeset/tough-lamps-beam.md @@ -0,0 +1,13 @@ +--- +'@powersync/service-module-postgres-storage': minor +'@powersync/service-module-mongodb-storage': minor +'@powersync/service-core-tests': minor +'@powersync/service-module-postgres': minor +'@powersync/service-module-mongodb': minor +'@powersync/service-core': minor +'@powersync/service-module-mysql': minor +'@powersync/service-types': minor +'@powersync/service-image': minor +--- + +Exit replication process when sync rules are not valid; configurable with a new `sync_rules.exit_on_error` option.