Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions .changeset/tough-lamps-beam.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
---
'@powersync/service-module-postgres-storage': minor
'@powersync/service-module-mongodb-storage': minor
'@powersync/service-core-tests': minor
'@powersync/service-module-postgres': minor
'@powersync/service-module-mongodb': minor
'@powersync/service-core': minor
'@powersync/service-module-mysql': minor
'@powersync/service-types': minor
'@powersync/service-image': minor
---

Exit replication process when sync rules are not valid; configurable with a new `sync_rules.exit_on_error` option.
36 changes: 20 additions & 16 deletions modules/module-mongodb-storage/src/storage/MongoBucketStorage.ts
Original file line number Diff line number Diff line change
Expand Up @@ -101,22 +101,19 @@ export class MongoBucketStorage
};
}

async configureSyncRules(sync_rules: string, options?: { lock?: boolean }) {
async configureSyncRules(options: storage.UpdateSyncRulesOptions) {
const next = await this.getNextSyncRulesContent();
const active = await this.getActiveSyncRulesContent();

if (next?.sync_rules_content == sync_rules) {
if (next?.sync_rules_content == options.content) {
logger.info('Sync rules from configuration unchanged');
return { updated: false };
} else if (next == null && active?.sync_rules_content == sync_rules) {
} else if (next == null && active?.sync_rules_content == options.content) {
logger.info('Sync rules from configuration unchanged');
return { updated: false };
} else {
logger.info('Sync rules updated from configuration');
const persisted_sync_rules = await this.updateSyncRules({
content: sync_rules,
lock: options?.lock
});
const persisted_sync_rules = await this.updateSyncRules(options);
return { updated: true, persisted_sync_rules, lock: persisted_sync_rules.current_lock ?? undefined };
}
}
Expand All @@ -130,7 +127,8 @@ export class MongoBucketStorage
if (next != null && next.slot_name == slot_name) {
// We need to redo the "next" sync rules
await this.updateSyncRules({
content: next.sync_rules_content
content: next.sync_rules_content,
validate: false
});
// Pro-actively stop replicating
await this.db.sync_rules.updateOne(
Expand All @@ -147,7 +145,8 @@ export class MongoBucketStorage
} else if (next == null && active?.slot_name == slot_name) {
// Slot removed for "active" sync rules, while there is no "next" one.
await this.updateSyncRules({
content: active.sync_rules_content
content: active.sync_rules_content,
validate: false
});

// Pro-actively stop replicating
Expand All @@ -166,13 +165,18 @@ export class MongoBucketStorage
}

async updateSyncRules(options: storage.UpdateSyncRulesOptions): Promise<MongoPersistedSyncRulesContent> {
// Parse and validate before applying any changes
const parsed = SqlSyncRules.fromYaml(options.content, {
// No schema-based validation at this point
schema: undefined,
defaultSchema: 'not_applicable', // Not needed for validation
throwOnError: true
});
if (options.validate) {
// Parse and validate before applying any changes
SqlSyncRules.fromYaml(options.content, {
// No schema-based validation at this point
schema: undefined,
defaultSchema: 'not_applicable', // Not needed for validation
throwOnError: true
});
} else {
// We do not validate sync rules at this point.
// That is done when using the sync rules, so that the diagnostics API can report the errors.
}

let rules: MongoPersistedSyncRulesContent | undefined = undefined;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ export class ChangeStreamReplicationJob extends replication.AbstractReplicationJ
if (this.abortController.signal.aborted) {
return;
}
this.logger.error(`Replication error`, e);
this.logger.error(`${this.slotName} Replication error`, e);
if (e.cause != null) {
// Without this additional log, the cause may not be visible in the logs.
this.logger.error(`cause`, e.cause);
Expand Down
2 changes: 1 addition & 1 deletion modules/module-mongodb/test/src/change_stream_utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ export class ChangeStreamTestContext {
}

async updateSyncRules(content: string) {
const syncRules = await this.factory.updateSyncRules({ content: content });
const syncRules = await this.factory.updateSyncRules({ content: content, validate: true });
this.storage = this.factory.getInstance(syncRules);
return this.storage!;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ export class BinLogReplicationJob extends replication.AbstractReplicationJob {
if (this.abortController.signal.aborted) {
return;
}
this.logger.error(`Replication error`, e);
this.logger.error(`Sync rules ${this.id} Replication error`, e);
if (e.cause != null) {
this.logger.error(`cause`, e.cause);
}
Expand Down
2 changes: 1 addition & 1 deletion modules/module-mysql/test/src/BinlogStreamUtils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ export class BinlogStreamTestContext {
}

async updateSyncRules(content: string): Promise<SyncRulesBucketStorage> {
const syncRules = await this.factory.updateSyncRules({ content: content });
const syncRules = await this.factory.updateSyncRules({ content: content, validate: true });
this.storage = this.factory.getInstance(syncRules);
return this.storage!;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import * as framework from '@powersync/lib-services-framework';
import { storage, sync, utils } from '@powersync/service-core';
import { storage, sync, UpdateSyncRulesOptions, utils } from '@powersync/service-core';
import * as pg_wire from '@powersync/service-jpgwire';
import * as sync_rules from '@powersync/service-sync-rules';
import crypto from 'crypto';
Expand Down Expand Up @@ -169,42 +169,40 @@ export class PostgresBucketStorageFactory
}

// TODO possibly share implementation in abstract class
async configureSyncRules(
sync_rules: string,
options?: { lock?: boolean }
): Promise<{
async configureSyncRules(options: UpdateSyncRulesOptions): Promise<{
updated: boolean;
persisted_sync_rules?: storage.PersistedSyncRulesContent;
lock?: storage.ReplicationLock;
}> {
const next = await this.getNextSyncRulesContent();
const active = await this.getActiveSyncRulesContent();

if (next?.sync_rules_content == sync_rules) {
if (next?.sync_rules_content == options.content) {
framework.logger.info('Sync rules from configuration unchanged');
return { updated: false };
} else if (next == null && active?.sync_rules_content == sync_rules) {
} else if (next == null && active?.sync_rules_content == options.content) {
framework.logger.info('Sync rules from configuration unchanged');
return { updated: false };
} else {
framework.logger.info('Sync rules updated from configuration');
const persisted_sync_rules = await this.updateSyncRules({
content: sync_rules,
lock: options?.lock
});
const persisted_sync_rules = await this.updateSyncRules(options);
return { updated: true, persisted_sync_rules, lock: persisted_sync_rules.current_lock ?? undefined };
}
}

async updateSyncRules(options: storage.UpdateSyncRulesOptions): Promise<PostgresPersistedSyncRulesContent> {
// TODO some shared implementation for this might be nice
// Parse and validate before applying any changes
sync_rules.SqlSyncRules.fromYaml(options.content, {
// No schema-based validation at this point
schema: undefined,
defaultSchema: 'not_applicable', // Not needed for validation
throwOnError: true
});
if (options.validate) {
// Parse and validate before applying any changes
sync_rules.SqlSyncRules.fromYaml(options.content, {
// No schema-based validation at this point
schema: undefined,
defaultSchema: 'not_applicable', // Not needed for validation
throwOnError: true
});
} else {
// Apply unconditionally. Any errors will be reported via the diagnostics API.
}

return this.db.transaction(async (db) => {
await db.sql`
Expand Down Expand Up @@ -266,7 +264,8 @@ export class PostgresBucketStorageFactory
if (next != null && next.slot_name == slot_name) {
// We need to redo the "next" sync rules
await this.updateSyncRules({
content: next.sync_rules_content
content: next.sync_rules_content,
validate: false
});
// Pro-actively stop replicating
await this.db.sql`
Expand All @@ -280,7 +279,8 @@ export class PostgresBucketStorageFactory
} else if (next == null && active?.slot_name == slot_name) {
// Slot removed for "active" sync rules, while there is no "next" one.
await this.updateSyncRules({
content: active.sync_rules_content
content: active.sync_rules_content,
validate: false
});

// Pro-actively stop replicating
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ export class WalStreamReplicationJob extends replication.AbstractReplicationJob
});
await stream.replicate();
} catch (e) {
this.logger.error(`Replication error`, e);
this.logger.error(`${this.slotName} Replication error`, e);
if (e.cause != null) {
// Example:
// PgError.conn_ended: Unable to do postgres query on ended connection
Expand Down
2 changes: 1 addition & 1 deletion modules/module-postgres/test/src/wal_stream_utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ export class WalStreamTestContext implements AsyncDisposable {
}

async updateSyncRules(content: string) {
const syncRules = await this.factory.updateSyncRules({ content: content });
const syncRules = await this.factory.updateSyncRules({ content: content, validate: true });
this.storage = this.factory.getInstance(syncRules);
return this.storage!;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1502,7 +1502,7 @@ bucket_definitions:
replication_size_bytes: 0
});

const r = await f.configureSyncRules('bucket_definitions: {}');
const r = await f.configureSyncRules({ content: 'bucket_definitions: {}', validate: false });
const storage = f.getInstance(r.persisted_sync_rules!);
await storage.autoActivate();

Expand Down
12 changes: 9 additions & 3 deletions packages/service-core/src/replication/AbstractReplicator.ts
Original file line number Diff line number Diff line change
Expand Up @@ -93,21 +93,27 @@ export abstract class AbstractReplicator<T extends AbstractReplicationJob = Abst

private async runLoop() {
const syncRules = await this.syncRuleProvider.get();

let configuredLock: storage.ReplicationLock | undefined = undefined;
if (syncRules != null) {
this.logger.info('Loaded sync rules');
try {
// Configure new sync rules, if they have changed.
// In that case, also immediately take out a lock, so that another process doesn't start replication on it.
const { lock } = await this.storage.configureSyncRules(syncRules, {
lock: true

const { lock } = await this.storage.configureSyncRules({
content: syncRules,
lock: true,
validate: this.syncRuleProvider.exitOnError
});
if (lock) {
configuredLock = lock;
}
} catch (e) {
// Log, but continue with previous sync rules
// Log and re-raise to exit.
// Should only reach this due to validation errors if exit_on_error is true.
this.logger.error(`Failed to update sync rules from configuration`, e);
throw e;
}
} else {
this.logger.info('No sync rules configured - configure via API');
Expand Down
5 changes: 4 additions & 1 deletion packages/service-core/src/routes/endpoints/admin.ts
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,10 @@ export const reprocess = routeDefinition({
}

const new_rules = await activeBucketStorage.updateSyncRules({
content: active.sync_rules.content
content: active.sync_rules.content,
// These sync rules already passed validation. But if the rules are not valid anymore due
// to a service change, we do want to report the error here.
validate: true
});

const baseConfig = await apiHandler.getSourceConfig();
Expand Down
9 changes: 7 additions & 2 deletions packages/service-core/src/routes/endpoints/sync-rules.ts
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,9 @@ export const deploySyncRules = routeDefinition({
}

const sync_rules = await storageEngine.activeBucketStorage.updateSyncRules({
content: content
content: content,
// Aready validated above
validate: false
});

return {
Expand Down Expand Up @@ -167,7 +169,10 @@ export const reprocessSyncRules = routeDefinition({
}

const new_rules = await activeBucketStorage.updateSyncRules({
content: sync_rules.sync_rules.content
content: sync_rules.sync_rules.content,
// These sync rules already passed validation. But if the rules are not valid anymore due
// to a service change, we do want to report the error here.
validate: true
});
return {
slot_name: new_rules.slot_name
Expand Down
6 changes: 4 additions & 2 deletions packages/service-core/src/storage/BucketStorage.ts
Original file line number Diff line number Diff line change
Expand Up @@ -79,8 +79,7 @@ export interface BucketStorageFactory extends AsyncDisposableObserverClient<Buck
* Update sync rules from configuration, if changed.
*/
configureSyncRules(
sync_rules: string,
options?: { lock?: boolean }
options: UpdateSyncRulesOptions
): Promise<{ updated: boolean; persisted_sync_rules?: PersistedSyncRulesContent; lock?: ReplicationLock }>;

/**
Expand All @@ -90,6 +89,8 @@ export interface BucketStorageFactory extends AsyncDisposableObserverClient<Buck

/**
* Deploy new sync rules.
*
* Similar to configureSyncRules, but applies the update unconditionally.
*/
updateSyncRules(options: UpdateSyncRulesOptions): Promise<PersistedSyncRulesContent>;

Expand Down Expand Up @@ -232,6 +233,7 @@ export interface PersistedSyncRules {
export interface UpdateSyncRulesOptions {
content: string;
lock?: boolean;
validate?: boolean;
}

export interface SyncRulesBucketStorageOptions {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,8 @@ export class CompoundConfigCollector {
}
}
return {
present: false
present: false,
exit_on_error: true
};
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ export class Base64SyncRulesCollector extends SyncRulesCollector {

return {
present: true,
exit_on_error: baseConfig.sync_rules?.exit_on_error ?? true,
content: Buffer.from(sync_rules_base64, 'base64').toString()
};
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ export class FileSystemSyncRulesCollector extends SyncRulesCollector {
// Only persist the path here, and load on demand using `loadSyncRules()`.
return {
present: true,
exit_on_error: baseConfig.sync_rules?.exit_on_error ?? true,
path: config_path ? path.resolve(path.dirname(config_path), sync_path) : sync_path
};
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ export class InlineSyncRulesCollector extends SyncRulesCollector {

return {
present: true,
exit_on_error: true,
...baseConfig.sync_rules
};
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ import fs from 'fs/promises';

export interface SyncRulesProvider {
get(): Promise<string | undefined>;

readonly exitOnError: boolean;
}

export class ConfigurationFileSyncRulesProvider implements SyncRulesProvider {
Expand All @@ -15,4 +17,8 @@ export class ConfigurationFileSyncRulesProvider implements SyncRulesProvider {
return await fs.readFile(this.config.path, 'utf-8');
}
}

get exitOnError() {
return this.config.exit_on_error;
}
}
1 change: 1 addition & 0 deletions packages/service-core/src/util/config/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ export type SyncRulesConfig = {
present: boolean;
content?: string;
path?: string;
exit_on_error: boolean;
};

export type ResolvedPowerSyncConfig = {
Expand Down
3 changes: 2 additions & 1 deletion packages/types/src/config/PowerSyncConfig.ts
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,8 @@ export const powerSyncConfig = t.object({
sync_rules: t
.object({
path: t.string.optional(),
content: t.string.optional()
content: t.string.optional(),
exit_on_error: t.boolean.optional()
})
.optional(),

Expand Down