diff --git a/.changeset/fast-taxis-rush.md b/.changeset/fast-taxis-rush.md new file mode 100644 index 000000000..142ba93f8 --- /dev/null +++ b/.changeset/fast-taxis-rush.md @@ -0,0 +1,6 @@ +--- +'@powersync/service-core': patch +'@powersync/service-image': patch +--- + +Add "test-connection" CLI command diff --git a/.changeset/neat-cycles-trade.md b/.changeset/neat-cycles-trade.md new file mode 100644 index 000000000..029067dc6 --- /dev/null +++ b/.changeset/neat-cycles-trade.md @@ -0,0 +1,5 @@ +--- +'@powersync/service-sync-rules': patch +--- + +Improved error message for "null" bucket definitions. diff --git a/.changeset/tasty-lamps-remember.md b/.changeset/tasty-lamps-remember.md new file mode 100644 index 000000000..fad3c62d2 --- /dev/null +++ b/.changeset/tasty-lamps-remember.md @@ -0,0 +1,5 @@ +--- +'@powersync/service-module-mongodb': patch +--- + +Improved error messages for "Test Connection". diff --git a/.changeset/wicked-tigers-eat.md b/.changeset/wicked-tigers-eat.md new file mode 100644 index 000000000..cfd40242d --- /dev/null +++ b/.changeset/wicked-tigers-eat.md @@ -0,0 +1,17 @@ +--- +'@powersync/service-module-postgres-storage': patch +'@powersync/service-module-mongodb-storage': patch +'@powersync/service-module-postgres': patch +'@powersync/service-rsocket-router': patch +'@powersync/service-errors': patch +'@powersync/service-module-mongodb': patch +'@powersync/service-core': patch +'@powersync/service-module-mysql': patch +'@powersync/service-sync-rules': patch +'@powersync/lib-service-postgres': patch +'@powersync/lib-services-framework': patch +'@powersync/lib-service-mongodb': patch +'@powersync/service-image': patch +--- + +Introduce standard error codes diff --git a/libs/lib-mongodb/src/db/db-index.ts b/libs/lib-mongodb/src/db/db-index.ts index d57b14270..2aca23d70 100644 --- a/libs/lib-mongodb/src/db/db-index.ts +++ b/libs/lib-mongodb/src/db/db-index.ts @@ -1 +1,2 @@ export * from './mongo.js'; +export * from './errors.js'; diff --git a/libs/lib-mongodb/src/db/errors.ts b/libs/lib-mongodb/src/db/errors.ts new file mode 100644 index 000000000..2c5eb183f --- /dev/null +++ b/libs/lib-mongodb/src/db/errors.ts @@ -0,0 +1,71 @@ +import { DatabaseConnectionError, ErrorCode, ServiceError } from '@powersync/lib-services-framework'; +import { isMongoServerError } from './mongo.js'; +import { MongoNetworkError, MongoServerSelectionError } from 'mongodb'; + +export function mapConnectionError(err: any): ServiceError { + const cause = err.cause; + if (ServiceError.isServiceError(err)) { + return err; + } else if (isMongoServerError(err)) { + if (err.codeName == 'AuthenticationFailed') { + return new DatabaseConnectionError( + ErrorCode.PSYNC_S1306, + 'MongoDB authentication failed. Check the username and password.', + err + ); + } else if (err.codeName == 'Unauthorized') { + return new DatabaseConnectionError( + ErrorCode.PSYNC_S1307, + 'MongoDB authorization issue. Check that the user has the required permissions.', + err + ); + } + // Fallback + return new DatabaseConnectionError(ErrorCode.PSYNC_S1301, `MongoDB server error: ${err.codeName}`, err); + } else if (isNetworkError(cause)) { + if (hasCode(cause.cause, 'ERR_SSL_TLSV1_ALERT_INTERNAL_ERROR')) { + // This specifically happens on shared Atlas clusters where the IP Access List is not set up correctly. + // Since it's a shared cluster, the connection is not blocked completely, but closes during the TLS setup. + return new DatabaseConnectionError( + ErrorCode.PSYNC_S1303, + 'Internal TLS Error. 
Check IP Access List on the cluster.', + err + ); + } else if (hasCode(cause.cause, 'ENOTFOUND')) { + return new DatabaseConnectionError( + ErrorCode.PSYNC_S1304, + 'DNS lookup error. Check that the hostname is correct.', + err + ); + } + // Fallback + return new DatabaseConnectionError(ErrorCode.PSYNC_S1302, 'MongoDB network error', err); + } else if (err.code == 'ENOTFOUND') { + return new DatabaseConnectionError( + ErrorCode.PSYNC_S1304, + 'DNS lookup error. Check that the hostname is correct.', + err + ); + } else if (isMongoServerSelectionError(err) && err.message.includes('Server selection timed out')) { + return new DatabaseConnectionError( + ErrorCode.PSYNC_S1305, + 'Connection timed out. Check the IP Access List on the cluster.', + err + ); + } else { + // Fallback + return new DatabaseConnectionError(ErrorCode.PSYNC_S1301, 'MongoDB connection error', err); + } +} + +function isNetworkError(err: any): err is MongoNetworkError { + return err?.name === 'MongoNetworkError'; +} + +function isMongoServerSelectionError(err: any): err is MongoServerSelectionError { + return err?.name === 'MongoServerSelectionError'; +} + +function hasCode(err: any, code: string): boolean { + return err?.code == code; +} diff --git a/libs/lib-mongodb/src/types/types.ts b/libs/lib-mongodb/src/types/types.ts index 3647e1d06..9d4969e9f 100644 --- a/libs/lib-mongodb/src/types/types.ts +++ b/libs/lib-mongodb/src/types/types.ts @@ -1,4 +1,4 @@ -import { LookupOptions, makeHostnameLookupFunction } from '@powersync/lib-services-framework'; +import { ErrorCode, LookupOptions, makeHostnameLookupFunction, ServiceError } from '@powersync/lib-services-framework'; import * as t from 'ts-codec'; import * as urijs from 'uri-js'; @@ -44,7 +44,7 @@ export function normalizeMongoConfig(options: BaseMongoConfigDecoded) { const password = options.password ?? 
userInfo?.[1]; if (database == '') { - throw new Error(`database required`); + throw new ServiceError(ErrorCode.PSYNC_S1105, `MongoDB connection: database required`); } delete uri.userinfo; diff --git a/libs/lib-mongodb/tsconfig.json b/libs/lib-mongodb/tsconfig.json index a0ae425c6..cd12f39d5 100644 --- a/libs/lib-mongodb/tsconfig.json +++ b/libs/lib-mongodb/tsconfig.json @@ -8,5 +8,5 @@ "sourceMap": true }, "include": ["src"], - "references": [] + "references": [{ "path": "../lib-services" }] } diff --git a/libs/lib-postgres/src/types/types.ts b/libs/lib-postgres/src/types/types.ts index 05e370dda..3781f6d03 100644 --- a/libs/lib-postgres/src/types/types.ts +++ b/libs/lib-postgres/src/types/types.ts @@ -1,4 +1,4 @@ -import { makeHostnameLookupFunction } from '@powersync/lib-services-framework'; +import { ErrorCode, makeHostnameLookupFunction, ServiceError } from '@powersync/lib-services-framework'; import type * as jpgwire from '@powersync/service-jpgwire'; import * as service_types from '@powersync/service-types'; import * as t from 'ts-codec'; @@ -58,7 +58,10 @@ export function normalizeConnectionConfig(options: BasePostgresConnectionConfigD if (options.uri) { uri = urijs.parse(options.uri); if (uri.scheme != 'postgresql' && uri.scheme != 'postgres') { - `Invalid URI - protocol must be postgresql, got ${uri.scheme}`; + throw new ServiceError( + ErrorCode.PSYNC_S1109, + `Invalid URI - protocol must be postgresql, got ${JSON.stringify(uri.scheme)}` + ); } else if (uri.scheme != 'postgresql') { uri.scheme = 'postgresql'; } @@ -80,23 +83,26 @@ export function normalizeConnectionConfig(options: BasePostgresConnectionConfigD const cacert = options.cacert; if (sslmode == 'verify-ca' && cacert == null) { - throw new Error('Explicit cacert is required for sslmode=verify-ca'); + throw new ServiceError( + ErrorCode.PSYNC_S1104, + 'Postgres connection: Explicit cacert is required for `sslmode: verify-ca`' + ); } if (hostname == '') { - throw new Error(`hostname required`); + throw new ServiceError(ErrorCode.PSYNC_S1106, `Postgres connection: hostname required`); } if (username == '') { - throw new Error(`username required`); + throw new ServiceError(ErrorCode.PSYNC_S1107, `Postgres connection: username required`); } if (password == '') { - throw new Error(`password required`); + throw new ServiceError(ErrorCode.PSYNC_S1108, `Postgres connection: password required`); } if (database == '') { - throw new Error(`database required`); + throw new ServiceError(ErrorCode.PSYNC_S1105, `Postgres connection: database required`); } const lookupOptions = { reject_ip_ranges: options.reject_ip_ranges ?? 
[] }; @@ -132,8 +138,8 @@ export function validatePort(port: string | number): number { if (typeof port == 'string') { port = parseInt(port); } - if (port < 1024) { - throw new Error(`Port ${port} not supported`); + if (port < 1024 || port > 65535) { + throw new ServiceError(ErrorCode.PSYNC_S1110, `Port ${port} not supported`); } return port; } diff --git a/libs/lib-postgres/src/utils/pgwire_utils.ts b/libs/lib-postgres/src/utils/pgwire_utils.ts index 84c849202..71cd1e54e 100644 --- a/libs/lib-postgres/src/utils/pgwire_utils.ts +++ b/libs/lib-postgres/src/utils/pgwire_utils.ts @@ -2,7 +2,7 @@ import * as pgwire from '@powersync/service-jpgwire'; -import { logger } from '@powersync/lib-services-framework'; +import { logger, ServiceAssertionError } from '@powersync/lib-services-framework'; export function escapeIdentifier(identifier: string) { return `"${identifier.replace(/"/g, '""').replace(/\./g, '"."')}"`; @@ -24,7 +24,7 @@ export function autoParameter(arg: any): pgwire.StatementParam { } else if (typeof arg == 'bigint') { return { type: 'int8', value: arg }; } else { - throw new Error(`Unsupported query parameter: ${typeof arg}`); + throw new ServiceAssertionError(`Unsupported query parameter: ${typeof arg}`); } } diff --git a/libs/lib-postgres/tsconfig.json b/libs/lib-postgres/tsconfig.json index c818f7a14..ce4558962 100644 --- a/libs/lib-postgres/tsconfig.json +++ b/libs/lib-postgres/tsconfig.json @@ -8,5 +8,5 @@ "sourceMap": true }, "include": ["src"], - "references": [{ "path": "../../packages/jpgwire" }, { "path": "../lib-services" }] + "references": [{ "path": "../lib-services" }, { "path": "../../packages/jpgwire" }] } diff --git a/libs/lib-services/package.json b/libs/lib-services/package.json index a54f473cb..e4d8af8e7 100644 --- a/libs/lib-services/package.json +++ b/libs/lib-services/package.json @@ -20,6 +20,7 @@ }, "keywords": [], "dependencies": { + "@powersync/service-errors": "workspace:*", "ajv": "^8.12.0", "better-ajv-errors": "^1.2.0", "bson": "^6.8.0", diff --git a/libs/lib-services/src/container.ts b/libs/lib-services/src/container.ts index 20f0f4dbf..f081b995d 100644 --- a/libs/lib-services/src/container.ts +++ b/libs/lib-services/src/container.ts @@ -3,6 +3,7 @@ import { ErrorReporter } from './alerts/definitions.js'; import { NoOpReporter } from './alerts/no-op-reporter.js'; import { MigrationManager } from './migrations/MigrationManager.js'; import { ProbeModule, TerminationHandler, createFSProbe, createTerminationHandler } from './signals/signals-index.js'; +import { ServiceAssertionError } from '@powersync/service-errors'; export enum ContainerImplementation { REPORTER = 'reporter', @@ -100,7 +101,7 @@ export class Container { getImplementation(identifier: ServiceIdentifier): T { const implementation = this.implementations.get(identifier); if (!implementation) { - throw new Error(`Implementation for ${String(identifier)} has not been registered.`); + throw new ServiceAssertionError(`Implementation for ${String(identifier)} has not been registered.`); } return implementation; } diff --git a/libs/lib-services/src/errors/errors-index.ts b/libs/lib-services/src/errors/errors-index.ts index 4075db519..fc16f5a63 100644 --- a/libs/lib-services/src/errors/errors-index.ts +++ b/libs/lib-services/src/errors/errors-index.ts @@ -1,2 +1 @@ -export * from './framework-errors.js'; -export * from './utils.js'; +export * from '@powersync/service-errors'; diff --git a/libs/lib-services/src/errors/framework-errors.ts b/libs/lib-services/src/errors/framework-errors.ts deleted 
file mode 100644 index 7c6e3abb3..000000000 --- a/libs/lib-services/src/errors/framework-errors.ts +++ /dev/null @@ -1,150 +0,0 @@ -export enum ErrorSeverity { - INFO = 'info', - WARNING = 'warning', - ERROR = 'error' -} - -export type ErrorData = { - name?: string; - - code: string; - description: string; - - severity?: ErrorSeverity; - details?: string; - status?: number; - stack?: string; - - origin?: string; - - trace_id?: string; -}; - -// Maybe this could be renamed to ServiceError or something similar -export class JourneyError extends Error { - is_journey_error = true; - - errorData: ErrorData; - - static isJourneyError(input: any): input is JourneyError { - return input instanceof JourneyError || input?.is_journey_error == true; - } - - private static errorMessage(data: ErrorData) { - let message = `[${data.code}] ${data.description}`; - if (data.details) { - message += `\n ${data.details}`; - } - return message; - } - - constructor(data: ErrorData) { - super(JourneyError.errorMessage(data)); - - this.errorData = data; - if (data.stack) { - this.stack = data.stack; - } - - this.name = data.name || this.constructor.name; - this.errorData.name = this.name; - } - - toString() { - return this.stack; - } - - toJSON(): ErrorData { - if (process.env.NODE_ENV !== 'production') { - return this.errorData; - } - return { - name: this.errorData.name, - code: this.errorData.code, - status: this.errorData.status, - description: this.errorData.description, - details: this.errorData.details, - trace_id: this.errorData.trace_id, - severity: this.errorData.severity, - origin: this.errorData.origin - }; - } - - setTraceId(id: string) { - this.errorData.trace_id = id; - } -} - -export class ValidationError extends JourneyError { - static CODE = 'VALIDATION_ERROR'; - constructor(errors: any) { - super({ - code: ValidationError.CODE, - status: 400, - description: 'Validation failed', - details: JSON.stringify(errors) - }); - } -} - -export class AuthorizationError extends JourneyError { - static CODE = 'AUTHORIZATION'; - constructor(errors: any) { - super({ - code: AuthorizationError.CODE, - status: 401, - description: 'Authorization failed', - details: errors - }); - } -} - -export class InternalServerError extends JourneyError { - static CODE = 'INTERNAL_SERVER_ERROR'; - constructor(err: Error) { - super({ - code: InternalServerError.CODE, - severity: ErrorSeverity.ERROR, - status: 500, - description: 'Something went wrong', - details: err.message, - stack: process.env.NODE_ENV !== 'production' ? err.stack : undefined - }); - } -} - -export class ResourceNotFound extends JourneyError { - static CODE = 'RESOURCE_NOT_FOUND'; - - /** - * @deprecated Use the (resource, id) constructor instead. - * @param id - */ - constructor(id: string); - constructor(resource: string, id: string); - - constructor(resource: string, id?: string) { - const combinedId = id ? 
`${resource}/${id}` : resource; - super({ - code: ResourceNotFound.CODE, - status: 404, - description: 'The requested resource does not exist on this server', - details: `The resource ${combinedId} does not exist on this server`, - severity: ErrorSeverity.INFO - }); - } -} - -export class ResourceConflict extends JourneyError { - static CODE = 'RESOURCE_CONFLICT'; - - constructor(details: string) { - super({ - code: ResourceConflict.CODE, - status: 409, - description: 'The specified resource already exists on this server', - details: details, - severity: ErrorSeverity.INFO - }); - } -} diff --git a/libs/lib-services/src/errors/utils.ts b/libs/lib-services/src/errors/utils.ts deleted file mode 100644 index 878d924f4..000000000 --- a/libs/lib-services/src/errors/utils.ts +++ /dev/null @@ -1,20 +0,0 @@ -import { ErrorData, JourneyError } from './framework-errors.js'; - -export const isJourneyError = (err: any): err is JourneyError => { - const matches = JourneyError.isJourneyError(err); - return !!matches; -}; - -export const getErrorData = (err: Error | any): ErrorData | undefined => { - if (!isJourneyError(err)) { - return; - } - return err.toJSON(); -}; - -export const matchesErrorCode = (err: Error | any, code: string) => { - if (isJourneyError(err)) { - return err.errorData.code === code; - } - return false; -}; diff --git a/libs/lib-services/src/ip/lookup.ts b/libs/lib-services/src/ip/lookup.ts index c6d518335..6c920b9ce 100644 --- a/libs/lib-services/src/ip/lookup.ts +++ b/libs/lib-services/src/ip/lookup.ts @@ -2,6 +2,7 @@ import * as net from 'node:net'; import * as dns from 'node:dns'; import * as dnsp from 'node:dns/promises'; import ip from 'ipaddr.js'; +import { ErrorCode, ServiceError } from '@powersync/service-errors'; export interface LookupOptions { reject_ip_ranges: string[]; @@ -67,19 +68,21 @@ export function validateIpHostname(hostname: string, options: LookupOptions): vo return; } - const parsed = ip.parse(hostname); + const ipaddr = hostname; + + const parsed = ip.parse(ipaddr); const rejectLocal = reject_ranges.includes('local'); const rejectSubnets = reject_ranges.filter((range) => range != 'local'); const reject = { blocked: (rejectSubnets ?? []).map((r) => ip.parseCIDR(r)) }; if (options.reject_ipv6 && parsed.kind() == 'ipv6') { - throw new Error('IPv6 not supported'); + throw new ServiceError(ErrorCode.PSYNC_S2202, 'IPv6 support not enabled for JWKS URI'); } if (ip.subnetMatch(parsed, reject) == 'blocked') { // Ranges explicitly blocked, e.g. 
private IPv6 ranges - throw new Error(`IPs in this range are not supported: ${hostname}`); + throw new ServiceError(ErrorCode.PSYNC_S2203, `IPs in this range are not supported: ${ipaddr}`); } if (!rejectLocal) { @@ -94,7 +97,7 @@ export function validateIpHostname(hostname: string, options: LookupOptions): vo return; } else { // Do not connect to any reserved IPs, including loopback and private ranges - throw new Error(`IPs in this range are not supported: ${hostname}`); + throw new ServiceError(ErrorCode.PSYNC_S2203, `IPs in this range are not supported: ${ipaddr}`); } } diff --git a/libs/lib-services/test/src/__snapshots__/errors.test.ts.snap b/libs/lib-services/test/src/__snapshots__/errors.test.ts.snap index 93bda2842..92f3c4c3e 100644 --- a/libs/lib-services/test/src/__snapshots__/errors.test.ts.snap +++ b/libs/lib-services/test/src/__snapshots__/errors.test.ts.snap @@ -1,10 +1,10 @@ // Vitest Snapshot v1, https://vitest.dev/guide/snapshot.html -exports[`errors > utilities should properly match a journey error 1`] = ` +exports[`errors > utilities should properly match a service error 1`] = ` { - "code": "CUSTOM_JOURNEY_ERROR", + "code": "PSYNC_S0001", "description": "This is a custom error", "details": "this is some more detailed information", - "name": "CustomJourneyError", + "name": "CustomServiceError", } `; diff --git a/libs/lib-services/test/src/errors.test.ts b/libs/lib-services/test/src/errors.test.ts index 82cccd901..cdbaab129 100644 --- a/libs/lib-services/test/src/errors.test.ts +++ b/libs/lib-services/test/src/errors.test.ts @@ -2,10 +2,12 @@ import { describe, test, expect } from 'vitest'; import * as errors from '../../src/errors/errors-index.js'; -class CustomJourneyError extends errors.JourneyError { +const { ErrorCode } = errors; + +class CustomServiceError extends errors.ServiceError { constructor() { super({ - code: 'CUSTOM_JOURNEY_ERROR', + code: ErrorCode.PSYNC_S0001, description: 'This is a custom error', details: 'this is some more detailed information' }); @@ -14,34 +16,34 @@ class CustomJourneyError extends errors.JourneyError { describe('errors', () => { test('it should respond to instanceof checks', () => { - const error = new CustomJourneyError(); + const error = new CustomServiceError(); expect(error instanceof Error).toBe(true); - expect(error instanceof errors.JourneyError).toBe(true); - expect(error.name).toBe('CustomJourneyError'); + expect(error instanceof errors.ServiceError).toBe(true); + expect(error.name).toBe('CustomServiceError'); }); test('it should serialize properly', () => { - const error = new CustomJourneyError(); + const error = new CustomServiceError(); // The error stack will contain host specific path information. 
We only care about the header // anyway and that the stack is shown - indicated by the initial `at` text - const initial = `CustomJourneyError: [CUSTOM_JOURNEY_ERROR] This is a custom error + const initial = `CustomServiceError: [PSYNC_S0001] This is a custom error this is some more detailed information at`; expect(`${error}`.startsWith(initial)).toBe(true); }); - test('utilities should properly match a journey error', () => { - const standard_error = new Error('non-journey error'); - const error = new CustomJourneyError(); + test('utilities should properly match a service error', () => { + const standard_error = new Error('non-service error'); + const error = new CustomServiceError(); - expect(errors.isJourneyError(standard_error)).toBe(false); - expect(errors.isJourneyError(error)).toBe(true); + expect(errors.isServiceError(standard_error)).toBe(false); + expect(errors.isServiceError(error)).toBe(true); - expect(errors.matchesErrorCode(error, 'CUSTOM_JOURNEY_ERROR')).toBe(true); - expect(errors.matchesErrorCode(standard_error, 'CUSTOM_JOURNEY_ERROR')).toBe(false); + expect(errors.matchesErrorCode(error, ErrorCode.PSYNC_S0001)).toBe(true); + expect(errors.matchesErrorCode(standard_error, ErrorCode.PSYNC_S0001)).toBe(false); expect(errors.getErrorData(error)).toMatchSnapshot(); expect(errors.getErrorData(standard_error)).toBe(undefined); diff --git a/libs/lib-services/tsconfig.json b/libs/lib-services/tsconfig.json index d37ee074d..8f56f8021 100644 --- a/libs/lib-services/tsconfig.json +++ b/libs/lib-services/tsconfig.json @@ -6,5 +6,5 @@ "rootDir": "src" }, "include": ["src"], - "references": [] + "references": [{ "path": "../../packages/service-errors" }] } diff --git a/modules/module-mongodb-storage/src/migrations/db/migrations/1702295701188-sync-rule-state.ts b/modules/module-mongodb-storage/src/migrations/db/migrations/1702295701188-sync-rule-state.ts index 1a24d302d..e7a56f88a 100644 --- a/modules/module-mongodb-storage/src/migrations/db/migrations/1702295701188-sync-rule-state.ts +++ b/modules/module-mongodb-storage/src/migrations/db/migrations/1702295701188-sync-rule-state.ts @@ -2,6 +2,7 @@ import * as lib_mongo from '@powersync/lib-service-mongodb'; import { storage as core_storage, migrations } from '@powersync/service-core'; import * as storage from '../../../storage/storage-index.js'; import { MongoStorageConfig } from '../../../types/types.js'; +import { ServiceAssertionError } from '@powersync/lib-services-framework'; interface LegacySyncRulesDocument extends storage.SyncRuleDocument { /** @@ -65,7 +66,7 @@ export const up: migrations.PowerSyncMigrationFunction = async (context) => { const remaining = await db.sync_rules.find({ state: null as any }).toArray(); if (remaining.length > 0) { const slots = remaining.map((doc) => doc.slot_name).join(', '); - throw new Error(`Invalid state for sync rules: ${slots}`); + throw new ServiceAssertionError(`Invalid state for sync rules: ${slots}`); } } finally { await db.client.close(); diff --git a/modules/module-mongodb-storage/src/storage/MongoBucketStorage.ts b/modules/module-mongodb-storage/src/storage/MongoBucketStorage.ts index 87f577eca..908872cc3 100644 --- a/modules/module-mongodb-storage/src/storage/MongoBucketStorage.ts +++ b/modules/module-mongodb-storage/src/storage/MongoBucketStorage.ts @@ -5,7 +5,7 @@ import * as timers from 'timers/promises'; import { storage, sync, utils } from '@powersync/service-core'; -import { DisposableObserver, logger } from '@powersync/lib-services-framework'; +import { DisposableObserver, 
ErrorCode, logger, ServiceError } from '@powersync/lib-services-framework'; import { v4 as uuid } from 'uuid'; import * as lib_mongo from '@powersync/lib-service-mongodb'; @@ -433,7 +433,7 @@ export class MongoBucketStorage clusterTime = time; }); if (clusterTime == null) { - throw new Error('Could not get clusterTime'); + throw new ServiceError(ErrorCode.PSYNC_S2401, 'Could not get clusterTime'); } if (signal.aborted) { diff --git a/modules/module-mongodb-storage/src/storage/implementation/MongoBucketBatch.ts b/modules/module-mongodb-storage/src/storage/implementation/MongoBucketBatch.ts index 6e0827a52..01d5577a1 100644 --- a/modules/module-mongodb-storage/src/storage/implementation/MongoBucketBatch.ts +++ b/modules/module-mongodb-storage/src/storage/implementation/MongoBucketBatch.ts @@ -2,7 +2,16 @@ import { mongo } from '@powersync/lib-service-mongodb'; import { SqlEventDescriptor, SqliteRow, SqlSyncRules } from '@powersync/service-sync-rules'; import * as bson from 'bson'; -import { container, DisposableObserver, errors, logger } from '@powersync/lib-services-framework'; +import { + container, + DisposableObserver, + ErrorCode, + errors, + logger, + ReplicationAssertionError, + ServiceAssertionError, + ServiceError +} from '@powersync/lib-services-framework'; import { SaveOperationTag, storage, utils } from '@powersync/service-core'; import * as timers from 'node:timers/promises'; import { PowerSyncMongo } from './db.js'; @@ -140,7 +149,7 @@ export class MongoBucketBatch this.batch = resumeBatch; if (last_op == null) { - throw new Error('Unexpected last_op == null'); + throw new ReplicationAssertionError('Unexpected last_op == null'); } this.persisted_op = last_op; @@ -294,7 +303,7 @@ export class MongoBucketBatch return null; } } else { - throw new Error(`${record.tag} not supported with skipExistingRows: true`); + throw new ReplicationAssertionError(`${record.tag} not supported with skipExistingRows: true`); } } @@ -348,7 +357,7 @@ export class MongoBucketBatch afterData = new bson.Binary(bson.serialize(after!)); // We additionally make sure it's <= 15MB - we need some margin for metadata. 
if (afterData.length() > MAX_ROW_SIZE) { - throw new Error(`Row too large: ${afterData.length()}`); + throw new ServiceError(ErrorCode.PSYNC_S1002, `Row too large: ${afterData.length()}`); } } catch (e) { // Replace with empty values, equivalent to TOAST values @@ -548,7 +557,7 @@ export class MongoBucketBatch logger.info(`${this.slot_name} ${description} - try ${flushTry}`); } if (flushTry > 20 && Date.now() > lastTry) { - throw new Error('Max transaction tries exceeded'); + throw new ServiceError(ErrorCode.PSYNC_S1402, 'Max transaction tries exceeded'); } const next_op_id_doc = await this.db.op_id_sequence.findOneAndUpdate( diff --git a/modules/module-mongodb-storage/src/storage/implementation/MongoCompactor.ts b/modules/module-mongodb-storage/src/storage/implementation/MongoCompactor.ts index 2c28751f1..b9238c1e2 100644 --- a/modules/module-mongodb-storage/src/storage/implementation/MongoCompactor.ts +++ b/modules/module-mongodb-storage/src/storage/implementation/MongoCompactor.ts @@ -1,5 +1,5 @@ import { mongo } from '@powersync/lib-service-mongodb'; -import { logger } from '@powersync/lib-services-framework'; +import { logger, ReplicationAssertionError } from '@powersync/lib-services-framework'; import { storage, utils } from '@powersync/service-core'; import { PowerSyncMongo } from './db.js'; @@ -335,7 +335,9 @@ export class MongoCompactor { } } } else { - throw new Error(`Unexpected ${op.op} operation at ${op._id.g}:${op._id.b}:${op._id.o}`); + throw new ReplicationAssertionError( + `Unexpected ${op.op} operation at ${op._id.g}:${op._id.b}:${op._id.o}` + ); } } if (!gotAnOp) { diff --git a/modules/module-mongodb-storage/src/storage/implementation/MongoIdSequence.ts b/modules/module-mongodb-storage/src/storage/implementation/MongoIdSequence.ts index 3392f3abd..633d15cfe 100644 --- a/modules/module-mongodb-storage/src/storage/implementation/MongoIdSequence.ts +++ b/modules/module-mongodb-storage/src/storage/implementation/MongoIdSequence.ts @@ -1,3 +1,5 @@ +import { ReplicationAssertionError } from '@powersync/lib-services-framework'; + /** * Manages op_id or similar sequence in memory. * @@ -9,7 +11,7 @@ export class MongoIdSequence { constructor(last: bigint) { if (typeof last != 'bigint') { - throw new Error(`BigInt required, got ${last} ${typeof last}`); + throw new ReplicationAssertionError(`BigInt required, got ${last} ${typeof last}`); } this._last = last; } diff --git a/modules/module-mongodb-storage/src/storage/implementation/MongoStorageProvider.ts b/modules/module-mongodb-storage/src/storage/implementation/MongoStorageProvider.ts index cfa010b26..dd133b269 100644 --- a/modules/module-mongodb-storage/src/storage/implementation/MongoStorageProvider.ts +++ b/modules/module-mongodb-storage/src/storage/implementation/MongoStorageProvider.ts @@ -1,5 +1,5 @@ import * as lib_mongo from '@powersync/lib-service-mongodb'; -import { logger } from '@powersync/lib-services-framework'; +import { logger, ServiceAssertionError } from '@powersync/lib-services-framework'; import { storage } from '@powersync/service-core'; import { MongoStorageConfig } from '../../types/types.js'; import { MongoBucketStorage } from '../MongoBucketStorage.js'; @@ -16,7 +16,9 @@ export class MongoStorageProvider implements storage.BucketStorageProvider { const { storage } = resolvedConfig; if (storage.type != this.type) { // This should not be reached since the generation should be managed externally. 
- throw new Error(`Cannot create MongoDB bucket storage with provided config ${storage.type} !== ${this.type}`); + throw new ServiceAssertionError( + `Cannot create MongoDB bucket storage with provided config ${storage.type} !== ${this.type}` + ); } const decodedConfig = MongoStorageConfig.decode(storage as any); diff --git a/modules/module-mongodb-storage/src/storage/implementation/MongoSyncBucketStorage.ts b/modules/module-mongodb-storage/src/storage/implementation/MongoSyncBucketStorage.ts index e4268718a..a2d51edfa 100644 --- a/modules/module-mongodb-storage/src/storage/implementation/MongoSyncBucketStorage.ts +++ b/modules/module-mongodb-storage/src/storage/implementation/MongoSyncBucketStorage.ts @@ -1,6 +1,6 @@ import * as lib_mongo from '@powersync/lib-service-mongodb'; import { mongo } from '@powersync/lib-service-mongodb'; -import { DisposableObserver, logger } from '@powersync/lib-services-framework'; +import { DisposableObserver, logger, ServiceAssertionError } from '@powersync/lib-services-framework'; import { storage, utils } from '@powersync/service-core'; import { SqliteJsonRow, SqliteJsonValue, SqlSyncRules } from '@powersync/service-sync-rules'; import * as bson from 'bson'; @@ -344,7 +344,7 @@ export class MongoSyncBucketStorage start ??= dataBuckets.get(bucket); if (start == null) { - throw new Error(`data for unexpected bucket: ${bucket}`); + throw new ServiceAssertionError(`data for unexpected bucket: ${bucket}`); } currentBatch = { bucket, @@ -479,7 +479,7 @@ export class MongoSyncBucketStorage } ); if (doc == null) { - throw new Error('Cannot find sync rules status'); + throw new ServiceAssertionError('Cannot find sync rules status'); } return { diff --git a/modules/module-mongodb-storage/src/storage/implementation/MongoSyncRulesLock.ts b/modules/module-mongodb-storage/src/storage/implementation/MongoSyncRulesLock.ts index bf69a0851..0c5b7b30e 100644 --- a/modules/module-mongodb-storage/src/storage/implementation/MongoSyncRulesLock.ts +++ b/modules/module-mongodb-storage/src/storage/implementation/MongoSyncRulesLock.ts @@ -1,6 +1,6 @@ import crypto from 'crypto'; -import { logger } from '@powersync/lib-services-framework'; +import { ErrorCode, logger, ServiceError } from '@powersync/lib-services-framework'; import { storage } from '@powersync/service-core'; import { PowerSyncMongo } from './db.js'; @@ -33,7 +33,10 @@ export class MongoSyncRulesLock implements storage.ReplicationLock { ); if (doc == null) { - throw new Error(`Sync rules: ${sync_rules.id} have been locked by another process for replication.`); + throw new ServiceError( + ErrorCode.PSYNC_S1003, + `Sync rules: ${sync_rules.id} have been locked by another process for replication.` + ); } return new MongoSyncRulesLock(db, sync_rules.id, lockId); } diff --git a/modules/module-mongodb-storage/test/src/storage_compacting.test.ts b/modules/module-mongodb-storage/test/src/storage_compacting.test.ts index 64040a745..803c873e7 100644 --- a/modules/module-mongodb-storage/test/src/storage_compacting.test.ts +++ b/modules/module-mongodb-storage/test/src/storage_compacting.test.ts @@ -3,4 +3,9 @@ import { register } from '@powersync/service-core-tests'; import { describe } from 'vitest'; import { INITIALIZED_MONGO_STORAGE_FACTORY } from './util.js'; -describe('Mongo Sync Bucket Storage Compact', () => register.registerCompactTests(INITIALIZED_MONGO_STORAGE_FACTORY, { clearBatchLimit: 2, moveBatchLimit: 1, moveBatchQueryLimit: 1 })); \ No newline at end of file +describe('Mongo Sync Bucket Storage Compact', () => + 
register.registerCompactTests(INITIALIZED_MONGO_STORAGE_FACTORY, { + clearBatchLimit: 2, + moveBatchLimit: 1, + moveBatchQueryLimit: 1 + })); diff --git a/modules/module-mongodb/src/module/MongoModule.ts b/modules/module-mongodb/src/module/MongoModule.ts index cf03cb4bf..b38c0e03d 100644 --- a/modules/module-mongodb/src/module/MongoModule.ts +++ b/modules/module-mongodb/src/module/MongoModule.ts @@ -1,5 +1,12 @@ import * as lib_mongo from '@powersync/lib-service-mongodb'; -import { api, ConfigurationFileSyncRulesProvider, replication, system, TearDownOptions } from '@powersync/service-core'; +import { + api, + ConfigurationFileSyncRulesProvider, + ConnectionTestResult, + replication, + system, + TearDownOptions +} from '@powersync/service-core'; import { MongoRouteAPIAdapter } from '../api/MongoRouteAPIAdapter.js'; import { ChangeStreamReplicator } from '../replication/ChangeStreamReplicator.js'; import { ConnectionManagerFactory } from '../replication/ConnectionManagerFactory.js'; @@ -46,22 +53,31 @@ export class MongoModule extends replication.ReplicationModule { - // TODO: Implement? + // No-op } - async testConnection(config: types.MongoConnectionConfig): Promise { + async testConnection(config: types.MongoConnectionConfig) { this.decodeConfig(config); - const normalisedConfig = this.resolveConfig(this.decodedConfig!); - const connectionManager = new MongoManager(normalisedConfig, { + const normalizedConfig = this.resolveConfig(this.decodedConfig!); + return await MongoModule.testConnection(normalizedConfig); + } + + static async testConnection(normalizedConfig: types.NormalizedMongoConnectionConfig): Promise { + const connectionManager = new MongoManager(normalizedConfig, { // Use short timeouts for testing connections. // Must be < 30s, to ensure we get a proper timeout error. 
- socketTimeoutMS: 5_000, - serverSelectionTimeoutMS: 5_000 + socketTimeoutMS: 1_000, + serverSelectionTimeoutMS: 1_000 }); try { - return await checkSourceConfiguration(connectionManager); + await checkSourceConfiguration(connectionManager); + } catch (e) { + throw lib_mongo.mapConnectionError(e); } finally { await connectionManager.end(); } + return { + connectionDescription: normalizedConfig.uri + }; } } diff --git a/modules/module-mongodb/src/replication/ChangeStream.ts b/modules/module-mongodb/src/replication/ChangeStream.ts index a74fd695d..cb2e1ffa6 100644 --- a/modules/module-mongodb/src/replication/ChangeStream.ts +++ b/modules/module-mongodb/src/replication/ChangeStream.ts @@ -1,5 +1,12 @@ import { mongo } from '@powersync/lib-service-mongodb'; -import { container, logger } from '@powersync/lib-services-framework'; +import { + container, + ErrorCode, + logger, + ReplicationAbortedError, + ReplicationAssertionError, + ServiceError +} from '@powersync/lib-services-framework'; import { Metrics, SaveOperationTag, SourceEntityDescriptor, SourceTable, storage } from '@powersync/service-core'; import { DatabaseInputRow, SqliteRow, SqlSyncRules, TablePattern } from '@powersync/service-sync-rules'; import { PostImagesOption } from '../types/types.js'; @@ -180,12 +187,18 @@ export class ChangeStream { const hello = await this.defaultDb.command({ hello: 1 }); const snapshotTime = hello.lastWrite?.majorityOpTime?.ts as mongo.Timestamp; if (hello.msg == 'isdbgrid') { - throw new Error('Sharded MongoDB Clusters are not supported yet (including MongoDB Serverless instances).'); + throw new ServiceError( + ErrorCode.PSYNC_S1341, + 'Sharded MongoDB Clusters are not supported yet (including MongoDB Serverless instances).' + ); } else if (hello.setName == null) { - throw new Error('Standalone MongoDB instances are not supported - use a replicaset.'); + throw new ServiceError( + ErrorCode.PSYNC_S1342, + 'Standalone MongoDB instances are not supported - use a replicaset.' + ); } else if (snapshotTime == null) { // Not known where this would happen apart from the above cases - throw new Error('MongoDB lastWrite timestamp not found.'); + throw new ReplicationAssertionError('MongoDB lastWrite timestamp not found.'); } // We previously used {snapshot: true} for the snapshot session. 
// While it gives nice consistency guarantees, it fails when the @@ -294,7 +307,7 @@ export class ChangeStream { for await (let document of cursor) { if (this.abort_signal.aborted) { - throw new Error(`Aborted initial replication`); + throw new ReplicationAbortedError(`Aborted initial replication`); } at += 1; @@ -367,7 +380,7 @@ export class ChangeStream { }); logger.info(`Enabled postImages on ${db}.${collectionInfo.name}`); } else if (!enabled) { - throw new Error(`postImages not enabled on ${db}.${collectionInfo.name}`); + throw new ServiceError(ErrorCode.PSYNC_S1343, `postImages not enabled on ${db}.${collectionInfo.name}`); } } @@ -385,7 +398,7 @@ export class ChangeStream { const snapshot = options.snapshot; if (!descriptor.objectId && typeof descriptor.objectId != 'string') { - throw new Error('objectId expected'); + throw new ReplicationAssertionError('MongoDB replication - objectId expected'); } const result = await this.storage.resolveTable({ group_id: this.group_id, @@ -466,7 +479,7 @@ export class ChangeStream { beforeReplicaId: change.documentKey._id }); } else { - throw new Error(`Unsupported operation: ${change.operationType}`); + throw new ReplicationAssertionError(`Unsupported operation: ${change.operationType}`); } } @@ -607,7 +620,7 @@ export class ChangeStream { } } else if (splitDocument != null) { // We were waiting for fragments, but got a different event - throw new Error(`Incomplete splitEvent: ${JSON.stringify(splitDocument.splitEvent)}`); + throw new ReplicationAssertionError(`Incomplete splitEvent: ${JSON.stringify(splitDocument.splitEvent)}`); } // console.log('event', changeDocument); diff --git a/modules/module-mongodb/src/replication/ChangeStreamReplicator.ts b/modules/module-mongodb/src/replication/ChangeStreamReplicator.ts index 2cf96c494..9f6aae168 100644 --- a/modules/module-mongodb/src/replication/ChangeStreamReplicator.ts +++ b/modules/module-mongodb/src/replication/ChangeStreamReplicator.ts @@ -2,6 +2,7 @@ import { storage, replication } from '@powersync/service-core'; import { ChangeStreamReplicationJob } from './ChangeStreamReplicationJob.js'; import { ConnectionManagerFactory } from './ConnectionManagerFactory.js'; import { MongoErrorRateLimiter } from './MongoErrorRateLimiter.js'; +import { MongoModule } from '../module/MongoModule.js'; export interface ChangeStreamReplicatorOptions extends replication.AbstractReplicatorOptions { connectionFactory: ConnectionManagerFactory; @@ -33,4 +34,8 @@ export class ChangeStreamReplicator extends replication.AbstractReplicator DEPTH_LIMIT) { // This is primarily to prevent infinite recursion - throw new Error(`json nested object depth exceeds the limit of ${DEPTH_LIMIT}`); + throw new ServiceError(ErrorCode.PSYNC_S1004, `json nested object depth exceeds the limit of ${DEPTH_LIMIT}`); } if (data === null) { return data; diff --git a/modules/module-mongodb/src/replication/replication-utils.ts b/modules/module-mongodb/src/replication/replication-utils.ts index 5370fdd5e..fc095bbb8 100644 --- a/modules/module-mongodb/src/replication/replication-utils.ts +++ b/modules/module-mongodb/src/replication/replication-utils.ts @@ -1,13 +1,88 @@ +import { ErrorCode, ServiceError } from '@powersync/lib-services-framework'; import { MongoManager } from './MongoManager.js'; +import { PostImagesOption } from '../types/types.js'; export const CHECKPOINTS_COLLECTION = '_powersync_checkpoints'; +const REQUIRED_CHECKPOINT_PERMISSIONS = ['find', 'insert', 'update', 'remove', 'changeStream', 'createCollection']; + export async 
function checkSourceConfiguration(connectionManager: MongoManager): Promise { const db = connectionManager.db; + const hello = await db.command({ hello: 1 }); if (hello.msg == 'isdbgrid') { - throw new Error('Sharded MongoDB Clusters are not supported yet (including MongoDB Serverless instances).'); + throw new ServiceError( + ErrorCode.PSYNC_S1341, + 'Sharded MongoDB Clusters are not supported yet (including MongoDB Serverless instances).' + ); } else if (hello.setName == null) { - throw new Error('Standalone MongoDB instances are not supported - use a replicaset.'); + throw new ServiceError(ErrorCode.PSYNC_S1342, 'Standalone MongoDB instances are not supported - use a replicaset.'); + } + + // https://www.mongodb.com/docs/manual/reference/command/connectionStatus/ + const connectionStatus = await db.command({ connectionStatus: 1, showPrivileges: true }); + const priviledges = connectionStatus.authInfo?.authenticatedUserPrivileges as { + resource: { db: string; collection: string }; + actions: string[]; + }[]; + let checkpointsActions = new Set(); + let anyCollectionActions = new Set(); + if (priviledges?.length > 0) { + const onDefaultDb = priviledges.filter((p) => p.resource.db == db.databaseName || p.resource.db == ''); + const onCheckpoints = onDefaultDb.filter( + (p) => p.resource.collection == CHECKPOINTS_COLLECTION || p.resource?.collection == '' + ); + + for (let p of onCheckpoints) { + for (let a of p.actions) { + checkpointsActions.add(a); + } + } + for (let p of onDefaultDb) { + for (let a of p.actions) { + anyCollectionActions.add(a); + } + } + + const missingCheckpointActions = REQUIRED_CHECKPOINT_PERMISSIONS.filter( + (action) => !checkpointsActions.has(action) + ); + if (missingCheckpointActions.length > 0) { + const fullName = `${db.databaseName}.${CHECKPOINTS_COLLECTION}`; + throw new ServiceError( + ErrorCode.PSYNC_S1307, + `MongoDB user does not have the required ${missingCheckpointActions.map((a) => `"${a}"`).join(', ')} priviledge(s) on "${fullName}".` + ); + } + + if (connectionManager.options.postImages == PostImagesOption.AUTO_CONFIGURE) { + // This checks that we have collMod on _any_ collection in the db. + // This is not a complete check, but does give a basic sanity-check for testing the connection. + if (!anyCollectionActions.has('collMod')) { + throw new ServiceError( + ErrorCode.PSYNC_S1307, + `MongoDB user does not have the required "collMod" priviledge on "${db.databaseName}", required for "post_images: auto_configure".` + ); + } + } + if (!anyCollectionActions.has('listCollections')) { + throw new ServiceError( + ErrorCode.PSYNC_S1307, + `MongoDB user does not have the required "listCollections" priviledge on "${db.databaseName}".` + ); + } + } else { + // Assume auth is disabled. + // On Atlas, at least one role/priviledge is required for each user, which will trigger the above. + + // We do still do a basic check that we can list the collection (it may not actually exist yet). 
+ await db + .listCollections( + { + name: CHECKPOINTS_COLLECTION + }, + { nameOnly: false } + ) + .toArray(); } } diff --git a/modules/module-mysql/src/module/MySQLModule.ts b/modules/module-mysql/src/module/MySQLModule.ts index a4ab36bae..32d6fbc2f 100644 --- a/modules/module-mysql/src/module/MySQLModule.ts +++ b/modules/module-mysql/src/module/MySQLModule.ts @@ -1,4 +1,11 @@ -import { api, ConfigurationFileSyncRulesProvider, replication, system, TearDownOptions } from '@powersync/service-core'; +import { + api, + ConfigurationFileSyncRulesProvider, + ConnectionTestResult, + replication, + system, + TearDownOptions +} from '@powersync/service-core'; import { MySQLRouteAPIAdapter } from '../api/MySQLRouteAPIAdapter.js'; import { BinLogReplicator } from '../replication/BinLogReplicator.js'; @@ -54,10 +61,14 @@ export class MySQLModule extends replication.ReplicationModule { + async testConnection(config: MySQLConnectionConfig) { this.decodeConfig(config); - const normalisedConfig = this.resolveConfig(this.decodedConfig!); - const connectionManager = new MySQLConnectionManager(normalisedConfig, {}); + const normalizedConfig = this.resolveConfig(this.decodedConfig!); + return await MySQLModule.testConnection(normalizedConfig); + } + + static async testConnection(normalizedConfig: types.ResolvedConnectionConfig): Promise { + const connectionManager = new MySQLConnectionManager(normalizedConfig, {}); const connection = await connectionManager.getConnection(); try { const errors = await checkSourceConfiguration(connection); @@ -67,5 +78,8 @@ export class MySQLModule extends replication.ReplicationModule { if (!isPostgresStorageConfig(config)) { - throw new Error(`Input storage configuration is not for Postgres`); + throw new ServiceAssertionError(`Input storage configuration is not for Postgres`); } return new lib_postgres.DatabaseClient({ config: normalizePostgresStorageConfig(PostgresStorageConfig.decode(config)), diff --git a/modules/module-postgres-storage/src/storage/PostgresCompactor.ts b/modules/module-postgres-storage/src/storage/PostgresCompactor.ts index b81ba5f78..f88657dc6 100644 --- a/modules/module-postgres-storage/src/storage/PostgresCompactor.ts +++ b/modules/module-postgres-storage/src/storage/PostgresCompactor.ts @@ -1,5 +1,5 @@ import * as lib_postgres from '@powersync/lib-service-postgres'; -import { logger } from '@powersync/lib-services-framework'; +import { logger, ReplicationAssertionError } from '@powersync/lib-services-framework'; import { storage, utils } from '@powersync/service-core'; import * as pgwire from '@powersync/service-jpgwire'; import * as t from 'ts-codec'; @@ -313,7 +313,9 @@ export class PostgresCompactor { } } } else { - throw new Error(`Unexpected ${op.op} operation at ${this.group_id}:${bucket}:${op.op_id}`); + throw new ReplicationAssertionError( + `Unexpected ${op.op} operation at ${this.group_id}:${bucket}:${op.op_id}` + ); } } } diff --git a/modules/module-postgres-storage/src/storage/PostgresSyncRulesStorage.ts b/modules/module-postgres-storage/src/storage/PostgresSyncRulesStorage.ts index 6dc929ef9..47735bd88 100644 --- a/modules/module-postgres-storage/src/storage/PostgresSyncRulesStorage.ts +++ b/modules/module-postgres-storage/src/storage/PostgresSyncRulesStorage.ts @@ -1,666 +1,666 @@ -import * as lib_postgres from '@powersync/lib-service-postgres'; -import { DisposableObserver } from '@powersync/lib-services-framework'; -import { storage, utils } from '@powersync/service-core'; -import { JSONBig } from '@powersync/service-jsonbig'; -import 
* as sync_rules from '@powersync/service-sync-rules'; -import * as uuid from 'uuid'; -import { BIGINT_MAX } from '../types/codecs.js'; -import { models, RequiredOperationBatchLimits } from '../types/types.js'; -import { replicaIdToSubkey } from '../utils/bson.js'; -import { mapOpEntry } from '../utils/bucket-data.js'; - -import { StatementParam } from '@powersync/service-jpgwire'; -import { StoredRelationId } from '../types/models/SourceTable.js'; -import { pick } from '../utils/ts-codec.js'; -import { PostgresBucketBatch } from './batch/PostgresBucketBatch.js'; -import { PostgresWriteCheckpointAPI } from './checkpoints/PostgresWriteCheckpointAPI.js'; -import { PostgresBucketStorageFactory } from './PostgresBucketStorageFactory.js'; -import { PostgresCompactor } from './PostgresCompactor.js'; - -export type PostgresSyncRulesStorageOptions = { - factory: PostgresBucketStorageFactory; - db: lib_postgres.DatabaseClient; - sync_rules: storage.PersistedSyncRulesContent; - write_checkpoint_mode?: storage.WriteCheckpointMode; - batchLimits: RequiredOperationBatchLimits; -}; - -export class PostgresSyncRulesStorage - extends DisposableObserver - implements storage.SyncRulesBucketStorage -{ - public readonly group_id: number; - public readonly sync_rules: storage.PersistedSyncRulesContent; - public readonly slot_name: string; - public readonly factory: PostgresBucketStorageFactory; - - protected db: lib_postgres.DatabaseClient; - protected writeCheckpointAPI: PostgresWriteCheckpointAPI; - - // TODO we might be able to share this in an abstract class - private parsedSyncRulesCache: { parsed: sync_rules.SqlSyncRules; options: storage.ParseSyncRulesOptions } | undefined; - private checksumCache = new storage.ChecksumCache({ - fetchChecksums: (batch) => { - return this.getChecksumsInternal(batch); - } - }); - - constructor(protected options: PostgresSyncRulesStorageOptions) { - super(); - this.group_id = options.sync_rules.id; - this.db = options.db; - this.sync_rules = options.sync_rules; - this.slot_name = options.sync_rules.slot_name; - this.factory = options.factory; - - this.writeCheckpointAPI = new PostgresWriteCheckpointAPI({ - db: this.db, - mode: options.write_checkpoint_mode ?? storage.WriteCheckpointMode.MANAGED - }); - } - - get writeCheckpointMode(): storage.WriteCheckpointMode { - return this.writeCheckpointAPI.writeCheckpointMode; - } - - // TODO we might be able to share this in an abstract class - getParsedSyncRules(options: storage.ParseSyncRulesOptions): sync_rules.SqlSyncRules { - const { parsed, options: cachedOptions } = this.parsedSyncRulesCache ?? {}; - /** - * Check if the cached sync rules, if present, had the same options. - * Parse sync rules if the options are different or if there is no cached value. - */ - if (!parsed || options.defaultSchema != cachedOptions?.defaultSchema) { - this.parsedSyncRulesCache = { parsed: this.sync_rules.parsed(options).sync_rules, options }; - } - - return this.parsedSyncRulesCache!.parsed; - } - - async reportError(e: any): Promise { - const message = String(e.message ?? 
'Replication failure'); - await this.db.sql` - UPDATE sync_rules - SET - last_fatal_error = ${{ type: 'varchar', value: message }} - WHERE - id = ${{ type: 'int4', value: this.group_id }}; - `.execute(); - } - - compact(options?: storage.CompactOptions): Promise { - return new PostgresCompactor(this.db, this.group_id, options).compact(); - } - - batchCreateCustomWriteCheckpoints(checkpoints: storage.BatchedCustomWriteCheckpointOptions[]): Promise { - return this.writeCheckpointAPI.batchCreateCustomWriteCheckpoints( - checkpoints.map((c) => ({ ...c, sync_rules_id: this.group_id })) - ); - } - - createCustomWriteCheckpoint(checkpoint: storage.BatchedCustomWriteCheckpointOptions): Promise { - return this.writeCheckpointAPI.createCustomWriteCheckpoint({ - ...checkpoint, - sync_rules_id: this.group_id - }); - } - - lastWriteCheckpoint(filters: storage.SyncStorageLastWriteCheckpointFilters): Promise { - return this.writeCheckpointAPI.lastWriteCheckpoint({ - ...filters, - sync_rules_id: this.group_id - }); - } - - setWriteCheckpointMode(mode: storage.WriteCheckpointMode): void { - return this.writeCheckpointAPI.setWriteCheckpointMode(mode); - } - - createManagedWriteCheckpoint(checkpoint: storage.ManagedWriteCheckpointOptions): Promise { - return this.writeCheckpointAPI.createManagedWriteCheckpoint(checkpoint); - } - - async getCheckpoint(): Promise { - const checkpointRow = await this.db.sql` - SELECT - last_checkpoint, - last_checkpoint_lsn - FROM - sync_rules - WHERE - id = ${{ type: 'int4', value: this.group_id }} - ` - .decoded(pick(models.SyncRules, ['last_checkpoint', 'last_checkpoint_lsn'])) - .first(); - - return { - checkpoint: utils.timestampToOpId(checkpointRow?.last_checkpoint ?? 0n), - lsn: checkpointRow?.last_checkpoint_lsn ?? null - }; - } - - async resolveTable(options: storage.ResolveTableOptions): Promise { - const { group_id, connection_id, connection_tag, entity_descriptor } = options; - - const { schema, name: table, objectId, replicationColumns } = entity_descriptor; - - const columns = replicationColumns.map((column) => ({ - name: column.name, - type: column.type, - // The PGWire returns this as a BigInt. We want to store this as JSONB - type_oid: typeof column.typeId !== 'undefined' ? 
Number(column.typeId) : column.typeId - })); - return this.db.transaction(async (db) => { - let sourceTableRow = await db.sql` - SELECT - * - FROM - source_tables - WHERE - group_id = ${{ type: 'int4', value: group_id }} - AND connection_id = ${{ type: 'int4', value: connection_id }} - AND relation_id = ${{ type: 'jsonb', value: { object_id: objectId } satisfies StoredRelationId }} - AND schema_name = ${{ type: 'varchar', value: schema }} - AND table_name = ${{ type: 'varchar', value: table }} - AND replica_id_columns = ${{ type: 'jsonb', value: columns }} - ` - .decoded(models.SourceTable) - .first(); - - if (sourceTableRow == null) { - const row = await db.sql` - INSERT INTO - source_tables ( - id, - group_id, - connection_id, - relation_id, - schema_name, - table_name, - replica_id_columns - ) - VALUES - ( - ${{ type: 'varchar', value: uuid.v4() }}, - ${{ type: 'int4', value: group_id }}, - ${{ type: 'int4', value: connection_id }}, - --- The objectId can be string | number, we store it as jsonb value - ${{ type: 'jsonb', value: { object_id: objectId } satisfies StoredRelationId }}, - ${{ type: 'varchar', value: schema }}, - ${{ type: 'varchar', value: table }}, - ${{ type: 'jsonb', value: columns }} - ) - RETURNING - * - ` - .decoded(models.SourceTable) - .first(); - sourceTableRow = row; - } - - const sourceTable = new storage.SourceTable( - sourceTableRow!.id, - connection_tag, - objectId, - schema, - table, - replicationColumns, - sourceTableRow!.snapshot_done ?? true - ); - sourceTable.syncEvent = options.sync_rules.tableTriggersEvent(sourceTable); - sourceTable.syncData = options.sync_rules.tableSyncsData(sourceTable); - sourceTable.syncParameters = options.sync_rules.tableSyncsParameters(sourceTable); - - const truncatedTables = await db.sql` - SELECT - * - FROM - source_tables - WHERE - group_id = ${{ type: 'int4', value: group_id }} - AND connection_id = ${{ type: 'int4', value: connection_id }} - AND id != ${{ type: 'varchar', value: sourceTableRow!.id }} - AND ( - relation_id = ${{ type: 'jsonb', value: { object_id: objectId } satisfies StoredRelationId }} - OR ( - schema_name = ${{ type: 'varchar', value: schema }} - AND table_name = ${{ type: 'varchar', value: table }} - ) - ) - ` - .decoded(models.SourceTable) - .rows(); - - return { - table: sourceTable, - dropTables: truncatedTables.map( - (doc) => - new storage.SourceTable( - doc.id, - connection_tag, - doc.relation_id?.object_id ?? 0, - doc.schema_name, - doc.table_name, - doc.replica_id_columns?.map((c) => ({ - name: c.name, - typeOid: c.typeId, - type: c.type - })) ?? [], - doc.snapshot_done ?? true - ) - ) - }; - }); - } - - async startBatch( - options: storage.StartBatchOptions, - callback: (batch: storage.BucketStorageBatch) => Promise - ): Promise { - const syncRules = await this.db.sql` - SELECT - last_checkpoint_lsn, - no_checkpoint_before, - keepalive_op - FROM - sync_rules - WHERE - id = ${{ type: 'int4', value: this.group_id }} - ` - .decoded(pick(models.SyncRules, ['last_checkpoint_lsn', 'no_checkpoint_before', 'keepalive_op'])) - .first(); - - const checkpoint_lsn = syncRules?.last_checkpoint_lsn ?? null; - - await using batch = new PostgresBucketBatch({ - db: this.db, - sync_rules: this.sync_rules.parsed(options).sync_rules, - group_id: this.group_id, - slot_name: this.slot_name, - last_checkpoint_lsn: checkpoint_lsn, - keep_alive_op: syncRules?.keepalive_op, - no_checkpoint_before_lsn: syncRules?.no_checkpoint_before ?? 
options.zeroLSN, - store_current_data: options.storeCurrentData, - skip_existing_rows: options.skipExistingRows ?? false, - batch_limits: this.options.batchLimits - }); - this.iterateListeners((cb) => cb.batchStarted?.(batch)); - - await callback(batch); - await batch.flush(); - if (batch.last_flushed_op) { - return { flushed_op: String(batch.last_flushed_op) }; - } else { - return null; - } - } - - async getParameterSets( - checkpoint: utils.OpId, - lookups: sync_rules.SqliteJsonValue[][] - ): Promise { - const rows = await this.db.sql` - SELECT DISTINCT - ON (lookup, source_table, source_key) lookup, - source_table, - source_key, - id, - bucket_parameters - FROM - bucket_parameters - WHERE - group_id = ${{ type: 'int4', value: this.group_id }} - AND lookup = ANY ( - SELECT - decode((FILTER ->> 0)::text, 'hex') -- Decode the hex string to bytea - FROM - jsonb_array_elements(${{ - type: 'jsonb', - value: lookups.map((l) => storage.serializeLookupBuffer(l).toString('hex')) - }}) AS FILTER - ) - AND id <= ${{ type: 'int8', value: BigInt(checkpoint) }} - ORDER BY - lookup, - source_table, - source_key, - id DESC - ` - .decoded(pick(models.BucketParameters, ['bucket_parameters'])) - .rows(); - - const groupedParameters = rows.map((row) => { - return JSONBig.parse(row.bucket_parameters) as sync_rules.SqliteJsonRow; - }); - return groupedParameters.flat(); - } - - async *getBucketDataBatch( - checkpoint: utils.OpId, - dataBuckets: Map, - options?: storage.BucketDataBatchOptions - ): AsyncIterable { - if (dataBuckets.size == 0) { - return; - } - - const end = checkpoint ?? BIGINT_MAX; - const filters = Array.from(dataBuckets.entries()).map(([name, start]) => ({ - bucket_name: name, - start: start - })); - - const rowLimit = options?.limit ?? storage.DEFAULT_DOCUMENT_BATCH_LIMIT; - const sizeLimit = options?.chunkLimitBytes ?? storage.DEFAULT_DOCUMENT_CHUNK_LIMIT_BYTES; - - let batchSize = 0; - let currentBatch: utils.SyncBucketData | null = null; - let targetOp: bigint | null = null; - let rowCount = 0; - - /** - * It is possible to perform this query with JSONB join. e.g. - * ```sql - * WITH - * filter_data AS ( - * SELECT - * FILTER ->> 'bucket_name' AS bucket_name, - * (FILTER ->> 'start')::BIGINT AS start_op_id - * FROM - * jsonb_array_elements($1::jsonb) AS FILTER - * ) - * SELECT - * b.*, - * octet_length(b.data) AS data_size - * FROM - * bucket_data b - * JOIN filter_data f ON b.bucket_name = f.bucket_name - * AND b.op_id > f.start_op_id - * AND b.op_id <= $2 - * WHERE - * b.group_id = $3 - * ORDER BY - * b.bucket_name ASC, - * b.op_id ASC - * LIMIT - * $4; - * ``` - * Which might be better for large volumes of buckets, but in testing the JSON method - * was significantly slower than the method below. Syncing 2.5 million rows in a single - * bucket takes 2 minutes and 11 seconds with the method below. With the JSON method - * 1 million rows were only synced before a 5 minute timeout. 
- */ - for await (const rows of this.db.streamRows({ - statement: ` - SELECT - * - FROM - bucket_data - WHERE - group_id = $1 - and op_id <= $2 - and ( - ${filters.map((f, index) => `(bucket_name = $${index * 2 + 4} and op_id > $${index * 2 + 5})`).join(' OR ')} - ) - ORDER BY - bucket_name ASC, - op_id ASC - LIMIT - $3;`, - params: [ - { type: 'int4', value: this.group_id }, - { type: 'int8', value: end }, - { type: 'int4', value: rowLimit + 1 }, - ...filters.flatMap((f) => [ - { type: 'varchar' as const, value: f.bucket_name }, - { type: 'int8' as const, value: f.start } satisfies StatementParam - ]) - ] - })) { - const decodedRows = rows.map((r) => models.BucketData.decode(r as any)); - - for (const row of decodedRows) { - const { bucket_name } = row; - const rowSize = row.data ? row.data.length : 0; - - if ( - currentBatch == null || - currentBatch.bucket != bucket_name || - batchSize >= sizeLimit || - (currentBatch?.data.length && batchSize + rowSize > sizeLimit) || - currentBatch.data.length >= rowLimit - ) { - let start: string | undefined = undefined; - if (currentBatch != null) { - if (currentBatch.bucket == bucket_name) { - currentBatch.has_more = true; - } - - const yieldBatch = currentBatch; - start = currentBatch.after; - currentBatch = null; - batchSize = 0; - yield { batch: yieldBatch, targetOp: targetOp }; - targetOp = null; - if (rowCount >= rowLimit) { - // We've yielded all the requested rows - break; - } - } - - start ??= dataBuckets.get(bucket_name); - if (start == null) { - throw new Error(`data for unexpected bucket: ${bucket_name}`); - } - currentBatch = { - bucket: bucket_name, - after: start, - has_more: false, - data: [], - next_after: start - }; - targetOp = null; - } - - const entry = mapOpEntry(row); - - if (row.source_table && row.source_key) { - entry.subkey = replicaIdToSubkey(row.source_table, storage.deserializeReplicaId(row.source_key)); - } - - if (row.target_op != null) { - // MOVE, CLEAR - const rowTargetOp = row.target_op; - if (targetOp == null || rowTargetOp > targetOp) { - targetOp = rowTargetOp; - } - } - - currentBatch.data.push(entry); - currentBatch.next_after = entry.op_id; - - batchSize += rowSize; - - // Manually track the total rows yielded - rowCount++; - } - } - - if (currentBatch != null) { - const yieldBatch = currentBatch; - currentBatch = null; - yield { batch: yieldBatch, targetOp: targetOp }; - targetOp = null; - } - } - - async getChecksums(checkpoint: utils.OpId, buckets: string[]): Promise { - return this.checksumCache.getChecksumMap(checkpoint, buckets); - } - - async terminate(options?: storage.TerminateOptions) { - if (!options || options?.clearStorage) { - await this.clear(); - } - await this.db.sql` - UPDATE sync_rules - SET - state = ${{ type: 'varchar', value: storage.SyncRuleState.TERMINATED }}, - snapshot_done = ${{ type: 'bool', value: false }} - WHERE - id = ${{ type: 'int4', value: this.group_id }} - `.execute(); - } - - async getStatus(): Promise { - const syncRulesRow = await this.db.sql` - SELECT - snapshot_done, - last_checkpoint_lsn, - state - FROM - sync_rules - WHERE - id = ${{ type: 'int4', value: this.group_id }} - ` - .decoded(pick(models.SyncRules, ['snapshot_done', 'last_checkpoint_lsn', 'state'])) - .first(); - - if (syncRulesRow == null) { - throw new Error('Cannot find sync rules status'); - } - - return { - snapshot_done: syncRulesRow.snapshot_done, - active: syncRulesRow.state == storage.SyncRuleState.ACTIVE, - checkpoint_lsn: syncRulesRow.last_checkpoint_lsn ?? 
null - }; - } - - async clear(): Promise { - await this.db.sql` - UPDATE sync_rules - SET - snapshot_done = FALSE, - last_checkpoint_lsn = NULL, - last_checkpoint = NULL, - no_checkpoint_before = NULL - WHERE - id = ${{ type: 'int4', value: this.group_id }} - `.execute(); - - await this.db.sql` - DELETE FROM bucket_data - WHERE - group_id = ${{ type: 'int4', value: this.group_id }} - `.execute(); - - await this.db.sql` - DELETE FROM bucket_parameters - WHERE - group_id = ${{ type: 'int4', value: this.group_id }} - `.execute(); - - await this.db.sql` - DELETE FROM current_data - WHERE - group_id = ${{ type: 'int4', value: this.group_id }} - `.execute(); - - await this.db.sql` - DELETE FROM source_tables - WHERE - group_id = ${{ type: 'int4', value: this.group_id }} - `.execute(); - } - - async autoActivate(): Promise { - await this.db.transaction(async (db) => { - const syncRulesRow = await db.sql` - SELECT - state - FROM - sync_rules - WHERE - id = ${{ type: 'int4', value: this.group_id }} - ` - .decoded(pick(models.SyncRules, ['state'])) - .first(); - - if (syncRulesRow && syncRulesRow.state == storage.SyncRuleState.PROCESSING) { - await db.sql` - UPDATE sync_rules - SET - state = ${{ type: 'varchar', value: storage.SyncRuleState.ACTIVE }} - WHERE - id = ${{ type: 'int4', value: this.group_id }} - `.execute(); - } - - await db.sql` - UPDATE sync_rules - SET - state = ${{ type: 'varchar', value: storage.SyncRuleState.STOP }} - WHERE - state = ${{ type: 'varchar', value: storage.SyncRuleState.ACTIVE }} - AND id != ${{ type: 'int4', value: this.group_id }} - `.execute(); - }); - } - - private async getChecksumsInternal(batch: storage.FetchPartialBucketChecksum[]): Promise { - if (batch.length == 0) { - return new Map(); - } - - const rangedBatch = batch.map((b) => ({ - ...b, - start: b.start ?? 
0 - })); - - const results = await this.db.sql` - WITH - filter_data AS ( - SELECT - FILTER ->> 'bucket' AS bucket_name, - (FILTER ->> 'start')::BIGINT AS start_op_id, - (FILTER ->> 'end')::BIGINT AS end_op_id - FROM - jsonb_array_elements(${{ type: 'jsonb', value: rangedBatch }}::jsonb) AS FILTER - ) - SELECT - b.bucket_name AS bucket, - SUM(b.checksum) AS checksum_total, - COUNT(*) AS total, - MAX( - CASE - WHEN b.op = 'CLEAR' THEN 1 - ELSE 0 - END - ) AS has_clear_op - FROM - bucket_data b - JOIN filter_data f ON b.bucket_name = f.bucket_name - AND b.op_id > f.start_op_id - AND b.op_id <= f.end_op_id - WHERE - b.group_id = ${{ type: 'int4', value: this.group_id }} - GROUP BY - b.bucket_name; - `.rows<{ bucket: string; checksum_total: bigint; total: bigint; has_clear_op: number }>(); - - return new Map( - results.map((doc) => { - return [ - doc.bucket, - { - bucket: doc.bucket, - partialCount: Number(doc.total), - partialChecksum: Number(BigInt(doc.checksum_total) & 0xffffffffn) & 0xffffffff, - isFullChecksum: doc.has_clear_op == 1 - } satisfies storage.PartialChecksum - ]; - }) - ); - } -} +import * as lib_postgres from '@powersync/lib-service-postgres'; +import { DisposableObserver, ReplicationAssertionError } from '@powersync/lib-services-framework'; +import { storage, utils } from '@powersync/service-core'; +import { JSONBig } from '@powersync/service-jsonbig'; +import * as sync_rules from '@powersync/service-sync-rules'; +import * as uuid from 'uuid'; +import { BIGINT_MAX } from '../types/codecs.js'; +import { models, RequiredOperationBatchLimits } from '../types/types.js'; +import { replicaIdToSubkey } from '../utils/bson.js'; +import { mapOpEntry } from '../utils/bucket-data.js'; + +import { StatementParam } from '@powersync/service-jpgwire'; +import { StoredRelationId } from '../types/models/SourceTable.js'; +import { pick } from '../utils/ts-codec.js'; +import { PostgresBucketBatch } from './batch/PostgresBucketBatch.js'; +import { PostgresWriteCheckpointAPI } from './checkpoints/PostgresWriteCheckpointAPI.js'; +import { PostgresBucketStorageFactory } from './PostgresBucketStorageFactory.js'; +import { PostgresCompactor } from './PostgresCompactor.js'; + +export type PostgresSyncRulesStorageOptions = { + factory: PostgresBucketStorageFactory; + db: lib_postgres.DatabaseClient; + sync_rules: storage.PersistedSyncRulesContent; + write_checkpoint_mode?: storage.WriteCheckpointMode; + batchLimits: RequiredOperationBatchLimits; +}; + +export class PostgresSyncRulesStorage + extends DisposableObserver + implements storage.SyncRulesBucketStorage +{ + public readonly group_id: number; + public readonly sync_rules: storage.PersistedSyncRulesContent; + public readonly slot_name: string; + public readonly factory: PostgresBucketStorageFactory; + + protected db: lib_postgres.DatabaseClient; + protected writeCheckpointAPI: PostgresWriteCheckpointAPI; + + // TODO we might be able to share this in an abstract class + private parsedSyncRulesCache: { parsed: sync_rules.SqlSyncRules; options: storage.ParseSyncRulesOptions } | undefined; + private checksumCache = new storage.ChecksumCache({ + fetchChecksums: (batch) => { + return this.getChecksumsInternal(batch); + } + }); + + constructor(protected options: PostgresSyncRulesStorageOptions) { + super(); + this.group_id = options.sync_rules.id; + this.db = options.db; + this.sync_rules = options.sync_rules; + this.slot_name = options.sync_rules.slot_name; + this.factory = options.factory; + + this.writeCheckpointAPI = new 
PostgresWriteCheckpointAPI({ + db: this.db, + mode: options.write_checkpoint_mode ?? storage.WriteCheckpointMode.MANAGED + }); + } + + get writeCheckpointMode(): storage.WriteCheckpointMode { + return this.writeCheckpointAPI.writeCheckpointMode; + } + + // TODO we might be able to share this in an abstract class + getParsedSyncRules(options: storage.ParseSyncRulesOptions): sync_rules.SqlSyncRules { + const { parsed, options: cachedOptions } = this.parsedSyncRulesCache ?? {}; + /** + * Check if the cached sync rules, if present, had the same options. + * Parse sync rules if the options are different or if there is no cached value. + */ + if (!parsed || options.defaultSchema != cachedOptions?.defaultSchema) { + this.parsedSyncRulesCache = { parsed: this.sync_rules.parsed(options).sync_rules, options }; + } + + return this.parsedSyncRulesCache!.parsed; + } + + async reportError(e: any): Promise { + const message = String(e.message ?? 'Replication failure'); + await this.db.sql` + UPDATE sync_rules + SET + last_fatal_error = ${{ type: 'varchar', value: message }} + WHERE + id = ${{ type: 'int4', value: this.group_id }}; + `.execute(); + } + + compact(options?: storage.CompactOptions): Promise { + return new PostgresCompactor(this.db, this.group_id, options).compact(); + } + + batchCreateCustomWriteCheckpoints(checkpoints: storage.BatchedCustomWriteCheckpointOptions[]): Promise { + return this.writeCheckpointAPI.batchCreateCustomWriteCheckpoints( + checkpoints.map((c) => ({ ...c, sync_rules_id: this.group_id })) + ); + } + + createCustomWriteCheckpoint(checkpoint: storage.BatchedCustomWriteCheckpointOptions): Promise { + return this.writeCheckpointAPI.createCustomWriteCheckpoint({ + ...checkpoint, + sync_rules_id: this.group_id + }); + } + + lastWriteCheckpoint(filters: storage.SyncStorageLastWriteCheckpointFilters): Promise { + return this.writeCheckpointAPI.lastWriteCheckpoint({ + ...filters, + sync_rules_id: this.group_id + }); + } + + setWriteCheckpointMode(mode: storage.WriteCheckpointMode): void { + return this.writeCheckpointAPI.setWriteCheckpointMode(mode); + } + + createManagedWriteCheckpoint(checkpoint: storage.ManagedWriteCheckpointOptions): Promise { + return this.writeCheckpointAPI.createManagedWriteCheckpoint(checkpoint); + } + + async getCheckpoint(): Promise { + const checkpointRow = await this.db.sql` + SELECT + last_checkpoint, + last_checkpoint_lsn + FROM + sync_rules + WHERE + id = ${{ type: 'int4', value: this.group_id }} + ` + .decoded(pick(models.SyncRules, ['last_checkpoint', 'last_checkpoint_lsn'])) + .first(); + + return { + checkpoint: utils.timestampToOpId(checkpointRow?.last_checkpoint ?? 0n), + lsn: checkpointRow?.last_checkpoint_lsn ?? null + }; + } + + async resolveTable(options: storage.ResolveTableOptions): Promise { + const { group_id, connection_id, connection_tag, entity_descriptor } = options; + + const { schema, name: table, objectId, replicationColumns } = entity_descriptor; + + const columns = replicationColumns.map((column) => ({ + name: column.name, + type: column.type, + // The PGWire returns this as a BigInt. We want to store this as JSONB + type_oid: typeof column.typeId !== 'undefined' ? 
Number(column.typeId) : column.typeId + })); + return this.db.transaction(async (db) => { + let sourceTableRow = await db.sql` + SELECT + * + FROM + source_tables + WHERE + group_id = ${{ type: 'int4', value: group_id }} + AND connection_id = ${{ type: 'int4', value: connection_id }} + AND relation_id = ${{ type: 'jsonb', value: { object_id: objectId } satisfies StoredRelationId }} + AND schema_name = ${{ type: 'varchar', value: schema }} + AND table_name = ${{ type: 'varchar', value: table }} + AND replica_id_columns = ${{ type: 'jsonb', value: columns }} + ` + .decoded(models.SourceTable) + .first(); + + if (sourceTableRow == null) { + const row = await db.sql` + INSERT INTO + source_tables ( + id, + group_id, + connection_id, + relation_id, + schema_name, + table_name, + replica_id_columns + ) + VALUES + ( + ${{ type: 'varchar', value: uuid.v4() }}, + ${{ type: 'int4', value: group_id }}, + ${{ type: 'int4', value: connection_id }}, + --- The objectId can be string | number, we store it as jsonb value + ${{ type: 'jsonb', value: { object_id: objectId } satisfies StoredRelationId }}, + ${{ type: 'varchar', value: schema }}, + ${{ type: 'varchar', value: table }}, + ${{ type: 'jsonb', value: columns }} + ) + RETURNING + * + ` + .decoded(models.SourceTable) + .first(); + sourceTableRow = row; + } + + const sourceTable = new storage.SourceTable( + sourceTableRow!.id, + connection_tag, + objectId, + schema, + table, + replicationColumns, + sourceTableRow!.snapshot_done ?? true + ); + sourceTable.syncEvent = options.sync_rules.tableTriggersEvent(sourceTable); + sourceTable.syncData = options.sync_rules.tableSyncsData(sourceTable); + sourceTable.syncParameters = options.sync_rules.tableSyncsParameters(sourceTable); + + const truncatedTables = await db.sql` + SELECT + * + FROM + source_tables + WHERE + group_id = ${{ type: 'int4', value: group_id }} + AND connection_id = ${{ type: 'int4', value: connection_id }} + AND id != ${{ type: 'varchar', value: sourceTableRow!.id }} + AND ( + relation_id = ${{ type: 'jsonb', value: { object_id: objectId } satisfies StoredRelationId }} + OR ( + schema_name = ${{ type: 'varchar', value: schema }} + AND table_name = ${{ type: 'varchar', value: table }} + ) + ) + ` + .decoded(models.SourceTable) + .rows(); + + return { + table: sourceTable, + dropTables: truncatedTables.map( + (doc) => + new storage.SourceTable( + doc.id, + connection_tag, + doc.relation_id?.object_id ?? 0, + doc.schema_name, + doc.table_name, + doc.replica_id_columns?.map((c) => ({ + name: c.name, + typeOid: c.typeId, + type: c.type + })) ?? [], + doc.snapshot_done ?? true + ) + ) + }; + }); + } + + async startBatch( + options: storage.StartBatchOptions, + callback: (batch: storage.BucketStorageBatch) => Promise + ): Promise { + const syncRules = await this.db.sql` + SELECT + last_checkpoint_lsn, + no_checkpoint_before, + keepalive_op + FROM + sync_rules + WHERE + id = ${{ type: 'int4', value: this.group_id }} + ` + .decoded(pick(models.SyncRules, ['last_checkpoint_lsn', 'no_checkpoint_before', 'keepalive_op'])) + .first(); + + const checkpoint_lsn = syncRules?.last_checkpoint_lsn ?? null; + + await using batch = new PostgresBucketBatch({ + db: this.db, + sync_rules: this.sync_rules.parsed(options).sync_rules, + group_id: this.group_id, + slot_name: this.slot_name, + last_checkpoint_lsn: checkpoint_lsn, + keep_alive_op: syncRules?.keepalive_op, + no_checkpoint_before_lsn: syncRules?.no_checkpoint_before ?? 
options.zeroLSN, + store_current_data: options.storeCurrentData, + skip_existing_rows: options.skipExistingRows ?? false, + batch_limits: this.options.batchLimits + }); + this.iterateListeners((cb) => cb.batchStarted?.(batch)); + + await callback(batch); + await batch.flush(); + if (batch.last_flushed_op) { + return { flushed_op: String(batch.last_flushed_op) }; + } else { + return null; + } + } + + async getParameterSets( + checkpoint: utils.OpId, + lookups: sync_rules.SqliteJsonValue[][] + ): Promise { + const rows = await this.db.sql` + SELECT DISTINCT + ON (lookup, source_table, source_key) lookup, + source_table, + source_key, + id, + bucket_parameters + FROM + bucket_parameters + WHERE + group_id = ${{ type: 'int4', value: this.group_id }} + AND lookup = ANY ( + SELECT + decode((FILTER ->> 0)::text, 'hex') -- Decode the hex string to bytea + FROM + jsonb_array_elements(${{ + type: 'jsonb', + value: lookups.map((l) => storage.serializeLookupBuffer(l).toString('hex')) + }}) AS FILTER + ) + AND id <= ${{ type: 'int8', value: BigInt(checkpoint) }} + ORDER BY + lookup, + source_table, + source_key, + id DESC + ` + .decoded(pick(models.BucketParameters, ['bucket_parameters'])) + .rows(); + + const groupedParameters = rows.map((row) => { + return JSONBig.parse(row.bucket_parameters) as sync_rules.SqliteJsonRow; + }); + return groupedParameters.flat(); + } + + async *getBucketDataBatch( + checkpoint: utils.OpId, + dataBuckets: Map, + options?: storage.BucketDataBatchOptions + ): AsyncIterable { + if (dataBuckets.size == 0) { + return; + } + + const end = checkpoint ?? BIGINT_MAX; + const filters = Array.from(dataBuckets.entries()).map(([name, start]) => ({ + bucket_name: name, + start: start + })); + + const rowLimit = options?.limit ?? storage.DEFAULT_DOCUMENT_BATCH_LIMIT; + const sizeLimit = options?.chunkLimitBytes ?? storage.DEFAULT_DOCUMENT_CHUNK_LIMIT_BYTES; + + let batchSize = 0; + let currentBatch: utils.SyncBucketData | null = null; + let targetOp: bigint | null = null; + let rowCount = 0; + + /** + * It is possible to perform this query with JSONB join. e.g. + * ```sql + * WITH + * filter_data AS ( + * SELECT + * FILTER ->> 'bucket_name' AS bucket_name, + * (FILTER ->> 'start')::BIGINT AS start_op_id + * FROM + * jsonb_array_elements($1::jsonb) AS FILTER + * ) + * SELECT + * b.*, + * octet_length(b.data) AS data_size + * FROM + * bucket_data b + * JOIN filter_data f ON b.bucket_name = f.bucket_name + * AND b.op_id > f.start_op_id + * AND b.op_id <= $2 + * WHERE + * b.group_id = $3 + * ORDER BY + * b.bucket_name ASC, + * b.op_id ASC + * LIMIT + * $4; + * ``` + * Which might be better for large volumes of buckets, but in testing the JSON method + * was significantly slower than the method below. Syncing 2.5 million rows in a single + * bucket takes 2 minutes and 11 seconds with the method below. With the JSON method + * 1 million rows were only synced before a 5 minute timeout. 
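+     * Note: the inline-filter query below binds two parameters per requested bucket
+     * (bucket_name and its start op_id), so the statement's parameter count grows
+     * linearly with the number of buckets requested in a single call.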
+ */ + for await (const rows of this.db.streamRows({ + statement: ` + SELECT + * + FROM + bucket_data + WHERE + group_id = $1 + and op_id <= $2 + and ( + ${filters.map((f, index) => `(bucket_name = $${index * 2 + 4} and op_id > $${index * 2 + 5})`).join(' OR ')} + ) + ORDER BY + bucket_name ASC, + op_id ASC + LIMIT + $3;`, + params: [ + { type: 'int4', value: this.group_id }, + { type: 'int8', value: end }, + { type: 'int4', value: rowLimit + 1 }, + ...filters.flatMap((f) => [ + { type: 'varchar' as const, value: f.bucket_name }, + { type: 'int8' as const, value: f.start } satisfies StatementParam + ]) + ] + })) { + const decodedRows = rows.map((r) => models.BucketData.decode(r as any)); + + for (const row of decodedRows) { + const { bucket_name } = row; + const rowSize = row.data ? row.data.length : 0; + + if ( + currentBatch == null || + currentBatch.bucket != bucket_name || + batchSize >= sizeLimit || + (currentBatch?.data.length && batchSize + rowSize > sizeLimit) || + currentBatch.data.length >= rowLimit + ) { + let start: string | undefined = undefined; + if (currentBatch != null) { + if (currentBatch.bucket == bucket_name) { + currentBatch.has_more = true; + } + + const yieldBatch = currentBatch; + start = currentBatch.after; + currentBatch = null; + batchSize = 0; + yield { batch: yieldBatch, targetOp: targetOp }; + targetOp = null; + if (rowCount >= rowLimit) { + // We've yielded all the requested rows + break; + } + } + + start ??= dataBuckets.get(bucket_name); + if (start == null) { + throw new ReplicationAssertionError(`data for unexpected bucket: ${bucket_name}`); + } + currentBatch = { + bucket: bucket_name, + after: start, + has_more: false, + data: [], + next_after: start + }; + targetOp = null; + } + + const entry = mapOpEntry(row); + + if (row.source_table && row.source_key) { + entry.subkey = replicaIdToSubkey(row.source_table, storage.deserializeReplicaId(row.source_key)); + } + + if (row.target_op != null) { + // MOVE, CLEAR + const rowTargetOp = row.target_op; + if (targetOp == null || rowTargetOp > targetOp) { + targetOp = rowTargetOp; + } + } + + currentBatch.data.push(entry); + currentBatch.next_after = entry.op_id; + + batchSize += rowSize; + + // Manually track the total rows yielded + rowCount++; + } + } + + if (currentBatch != null) { + const yieldBatch = currentBatch; + currentBatch = null; + yield { batch: yieldBatch, targetOp: targetOp }; + targetOp = null; + } + } + + async getChecksums(checkpoint: utils.OpId, buckets: string[]): Promise { + return this.checksumCache.getChecksumMap(checkpoint, buckets); + } + + async terminate(options?: storage.TerminateOptions) { + if (!options || options?.clearStorage) { + await this.clear(); + } + await this.db.sql` + UPDATE sync_rules + SET + state = ${{ type: 'varchar', value: storage.SyncRuleState.TERMINATED }}, + snapshot_done = ${{ type: 'bool', value: false }} + WHERE + id = ${{ type: 'int4', value: this.group_id }} + `.execute(); + } + + async getStatus(): Promise { + const syncRulesRow = await this.db.sql` + SELECT + snapshot_done, + last_checkpoint_lsn, + state + FROM + sync_rules + WHERE + id = ${{ type: 'int4', value: this.group_id }} + ` + .decoded(pick(models.SyncRules, ['snapshot_done', 'last_checkpoint_lsn', 'state'])) + .first(); + + if (syncRulesRow == null) { + throw new Error('Cannot find sync rules status'); + } + + return { + snapshot_done: syncRulesRow.snapshot_done, + active: syncRulesRow.state == storage.SyncRuleState.ACTIVE, + checkpoint_lsn: syncRulesRow.last_checkpoint_lsn ?? 
null + }; + } + + async clear(): Promise { + await this.db.sql` + UPDATE sync_rules + SET + snapshot_done = FALSE, + last_checkpoint_lsn = NULL, + last_checkpoint = NULL, + no_checkpoint_before = NULL + WHERE + id = ${{ type: 'int4', value: this.group_id }} + `.execute(); + + await this.db.sql` + DELETE FROM bucket_data + WHERE + group_id = ${{ type: 'int4', value: this.group_id }} + `.execute(); + + await this.db.sql` + DELETE FROM bucket_parameters + WHERE + group_id = ${{ type: 'int4', value: this.group_id }} + `.execute(); + + await this.db.sql` + DELETE FROM current_data + WHERE + group_id = ${{ type: 'int4', value: this.group_id }} + `.execute(); + + await this.db.sql` + DELETE FROM source_tables + WHERE + group_id = ${{ type: 'int4', value: this.group_id }} + `.execute(); + } + + async autoActivate(): Promise { + await this.db.transaction(async (db) => { + const syncRulesRow = await db.sql` + SELECT + state + FROM + sync_rules + WHERE + id = ${{ type: 'int4', value: this.group_id }} + ` + .decoded(pick(models.SyncRules, ['state'])) + .first(); + + if (syncRulesRow && syncRulesRow.state == storage.SyncRuleState.PROCESSING) { + await db.sql` + UPDATE sync_rules + SET + state = ${{ type: 'varchar', value: storage.SyncRuleState.ACTIVE }} + WHERE + id = ${{ type: 'int4', value: this.group_id }} + `.execute(); + } + + await db.sql` + UPDATE sync_rules + SET + state = ${{ type: 'varchar', value: storage.SyncRuleState.STOP }} + WHERE + state = ${{ type: 'varchar', value: storage.SyncRuleState.ACTIVE }} + AND id != ${{ type: 'int4', value: this.group_id }} + `.execute(); + }); + } + + private async getChecksumsInternal(batch: storage.FetchPartialBucketChecksum[]): Promise { + if (batch.length == 0) { + return new Map(); + } + + const rangedBatch = batch.map((b) => ({ + ...b, + start: b.start ?? 
0 + })); + + const results = await this.db.sql` + WITH + filter_data AS ( + SELECT + FILTER ->> 'bucket' AS bucket_name, + (FILTER ->> 'start')::BIGINT AS start_op_id, + (FILTER ->> 'end')::BIGINT AS end_op_id + FROM + jsonb_array_elements(${{ type: 'jsonb', value: rangedBatch }}::jsonb) AS FILTER + ) + SELECT + b.bucket_name AS bucket, + SUM(b.checksum) AS checksum_total, + COUNT(*) AS total, + MAX( + CASE + WHEN b.op = 'CLEAR' THEN 1 + ELSE 0 + END + ) AS has_clear_op + FROM + bucket_data b + JOIN filter_data f ON b.bucket_name = f.bucket_name + AND b.op_id > f.start_op_id + AND b.op_id <= f.end_op_id + WHERE + b.group_id = ${{ type: 'int4', value: this.group_id }} + GROUP BY + b.bucket_name; + `.rows<{ bucket: string; checksum_total: bigint; total: bigint; has_clear_op: number }>(); + + return new Map( + results.map((doc) => { + return [ + doc.bucket, + { + bucket: doc.bucket, + partialCount: Number(doc.total), + partialChecksum: Number(BigInt(doc.checksum_total) & 0xffffffffn) & 0xffffffff, + isFullChecksum: doc.has_clear_op == 1 + } satisfies storage.PartialChecksum + ]; + }) + ); + } +} diff --git a/modules/module-postgres-storage/src/storage/batch/PostgresBucketBatch.ts b/modules/module-postgres-storage/src/storage/batch/PostgresBucketBatch.ts index d7596b2bc..1ee75697a 100644 --- a/modules/module-postgres-storage/src/storage/batch/PostgresBucketBatch.ts +++ b/modules/module-postgres-storage/src/storage/batch/PostgresBucketBatch.ts @@ -1,5 +1,14 @@ import * as lib_postgres from '@powersync/lib-service-postgres'; -import { container, DisposableObserver, errors, logger } from '@powersync/lib-services-framework'; +import { + container, + DisposableObserver, + ErrorCode, + errors, + logger, + ReplicationAssertionError, + ServiceAssertionError, + ServiceError +} from '@powersync/lib-services-framework'; import { storage, utils } from '@powersync/service-core'; import * as sync_rules from '@powersync/service-sync-rules'; import * as timers from 'timers/promises'; @@ -258,7 +267,7 @@ export class PostgresBucketBatch this.batch = resumeBatch; if (lastOp == null) { - throw new Error('Unexpected last_op == null'); + throw new ServiceAssertionError('Unexpected last_op == null'); } this.persisted_op = lastOp; @@ -631,7 +640,7 @@ export class PostgresBucketBatch return null; } } else { - throw new Error(`${record.tag} not supported with skipExistingRows: true`); + throw new ReplicationAssertionError(`${record.tag} not supported with skipExistingRows: true`); } } @@ -680,7 +689,7 @@ export class PostgresBucketBatch try { afterData = storage.serializeBson(after); if (afterData!.byteLength > MAX_ROW_SIZE) { - throw new Error(`Row too large: ${afterData?.byteLength}`); + throw new ServiceError(ErrorCode.PSYNC_S1002, `Row too large: ${afterData?.byteLength}`); } } catch (e) { // Replace with empty values, equivalent to TOAST values diff --git a/modules/module-postgres-storage/src/storage/sync-rules/PostgresPersistedSyncRulesContent.ts b/modules/module-postgres-storage/src/storage/sync-rules/PostgresPersistedSyncRulesContent.ts index c3cf5da6e..9bef5a000 100644 --- a/modules/module-postgres-storage/src/storage/sync-rules/PostgresPersistedSyncRulesContent.ts +++ b/modules/module-postgres-storage/src/storage/sync-rules/PostgresPersistedSyncRulesContent.ts @@ -1,5 +1,5 @@ import * as lib_postgres from '@powersync/lib-service-postgres'; -import { logger } from '@powersync/lib-services-framework'; +import { ErrorCode, logger, ServiceError } from '@powersync/lib-services-framework'; import { storage } from 
'@powersync/service-core'; import { SqlSyncRules } from '@powersync/service-sync-rules'; @@ -44,7 +44,10 @@ export class PostgresPersistedSyncRulesContent implements storage.PersistedSyncR }); const lockHandle = await manager.acquire(); if (!lockHandle) { - throw new Error(`Sync rules: ${this.id} have been locked by another process for replication.`); + throw new ServiceError( + ErrorCode.PSYNC_S1003, + `Sync rules: ${this.id} have been locked by another process for replication.` + ); } const interval = setInterval(async () => { diff --git a/modules/module-postgres-storage/src/types/codecs.ts b/modules/module-postgres-storage/src/types/codecs.ts index 9f0824ee1..7d0c5b586 100644 --- a/modules/module-postgres-storage/src/types/codecs.ts +++ b/modules/module-postgres-storage/src/types/codecs.ts @@ -1,3 +1,4 @@ +import { ReplicationAssertionError } from '@powersync/lib-services-framework'; import * as t from 'ts-codec'; export const BIGINT_MAX = BigInt('9223372036854775807'); @@ -98,7 +99,7 @@ export const hexBuffer = t.codec( return Buffer.from(encoded); } if (typeof encoded !== 'string') { - throw new Error(`Expected either a Buffer instance or hex encoded buffer string`); + throw new ReplicationAssertionError(`Expected either a Buffer instance or hex encoded buffer string`); } return Buffer.from(encoded, 'hex'); } @@ -116,7 +117,7 @@ export const pgwire_number = t.codec( return encoded; } if (typeof encoded !== 'bigint') { - throw new Error(`Expected either number or bigint for value`); + throw new ReplicationAssertionError(`Expected either number or bigint for value`); } if (encoded > BigInt(Number.MAX_SAFE_INTEGER) || encoded < BigInt(Number.MIN_SAFE_INTEGER)) { throw new RangeError('BigInt value is out of safe integer range for conversion to Number.'); diff --git a/modules/module-postgres/src/api/PostgresRouteAPIAdapter.ts b/modules/module-postgres/src/api/PostgresRouteAPIAdapter.ts index 2600657e1..fed6ce8c8 100644 --- a/modules/module-postgres/src/api/PostgresRouteAPIAdapter.ts +++ b/modules/module-postgres/src/api/PostgresRouteAPIAdapter.ts @@ -1,4 +1,5 @@ import * as lib_postgres from '@powersync/lib-service-postgres'; +import { ErrorCode, ServiceError } from '@powersync/lib-services-framework'; import { api, ParseSyncRulesOptions } from '@powersync/service-core'; import * as pgwire from '@powersync/service-jpgwire'; import * as sync_rules from '@powersync/service-sync-rules'; @@ -228,7 +229,11 @@ FROM pg_replication_slots WHERE slot_name = $1 LIMIT 1;`, return Number(row.lsn_distance); } - throw new Error(`Could not determine replication lag for slot ${slotName}`); + throw new ServiceError({ + status: 500, + code: ErrorCode.PSYNC_S4001, + description: `Could not determine replication lag for slot ${slotName}` + }); } async getReplicationHead(): Promise { diff --git a/modules/module-postgres/src/module/PostgresModule.ts b/modules/module-postgres/src/module/PostgresModule.ts index 734b23c09..d9b7ff470 100644 --- a/modules/module-postgres/src/module/PostgresModule.ts +++ b/modules/module-postgres/src/module/PostgresModule.ts @@ -1,4 +1,12 @@ -import { api, auth, ConfigurationFileSyncRulesProvider, modules, replication, system } from '@powersync/service-core'; +import { + api, + auth, + ConfigurationFileSyncRulesProvider, + ConnectionTestResult, + modules, + replication, + system +} from '@powersync/service-core'; import * as jpgwire from '@powersync/service-jpgwire'; import { PostgresRouteAPIAdapter } from '../api/PostgresRouteAPIAdapter.js'; import { SupabaseKeyCollector } from 
'../auth/SupabaseKeyCollector.js'; @@ -10,6 +18,7 @@ import { PUBLICATION_NAME } from '../replication/WalStream.js'; import { WalStreamReplicator } from '../replication/WalStreamReplicator.js'; import * as types from '../types/types.js'; import { PostgresConnectionConfig } from '../types/types.js'; +import { baseUri, NormalizedBasePostgresConnectionConfig } from '@powersync/lib-service-postgres'; export class PostgresModule extends replication.ReplicationModule { constructor() { @@ -124,18 +133,26 @@ export class PostgresModule extends replication.ReplicationModule { + async testConnection(config: PostgresConnectionConfig): Promise { this.decodeConfig(config); - const normalisedConfig = this.resolveConfig(this.decodedConfig!); - const connectionManager = new PgManager(normalisedConfig, { + const normalizedConfig = this.resolveConfig(this.decodedConfig!); + return await this.testConnection(normalizedConfig); + } + + static async testConnection(normalizedConfig: NormalizedBasePostgresConnectionConfig): Promise { + // FIXME: This is not a complete implementation yet. + const connectionManager = new PgManager(normalizedConfig, { idleTimeout: 30_000, maxSize: 1 }); const connection = await connectionManager.snapshotConnection(); try { - return await checkSourceConfiguration(connection, PUBLICATION_NAME); + await checkSourceConfiguration(connection, PUBLICATION_NAME); } finally { await connectionManager.end(); } + return { + connectionDescription: baseUri(normalizedConfig) + }; } } diff --git a/modules/module-postgres/src/replication/ConnectionManagerFactory.ts b/modules/module-postgres/src/replication/ConnectionManagerFactory.ts index 0c46b9f24..6a5e4d029 100644 --- a/modules/module-postgres/src/replication/ConnectionManagerFactory.ts +++ b/modules/module-postgres/src/replication/ConnectionManagerFactory.ts @@ -5,7 +5,7 @@ import { logger } from '@powersync/lib-services-framework'; export class ConnectionManagerFactory { private readonly connectionManagers: PgManager[]; - private readonly dbConnectionConfig: NormalizedPostgresConnectionConfig; + public readonly dbConnectionConfig: NormalizedPostgresConnectionConfig; constructor(dbConnectionConfig: NormalizedPostgresConnectionConfig) { this.dbConnectionConfig = dbConnectionConfig; diff --git a/modules/module-postgres/src/replication/PgRelation.ts b/modules/module-postgres/src/replication/PgRelation.ts index f6d9ac900..08cb87c75 100644 --- a/modules/module-postgres/src/replication/PgRelation.ts +++ b/modules/module-postgres/src/replication/PgRelation.ts @@ -1,3 +1,4 @@ +import { ReplicationAssertionError, ServiceError } from '@powersync/lib-services-framework'; import { storage } from '@powersync/service-core'; import { PgoutputRelation } from '@powersync/service-jpgwire'; @@ -16,7 +17,7 @@ export function getRelId(source: PgoutputRelation): number { // Source types are wrong here const relId = (source as any).relationOid as number; if (!relId) { - throw new Error(`No relation id!`); + throw new ReplicationAssertionError(`No relation id found`); } return relId; } diff --git a/modules/module-postgres/src/replication/WalStream.ts b/modules/module-postgres/src/replication/WalStream.ts index 92b8a2031..4e26aca31 100644 --- a/modules/module-postgres/src/replication/WalStream.ts +++ b/modules/module-postgres/src/replication/WalStream.ts @@ -1,5 +1,11 @@ import * as lib_postgres from '@powersync/lib-service-postgres'; -import { container, errors, logger } from '@powersync/lib-services-framework'; +import { + container, + errors, + logger, + 
ReplicationAbortedError, + ReplicationAssertionError +} from '@powersync/lib-services-framework'; import { getUuidReplicaIdentityBson, Metrics, SourceEntityDescriptor, storage } from '@powersync/service-core'; import * as pgwire from '@powersync/service-jpgwire'; import { DatabaseInputRow, SqliteRow, SqlSyncRules, TablePattern, toSyncRulesRow } from '@powersync/service-sync-rules'; @@ -133,7 +139,7 @@ export class WalStream { for (let row of tableRows) { const name = row.table_name as string; if (typeof row.relid != 'bigint') { - throw new Error(`missing relid for ${name}`); + throw new ReplicationAssertionError(`Missing relid for ${name}`); } const relid = Number(row.relid as bigint); @@ -294,7 +300,7 @@ export class WalStream { } } - throw new Error('Unreachable'); + throw new ReplicationAssertionError('Unreachable'); } async estimatedCount(db: pgwire.PgConnection, table: storage.SourceTable): Promise { @@ -415,7 +421,7 @@ WHERE oid = $1::regclass`, lastLogIndex = at; } if (this.abort_signal.aborted) { - throw new Error(`Aborted initial replication of ${this.slot_name}`); + throw new ReplicationAbortedError(`Aborted initial replication of ${this.slot_name}`); } for (const record of WalStream.getQueryData(rows)) { @@ -441,7 +447,7 @@ WHERE oid = $1::regclass`, async handleRelation(batch: storage.BucketStorageBatch, descriptor: SourceEntityDescriptor, snapshot: boolean) { if (!descriptor.objectId && typeof descriptor.objectId != 'number') { - throw new Error('objectId expected'); + throw new ReplicationAssertionError(`objectId expected, got ${typeof descriptor.objectId}`); } const result = await this.storage.resolveTable({ group_id: this.group_id, @@ -484,6 +490,7 @@ WHERE oid = $1::regclass`, await db.query('COMMIT'); } catch (e) { await db.query('ROLLBACK'); + // TODO: Wrap with custom error type throw e; } } finally { @@ -501,7 +508,7 @@ WHERE oid = $1::regclass`, if (table == null) { // We should always receive a replication message before the relation is used. // If we can't find it, it's a bug. 
- throw new Error(`Missing relation cache for ${relationId}`); + throw new ReplicationAssertionError(`Missing relation cache for ${relationId}`); } return table; } diff --git a/modules/module-postgres/src/replication/WalStreamReplicator.ts b/modules/module-postgres/src/replication/WalStreamReplicator.ts index 14a21725e..7831ffec6 100644 --- a/modules/module-postgres/src/replication/WalStreamReplicator.ts +++ b/modules/module-postgres/src/replication/WalStreamReplicator.ts @@ -2,6 +2,7 @@ import { replication, storage } from '@powersync/service-core'; import { ConnectionManagerFactory } from './ConnectionManagerFactory.js'; import { cleanUpReplicationSlot } from './replication-utils.js'; import { WalStreamReplicationJob } from './WalStreamReplicationJob.js'; +import { PostgresModule } from '../module/PostgresModule.js'; export interface WalStreamReplicatorOptions extends replication.AbstractReplicatorOptions { connectionFactory: ConnectionManagerFactory; @@ -42,4 +43,8 @@ export class WalStreamReplicator extends replication.AbstractReplicator Promise; @@ -14,7 +15,7 @@ export class Migrations { add(id: number, name: string, up: MigrationFunction) { if (this.migrations.length > 0 && this.migrations[this.migrations.length - 1].id >= id) { - throw new Error('Migration ids must be strictly incrementing'); + throw new ServiceAssertionError('Migration ids must be strictly incrementing'); } this.migrations.push({ id, up, name }); } diff --git a/packages/rsocket-router/src/router/ReactiveSocketRouter.ts b/packages/rsocket-router/src/router/ReactiveSocketRouter.ts index 6769b2a5b..25592b107 100644 --- a/packages/rsocket-router/src/router/ReactiveSocketRouter.ts +++ b/packages/rsocket-router/src/router/ReactiveSocketRouter.ts @@ -60,9 +60,9 @@ export class ReactiveSocketRouter { // wss.clients.size includes this connection, so we check for greater than // TODO: Share connection limit between this and http stream connections if (max_concurrent_connections && wss.clients.size > max_concurrent_connections) { - const err = new errors.JourneyError({ + const err = new errors.ServiceError({ status: 429, - code: 'SERVER_BUSY', + code: errors.ErrorCode.PSYNC_S2304, description: `Maximum active concurrent connections limit has been reached` }); logger.warn(err); @@ -144,7 +144,7 @@ export async function handleReactiveStream( const route = params.endpoints.find((e) => e.path == path && e.type == RS_ENDPOINT_TYPE.STREAM); if (!route) { - return exitWithError(new errors.ResourceNotFound('route', `No route for ${path} is configured`)); + return exitWithError(new errors.RouteNotFound(path)); } const { handler, authorize, validator, decoder = params.payloadDecoder } = route; diff --git a/packages/rsocket-router/tests/src/requests.test.ts b/packages/rsocket-router/tests/src/requests.test.ts index 5b9b3e92d..8d13d881e 100644 --- a/packages/rsocket-router/tests/src/requests.test.ts +++ b/packages/rsocket-router/tests/src/requests.test.ts @@ -3,6 +3,7 @@ import { createMockObserver, createMockResponder } from './utils/mock-responder. 
import { handleReactiveStream } from '../../src/router/ReactiveSocketRouter.js'; import { deserialize, serialize } from 'bson'; import { RS_ENDPOINT_TYPE, ReactiveEndpoint, RequestMeta, SocketResponder } from '../../src/router/types.js'; +import { ErrorCode } from '@powersync/lib-services-framework'; /** * Mocks the process of handling reactive routes @@ -117,7 +118,7 @@ describe('Requests', () => { ); // Should be a validation error - expect(JSON.stringify(spy.mock.calls[0])).includes('AUTHORIZATION'); + expect(JSON.stringify(spy.mock.calls[0])).includes(ErrorCode.PSYNC_S2101); }); it('should get invalid route error', async () => { @@ -130,6 +131,6 @@ describe('Requests', () => { await handleRoute(path, [], responder); // Should be a validation error - expect(JSON.stringify(spy.mock.calls[0])).includes('No route'); + expect(JSON.stringify(spy.mock.calls[0])).includes(ErrorCode.PSYNC_S2002); }); }); diff --git a/packages/service-core/src/auth/RemoteJWKSCollector.ts b/packages/service-core/src/auth/RemoteJWKSCollector.ts index bcae78749..3e4186a2d 100644 --- a/packages/service-core/src/auth/RemoteJWKSCollector.ts +++ b/packages/service-core/src/auth/RemoteJWKSCollector.ts @@ -3,7 +3,13 @@ import * as https from 'https'; import * as jose from 'jose'; import fetch from 'node-fetch'; -import { LookupOptions, makeHostnameLookupFunction } from '@powersync/lib-services-framework'; +import { + ErrorCode, + LookupOptions, + makeHostnameLookupFunction, + ServiceAssertionError, + ServiceError +} from '@powersync/lib-services-framework'; import { KeyCollector, KeyResult } from './KeyCollector.js'; import { KeySpec } from './KeySpec.js'; @@ -24,14 +30,17 @@ export class RemoteJWKSCollector implements KeyCollector { ) { try { this.url = new URL(url); - } catch (e) { - throw new Error(`Invalid jwks_uri: ${url}`); + } catch (e: any) { + throw new ServiceError(ErrorCode.PSYNC_S3102, `Invalid jwks_uri: ${JSON.stringify(url)} Details: ${e.message}`); } // We do support http here for self-hosting use cases. // Management service restricts this to https for hosted versions. if (this.url.protocol != 'https:' && this.url.protocol != 'http:') { - throw new Error(`Only http(s) is supported for jwks_uri, got: ${url}`); + throw new ServiceError( + ErrorCode.PSYNC_S3103, + `Only http(s) is supported for jwks_uri, got: ${JSON.stringify(url)}` + ); } this.agent = this.resolveAgent(); @@ -96,6 +105,9 @@ export class RemoteJWKSCollector implements KeyCollector { /** * Agent that uses a custom lookup function. + * + * This will synchronously raise an error if the URL contains an IP in the reject list. + * For domain names resolving to a rejected IP, that will fail when making the request. */ resolveAgent(): http.Agent | https.Agent { const lookupOptions = this.options?.lookupOptions ?? 
{ reject_ip_ranges: [] }; @@ -111,6 +123,7 @@ export class RemoteJWKSCollector implements KeyCollector { case 'https:': return new https.Agent(options); } - throw new Error('http or or https is required for protocol'); + // Already validated the URL before, so this is not expected + throw new ServiceAssertionError('http or or https is required for JWKS protocol'); } } diff --git a/packages/service-core/src/entry/cli-entry.ts b/packages/service-core/src/entry/cli-entry.ts index a9f693c77..b2e26df3f 100644 --- a/packages/service-core/src/entry/cli-entry.ts +++ b/packages/service-core/src/entry/cli-entry.ts @@ -6,6 +6,7 @@ import { registerCompactAction } from './commands/compact-action.js'; import { registerMigrationAction } from './commands/migrate-action.js'; import { registerStartAction } from './commands/start-action.js'; import { registerTearDownAction } from './commands/teardown-action.js'; +import { registerTestConnectionAction } from './commands/test-connection-action.js'; /** * Generates a Commander program which serves as the entry point @@ -20,6 +21,7 @@ export function generateEntryProgram(startHandlers?: Record { if (ack !== 'TEARDOWN') { - throw new Error('TEARDOWN was not acknowledged.'); + throw new ServiceError(ErrorCode.PSYNC_S0102, 'TEARDOWN was not acknowledged.'); } await teardown(extractRunnerOptions(options)); diff --git a/packages/service-core/src/entry/commands/test-connection-action.ts b/packages/service-core/src/entry/commands/test-connection-action.ts new file mode 100644 index 000000000..ac67f1a9d --- /dev/null +++ b/packages/service-core/src/entry/commands/test-connection-action.ts @@ -0,0 +1,41 @@ +import { Command } from 'commander'; + +import { container, logger } from '@powersync/lib-services-framework'; +import * as system from '../../system/system-index.js'; +import * as utils from '../../util/util-index.js'; + +import { modules, ReplicationEngine } from '../../index.js'; +import { extractRunnerOptions, wrapConfigCommand } from './config-command.js'; + +const COMMAND_NAME = 'test-connection'; + +export function registerTestConnectionAction(program: Command) { + const testConnectionCommand = program.command(COMMAND_NAME); + + wrapConfigCommand(testConnectionCommand); + + return testConnectionCommand.description('Test connection').action(async (options) => { + try { + const config = await utils.loadConfig(extractRunnerOptions(options)); + const serviceContext = new system.ServiceContextContainer(config); + + const replication = new ReplicationEngine(); + serviceContext.register(ReplicationEngine, replication); + + // Register modules in order to load the correct config + const moduleManager = container.getImplementation(modules.ModuleManager); + await moduleManager.initialize(serviceContext); + + // Start the storage engine in order to create the appropriate BucketStorage + await serviceContext.lifeCycleEngine.start(); + + logger.info('Testing connection...'); + const results = await replication.testConnection(); + logger.info(`Connection succeeded to ${results.map((r) => r.connectionDescription).join(', ')}`); + process.exit(0); + } catch (e) { + logger.error(`Connection failed: ${e.message}`); + process.exit(1); + } + }); +} diff --git a/packages/service-core/src/metrics/Metrics.ts b/packages/service-core/src/metrics/Metrics.ts index 98888f0db..7a1682160 100644 --- a/packages/service-core/src/metrics/Metrics.ts +++ b/packages/service-core/src/metrics/Metrics.ts @@ -3,7 +3,7 @@ import { OTLPMetricExporter } from '@opentelemetry/exporter-metrics-otlp-http'; 
import { PrometheusExporter } from '@opentelemetry/exporter-prometheus'; import { Resource } from '@opentelemetry/resources'; import { MeterProvider, MetricReader, PeriodicExportingMetricReader } from '@opentelemetry/sdk-metrics'; -import { logger } from '@powersync/lib-services-framework'; +import { logger, ServiceAssertionError } from '@powersync/lib-services-framework'; import * as storage from '../storage/storage-index.js'; import * as util from '../util/util-index.js'; @@ -132,7 +132,7 @@ export class Metrics { public static getInstance(): Metrics { if (!Metrics.instance) { - throw new Error('Metrics have not been initialised'); + throw new ServiceAssertionError('Metrics have not been initialized'); } return Metrics.instance; diff --git a/packages/service-core/src/replication/AbstractReplicator.ts b/packages/service-core/src/replication/AbstractReplicator.ts index fcc3fa0ec..0b12d9983 100644 --- a/packages/service-core/src/replication/AbstractReplicator.ts +++ b/packages/service-core/src/replication/AbstractReplicator.ts @@ -6,6 +6,7 @@ import { StorageEngine } from '../storage/storage-index.js'; import { SyncRulesProvider } from '../util/config/sync-rules/sync-rules-provider.js'; import { AbstractReplicationJob } from './AbstractReplicationJob.js'; import { ErrorRateLimiter } from './ErrorRateLimiter.js'; +import { ConnectionTestResult } from './ReplicationModule.js'; // 5 minutes const PING_INTERVAL = 1_000_000_000n * 300n; @@ -225,4 +226,6 @@ export abstract class AbstractReplicator; } diff --git a/packages/service-core/src/replication/ReplicationEngine.ts b/packages/service-core/src/replication/ReplicationEngine.ts index 9b36f4a40..3ae7849d8 100644 --- a/packages/service-core/src/replication/ReplicationEngine.ts +++ b/packages/service-core/src/replication/ReplicationEngine.ts @@ -1,5 +1,6 @@ import { logger } from '@powersync/lib-services-framework'; import { AbstractReplicator } from './AbstractReplicator.js'; +import { ConnectionTestResult } from './ReplicationModule.js'; export class ReplicationEngine { private readonly replicators: Map = new Map(); @@ -40,4 +41,8 @@ export class ReplicationEngine { } logger.info('Successfully shut down Replication Engine.'); } + + public async testConnection(): Promise { + return await Promise.all([...this.replicators.values()].map((replicator) => replicator.testConnection())); + } } diff --git a/packages/service-core/src/replication/ReplicationModule.ts b/packages/service-core/src/replication/ReplicationModule.ts index 5b5bca8de..410c022ab 100644 --- a/packages/service-core/src/replication/ReplicationModule.ts +++ b/packages/service-core/src/replication/ReplicationModule.ts @@ -1,13 +1,19 @@ import { DataSourceConfig } from '@powersync/service-types/dist/config/PowerSyncConfig.js'; import * as t from 'ts-codec'; +import { schema } from '@powersync/lib-services-framework'; import * as types from '@powersync/service-types'; import * as api from '../api/api-index.js'; import * as modules from '../modules/modules-index.js'; import * as system from '../system/system-index.js'; -import { schema } from '@powersync/lib-services-framework'; import { AbstractReplicator } from './AbstractReplicator.js'; -import { TearDownOptions } from '../modules/modules-index.js'; + +export interface ConnectionTestResult { + /** + * Connection URI or hostname. + */ + connectionDescription: string; +} /** * Provides a common interface for testing the connection to a DataSource. 
@@ -17,7 +23,7 @@ export interface ConnectionTester { * Confirm if a connection can be established to the datasource for the provided datasource configuration * @param config */ - testConnection(config: TConfig): Promise; + testConnection(config: TConfig): Promise; } export interface ReplicationModuleOptions extends modules.AbstractModuleOptions { @@ -58,7 +64,7 @@ export abstract class ReplicationModule */ protected abstract createReplicator(context: system.ServiceContext): AbstractReplicator; - public abstract testConnection(config: TConfig): Promise; + public abstract testConnection(config: TConfig): Promise; /** * Register this module's Replicators and RouteAPI adapters if the required configuration is present. @@ -81,16 +87,12 @@ export abstract class ReplicationModule ); } - try { - const baseMatchingConfig = matchingConfig[0] as TConfig; - // If decoding fails, log the error and continue, no replication will happen for this data source - this.decodeConfig(baseMatchingConfig); + const baseMatchingConfig = matchingConfig[0] as TConfig; + // If decoding fails, this will raise a hard error, and stop the service. + this.decodeConfig(baseMatchingConfig); - context.replicationEngine?.register(this.createReplicator(context)); - context.routerEngine?.registerAPI(this.createRouteAPIAdapter()); - } catch (e) { - this.logger.error('Failed to initialize.', e); - } + context.replicationEngine?.register(this.createReplicator(context)); + context.routerEngine?.registerAPI(this.createRouteAPIAdapter()); } protected decodeConfig(config: TConfig): void { diff --git a/packages/service-core/src/routes/endpoints/admin.ts b/packages/service-core/src/routes/endpoints/admin.ts index c08b1068d..27aa0a761 100644 --- a/packages/service-core/src/routes/endpoints/admin.ts +++ b/packages/service-core/src/routes/endpoints/admin.ts @@ -1,4 +1,4 @@ -import { errors, router, schema } from '@powersync/lib-services-framework'; +import { ErrorCode, errors, router, schema } from '@powersync/lib-services-framework'; import { SqlSyncRules, StaticSchema } from '@powersync/service-sync-rules'; import { internal_routes } from '@powersync/service-types'; @@ -120,9 +120,9 @@ export const reprocess = routeDefinition({ const active = await activeBucketStorage.getActiveSyncRules(apiHandler.getParseSyncRulesOptions()); if (active == null) { - throw new errors.JourneyError({ + throw new errors.ServiceError({ status: 422, - code: 'NO_SYNC_RULES', + code: ErrorCode.PSYNC_S4104, description: 'No active sync rules' }); } diff --git a/packages/service-core/src/routes/endpoints/socket-route.ts b/packages/service-core/src/routes/endpoints/socket-route.ts index cdcdb8cf4..eca069324 100644 --- a/packages/service-core/src/routes/endpoints/socket-route.ts +++ b/packages/service-core/src/routes/endpoints/socket-route.ts @@ -1,4 +1,4 @@ -import { errors, logger, schema } from '@powersync/lib-services-framework'; +import { ErrorCode, errors, logger, schema } from '@powersync/lib-services-framework'; import { RequestParameters } from '@powersync/service-sync-rules'; import { serialize } from 'bson'; @@ -34,9 +34,9 @@ export const syncStreamReactive: SocketRouteGenerator = (router) => if (routerEngine!.closed) { responder.onError( - new errors.JourneyError({ + new errors.ServiceError({ status: 503, - code: 'SERVICE_UNAVAILABLE', + code: ErrorCode.PSYNC_S2003, description: 'Service temporarily unavailable' }) ); @@ -53,9 +53,9 @@ export const syncStreamReactive: SocketRouteGenerator = (router) => const cp = await 
activeBucketStorage.getActiveCheckpoint(); if (!cp.hasSyncRules()) { responder.onError( - new errors.JourneyError({ + new errors.ServiceError({ status: 500, - code: 'NO_SYNC_RULES', + code: ErrorCode.PSYNC_S2302, description: 'No sync rules available' }) ); diff --git a/packages/service-core/src/routes/endpoints/sync-rules.ts b/packages/service-core/src/routes/endpoints/sync-rules.ts index c902171cf..d76edc5ad 100644 --- a/packages/service-core/src/routes/endpoints/sync-rules.ts +++ b/packages/service-core/src/routes/endpoints/sync-rules.ts @@ -1,4 +1,4 @@ -import { errors, router, schema } from '@powersync/lib-services-framework'; +import { ErrorCode, errors, router, schema } from '@powersync/lib-services-framework'; import { SqlSyncRules, SyncRulesErrors } from '@powersync/service-sync-rules'; import type { FastifyPluginAsync } from 'fastify'; import * as t from 'ts-codec'; @@ -43,9 +43,9 @@ export const deploySyncRules = routeDefinition({ if (service_context.configuration.sync_rules.present) { // If sync rules are configured via the config, disable deploy via the API. - throw new errors.JourneyError({ + throw new errors.ServiceError({ status: 422, - code: 'API_DISABLED', + code: ErrorCode.PSYNC_S4105, description: 'Sync rules API disabled', details: 'Use the management API to deploy sync rules' }); @@ -60,9 +60,9 @@ export const deploySyncRules = routeDefinition({ schema: undefined }); } catch (e) { - throw new errors.JourneyError({ + throw new errors.ServiceError({ status: 422, - code: 'INVALID_SYNC_RULES', + code: ErrorCode.PSYNC_R0001, description: 'Sync rules parsing failed', details: e.message }); @@ -112,9 +112,9 @@ export const currentSyncRules = routeDefinition({ const sync_rules = await activeBucketStorage.getActiveSyncRulesContent(); if (!sync_rules) { - throw new errors.JourneyError({ + throw new errors.ServiceError({ status: 422, - code: 'NO_SYNC_RULES', + code: ErrorCode.PSYNC_S4104, description: 'No active sync rules' }); } @@ -159,9 +159,9 @@ export const reprocessSyncRules = routeDefinition({ const apiHandler = payload.context.service_context.routerEngine!.getAPI(); const sync_rules = await activeBucketStorage.getActiveSyncRules(apiHandler.getParseSyncRulesOptions()); if (sync_rules == null) { - throw new errors.JourneyError({ + throw new errors.ServiceError({ status: 422, - code: 'NO_SYNC_RULES', + code: ErrorCode.PSYNC_S4104, description: 'No active sync rules' }); } diff --git a/packages/service-core/src/routes/endpoints/sync-stream.ts b/packages/service-core/src/routes/endpoints/sync-stream.ts index 83d8f3994..aaf84ae07 100644 --- a/packages/service-core/src/routes/endpoints/sync-stream.ts +++ b/packages/service-core/src/routes/endpoints/sync-stream.ts @@ -1,4 +1,4 @@ -import { errors, logger, router, schema } from '@powersync/lib-services-framework'; +import { ErrorCode, errors, logger, router, schema } from '@powersync/lib-services-framework'; import { RequestParameters } from '@powersync/service-sync-rules'; import { Readable } from 'stream'; @@ -26,9 +26,9 @@ export const syncStreamed = routeDefinition({ const clientId = payload.params.client_id; if (routerEngine!.closed) { - throw new errors.JourneyError({ + throw new errors.ServiceError({ status: 503, - code: 'SERVICE_UNAVAILABLE', + code: ErrorCode.PSYNC_S2003, description: 'Service temporarily unavailable' }); } @@ -39,9 +39,9 @@ export const syncStreamed = routeDefinition({ // Sanity check before we start the stream const cp = await storageEngine.activeBucketStorage.getActiveCheckpoint(); if 
(!cp.hasSyncRules()) { - throw new errors.JourneyError({ + throw new errors.ServiceError({ status: 500, - code: 'NO_SYNC_RULES', + code: ErrorCode.PSYNC_S2302, description: 'No sync rules available' }); } diff --git a/packages/service-core/src/routes/route-register.ts b/packages/service-core/src/routes/route-register.ts index 8b798160b..9a220feac 100644 --- a/packages/service-core/src/routes/route-register.ts +++ b/packages/service-core/src/routes/route-register.ts @@ -62,16 +62,16 @@ export function registerFastifyRoutes( }); } } catch (ex) { - const journeyError = errors.JourneyError.isJourneyError(ex) ? ex : new errors.InternalServerError(ex); - logger.error(`Request failed`, journeyError); + const serviceError = errors.asServiceError(ex); + logger.error(`Request failed`, serviceError); response = new router.RouterResponse({ - status: journeyError.errorData.status || 500, + status: serviceError.errorData.status || 500, headers: { 'Content-Type': 'application/json' }, data: { - error: journeyError.errorData + error: serviceError.errorData } }); } diff --git a/packages/service-core/src/util/utils.ts b/packages/service-core/src/util/utils.ts index 673ad2091..89be107dd 100644 --- a/packages/service-core/src/util/utils.ts +++ b/packages/service-core/src/util/utils.ts @@ -7,6 +7,7 @@ import { BucketChecksum, OpId, OplogEntry } from './protocol-types.js'; import * as storage from '../storage/storage-index.js'; import { PartialChecksum } from '../storage/ChecksumCache.js'; +import { ServiceAssertionError } from '@powersync/lib-services-framework'; export type ChecksumMap = Map; @@ -34,7 +35,7 @@ export function timestampToOpId(ts: bigint): OpId { // Dynamic values are passed in in some cases, so we make extra sure that the // number is a bigint and not number or Long. if (typeof ts != 'bigint') { - throw new Error(`bigint expected, got: ${ts} (${typeof ts})`); + throw new ServiceAssertionError(`bigint expected, got: ${ts} (${typeof ts})`); } return ts.toString(10); } diff --git a/packages/service-errors/LICENSE b/packages/service-errors/LICENSE new file mode 100644 index 000000000..c8efd46cc --- /dev/null +++ b/packages/service-errors/LICENSE @@ -0,0 +1,67 @@ +# Functional Source License, Version 1.1, Apache 2.0 Future License + +## Abbreviation + +FSL-1.1-Apache-2.0 + +## Notice + +Copyright 2023-2024 Journey Mobile, Inc. + +## Terms and Conditions + +### Licensor ("We") + +The party offering the Software under these Terms and Conditions. + +### The Software + +The "Software" is each version of the software that we make available under these Terms and Conditions, as indicated by our inclusion of these Terms and Conditions with the Software. + +### License Grant + +Subject to your compliance with this License Grant and the Patents, Redistribution and Trademark clauses below, we hereby grant you the right to use, copy, modify, create derivative works, publicly perform, publicly display and redistribute the Software for any Permitted Purpose identified below. + +### Permitted Purpose + +A Permitted Purpose is any purpose other than a Competing Use. A Competing Use means making the Software available to others in a commercial product or service that: + +1. substitutes for the Software; +2. substitutes for any other product or service we offer using the Software that exists as of the date we make the Software available; or +3. offers the same or substantially similar functionality as the Software. + +Permitted Purposes specifically include using the Software: + +1. 
for your internal use and access; +2. for non-commercial education; +3. for non-commercial research; and +4. in connection with professional services that you provide to a licensee using the Software in accordance with these Terms and Conditions. + +### Patents + +To the extent your use for a Permitted Purpose would necessarily infringe our patents, the license grant above includes a license under our patents. If you make a claim against any party that the Software infringes or contributes to the infringement of any patent, then your patent license to the Software ends immediately. + +### Redistribution + +The Terms and Conditions apply to all copies, modifications and derivatives of the Software. +If you redistribute any copies, modifications or derivatives of the Software, you must include a copy of or a link to these Terms and Conditions and not remove any copyright notices provided in or with the Software. + +### Disclaimer + +THE SOFTWARE IS PROVIDED "AS IS" AND WITHOUT WARRANTIES OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING WITHOUT LIMITATION WARRANTIES OF FITNESS FOR A PARTICULAR PURPOSE, MERCHANTABILITY, TITLE OR NON-INFRINGEMENT. +IN NO EVENT WILL WE HAVE ANY LIABILITY TO YOU ARISING OUT OF OR RELATED TO THE SOFTWARE, INCLUDING INDIRECT, SPECIAL, INCIDENTAL OR CONSEQUENTIAL DAMAGES, EVEN IF WE HAVE BEEN INFORMED OF THEIR POSSIBILITY IN ADVANCE. + +### Trademarks + +Except for displaying the License Details and identifying us as the origin of the Software, you have no right under these Terms and Conditions to use our trademarks, trade names, service marks or product names. + +## Grant of Future License + +We hereby irrevocably grant you an additional license to use the Software under the Apache License, Version 2.0 that is effective on the second anniversary of the date we make the Software available. On or after that date, you may use the Software under the Apache License, Version 2.0, in which case the following will apply: + +Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. +You may obtain a copy of the License at + +http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. diff --git a/packages/service-errors/README.md b/packages/service-errors/README.md new file mode 100644 index 000000000..f9690e0cb --- /dev/null +++ b/packages/service-errors/README.md @@ -0,0 +1,3 @@ +# @powersync/service-errors + +Defines error codes used across the service. diff --git a/packages/service-errors/generate-docs.mjs b/packages/service-errors/generate-docs.mjs new file mode 100644 index 000000000..9f1630a2f --- /dev/null +++ b/packages/service-errors/generate-docs.mjs @@ -0,0 +1,120 @@ +#!/usr/bin/env node + +/** + * Usage: + * node generate-docs.mjs > docs.md + */ + +import * as fs from 'node:fs'; +import * as ts from 'typescript'; + +/** + * Parse and generate Markdown docs for the ErrorCode enum in a given file. + * - Groups error codes by preceding `//` section comments. + * - Extracts JSDoc comments for each enum member. + */ +function generateMarkdownDocs(filePath) { + // 1. Read the file content + const sourceText = fs.readFileSync(filePath, 'utf-8'); + + // 2. 
Create a SourceFile AST + const sourceFile = ts.createSourceFile(filePath, sourceText, ts.ScriptTarget.ESNext, /*setParentNodes*/ true); + + // We will store data as follows: + // { + // "[section title]": [ + // { name: "PSYNC_R0001", doc: "The doc comment..." }, + // { name: "PSYNC_R2200", doc: "" } + // ], + // ... + // } + // + + let mdOutput = '# ErrorCode Documentation\n\n'; + + // 3. Recursively walk the AST looking for: + // - Single-line comment sections (// Some Section). + // - The "ErrorCode" enum declaration and its members. + function visit(node) { + // If this is an enum named "ErrorCode", record each member + if (ts.isEnumDeclaration(node) && node.name.text === 'ErrorCode') { + for (const member of node.members) { + // Name of the enum member (e.g. "PSYNC_R0001") + const name = member.name.getText(sourceFile); + + // Capture single-line leading comments so that each new // comment sets a "currentSection" + const commentRanges = ts.getLeadingCommentRanges(sourceText, member.getFullStart()); + if (commentRanges) { + for (const range of commentRanges) { + // slice out the raw comment text + const commentText = sourceText.slice(range.pos, range.end); + if (commentText.trimStart().startsWith('//')) { + // remove the leading slashes and trim + const cleaned = commentText.replace(/^\/\/+/, '').trim(); + mdOutput += `${cleaned}\n`; + mdOutput += `\n`; + } + } + } + + // 4. Extract JSDoc for the member (if present) + // + // There are a few ways to do this; one approach is to read the JSDoc comments + // in `ts.getJSDocTags(member)`, or use `ts.getJSDocCommentsAndTags(node)`. + // + // For simplicity, we'll just look at the JSDoc comment text as a single + // block. If we need the specific @tags, we could parse that out too. + let docComment = ''; + const jsDocNodes = ts.getJSDocCommentsAndTags(member); + if (jsDocNodes.length > 0) { + // Each node might have a comment portion + // We'll just join them all as one string + docComment = jsDocNodes + .map((docNode) => { + if (docNode.comment) { + return docNode.comment.trim(); + } + return ''; + }) + .filter(Boolean) + .join('\n\n'); + } + + const docLines = docComment.split(/\r?\n/); + + mdOutput += `- **${name}**:\n`; + for (const line of docLines) { + mdOutput += ` ${line}\n`; + } + mdOutput += '\n'; + } + } + + // Continue walking + ts.forEachChild(node, visit); + } + + // Kick off the AST walk + visit(sourceFile); + + // 5. Generate Markdown output + // We'll do a simple format, e.g.: + // # ErrorCode Documentation + // + // ## PSYNC_Rxxxx: Sync rules issues + // - **PSYNC_R0001** + // Catch-all sync rules parsing error, if no more specific error is available + // + // ## PSYNC_Sxxxx: Service issues + // - **PSYNC_S0001** + // Internal assertion... + // ... 
+ // + + return mdOutput; +} + +const filePath = 'src/codes.ts'; + +const md = generateMarkdownDocs(filePath); +console.log(md); diff --git a/packages/service-errors/package.json b/packages/service-errors/package.json new file mode 100644 index 000000000..5177e699a --- /dev/null +++ b/packages/service-errors/package.json @@ -0,0 +1,23 @@ +{ + "name": "@powersync/service-errors", + "repository": "https://github.com/powersync-ja/powersync-service", + "version": "0.1.0", + "main": "dist/index.js", + "types": "dist/index.d.ts", + "license": "FSL-1.1-Apache-2.0", + "publishConfig": { + "access": "public" + }, + "files": [ + "dist/**/*" + ], + "type": "module", + "scripts": { + "clean": "rm -r ./dist && tsc -b --clean", + "build": "tsc -b", + "build:tests": "tsc -b test/tsconfig.json", + "test": "vitest" + }, + "dependencies": {}, + "devDependencies": {} +} diff --git a/packages/service-errors/src/codes.ts b/packages/service-errors/src/codes.ts new file mode 100644 index 000000000..18901d51b --- /dev/null +++ b/packages/service-errors/src/codes.ts @@ -0,0 +1,354 @@ +/** + * Error codes used across the service. + * + * This is the primary definition of error codes, as well as the documentation + * for each. + */ +export enum ErrorCode { + // # PSYNC_Rxxxx: Sync rules issues + + /** + * Catch-all sync rules parsing error, if no more specific error is available + */ + PSYNC_R0001 = 'PSYNC_R0001', + + // ## PSYNC_R11xx: YAML syntax issues + + // ## PSYNC_R12xx: YAML structure (schema) issues + + // ## PSYNC_R21xx: SQL syntax issues + + // ## PSYNC_R22xx: SQL supported feature issues + + // ## PSYNC_R23xx: SQL schema mismatch issues + + // ## PSYNC_R24xx: SQL security warnings + + // # PSYNC_Sxxxx: Service issues + + /** + * Internal assertion. + * + * If you see this error, it might indicate a bug in the service code. + */ + PSYNC_S0001 = 'PSYNC_S0001', + + /** + * TEARDOWN was not acknowledged. + * + * This happens when the TEARDOWN argument was not supplied when running + * the service teardown command. The TEARDOWN argument is required since + * this is a destructive command. + * + * Run the command with `teardown TEARDOWN` to confirm. + */ + PSYNC_S0102 = 'PSYNC_S0102', + + // ## PSYNC_S1xxx: Replication issues + + /** + * Row too large. + * + * There is a 15MB size limit on every replicated row - rows larger than + * this cannot be replicated. + */ + PSYNC_S1002 = 'PSYNC_S1002', + + /** + * Sync rules have been locked by another process for replication. + * + * This error is normal in some circumstances: + * 1. In some cases, if a process was forcefully terminated, this error may occur for up to a minute. + * 2. During rolling deploys, this error may occur until the old process stops replication. + * + * If the error persists for longer, this may indicate that multiple replication processes are running. + * Make sure there is only one replication process apart from rolling deploys. + */ + PSYNC_S1003 = 'PSYNC_S1003', + + /** + * JSON nested object depth exceeds the limit of 20. + * + * This may occur if there is very deep nesting in JSON or embedded documents. + */ + PSYNC_S1004 = 'PSYNC_S1004', + + // ## PSYNC_S11xx: Postgres replication issues + + /** + * Replication assertion error. + * + * If you see this error, it might indicate a bug in the service code. + */ + PSYNC_S1101 = 'PSYNC_S1101', + + /** + * Aborted initial replication. + * + * This is not an actual error - it is expected when the replication process + * is stopped, or if replication is stopped for any other reason. 
+ */ + PSYNC_S1103 = 'PSYNC_S1103', + + /** + * Explicit cacert is required for `sslmode: verify-ca`. + * + * Use either verify-full, or specify a certificate with verify-ca. + */ + PSYNC_S1104 = 'PSYNC_S1104', + + /** + * `database` is required in connection config. + * + * Specify the database explicitly, or in the `uri` field. + */ + PSYNC_S1105 = 'PSYNC_S1105', + + /** + * `hostname` is required in connection config. + * + * Specify the hostname explicitly, or in the `uri` field. + */ + PSYNC_S1106 = 'PSYNC_S1106', + + /** + * `username` is required in connection config. + * + * Specify the username explicitly, or in the `uri` field. + */ + PSYNC_S1107 = 'PSYNC_S1107', + + /** + * `password` is required in connection config. + * + * Specify the password explicitly, or in the `uri` field. + */ + PSYNC_S1108 = 'PSYNC_S1108', + + /** + * Invalid database URI. + * + * Check the URI scheme and format. + */ + PSYNC_S1109 = 'PSYNC_S1109', + + /** + * Invalid port number. + * + * Only ports in the range 1024 - 65535 are supported. + */ + PSYNC_S1110 = 'PSYNC_S1110', + + /** + * Publication does not exist. + * + * Run: `CREATE PUBLICATION powersync FOR ALL TABLES` on the source database. + */ + PSYNC_S1141 = 'PSYNC_S1141', + + /** + * Publication does not publish all changes. + * + * Create a publication using `WITH (publish = "insert, update, delete, truncate")` (the default). + */ + PSYNC_S1142 = 'PSYNC_S1142', + + /** + * Publication uses publish_via_partition_root. + */ + PSYNC_S1143 = 'PSYNC_S1143', + + // ## PSYNC_S12xx: MySQL replication issues + + // ## PSYNC_S13xx: MongoDB replication issues + + /** + * Generic MongoServerError. + */ + PSYNC_S1301 = 'PSYNC_S1301', + + /** + * Generic MongoNetworkError. + */ + PSYNC_S1302 = 'PSYNC_S1302', + + /** + * MongoDB internal TLS error. + * + * If connecting to a shared cluster on MongoDB Atlas, this could be an IP Access List issue. + * Check that the service IP is allowed to connect to the cluster. + */ + PSYNC_S1303 = 'PSYNC_S1303', + + /** + * MongoDB connection DNS error. + * + * Check that the hostname is correct. + */ + PSYNC_S1304 = 'PSYNC_S1304', + + /** + * MongoDB connection timeout. + * + * Check that the hostname is correct, and that the service IP is allowed to connect to the cluster. + */ + PSYNC_S1305 = 'PSYNC_S1305', + + /** + * MongoDB authentication error. + * + * Check the username and password. + */ + PSYNC_S1306 = 'PSYNC_S1306', + + /** + * MongoDB authorization error. + * + * Check that the user has the required privileges. + */ + PSYNC_S1307 = 'PSYNC_S1307', + + /** + * Sharded MongoDB clusters are not supported yet. + */ + PSYNC_S1341 = 'PSYNC_S1341', + + /** + * Standalone MongoDB instances are not supported - use a replica set. + */ + PSYNC_S1342 = 'PSYNC_S1342', + + /** + * PostImages not enabled on a source collection. + * + * Use `post_images: auto_configure` to configure post images automatically, or enable manually: + * + * db.runCommand({ + * collMod: 'collection-name', + * changeStreamPreAndPostImages: { enabled: true } + * }); + */ + PSYNC_S1343 = 'PSYNC_S1343', + + // ## PSYNC_S14xx: MongoDB storage replication issues + + /** + * Max transaction tries exceeded. + */ + PSYNC_S1402 = 'PSYNC_S1402', + + // ## PSYNC_S2xxx: Service API + + /** + * Generic internal server error (HTTP 500). + * + * See the error details for more info. + */ + PSYNC_S2001 = 'PSYNC_S2001', + + /** + * Route not found (HTTP 404). + */ + PSYNC_S2002 = 'PSYNC_S2002', + + /** + * Service unavailable due to restart (HTTP 503).
+ * + * Wait a while, then retry the request. + */ + PSYNC_S2003 = 'PSYNC_S2003', + + // ## PSYNC_S21xx: Auth errors originating on the client. + // + // This does not include auth configuration errors on the service. + + /** + * Generic authentication error. + */ + PSYNC_S2101 = 'PSYNC_S2101', + + // ## PSYNC_S22xx: Auth integration errors + + /** + * IPv6 support is not enabled for the JWKS URI. + * + * Use an endpoint that supports IPv4. + */ + PSYNC_S2202 = 'PSYNC_S2202', + + /** + * IPs in this range are not supported. + * + * Make sure to use a publicly accessible JWKS URI. + */ + PSYNC_S2203 = 'PSYNC_S2203', + + // ## PSYNC_S23xx: Sync API errors + + /** + * No sync rules available. + * + * This error may happen if: + * 1. Sync rules have not been deployed. + * 2. Sync rules have been deployed, but are still being processed. + * + * View the replicator logs to see if the sync rules are being processed. + */ + PSYNC_S2302 = 'PSYNC_S2302', + + /** + * The maximum number of active concurrent connections has been reached. + */ + PSYNC_S2304 = 'PSYNC_S2304', + + // ## PSYNC_S24xx: Sync API errors - MongoDB Storage + + /** + * Could not get clusterTime. + */ + PSYNC_S2401 = 'PSYNC_S2401', + + // ## PSYNC_S25xx: Sync API errors - Postgres Storage + + // ## PSYNC_S3xxx: Service configuration issues + + // ## PSYNC_S31xx: Auth configuration issues + + /** + * Invalid jwks_uri. + */ + PSYNC_S3102 = 'PSYNC_S3102', + + /** + * Only http(s) is supported for jwks_uri. + */ + PSYNC_S3103 = 'PSYNC_S3103', + + // ## PSYNC_S32xx: Replication configuration issues + + /** + * Failed to validate module configuration. + */ + PSYNC_S3201 = 'PSYNC_S3201', + + // ## PSYNC_S4xxx: Management / dev APIs + + /** + * Internal assertion error. + * + * This error may indicate a bug in the service code. + */ + PSYNC_S4001 = 'PSYNC_S4001', + + /** + * No active sync rules. + */ + PSYNC_S4104 = 'PSYNC_S4104', + + /** + * Sync rules API disabled. + * + * When a sync rules file is configured, the dynamic sync rules API is disabled. + */ + PSYNC_S4105 = 'PSYNC_S4105' +} diff --git a/packages/service-errors/src/errors.ts b/packages/service-errors/src/errors.ts new file mode 100644 index 000000000..bfa024ae9 --- /dev/null +++ b/packages/service-errors/src/errors.ts @@ -0,0 +1,217 @@ +import { ErrorCode } from './codes.js'; + +export enum ErrorSeverity { + INFO = 'info', + WARNING = 'warning', + ERROR = 'error' +} + +export type ErrorData = { + name?: string; + + code: ErrorCode; + description: string; + + severity?: ErrorSeverity; + details?: string; + status?: number; + stack?: string; + + origin?: string; + + trace_id?: string; +}; + +export class ServiceError extends Error { + is_service_error = true; + + errorData: ErrorData; + + static isServiceError(input: any): input is ServiceError { + return input instanceof ServiceError || input?.is_service_error == true; + } + + private static errorMessage(data: ErrorData | ErrorCode, description?: string) { + if (typeof data == 'string') { + data = { + code: data, + description: description! + }; + } + let message = `[${data.code}] ${data.description}`; + if (data.details) { + message += `\n ${data.details}`; + } + return message; + } + + constructor(data: ErrorData); + constructor(code: ErrorCode, description: string); + + constructor(data: ErrorData | ErrorCode, description?: string) { + super(ServiceError.errorMessage(data, description)); + if (typeof data == 'string') { + data = { + code: data, + description: description!
+ }; + } + this.errorData = data; + if (data.stack) { + this.stack = data.stack; + } + + this.name = data.name || this.constructor.name; + this.errorData.name = this.name; + } + + toString() { + return this.stack; + } + + toJSON(): ErrorData { + if (process.env.NODE_ENV !== 'production') { + return this.errorData; + } + return { + name: this.errorData.name, + code: this.errorData.code, + status: this.errorData.status, + description: this.errorData.description, + details: this.errorData.details, + trace_id: this.errorData.trace_id, + severity: this.errorData.severity, + origin: this.errorData.origin + }; + } + + setTraceId(id: string) { + this.errorData.trace_id = id; + } +} + +/** + * @deprecated Use more specific errors + */ +export class ValidationError extends ServiceError { + static readonly CODE = ErrorCode.PSYNC_S2001; + + constructor(errors: any) { + super({ + code: ValidationError.CODE, + status: 400, + description: 'Validation failed', + details: JSON.stringify(errors) + }); + } +} + +/** + * Use for replication errors that are never expected to happen in production. + * + * If it does happen, it is either: + * 1. A bug in the code that should be fixed. + * 2. An error that needs a different error code. + */ +export class ReplicationAssertionError extends ServiceError { + static readonly CODE = ErrorCode.PSYNC_S1101; + constructor(description: string) { + super({ + code: ReplicationAssertionError.CODE, + status: 500, + description: description + }); + } +} + +/** + * Use for general service errors that are never expected to happen in production. + * + * If it does happen, it is either: + * 1. A bug in the code that should be fixed. + * 2. An error that needs a different error code. + */ +export class ServiceAssertionError extends ServiceError { + static readonly CODE = ErrorCode.PSYNC_S0001; + constructor(description: string) { + super({ + code: ServiceAssertionError.CODE, + status: 500, + description: description + }); + } +} + +/** + * Indicates replication is aborted. + * + * This is not an actual error - rather just an indication + * that something requested the replication should stop. + */ +export class ReplicationAbortedError extends ServiceError { + static readonly CODE = ErrorCode.PSYNC_S1103; + + constructor(description?: string) { + super({ + code: ReplicationAbortedError.CODE, + description: description ?? 'Replication aborted' + }); + } +} + +export class AuthorizationError extends ServiceError { + static readonly CODE = ErrorCode.PSYNC_S2101; + + constructor(errors: any) { + super({ + code: AuthorizationError.CODE, + status: 401, + description: 'Authorization failed', + details: errors + }); + } +} + +export class InternalServerError extends ServiceError { + static readonly CODE = ErrorCode.PSYNC_S2001; + + constructor(err: Error) { + super({ + code: InternalServerError.CODE, + severity: ErrorSeverity.ERROR, + status: 500, + description: 'Something went wrong', + details: err.message, + stack: process.env.NODE_ENV !== 'production' ? 
err.stack : undefined + }); + } +} + +export class RouteNotFound extends ServiceError { + static readonly CODE = ErrorCode.PSYNC_S2002; + + constructor(path: string) { + super({ + code: RouteNotFound.CODE, + status: 404, + description: 'The path does not exist on this server', + details: `The path ${JSON.stringify(path)} does not exist on this server`, + severity: ErrorSeverity.INFO + }); + } +} + +export class DatabaseConnectionError extends ServiceError { + public cause: any; + + constructor(code: ErrorCode, message: string, cause: any) { + super({ + code: code, + status: 500, + description: message, + details: `cause: ${cause.message}`, + stack: process.env.NODE_ENV !== 'production' ? cause.stack : undefined + }); + this.cause = cause; + } +} diff --git a/packages/service-errors/src/index.ts b/packages/service-errors/src/index.ts new file mode 100644 index 000000000..f0d733af7 --- /dev/null +++ b/packages/service-errors/src/index.ts @@ -0,0 +1,3 @@ +export * from './errors.js'; +export * from './codes.js'; +export * from './utils.js'; diff --git a/packages/service-errors/src/utils.ts b/packages/service-errors/src/utils.ts new file mode 100644 index 000000000..a5ca8ecaf --- /dev/null +++ b/packages/service-errors/src/utils.ts @@ -0,0 +1,28 @@ +import { ErrorData, InternalServerError, ServiceError } from './errors.js'; + +export const isServiceError = (err: any): err is ServiceError => { + const matches = ServiceError.isServiceError(err); + return !!matches; +}; + +export const asServiceError = (err: any): ServiceError => { + if (ServiceError.isServiceError(err)) { + return err; + } else { + return new InternalServerError(err); + } +}; + +export const getErrorData = (err: Error | any): ErrorData | undefined => { + if (!isServiceError(err)) { + return; + } + return err.toJSON(); +}; + +export const matchesErrorCode = (err: Error | any, code: string) => { + if (isServiceError(err)) { + return err.errorData.code === code; + } + return false; +}; diff --git a/packages/service-errors/test/src/codes.test.ts b/packages/service-errors/test/src/codes.test.ts new file mode 100644 index 000000000..93b4d0c44 --- /dev/null +++ b/packages/service-errors/test/src/codes.test.ts @@ -0,0 +1,31 @@ +import { describe, it, expect } from 'vitest'; +import { ErrorCode } from '../../src/codes.js'; + +// This file never exports anything - it is only used for checking our +// error code definitions. + +// Step 1: Check that the codes match the syntax `PSYNC_xxxxx`. + +type Digit = '0' | '1' | '2' | '3' | '4' | '5' | '6' | '7' | '8' | '9'; +type Category = 'S' | 'R'; + +// Note: This generates a type union of 20k possibilities, +// which could potentially slow down the TypeScript compiler. +// If it does, we could switch to a simpler `PSYNC_${Category}${number}` type.
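For illustration, a minimal sketch of the simpler fallback type mentioned in that note (the name `LooseServiceErrorCode` is hypothetical and not part of this change; it reuses the `Category` union defined just above):

type LooseServiceErrorCode = `PSYNC_${Category}${number}`;
// Still enforces the 'PSYNC_' prefix and the S/R category letter, but accepts any numeric
// suffix, avoiding the large union at the cost of not checking the digit count.

The stricter four-digit variant that the test actually uses follows below.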
+type ServiceErrorCode = `PSYNC_${Category}${Digit}${Digit}${Digit}${Digit}`; + +describe('Service Error Codes', () => { + it('should match PSYNC_xxxxx', () => { + // tsc checks this for us + null as unknown as ErrorCode satisfies ServiceErrorCode; + }); + + it('should have matching keys and values', () => { + const codes = Object.keys(ErrorCode); + expect(codes.length).toBeGreaterThan(40); + for (let key of codes) { + const value = (ErrorCode as any)[key]; + expect(value).toEqual(key); + } + }); +}); diff --git a/packages/service-errors/test/tsconfig.json b/packages/service-errors/test/tsconfig.json new file mode 100644 index 000000000..124a1cbe5 --- /dev/null +++ b/packages/service-errors/test/tsconfig.json @@ -0,0 +1,20 @@ +{ + "extends": "../../../tsconfig.base.json", + "compilerOptions": { + "rootDir": "src", + "baseUrl": "./", + "outDir": "dist", + "esModuleInterop": true, + "skipLibCheck": true, + "sourceMap": true, + "paths": { + "@/*": ["../src/*"] + } + }, + "include": ["src"], + "references": [ + { + "path": "../" + } + ] +} diff --git a/packages/service-errors/tsconfig.json b/packages/service-errors/tsconfig.json new file mode 100644 index 000000000..d37ee074d --- /dev/null +++ b/packages/service-errors/tsconfig.json @@ -0,0 +1,10 @@ +{ + "extends": "../../tsconfig.base.json", + "compilerOptions": { + "outDir": "dist", + "declarationDir": "dist", + "rootDir": "src" + }, + "include": ["src"], + "references": [] +} diff --git a/packages/sync-rules/src/SqlSyncRules.ts b/packages/sync-rules/src/SqlSyncRules.ts index ca5f05d85..e6462b62d 100644 --- a/packages/sync-rules/src/SqlSyncRules.ts +++ b/packages/sync-rules/src/SqlSyncRules.ts @@ -110,6 +110,11 @@ export class SqlSyncRules implements SyncRules { const { key: keyScalar, value } = entry as { key: Scalar; value: YAMLMap }; const key = keyScalar.toString(); + if (value == null || !(value instanceof YAMLMap)) { + rules.errors.push(this.tokenError(keyScalar, `'${key}' bucket definition must be an object`)); + continue; + } + const accept_potentially_dangerous_queries = value.get('accept_potentially_dangerous_queries', true)?.value == true; const queryOptions: QueryParseOptions = { diff --git a/packages/sync-rules/src/errors.ts b/packages/sync-rules/src/errors.ts index 88a86547f..c00646478 100644 --- a/packages/sync-rules/src/errors.ts +++ b/packages/sync-rules/src/errors.ts @@ -76,3 +76,13 @@ export class SyncRulesErrors extends Error { super(SyncRulesErrors.constructMessage(errors)); } } + +export type SyncRulesErrorCode = `PSYNC_${string}`; +export class SyncRuleProcessingError extends Error { + public code: SyncRulesErrorCode; + + constructor(code: SyncRulesErrorCode, message: string) { + super(`[${code}] ${message}`); + this.code = code; + } +} diff --git a/packages/sync-rules/src/utils.ts b/packages/sync-rules/src/utils.ts index 06e34f540..af4fbf7b6 100644 --- a/packages/sync-rules/src/utils.ts +++ b/packages/sync-rules/src/utils.ts @@ -1,16 +1,8 @@ -import { Statement, SelectFromStatement } from 'pgsql-ast-parser'; -import { - DatabaseInputRow, - SqliteRow, - SqliteJsonRow, - SqliteJsonValue, - SqliteValue, - RequestParameters, - RequestJwtPayload -} from './types.js'; +import { JSONBig, JsonContainer, Replacer, stringifyRaw } from '@powersync/service-jsonbig'; +import { SelectFromStatement, Statement } from 'pgsql-ast-parser'; import { SQLITE_FALSE, SQLITE_TRUE } from './sql_support.js'; -import { JsonContainer } from '@powersync/service-jsonbig'; -import { JSONBig, stringifyRaw, Replacer } from 
'@powersync/service-jsonbig'; +import { DatabaseInputRow, SqliteJsonRow, SqliteJsonValue, SqliteRow, SqliteValue } from './types.js'; +import { SyncRuleProcessingError as SyncRulesProcessingError } from './errors.js'; export function isSelectStatement(q: Statement): q is SelectFromStatement { return q.type == 'select'; @@ -69,7 +61,12 @@ export function isJsonValue(value: SqliteValue): value is SqliteJsonValue { function filterJsonData(data: any, depth = 0): any { if (depth > DEPTH_LIMIT) { // This is primarily to prevent infinite recursion - throw new Error(`json nested object depth exceeds the limit of ${DEPTH_LIMIT}`); + // TODO: Proper error class + throw new SyncRulesProcessingError( + // FIXME: Use @powersync/service-errors + 'PSYNC_S1004', + `json nested object depth exceeds the limit of ${DEPTH_LIMIT}` + ); } if (data == null) { return data; // null or undefined diff --git a/packages/sync-rules/test/src/sync_rules.test.ts b/packages/sync-rules/test/src/sync_rules.test.ts index 54d507440..7741dff79 100644 --- a/packages/sync-rules/test/src/sync_rules.test.ts +++ b/packages/sync-rules/test/src/sync_rules.test.ts @@ -745,6 +745,29 @@ bucket_definitions: ]); }); + test('null bucket definition', () => { + const rules = SqlSyncRules.fromYaml( + ` +bucket_definitions: + mybucket: + `, + { schema: BASIC_SCHEMA, ...PARSE_OPTIONS, throwOnError: false } + ); + + expect(rules.errors).toMatchObject([ + { + message: "'mybucket' bucket definition must be an object", + type: 'fatal' + }, + // Ideally this should not be displayed - it's an additional JSON schema validation error + // for the same issue. For now we just include both. + { + message: 'must be object', + type: 'fatal' + } + ]); + }); + test('dangerous query errors', () => { const rules = SqlSyncRules.fromYaml( ` diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index f914cf675..5ea006fb8 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -114,6 +114,9 @@ importers: libs/lib-services: dependencies: + '@powersync/service-errors': + specifier: workspace:* + version: link:../../packages/service-errors ajv: specifier: ^8.12.0 version: 8.16.0 @@ -571,6 +574,8 @@ importers: specifier: ^5.6.2 version: 5.6.2 + packages/service-errors: {} + packages/sync-rules: dependencies: '@powersync/service-jsonbig': diff --git a/service/Dockerfile b/service/Dockerfile index ea01a10fe..e751a388a 100644 --- a/service/Dockerfile +++ b/service/Dockerfile @@ -10,6 +10,7 @@ COPY packages/jpgwire/package.json packages/jpgwire/tsconfig.json packages/jpgwi COPY packages/jsonbig/package.json packages/jsonbig/tsconfig.json packages/jsonbig/ COPY packages/rsocket-router/package.json packages/rsocket-router/tsconfig.json packages/rsocket-router/ COPY packages/service-core/package.json packages/service-core/tsconfig.json packages/service-core/ +COPY packages/service-errors/package.json packages/service-errors/tsconfig.json packages/service-errors/ COPY packages/sync-rules/package.json packages/sync-rules/tsconfig.json packages/sync-rules/ COPY packages/types/package.json packages/types/tsconfig.json packages/types/ @@ -28,6 +29,7 @@ RUN pnpm install --frozen-lockfile COPY service/src service/src/ COPY packages/service-core/src packages/service-core/src/ +COPY packages/service-errors/src packages/service-errors/src/ COPY packages/jpgwire/src packages/jpgwire/src/ COPY packages/jpgwire/ca packages/jpgwire/ca/ COPY packages/jsonbig/src packages/jsonbig/src/ diff --git a/tsconfig.json b/tsconfig.json index fc9861cc0..e9d0017c5 100644 --- a/tsconfig.json +++ b/tsconfig.json @@ 
-13,6 +13,9 @@ { "path": "./packages/service-core" }, + { + "path": "./packages/service-errors" + }, { "path": "./packages/sync-rules" },
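Taken together, `@powersync/service-errors` gives the rest of the service a single, uniform way to raise and classify errors. A minimal usage sketch, assuming the package is consumed as a workspace dependency; the `requireSyncRules` and `respondWithError` helpers are hypothetical, and only the imported APIs come from this change:

import { asServiceError, ErrorCode, matchesErrorCode, ServiceError } from '@powersync/service-errors';

// Raise a standardized error using the (code, description) constructor overload.
function requireSyncRules(syncRules: unknown) {
  if (syncRules == null) {
    throw new ServiceError(ErrorCode.PSYNC_S2302, 'No sync rules available');
  }
}

// Normalize any thrown value before logging it or building an HTTP response.
// asServiceError wraps non-service errors in InternalServerError (PSYNC_S2001).
function respondWithError(err: unknown) {
  const serviceError = asServiceError(err);
  if (matchesErrorCode(serviceError, ErrorCode.PSYNC_S2302)) {
    // A caller could add an operator hint here, e.g. suggest deploying sync rules first.
  }
  return {
    status: serviceError.errorData.status ?? 500,
    body: { error: serviceError.toJSON() }
  };
}

This is the same shape that route-register.ts produces in the diff above, where the response body is built from `serviceError.errorData` directly.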