diff --git a/modules/module-postgres/src/api/PostgresRouteAPIAdapter.ts b/modules/module-postgres/src/api/PostgresRouteAPIAdapter.ts index cfd11b132..205a2c744 100644 --- a/modules/module-postgres/src/api/PostgresRouteAPIAdapter.ts +++ b/modules/module-postgres/src/api/PostgresRouteAPIAdapter.ts @@ -1,4 +1,4 @@ -import { api } from '@powersync/service-core'; +import { api, ParseSyncRulesOptions } from '@powersync/service-core'; import * as pgwire from '@powersync/service-jpgwire'; import * as sync_rules from '@powersync/service-sync-rules'; @@ -23,6 +23,12 @@ export class PostgresRouteAPIAdapter implements api.RouteAPI { this.connectionTag = config.tag ?? sync_rules.DEFAULT_TAG; } + getParseSyncRulesOptions(): ParseSyncRulesOptions { + return { + defaultSchema: 'public' + }; + } + async shutdown(): Promise { await this.pool.end(); } diff --git a/modules/module-postgres/src/replication/WalStream.ts b/modules/module-postgres/src/replication/WalStream.ts index 29ac6bfa5..69cffc80e 100644 --- a/modules/module-postgres/src/replication/WalStream.ts +++ b/modules/module-postgres/src/replication/WalStream.ts @@ -3,12 +3,13 @@ import * as util from '../utils/pgwire_utils.js'; import { container, errors, logger } from '@powersync/lib-services-framework'; import { DatabaseInputRow, SqliteRow, SqlSyncRules, TablePattern, toSyncRulesRow } from '@powersync/service-sync-rules'; import { getPgOutputRelation, getRelId } from './PgRelation.js'; -import { Metrics, SourceEntityDescriptor, storage } from '@powersync/service-core'; +import { getUuidReplicaIdentityBson, Metrics, SourceEntityDescriptor, storage } from '@powersync/service-core'; import { checkSourceConfiguration, getReplicationIdentityColumns } from './replication-utils.js'; import { PgManager } from './PgManager.js'; export const ZERO_LSN = '00000000/00000000'; export const PUBLICATION_NAME = 'powersync'; +export const POSTGRES_DEFAULT_SCHEMA = 'public'; export interface WalStreamOptions { connections: PgManager; @@ -46,7 +47,7 @@ export class WalStream { constructor(options: WalStreamOptions) { this.storage = options.storage; - this.sync_rules = options.storage.sync_rules; + this.sync_rules = options.storage.getParsedSyncRules({ defaultSchema: POSTGRES_DEFAULT_SCHEMA }); this.group_id = options.storage.group_id; this.slot_name = options.storage.slot_name; this.connections = options.connections; @@ -333,7 +334,7 @@ WHERE oid = $1::regclass`, async initialReplication(db: pgwire.PgConnection, lsn: string) { const sourceTables = this.sync_rules.getSourceTables(); - await this.storage.startBatch({ zeroLSN: ZERO_LSN }, async (batch) => { + await this.storage.startBatch({ zeroLSN: ZERO_LSN, defaultSchema: POSTGRES_DEFAULT_SCHEMA }, async (batch) => { for (let tablePattern of sourceTables) { const tables = await this.getQualifiedTableNames(batch, db, tablePattern); for (let table of tables) { @@ -389,7 +390,14 @@ WHERE oid = $1::regclass`, for (let record of WalStream.getQueryData(rows)) { // This auto-flushes when the batch reaches its size limit - await batch.save({ tag: 'insert', sourceTable: table, before: undefined, after: record }); + await batch.save({ + tag: 'insert', + sourceTable: table, + before: undefined, + beforeReplicaId: undefined, + after: record, + afterReplicaId: getUuidReplicaIdentityBson(record, table.replicaIdColumns) + }); } at += rows.length; Metrics.getInstance().rows_replicated_total.add(rows.length); @@ -481,19 +489,40 @@ WHERE oid = $1::regclass`, if (msg.tag == 'insert') { Metrics.getInstance().rows_replicated_total.add(1); 
const baseRecord = util.constructAfterRecord(msg); - return await batch.save({ tag: 'insert', sourceTable: table, before: undefined, after: baseRecord }); + return await batch.save({ + tag: 'insert', + sourceTable: table, + before: undefined, + beforeReplicaId: undefined, + after: baseRecord, + afterReplicaId: getUuidReplicaIdentityBson(baseRecord, table.replicaIdColumns) + }); } else if (msg.tag == 'update') { Metrics.getInstance().rows_replicated_total.add(1); // "before" may be null if the replica id columns are unchanged // It's fine to treat that the same as an insert. const before = util.constructBeforeRecord(msg); const after = util.constructAfterRecord(msg); - return await batch.save({ tag: 'update', sourceTable: table, before: before, after: after }); + return await batch.save({ + tag: 'update', + sourceTable: table, + before: before, + beforeReplicaId: before ? getUuidReplicaIdentityBson(before, table.replicaIdColumns) : undefined, + after: after, + afterReplicaId: getUuidReplicaIdentityBson(after, table.replicaIdColumns) + }); } else if (msg.tag == 'delete') { Metrics.getInstance().rows_replicated_total.add(1); const before = util.constructBeforeRecord(msg)!; - return await batch.save({ tag: 'delete', sourceTable: table, before: before, after: undefined }); + return await batch.save({ + tag: 'delete', + sourceTable: table, + before: before, + beforeReplicaId: getUuidReplicaIdentityBson(before, table.replicaIdColumns), + after: undefined, + afterReplicaId: undefined + }); } } else if (msg.tag == 'truncate') { let tables: storage.SourceTable[] = []; @@ -541,7 +570,7 @@ WHERE oid = $1::regclass`, // Auto-activate as soon as initial replication is done await this.storage.autoActivate(); - await this.storage.startBatch({ zeroLSN: ZERO_LSN }, async (batch) => { + await this.storage.startBatch({ zeroLSN: ZERO_LSN, defaultSchema: POSTGRES_DEFAULT_SCHEMA }, async (batch) => { // Replication never starts in the middle of a transaction let inTx = false; let count = 0; diff --git a/modules/module-postgres/test/src/slow_tests.test.ts b/modules/module-postgres/test/src/slow_tests.test.ts index aea33acf9..0b3ae38fc 100644 --- a/modules/module-postgres/test/src/slow_tests.test.ts +++ b/modules/module-postgres/test/src/slow_tests.test.ts @@ -83,7 +83,7 @@ bucket_definitions: - SELECT * FROM "test_data" `; const syncRules = await f.updateSyncRules({ content: syncRuleContent }); - const storage = f.getInstance(syncRules.parsed()); + const storage = f.getInstance(syncRules); abortController = new AbortController(); const options: WalStreamOptions = { abort_signal: abortController.signal, @@ -235,7 +235,7 @@ bucket_definitions: - SELECT id, description FROM "test_data" `; const syncRules = await f.updateSyncRules({ content: syncRuleContent }); - const storage = f.getInstance(syncRules.parsed()); + const storage = f.getInstance(syncRules); // 1. 
Setup some base data that will be replicated in initial replication await pool.query(`CREATE TABLE test_data(id uuid primary key default uuid_generate_v4(), description text)`); diff --git a/modules/module-postgres/test/src/validation.test.ts b/modules/module-postgres/test/src/validation.test.ts index 54dced77e..b7b7b23f2 100644 --- a/modules/module-postgres/test/src/validation.test.ts +++ b/modules/module-postgres/test/src/validation.test.ts @@ -22,13 +22,13 @@ bucket_definitions: const syncRules = await context.factory.updateSyncRules({ content: syncRuleContent }); - const tablePatterns = syncRules.parsed().sync_rules.getSourceTables(); + const tablePatterns = syncRules.parsed({ defaultSchema: 'public' }).sync_rules.getSourceTables(); const tableInfo = await getDebugTablesInfo({ db: pool, publicationName: context.publicationName, connectionTag: context.connectionTag, tablePatterns: tablePatterns, - syncRules: syncRules.parsed().sync_rules + syncRules: syncRules.parsed({ defaultSchema: 'public' }).sync_rules }); expect(tableInfo).toEqual([ { diff --git a/modules/module-postgres/test/src/wal_stream_utils.ts b/modules/module-postgres/test/src/wal_stream_utils.ts index 0de968e8e..d9e4f2609 100644 --- a/modules/module-postgres/test/src/wal_stream_utils.ts +++ b/modules/module-postgres/test/src/wal_stream_utils.ts @@ -58,7 +58,7 @@ export class WalStreamTestContext { async updateSyncRules(content: string) { const syncRules = await this.factory.updateSyncRules({ content: content }); - this.storage = this.factory.getInstance(syncRules.parsed()); + this.storage = this.factory.getInstance(syncRules); return this.storage!; } diff --git a/packages/service-core/src/api/RouteAPI.ts b/packages/service-core/src/api/RouteAPI.ts index b5ad5ccab..393d56342 100644 --- a/packages/service-core/src/api/RouteAPI.ts +++ b/packages/service-core/src/api/RouteAPI.ts @@ -1,5 +1,6 @@ import { SqlSyncRules, TablePattern } from '@powersync/service-sync-rules'; import * as types from '@powersync/service-types'; +import { ParseSyncRulesOptions } from '../storage/BucketStorage.js'; export interface PatternResult { schema: string; @@ -69,4 +70,9 @@ export interface RouteAPI { * Close any resources that need graceful termination. */ shutdown(): Promise; + + /** + * Get the default schema (or database) when only a table name is specified in sync rules. + */ + getParseSyncRulesOptions(): ParseSyncRulesOptions; } diff --git a/packages/service-core/src/api/diagnostics.ts b/packages/service-core/src/api/diagnostics.ts index 7fd4ef440..7b6da99ce 100644 --- a/packages/service-core/src/api/diagnostics.ts +++ b/packages/service-core/src/api/diagnostics.ts @@ -43,7 +43,7 @@ export async function getSyncRulesStatus( let rules: SqlSyncRules; let persisted: storage.PersistedSyncRules; try { - persisted = sync_rules.parsed(); + persisted = sync_rules.parsed(apiHandler.getParseSyncRulesOptions()); rules = persisted.sync_rules; } catch (e) { return { @@ -53,7 +53,7 @@ export async function getSyncRulesStatus( }; } - const systemStorage = live_status ? bucketStorage.getInstance(persisted) : undefined; + const systemStorage = live_status ? 
bucketStorage.getInstance(sync_rules) : undefined; const status = await systemStorage?.getStatus(); let replication_lag_bytes: number | undefined = undefined; diff --git a/packages/service-core/src/entry/commands/compact-action.ts b/packages/service-core/src/entry/commands/compact-action.ts index d737b2232..e5f34ea9c 100644 --- a/packages/service-core/src/entry/commands/compact-action.ts +++ b/packages/service-core/src/entry/commands/compact-action.ts @@ -37,7 +37,7 @@ export function registerCompactAction(program: Command) { await client.connect(); try { const bucketStorage = new storage.MongoBucketStorage(psdb, { slot_name_prefix: configuration.slot_name_prefix }); - const active = await bucketStorage.getActiveSyncRules(); + const active = await bucketStorage.getActiveSyncRulesContent(); if (active == null) { logger.info('No active instance to compact'); return; diff --git a/packages/service-core/src/replication/AbstractReplicator.ts b/packages/service-core/src/replication/AbstractReplicator.ts index bcf480fc9..6c5a2a934 100644 --- a/packages/service-core/src/replication/AbstractReplicator.ts +++ b/packages/service-core/src/replication/AbstractReplicator.ts @@ -166,8 +166,7 @@ export abstract class AbstractReplicator router.reactiveStream(SyncRoutes.STREAM, { validator: schema.createTsCodecValidator(util.StreamingSyncRequest, { allowAdditional: true }), - handler: async ({ context, params, responder, observer, initialN, signal: upstreamSignal}) => { + handler: async ({ context, params, responder, observer, initialN, signal: upstreamSignal }) => { const { service_context } = context; const { routerEngine } = service_context; @@ -72,6 +72,7 @@ export const syncStreamReactive: SocketRouteGenerator = (router) => try { for await (const data of sync.streamResponse({ storage: activeBucketStorage, + parseOptions: routerEngine!.getAPI().getParseSyncRulesOptions(), params: { ...params, binary_data: true // always true for web sockets diff --git a/packages/service-core/src/routes/endpoints/sync-rules.ts b/packages/service-core/src/routes/endpoints/sync-rules.ts index 605d3c82d..1129a5855 100644 --- a/packages/service-core/src/routes/endpoints/sync-rules.ts +++ b/packages/service-core/src/routes/endpoints/sync-rules.ts @@ -53,7 +53,12 @@ export const deploySyncRules = routeDefinition({ const content = payload.params.content; try { - SqlSyncRules.fromYaml(payload.params.content); + const apiHandler = service_context.routerEngine!.getAPI(); + SqlSyncRules.fromYaml(payload.params.content, { + ...apiHandler.getParseSyncRulesOptions(), + // We don't do any schema-level validation at this point + schema: undefined + }); } catch (e) { throw new errors.JourneyError({ status: 422, @@ -151,7 +156,8 @@ export const reprocessSyncRules = routeDefinition({ const { storageEngine: { activeBucketStorage } } = payload.context.service_context; - const sync_rules = await activeBucketStorage.getActiveSyncRules(); + const apiHandler = payload.context.service_context.routerEngine!.getAPI(); + const sync_rules = await activeBucketStorage.getActiveSyncRules(apiHandler.getParseSyncRulesOptions()); if (sync_rules == null) { throw new errors.JourneyError({ status: 422, @@ -181,7 +187,11 @@ function replyPrettyJson(payload: any) { async function debugSyncRules(apiHandler: RouteAPI, sync_rules: string) { try { - const rules = SqlSyncRules.fromYaml(sync_rules); + const rules = SqlSyncRules.fromYaml(sync_rules, { + ...apiHandler.getParseSyncRulesOptions(), + // No schema-based validation at this point + schema: undefined + }); 
const source_table_patterns = rules.getSourceTables(); const resolved_tables = await apiHandler.getDebugTablesInfo(source_table_patterns, rules); diff --git a/packages/service-core/src/routes/endpoints/sync-stream.ts b/packages/service-core/src/routes/endpoints/sync-stream.ts index e637e99ea..83d8f3994 100644 --- a/packages/service-core/src/routes/endpoints/sync-stream.ts +++ b/packages/service-core/src/routes/endpoints/sync-stream.ts @@ -54,6 +54,7 @@ export const syncStreamed = routeDefinition({ sync.ndjson( sync.streamResponse({ storage: storageEngine.activeBucketStorage, + parseOptions: routerEngine!.getAPI().getParseSyncRulesOptions(), params, syncParams, token: payload.context.token_payload!, diff --git a/packages/service-core/src/runner/teardown.ts b/packages/service-core/src/runner/teardown.ts index 49e2bc66d..583635642 100644 --- a/packages/service-core/src/runner/teardown.ts +++ b/packages/service-core/src/runner/teardown.ts @@ -51,7 +51,7 @@ async function terminateSyncRules(storageFactory: storage.BucketStorageFactory, // Mark the sync rules as terminated for (let syncRules of combinedSyncRules) { - const syncRulesStorage = storageFactory.getInstance(syncRules.parsed()); + const syncRulesStorage = storageFactory.getInstance(syncRules); // The storage will be dropped at the end of the teardown, so we don't need to clear it here await syncRulesStorage.terminate({ clearStorage: false }); } diff --git a/packages/service-core/src/storage/BucketStorage.ts b/packages/service-core/src/storage/BucketStorage.ts index 6f8ea0220..f2117f673 100644 --- a/packages/service-core/src/storage/BucketStorage.ts +++ b/packages/service-core/src/storage/BucketStorage.ts @@ -10,6 +10,7 @@ import { import * as util from '../util/util-index.js'; import { SourceTable } from './SourceTable.js'; import { SourceEntityDescriptor } from './SourceEntity.js'; +import { ReplicaId } from './storage-index.js'; export interface BucketStorageFactory { /** @@ -23,7 +24,7 @@ export interface BucketStorageFactory { /** * Get a storage instance to query sync data for specific sync rules. */ - getInstance(options: PersistedSyncRules): SyncRulesBucketStorage; + getInstance(options: PersistedSyncRulesContent): SyncRulesBucketStorage; /** * Deploy new sync rules. @@ -47,7 +48,7 @@ export interface BucketStorageFactory { /** * Get the sync rules used for querying. */ - getActiveSyncRules(): Promise; + getActiveSyncRules(options: ParseSyncRulesOptions): Promise; /** * Get the sync rules used for querying. @@ -57,7 +58,7 @@ export interface BucketStorageFactory { /** * Get the sync rules that will be active next once done with initial replicatino. */ - getNextSyncRules(): Promise; + getNextSyncRules(options: ParseSyncRulesOptions): Promise; /** * Get the sync rules that will be active next once done with initial replicatino. 
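Aside (not part of the diff): a minimal sketch of how a caller now supplies ParseSyncRulesOptions when resolving the active sync rules, mirroring the diagnostics.ts and sync-rules.ts changes above. It assumes an `apiHandler: RouteAPI` and a `bucketStorage: BucketStorageFactory` are already in scope.

// Sketch only: resolve parsed sync rules using the connector-specific default schema.
const parseOptions = apiHandler.getParseSyncRulesOptions(); // e.g. { defaultSchema: 'public' } for Postgres
const active = await bucketStorage.getActiveSyncRules(parseOptions);
if (active != null) {
  // PersistedSyncRules exposes the parsed SqlSyncRules instance.
  const sourceTablePatterns = active.sync_rules.getSourceTables();
}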
@@ -130,6 +131,10 @@ export interface StorageMetrics { replication_size_bytes: number; } +export interface ParseSyncRulesOptions { + defaultSchema: string; +} + export interface PersistedSyncRulesContent { readonly id: number; readonly sync_rules_content: string; @@ -139,7 +144,7 @@ export interface PersistedSyncRulesContent { readonly last_keepalive_ts?: Date | null; readonly last_checkpoint_ts?: Date | null; - parsed(): PersistedSyncRules; + parsed(options: ParseSyncRulesOptions): PersistedSyncRules; lock(): Promise; } @@ -185,12 +190,11 @@ export interface BucketDataBatchOptions { chunkLimitBytes?: number; } -export interface StartBatchOptions { +export interface StartBatchOptions extends ParseSyncRulesOptions { zeroLSN: string; } export interface SyncRulesBucketStorage { - readonly sync_rules: SqlSyncRules; readonly group_id: number; readonly slot_name: string; @@ -205,6 +209,8 @@ export interface SyncRulesBucketStorage { getCheckpoint(): Promise<{ checkpoint: util.OpId }>; + getParsedSyncRules(options: ParseSyncRulesOptions): SqlSyncRules; + getParameterSets(checkpoint: util.OpId, lookups: SqliteJsonValue[][]): Promise; /** @@ -357,7 +363,9 @@ export interface SaveInsert { tag: 'insert'; sourceTable: SourceTable; before?: undefined; + beforeReplicaId?: undefined; after: SqliteRow; + afterReplicaId: ReplicaId; } export interface SaveUpdate { @@ -368,6 +376,7 @@ export interface SaveUpdate { * This is only present when the id has changed, and will only contain replica identity columns. */ before?: SqliteRow; + beforeReplicaId?: ReplicaId; /** * A null value means null column. @@ -375,13 +384,16 @@ export interface SaveUpdate { * An undefined value means it's a TOAST value - must be copied from another record. */ after: ToastableSqliteRow; + afterReplicaId: ReplicaId; } export interface SaveDelete { tag: 'delete'; sourceTable: SourceTable; - before: SqliteRow; + before?: SqliteRow; + beforeReplicaId: ReplicaId; after?: undefined; + afterReplicaId?: undefined; } export interface SyncBucketDataBatch { diff --git a/packages/service-core/src/storage/MongoBucketStorage.ts b/packages/service-core/src/storage/MongoBucketStorage.ts index 2cf45c436..c3e59d0b7 100644 --- a/packages/service-core/src/storage/MongoBucketStorage.ts +++ b/packages/service-core/src/storage/MongoBucketStorage.ts @@ -13,6 +13,7 @@ import { v4 as uuid } from 'uuid'; import { ActiveCheckpoint, BucketStorageFactory, + ParseSyncRulesOptions, PersistedSyncRules, PersistedSyncRulesContent, StorageMetrics, @@ -47,7 +48,7 @@ export class MongoBucketStorage implements BucketStorageFactory { return undefined; } const rules = new MongoPersistedSyncRulesContent(this.db, doc2); - return this.getInstance(rules.parsed()); + return this.getInstance(rules); } }); @@ -60,12 +61,12 @@ export class MongoBucketStorage implements BucketStorageFactory { this.slot_name_prefix = options.slot_name_prefix; } - getInstance(options: PersistedSyncRules): MongoSyncBucketStorage { - let { id, sync_rules, slot_name } = options; + getInstance(options: PersistedSyncRulesContent): MongoSyncBucketStorage { + let { id, slot_name } = options; if ((typeof id as any) == 'bigint') { id = Number(id); } - return new MongoSyncBucketStorage(this, id, sync_rules, slot_name); + return new MongoSyncBucketStorage(this, id, options, slot_name); } async configureSyncRules(sync_rules: string, options?: { lock?: boolean }) { @@ -135,7 +136,12 @@ export class MongoBucketStorage implements BucketStorageFactory { async updateSyncRules(options: UpdateSyncRulesOptions): Promise { 
// Parse and validate before applying any changes - const parsed = SqlSyncRules.fromYaml(options.content); + const parsed = SqlSyncRules.fromYaml(options.content, { + // No schema-based validation at this point + schema: undefined, + defaultSchema: 'not_applicable', // Not needed for validation + throwOnError: true + }); let rules: MongoPersistedSyncRulesContent | undefined = undefined; @@ -203,9 +209,9 @@ export class MongoBucketStorage implements BucketStorageFactory { return new MongoPersistedSyncRulesContent(this.db, doc); } - async getActiveSyncRules(): Promise { + async getActiveSyncRules(options: ParseSyncRulesOptions): Promise { const content = await this.getActiveSyncRulesContent(); - return content?.parsed() ?? null; + return content?.parsed(options) ?? null; } async getNextSyncRulesContent(): Promise { @@ -222,9 +228,9 @@ export class MongoBucketStorage implements BucketStorageFactory { return new MongoPersistedSyncRulesContent(this.db, doc); } - async getNextSyncRules(): Promise { + async getNextSyncRules(options: ParseSyncRulesOptions): Promise { const content = await this.getNextSyncRulesContent(); - return content?.parsed() ?? null; + return content?.parsed(options) ?? null; } async getReplicatingSyncRules(): Promise { @@ -293,14 +299,6 @@ export class MongoBucketStorage implements BucketStorageFactory { } async getStorageMetrics(): Promise { - const active_sync_rules = await this.getActiveSyncRules(); - if (active_sync_rules == null) { - return { - operations_size_bytes: 0, - parameters_size_bytes: 0, - replication_size_bytes: 0 - }; - } const operations_aggregate = await this.db.bucket_data .aggregate([ diff --git a/packages/service-core/src/storage/SourceTable.ts b/packages/service-core/src/storage/SourceTable.ts index 6379732f5..2f5364633 100644 --- a/packages/service-core/src/storage/SourceTable.ts +++ b/packages/service-core/src/storage/SourceTable.ts @@ -1,9 +1,8 @@ -import { DEFAULT_SCHEMA, DEFAULT_TAG } from '@powersync/service-sync-rules'; +import { DEFAULT_TAG } from '@powersync/service-sync-rules'; import * as util from '../util/util-index.js'; import { ColumnDescriptor } from './SourceEntity.js'; export class SourceTable { - static readonly DEFAULT_SCHEMA = DEFAULT_SCHEMA; static readonly DEFAULT_TAG = DEFAULT_TAG; /** diff --git a/packages/service-core/src/storage/mongo/MongoBucketBatch.ts b/packages/service-core/src/storage/mongo/MongoBucketBatch.ts index 973f6a28a..81c03e3e5 100644 --- a/packages/service-core/src/storage/mongo/MongoBucketBatch.ts +++ b/packages/service-core/src/storage/mongo/MongoBucketBatch.ts @@ -11,7 +11,7 @@ import { CurrentBucket, CurrentDataDocument, SourceKey } from './models.js'; import { MongoIdSequence } from './MongoIdSequence.js'; import { cacheKey, OperationBatch, RecordOperation } from './OperationBatch.js'; import { PersistedBatch } from './PersistedBatch.js'; -import { BSON_DESERIALIZE_OPTIONS, idPrefixFilter, serializeLookup } from './util.js'; +import { BSON_DESERIALIZE_OPTIONS, idPrefixFilter, replicaIdEquals, serializeLookup } from './util.js'; /** * 15MB @@ -301,7 +301,7 @@ export class MongoBucketBatch implements BucketStorageBatch { } // 2. 
Save bucket data - if (beforeId != null && (afterId == null || !beforeId.equals(afterId))) { + if (beforeId != null && (afterId == null || !replicaIdEquals(beforeId, afterId))) { // Source ID updated if (sourceTable.syncData) { // Delete old record @@ -431,7 +431,7 @@ export class MongoBucketBatch implements BucketStorageBatch { }; } - if (afterId == null || !beforeId.equals(afterId)) { + if (afterId == null || !replicaIdEquals(beforeId, afterId)) { // Either a delete (afterId == null), or replaced the old replication id batch.deleteCurrentData(before_key); } diff --git a/packages/service-core/src/storage/mongo/MongoCompactor.ts b/packages/service-core/src/storage/mongo/MongoCompactor.ts index 3c52936ba..ff754973d 100644 --- a/packages/service-core/src/storage/mongo/MongoCompactor.ts +++ b/packages/service-core/src/storage/mongo/MongoCompactor.ts @@ -4,6 +4,7 @@ import { addChecksums } from '../../util/utils.js'; import { PowerSyncMongo } from './db.js'; import { BucketDataDocument, BucketDataKey } from './models.js'; import { CompactOptions } from '../BucketStorage.js'; +import { cacheKey } from './OperationBatch.js'; interface CurrentBucketState { /** Bucket name */ @@ -168,7 +169,7 @@ export class MongoCompactor { let isPersistentPut = doc.op == 'PUT'; if (doc.op == 'REMOVE' || doc.op == 'PUT') { - const key = `${doc.table}/${doc.row_id}/${doc.source_table}/${doc.source_key?.toHexString()}`; + const key = `${doc.table}/${doc.row_id}/${cacheKey(doc.source_table!, doc.source_key!)}`; const targetOp = currentState.seen.get(key); if (targetOp) { // Will convert to MOVE, so don't count as PUT diff --git a/packages/service-core/src/storage/mongo/MongoPersistedSyncRulesContent.ts b/packages/service-core/src/storage/mongo/MongoPersistedSyncRulesContent.ts index a32cf6fc1..363766df4 100644 --- a/packages/service-core/src/storage/mongo/MongoPersistedSyncRulesContent.ts +++ b/packages/service-core/src/storage/mongo/MongoPersistedSyncRulesContent.ts @@ -1,7 +1,7 @@ import { SqlSyncRules } from '@powersync/service-sync-rules'; import * as mongo from 'mongodb'; -import { PersistedSyncRulesContent } from '../BucketStorage.js'; +import { ParseSyncRulesOptions, PersistedSyncRulesContent } from '../BucketStorage.js'; import { MongoPersistedSyncRules } from './MongoPersistedSyncRules.js'; import { MongoSyncRulesLock } from './MongoSyncRulesLock.js'; import { PowerSyncMongo } from './db.js'; @@ -30,10 +30,10 @@ export class MongoPersistedSyncRulesContent implements PersistedSyncRulesContent this.last_keepalive_ts = doc.last_keepalive_ts; } - parsed() { + parsed(options: ParseSyncRulesOptions) { return new MongoPersistedSyncRules( this.id, - SqlSyncRules.fromYaml(this.sync_rules_content), + SqlSyncRules.fromYaml(this.sync_rules_content, options), this.last_checkpoint_lsn, this.slot_name ); diff --git a/packages/service-core/src/storage/mongo/MongoSyncBucketStorage.ts b/packages/service-core/src/storage/mongo/MongoSyncBucketStorage.ts index 95715d45c..771f5093d 100644 --- a/packages/service-core/src/storage/mongo/MongoSyncBucketStorage.ts +++ b/packages/service-core/src/storage/mongo/MongoSyncBucketStorage.ts @@ -11,6 +11,9 @@ import { DEFAULT_DOCUMENT_BATCH_LIMIT, DEFAULT_DOCUMENT_CHUNK_LIMIT_BYTES, FlushedResult, + ParseSyncRulesOptions, + PersistedSyncRules, + PersistedSyncRulesContent, ResolveTableOptions, ResolveTableResult, StartBatchOptions, @@ -36,15 +39,22 @@ export class MongoSyncBucketStorage implements SyncRulesBucketStorage { } }); + private parsedSyncRulesCache: SqlSyncRules | undefined; + 
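// Note: getParsedSyncRules() below caches the first parse, so subsequent calls return the same
// SqlSyncRules instance even if a different ParseSyncRulesOptions value is passed; a storage
// instance is therefore expected to always be used with a single defaultSchema.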
constructor( public readonly factory: MongoBucketStorage, public readonly group_id: number, - public readonly sync_rules: SqlSyncRules, + private readonly sync_rules: PersistedSyncRulesContent, public readonly slot_name: string ) { this.db = factory.db; } + getParsedSyncRules(options: ParseSyncRulesOptions): SqlSyncRules { + this.parsedSyncRulesCache ??= this.sync_rules.parsed(options).sync_rules; + return this.parsedSyncRulesCache; + } + async getCheckpoint() { const doc = await this.db.sync_rules.findOne( { _id: this.group_id }, @@ -71,7 +81,7 @@ export class MongoSyncBucketStorage implements SyncRulesBucketStorage { const batch = new MongoBucketBatch( this.db, - this.sync_rules, + this.sync_rules.parsed(options).sync_rules, this.group_id, this.slot_name, checkpoint_lsn, diff --git a/packages/service-core/src/storage/mongo/OperationBatch.ts b/packages/service-core/src/storage/mongo/OperationBatch.ts index 9e1edbb6d..4bcc0e64a 100644 --- a/packages/service-core/src/storage/mongo/OperationBatch.ts +++ b/packages/service-core/src/storage/mongo/OperationBatch.ts @@ -1,8 +1,9 @@ -import * as bson from 'bson'; import { ToastableSqliteRow } from '@powersync/service-sync-rules'; +import * as bson from 'bson'; -import * as util from '../../util/util-index.js'; import { SaveOptions } from '../BucketStorage.js'; +import { isUUID } from './util.js'; +import { ReplicaId } from './models.js'; /** * Maximum number of operations in a batch. @@ -63,18 +64,15 @@ export class OperationBatch { } export class RecordOperation { - public readonly afterId: bson.UUID | null; - public readonly beforeId: bson.UUID; + public readonly afterId: ReplicaId | null; + public readonly beforeId: ReplicaId; public readonly internalBeforeKey: string; public readonly internalAfterKey: string | null; public readonly estimatedSize: number; constructor(public readonly record: SaveOptions) { - const after = record.after; - const afterId = after ? util.getUuidReplicaIdentityBson(after, record.sourceTable.replicaIdColumns!) : null; - const beforeId = record.before - ? util.getUuidReplicaIdentityBson(record.before, record.sourceTable.replicaIdColumns!) - : afterId!; + const afterId = record.afterReplicaId ?? null; + const beforeId = record.beforeReplicaId ?? record.afterReplicaId; this.afterId = afterId; this.beforeId = beforeId; this.internalBeforeKey = cacheKey(record.sourceTable.id, beforeId); @@ -84,8 +82,17 @@ export class RecordOperation { } } -export function cacheKey(table: bson.ObjectId, id: bson.UUID) { - return `${table.toHexString()}.${id.toHexString()}`; +/** + * In-memory cache key - must not be persisted. 
+ */ +export function cacheKey(table: bson.ObjectId, id: ReplicaId) { + if (isUUID(id)) { + return `${table.toHexString()}.${id.toHexString()}`; + } else if (typeof id == 'string') { + return `${table.toHexString()}.${id}`; + } else { + return `${table.toHexString()}.${(bson.serialize({ id: id }) as Buffer).toString('base64')}`; + } } /** diff --git a/packages/service-core/src/storage/mongo/PersistedBatch.ts b/packages/service-core/src/storage/mongo/PersistedBatch.ts index 486c9d800..58b0be659 100644 --- a/packages/service-core/src/storage/mongo/PersistedBatch.ts +++ b/packages/service-core/src/storage/mongo/PersistedBatch.ts @@ -13,9 +13,10 @@ import { BucketParameterDocument, CurrentBucket, CurrentDataDocument, - SourceKey + SourceKey, + ReplicaId } from './models.js'; -import { serializeLookup } from './util.js'; +import { replicaIdToSubkey, serializeLookup } from './util.js'; import { logger } from '@powersync/lib-services-framework'; /** @@ -59,7 +60,7 @@ export class PersistedBatch { saveBucketData(options: { op_seq: MongoIdSequence; - sourceKey: bson.UUID; + sourceKey: ReplicaId; table: SourceTable; evaluated: EvaluatedRow[]; before_buckets: CurrentBucket[]; @@ -70,7 +71,7 @@ export class PersistedBatch { remaining_buckets.set(key, b); } - const dchecksum = util.hashDelete(`${options.table.id}/${options.sourceKey}`); + const dchecksum = util.hashDelete(replicaIdToSubkey(options.table.id, options.sourceKey)); for (let k of options.evaluated) { const key = currentBucketKey(k); @@ -134,7 +135,7 @@ export class PersistedBatch { saveParameterData(data: { op_seq: MongoIdSequence; - sourceKey: bson.UUID; + sourceKey: ReplicaId; sourceTable: SourceTable; evaluated: EvaluatedParameters[]; existing_lookups: bson.Binary[]; diff --git a/packages/service-core/src/storage/mongo/models.ts b/packages/service-core/src/storage/mongo/models.ts index ef26564bc..fa52a37da 100644 --- a/packages/service-core/src/storage/mongo/models.ts +++ b/packages/service-core/src/storage/mongo/models.ts @@ -1,13 +1,22 @@ import * as bson from 'bson'; import { SqliteJsonValue } from '@powersync/service-sync-rules'; +/** + * Replica id uniquely identifying a row on the source database. + * + * Can be any value serializable to BSON. + * + * If the value is an entire document, the data serialized to a v5 UUID may be a good choice here. 
+ */ +export type ReplicaId = bson.UUID | bson.Document | any; + export interface SourceKey { /** group_id */ g: number; /** source table id */ t: bson.ObjectId; /** source key */ - k: bson.UUID; + k: ReplicaId; } export interface BucketDataKey { @@ -43,7 +52,7 @@ export interface BucketDataDocument { _id: BucketDataKey; op: OpType; source_table?: bson.ObjectId; - source_key?: bson.UUID; + source_key?: ReplicaId; table?: string; row_id?: string; checksum: number; diff --git a/packages/service-core/src/storage/mongo/util.ts b/packages/service-core/src/storage/mongo/util.ts index fef4bc396..d6b36a7dc 100644 --- a/packages/service-core/src/storage/mongo/util.ts +++ b/packages/service-core/src/storage/mongo/util.ts @@ -1,10 +1,11 @@ import { SqliteJsonValue } from '@powersync/service-sync-rules'; import * as bson from 'bson'; -import * as mongo from 'mongodb'; import * as crypto from 'crypto'; -import { BucketDataDocument } from './models.js'; -import { timestampToOpId } from '../../util/utils.js'; +import * as mongo from 'mongodb'; +import * as uuid from 'uuid'; import { OplogEntry } from '../../util/protocol-types.js'; +import { ID_NAMESPACE, timestampToOpId } from '../../util/utils.js'; +import { BucketDataDocument, ReplicaId } from './models.js'; /** * Lookup serialization must be number-agnostic. I.e. normalize numbers, instead of preserving numbers. @@ -98,7 +99,7 @@ export function mapOpEntry(row: BucketDataDocument): OplogEntry { object_type: row.table, object_id: row.row_id, checksum: Number(row.checksum), - subkey: `${row.source_table}/${row.source_key!.toHexString()}`, + subkey: replicaIdToSubkey(row.source_table!, row.source_key!), data: row.data }; } else { @@ -111,3 +112,47 @@ export function mapOpEntry(row: BucketDataDocument): OplogEntry { }; } } + +/** + * Returns true if two ReplicaId values are the same (serializes to the same BSON value). + */ +export function replicaIdEquals(a: ReplicaId, b: ReplicaId) { + if (a === b) { + return true; + } else if (typeof a == 'string' && typeof b == 'string') { + return a == b; + } else if (isUUID(a) && isUUID(b)) { + return a.equals(b); + } else if (a == null && b == null) { + return true; + } else if (a != null || b != null) { + return false; + } else { + // There are many possible primitive values, this covers them all + return (bson.serialize({ id: a }) as Buffer).equals(bson.serialize({ id: b })); + } +} + +export function replicaIdToSubkey(table: bson.ObjectId, id: ReplicaId): string { + if (isUUID(id)) { + // Special case for UUID for backwards-compatiblity + return `${table.toHexString()}/${id.toHexString()}`; + } else { + // Hashed UUID from the table and id + const repr = bson.serialize({ table, id }); + return uuid.v5(repr, ID_NAMESPACE); + } +} + +/** + * True if this is a bson.UUID. + * + * Works even with multiple copies of the bson package. 
+ */ +export function isUUID(value: any): value is bson.UUID { + if (value == null || typeof value != 'object') { + return false; + } + const uuid = value as bson.UUID; + return uuid._bsontype == 'Binary' && uuid.sub_type == bson.Binary.SUBTYPE_UUID; +} diff --git a/packages/service-core/src/sync/sync.ts b/packages/service-core/src/sync/sync.ts index a04c53182..8f2f900a0 100644 --- a/packages/service-core/src/sync/sync.ts +++ b/packages/service-core/src/sync/sync.ts @@ -1,6 +1,7 @@ import { JSONBig, JsonContainer } from '@powersync/service-jsonbig'; import { RequestParameters } from '@powersync/service-sync-rules'; import { Semaphore, withTimeout } from 'async-mutex'; + import { AbortError } from 'ix/aborterror.js'; import * as auth from '../auth/auth-index.js'; @@ -35,6 +36,7 @@ export interface SyncStreamParameters { params: util.StreamingSyncRequest; syncParams: RequestParameters; token: auth.JwtPayload; + parseOptions: storage.ParseSyncRulesOptions; /** * If this signal is aborted, the stream response ends as soon as possible, without error. */ @@ -47,7 +49,7 @@ export interface SyncStreamParameters { export async function* streamResponse( options: SyncStreamParameters ): AsyncIterable { - const { storage, params, syncParams, token, tokenStreamOptions, tracker, signal } = options; + const { storage, params, syncParams, token, tokenStreamOptions, tracker, signal, parseOptions } = options; // We also need to be able to abort, so we create our own controller. const controller = new AbortController(); if (signal) { @@ -63,7 +65,7 @@ export async function* streamResponse( } } const ki = tokenStream(token, controller.signal, tokenStreamOptions); - const stream = streamResponseInner(storage, params, syncParams, tracker, controller.signal); + const stream = streamResponseInner(storage, params, syncParams, tracker, parseOptions, controller.signal); // Merge the two streams, and abort as soon as one of the streams end. const merged = mergeAsyncIterables([stream, ki], controller.signal); @@ -87,6 +89,7 @@ async function* streamResponseInner( params: util.StreamingSyncRequest, syncParams: RequestParameters, tracker: RequestTracker, + parseOptions: storage.ParseSyncRulesOptions, signal: AbortSignal ): AsyncGenerator { // Bucket state of bucket id -> op_id. @@ -115,9 +118,9 @@ async function* streamResponseInner( // Sync rules deleted in the meantime - try again with the next checkpoint. continue; } - const sync_rules = storage.sync_rules; + const syncRules = storage.getParsedSyncRules(parseOptions); - const allBuckets = await sync_rules.queryBucketIds({ + const allBuckets = await syncRules.queryBucketIds({ getParameterSets(lookups) { return storage.getParameterSets(checkpoint, lookups); }, diff --git a/packages/service-core/src/util/utils.ts b/packages/service-core/src/util/utils.ts index 84cfa634e..9a40785cd 100644 --- a/packages/service-core/src/util/utils.ts +++ b/packages/service-core/src/util/utils.ts @@ -93,22 +93,6 @@ function getRawReplicaIdentity( return result; } -export function getUuidReplicaIdentityString( - tuple: sync_rules.ToastableSqliteRow, - columns: storage.ColumnDescriptor[] -): string { - const rawIdentity = getRawReplicaIdentity(tuple, columns); - - return uuidForRow(rawIdentity); -} - -export function uuidForRow(row: sync_rules.SqliteRow): string { - // Important: This must not change, since it will affect how ids are generated. - // Use BSON so that it's a well-defined format without encoding ambiguities. 
- const repr = bson.serialize(row); - return uuid.v5(repr, ID_NAMESPACE); -} - export function getUuidReplicaIdentityBson( tuple: sync_rules.ToastableSqliteRow, columns: storage.ColumnDescriptor[] diff --git a/packages/service-core/test/src/__snapshots__/sync.test.ts.snap b/packages/service-core/test/src/__snapshots__/sync.test.ts.snap index 806abf375..f1e9b6762 100644 --- a/packages/service-core/test/src/__snapshots__/sync.test.ts.snap +++ b/packages/service-core/test/src/__snapshots__/sync.test.ts.snap @@ -61,7 +61,7 @@ exports[`sync - mongodb > compacting data - invalidate checkpoint 2`] = ` "object_type": "test", "op": "PUT", "op_id": "3", - "subkey": "6544e3899293153fa7b38331/117ab485-4b42-58a2-ab32-0053a22c3423", + "subkey": "e5aa2ddc-1328-58fa-a000-0b5ed31eaf1a", }, { "checksum": 3028503153n, @@ -70,7 +70,7 @@ exports[`sync - mongodb > compacting data - invalidate checkpoint 2`] = ` "object_type": "test", "op": "PUT", "op_id": "4", - "subkey": "6544e3899293153fa7b38331/ec27c691-b47a-5d92-927a-9944feb89eee", + "subkey": "13423353-9f27-59b4-baf0-64a5e09f1769", }, ], "has_more": false, @@ -151,7 +151,7 @@ exports[`sync - mongodb > sync global data 1`] = ` "object_type": "test", "op": "PUT", "op_id": "1", - "subkey": "6544e3899293153fa7b38331/117ab485-4b42-58a2-ab32-0053a22c3423", + "subkey": "e5aa2ddc-1328-58fa-a000-0b5ed31eaf1a", }, { "checksum": 3280762209n, @@ -160,7 +160,7 @@ exports[`sync - mongodb > sync global data 1`] = ` "object_type": "test", "op": "PUT", "op_id": "2", - "subkey": "6544e3899293153fa7b38331/ec27c691-b47a-5d92-927a-9944feb89eee", + "subkey": "13423353-9f27-59b4-baf0-64a5e09f1769", }, ], "has_more": false, @@ -207,7 +207,7 @@ exports[`sync - mongodb > sync legacy non-raw data 1`] = ` "object_type": "test", "op": "PUT", "op_id": "1", - "subkey": "6544e3899293153fa7b38331/117ab485-4b42-58a2-ab32-0053a22c3423", + "subkey": "e5aa2ddc-1328-58fa-a000-0b5ed31eaf1a", }, ], "has_more": false, @@ -273,7 +273,7 @@ exports[`sync - mongodb > sync updates to global data 2`] = ` "object_type": "test", "op": "PUT", "op_id": "1", - "subkey": "6544e3899293153fa7b38331/117ab485-4b42-58a2-ab32-0053a22c3423", + "subkey": "e5aa2ddc-1328-58fa-a000-0b5ed31eaf1a", }, ], "has_more": false, @@ -316,7 +316,7 @@ exports[`sync - mongodb > sync updates to global data 3`] = ` "object_type": "test", "op": "PUT", "op_id": "2", - "subkey": "6544e3899293153fa7b38331/ec27c691-b47a-5d92-927a-9944feb89eee", + "subkey": "13423353-9f27-59b4-baf0-64a5e09f1769", }, ], "has_more": false, diff --git a/packages/service-core/test/src/compacting.test.ts b/packages/service-core/test/src/compacting.test.ts index e7cb6d313..5fc50a33a 100644 --- a/packages/service-core/test/src/compacting.test.ts +++ b/packages/service-core/test/src/compacting.test.ts @@ -3,7 +3,9 @@ import { SqlSyncRules } from '@powersync/service-sync-rules'; import { describe, expect, test } from 'vitest'; import { validateCompactedBucket } from './bucket_validation.js'; import { oneFromAsync } from './stream_utils.js'; -import { makeTestTable, MONGO_STORAGE_FACTORY, ZERO_LSN } from './util.js'; +import { BATCH_OPTIONS, makeTestTable, MONGO_STORAGE_FACTORY, rid, testRules, ZERO_LSN } from './util.js'; +import { ParseSyncRulesOptions, PersistedSyncRulesContent, StartBatchOptions } from '@/storage/BucketStorage.js'; +import { getUuidReplicaIdentityBson } from '@/util/util-index.js'; const TEST_TABLE = makeTestTable('test', ['id']); @@ -18,21 +20,22 @@ function compactTests(compactOptions: MongoCompactOptions) { const factory = 
MONGO_STORAGE_FACTORY; test('compacting (1)', async () => { - const sync_rules = SqlSyncRules.fromYaml(` + const sync_rules = testRules(` bucket_definitions: global: data: [select * from test] `); - const storage = (await factory()).getInstance({ id: 1, sync_rules, slot_name: 'test' }); + const storage = (await factory()).getInstance(sync_rules); - const result = await storage.startBatch({ zeroLSN: ZERO_LSN }, async (batch) => { + const result = await storage.startBatch(BATCH_OPTIONS, async (batch) => { await batch.save({ sourceTable: TEST_TABLE, tag: 'insert', after: { id: 't1' - } + }, + afterReplicaId: rid('t1') }); await batch.save({ @@ -40,7 +43,8 @@ bucket_definitions: tag: 'insert', after: { id: 't2' - } + }, + afterReplicaId: rid('t2') }); await batch.save({ @@ -48,7 +52,8 @@ bucket_definitions: tag: 'update', after: { id: 't2' - } + }, + afterReplicaId: rid('t2') }); }); @@ -108,21 +113,22 @@ bucket_definitions: }); test('compacting (2)', async () => { - const sync_rules = SqlSyncRules.fromYaml(` + const sync_rules = testRules(` bucket_definitions: global: data: [select * from test] `); - const storage = (await factory()).getInstance({ id: 1, sync_rules, slot_name: 'test' }); + const storage = (await factory()).getInstance(sync_rules); - const result = await storage.startBatch({ zeroLSN: ZERO_LSN }, async (batch) => { + const result = await storage.startBatch(BATCH_OPTIONS, async (batch) => { await batch.save({ sourceTable: TEST_TABLE, tag: 'insert', after: { id: 't1' - } + }, + afterReplicaId: rid('t1') }); await batch.save({ @@ -130,7 +136,8 @@ bucket_definitions: tag: 'insert', after: { id: 't2' - } + }, + afterReplicaId: rid('t2') }); await batch.save({ @@ -138,7 +145,8 @@ bucket_definitions: tag: 'delete', before: { id: 't1' - } + }, + beforeReplicaId: rid('t1') }); await batch.save({ @@ -146,7 +154,8 @@ bucket_definitions: tag: 'update', after: { id: 't2' - } + }, + afterReplicaId: rid('t2') }); }); diff --git a/packages/service-core/test/src/data_storage.test.ts b/packages/service-core/test/src/data_storage.test.ts index c2035148f..103ff4b4d 100644 --- a/packages/service-core/test/src/data_storage.test.ts +++ b/packages/service-core/test/src/data_storage.test.ts @@ -1,8 +1,25 @@ -import { BucketDataBatchOptions } from '@/storage/BucketStorage.js'; +import { + BucketDataBatchOptions, + ParseSyncRulesOptions, + PersistedSyncRulesContent, + StartBatchOptions +} from '@/storage/BucketStorage.js'; import { RequestParameters, SqlSyncRules } from '@powersync/service-sync-rules'; import { describe, expect, test } from 'vitest'; import { fromAsync, oneFromAsync } from './stream_utils.js'; -import { getBatchData, getBatchMeta, makeTestTable, MONGO_STORAGE_FACTORY, StorageFactory, ZERO_LSN } from './util.js'; +import { + BATCH_OPTIONS, + getBatchData, + getBatchMeta, + makeTestTable, + MONGO_STORAGE_FACTORY, + PARSE_OPTIONS, + rid, + StorageFactory, + testRules, + ZERO_LSN +} from './util.js'; +import { getUuidReplicaIdentityBson } from '@/util/util-index.js'; const TEST_TABLE = makeTestTable('test', ['id']); @@ -12,7 +29,7 @@ describe('store - mongodb', function () { function defineDataStorageTests(factory: StorageFactory) { test('save and load parameters', async () => { - const sync_rules = SqlSyncRules.fromYaml(` + const sync_rules = testRules(` bucket_definitions: mybucket: parameters: @@ -20,9 +37,9 @@ bucket_definitions: data: [] `); - const storage = (await factory()).getInstance({ id: 1, sync_rules, slot_name: 'test' }); + const storage = (await 
factory()).getInstance(sync_rules); - const result = await storage.startBatch({ zeroLSN: ZERO_LSN }, async (batch) => { + const result = await storage.startBatch(BATCH_OPTIONS, async (batch) => { await batch.save({ sourceTable: TEST_TABLE, tag: 'insert', @@ -31,7 +48,8 @@ bucket_definitions: id1: 'user3', id2: 'user4', group_id: 'group2a' - } + }, + afterReplicaId: rid('t2') }); await batch.save({ @@ -42,7 +60,8 @@ bucket_definitions: id1: 'user1', id2: 'user2', group_id: 'group1a' - } + }, + afterReplicaId: rid('t1') }); }); @@ -55,34 +74,38 @@ bucket_definitions: }); test('it should use the latest version', async () => { - const sync_rules = SqlSyncRules.fromYaml(` + const sync_rules = testRules( + ` bucket_definitions: mybucket: parameters: - SELECT group_id FROM test WHERE id = token_parameters.user_id data: [] - `); + ` + ); - const storage = (await factory()).getInstance({ id: 1, sync_rules, slot_name: 'test' }); + const storage = (await factory()).getInstance(sync_rules); - const result1 = await storage.startBatch({ zeroLSN: ZERO_LSN }, async (batch) => { + const result1 = await storage.startBatch(BATCH_OPTIONS, async (batch) => { await batch.save({ sourceTable: TEST_TABLE, tag: 'insert', after: { id: 'user1', group_id: 'group1' - } + }, + afterReplicaId: rid('user1') }); }); - const result2 = await storage.startBatch({ zeroLSN: ZERO_LSN }, async (batch) => { + const result2 = await storage.startBatch(BATCH_OPTIONS, async (batch) => { await batch.save({ sourceTable: TEST_TABLE, tag: 'insert', after: { id: 'user1', group_id: 'group2' - } + }, + afterReplicaId: rid('user1') }); }); @@ -103,17 +126,19 @@ bucket_definitions: }); test('save and load parameters with different number types', async () => { - const sync_rules = SqlSyncRules.fromYaml(` + const sync_rules = testRules( + ` bucket_definitions: mybucket: parameters: - SELECT group_id FROM test WHERE n1 = token_parameters.n1 and f2 = token_parameters.f2 and f3 = token_parameters.f3 data: [] - `); + ` + ); - const storage = (await factory()).getInstance({ id: 1, sync_rules, slot_name: 'test' }); + const storage = (await factory()).getInstance(sync_rules); - const result = await storage.startBatch({ zeroLSN: ZERO_LSN }, async (batch) => { + const result = await storage.startBatch(BATCH_OPTIONS, async (batch) => { await batch.save({ sourceTable: TEST_TABLE, tag: 'insert', @@ -123,7 +148,8 @@ bucket_definitions: n1: 314n, f2: 314, f3: 3.14 - } + }, + afterReplicaId: rid('t1') }); }); @@ -144,17 +170,19 @@ bucket_definitions: // This specific case tested here cannot happen with postgres in practice, but we still // test this to ensure correct deserialization. - const sync_rules = SqlSyncRules.fromYaml(` + const sync_rules = testRules( + ` bucket_definitions: mybucket: parameters: - SELECT group_id FROM test WHERE n1 = token_parameters.n1 data: [] - `); + ` + ); - const storage = (await factory()).getInstance({ id: 1, sync_rules, slot_name: 'test' }); + const storage = (await factory()).getInstance(sync_rules); - const result = await storage.startBatch({ zeroLSN: ZERO_LSN }, async (batch) => { + const result = await storage.startBatch(BATCH_OPTIONS, async (batch) => { await batch.save({ sourceTable: TEST_TABLE, tag: 'insert', @@ -162,7 +190,8 @@ bucket_definitions: id: 't1', group_id: 'group1', n1: 1152921504606846976n // 2^60 - } + }, + afterReplicaId: rid('t1') }); await batch.save({ @@ -174,7 +203,8 @@ bucket_definitions: // Simulate a TOAST value, even though it can't happen for values like this // in practice. 
n1: undefined - } + }, + afterReplicaId: rid('t1') }); }); @@ -187,15 +217,17 @@ bucket_definitions: }); test('removing row', async () => { - const sync_rules = SqlSyncRules.fromYaml(` + const sync_rules = testRules( + ` bucket_definitions: global: data: - SELECT id, description FROM "%" -`); - const storage = (await factory()).getInstance({ id: 1, sync_rules, slot_name: 'test' }); +` + ); + const storage = (await factory()).getInstance(sync_rules); - const result = await storage.startBatch({ zeroLSN: ZERO_LSN }, async (batch) => { + const result = await storage.startBatch(BATCH_OPTIONS, async (batch) => { const sourceTable = TEST_TABLE; await batch.save({ @@ -204,14 +236,13 @@ bucket_definitions: after: { id: 'test1', description: 'test1' - } + }, + afterReplicaId: rid('test1') }); await batch.save({ sourceTable, tag: 'delete', - before: { - id: 'test1' - } + beforeReplicaId: rid('test1') }); }); @@ -247,25 +278,29 @@ bucket_definitions: test('save and load parameters with workspaceId', async () => { const WORKSPACE_TABLE = makeTestTable('workspace', ['id']); - const sync_rules = SqlSyncRules.fromYaml(` + const sync_rules_content = testRules( + ` bucket_definitions: by_workspace: parameters: - SELECT id as workspace_id FROM workspace WHERE workspace."userId" = token_parameters.user_id data: [] - `); + ` + ); + const sync_rules = sync_rules_content.parsed(PARSE_OPTIONS).sync_rules; - const storage = (await factory()).getInstance({ id: 1, sync_rules, slot_name: 'test' }); + const storage = (await factory()).getInstance(sync_rules_content); - const result = await storage.startBatch({ zeroLSN: ZERO_LSN }, async (batch) => { + const result = await storage.startBatch(BATCH_OPTIONS, async (batch) => { await batch.save({ sourceTable: WORKSPACE_TABLE, tag: 'insert', after: { id: 'workspace1', userId: 'u1' - } + }, + afterReplicaId: rid('workspace1') }); }); @@ -293,25 +328,29 @@ bucket_definitions: test('save and load parameters with dynamic global buckets', async () => { const WORKSPACE_TABLE = makeTestTable('workspace'); - const sync_rules = SqlSyncRules.fromYaml(` + const sync_rules_content = testRules( + ` bucket_definitions: by_public_workspace: parameters: - SELECT id as workspace_id FROM workspace WHERE workspace.visibility = 'public' data: [] - `); + ` + ); + const sync_rules = sync_rules_content.parsed(PARSE_OPTIONS).sync_rules; - const storage = (await factory()).getInstance({ id: 1, sync_rules, slot_name: 'test' }); + const storage = (await factory()).getInstance(sync_rules_content); - const result = await storage.startBatch({ zeroLSN: ZERO_LSN }, async (batch) => { + const result = await storage.startBatch(BATCH_OPTIONS, async (batch) => { await batch.save({ sourceTable: WORKSPACE_TABLE, tag: 'insert', after: { id: 'workspace1', visibility: 'public' - } + }, + afterReplicaId: rid('workspace1') }); await batch.save({ @@ -320,7 +359,8 @@ bucket_definitions: after: { id: 'workspace2', visibility: 'private' - } + }, + afterReplicaId: rid('workspace2') }); await batch.save({ @@ -329,7 +369,8 @@ bucket_definitions: after: { id: 'workspace3', visibility: 'public' - } + }, + afterReplicaId: rid('workspace3') }); }); @@ -359,7 +400,8 @@ bucket_definitions: test('multiple parameter queries', async () => { const WORKSPACE_TABLE = makeTestTable('workspace'); - const sync_rules = SqlSyncRules.fromYaml(` + const sync_rules_content = testRules( + ` bucket_definitions: by_workspace: parameters: @@ -368,18 +410,21 @@ bucket_definitions: - SELECT id as workspace_id FROM workspace WHERE workspace.user_id 
= token_parameters.user_id data: [] - `); + ` + ); + const sync_rules = sync_rules_content.parsed(PARSE_OPTIONS).sync_rules; - const storage = (await factory()).getInstance({ id: 1, sync_rules, slot_name: 'test' }); + const storage = (await factory()).getInstance(sync_rules_content); - const result = await storage.startBatch({ zeroLSN: ZERO_LSN }, async (batch) => { + const result = await storage.startBatch(BATCH_OPTIONS, async (batch) => { await batch.save({ sourceTable: WORKSPACE_TABLE, tag: 'insert', after: { id: 'workspace1', visibility: 'public' - } + }, + afterReplicaId: rid('workspace1') }); await batch.save({ @@ -388,7 +433,8 @@ bucket_definitions: after: { id: 'workspace2', visibility: 'private' - } + }, + afterReplicaId: rid('workspace2') }); await batch.save({ @@ -398,7 +444,8 @@ bucket_definitions: id: 'workspace3', user_id: 'u1', visibility: 'private' - } + }, + afterReplicaId: rid('workspace3') }); await batch.save({ @@ -408,7 +455,8 @@ bucket_definitions: id: 'workspace4', user_id: 'u2', visibility: 'private' - } + }, + afterReplicaId: rid('workspace4') }); }); @@ -445,16 +493,18 @@ bucket_definitions: }); test('changing client ids', async () => { - const sync_rules = SqlSyncRules.fromYaml(` + const sync_rules = testRules( + ` bucket_definitions: global: data: - SELECT client_id as id, description FROM "%" -`); - const storage = (await factory()).getInstance({ id: 1, sync_rules, slot_name: 'test' }); +` + ); + const storage = (await factory()).getInstance(sync_rules); const sourceTable = TEST_TABLE; - const result = await storage.startBatch({ zeroLSN: ZERO_LSN }, async (batch) => { + const result = await storage.startBatch(BATCH_OPTIONS, async (batch) => { await batch.save({ sourceTable, tag: 'insert', @@ -462,7 +512,8 @@ bucket_definitions: id: 'test1', client_id: 'client1a', description: 'test1a' - } + }, + afterReplicaId: rid('test1') }); await batch.save({ sourceTable, @@ -471,7 +522,8 @@ bucket_definitions: id: 'test1', client_id: 'client1b', description: 'test1b' - } + }, + afterReplicaId: rid('test1') }); await batch.save({ @@ -481,7 +533,8 @@ bucket_definitions: id: 'test2', client_id: 'client2', description: 'test2' - } + }, + afterReplicaId: rid('test2') }); }); const checkpoint = result!.flushed_op; @@ -502,15 +555,17 @@ bucket_definitions: }); test('re-apply delete', async () => { - const sync_rules = SqlSyncRules.fromYaml(` + const sync_rules = testRules( + ` bucket_definitions: global: data: - SELECT id, description FROM "%" -`); - const storage = (await factory()).getInstance({ id: 1, sync_rules, slot_name: 'test' }); +` + ); + const storage = (await factory()).getInstance(sync_rules); - await storage.startBatch({ zeroLSN: ZERO_LSN }, async (batch) => { + await storage.startBatch(BATCH_OPTIONS, async (batch) => { const sourceTable = TEST_TABLE; await batch.save({ @@ -519,31 +574,28 @@ bucket_definitions: after: { id: 'test1', description: 'test1' - } + }, + afterReplicaId: rid('test1') }); }); - await storage.startBatch({ zeroLSN: ZERO_LSN }, async (batch) => { + await storage.startBatch(BATCH_OPTIONS, async (batch) => { const sourceTable = TEST_TABLE; await batch.save({ sourceTable, tag: 'delete', - before: { - id: 'test1' - } + beforeReplicaId: rid('test1') }); }); - const result = await storage.startBatch({ zeroLSN: ZERO_LSN }, async (batch) => { + const result = await storage.startBatch(BATCH_OPTIONS, async (batch) => { const sourceTable = TEST_TABLE; await batch.save({ sourceTable, tag: 'delete', - before: { - id: 'test1' - } + beforeReplicaId: 
rid('test1') }); }); @@ -577,15 +629,17 @@ bucket_definitions: }); test('re-apply update + delete', async () => { - const sync_rules = SqlSyncRules.fromYaml(` + const sync_rules = testRules( + ` bucket_definitions: global: data: - SELECT id, description FROM "%" -`); - const storage = (await factory()).getInstance({ id: 1, sync_rules, slot_name: 'test' }); +` + ); + const storage = (await factory()).getInstance(sync_rules); - await storage.startBatch({ zeroLSN: ZERO_LSN }, async (batch) => { + await storage.startBatch(BATCH_OPTIONS, async (batch) => { const sourceTable = TEST_TABLE; await batch.save({ @@ -594,11 +648,12 @@ bucket_definitions: after: { id: 'test1', description: 'test1' - } + }, + afterReplicaId: rid('test1') }); }); - await storage.startBatch({ zeroLSN: ZERO_LSN }, async (batch) => { + await storage.startBatch(BATCH_OPTIONS, async (batch) => { const sourceTable = TEST_TABLE; await batch.save({ @@ -607,7 +662,8 @@ bucket_definitions: after: { id: 'test1', description: undefined - } + }, + afterReplicaId: rid('test1') }); await batch.save({ @@ -616,19 +672,18 @@ bucket_definitions: after: { id: 'test1', description: undefined - } + }, + afterReplicaId: rid('test1') }); await batch.save({ sourceTable, tag: 'delete', - before: { - id: 'test1' - } + beforeReplicaId: rid('test1') }); }); - const result = await storage.startBatch({ zeroLSN: ZERO_LSN }, async (batch) => { + const result = await storage.startBatch(BATCH_OPTIONS, async (batch) => { const sourceTable = TEST_TABLE; await batch.save({ @@ -637,7 +692,8 @@ bucket_definitions: after: { id: 'test1', description: undefined - } + }, + afterReplicaId: rid('test1') }); await batch.save({ @@ -646,15 +702,14 @@ bucket_definitions: after: { id: 'test1', description: undefined - } + }, + afterReplicaId: rid('test1') }); await batch.save({ sourceTable, tag: 'delete', - before: { - id: 'test1' - } + beforeReplicaId: rid('test1') }); }); @@ -691,17 +746,19 @@ bucket_definitions: }); test('truncate parameters', async () => { - const sync_rules = SqlSyncRules.fromYaml(` + const sync_rules = testRules( + ` bucket_definitions: mybucket: parameters: - SELECT group_id FROM test WHERE id1 = token_parameters.user_id OR id2 = token_parameters.user_id data: [] - `); + ` + ); - const storage = (await factory()).getInstance({ id: 1, sync_rules, slot_name: 'test' }); + const storage = (await factory()).getInstance(sync_rules); - await storage.startBatch({ zeroLSN: ZERO_LSN }, async (batch) => { + await storage.startBatch(BATCH_OPTIONS, async (batch) => { await batch.save({ sourceTable: TEST_TABLE, tag: 'insert', @@ -710,7 +767,8 @@ bucket_definitions: id1: 'user3', id2: 'user4', group_id: 'group2a' - } + }, + afterReplicaId: rid('t2') }); await batch.truncate([TEST_TABLE]); @@ -731,16 +789,18 @@ bucket_definitions: // 1. Not getting the correct "current_data" state for each operation. // 2. Output order not being correct. 
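// A sketch (not from the diff) of the explicit replica-id contract exercised by the tests below.
// Assumes `batch` is the batch provided by storage.startBatch() and `table` is the SourceTable
// being written to; getUuidReplicaIdentityBson is imported as in WalStream.ts above.
const afterRow = { id: 't1', description: 'test1' };
await batch.save({
  tag: 'insert',
  sourceTable: table,
  before: undefined,
  beforeReplicaId: undefined,
  after: afterRow,
  afterReplicaId: getUuidReplicaIdentityBson(afterRow, table.replicaIdColumns)
});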
- const sync_rules = SqlSyncRules.fromYaml(` + const sync_rules = testRules( + ` bucket_definitions: global: data: - SELECT id, description FROM "test" -`); - const storage = (await factory()).getInstance({ id: 1, sync_rules, slot_name: 'test' }); +` + ); + const storage = (await factory()).getInstance(sync_rules); // Pre-setup - const result1 = await storage.startBatch({ zeroLSN: ZERO_LSN }, async (batch) => { + const result1 = await storage.startBatch(BATCH_OPTIONS, async (batch) => { const sourceTable = TEST_TABLE; await batch.save({ @@ -749,7 +809,8 @@ bucket_definitions: after: { id: 'test1', description: 'test1a' - } + }, + afterReplicaId: rid('test1') }); await batch.save({ @@ -758,14 +819,15 @@ bucket_definitions: after: { id: 'test2', description: 'test2a' - } + }, + afterReplicaId: rid('test2') }); }); const checkpoint1 = result1?.flushed_op ?? '0'; // Test batch - const result2 = await storage.startBatch({ zeroLSN: ZERO_LSN }, async (batch) => { + const result2 = await storage.startBatch(BATCH_OPTIONS, async (batch) => { const sourceTable = TEST_TABLE; // b await batch.save({ @@ -774,7 +836,8 @@ bucket_definitions: after: { id: 'test1', description: 'test1b' - } + }, + afterReplicaId: rid('test1') }); await batch.save({ @@ -783,10 +846,12 @@ bucket_definitions: before: { id: 'test1' }, + beforeReplicaId: rid('test1'), after: { id: 'test2', description: 'test2b' - } + }, + afterReplicaId: rid('test2') }); await batch.save({ @@ -795,10 +860,13 @@ bucket_definitions: before: { id: 'test2' }, + beforeReplicaId: rid('test2'), after: { id: 'test3', description: 'test3b' - } + }, + + afterReplicaId: rid('test3') }); // c @@ -808,7 +876,8 @@ bucket_definitions: after: { id: 'test2', description: 'test2c' - } + }, + afterReplicaId: rid('test2') }); // d @@ -818,7 +887,8 @@ bucket_definitions: after: { id: 'test4', description: 'test4d' - } + }, + afterReplicaId: rid('test4') }); await batch.save({ @@ -827,10 +897,12 @@ bucket_definitions: before: { id: 'test4' }, + beforeReplicaId: rid('test4'), after: { id: 'test5', description: 'test5d' - } + }, + afterReplicaId: rid('test5') }); }); @@ -865,31 +937,40 @@ bucket_definitions: }); test('changed data with replica identity full', async () => { - const sync_rules = SqlSyncRules.fromYaml(` + const sync_rules = testRules( + ` bucket_definitions: global: data: - SELECT id, description FROM "test" -`); - const storage = (await factory()).getInstance({ id: 1, sync_rules, slot_name: 'test' }); +` + ); + function rid2(id: string, description: string) { + return getUuidReplicaIdentityBson({ id, description }, [ + { name: 'id', type: 'VARCHAR', typeId: 25 }, + { name: 'description', type: 'VARCHAR', typeId: 25 } + ]); + } + const storage = (await factory()).getInstance(sync_rules); const sourceTable = makeTestTable('test', ['id', 'description']); // Pre-setup - const result1 = await storage.startBatch({ zeroLSN: ZERO_LSN }, async (batch) => { + const result1 = await storage.startBatch(BATCH_OPTIONS, async (batch) => { await batch.save({ sourceTable, tag: 'insert', after: { id: 'test1', description: 'test1a' - } + }, + afterReplicaId: rid2('test1', 'test1a') }); }); const checkpoint1 = result1?.flushed_op ?? 
'0'; - const result2 = await storage.startBatch({ zeroLSN: ZERO_LSN }, async (batch) => { + const result2 = await storage.startBatch(BATCH_OPTIONS, async (batch) => { // Unchanged, but has a before id await batch.save({ sourceTable, @@ -898,14 +979,16 @@ bucket_definitions: id: 'test1', description: 'test1a' }, + beforeReplicaId: rid2('test1', 'test1a'), after: { id: 'test1', description: 'test1b' - } + }, + afterReplicaId: rid2('test1', 'test1b') }); }); - const result3 = await storage.startBatch({ zeroLSN: ZERO_LSN }, async (batch) => { + const result3 = await storage.startBatch(BATCH_OPTIONS, async (batch) => { // Delete await batch.save({ sourceTable, @@ -914,6 +997,7 @@ bucket_definitions: id: 'test1', description: 'test1b' }, + beforeReplicaId: rid2('test1', 'test1b'), after: undefined }); }); @@ -957,31 +1041,41 @@ bucket_definitions: }); test('unchanged data with replica identity full', async () => { - const sync_rules = SqlSyncRules.fromYaml(` + const sync_rules = testRules( + ` bucket_definitions: global: data: - SELECT id, description FROM "test" -`); - const storage = (await factory()).getInstance({ id: 1, sync_rules, slot_name: 'test' }); +` + ); + function rid2(id: string, description: string) { + return getUuidReplicaIdentityBson({ id, description }, [ + { name: 'id', type: 'VARCHAR', typeId: 25 }, + { name: 'description', type: 'VARCHAR', typeId: 25 } + ]); + } + + const storage = (await factory()).getInstance(sync_rules); const sourceTable = makeTestTable('test', ['id', 'description']); // Pre-setup - const result1 = await storage.startBatch({ zeroLSN: ZERO_LSN }, async (batch) => { + const result1 = await storage.startBatch(BATCH_OPTIONS, async (batch) => { await batch.save({ sourceTable, tag: 'insert', after: { id: 'test1', description: 'test1a' - } + }, + afterReplicaId: rid2('test1', 'test1a') }); }); const checkpoint1 = result1?.flushed_op ?? '0'; - const result2 = await storage.startBatch({ zeroLSN: ZERO_LSN }, async (batch) => { + const result2 = await storage.startBatch(BATCH_OPTIONS, async (batch) => { // Unchanged, but has a before id await batch.save({ sourceTable, @@ -990,14 +1084,16 @@ bucket_definitions: id: 'test1', description: 'test1a' }, + beforeReplicaId: rid2('test1', 'test1a'), after: { id: 'test1', description: 'test1a' - } + }, + afterReplicaId: rid2('test1', 'test1a') }); }); - const result3 = await storage.startBatch({ zeroLSN: ZERO_LSN }, async (batch) => { + const result3 = await storage.startBatch(BATCH_OPTIONS, async (batch) => { // Delete await batch.save({ sourceTable, @@ -1006,6 +1102,7 @@ bucket_definitions: id: 'test1', description: 'test1a' }, + beforeReplicaId: rid2('test1', 'test1a'), after: undefined }); }); @@ -1046,15 +1143,17 @@ bucket_definitions: // but large enough in size to be split over multiple returned batches. // The specific batch splits is an implementation detail of the storage driver, // and the test will have to updated when other implementations are added. 
- const sync_rules = SqlSyncRules.fromYaml(` + const sync_rules = testRules( + ` bucket_definitions: global: data: - SELECT id, description FROM "%" -`); - const storage = (await factory()).getInstance({ id: 1, sync_rules, slot_name: 'test' }); +` + ); + const storage = (await factory()).getInstance(sync_rules); - const result = await storage.startBatch({ zeroLSN: ZERO_LSN }, async (batch) => { + const result = await storage.startBatch(BATCH_OPTIONS, async (batch) => { const sourceTable = TEST_TABLE; const largeDescription = '0123456789'.repeat(12_000_00); @@ -1065,7 +1164,8 @@ bucket_definitions: after: { id: 'test1', description: 'test1' - } + }, + afterReplicaId: rid('test1') }); await batch.save({ @@ -1074,7 +1174,8 @@ bucket_definitions: after: { id: 'large1', description: largeDescription - } + }, + afterReplicaId: rid('large1') }); // Large enough to split the returned batch @@ -1084,7 +1185,8 @@ bucket_definitions: after: { id: 'large2', description: largeDescription - } + }, + afterReplicaId: rid('large2') }); await batch.save({ @@ -1093,7 +1195,8 @@ bucket_definitions: after: { id: 'test3', description: 'test3' - } + }, + afterReplicaId: rid('test3') }); }); @@ -1138,15 +1241,17 @@ bucket_definitions: // Test syncing a batch of data that is small in count, // but large enough in size to be split over multiple returned chunks. // Similar to the above test, but splits over 1MB chunks. - const sync_rules = SqlSyncRules.fromYaml(` + const sync_rules = testRules( + ` bucket_definitions: global: data: - SELECT id, description FROM "%" -`); - const storage = (await factory()).getInstance({ id: 1, sync_rules, slot_name: 'test' }); +` + ); + const storage = (await factory()).getInstance(sync_rules); - const result = await storage.startBatch({ zeroLSN: ZERO_LSN }, async (batch) => { + const result = await storage.startBatch(BATCH_OPTIONS, async (batch) => { const sourceTable = TEST_TABLE; const largeDescription = '0123456789'.repeat(2_000_00); @@ -1157,7 +1262,8 @@ bucket_definitions: after: { id: 'test1', description: 'test1' - } + }, + afterReplicaId: rid('test1') }); await batch.save({ @@ -1166,7 +1272,8 @@ bucket_definitions: after: { id: 'large1', description: largeDescription - } + }, + afterReplicaId: rid('large1') }); // Large enough to split the returned batch @@ -1176,7 +1283,8 @@ bucket_definitions: after: { id: 'large2', description: largeDescription - } + }, + afterReplicaId: rid('large2') }); await batch.save({ @@ -1185,7 +1293,8 @@ bucket_definitions: after: { id: 'test3', description: 'test3' - } + }, + afterReplicaId: rid('test3') }); }); @@ -1227,15 +1336,17 @@ bucket_definitions: test('long batch', async () => { // Test syncing a batch of data that is limited by count. 
- const sync_rules = SqlSyncRules.fromYaml(` + const sync_rules = testRules( + ` bucket_definitions: global: data: - SELECT id, description FROM "%" -`); - const storage = (await factory()).getInstance({ id: 1, sync_rules, slot_name: 'test' }); +` + ); + const storage = (await factory()).getInstance(sync_rules); - const result = await storage.startBatch({ zeroLSN: ZERO_LSN }, async (batch) => { + const result = await storage.startBatch(BATCH_OPTIONS, async (batch) => { const sourceTable = TEST_TABLE; for (let i = 1; i <= 6; i++) { @@ -1245,7 +1356,8 @@ bucket_definitions: after: { id: `test${i}`, description: `test${i}` - } + }, + afterReplicaId: `test${i}` }); } }); diff --git a/packages/service-core/test/src/sync.test.ts b/packages/service-core/test/src/sync.test.ts index c5e702633..f1c4c7272 100644 --- a/packages/service-core/test/src/sync.test.ts +++ b/packages/service-core/test/src/sync.test.ts @@ -5,7 +5,15 @@ import { JSONBig } from '@powersync/service-jsonbig'; import { RequestParameters } from '@powersync/service-sync-rules'; import * as timers from 'timers/promises'; import { describe, expect, test } from 'vitest'; -import { makeTestTable, MONGO_STORAGE_FACTORY, StorageFactory, ZERO_LSN } from './util.js'; +import { + BATCH_OPTIONS, + makeTestTable, + MONGO_STORAGE_FACTORY, + PARSE_OPTIONS, + StorageFactory, + ZERO_LSN +} from './util.js'; +import { ParseSyncRulesOptions, StartBatchOptions } from '@/storage/BucketStorage.js'; describe('sync - mongodb', function () { defineTests(MONGO_STORAGE_FACTORY); @@ -30,18 +38,19 @@ function defineTests(factory: StorageFactory) { content: BASIC_SYNC_RULES }); - const storage = await f.getInstance(syncRules.parsed()); + const storage = await f.getInstance(syncRules); await storage.setSnapshotDone(ZERO_LSN); await storage.autoActivate(); - const result = await storage.startBatch({ zeroLSN: ZERO_LSN }, async (batch) => { + const result = await storage.startBatch(BATCH_OPTIONS, async (batch) => { await batch.save({ sourceTable: TEST_TABLE, tag: 'insert', after: { id: 't1', description: 'Test 1' - } + }, + afterReplicaId: 't1' }); await batch.save({ @@ -50,7 +59,8 @@ function defineTests(factory: StorageFactory) { after: { id: 't2', description: 'Test 2' - } + }, + afterReplicaId: 't2' }); await batch.commit('0/1'); @@ -63,6 +73,7 @@ function defineTests(factory: StorageFactory) { include_checksum: true, raw_data: true }, + parseOptions: PARSE_OPTIONS, tracker, syncParams: new RequestParameters({ sub: '' }, {}), token: { exp: Date.now() / 1000 + 10 } as any @@ -79,11 +90,11 @@ function defineTests(factory: StorageFactory) { content: BASIC_SYNC_RULES }); - const storage = await f.getInstance(syncRules.parsed()); + const storage = await f.getInstance(syncRules); await storage.setSnapshotDone(ZERO_LSN); await storage.autoActivate(); - const result = await storage.startBatch({ zeroLSN: ZERO_LSN }, async (batch) => { + const result = await storage.startBatch(BATCH_OPTIONS, async (batch) => { await batch.save({ sourceTable: TEST_TABLE, tag: 'insert', @@ -91,7 +102,8 @@ function defineTests(factory: StorageFactory) { id: 't1', description: 'Test\n"string"', large_num: 12345678901234567890n - } + }, + afterReplicaId: 't1' }); await batch.commit('0/1'); @@ -104,6 +116,7 @@ function defineTests(factory: StorageFactory) { include_checksum: true, raw_data: false }, + parseOptions: PARSE_OPTIONS, tracker, syncParams: new RequestParameters({ sub: '' }, {}), token: { exp: Date.now() / 1000 + 10 } as any @@ -122,7 +135,7 @@ function defineTests(factory: 
StorageFactory) { content: BASIC_SYNC_RULES }); - const storage = await f.getInstance(syncRules.parsed()); + const storage = await f.getInstance(syncRules); await storage.setSnapshotDone(ZERO_LSN); await storage.autoActivate(); @@ -133,6 +146,7 @@ function defineTests(factory: StorageFactory) { include_checksum: true, raw_data: true }, + parseOptions: PARSE_OPTIONS, tracker, syncParams: new RequestParameters({ sub: '' }, {}), token: { exp: 0 } as any @@ -149,7 +163,7 @@ function defineTests(factory: StorageFactory) { content: BASIC_SYNC_RULES }); - const storage = await f.getInstance(syncRules.parsed()); + const storage = await f.getInstance(syncRules); await storage.setSnapshotDone(ZERO_LSN); await storage.autoActivate(); @@ -160,6 +174,7 @@ function defineTests(factory: StorageFactory) { include_checksum: true, raw_data: true }, + parseOptions: PARSE_OPTIONS, tracker, syncParams: new RequestParameters({ sub: '' }, {}), token: { exp: Date.now() / 1000 + 10 } as any @@ -168,14 +183,15 @@ function defineTests(factory: StorageFactory) { expect(await getCheckpointLines(iter)).toMatchSnapshot(); - await storage.startBatch({ zeroLSN: ZERO_LSN }, async (batch) => { + await storage.startBatch(BATCH_OPTIONS, async (batch) => { await batch.save({ sourceTable: TEST_TABLE, tag: 'insert', after: { id: 't1', description: 'Test 1' - } + }, + afterReplicaId: 't1' }); await batch.commit('0/1'); @@ -183,14 +199,15 @@ function defineTests(factory: StorageFactory) { expect(await getCheckpointLines(iter)).toMatchSnapshot(); - await storage.startBatch({ zeroLSN: ZERO_LSN }, async (batch) => { + await storage.startBatch(BATCH_OPTIONS, async (batch) => { await batch.save({ sourceTable: TEST_TABLE, tag: 'insert', after: { id: 't2', description: 'Test 2' - } + }, + afterReplicaId: 't2' }); await batch.commit('0/2'); @@ -208,7 +225,7 @@ function defineTests(factory: StorageFactory) { content: BASIC_SYNC_RULES }); - const storage = await f.getInstance(syncRules.parsed()); + const storage = await f.getInstance(syncRules); await storage.setSnapshotDone(ZERO_LSN); await storage.autoActivate(); @@ -221,6 +238,7 @@ function defineTests(factory: StorageFactory) { include_checksum: true, raw_data: true }, + parseOptions: PARSE_OPTIONS, tracker, syncParams: new RequestParameters({ sub: '' }, {}), token: { exp: exp } as any @@ -246,18 +264,19 @@ function defineTests(factory: StorageFactory) { content: BASIC_SYNC_RULES }); - const storage = await f.getInstance(syncRules.parsed()); + const storage = await f.getInstance(syncRules); await storage.setSnapshotDone(ZERO_LSN); await storage.autoActivate(); - await storage.startBatch({ zeroLSN: ZERO_LSN }, async (batch) => { + await storage.startBatch(BATCH_OPTIONS, async (batch) => { await batch.save({ sourceTable: TEST_TABLE, tag: 'insert', after: { id: 't1', description: 'Test 1' - } + }, + afterReplicaId: 't1' }); await batch.save({ @@ -266,7 +285,8 @@ function defineTests(factory: StorageFactory) { after: { id: 't2', description: 'Test 2' - } + }, + afterReplicaId: 't2' }); await batch.commit('0/1'); @@ -279,6 +299,7 @@ function defineTests(factory: StorageFactory) { include_checksum: true, raw_data: true }, + parseOptions: PARSE_OPTIONS, tracker, syncParams: new RequestParameters({ sub: '' }, {}), token: { exp: Date.now() / 1000 + 10 } as any @@ -298,14 +319,15 @@ function defineTests(factory: StorageFactory) { // Now we save additional data AND compact before continuing. // This invalidates the checkpoint we've received above. 
- await storage.startBatch({ zeroLSN: ZERO_LSN }, async (batch) => { + await storage.startBatch(BATCH_OPTIONS, async (batch) => { await batch.save({ sourceTable: TEST_TABLE, tag: 'update', after: { id: 't1', description: 'Test 1b' - } + }, + afterReplicaId: 't1' }); await batch.save({ @@ -314,7 +336,8 @@ function defineTests(factory: StorageFactory) { after: { id: 't2', description: 'Test 2b' - } + }, + afterReplicaId: 't2' }); await batch.commit('0/2'); diff --git a/packages/service-core/test/src/util.ts b/packages/service-core/test/src/util.ts index 4d11c3d5b..5acdd2f2e 100644 --- a/packages/service-core/test/src/util.ts +++ b/packages/service-core/test/src/util.ts @@ -1,13 +1,21 @@ import { Metrics } from '@/metrics/Metrics.js'; -import { BucketStorageFactory, SyncBucketDataBatch } from '@/storage/BucketStorage.js'; +import { + BucketStorageFactory, + ParseSyncRulesOptions, + PersistedSyncRulesContent, + StartBatchOptions, + SyncBucketDataBatch +} from '@/storage/BucketStorage.js'; import { MongoBucketStorage } from '@/storage/MongoBucketStorage.js'; import { SourceTable } from '@/storage/SourceTable.js'; import { PowerSyncMongo } from '@/storage/mongo/db.js'; import { SyncBucketData } from '@/util/protocol-types.js'; -import { hashData } from '@/util/utils.js'; +import { getUuidReplicaIdentityBson, hashData } from '@/util/utils.js'; import * as bson from 'bson'; import * as mongo from 'mongodb'; import { env } from './env.js'; +import { SqlSyncRules } from '@powersync/service-sync-rules'; +import { ReplicaId } from '@/storage/storage-index.js'; // The metrics need to be initialised before they can be used await Metrics.initialise({ @@ -27,6 +35,33 @@ export const MONGO_STORAGE_FACTORY: StorageFactory = async () => { export const ZERO_LSN = '0/0'; +export const PARSE_OPTIONS: ParseSyncRulesOptions = { + defaultSchema: 'public' +}; + +export const BATCH_OPTIONS: StartBatchOptions = { + ...PARSE_OPTIONS, + zeroLSN: ZERO_LSN +}; + +export function testRules(content: string): PersistedSyncRulesContent { + return { + id: 1, + sync_rules_content: content, + slot_name: 'test', + parsed(options) { + return { + id: 1, + sync_rules: SqlSyncRules.fromYaml(content, PARSE_OPTIONS), + slot_name: 'test' + }; + }, + lock() { + throw new Error('Not implemented'); + } + }; +} + export async function connectMongo() { // Short timeout for tests, to fail fast when the server is not available. // Slightly longer timeouts for CI, to avoid arbitrary test failures @@ -45,9 +80,9 @@ export function makeTestTable(name: string, columns?: string[] | undefined) { id, SourceTable.DEFAULT_TAG, relId, - SourceTable.DEFAULT_SCHEMA, + 'public', name, - (columns ?? ['id']).map((column) => ({ name: column, type: 'VARCHAR', typeOid: 25 })), + (columns ?? ['id']).map((column) => ({ name: column, type: 'VARCHAR', typeId: 25 })), true ); } @@ -93,3 +128,10 @@ function getFirst(batch: SyncBucketData[] | SyncBucketDataBatch[] | SyncBucketDa return first as SyncBucketData; } } + +/** + * Replica id in the old Postgres format, for backwards-compatible tests. 
+ */ +export function rid(id: string): bson.UUID { + return getUuidReplicaIdentityBson({ id: id }, [{ name: 'id', type: 'VARCHAR', typeId: 25 }]); +} diff --git a/packages/sync-rules/src/SqlBucketDescriptor.ts b/packages/sync-rules/src/SqlBucketDescriptor.ts index 459178ee1..a140a1fc2 100644 --- a/packages/sync-rules/src/SqlBucketDescriptor.ts +++ b/packages/sync-rules/src/SqlBucketDescriptor.ts @@ -2,6 +2,7 @@ import { IdSequence } from './IdSequence.js'; import { SourceTableInterface } from './SourceTableInterface.js'; import { SqlDataQuery } from './SqlDataQuery.js'; import { SqlParameterQuery } from './SqlParameterQuery.js'; +import { SyncRulesOptions } from './SqlSyncRules.js'; import { StaticSqlParameterQuery } from './StaticSqlParameterQuery.js'; import { TablePattern } from './TablePattern.js'; import { SqlRuleError } from './errors.js'; @@ -42,11 +43,11 @@ export class SqlBucketDescriptor { parameterIdSequence = new IdSequence(); - addDataQuery(sql: string, schema?: SourceSchema): QueryParseResult { + addDataQuery(sql: string, options: SyncRulesOptions): QueryParseResult { if (this.bucket_parameters == null) { throw new Error('Bucket parameters must be defined'); } - const dataRows = SqlDataQuery.fromSql(this.name, this.bucket_parameters, sql, schema); + const dataRows = SqlDataQuery.fromSql(this.name, this.bucket_parameters, sql, options); dataRows.ruleId = this.idSequence.nextId(); @@ -58,8 +59,8 @@ export class SqlBucketDescriptor { }; } - addParameterQuery(sql: string, schema: SourceSchema | undefined, options: QueryParseOptions): QueryParseResult { - const parameterQuery = SqlParameterQuery.fromSql(this.name, sql, schema, options); + addParameterQuery(sql: string, options: QueryParseOptions): QueryParseResult { + const parameterQuery = SqlParameterQuery.fromSql(this.name, sql, options); if (this.bucket_parameters == null) { this.bucket_parameters = parameterQuery.bucket_parameters; } else { diff --git a/packages/sync-rules/src/SqlDataQuery.ts b/packages/sync-rules/src/SqlDataQuery.ts index c1176bc9b..b5377d436 100644 --- a/packages/sync-rules/src/SqlDataQuery.ts +++ b/packages/sync-rules/src/SqlDataQuery.ts @@ -19,6 +19,7 @@ import { } from './types.js'; import { filterJsonRow, getBucketId, isSelectStatement } from './utils.js'; import { TableQuerySchema } from './TableQuerySchema.js'; +import { SyncRulesOptions } from './SqlSyncRules.js'; interface RowValueExtractor { extract(tables: QueryParameters, into: SqliteRow): void; @@ -26,9 +27,10 @@ interface RowValueExtractor { } export class SqlDataQuery { - static fromSql(descriptor_name: string, bucket_parameters: string[], sql: string, schema?: SourceSchema) { + static fromSql(descriptor_name: string, bucket_parameters: string[], sql: string, options: SyncRulesOptions) { const parsed = parse(sql, { locationTracking: true }); const rows = new SqlDataQuery(); + const schema = options.schema; if (parsed.length > 1) { throw new SqlRuleError('Only a single SELECT statement is supported', sql, parsed[1]?._location); @@ -50,7 +52,7 @@ export class SqlDataQuery { } const alias: string = tableRef.alias ?? tableRef.name; - const sourceTable = new TablePattern(tableRef.schema, tableRef.name); + const sourceTable = new TablePattern(tableRef.schema ?? 
options.defaultSchema, tableRef.name); let querySchema: QuerySchema | undefined = undefined; if (schema) { const tables = schema.getTables(sourceTable); diff --git a/packages/sync-rules/src/SqlParameterQuery.ts b/packages/sync-rules/src/SqlParameterQuery.ts index f13352177..2b5a4d14a 100644 --- a/packages/sync-rules/src/SqlParameterQuery.ts +++ b/packages/sync-rules/src/SqlParameterQuery.ts @@ -23,6 +23,7 @@ import { SqliteRow } from './types.js'; import { filterJsonRow, getBucketId, isJsonValue, isSelectStatement } from './utils.js'; +import { SyncRulesOptions } from './SqlSyncRules.js'; /** * Represents a parameter query, such as: @@ -34,11 +35,11 @@ export class SqlParameterQuery { static fromSql( descriptor_name: string, sql: string, - schema?: SourceSchema, - options?: QueryParseOptions + options: QueryParseOptions ): SqlParameterQuery | StaticSqlParameterQuery { const parsed = parse(sql, { locationTracking: true }); const rows = new SqlParameterQuery(); + const schema = options?.schema; if (parsed.length > 1) { throw new SqlRuleError('Only a single SELECT statement is supported', sql, parsed[1]?._location); @@ -70,7 +71,7 @@ export class SqlParameterQuery { new SqlRuleError('Table aliases not supported in parameter queries', sql, q.from?.[0]._location) ); } - const sourceTable = new TablePattern(tableRef.schema, tableRef.name); + const sourceTable = new TablePattern(tableRef.schema ?? options.defaultSchema, tableRef.name); let querySchema: QuerySchema | undefined = undefined; if (schema) { const tables = schema.getTables(sourceTable); @@ -139,7 +140,7 @@ export class SqlParameterQuery { rows.tools = tools; rows.errors.push(...tools.errors); - if (rows.usesDangerousRequestParameters && !options?.accept_potentially_dangerous_queries) { + if (rows.usesDangerousRequestParameters && !options.accept_potentially_dangerous_queries) { let err = new SqlRuleError( "Potentially dangerous query based on parameters set by the client. The client can send any value for these parameters so it's not a good place to do authorization.", sql diff --git a/packages/sync-rules/src/SqlSyncRules.ts b/packages/sync-rules/src/SqlSyncRules.ts index 15b84adc6..10c0de3b4 100644 --- a/packages/sync-rules/src/SqlSyncRules.ts +++ b/packages/sync-rules/src/SqlSyncRules.ts @@ -25,6 +25,18 @@ import { const ACCEPT_POTENTIALLY_DANGEROUS_QUERIES = Symbol('ACCEPT_POTENTIALLY_DANGEROUS_QUERIES'); +export interface SyncRulesOptions { + schema?: SourceSchema; + /** + * The default schema to use when only a table name is specified. + * + * 'public' for Postgres, default database for MongoDB/MySQL. + */ + defaultSchema: string; + + throwOnError?: boolean; +} + export class SqlSyncRules implements SyncRules { bucket_descriptors: SqlBucketDescriptor[] = []; idSequence = new IdSequence(); @@ -33,7 +45,7 @@ export class SqlSyncRules implements SyncRules { errors: YamlError[] = []; - static validate(yaml: string, options?: { schema?: SourceSchema }): YamlError[] { + static validate(yaml: string, options: SyncRulesOptions): YamlError[] { try { const rules = this.fromYaml(yaml, options); return rules.errors; @@ -48,9 +60,9 @@ export class SqlSyncRules implements SyncRules { } } - static fromYaml(yaml: string, options?: { throwOnError?: boolean; schema?: SourceSchema }) { - const throwOnError = options?.throwOnError ?? true; - const schema = options?.schema; + static fromYaml(yaml: string, options: SyncRulesOptions) { + const throwOnError = options.throwOnError ?? 
true; + const schema = options.schema; const lineCounter = new LineCounter(); const parsed = parseDocument(yaml, { @@ -98,7 +110,8 @@ export class SqlSyncRules implements SyncRules { const accept_potentially_dangerous_queries = value.get('accept_potentially_dangerous_queries', true)?.value == true; - const options: QueryParseOptions = { + const queryOptions: QueryParseOptions = { + ...options, accept_potentially_dangerous_queries }; const parameters = value.get('parameters', true) as unknown; @@ -108,16 +121,16 @@ export class SqlSyncRules implements SyncRules { if (parameters instanceof Scalar) { rules.withScalar(parameters, (q) => { - return descriptor.addParameterQuery(q, schema, options); + return descriptor.addParameterQuery(q, queryOptions); }); } else if (parameters instanceof YAMLSeq) { for (let item of parameters.items) { rules.withScalar(item, (q) => { - return descriptor.addParameterQuery(q, schema, options); + return descriptor.addParameterQuery(q, queryOptions); }); } } else { - descriptor.addParameterQuery('SELECT', schema, options); + descriptor.addParameterQuery('SELECT', queryOptions); } if (!(dataQueries instanceof YAMLSeq)) { @@ -126,7 +139,7 @@ export class SqlSyncRules implements SyncRules { } for (let query of dataQueries.items) { rules.withScalar(query, (q) => { - return descriptor.addDataQuery(q, schema); + return descriptor.addDataQuery(q, queryOptions); }); } rules.bucket_descriptors.push(descriptor); diff --git a/packages/sync-rules/src/TablePattern.ts b/packages/sync-rules/src/TablePattern.ts index d6d3494ba..55c90ec9e 100644 --- a/packages/sync-rules/src/TablePattern.ts +++ b/packages/sync-rules/src/TablePattern.ts @@ -1,7 +1,6 @@ import { SourceTableInterface } from './SourceTableInterface.js'; export const DEFAULT_TAG = 'default'; -export const DEFAULT_SCHEMA = 'public'; /** * Some pattern matching SourceTables. 
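Since DEFAULT_SCHEMA is removed here, TablePattern no longer falls back to 'public' on its own; the fallback is now supplied at parse time through SyncRulesOptions.defaultSchema. A minimal sketch of the resulting usage, with an illustrative ruleset that is not part of this patch:

import { SqlSyncRules } from '@powersync/service-sync-rules';

// 'public' restores the previous Postgres behaviour; a MongoDB/MySQL module
// would pass its default database name instead.
const rules = SqlSyncRules.fromYaml(
  `
bucket_definitions:
  global:
    data:
      - SELECT id, description FROM assets
`,
  { defaultSchema: 'public' }
);

// "assets" carries no schema qualifier, so it is resolved against
// defaultSchema and matches public.assets; "other.assets" would keep its
// explicit schema.

Keeping the fallback out of the shared sync-rules package is what lets each replication module choose its own default, as the SyncRulesOptions doc comment above describes.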
@@ -12,8 +11,7 @@ export class TablePattern { public readonly schema: string; public readonly tablePattern: string; - constructor(schema: string | undefined, tablePattern: string) { - schema ??= DEFAULT_SCHEMA; + constructor(schema: string, tablePattern: string) { const splitSchema = schema.split('.'); if (splitSchema.length > 2) { throw new Error(`Invalid schema: ${schema}`); diff --git a/packages/sync-rules/src/types.ts b/packages/sync-rules/src/types.ts index e54d508df..e27489435 100644 --- a/packages/sync-rules/src/types.ts +++ b/packages/sync-rules/src/types.ts @@ -3,6 +3,7 @@ import { SourceTableInterface } from './SourceTableInterface.js'; import { ColumnDefinition, ExpressionType } from './ExpressionType.js'; import { TablePattern } from './TablePattern.js'; import { toSyncRulesParameters } from './utils.js'; +import { SyncRulesOptions } from './SqlSyncRules.js'; export interface SyncRules { evaluateRow(options: EvaluateRowOptions): EvaluationResult[]; @@ -10,7 +11,7 @@ export interface SyncRules { evaluateParameterRow(table: SourceTableInterface, row: SqliteRow): EvaluatedParametersResult[]; } -export interface QueryParseOptions { +export interface QueryParseOptions extends SyncRulesOptions { accept_potentially_dangerous_queries?: boolean; } diff --git a/packages/sync-rules/test/src/data_queries.test.ts b/packages/sync-rules/test/src/data_queries.test.ts index 134585cd4..a70a63b30 100644 --- a/packages/sync-rules/test/src/data_queries.test.ts +++ b/packages/sync-rules/test/src/data_queries.test.ts @@ -1,11 +1,11 @@ import { describe, expect, test } from 'vitest'; import { ExpressionType, SqlDataQuery } from '../../src/index.js'; -import { ASSETS, BASIC_SCHEMA } from './util.js'; +import { ASSETS, BASIC_SCHEMA, PARSE_OPTIONS } from './util.js'; describe('data queries', () => { test('bucket parameters = query', function () { const sql = 'SELECT * FROM assets WHERE assets.org_id = bucket.org_id'; - const query = SqlDataQuery.fromSql('mybucket', ['org_id'], sql); + const query = SqlDataQuery.fromSql('mybucket', ['org_id'], sql, PARSE_OPTIONS); expect(query.errors).toEqual([]); expect(query.evaluateRow(ASSETS, { id: 'asset1', org_id: 'org1' })).toEqual([ @@ -22,7 +22,7 @@ describe('data queries', () => { test('bucket parameters IN query', function () { const sql = 'SELECT * FROM assets WHERE bucket.category IN assets.categories'; - const query = SqlDataQuery.fromSql('mybucket', ['category'], sql); + const query = SqlDataQuery.fromSql('mybucket', ['category'], sql, PARSE_OPTIONS); expect(query.errors).toEqual([]); expect(query.evaluateRow(ASSETS, { id: 'asset1', categories: JSON.stringify(['red', 'green']) })).toMatchObject([ @@ -43,7 +43,7 @@ describe('data queries', () => { test('table alias', function () { const sql = 'SELECT * FROM assets as others WHERE others.org_id = bucket.org_id'; - const query = SqlDataQuery.fromSql('mybucket', ['org_id'], sql); + const query = SqlDataQuery.fromSql('mybucket', ['org_id'], sql, PARSE_OPTIONS); expect(query.errors).toEqual([]); expect(query.evaluateRow(ASSETS, { id: 'asset1', org_id: 'org1' })).toEqual([ @@ -59,7 +59,12 @@ describe('data queries', () => { test('types', () => { const schema = BASIC_SCHEMA; - const q1 = SqlDataQuery.fromSql('q1', ['user_id'], `SELECT * FROM assets WHERE owner_id = bucket.user_id`); + const q1 = SqlDataQuery.fromSql( + 'q1', + ['user_id'], + `SELECT * FROM assets WHERE owner_id = bucket.user_id`, + PARSE_OPTIONS + ); expect(q1.getColumnOutputs(schema)).toEqual([ { name: 'assets', @@ -84,7 +89,8 @@ describe('data 
queries', () => { count * '4' as count4, name ->> '$.attr' as json_value, ifnull(name, 2.0) as maybe_name - FROM assets WHERE owner_id = bucket.user_id` + FROM assets WHERE owner_id = bucket.user_id`, + PARSE_OPTIONS ); expect(q2.getColumnOutputs(schema)).toEqual([ { @@ -109,7 +115,7 @@ describe('data queries', () => { 'q1', ['user_id'], 'SELECT id, name, count FROM assets WHERE owner_id = bucket.user_id', - schema + { ...PARSE_OPTIONS, schema } ); expect(q1.errors).toEqual([]); @@ -117,7 +123,7 @@ describe('data queries', () => { 'q2', ['user_id'], 'SELECT id, upper(description) as d FROM assets WHERE other_id = bucket.user_id', - schema + { ...PARSE_OPTIONS, schema } ); expect(q2.errors).toMatchObject([ { @@ -134,16 +140,16 @@ describe('data queries', () => { 'q3', ['user_id'], 'SELECT id, description, * FROM nope WHERE other_id = bucket.user_id', - schema + { ...PARSE_OPTIONS, schema } ); expect(q3.errors).toMatchObject([ { - message: `Table public.nope not found`, + message: `Table test_schema.nope not found`, type: 'warning' } ]); - const q4 = SqlDataQuery.fromSql('q4', [], 'SELECT * FROM other', schema); + const q4 = SqlDataQuery.fromSql('q4', [], 'SELECT * FROM other', { ...PARSE_OPTIONS, schema }); expect(q4.errors).toMatchObject([ { message: `Query must return an "id" column`, @@ -151,19 +157,19 @@ describe('data queries', () => { } ]); - const q5 = SqlDataQuery.fromSql('q5', [], 'SELECT other_id as id, * FROM other', schema); + const q5 = SqlDataQuery.fromSql('q5', [], 'SELECT other_id as id, * FROM other', { ...PARSE_OPTIONS, schema }); expect(q5.errors).toMatchObject([]); }); test('invalid query - invalid IN', function () { const sql = 'SELECT * FROM assets WHERE assets.category IN bucket.categories'; - const query = SqlDataQuery.fromSql('mybucket', ['categories'], sql); + const query = SqlDataQuery.fromSql('mybucket', ['categories'], sql, PARSE_OPTIONS); expect(query.errors).toMatchObject([{ type: 'fatal', message: 'Unsupported usage of IN operator' }]); }); test('invalid query - not all parameters used', function () { const sql = 'SELECT * FROM assets WHERE 1'; - const query = SqlDataQuery.fromSql('mybucket', ['org_id'], sql); + const query = SqlDataQuery.fromSql('mybucket', ['org_id'], sql, PARSE_OPTIONS); expect(query.errors).toMatchObject([ { type: 'fatal', message: 'Query must cover all bucket parameters. Expected: ["bucket.org_id"] Got: []' } ]); @@ -171,7 +177,7 @@ describe('data queries', () => { test('invalid query - parameter not defined', function () { const sql = 'SELECT * FROM assets WHERE assets.org_id = bucket.org_id'; - const query = SqlDataQuery.fromSql('mybucket', [], sql); + const query = SqlDataQuery.fromSql('mybucket', [], sql, PARSE_OPTIONS); expect(query.errors).toMatchObject([ { type: 'fatal', message: 'Query must cover all bucket parameters. 
Expected: [] Got: ["bucket.org_id"]' } ]); @@ -179,19 +185,19 @@ describe('data queries', () => { test('invalid query - function on parameter (1)', function () { const sql = 'SELECT * FROM assets WHERE assets.org_id = upper(bucket.org_id)'; - const query = SqlDataQuery.fromSql('mybucket', ['org_id'], sql); + const query = SqlDataQuery.fromSql('mybucket', ['org_id'], sql, PARSE_OPTIONS); expect(query.errors).toMatchObject([{ type: 'fatal', message: 'Cannot use bucket parameters in expressions' }]); }); test('invalid query - function on parameter (2)', function () { const sql = 'SELECT * FROM assets WHERE assets.org_id = upper(bucket.org_id)'; - const query = SqlDataQuery.fromSql('mybucket', [], sql); + const query = SqlDataQuery.fromSql('mybucket', [], sql, PARSE_OPTIONS); expect(query.errors).toMatchObject([{ type: 'fatal', message: 'Cannot use bucket parameters in expressions' }]); }); test('invalid query - match clause in select', () => { const sql = 'SELECT id, (bucket.org_id = assets.org_id) as org_matches FROM assets where org_id = bucket.org_id'; - const query = SqlDataQuery.fromSql('mybucket', ['org_id'], sql); + const query = SqlDataQuery.fromSql('mybucket', ['org_id'], sql, PARSE_OPTIONS); expect(query.errors[0].message).toMatch(/Parameter match expression is not allowed here/); }); }); diff --git a/packages/sync-rules/test/src/parameter_queries.test.ts b/packages/sync-rules/test/src/parameter_queries.test.ts index 7e6a0ee34..78468f08a 100644 --- a/packages/sync-rules/test/src/parameter_queries.test.ts +++ b/packages/sync-rules/test/src/parameter_queries.test.ts @@ -1,11 +1,11 @@ import { describe, expect, test } from 'vitest'; import { SqlParameterQuery } from '../../src/index.js'; -import { BASIC_SCHEMA, normalizeTokenParameters } from './util.js'; +import { BASIC_SCHEMA, normalizeTokenParameters, PARSE_OPTIONS } from './util.js'; describe('parameter queries', () => { test('token_parameters IN query', function () { const sql = 'SELECT id as group_id FROM groups WHERE token_parameters.user_id IN groups.user_ids'; - const query = SqlParameterQuery.fromSql('mybucket', sql) as SqlParameterQuery; + const query = SqlParameterQuery.fromSql('mybucket', sql, PARSE_OPTIONS) as SqlParameterQuery; expect(query.errors).toEqual([]); query.id = '1'; expect(query.evaluateParameterRow({ id: 'group1', user_ids: JSON.stringify(['user1', 'user2']) })).toEqual([ @@ -37,7 +37,7 @@ describe('parameter queries', () => { test('IN token_parameters query', function () { const sql = 'SELECT id as region_id FROM regions WHERE name IN token_parameters.region_names'; - const query = SqlParameterQuery.fromSql('mybucket', sql) as SqlParameterQuery; + const query = SqlParameterQuery.fromSql('mybucket', sql, PARSE_OPTIONS) as SqlParameterQuery; expect(query.errors).toEqual([]); query.id = '1'; expect(query.evaluateParameterRow({ id: 'region1', name: 'colorado' })).toEqual([ @@ -65,7 +65,7 @@ describe('parameter queries', () => { test('queried numeric parameters', () => { const sql = 'SELECT users.int1, users.float1, users.float2 FROM users WHERE users.int1 = token_parameters.int1 AND users.float1 = token_parameters.float1 AND users.float2 = token_parameters.float2'; - const query = SqlParameterQuery.fromSql('mybucket', sql) as SqlParameterQuery; + const query = SqlParameterQuery.fromSql('mybucket', sql, PARSE_OPTIONS) as SqlParameterQuery; expect(query.errors).toEqual([]); query.id = '1'; // Note: We don't need to worry about numeric vs decimal types in the lookup - JSONB handles normalization for us. 
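The parameter-query entry point follows the same pattern: the separate schema argument and the optional options argument collapse into a single required QueryParseOptions value. A small sketch of the migrated call shape, assuming PARSE_OPTIONS is { defaultSchema: 'public' } as defined in the test utilities and that SqlParameterQuery is imported from the package entry point:

import { SqlParameterQuery } from '@powersync/service-sync-rules';

const PARSE_OPTIONS = { defaultSchema: 'public' };

// Old call shape (removed): fromSql('mybucket', sql, schema, { accept_potentially_dangerous_queries: true })
// New call shape: one options object carrying defaultSchema, the optional schema,
// and accept_potentially_dangerous_queries.
const query = SqlParameterQuery.fromSql(
  'mybucket',
  "SELECT FROM posts WHERE category = request.parameters() ->> 'category_id'",
  { ...PARSE_OPTIONS, accept_potentially_dangerous_queries: true }
);

Spreading PARSE_OPTIONS keeps defaultSchema mandatory while leaving schema and the dangerous-query flag optional per call, which is the shape the updated tests below rely on.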
@@ -95,7 +95,7 @@ describe('parameter queries', () => { test('plain token_parameter (baseline)', () => { const sql = 'SELECT id from users WHERE filter_param = token_parameters.user_id'; - const query = SqlParameterQuery.fromSql('mybucket', sql) as SqlParameterQuery; + const query = SqlParameterQuery.fromSql('mybucket', sql, PARSE_OPTIONS) as SqlParameterQuery; expect(query.errors).toEqual([]); expect(query.evaluateParameterRow({ id: 'test_id', filter_param: 'test_param' })).toEqual([ @@ -111,7 +111,7 @@ describe('parameter queries', () => { test('function on token_parameter', () => { const sql = 'SELECT id from users WHERE filter_param = upper(token_parameters.user_id)'; - const query = SqlParameterQuery.fromSql('mybucket', sql) as SqlParameterQuery; + const query = SqlParameterQuery.fromSql('mybucket', sql, PARSE_OPTIONS) as SqlParameterQuery; expect(query.errors).toEqual([]); expect(query.evaluateParameterRow({ id: 'test_id', filter_param: 'test_param' })).toEqual([ @@ -127,7 +127,7 @@ describe('parameter queries', () => { test('token parameter member operator', () => { const sql = "SELECT id from users WHERE filter_param = token_parameters.some_param ->> 'description'"; - const query = SqlParameterQuery.fromSql('mybucket', sql) as SqlParameterQuery; + const query = SqlParameterQuery.fromSql('mybucket', sql, PARSE_OPTIONS) as SqlParameterQuery; expect(query.errors).toEqual([]); expect(query.evaluateParameterRow({ id: 'test_id', filter_param: 'test_param' })).toEqual([ @@ -145,7 +145,7 @@ describe('parameter queries', () => { test('token parameter and binary operator', () => { const sql = 'SELECT id from users WHERE filter_param = token_parameters.some_param + 2'; - const query = SqlParameterQuery.fromSql('mybucket', sql) as SqlParameterQuery; + const query = SqlParameterQuery.fromSql('mybucket', sql, PARSE_OPTIONS) as SqlParameterQuery; expect(query.errors).toEqual([]); expect(query.getLookups(normalizeTokenParameters({ some_param: 3 }))).toEqual([['mybucket', undefined, 5n]]); @@ -153,7 +153,7 @@ describe('parameter queries', () => { test('token parameter IS NULL as filter', () => { const sql = 'SELECT id from users WHERE filter_param = (token_parameters.some_param IS NULL)'; - const query = SqlParameterQuery.fromSql('mybucket', sql) as SqlParameterQuery; + const query = SqlParameterQuery.fromSql('mybucket', sql, PARSE_OPTIONS) as SqlParameterQuery; expect(query.errors).toEqual([]); expect(query.getLookups(normalizeTokenParameters({ some_param: null }))).toEqual([['mybucket', undefined, 1n]]); @@ -162,7 +162,7 @@ describe('parameter queries', () => { test('direct token parameter', () => { const sql = 'SELECT FROM users WHERE token_parameters.some_param'; - const query = SqlParameterQuery.fromSql('mybucket', sql) as SqlParameterQuery; + const query = SqlParameterQuery.fromSql('mybucket', sql, PARSE_OPTIONS) as SqlParameterQuery; expect(query.errors).toEqual([]); expect(query.evaluateParameterRow({ id: 'user1' })).toEqual([ @@ -182,7 +182,7 @@ describe('parameter queries', () => { test('token parameter IS NULL', () => { const sql = 'SELECT FROM users WHERE token_parameters.some_param IS NULL'; - const query = SqlParameterQuery.fromSql('mybucket', sql) as SqlParameterQuery; + const query = SqlParameterQuery.fromSql('mybucket', sql, PARSE_OPTIONS) as SqlParameterQuery; expect(query.errors).toEqual([]); expect(query.evaluateParameterRow({ id: 'user1' })).toEqual([ @@ -202,7 +202,7 @@ describe('parameter queries', () => { test('token parameter IS NOT NULL', () => { const sql = 'SELECT FROM 
users WHERE token_parameters.some_param IS NOT NULL'; - const query = SqlParameterQuery.fromSql('mybucket', sql) as SqlParameterQuery; + const query = SqlParameterQuery.fromSql('mybucket', sql, PARSE_OPTIONS) as SqlParameterQuery; expect(query.errors).toEqual([]); expect(query.evaluateParameterRow({ id: 'user1' })).toEqual([ @@ -222,7 +222,7 @@ describe('parameter queries', () => { test('token parameter NOT', () => { const sql = 'SELECT FROM users WHERE NOT token_parameters.is_admin'; - const query = SqlParameterQuery.fromSql('mybucket', sql) as SqlParameterQuery; + const query = SqlParameterQuery.fromSql('mybucket', sql, PARSE_OPTIONS) as SqlParameterQuery; expect(query.errors).toEqual([]); expect(query.evaluateParameterRow({ id: 'user1' })).toEqual([ @@ -242,7 +242,7 @@ describe('parameter queries', () => { test('row filter and token parameter IS NULL', () => { const sql = 'SELECT FROM users WHERE users.id = token_parameters.user_id AND token_parameters.some_param IS NULL'; - const query = SqlParameterQuery.fromSql('mybucket', sql) as SqlParameterQuery; + const query = SqlParameterQuery.fromSql('mybucket', sql, PARSE_OPTIONS) as SqlParameterQuery; expect(query.errors).toEqual([]); expect(query.evaluateParameterRow({ id: 'user1' })).toEqual([ @@ -262,7 +262,7 @@ describe('parameter queries', () => { test('row filter and direct token parameter', () => { const sql = 'SELECT FROM users WHERE users.id = token_parameters.user_id AND token_parameters.some_param'; - const query = SqlParameterQuery.fromSql('mybucket', sql) as SqlParameterQuery; + const query = SqlParameterQuery.fromSql('mybucket', sql, PARSE_OPTIONS) as SqlParameterQuery; expect(query.errors).toEqual([]); expect(query.evaluateParameterRow({ id: 'user1' })).toEqual([ @@ -282,7 +282,7 @@ describe('parameter queries', () => { test('cast', () => { const sql = 'SELECT FROM users WHERE users.id = cast(token_parameters.user_id as text)'; - const query = SqlParameterQuery.fromSql('mybucket', sql) as SqlParameterQuery; + const query = SqlParameterQuery.fromSql('mybucket', sql, PARSE_OPTIONS) as SqlParameterQuery; expect(query.errors).toEqual([]); expect(query.getLookups(normalizeTokenParameters({ user_id: 'user1' }))).toEqual([ @@ -293,7 +293,7 @@ describe('parameter queries', () => { test('IS NULL row filter', () => { const sql = 'SELECT id FROM users WHERE role IS NULL'; - const query = SqlParameterQuery.fromSql('mybucket', sql) as SqlParameterQuery; + const query = SqlParameterQuery.fromSql('mybucket', sql, PARSE_OPTIONS) as SqlParameterQuery; expect(query.errors).toEqual([]); expect(query.evaluateParameterRow({ id: 'user1', role: null })).toEqual([ @@ -311,7 +311,7 @@ describe('parameter queries', () => { // Not supported: token_parameters.is_admin != false // Support could be added later. 
const sql = 'SELECT FROM users WHERE users.id = token_parameters.user_id AND token_parameters.is_admin'; - const query = SqlParameterQuery.fromSql('mybucket', sql) as SqlParameterQuery; + const query = SqlParameterQuery.fromSql('mybucket', sql, PARSE_OPTIONS) as SqlParameterQuery; expect(query.errors).toEqual([]); query.id = '1'; @@ -334,7 +334,7 @@ describe('parameter queries', () => { test('token filter (2)', () => { const sql = 'SELECT users.id AS user_id, token_parameters.is_admin as is_admin FROM users WHERE users.id = token_parameters.user_id AND token_parameters.is_admin'; - const query = SqlParameterQuery.fromSql('mybucket', sql) as SqlParameterQuery; + const query = SqlParameterQuery.fromSql('mybucket', sql, PARSE_OPTIONS) as SqlParameterQuery; expect(query.errors).toEqual([]); query.id = '1'; @@ -357,7 +357,7 @@ describe('parameter queries', () => { test('case-sensitive parameter queries (1)', () => { const sql = 'SELECT users."userId" AS user_id FROM users WHERE users."userId" = token_parameters.user_id'; - const query = SqlParameterQuery.fromSql('mybucket', sql) as SqlParameterQuery; + const query = SqlParameterQuery.fromSql('mybucket', sql, PARSE_OPTIONS) as SqlParameterQuery; expect(query.errors).toEqual([]); query.id = '1'; @@ -375,7 +375,7 @@ describe('parameter queries', () => { // This may change in the future - we should check against expected behavior for // Postgres and/or SQLite. const sql = 'SELECT users.userId AS user_id FROM users WHERE users.userId = token_parameters.user_id'; - const query = SqlParameterQuery.fromSql('mybucket', sql) as SqlParameterQuery; + const query = SqlParameterQuery.fromSql('mybucket', sql, PARSE_OPTIONS) as SqlParameterQuery; expect(query.errors).toEqual([]); query.id = '1'; @@ -391,7 +391,7 @@ describe('parameter queries', () => { test('dynamic global parameter query', () => { const sql = "SELECT workspaces.id AS workspace_id FROM workspaces WHERE visibility = 'public'"; - const query = SqlParameterQuery.fromSql('mybucket', sql) as SqlParameterQuery; + const query = SqlParameterQuery.fromSql('mybucket', sql, PARSE_OPTIONS) as SqlParameterQuery; expect(query.errors).toEqual([]); query.id = '1'; @@ -410,7 +410,7 @@ describe('parameter queries', () => { // This is treated as two separate lookup index values const sql = 'SELECT id from users WHERE filter_param = upper(token_parameters.user_id) AND filter_param = lower(token_parameters.user_id)'; - const query = SqlParameterQuery.fromSql('mybucket', sql) as SqlParameterQuery; + const query = SqlParameterQuery.fromSql('mybucket', sql, PARSE_OPTIONS) as SqlParameterQuery; expect(query.errors).toEqual([]); expect(query.evaluateParameterRow({ id: 'test_id', filter_param: 'test_param' })).toEqual([ @@ -430,7 +430,7 @@ describe('parameter queries', () => { // This is treated as the same index lookup value, can use OR with the two clauses const sql = 'SELECT id from users WHERE filter_param1 = upper(token_parameters.user_id) OR filter_param2 = upper(token_parameters.user_id)'; - const query = SqlParameterQuery.fromSql('mybucket', sql) as SqlParameterQuery; + const query = SqlParameterQuery.fromSql('mybucket', sql, PARSE_OPTIONS) as SqlParameterQuery; expect(query.errors).toEqual([]); expect(query.evaluateParameterRow({ id: 'test_id', filter_param1: 'test1', filter_param2: 'test2' })).toEqual([ @@ -449,8 +449,9 @@ describe('parameter queries', () => { test('request.parameters()', function () { const sql = "SELECT FROM posts WHERE category = request.parameters() ->> 'category_id'"; - const query = 
SqlParameterQuery.fromSql('mybucket', sql, undefined, { - accept_potentially_dangerous_queries: true + const query = SqlParameterQuery.fromSql('mybucket', sql, { + accept_potentially_dangerous_queries: true, + ...PARSE_OPTIONS }) as SqlParameterQuery; expect(query.errors).toEqual([]); query.id = '1'; @@ -465,8 +466,9 @@ describe('parameter queries', () => { test('nested request.parameters() (1)', function () { const sql = "SELECT FROM posts WHERE category = request.parameters() -> 'details' ->> 'category'"; - const query = SqlParameterQuery.fromSql('mybucket', sql, undefined, { - accept_potentially_dangerous_queries: true + const query = SqlParameterQuery.fromSql('mybucket', sql, { + accept_potentially_dangerous_queries: true, + ...PARSE_OPTIONS }) as SqlParameterQuery; expect(query.errors).toEqual([]); query.id = '1'; @@ -477,8 +479,9 @@ describe('parameter queries', () => { test('nested request.parameters() (2)', function () { const sql = "SELECT FROM posts WHERE category = request.parameters() ->> 'details.category'"; - const query = SqlParameterQuery.fromSql('mybucket', sql, undefined, { - accept_potentially_dangerous_queries: true + const query = SqlParameterQuery.fromSql('mybucket', sql, { + accept_potentially_dangerous_queries: true, + ...PARSE_OPTIONS }) as SqlParameterQuery; expect(query.errors).toEqual([]); query.id = '1'; @@ -490,8 +493,9 @@ describe('parameter queries', () => { test('IN request.parameters()', function () { // Can use -> or ->> here const sql = "SELECT id as region_id FROM regions WHERE name IN request.parameters() -> 'region_names'"; - const query = SqlParameterQuery.fromSql('mybucket', sql, undefined, { - accept_potentially_dangerous_queries: true + const query = SqlParameterQuery.fromSql('mybucket', sql, { + accept_potentially_dangerous_queries: true, + ...PARSE_OPTIONS }) as SqlParameterQuery; expect(query.errors).toEqual([]); query.id = '1'; @@ -522,7 +526,7 @@ describe('parameter queries', () => { test('user_parameters in SELECT', function () { const sql = 'SELECT id, user_parameters.other_id as other_id FROM users WHERE id = token_parameters.user_id'; - const query = SqlParameterQuery.fromSql('mybucket', sql) as SqlParameterQuery; + const query = SqlParameterQuery.fromSql('mybucket', sql, PARSE_OPTIONS) as SqlParameterQuery; expect(query.errors).toEqual([]); query.id = '1'; expect(query.evaluateParameterRow({ id: 'user1' })).toEqual([ @@ -538,7 +542,7 @@ describe('parameter queries', () => { test('request.parameters() in SELECT', function () { const sql = "SELECT id, request.parameters() ->> 'other_id' as other_id FROM users WHERE id = token_parameters.user_id"; - const query = SqlParameterQuery.fromSql('mybucket', sql) as SqlParameterQuery; + const query = SqlParameterQuery.fromSql('mybucket', sql, PARSE_OPTIONS) as SqlParameterQuery; expect(query.errors).toEqual([]); query.id = '1'; expect(query.evaluateParameterRow({ id: 'user1' })).toEqual([ @@ -553,7 +557,7 @@ describe('parameter queries', () => { test('request.jwt()', function () { const sql = "SELECT FROM users WHERE id = request.jwt() ->> 'sub'"; - const query = SqlParameterQuery.fromSql('mybucket', sql) as SqlParameterQuery; + const query = SqlParameterQuery.fromSql('mybucket', sql, PARSE_OPTIONS) as SqlParameterQuery; expect(query.errors).toEqual([]); const requestParams = normalizeTokenParameters({ user_id: 'user1' }); @@ -562,7 +566,7 @@ describe('parameter queries', () => { test('request.user_id()', function () { const sql = 'SELECT FROM users WHERE id = request.user_id()'; - const query = 
SqlParameterQuery.fromSql('mybucket', sql) as SqlParameterQuery;
+    const query = SqlParameterQuery.fromSql('mybucket', sql, PARSE_OPTIONS) as SqlParameterQuery;
     expect(query.errors).toEqual([]);
     const requestParams = normalizeTokenParameters({ user_id: 'user1' });
@@ -574,68 +578,67 @@ describe('parameter queries', () => {
     // into separate queries, but it's a significant change. For now, developers should do that manually.
     const sql = "SELECT workspaces.id AS workspace_id FROM workspaces WHERE workspaces.user_id = token_parameters.user_id OR visibility = 'public'";
-    const query = SqlParameterQuery.fromSql('mybucket', sql) as SqlParameterQuery;
+    const query = SqlParameterQuery.fromSql('mybucket', sql, PARSE_OPTIONS) as SqlParameterQuery;
     expect(query.errors[0].message).toMatch(/must use the same parameters/);
   });
   test('invalid OR in parameter queries (2)', () => {
     const sql = 'SELECT id from users WHERE filter_param = upper(token_parameters.user_id) OR filter_param = lower(token_parameters.user_id)';
-    const query = SqlParameterQuery.fromSql('mybucket', sql) as SqlParameterQuery;
+    const query = SqlParameterQuery.fromSql('mybucket', sql, PARSE_OPTIONS) as SqlParameterQuery;
     expect(query.errors[0].message).toMatch(/must use the same parameters/);
   });
   test('invalid parameter match clause (1)', () => {
     const sql = 'SELECT FROM users WHERE (id = token_parameters.user_id) = false';
-    const query = SqlParameterQuery.fromSql('mybucket', sql) as SqlParameterQuery;
+    const query = SqlParameterQuery.fromSql('mybucket', sql, PARSE_OPTIONS) as SqlParameterQuery;
     expect(query.errors[0].message).toMatch(/Parameter match clauses cannot be used here/);
   });
   test('invalid parameter match clause (2)', () => {
     const sql = 'SELECT FROM users WHERE NOT (id = token_parameters.user_id)';
-    const query = SqlParameterQuery.fromSql('mybucket', sql) as SqlParameterQuery;
+    const query = SqlParameterQuery.fromSql('mybucket', sql, PARSE_OPTIONS) as SqlParameterQuery;
     expect(query.errors[0].message).toMatch(/Parameter match clauses cannot be used here/);
   });
   test('invalid parameter match clause (3)', () => {
     // May be supported in the future
     const sql = 'SELECT FROM users WHERE token_parameters.start_at < users.created_at';
-    const query = SqlParameterQuery.fromSql('mybucket', sql) as SqlParameterQuery;
+    const query = SqlParameterQuery.fromSql('mybucket', sql, PARSE_OPTIONS) as SqlParameterQuery;
     expect(query.errors[0].message).toMatch(/Cannot use table values and parameters in the same clauses/);
   });
   test('invalid parameter match clause (4)', () => {
     const sql = 'SELECT FROM users WHERE json_extract(users.description, token_parameters.path)';
-    const query = SqlParameterQuery.fromSql('mybucket', sql) as SqlParameterQuery;
+    const query = SqlParameterQuery.fromSql('mybucket', sql, PARSE_OPTIONS) as SqlParameterQuery;
     expect(query.errors[0].message).toMatch(/Cannot use table values and parameters in the same clauses/);
   });
   test('invalid parameter match clause (5)', () => {
     const sql = 'SELECT (user_parameters.role = posts.roles) as r FROM posts';
-    const query = SqlParameterQuery.fromSql('mybucket', sql) as SqlParameterQuery;
+    const query = SqlParameterQuery.fromSql('mybucket', sql, PARSE_OPTIONS) as SqlParameterQuery;
     expect(query.errors[0].message).toMatch(/Parameter match expression is not allowed here/);
   });
   test('invalid function schema', () => {
     const sql = 'SELECT FROM users WHERE something.length(users.id) = 0';
-    const query = SqlParameterQuery.fromSql('mybucket', sql) as SqlParameterQuery;
+    const query = SqlParameterQuery.fromSql('mybucket', sql, PARSE_OPTIONS) as SqlParameterQuery;
     expect(query.errors[0].message).toMatch(/Function 'something.length' is not defined/);
   });
   test('validate columns', () => {
     const schema = BASIC_SCHEMA;
-    const q1 = SqlParameterQuery.fromSql(
-      'q4',
-      'SELECT id FROM assets WHERE owner_id = token_parameters.user_id',
+    const q1 = SqlParameterQuery.fromSql('q4', 'SELECT id FROM assets WHERE owner_id = token_parameters.user_id', {
+      ...PARSE_OPTIONS,
       schema
-    );
+    });
     expect(q1.errors).toMatchObject([]);
     const q2 = SqlParameterQuery.fromSql(
       'q5',
       'SELECT id as asset_id FROM assets WHERE other_id = token_parameters.user_id',
-      schema
+      { ...PARSE_OPTIONS, schema }
     );
     expect(q2.errors).toMatchObject([
@@ -649,7 +652,7 @@ describe('parameter queries', () => {
   describe('dangerous queries', function () {
     function testDangerousQuery(sql: string) {
       test(sql, function () {
-        const query = SqlParameterQuery.fromSql('mybucket', sql) as SqlParameterQuery;
+        const query = SqlParameterQuery.fromSql('mybucket', sql, PARSE_OPTIONS) as SqlParameterQuery;
         expect(query.errors).toMatchObject([
           {
             message:
@@ -661,7 +664,7 @@ describe('parameter queries', () => {
     }
     function testSafeQuery(sql: string) {
       test(sql, function () {
-        const query = SqlParameterQuery.fromSql('mybucket', sql) as SqlParameterQuery;
+        const query = SqlParameterQuery.fromSql('mybucket', sql, PARSE_OPTIONS) as SqlParameterQuery;
         expect(query.errors).toEqual([]);
         expect(query.usesDangerousRequestParameters).toEqual(false);
       });
diff --git a/packages/sync-rules/test/src/static_parameter_queries.test.ts b/packages/sync-rules/test/src/static_parameter_queries.test.ts
index d030b6f25..3e8b7fbfe 100644
--- a/packages/sync-rules/test/src/static_parameter_queries.test.ts
+++ b/packages/sync-rules/test/src/static_parameter_queries.test.ts
@@ -1,12 +1,12 @@
 import { describe, expect, test } from 'vitest';
 import { SqlParameterQuery } from '../../src/index.js';
 import { StaticSqlParameterQuery } from '../../src/StaticSqlParameterQuery.js';
-import { normalizeTokenParameters } from './util.js';
+import { normalizeTokenParameters, PARSE_OPTIONS } from './util.js';
 describe('static parameter queries', () => {
   test('basic query', function () {
     const sql = 'SELECT token_parameters.user_id';
-    const query = SqlParameterQuery.fromSql('mybucket', sql) as StaticSqlParameterQuery;
+    const query = SqlParameterQuery.fromSql('mybucket', sql, PARSE_OPTIONS) as StaticSqlParameterQuery;
     expect(query.errors).toEqual([]);
     expect(query.bucket_parameters!).toEqual(['user_id']);
     expect(query.getStaticBucketIds(normalizeTokenParameters({ user_id: 'user1' }))).toEqual(['mybucket["user1"]']);
@@ -14,7 +14,7 @@ describe('static parameter queries', () => {
   test('global query', function () {
     const sql = 'SELECT';
-    const query = SqlParameterQuery.fromSql('mybucket', sql) as StaticSqlParameterQuery;
+    const query = SqlParameterQuery.fromSql('mybucket', sql, PARSE_OPTIONS) as StaticSqlParameterQuery;
     expect(query.errors).toEqual([]);
     expect(query.bucket_parameters!).toEqual([]);
     expect(query.getStaticBucketIds(normalizeTokenParameters({ user_id: 'user1' }))).toEqual(['mybucket[]']);
@@ -22,7 +22,7 @@ describe('static parameter queries', () => {
   test('query with filter', function () {
     const sql = 'SELECT token_parameters.user_id WHERE token_parameters.is_admin';
-    const query = SqlParameterQuery.fromSql('mybucket', sql) as StaticSqlParameterQuery;
+    const query = SqlParameterQuery.fromSql('mybucket', sql, PARSE_OPTIONS) as StaticSqlParameterQuery;
     expect(query.errors).toEqual([]);
     expect(query.getStaticBucketIds(normalizeTokenParameters({ user_id: 'user1', is_admin: true }))).toEqual([
       'mybucket["user1"]'
@@ -32,7 +32,7 @@ describe('static parameter queries', () => {
   test('function in select clause', function () {
     const sql = 'SELECT upper(token_parameters.user_id) as upper_id';
-    const query = SqlParameterQuery.fromSql('mybucket', sql) as StaticSqlParameterQuery;
+    const query = SqlParameterQuery.fromSql('mybucket', sql, PARSE_OPTIONS) as StaticSqlParameterQuery;
     expect(query.errors).toEqual([]);
     expect(query.getStaticBucketIds(normalizeTokenParameters({ user_id: 'user1' }))).toEqual(['mybucket["USER1"]']);
     expect(query.bucket_parameters!).toEqual(['upper_id']);
@@ -40,7 +40,7 @@ describe('static parameter queries', () => {
   test('function in filter clause', function () {
     const sql = "SELECT WHERE upper(token_parameters.role) = 'ADMIN'";
-    const query = SqlParameterQuery.fromSql('mybucket', sql) as StaticSqlParameterQuery;
+    const query = SqlParameterQuery.fromSql('mybucket', sql, PARSE_OPTIONS) as StaticSqlParameterQuery;
     expect(query.errors).toEqual([]);
     expect(query.getStaticBucketIds(normalizeTokenParameters({ role: 'admin' }))).toEqual(['mybucket[]']);
     expect(query.getStaticBucketIds(normalizeTokenParameters({ role: 'user' }))).toEqual([]);
@@ -48,7 +48,7 @@ describe('static parameter queries', () => {
   test('comparison in filter clause', function () {
     const sql = 'SELECT WHERE token_parameters.id1 = token_parameters.id2';
-    const query = SqlParameterQuery.fromSql('mybucket', sql) as StaticSqlParameterQuery;
+    const query = SqlParameterQuery.fromSql('mybucket', sql, PARSE_OPTIONS) as StaticSqlParameterQuery;
     expect(query.errors).toEqual([]);
     expect(query.getStaticBucketIds(normalizeTokenParameters({ id1: 't1', id2: 't1' }))).toEqual(['mybucket[]']);
     expect(query.getStaticBucketIds(normalizeTokenParameters({ id1: 't1', id2: 't2' }))).toEqual([]);
@@ -56,7 +56,8 @@ describe('static parameter queries', () => {
   test('request.parameters()', function () {
     const sql = "SELECT request.parameters() ->> 'org_id' as org_id";
-    const query = SqlParameterQuery.fromSql('mybucket', sql, undefined, {
+    const query = SqlParameterQuery.fromSql('mybucket', sql, {
+      ...PARSE_OPTIONS,
       accept_potentially_dangerous_queries: true
     }) as StaticSqlParameterQuery;
     expect(query.errors).toEqual([]);
@@ -66,7 +67,7 @@ describe('static parameter queries', () => {
   test('request.jwt()', function () {
     const sql = "SELECT request.jwt() ->> 'sub' as user_id";
-    const query = SqlParameterQuery.fromSql('mybucket', sql) as StaticSqlParameterQuery;
+    const query = SqlParameterQuery.fromSql('mybucket', sql, PARSE_OPTIONS) as StaticSqlParameterQuery;
     expect(query.errors).toEqual([]);
     expect(query.bucket_parameters).toEqual(['user_id']);
@@ -75,7 +76,7 @@ describe('static parameter queries', () => {
   test('request.user_id()', function () {
     const sql = 'SELECT request.user_id() as user_id';
-    const query = SqlParameterQuery.fromSql('mybucket', sql) as StaticSqlParameterQuery;
+    const query = SqlParameterQuery.fromSql('mybucket', sql, PARSE_OPTIONS) as StaticSqlParameterQuery;
     expect(query.errors).toEqual([]);
     expect(query.bucket_parameters).toEqual(['user_id']);
@@ -85,7 +86,7 @@ describe('static parameter queries', () => {
   describe('dangerous queries', function () {
     function testDangerousQuery(sql: string) {
       test(sql, function () {
-        const query = SqlParameterQuery.fromSql('mybucket', sql) as SqlParameterQuery;
+        const query = SqlParameterQuery.fromSql('mybucket', sql, PARSE_OPTIONS) as SqlParameterQuery;
         expect(query.errors).toMatchObject([
           {
             message:
@@ -97,7 +98,7 @@ describe('static parameter queries', () => {
     }
     function testSafeQuery(sql: string) {
       test(sql, function () {
-        const query = SqlParameterQuery.fromSql('mybucket', sql) as SqlParameterQuery;
+        const query = SqlParameterQuery.fromSql('mybucket', sql, PARSE_OPTIONS) as SqlParameterQuery;
         expect(query.errors).toEqual([]);
         expect(query.usesDangerousRequestParameters).toEqual(false);
       });
diff --git a/packages/sync-rules/test/src/sync_rules.test.ts b/packages/sync-rules/test/src/sync_rules.test.ts
index e5fafd1f9..c7e88ad46 100644
--- a/packages/sync-rules/test/src/sync_rules.test.ts
+++ b/packages/sync-rules/test/src/sync_rules.test.ts
@@ -1,6 +1,5 @@
 import { describe, expect, test } from 'vitest';
 import {
-  DEFAULT_SCHEMA,
   DEFAULT_TAG,
   DartSchemaGenerator,
   JsLegacySchemaGenerator,
@@ -9,21 +8,24 @@ import {
   TsSchemaGenerator
 } from '../../src/index.js';
-import { ASSETS, BASIC_SCHEMA, TestSourceTable, USERS, normalizeTokenParameters } from './util.js';
+import { ASSETS, BASIC_SCHEMA, PARSE_OPTIONS, TestSourceTable, USERS, normalizeTokenParameters } from './util.js';
 describe('sync rules', () => {
   test('parse empty sync rules', () => {
-    const rules = SqlSyncRules.fromYaml('bucket_definitions: {}');
+    const rules = SqlSyncRules.fromYaml('bucket_definitions: {}', PARSE_OPTIONS);
     expect(rules.bucket_descriptors).toEqual([]);
   });
   test('parse global sync rules', () => {
-    const rules = SqlSyncRules.fromYaml(`
+    const rules = SqlSyncRules.fromYaml(
+      `
 bucket_definitions:
   mybucket:
     data:
       - SELECT id, description FROM assets
-    `);
+      `,
+      PARSE_OPTIONS
+    );
     const bucket = rules.bucket_descriptors[0];
     expect(bucket.name).toEqual('mybucket');
     expect(bucket.bucket_parameters).toEqual([]);
@@ -46,12 +48,15 @@ bucket_definitions:
   });
   test('parse global sync rules with filter', () => {
-    const rules = SqlSyncRules.fromYaml(`
+    const rules = SqlSyncRules.fromYaml(
+      `
 bucket_definitions:
   mybucket:
     parameters: SELECT WHERE token_parameters.is_admin
     data: []
-    `);
+      `,
+      PARSE_OPTIONS
+    );
     const bucket = rules.bucket_descriptors[0];
     expect(bucket.bucket_parameters).toEqual([]);
     const param_query = bucket.global_parameter_queries[0];
@@ -66,12 +71,15 @@ bucket_definitions:
   });
   test('parse global sync rules with table filter', () => {
-    const rules = SqlSyncRules.fromYaml(`
+    const rules = SqlSyncRules.fromYaml(
+      `
 bucket_definitions:
   mybucket:
     parameters: SELECT FROM users WHERE users.id = token_parameters.user_id AND users.is_admin
    data: []
-    `);
+      `,
+      PARSE_OPTIONS
+    );
     const bucket = rules.bucket_descriptors[0];
     expect(bucket.bucket_parameters).toEqual([]);
     const param_query = bucket.parameter_queries[0];
@@ -86,13 +94,16 @@ bucket_definitions:
   });
   test('parse bucket with parameters', () => {
-    const rules = SqlSyncRules.fromYaml(`
+    const rules = SqlSyncRules.fromYaml(
+      `
 bucket_definitions:
   mybucket:
     parameters: SELECT token_parameters.user_id, user_parameters.device_id
     data:
       - SELECT id, description FROM assets WHERE assets.user_id = bucket.user_id AND assets.device_id = bucket.device_id AND NOT assets.archived
-    `);
+      `,
+      PARSE_OPTIONS
+    );
     const bucket = rules.bucket_descriptors[0];
     expect(bucket.bucket_parameters).toEqual(['user_id', 'device_id']);
     const param_query = bucket.global_parameter_queries[0];
@@ -129,13 +140,16 @@ bucket_definitions:
   });
   test('parse bucket with parameters and OR condition', () => {
-    const rules = SqlSyncRules.fromYaml(`
+    const rules = SqlSyncRules.fromYaml(
+      `
 bucket_definitions:
   mybucket:
     parameters: SELECT token_parameters.user_id
     data:
       - SELECT id, description FROM assets WHERE assets.user_id = bucket.user_id OR assets.owner_id = bucket.user_id
-    `);
+      `,
+      PARSE_OPTIONS
+    );
     const bucket = rules.bucket_descriptors[0];
     expect(bucket.bucket_parameters).toEqual(['user_id']);
     const param_query = bucket.global_parameter_queries[0];
@@ -182,80 +196,104 @@ bucket_definitions:
   test('parse bucket with parameters and invalid OR condition', () => {
     expect(() => {
-      const rules = SqlSyncRules.fromYaml(`
+      const rules = SqlSyncRules.fromYaml(
+        `
 bucket_definitions:
   mybucket:
     parameters: SELECT token_parameters.user_id
     data:
       - SELECT id, description FROM assets WHERE assets.user_id = bucket.user_id AND (assets.user_id = bucket.foo OR assets.other_id = bucket.bar)
-    `);
+        `,
+        PARSE_OPTIONS
+      );
     }).toThrowError(/must use the same parameters/);
   });
   test('reject unsupported queries', () => {
     expect(
-      SqlSyncRules.validate(`
+      SqlSyncRules.validate(
+        `
 bucket_definitions:
   mybucket:
     parameters: SELECT token_parameters.user_id LIMIT 1
     data: []
-    `)
+        `,
+        PARSE_OPTIONS
+      )
     ).toMatchObject([{ message: 'LIMIT is not supported' }]);
     expect(
-      SqlSyncRules.validate(`
+      SqlSyncRules.validate(
+        `
 bucket_definitions:
   mybucket:
     data:
       - SELECT DISTINCT id, description FROM assets
-    `)
+        `,
+        PARSE_OPTIONS
+      )
    ).toMatchObject([{ message: 'DISTINCT is not supported' }]);
     expect(
-      SqlSyncRules.validate(`
+      SqlSyncRules.validate(
+        `
 bucket_definitions:
   mybucket:
     parameters: SELECT token_parameters.user_id OFFSET 10
     data: []
-    `)
+        `,
+        PARSE_OPTIONS
+      )
     ).toMatchObject([{ message: 'LIMIT is not supported' }]);
     expect(() => {
-      const rules = SqlSyncRules.fromYaml(`
+      const rules = SqlSyncRules.fromYaml(
+        `
 bucket_definitions:
   mybucket:
     parameters: SELECT token_parameters.user_id FOR UPDATE SKIP LOCKED
     data: []
-    `);
+        `,
+        PARSE_OPTIONS
+      );
     }).toThrowError(/SKIP is not supported/);
     expect(() => {
-      const rules = SqlSyncRules.fromYaml(`
+      const rules = SqlSyncRules.fromYaml(
+        `
 bucket_definitions:
   mybucket:
     parameters: SELECT token_parameters.user_id FOR UPDATE
     data: []
-    `);
+        `,
+        PARSE_OPTIONS
+      );
     }).toThrowError(/FOR is not supported/);
     expect(() => {
-      const rules = SqlSyncRules.fromYaml(`
+      const rules = SqlSyncRules.fromYaml(
+        `
 bucket_definitions:
   mybucket:
     data:
       - SELECT id, description FROM assets ORDER BY id
-    `);
+        `,
+        PARSE_OPTIONS
+      );
     }).toThrowError(/ORDER BY is not supported/);
   });
   test('transforming things', () => {
-    const rules = SqlSyncRules.fromYaml(`
+    const rules = SqlSyncRules.fromYaml(
+      `
 bucket_definitions:
   mybucket:
     parameters: SELECT upper(token_parameters.user_id) AS user_id
     data:
       - SELECT id, upper(description) AS description_upper FROM assets WHERE upper(assets.user_id) = bucket.user_id AND NOT assets.archived
-    `);
+      `,
+      PARSE_OPTIONS
+    );
     const bucket = rules.bucket_descriptors[0];
     expect(bucket.bucket_parameters).toEqual(['user_id']);
     expect(rules.getStaticBucketIds(normalizeTokenParameters({ user_id: 'user1' }))).toEqual(['mybucket["USER1"]']);
@@ -281,13 +319,16 @@ bucket_definitions:
   test('transforming things with upper-case functions', () => {
     // Testing that we can use different case for the function names
-    const rules = SqlSyncRules.fromYaml(`
+    const rules = SqlSyncRules.fromYaml(
+      `
 bucket_definitions:
   mybucket:
     parameters: SELECT UPPER(token_parameters.user_id) AS user_id
     data:
      - SELECT id, UPPER(description) AS description_upper FROM assets WHERE UPPER(assets.user_id) = bucket.user_id AND NOT assets.archived
-    `);
+      `,
+      PARSE_OPTIONS
+    );
     const bucket = rules.bucket_descriptors[0];
     expect(bucket.bucket_parameters).toEqual(['user_id']);
     expect(rules.getStaticBucketIds(normalizeTokenParameters({ user_id: 'user1' }))).toEqual(['mybucket["USER1"]']);
@@ -312,12 +353,15 @@ bucket_definitions:
   });
   test('transforming json', () => {
-    const rules = SqlSyncRules.fromYaml(`
+    const rules = SqlSyncRules.fromYaml(
+      `
 bucket_definitions:
   mybucket:
     data:
       - SELECT id, data ->> 'count' AS count, data -> 'bool' AS bool1, data ->> 'bool' AS bool2, 'true' ->> '$' as bool3, json_extract(data, '$.bool') AS bool4 FROM assets
-    `);
+      `,
+      PARSE_OPTIONS
+    );
     expect(
       rules.evaluateRow({
         sourceTable: ASSETS,
@@ -342,13 +386,16 @@ bucket_definitions:
   });
   test('IN json', () => {
-    const rules = SqlSyncRules.fromYaml(`
+    const rules = SqlSyncRules.fromYaml(
+      `
 bucket_definitions:
   mybucket:
     parameters: SELECT token_parameters.region_id
     data:
       - SELECT id, description FROM assets WHERE bucket.region_id IN assets.region_ids
-    `);
+      `,
+      PARSE_OPTIONS
+    );
     expect(
       rules.evaluateRow({
@@ -384,14 +431,17 @@ bucket_definitions:
   });
   test('direct boolean param', () => {
-    const rules = SqlSyncRules.fromYaml(`
+    const rules = SqlSyncRules.fromYaml(
+      `
 bucket_definitions:
   mybucket:
     parameters: SELECT token_parameters.is_admin
     data:
      - SELECT id, description, role, 'admin' as rule FROM assets WHERE bucket.is_admin
      - SELECT id, description, role, 'normal' as rule FROM assets WHERE (bucket.is_admin OR bucket.is_admin = false) AND assets.role != 'admin'
-    `);
+      `,
+      PARSE_OPTIONS
+    );
     expect(
       rules.evaluateRow({ sourceTable: ASSETS, record: { id: 'asset1', description: 'test', role: 'admin' } })
@@ -455,12 +505,15 @@ bucket_definitions:
   });
   test('some math', () => {
-    const rules = SqlSyncRules.fromYaml(`
+    const rules = SqlSyncRules.fromYaml(
+      `
 bucket_definitions:
   mybucket:
     data:
       - SELECT id, (5 / 2) AS int, (5 / 2.0) AS float, (CAST(5 AS real) / 2) AS float2 FROM assets
-    `);
+      `,
+      PARSE_OPTIONS
+    );
     expect(rules.evaluateRow({ sourceTable: ASSETS, record: { id: 'asset1' } })).toEqual([
       {
@@ -479,13 +532,16 @@ bucket_definitions:
   });
   test('bucket with static numeric parameters', () => {
-    const rules = SqlSyncRules.fromYaml(`
+    const rules = SqlSyncRules.fromYaml(
+      `
 bucket_definitions:
   mybucket:
     parameters: SELECT token_parameters.int1, token_parameters.float1, token_parameters.float2
     data:
       - SELECT id FROM assets WHERE assets.int1 = bucket.int1 AND assets.float1 = bucket.float1 AND assets.float2 = bucket.float2
-    `);
+      `,
+      PARSE_OPTIONS
+    );
     expect(rules.getStaticBucketIds(normalizeTokenParameters({ int1: 314, float1: 3.14, float2: 314 }))).toEqual([
       'mybucket[314,3.14,314]'
     ]);
@@ -506,24 +562,30 @@ bucket_definitions:
   });
   test('static parameter query with function on token_parameter', () => {
-    const rules = SqlSyncRules.fromYaml(`
+    const rules = SqlSyncRules.fromYaml(
+      `
 bucket_definitions:
   mybucket:
     parameters: SELECT upper(token_parameters.user_id) as upper
     data: []
-    `);
+      `,
+      PARSE_OPTIONS
+    );
     expect(rules.errors).toEqual([]);
     expect(rules.getStaticBucketIds(normalizeTokenParameters({ user_id: 'test' }))).toEqual(['mybucket["TEST"]']);
   });
   test('custom table and id', () => {
-    const rules = SqlSyncRules.fromYaml(`
+    const rules = SqlSyncRules.fromYaml(
+      `
 bucket_definitions:
   mybucket:
     data:
      - SELECT client_id AS id, description FROM assets_123 as assets WHERE assets.archived = false
      - SELECT other_id AS id, description FROM assets_123 as assets
-    `);
+      `,
+      PARSE_OPTIONS
+    );
     expect(
       rules.evaluateRow({
@@ -555,12 +617,15 @@ bucket_definitions:
   });
   test('wildcard table', () => {
-    const rules = SqlSyncRules.fromYaml(`
+    const rules = SqlSyncRules.fromYaml(
+      `
 bucket_definitions:
   mybucket:
     data:
      - SELECT client_id AS id, description, _table_suffix as suffix, * FROM "assets_%" as assets WHERE assets.archived = false AND _table_suffix > '100'
-    `);
+      `,
+      PARSE_OPTIONS
+    );
     expect(
       rules.evaluateRow({
@@ -586,12 +651,15 @@ bucket_definitions:
   });
   test('wildcard without alias', () => {
-    const rules = SqlSyncRules.fromYaml(`
+    const rules = SqlSyncRules.fromYaml(
+      `
 bucket_definitions:
   mybucket:
     data:
      - SELECT *, _table_suffix as suffix, * FROM "%" WHERE archived = false
-    `);
+      `,
+      PARSE_OPTIONS
+    );
     expect(
       rules.evaluateRow({
@@ -615,16 +683,19 @@ bucket_definitions:
   });
   test('should filter schemas', () => {
-    const rules = SqlSyncRules.fromYaml(`
+    const rules = SqlSyncRules.fromYaml(
+      `
 bucket_definitions:
   mybucket:
     data:
       - SELECT id FROM "assets" # Yes
-      - SELECT id FROM "public"."assets" # yes
-      - SELECT id FROM "default.public"."assets" # yes
+      - SELECT id FROM "test_schema"."assets" # yes
+      - SELECT id FROM "default.test_schema"."assets" # yes
       - SELECT id FROM "other"."assets" # no
-      - SELECT id FROM "other.public"."assets" # no
-    `);
+      - SELECT id FROM "other.test_schema"."assets" # no
+      `,
+      PARSE_OPTIONS
+    );
     expect(
       rules.evaluateRow({
@@ -670,7 +741,7 @@ bucket_definitions:
     parameters: SELECT id FROM assets WHERE other_id = token_parameters.user_id
     data: []
      `,
-      { schema: BASIC_SCHEMA }
+      { schema: BASIC_SCHEMA, ...PARSE_OPTIONS }
     );
     expect(rules.errors).toMatchObject([
@@ -689,7 +760,7 @@ bucket_definitions:
     parameters: SELECT request.parameters() ->> 'project_id' as project_id
     data: []
      `,
-      { schema: BASIC_SCHEMA }
+      { schema: BASIC_SCHEMA, ...PARSE_OPTIONS }
     );
     expect(rules.errors).toMatchObject([
@@ -710,7 +781,7 @@ bucket_definitions:
     parameters: SELECT request.parameters() ->> 'project_id' as project_id
     data: []
      `,
-      { schema: BASIC_SCHEMA }
+      { schema: BASIC_SCHEMA, ...PARSE_OPTIONS }
     );
     expect(rules.errors).toEqual([]);
@@ -722,7 +793,7 @@ bucket_definitions:
       tag: DEFAULT_TAG,
       schemas: [
         {
-          name: DEFAULT_SCHEMA,
+          name: 'test_schema',
          tables: [
            {
              name: 'assets',
@@ -739,14 +810,17 @@ bucket_definitions:
        }
      ]);
-    const rules = SqlSyncRules.fromYaml(`
+    const rules = SqlSyncRules.fromYaml(
+      `
 bucket_definitions:
   mybucket:
     data:
      - SELECT * FROM assets as assets1
      - SELECT id, name, count FROM assets as assets2
      - SELECT id, owner_id as other_id, foo FROM assets as ASSETS2
-    `);
+      `,
+      PARSE_OPTIONS
+    );
     expect(new DartSchemaGenerator().generate(rules, schema)).toEqual(`Schema([
   Table('assets1', [
diff --git a/packages/sync-rules/test/src/util.ts b/packages/sync-rules/test/src/util.ts
index 1b4b12117..e1ed5b804 100644
--- a/packages/sync-rules/test/src/util.ts
+++ b/packages/sync-rules/test/src/util.ts
@@ -1,5 +1,4 @@
 import {
-  DEFAULT_SCHEMA,
   DEFAULT_TAG,
   RequestJwtPayload,
   RequestParameters,
@@ -9,11 +8,15 @@ import {
 export class TestSourceTable implements SourceTableInterface {
   readonly connectionTag = DEFAULT_TAG;
-  readonly schema = DEFAULT_SCHEMA;
+  readonly schema = 'test_schema';
   constructor(public readonly table: string) {}
 }
+export const PARSE_OPTIONS = {
+  defaultSchema: 'test_schema'
+};
+
 export const ASSETS = new TestSourceTable('assets');
 export const USERS = new TestSourceTable('users');
@@ -22,7 +25,7 @@ export const BASIC_SCHEMA = new StaticSchema([
     tag: DEFAULT_TAG,
     schemas: [
       {
-        name: DEFAULT_SCHEMA,
+        name: 'test_schema',
        tables: [
          {
            name: 'assets',