Skip to content

Commit 4b43cdb

Browse files
authored
Exit on sync rule validation error (#198)
* Allow updating sync rules with errors. * Tweak log message. * Configurable exit_on_error for sync rule validation errors. * Add changeset.
1 parent 7f9bb19 commit 4b43cdb

File tree

21 files changed

+98
-53
lines changed

21 files changed

+98
-53
lines changed

.changeset/tough-lamps-beam.md

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
---
2+
'@powersync/service-module-postgres-storage': minor
3+
'@powersync/service-module-mongodb-storage': minor
4+
'@powersync/service-core-tests': minor
5+
'@powersync/service-module-postgres': minor
6+
'@powersync/service-module-mongodb': minor
7+
'@powersync/service-core': minor
8+
'@powersync/service-module-mysql': minor
9+
'@powersync/service-types': minor
10+
'@powersync/service-image': minor
11+
---
12+
13+
Exit replication process when sync rules are not valid; configurable with a new `sync_rules.exit_on_error` option.

modules/module-mongodb-storage/src/storage/MongoBucketStorage.ts

Lines changed: 20 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -101,22 +101,19 @@ export class MongoBucketStorage
101101
};
102102
}
103103

104-
async configureSyncRules(sync_rules: string, options?: { lock?: boolean }) {
104+
async configureSyncRules(options: storage.UpdateSyncRulesOptions) {
105105
const next = await this.getNextSyncRulesContent();
106106
const active = await this.getActiveSyncRulesContent();
107107

108-
if (next?.sync_rules_content == sync_rules) {
108+
if (next?.sync_rules_content == options.content) {
109109
logger.info('Sync rules from configuration unchanged');
110110
return { updated: false };
111-
} else if (next == null && active?.sync_rules_content == sync_rules) {
111+
} else if (next == null && active?.sync_rules_content == options.content) {
112112
logger.info('Sync rules from configuration unchanged');
113113
return { updated: false };
114114
} else {
115115
logger.info('Sync rules updated from configuration');
116-
const persisted_sync_rules = await this.updateSyncRules({
117-
content: sync_rules,
118-
lock: options?.lock
119-
});
116+
const persisted_sync_rules = await this.updateSyncRules(options);
120117
return { updated: true, persisted_sync_rules, lock: persisted_sync_rules.current_lock ?? undefined };
121118
}
122119
}
@@ -130,7 +127,8 @@ export class MongoBucketStorage
130127
if (next != null && next.slot_name == slot_name) {
131128
// We need to redo the "next" sync rules
132129
await this.updateSyncRules({
133-
content: next.sync_rules_content
130+
content: next.sync_rules_content,
131+
validate: false
134132
});
135133
// Pro-actively stop replicating
136134
await this.db.sync_rules.updateOne(
@@ -147,7 +145,8 @@ export class MongoBucketStorage
147145
} else if (next == null && active?.slot_name == slot_name) {
148146
// Slot removed for "active" sync rules, while there is no "next" one.
149147
await this.updateSyncRules({
150-
content: active.sync_rules_content
148+
content: active.sync_rules_content,
149+
validate: false
151150
});
152151

153152
// Pro-actively stop replicating
@@ -166,13 +165,18 @@ export class MongoBucketStorage
166165
}
167166

168167
async updateSyncRules(options: storage.UpdateSyncRulesOptions): Promise<MongoPersistedSyncRulesContent> {
169-
// Parse and validate before applying any changes
170-
const parsed = SqlSyncRules.fromYaml(options.content, {
171-
// No schema-based validation at this point
172-
schema: undefined,
173-
defaultSchema: 'not_applicable', // Not needed for validation
174-
throwOnError: true
175-
});
168+
if (options.validate) {
169+
// Parse and validate before applying any changes
170+
SqlSyncRules.fromYaml(options.content, {
171+
// No schema-based validation at this point
172+
schema: undefined,
173+
defaultSchema: 'not_applicable', // Not needed for validation
174+
throwOnError: true
175+
});
176+
} else {
177+
// We do not validate sync rules at this point.
178+
// That is done when using the sync rules, so that the diagnostics API can report the errors.
179+
}
176180

177181
let rules: MongoPersistedSyncRulesContent | undefined = undefined;
178182

modules/module-mongodb/src/replication/ChangeStreamReplicationJob.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -78,7 +78,7 @@ export class ChangeStreamReplicationJob extends replication.AbstractReplicationJ
7878
if (this.abortController.signal.aborted) {
7979
return;
8080
}
81-
this.logger.error(`Replication error`, e);
81+
this.logger.error(`${this.slotName} Replication error`, e);
8282
if (e.cause != null) {
8383
// Without this additional log, the cause may not be visible in the logs.
8484
this.logger.error(`cause`, e.cause);

modules/module-mongodb/test/src/change_stream_utils.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,7 @@ export class ChangeStreamTestContext {
5858
}
5959

6060
async updateSyncRules(content: string) {
61-
const syncRules = await this.factory.updateSyncRules({ content: content });
61+
const syncRules = await this.factory.updateSyncRules({ content: content, validate: true });
6262
this.storage = this.factory.getInstance(syncRules);
6363
return this.storage!;
6464
}

modules/module-mysql/src/replication/BinLogReplicationJob.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,7 @@ export class BinLogReplicationJob extends replication.AbstractReplicationJob {
7070
if (this.abortController.signal.aborted) {
7171
return;
7272
}
73-
this.logger.error(`Replication error`, e);
73+
this.logger.error(`Sync rules ${this.id} Replication error`, e);
7474
if (e.cause != null) {
7575
this.logger.error(`cause`, e.cause);
7676
}

modules/module-mysql/test/src/BinlogStreamUtils.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,7 @@ export class BinlogStreamTestContext {
6161
}
6262

6363
async updateSyncRules(content: string): Promise<SyncRulesBucketStorage> {
64-
const syncRules = await this.factory.updateSyncRules({ content: content });
64+
const syncRules = await this.factory.updateSyncRules({ content: content, validate: true });
6565
this.storage = this.factory.getInstance(syncRules);
6666
return this.storage!;
6767
}

modules/module-postgres-storage/src/storage/PostgresBucketStorageFactory.ts

Lines changed: 20 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
import * as framework from '@powersync/lib-services-framework';
2-
import { storage, sync, utils } from '@powersync/service-core';
2+
import { storage, sync, UpdateSyncRulesOptions, utils } from '@powersync/service-core';
33
import * as pg_wire from '@powersync/service-jpgwire';
44
import * as sync_rules from '@powersync/service-sync-rules';
55
import crypto from 'crypto';
@@ -169,42 +169,40 @@ export class PostgresBucketStorageFactory
169169
}
170170

171171
// TODO possibly share implementation in abstract class
172-
async configureSyncRules(
173-
sync_rules: string,
174-
options?: { lock?: boolean }
175-
): Promise<{
172+
async configureSyncRules(options: UpdateSyncRulesOptions): Promise<{
176173
updated: boolean;
177174
persisted_sync_rules?: storage.PersistedSyncRulesContent;
178175
lock?: storage.ReplicationLock;
179176
}> {
180177
const next = await this.getNextSyncRulesContent();
181178
const active = await this.getActiveSyncRulesContent();
182179

183-
if (next?.sync_rules_content == sync_rules) {
180+
if (next?.sync_rules_content == options.content) {
184181
framework.logger.info('Sync rules from configuration unchanged');
185182
return { updated: false };
186-
} else if (next == null && active?.sync_rules_content == sync_rules) {
183+
} else if (next == null && active?.sync_rules_content == options.content) {
187184
framework.logger.info('Sync rules from configuration unchanged');
188185
return { updated: false };
189186
} else {
190187
framework.logger.info('Sync rules updated from configuration');
191-
const persisted_sync_rules = await this.updateSyncRules({
192-
content: sync_rules,
193-
lock: options?.lock
194-
});
188+
const persisted_sync_rules = await this.updateSyncRules(options);
195189
return { updated: true, persisted_sync_rules, lock: persisted_sync_rules.current_lock ?? undefined };
196190
}
197191
}
198192

199193
async updateSyncRules(options: storage.UpdateSyncRulesOptions): Promise<PostgresPersistedSyncRulesContent> {
200194
// TODO some shared implementation for this might be nice
201-
// Parse and validate before applying any changes
202-
sync_rules.SqlSyncRules.fromYaml(options.content, {
203-
// No schema-based validation at this point
204-
schema: undefined,
205-
defaultSchema: 'not_applicable', // Not needed for validation
206-
throwOnError: true
207-
});
195+
if (options.validate) {
196+
// Parse and validate before applying any changes
197+
sync_rules.SqlSyncRules.fromYaml(options.content, {
198+
// No schema-based validation at this point
199+
schema: undefined,
200+
defaultSchema: 'not_applicable', // Not needed for validation
201+
throwOnError: true
202+
});
203+
} else {
204+
// Apply unconditionally. Any errors will be reported via the diagnostics API.
205+
}
208206

209207
return this.db.transaction(async (db) => {
210208
await db.sql`
@@ -266,7 +264,8 @@ export class PostgresBucketStorageFactory
266264
if (next != null && next.slot_name == slot_name) {
267265
// We need to redo the "next" sync rules
268266
await this.updateSyncRules({
269-
content: next.sync_rules_content
267+
content: next.sync_rules_content,
268+
validate: false
270269
});
271270
// Pro-actively stop replicating
272271
await this.db.sql`
@@ -280,7 +279,8 @@ export class PostgresBucketStorageFactory
280279
} else if (next == null && active?.slot_name == slot_name) {
281280
// Slot removed for "active" sync rules, while there is no "next" one.
282281
await this.updateSyncRules({
283-
content: active.sync_rules_content
282+
content: active.sync_rules_content,
283+
validate: false
284284
});
285285

286286
// Pro-actively stop replicating

modules/module-postgres/src/replication/WalStreamReplicationJob.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -99,7 +99,7 @@ export class WalStreamReplicationJob extends replication.AbstractReplicationJob
9999
});
100100
await stream.replicate();
101101
} catch (e) {
102-
this.logger.error(`Replication error`, e);
102+
this.logger.error(`${this.slotName} Replication error`, e);
103103
if (e.cause != null) {
104104
// Example:
105105
// PgError.conn_ended: Unable to do postgres query on ended connection

modules/module-postgres/test/src/wal_stream_utils.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,7 @@ export class WalStreamTestContext implements AsyncDisposable {
6262
}
6363

6464
async updateSyncRules(content: string) {
65-
const syncRules = await this.factory.updateSyncRules({ content: content });
65+
const syncRules = await this.factory.updateSyncRules({ content: content, validate: true });
6666
this.storage = this.factory.getInstance(syncRules);
6767
return this.storage!;
6868
}

packages/service-core-tests/src/tests/register-data-storage-tests.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1502,7 +1502,7 @@ bucket_definitions:
15021502
replication_size_bytes: 0
15031503
});
15041504

1505-
const r = await f.configureSyncRules('bucket_definitions: {}');
1505+
const r = await f.configureSyncRules({ content: 'bucket_definitions: {}', validate: false });
15061506
const storage = f.getInstance(r.persisted_sync_rules!);
15071507
await storage.autoActivate();
15081508

0 commit comments

Comments
 (0)