diff --git a/.changeset/bright-gifts-turn.md b/.changeset/bright-gifts-turn.md new file mode 100644 index 000000000..6c69422ee --- /dev/null +++ b/.changeset/bright-gifts-turn.md @@ -0,0 +1,23 @@ +--- +'@powersync/service-core': minor +--- + +- Added `ServiceContextMode` to `ServiceContext`. This conveys the mode in which the PowerSync service was started. +- `RouterEngine` is now always present on `ServiceContext`. When started, the router will only configure actual servers if routes have been registered. +- Added typecasting to the `!env` YAML custom tag function. YAML config environment variable substitution now supports casting string environment variables to `number` and `boolean` types. + +```yaml +replication: + connections: [] + +storage: + type: mongodb + +api: + parameters: + max_buckets_per_connection: !env PS_MAX_BUCKETS::number + +healthcheck: + probes: + use_http: !env PS_MONGO_HEALTHCHECK::boolean +``` diff --git a/.changeset/khaki-cows-thank.md b/.changeset/khaki-cows-thank.md new file mode 100644 index 000000000..a54af7a28 --- /dev/null +++ b/.changeset/khaki-cows-thank.md @@ -0,0 +1,32 @@ +--- +'@powersync/service-image': minor +--- + +- Added typecasting to the `!env` YAML custom tag function. YAML config environment variable substitution now supports casting string environment variables to `number` and `boolean` types. + +```yaml +replication: + connections: [] + +storage: + type: mongodb + +api: + parameters: + max_buckets_per_connection: !env PS_MAX_BUCKETS::number + +healthcheck: + probes: + use_http: !env PS_MONGO_HEALTHCHECK::boolean +``` + +- Added the ability to customize healthcheck probe exposure in the configuration. Backwards compatibility is maintained if no `healthcheck->probes` config is provided. + +```yaml +healthcheck: + probes: + # Health status can be accessed by reading files (previously always enabled) + use_filesystem: true + # Health status can be accessed via HTTP requests (previously enabled for API and UNIFIED service modes) + use_http: true +``` diff --git a/.changeset/little-pianos-fetch.md b/.changeset/little-pianos-fetch.md new file mode 100644 index 000000000..12ec2c816 --- /dev/null +++ b/.changeset/little-pianos-fetch.md @@ -0,0 +1,5 @@ +--- +'@powersync/service-types': minor +--- + +Added healthcheck types to `PowerSyncConfig`. diff --git a/.changeset/shaggy-bikes-relax.md b/.changeset/shaggy-bikes-relax.md new file mode 100644 index 000000000..384f309e5 --- /dev/null +++ b/.changeset/shaggy-bikes-relax.md @@ -0,0 +1,5 @@ +--- +'@powersync/lib-services-framework': minor +--- + +Switched the default health check probe mechanism from a filesystem-based to an in-memory implementation. Consumers now need to manually opt in to filesystem probes. diff --git a/.changeset/thick-wolves-invent.md b/.changeset/thick-wolves-invent.md new file mode 100644 index 000000000..e39cc3dea --- /dev/null +++ b/.changeset/thick-wolves-invent.md @@ -0,0 +1,5 @@ +--- +'@powersync/service-module-core': minor +--- + +Initial core module release. This moves RouterEngine API route registrations, health check probe configuration, and metrics configuration from the service runners to this shared module.
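For reference, the `::number` / `::boolean` cast suffix can be exercised in isolation with the `yaml` package's custom-tag API. The sketch below is a simplified stand-in for the `YamlEnvTag` added later in this diff (it skips the `PS_` prefix check and the NaN/boolean validation), and `PS_PORT` is just an example variable name:

```ts
import * as yaml from 'yaml';

// Simplified !env tag: substitutes an environment variable and optionally casts it.
const EnvTag: yaml.ScalarTag = {
  tag: '!env',
  resolve(envName: string, onError: (error: string) => void) {
    // e.g. "PS_PORT::number" -> name "PS_PORT", type "number"
    const [name, type = 'string'] = envName.split('::');
    const value = process.env[name];
    if (typeof value == 'undefined') {
      onError(`Environment variable "${name}" is not set`);
      return envName;
    }
    if (type == 'number') return Number(value);
    if (type == 'boolean') return value.toLowerCase() == 'true';
    return value;
  }
};

process.env.PS_PORT = '8080'; // example variable for the demo
const parsed = yaml.parse('port: !env PS_PORT::number', { customTags: [EnvTag] });
console.log(typeof parsed.port, parsed.port); // "number" 8080
```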
diff --git a/libs/lib-services/src/container.ts b/libs/lib-services/src/container.ts index f081b995d..f591f24d9 100644 --- a/libs/lib-services/src/container.ts +++ b/libs/lib-services/src/container.ts @@ -1,9 +1,14 @@ +import { ServiceAssertionError } from '@powersync/service-errors'; import _ from 'lodash'; import { ErrorReporter } from './alerts/definitions.js'; import { NoOpReporter } from './alerts/no-op-reporter.js'; import { MigrationManager } from './migrations/MigrationManager.js'; -import { ProbeModule, TerminationHandler, createFSProbe, createTerminationHandler } from './signals/signals-index.js'; -import { ServiceAssertionError } from '@powersync/service-errors'; +import { + ProbeModule, + TerminationHandler, + createInMemoryProbe, + createTerminationHandler +} from './signals/signals-index.js'; export enum ContainerImplementation { REPORTER = 'reporter', @@ -45,7 +50,7 @@ export type ServiceIdentifier = string | symbol | Newable | Abst const DEFAULT_GENERATORS: ContainerImplementationDefaultGenerators = { [ContainerImplementation.REPORTER]: () => NoOpReporter, - [ContainerImplementation.PROBES]: () => createFSProbe(), + [ContainerImplementation.PROBES]: () => createInMemoryProbe(), [ContainerImplementation.TERMINATION_HANDLER]: () => createTerminationHandler(), [ContainerImplementation.MIGRATION_MANAGER]: () => new MigrationManager() }; diff --git a/modules/module-core/CHANGELOG.md b/modules/module-core/CHANGELOG.md new file mode 100644 index 000000000..9f62cbaa1 --- /dev/null +++ b/modules/module-core/CHANGELOG.md @@ -0,0 +1 @@ +# @powersync/service-module-core diff --git a/modules/module-core/LICENSE b/modules/module-core/LICENSE new file mode 100644 index 000000000..c8efd46cc --- /dev/null +++ b/modules/module-core/LICENSE @@ -0,0 +1,67 @@ +# Functional Source License, Version 1.1, Apache 2.0 Future License + +## Abbreviation + +FSL-1.1-Apache-2.0 + +## Notice + +Copyright 2023-2024 Journey Mobile, Inc. + +## Terms and Conditions + +### Licensor ("We") + +The party offering the Software under these Terms and Conditions. + +### The Software + +The "Software" is each version of the software that we make available under these Terms and Conditions, as indicated by our inclusion of these Terms and Conditions with the Software. + +### License Grant + +Subject to your compliance with this License Grant and the Patents, Redistribution and Trademark clauses below, we hereby grant you the right to use, copy, modify, create derivative works, publicly perform, publicly display and redistribute the Software for any Permitted Purpose identified below. + +### Permitted Purpose + +A Permitted Purpose is any purpose other than a Competing Use. A Competing Use means making the Software available to others in a commercial product or service that: + +1. substitutes for the Software; +2. substitutes for any other product or service we offer using the Software that exists as of the date we make the Software available; or +3. offers the same or substantially similar functionality as the Software. + +Permitted Purposes specifically include using the Software: + +1. for your internal use and access; +2. for non-commercial education; +3. for non-commercial research; and +4. in connection with professional services that you provide to a licensee using the Software in accordance with these Terms and Conditions. + +### Patents + +To the extent your use for a Permitted Purpose would necessarily infringe our patents, the license grant above includes a license under our patents. 
If you make a claim against any party that the Software infringes or contributes to the infringement of any patent, then your patent license to the Software ends immediately. + +### Redistribution + +The Terms and Conditions apply to all copies, modifications and derivatives of the Software. +If you redistribute any copies, modifications or derivatives of the Software, you must include a copy of or a link to these Terms and Conditions and not remove any copyright notices provided in or with the Software. + +### Disclaimer + +THE SOFTWARE IS PROVIDED "AS IS" AND WITHOUT WARRANTIES OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING WITHOUT LIMITATION WARRANTIES OF FITNESS FOR A PARTICULAR PURPOSE, MERCHANTABILITY, TITLE OR NON-INFRINGEMENT. +IN NO EVENT WILL WE HAVE ANY LIABILITY TO YOU ARISING OUT OF OR RELATED TO THE SOFTWARE, INCLUDING INDIRECT, SPECIAL, INCIDENTAL OR CONSEQUENTIAL DAMAGES, EVEN IF WE HAVE BEEN INFORMED OF THEIR POSSIBILITY IN ADVANCE. + +### Trademarks + +Except for displaying the License Details and identifying us as the origin of the Software, you have no right under these Terms and Conditions to use our trademarks, trade names, service marks or product names. + +## Grant of Future License + +We hereby irrevocably grant you an additional license to use the Software under the Apache License, Version 2.0 that is effective on the second anniversary of the date we make the Software available. On or after that date, you may use the Software under the Apache License, Version 2.0, in which case the following will apply: + +Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. +You may obtain a copy of the License at + +http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. diff --git a/modules/module-core/README.md b/modules/module-core/README.md new file mode 100644 index 000000000..a924059a2 --- /dev/null +++ b/modules/module-core/README.md @@ -0,0 +1,3 @@ +# PowerSync Service Module Core + +Module which registers and configures basic core functionality for PowerSync services. 
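As a hedged sketch of how a service embeds this module (mirroring the runner changes later in this diff; lifecycle startup is omitted and `ServiceContextMode.UNIFIED` is just one possible mode):

```ts
import * as core from '@powersync/service-core';
import { CoreModule } from '@powersync/service-module-core';

async function bootstrap(runnerConfig: core.utils.RunnerConfig) {
  // Register the core module alongside whichever storage/replication modules a deployment needs.
  const moduleManager = new core.modules.ModuleManager();
  moduleManager.register([new CoreModule()]);

  // The service mode tells the CoreModule which routes, probes and metrics to configure.
  const config = await core.utils.loadConfig(runnerConfig);
  const serviceContext = new core.system.ServiceContextContainer({
    serviceMode: core.system.ServiceContextMode.UNIFIED,
    configuration: config
  });

  await moduleManager.initialize(serviceContext);
  return serviceContext;
}
```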
diff --git a/modules/module-core/package.json b/modules/module-core/package.json new file mode 100644 index 000000000..cda225db6 --- /dev/null +++ b/modules/module-core/package.json @@ -0,0 +1,39 @@ +{ + "name": "@powersync/service-module-core", + "repository": "https://github.com/powersync-ja/powersync-service", + "types": "dist/index.d.ts", + "version": "0.0.0", + "main": "dist/index.js", + "license": "FSL-1.1-Apache-2.0", + "type": "module", + "publishConfig": { + "access": "public" + }, + "scripts": { + "build": "tsc -b", + "build:tests": "tsc -b test/tsconfig.json", + "clean": "rm -rf ./dist && tsc -b --clean", + "test": "vitest" + }, + "exports": { + ".": { + "import": "./dist/index.js", + "require": "./dist/index.js", + "default": "./dist/index.js" + }, + "./types": { + "import": "./dist/types/types.js", + "require": "./dist/types/types.js", + "default": "./dist/types/types.js" + } + }, + "dependencies": { + "@powersync/lib-services-framework": "workspace:*", + "@powersync/service-core": "workspace:*", + "@powersync/service-rsocket-router": "workspace:*", + "@powersync/service-types": "workspace:*", + "fastify": "4.23.2", + "@fastify/cors": "8.4.1" + }, + "devDependencies": {} +} diff --git a/modules/module-core/src/CoreModule.ts b/modules/module-core/src/CoreModule.ts new file mode 100644 index 000000000..7ba637be2 --- /dev/null +++ b/modules/module-core/src/CoreModule.ts @@ -0,0 +1,178 @@ +import cors from '@fastify/cors'; +import * as framework from '@powersync/lib-services-framework'; +import * as core from '@powersync/service-core'; +import { ReactiveSocketRouter } from '@powersync/service-rsocket-router'; +import fastify from 'fastify'; + +export class CoreModule extends core.modules.AbstractModule { + constructor() { + super({ + name: 'Core' + }); + } + + public async initialize(context: core.ServiceContextContainer): Promise { + this.configureTags(context); + + if ([core.system.ServiceContextMode.API, core.system.ServiceContextMode.UNIFIED].includes(context.serviceMode)) { + // Service should include API routes + this.registerAPIRoutes(context); + } + + // Configures a Fastify server and RSocket server + this.configureRouterImplementation(context); + + // Configures health check probes based off configuration + this.configureHealthChecks(context); + + await this.configureMetrics(context); + } + + protected configureTags(context: core.ServiceContextContainer) { + core.utils.setTags(context.configuration.metadata); + } + + protected registerAPIRoutes(context: core.ServiceContextContainer) { + context.routerEngine.registerRoutes({ + api_routes: [ + ...core.routes.endpoints.ADMIN_ROUTES, + ...core.routes.endpoints.CHECKPOINT_ROUTES, + ...core.routes.endpoints.SYNC_RULES_ROUTES + ], + stream_routes: [...core.routes.endpoints.SYNC_STREAM_ROUTES], + socket_routes: [core.routes.endpoints.syncStreamReactive] + }); + } + + /** + * Configures the HTTP server which will handle routes once the router engine is started + */ + protected configureRouterImplementation(context: core.ServiceContextContainer) { + context.lifeCycleEngine.withLifecycle(context.routerEngine, { + start: async (routerEngine) => { + // The router engine will only start servers if routes have been registered + await routerEngine.start(async (routes) => { + const server = fastify.fastify(); + + server.register(cors, { + origin: '*', + allowedHeaders: ['Content-Type', 'Authorization', 'User-Agent', 'X-User-Agent'], + exposedHeaders: ['Content-Type'], + // Cache time for preflight response + maxAge: 3600 + }); + + 
core.routes.configureFastifyServer(server, { + service_context: context, + routes: { + api: { routes: routes.api_routes }, + sync_stream: { + routes: routes.stream_routes, + queue_options: { + concurrency: context.configuration.api_parameters.max_concurrent_connections, + max_queue_depth: 0 + } + } + } + }); + + const socketRouter = new ReactiveSocketRouter({ + max_concurrent_connections: context.configuration.api_parameters.max_concurrent_connections + }); + + core.routes.configureRSocket(socketRouter, { + server: server.server, + service_context: context, + route_generators: routes.socket_routes + }); + + const { port } = context.configuration; + + await server.listen({ + host: '0.0.0.0', + port + }); + + framework.logger.info(`Running on port ${port}`); + + return { + onShutdown: async () => { + framework.logger.info('Shutting down HTTP server...'); + await server.close(); + framework.logger.info('HTTP server stopped'); + } + }; + }); + } + }); + } + + protected configureHealthChecks(context: core.ServiceContextContainer) { + const { + configuration: { + healthcheck: { probes } + } + } = context; + + const exposesAPI = [core.system.ServiceContextMode.API, core.system.ServiceContextMode.UNIFIED].includes( + context.serviceMode + ); + + if (context.serviceMode == core.system.ServiceContextMode.API) { + /** + * In the pure API mode we don't currently have any other code which touches the probes. + * This configures a timer which touches the probes every 5 seconds. + */ + let timer: NodeJS.Timeout | null = null; + context.lifeCycleEngine.withLifecycle(null, { + start: () => { + timer = setInterval(() => { + context + .get(framework.ContainerImplementation.PROBES) + .touch() + .catch((ex) => this.logger.error(`Caught error while updating liveness probe: ${ex}`)); + }, 5_000); + }, + stop: () => { + if (timer) { + clearInterval(timer); + timer = null; + } + } + }); + } + + /** + * Maintains backwards compatibility when the legacy default (`use_legacy`) is active by: + * - Enabling HTTP probes if the service started in API or UNIFIED mode + * - Always enabling filesystem probes + * Probe types must be explicitly selected if the legacy default is not used. + */ + if (probes.use_http || (exposesAPI && probes.use_legacy)) { + context.routerEngine.registerRoutes({ + api_routes: core.routes.endpoints.PROBES_ROUTES + }); + } + + if (probes.use_legacy || probes.use_filesystem) { + context.register(framework.ContainerImplementation.PROBES, framework.createFSProbe()); + } + } + + protected async configureMetrics(context: core.ServiceContextContainer) { + const apiMetrics = [core.metrics.MetricModes.API]; + const streamMetrics = [core.metrics.MetricModes.REPLICATION, core.metrics.MetricModes.STORAGE]; + const metricsModeMap: Partial<Record<core.system.ServiceContextMode, core.metrics.MetricModes[]>> = { + [core.system.ServiceContextMode.API]: apiMetrics, + [core.system.ServiceContextMode.SYNC]: streamMetrics, + [core.system.ServiceContextMode.UNIFIED]: [...apiMetrics, ...streamMetrics] + }; + + await core.metrics.registerMetrics({ + service_context: context, + modes: metricsModeMap[context.serviceMode] ??
[] + }); + } + + public async teardown(options: core.modules.TearDownOptions): Promise {} +} diff --git a/modules/module-core/src/index.ts b/modules/module-core/src/index.ts new file mode 100644 index 000000000..e1b46a131 --- /dev/null +++ b/modules/module-core/src/index.ts @@ -0,0 +1 @@ +export * from './CoreModule.js'; diff --git a/modules/module-core/src/types/types.ts b/modules/module-core/src/types/types.ts new file mode 100644 index 000000000..445438b30 --- /dev/null +++ b/modules/module-core/src/types/types.ts @@ -0,0 +1 @@ +// No types for this module yet diff --git a/modules/module-core/test/src/probes.test.ts b/modules/module-core/test/src/probes.test.ts new file mode 100644 index 000000000..ecf0ae6fc --- /dev/null +++ b/modules/module-core/test/src/probes.test.ts @@ -0,0 +1,173 @@ +import { CoreModule } from '@module/CoreModule.js'; +import { container, ContainerImplementation, ProbeModule } from '@powersync/lib-services-framework'; +import { modules, system, utils } from '@powersync/service-core'; +import { constants } from 'fs'; +import fs from 'fs/promises'; +import path from 'path'; +import { describe, expect, it } from 'vitest'; + +type TestContainerOptions = { + yamlConfig: string; + mode: system.ServiceContextMode; +}; + +const createTestContainer = async (options: TestContainerOptions) => { + // Initialize framework components + container.registerDefaults(); + + const moduleManager = new modules.ModuleManager(); + moduleManager.register([new CoreModule()]); + + const collectedConfig = await new utils.CompoundConfigCollector().collectConfig({ + config_base64: Buffer.from(options.yamlConfig).toString('base64') + }); + + const serviceContext = new system.ServiceContextContainer({ + configuration: collectedConfig, + serviceMode: options.mode + }); + + await moduleManager.initialize(serviceContext); + + return { + [Symbol.asyncDispose]: async () => { + await serviceContext.lifeCycleEngine.stop(); + }, + serviceContext + }; +}; + +describe('Probes', () => { + it('should expose HTTP probes in API mode in legacy mode', async () => { + await using context = await createTestContainer({ + yamlConfig: /* yaml */ ` + # Test config + telemetry: + disable_telemetry_sharing: true + storage: + type: memory + `, + mode: system.ServiceContextMode.API + }); + + const { serviceContext } = context; + + // The router engine should have been configured with probe routes + const testRoute = serviceContext.routerEngine.routes.api_routes.find((r) => r.path == '/probes/startup'); + expect(testRoute).exist; + }); + + it('should not expose routes in sync mode in legacy mode', async () => { + await using context = await createTestContainer({ + yamlConfig: /* yaml */ ` + # Test config + telemetry: + disable_telemetry_sharing: true + storage: + type: memory + `, + mode: system.ServiceContextMode.SYNC + }); + + const { + serviceContext: { routerEngine } + } = context; + + // The router engine should have been configured with probe routes + expect(routerEngine.routes.api_routes).empty; + expect(routerEngine.routes.stream_routes).empty; + expect(routerEngine.routes.socket_routes).empty; + }); + + it('should not expose API routes in sync mode with HTTP probes enabled', async () => { + await using context = await createTestContainer({ + yamlConfig: /* yaml */ ` + # Test config + telemetry: + disable_telemetry_sharing: true + storage: + type: memory + healthcheck: + probes: + use_http: true + `, + mode: system.ServiceContextMode.SYNC + }); + + const { + serviceContext: { routerEngine } + } = context; + + // 
The router engine should have been configured with probe routes + expect(routerEngine.routes.stream_routes).empty; + expect(routerEngine.routes.socket_routes).empty; + expect(routerEngine.routes.api_routes.map((r) => r.path)).deep.equal([ + '/probes/startup', + '/probes/liveness', + '/probes/readiness' + ]); // Only the HTTP probes + }); + + it('should use filesystem probes in legacy mode', async () => { + await using context = await createTestContainer({ + yamlConfig: /* yaml */ ` + # Test config + telemetry: + disable_telemetry_sharing: true + storage: + type: memory + `, + mode: system.ServiceContextMode.API + }); + + const { serviceContext } = context; + + // This should be a filesystem probe + const probes = serviceContext.get(ContainerImplementation.PROBES); + + const aliveProbePath = path.join(process.cwd(), '.probes', 'poll'); + + await fs.unlink(aliveProbePath).catch(() => {}); + + await probes.touch(); + + const exists = await fs + .access(aliveProbePath, constants.F_OK) + .then(() => true) + .catch(() => false); + expect(exists).to.be.true; + }); + + it('should not use filesystem probes if not enabled', async () => { + await using context = await createTestContainer({ + yamlConfig: /* yaml */ ` + # Test config + telemetry: + disable_telemetry_sharing: true + storage: + type: memory + healthcheck: + probes: + use_http: true + `, + mode: system.ServiceContextMode.API + }); + + const { serviceContext } = context; + + // This should be a filesystem probe + const probes = serviceContext.get(ContainerImplementation.PROBES); + + const aliveProbePath = path.join(process.cwd(), '.probes', 'poll'); + + await fs.unlink(aliveProbePath).catch(() => {}); + + await probes.touch(); + + const exists = await fs + .access(aliveProbePath, constants.F_OK) + .then(() => true) + .catch(() => false); + expect(exists).to.be.false; + }); +}); diff --git a/modules/module-core/test/tsconfig.json b/modules/module-core/test/tsconfig.json new file mode 100644 index 000000000..afdd021e7 --- /dev/null +++ b/modules/module-core/test/tsconfig.json @@ -0,0 +1,22 @@ +{ + "extends": "../../../tsconfig.base.json", + "compilerOptions": { + "rootDir": "src", + "baseUrl": "./", + "noEmit": true, + "esModuleInterop": true, + "skipLibCheck": true, + "sourceMap": true, + "paths": { + "@/*": ["../../../packages/service-core/src/*"], + "@module/*": ["../src/*"], + "@core-tests/*": ["../../../packages/service-core/test/src/*"] + } + }, + "include": ["src"], + "references": [ + { + "path": "../" + } + ] +} diff --git a/modules/module-core/tsconfig.json b/modules/module-core/tsconfig.json new file mode 100644 index 000000000..fcd895c62 --- /dev/null +++ b/modules/module-core/tsconfig.json @@ -0,0 +1,22 @@ +{ + "extends": "../../tsconfig.base.json", + "compilerOptions": { + "rootDir": "src", + "outDir": "dist", + "esModuleInterop": true, + "skipLibCheck": true, + "sourceMap": true + }, + "include": ["src"], + "references": [ + { + "path": "../../packages/types" + }, + { + "path": "../../packages/service-core" + }, + { + "path": "../../libs/lib-services" + } + ] +} diff --git a/modules/module-core/vitest.config.ts b/modules/module-core/vitest.config.ts new file mode 100644 index 000000000..30831c4ce --- /dev/null +++ b/modules/module-core/vitest.config.ts @@ -0,0 +1,14 @@ +import tsconfigPaths from 'vite-tsconfig-paths'; +import { defineConfig } from 'vitest/config'; + +export default defineConfig({ + plugins: [tsconfigPaths()], + test: { + poolOptions: { + threads: { + singleThread: true + } + }, + pool: 'threads' + } +}); diff 
--git a/packages/service-core/src/entry/commands/compact-action.ts b/packages/service-core/src/entry/commands/compact-action.ts index ae50a8f03..0c2275b79 100644 --- a/packages/service-core/src/entry/commands/compact-action.ts +++ b/packages/service-core/src/entry/commands/compact-action.ts @@ -37,7 +37,10 @@ export function registerCompactAction(program: Command) { logger.info(`Compacting storage for ${buckets?.join(', ')}...`); } const config = await utils.loadConfig(extractRunnerOptions(options)); - const serviceContext = new system.ServiceContextContainer(config); + const serviceContext = new system.ServiceContextContainer({ + serviceMode: system.ServiceContextMode.COMPACT, + configuration: config + }); // Register modules in order to allow custom module compacting const moduleManager = container.getImplementation(modules.ModuleManager); diff --git a/packages/service-core/src/entry/commands/migrate-action.ts b/packages/service-core/src/entry/commands/migrate-action.ts index 3350118f5..9a1edd8e5 100644 --- a/packages/service-core/src/entry/commands/migrate-action.ts +++ b/packages/service-core/src/entry/commands/migrate-action.ts @@ -18,7 +18,10 @@ export function registerMigrationAction(program: Command) { .argument('', 'Migration direction. `up` or `down`') .action(async (direction: migrations.Direction, options) => { const config = await utils.loadConfig(extractRunnerOptions(options)); - const serviceContext = new system.ServiceContextContainer(config); + const serviceContext = new system.ServiceContextContainer({ + serviceMode: system.ServiceContextMode.MIGRATION, + configuration: config + }); // Register modules in order to allow custom module migrations const moduleManager = container.getImplementation(modules.ModuleManager); diff --git a/packages/service-core/src/entry/commands/test-connection-action.ts b/packages/service-core/src/entry/commands/test-connection-action.ts index ac67f1a9d..8e6095587 100644 --- a/packages/service-core/src/entry/commands/test-connection-action.ts +++ b/packages/service-core/src/entry/commands/test-connection-action.ts @@ -17,7 +17,10 @@ export function registerTestConnectionAction(program: Command) { return testConnectionCommand.description('Test connection').action(async (options) => { try { const config = await utils.loadConfig(extractRunnerOptions(options)); - const serviceContext = new system.ServiceContextContainer(config); + const serviceContext = new system.ServiceContextContainer({ + serviceMode: system.ServiceContextMode.TEST_CONNECTION, + configuration: config + }); const replication = new ReplicationEngine(); serviceContext.register(ReplicationEngine, replication); diff --git a/packages/service-core/src/routes/RouterEngine.ts b/packages/service-core/src/routes/RouterEngine.ts index 5852984e1..50f18f186 100644 --- a/packages/service-core/src/routes/RouterEngine.ts +++ b/packages/service-core/src/routes/RouterEngine.ts @@ -2,15 +2,8 @@ import { logger } from '@powersync/lib-services-framework'; import * as api from '../api/api-index.js'; -import { ADMIN_ROUTES } from './endpoints/admin.js'; -import { CHECKPOINT_ROUTES } from './endpoints/checkpointing.js'; -import { PROBES_ROUTES } from './endpoints/probes.js'; -import { syncStreamReactive } from './endpoints/socket-route.js'; -import { SYNC_RULES_ROUTES } from './endpoints/sync-rules.js'; -import { SYNC_STREAM_ROUTES } from './endpoints/sync-stream.js'; import { SocketRouteGenerator } from './router-socket.js'; import { RouteDefinition } from './router.js'; -import { SyncContext } from 
'../sync/SyncContext.js'; export type RouterSetupResponse = { onShutdown: () => Promise; @@ -47,14 +40,25 @@ export class RouterEngine { this.cleanupHandler = null; this.closed = false; - // Default routes this.routes = { - api_routes: [...ADMIN_ROUTES, ...CHECKPOINT_ROUTES, ...SYNC_RULES_ROUTES, ...PROBES_ROUTES], - stream_routes: [...SYNC_STREAM_ROUTES], - socket_routes: [syncStreamReactive] + api_routes: [], + stream_routes: [], + socket_routes: [] }; } + public registerRoutes(routes: Partial) { + this.routes.api_routes.push(...(routes.api_routes ?? [])); + this.routes.stream_routes.push(...(routes.stream_routes ?? [])); + this.routes.socket_routes.push(...(routes.socket_routes ?? [])); + } + + public get hasRoutes() { + return ( + this.routes.api_routes.length > 0 || this.routes.stream_routes.length > 0 || this.routes.socket_routes.length > 0 + ); + } + public registerAPI(api: api.RouteAPI) { if (this.api) { logger.warn('A RouteAPI has already been registered. Overriding existing implementation'); @@ -75,6 +79,12 @@ export class RouterEngine { */ async start(setup: RouterSetup) { logger.info('Starting Router Engine...'); + + if (!this.hasRoutes) { + logger.info('Router Engine will not start an HTTP server as no routes have been registered.'); + return; + } + const { onShutdown } = await setup(this.routes); this.cleanupHandler = onShutdown; logger.info('Successfully started Router Engine.'); diff --git a/packages/service-core/src/routes/configure-fastify.ts b/packages/service-core/src/routes/configure-fastify.ts index 5ac45c0a7..2a7263f3a 100644 --- a/packages/service-core/src/routes/configure-fastify.ts +++ b/packages/service-core/src/routes/configure-fastify.ts @@ -9,7 +9,7 @@ import { PROBES_ROUTES } from './endpoints/probes.js'; import { SYNC_RULES_ROUTES } from './endpoints/sync-rules.js'; import { SYNC_STREAM_ROUTES } from './endpoints/sync-stream.js'; import { createRequestQueueHook, CreateRequestQueueParams } from './hooks.js'; -import { RouteDefinition, RouterServiceContext } from './router.js'; +import { RouteDefinition } from './router.js'; /** * A list of route definitions to be registered as endpoints. 
@@ -59,14 +59,9 @@ export function configureFastifyServer(server: fastify.FastifyInstance, options: const { service_context, routes = DEFAULT_ROUTE_OPTIONS } = options; const generateContext = async () => { - const { routerEngine } = service_context; - if (!routerEngine) { - throw new Error(`RouterEngine has not been registered`); - } - return { user_id: undefined, - service_context: service_context as RouterServiceContext + service_context: service_context }; }; diff --git a/packages/service-core/src/routes/configure-rsocket.ts b/packages/service-core/src/routes/configure-rsocket.ts index 58a932dd4..39bdddd3d 100644 --- a/packages/service-core/src/routes/configure-rsocket.ts +++ b/packages/service-core/src/routes/configure-rsocket.ts @@ -37,10 +37,6 @@ export function configureRSocket(router: ReactiveSocketRouter, options: throw new errors.AuthorizationError(ErrorCode.PSYNC_S2106, 'Authentication required'); } - if (!service_context.routerEngine) { - throw new Error(`RouterEngine has not been registered`); - } - return { token, user_agent, diff --git a/packages/service-core/src/routes/endpoints/admin.ts b/packages/service-core/src/routes/endpoints/admin.ts index 97fadbce7..2ec4fb492 100644 --- a/packages/service-core/src/routes/endpoints/admin.ts +++ b/packages/service-core/src/routes/endpoints/admin.ts @@ -19,7 +19,7 @@ export const executeSql = routeDefinition({ } } = payload; - const apiHandler = payload.context.service_context.routerEngine!.getAPI(); + const apiHandler = payload.context.service_context.routerEngine.getAPI(); const sourceConfig = await apiHandler.getSourceConfig(); if (!sourceConfig.debug_api) { @@ -47,7 +47,7 @@ export const diagnostics = routeDefinition({ const { service_context } = context; const include_content = payload.params.sync_rules_content ?? 
false; - const apiHandler = service_context.routerEngine!.getAPI(); + const apiHandler = service_context.routerEngine.getAPI(); const status = await apiHandler.getConnectionStatus(); if (!status) { @@ -94,7 +94,7 @@ export const getSchema = routeDefinition({ authorize: authApi, validator: schema.createTsCodecValidator(internal_routes.GetSchemaRequest, { allowAdditional: true }), handler: async (payload) => { - const apiHandler = payload.context.service_context.routerEngine!.getAPI(); + const apiHandler = payload.context.service_context.routerEngine.getAPI(); return internal_routes.GetSchemaResponse.encode(await api.getConnectionsSchema(apiHandler)); } @@ -112,7 +112,7 @@ export const reprocess = routeDefinition({ const { storageEngine: { activeBucketStorage } } = service_context; - const apiHandler = service_context.routerEngine!.getAPI(); + const apiHandler = service_context.routerEngine.getAPI(); const next = await activeBucketStorage.getNextSyncRules(apiHandler.getParseSyncRulesOptions()); if (next != null) { throw new Error(`Busy processing sync rules - cannot reprocess`); @@ -159,7 +159,7 @@ export const validate = routeDefinition({ context: { service_context } } = payload; const content = payload.params.sync_rules; - const apiHandler = service_context.routerEngine!.getAPI(); + const apiHandler = service_context.routerEngine.getAPI(); const schemaData = await api.getConnectionsSchema(apiHandler); const schema = new StaticSchema(schemaData.connections); diff --git a/packages/service-core/src/routes/endpoints/checkpointing.ts b/packages/service-core/src/routes/endpoints/checkpointing.ts index 7e707950a..64b884f73 100644 --- a/packages/service-core/src/routes/endpoints/checkpointing.ts +++ b/packages/service-core/src/routes/endpoints/checkpointing.ts @@ -18,7 +18,7 @@ export const writeCheckpoint = routeDefinition({ const { context: { service_context } } = payload; - const apiHandler = service_context.routerEngine!.getAPI(); + const apiHandler = service_context.routerEngine.getAPI(); // This old API needs a persisted checkpoint id. // Since we don't use LSNs anymore, the only way to get that is to wait. 
@@ -54,7 +54,7 @@ export const writeCheckpoint2 = routeDefinition({ handler: async (payload) => { const { user_id, service_context } = payload.context; - const apiHandler = service_context.routerEngine!.getAPI(); + const apiHandler = service_context.routerEngine.getAPI(); const { replicationHead, writeCheckpoint } = await util.createWriteCheckpoint({ userId: user_id, diff --git a/packages/service-core/src/routes/endpoints/route-endpoints-index.ts b/packages/service-core/src/routes/endpoints/route-endpoints-index.ts index 07556a028..0f21a7b4c 100644 --- a/packages/service-core/src/routes/endpoints/route-endpoints-index.ts +++ b/packages/service-core/src/routes/endpoints/route-endpoints-index.ts @@ -1,5 +1,6 @@ export * from './admin.js'; export * from './checkpointing.js'; +export * from './probes.js'; export * from './socket-route.js'; export * from './sync-rules.js'; export * from './sync-stream.js'; diff --git a/packages/service-core/src/routes/endpoints/socket-route.ts b/packages/service-core/src/routes/endpoints/socket-route.ts index 006ac4225..6a8e90ffe 100644 --- a/packages/service-core/src/routes/endpoints/socket-route.ts +++ b/packages/service-core/src/routes/endpoints/socket-route.ts @@ -33,7 +33,7 @@ export const syncStreamReactive: SocketRouteGenerator = (router) => } }); - if (routerEngine!.closed) { + if (routerEngine.closed) { responder.onError( new errors.ServiceError({ status: 503, @@ -64,9 +64,9 @@ export const syncStreamReactive: SocketRouteGenerator = (router) => return; } - const syncRules = bucketStorage.getParsedSyncRules(routerEngine!.getAPI().getParseSyncRulesOptions()); + const syncRules = bucketStorage.getParsedSyncRules(routerEngine.getAPI().getParseSyncRulesOptions()); - const removeStopHandler = routerEngine!.addStopHandler(() => { + const removeStopHandler = routerEngine.addStopHandler(() => { controller.abort(); }); diff --git a/packages/service-core/src/routes/endpoints/sync-rules.ts b/packages/service-core/src/routes/endpoints/sync-rules.ts index f08f4a4a1..4eb690fc5 100644 --- a/packages/service-core/src/routes/endpoints/sync-rules.ts +++ b/packages/service-core/src/routes/endpoints/sync-rules.ts @@ -53,7 +53,7 @@ export const deploySyncRules = routeDefinition({ const content = payload.params.content; try { - const apiHandler = service_context.routerEngine!.getAPI(); + const apiHandler = service_context.routerEngine.getAPI(); SqlSyncRules.fromYaml(payload.params.content, { ...apiHandler.getParseSyncRulesOptions(), // We don't do any schema-level validation at this point @@ -94,7 +94,7 @@ export const validateSyncRules = routeDefinition({ handler: async (payload) => { const content = payload.params.content; const { service_context } = payload.context; - const apiHandler = service_context.routerEngine!.getAPI(); + const apiHandler = service_context.routerEngine.getAPI(); const info = await debugSyncRules(apiHandler, content); @@ -121,7 +121,7 @@ export const currentSyncRules = routeDefinition({ }); } - const apiHandler = service_context.routerEngine!.getAPI(); + const apiHandler = service_context.routerEngine.getAPI(); const info = await debugSyncRules(apiHandler, sync_rules.sync_rules_content); const next = await activeBucketStorage.getNextSyncRulesContent(); @@ -158,7 +158,7 @@ export const reprocessSyncRules = routeDefinition({ const { storageEngine: { activeBucketStorage } } = payload.context.service_context; - const apiHandler = payload.context.service_context.routerEngine!.getAPI(); + const apiHandler = 
payload.context.service_context.routerEngine.getAPI(); const sync_rules = await activeBucketStorage.getActiveSyncRules(apiHandler.getParseSyncRulesOptions()); if (sync_rules == null) { throw new errors.ServiceError({ diff --git a/packages/service-core/src/routes/endpoints/sync-stream.ts b/packages/service-core/src/routes/endpoints/sync-stream.ts index 7ff4140ed..4e101705f 100644 --- a/packages/service-core/src/routes/endpoints/sync-stream.ts +++ b/packages/service-core/src/routes/endpoints/sync-stream.ts @@ -26,7 +26,7 @@ export const syncStreamed = routeDefinition({ const userAgent = headers['x-user-agent'] ?? headers['user-agent']; const clientId = payload.params.client_id; - if (routerEngine!.closed) { + if (routerEngine.closed) { throw new errors.ServiceError({ status: 503, code: ErrorCode.PSYNC_S2003, @@ -47,7 +47,7 @@ export const syncStreamed = routeDefinition({ }); } - const syncRules = bucketStorage.getParsedSyncRules(routerEngine!.getAPI().getParseSyncRulesOptions()); + const syncRules = bucketStorage.getParsedSyncRules(routerEngine.getAPI().getParseSyncRulesOptions()); const controller = new AbortController(); const tracker = new sync.RequestTracker(metricsEngine); @@ -72,7 +72,7 @@ export const syncStreamed = routeDefinition({ { objectMode: false, highWaterMark: 16 * 1024 } ); - const deregister = routerEngine!.addStopHandler(() => { + const deregister = routerEngine.addStopHandler(() => { // This error is not currently propagated to the client controller.abort(); stream.destroy(new Error('Shutting down system')); diff --git a/packages/service-core/src/routes/router.ts b/packages/service-core/src/routes/router.ts index fed544622..15f7f30b1 100644 --- a/packages/service-core/src/routes/router.ts +++ b/packages/service-core/src/routes/router.ts @@ -6,7 +6,8 @@ import { RouterEngine } from './RouterEngine.js'; /** * The {@link RouterEngine} must be provided for these routes */ -export type RouterServiceContext = ServiceContext & { routerEngine: RouterEngine }; +export type RouterServiceContext = ServiceContext; + /** * Common context for routes */ diff --git a/packages/service-core/src/runner/teardown.ts b/packages/service-core/src/runner/teardown.ts index c9832b6d6..af200c2ae 100644 --- a/packages/service-core/src/runner/teardown.ts +++ b/packages/service-core/src/runner/teardown.ts @@ -14,7 +14,11 @@ export async function teardown(runnerConfig: utils.RunnerConfig) { try { logger.info(`Tearing down PowerSync instance...`); const config = await utils.loadConfig(runnerConfig); - const serviceContext = new system.ServiceContextContainer(config); + const serviceContext = new system.ServiceContextContainer({ + serviceMode: system.ServiceContextMode.TEARDOWN, + configuration: config + }); + const moduleManager = container.getImplementation(modules.ModuleManager); await moduleManager.initialize(serviceContext); // This is mostly done to ensure that the storage is ready diff --git a/packages/service-core/src/system/ServiceContext.ts b/packages/service-core/src/system/ServiceContext.ts index 40887b474..a4a4d9843 100644 --- a/packages/service-core/src/system/ServiceContext.ts +++ b/packages/service-core/src/system/ServiceContext.ts @@ -6,18 +6,34 @@ import { PowerSyncMigrationManager } from '../migrations/PowerSyncMigrationManag import * as replication from '../replication/replication-index.js'; import * as routes from '../routes/routes-index.js'; import * as storage from '../storage/storage-index.js'; -import * as utils from '../util/util-index.js'; import { SyncContext } from 
'../sync/SyncContext.js'; +import * as utils from '../util/util-index.js'; export interface ServiceContext { configuration: utils.ResolvedPowerSyncConfig; lifeCycleEngine: LifeCycledSystem; metricsEngine: metrics.MetricsEngine; replicationEngine: replication.ReplicationEngine | null; - routerEngine: routes.RouterEngine | null; + routerEngine: routes.RouterEngine; storageEngine: storage.StorageEngine; migrations: PowerSyncMigrationManager; syncContext: SyncContext; + serviceMode: ServiceContextMode; +} + +export enum ServiceContextMode { + API = utils.ServiceRunner.API, + SYNC = utils.ServiceRunner.SYNC, + UNIFIED = utils.ServiceRunner.UNIFIED, + COMPACT = 'compact', + MIGRATION = 'migration', + TEARDOWN = 'teardown', + TEST_CONNECTION = 'test-connection' +} + +export interface ServiceContextOptions { + serviceMode: ServiceContextMode; + configuration: utils.ResolvedPowerSyncConfig; } /** @@ -26,11 +42,18 @@ export interface ServiceContext { * This controls registering, initializing and the lifecycle of various services. */ export class ServiceContextContainer implements ServiceContext { + configuration: utils.ResolvedPowerSyncConfig; lifeCycleEngine: LifeCycledSystem; storageEngine: storage.StorageEngine; syncContext: SyncContext; + routerEngine: routes.RouterEngine; + serviceMode: ServiceContextMode; + + constructor(options: ServiceContextOptions) { + this.serviceMode = options.serviceMode; + const { configuration } = options; + this.configuration = configuration; - constructor(public configuration: utils.ResolvedPowerSyncConfig) { this.lifeCycleEngine = new LifeCycledSystem(); this.storageEngine = new storage.StorageEngine({ @@ -42,6 +65,11 @@ export class ServiceContextContainer implements ServiceContext { stop: (storageEngine) => storageEngine.shutDown() }); + this.routerEngine = new routes.RouterEngine(); + this.lifeCycleEngine.withLifecycle(this.routerEngine, { + stop: (routerEngine) => routerEngine.shutDown() + }); + this.syncContext = new SyncContext({ maxDataFetchConcurrency: configuration.api_parameters.max_data_fetch_concurrency, maxBuckets: configuration.api_parameters.max_buckets_per_connection, @@ -55,21 +83,12 @@ export class ServiceContextContainer implements ServiceContext { // Migrations should be executed before the system starts start: () => migrationManager[Symbol.asyncDispose]() }); - - this.lifeCycleEngine.withLifecycle(this.storageEngine, { - start: (storageEngine) => storageEngine.start(), - stop: (storageEngine) => storageEngine.shutDown() - }); } get replicationEngine(): replication.ReplicationEngine | null { return container.getOptional(replication.ReplicationEngine); } - get routerEngine(): routes.RouterEngine | null { - return container.getOptional(routes.RouterEngine); - } - get metricsEngine(): metrics.MetricsEngine { return container.getImplementation(metrics.MetricsEngine); } diff --git a/packages/service-core/src/util/config/collectors/config-collector.ts b/packages/service-core/src/util/config/collectors/config-collector.ts index 940e17439..649bc4391 100644 --- a/packages/service-core/src/util/config/collectors/config-collector.ts +++ b/packages/service-core/src/util/config/collectors/config-collector.ts @@ -4,24 +4,13 @@ import { schema } from '@powersync/lib-services-framework'; import { configFile } from '@powersync/service-types'; import { RunnerConfig } from '../types.js'; +import { YamlEnvTag } from './impl/yaml-env.js'; export enum ConfigFileFormat { YAML = 'yaml', JSON = 'json' } -/** - * Environment variables can be substituted into the YAML 
config - * when parsing if the environment variable name starts with this prefix. - * Attempting to substitute any other environment variable will throw an exception. - * - * Example of substitution: - * storage: - * type: mongodb - * uri: !env PS_MONGO_URI - */ -const YAML_ENV_PREFIX = 'PS_'; - // ts-codec itself doesn't give great validation errors, so we use json schema for that const configSchemaValidator = schema.parseJSONSchema(configFile.PowerSyncConfigJSONSchema).validator(); @@ -100,27 +89,7 @@ export abstract class ConfigCollector { schema: 'core', keepSourceTokens: true, lineCounter, - customTags: [ - { - tag: '!env', - resolve(envName: string, onError: (error: string) => void) { - if (!envName.startsWith(YAML_ENV_PREFIX)) { - onError( - `Attempting to substitute environment variable ${envName} is not allowed. Variables must start with "${YAML_ENV_PREFIX}"` - ); - return envName; - } - const value = process.env[envName]; - if (typeof value == 'undefined') { - onError( - `Attempted to substitute environment variable "${envName}" which is undefined. Set this variable on the environment.` - ); - return envName; - } - return value; - } - } - ] + customTags: [YamlEnvTag] }); if (parsed.errors.length) { diff --git a/packages/service-core/src/util/config/collectors/impl/yaml-env.ts b/packages/service-core/src/util/config/collectors/impl/yaml-env.ts new file mode 100644 index 000000000..559fccd0e --- /dev/null +++ b/packages/service-core/src/util/config/collectors/impl/yaml-env.ts @@ -0,0 +1,67 @@ +import * as yaml from 'yaml'; + +/** + * Environment variables can be substituted into the YAML config + * when parsing if the environment variable name starts with this prefix. + * Attempting to substitute any other environment variable will throw an exception. + * + * Example of substitution: + * storage: + * type: mongodb + * uri: !env PS_MONGO_URI + */ +const YAML_ENV_PREFIX = 'PS_'; + +/** + * Custom YAML tag which performs string environment variable substitution + * Allows for type casting string environment variables to boolean or number + * by using the syntax !env PS_MONGO_PORT::number or !env PS_USE_SUPABASE::boolean + */ +export const YamlEnvTag: yaml.ScalarTag = { + tag: '!env', + resolve(envName: string, onError: (error: string) => void) { + if (!envName.startsWith(YAML_ENV_PREFIX)) { + onError( + `Attempting to substitute environment variable ${envName} is not allowed. Variables must start with "${YAML_ENV_PREFIX}"` + ); + return envName; + } + + // allow type casting if the envName contains a type suffix + // e.g. PS_MONGO_PORT::number or PS_USE_SUPABASE::boolean + const [name, type = 'string'] = envName.split('::'); + + let value = process.env[name]; + + if (typeof value == 'undefined') { + onError( + `Attempted to substitute environment variable "${envName}" which is undefined. Set this variable on the environment.` + ); + return envName; + } + + switch (type) { + case 'string': + return value; + case 'number': + const numberValue = Number(value); + if (Number.isNaN(numberValue)) { + onError(`Environment variable "${envName}" is not a valid number. Got: "${value}".`); + return envName; + } + return numberValue; + case 'boolean': + if (value?.toLowerCase() == 'true') { + return true; + } else if (value?.toLowerCase() == 'false') { + return false; + } else { + onError(`Environment variable "${envName}" is not a boolean. 
Expected "true" or "false", got "${value}".`); + return envName; + } + default: + onError(`Environment variable "${envName}" has an invalid type suffix "${type}".`); + return envName; + } + } +}; diff --git a/packages/service-core/src/util/config/compound-config-collector.ts b/packages/service-core/src/util/config/compound-config-collector.ts index dab753e0a..742800d9b 100644 --- a/packages/service-core/src/util/config/compound-config-collector.ts +++ b/packages/service-core/src/util/config/compound-config-collector.ts @@ -5,11 +5,6 @@ import { ConfigCollector } from './collectors/config-collector.js'; import { Base64ConfigCollector } from './collectors/impl/base64-config-collector.js'; import { FallbackConfigCollector } from './collectors/impl/fallback-config-collector.js'; import { FileSystemConfigCollector } from './collectors/impl/filesystem-config-collector.js'; -import { Base64SyncRulesCollector } from './sync-rules/impl/base64-sync-rules-collector.js'; -import { FileSystemSyncRulesCollector } from './sync-rules/impl/filesystem-sync-rules-collector.js'; -import { InlineSyncRulesCollector } from './sync-rules/impl/inline-sync-rules-collector.js'; -import { SyncRulesCollector } from './sync-rules/sync-collector.js'; -import { ResolvedPowerSyncConfig, RunnerConfig, SyncRulesConfig } from './types.js'; import { DEFAULT_MAX_BUCKETS_PER_CONNECTION, DEFAULT_MAX_CONCURRENT_CONNECTIONS, @@ -17,6 +12,11 @@ import { DEFAULT_MAX_PARAMETER_QUERY_RESULTS, DEFAULT_MAX_POOL_SIZE } from './defaults.js'; +import { Base64SyncRulesCollector } from './sync-rules/impl/base64-sync-rules-collector.js'; +import { FileSystemSyncRulesCollector } from './sync-rules/impl/filesystem-sync-rules-collector.js'; +import { InlineSyncRulesCollector } from './sync-rules/impl/inline-sync-rules-collector.js'; +import { SyncRulesCollector } from './sync-rules/sync-collector.js'; +import { ResolvedPowerSyncConfig, RunnerConfig, SyncRulesConfig } from './types.js'; export type CompoundConfigCollectorOptions = { /** @@ -159,6 +159,23 @@ export class CompoundConfigCollector { internal_service_endpoint: baseConfig.telemetry?.internal_service_endpoint ?? 'https://pulse.journeyapps.com/v1/metrics' }, + healthcheck: { + /** + * Default to legacy mode if no probes config is provided. + * If users provide a config, all options require explicit opt-in. + */ + probes: baseConfig.healthcheck?.probes + ? { + use_filesystem: baseConfig.healthcheck.probes.use_filesystem ?? false, + use_http: baseConfig.healthcheck.probes.use_http ?? false, + use_legacy: baseConfig.healthcheck.probes.use_legacy ?? false + } + : { + use_filesystem: false, + use_http: false, + use_legacy: true + } + }, api_parameters: { max_buckets_per_connection: baseConfig.api?.parameters?.max_buckets_per_connection ?? DEFAULT_MAX_BUCKETS_PER_CONNECTION, diff --git a/packages/service-core/src/util/config/types.ts b/packages/service-core/src/util/config/types.ts index 56b834efb..614b8e2b0 100644 --- a/packages/service-core/src/util/config/types.ts +++ b/packages/service-core/src/util/config/types.ts @@ -69,5 +69,18 @@ export type ResolvedPowerSyncConfig = { /** Prefix for postgres replication slot names. May eventually be connection-specific. */ slot_name_prefix: string; + + healthcheck: { + probes: { + use_filesystem: boolean; + use_http: boolean; + /** + * @deprecated This maintains backwards compatibility with the legacy default. + * Explicit probe configuration should be used instead. 
+ */ + use_legacy: boolean; + }; + }; + parameters: Record; }; diff --git a/packages/service-core/test/src/config.test.ts b/packages/service-core/test/src/config.test.ts new file mode 100644 index 000000000..104ff1a00 --- /dev/null +++ b/packages/service-core/test/src/config.test.ts @@ -0,0 +1,72 @@ +// Vitest Unit Tests +import { CompoundConfigCollector } from '@/index.js'; +import { describe, expect, it, vi } from 'vitest'; + +describe('Config', () => { + it('should substitute env variables in YAML config', {}, async () => { + const type = 'mongodb'; + vi.stubEnv('PS_MONGO_TYPE', type); + + const yamlConfig = /* yaml */ ` + # PowerSync config + replication: + connections: [] + storage: + type: !env PS_MONGO_TYPE + `; + + const collector = new CompoundConfigCollector(); + + const config = await collector.collectConfig({ + config_base64: Buffer.from(yamlConfig, 'utf-8').toString('base64') + }); + + expect(config.storage.type).toBe(type); + }); + + it('should substitute boolean env variables in YAML config', {}, async () => { + vi.stubEnv('PS_MONGO_HEALTHCHECK', 'true'); + + const yamlConfig = /* yaml */ ` + # PowerSync config + replication: + connections: [] + storage: + type: mongodb + healthcheck: + probes: + use_http: !env PS_MONGO_HEALTHCHECK::boolean + `; + + const collector = new CompoundConfigCollector(); + + const config = await collector.collectConfig({ + config_base64: Buffer.from(yamlConfig, 'utf-8').toString('base64') + }); + + expect(config.healthcheck.probes.use_http).toBe(true); + }); + + it('should substitute number env variables in YAML config', {}, async () => { + vi.stubEnv('PS_MAX_BUCKETS', '1'); + + const yamlConfig = /* yaml */ ` + # PowerSync config + replication: + connections: [] + storage: + type: mongodb + api: + parameters: + max_buckets_per_connection: !env PS_MAX_BUCKETS::number + `; + + const collector = new CompoundConfigCollector(); + + const config = await collector.collectConfig({ + config_base64: Buffer.from(yamlConfig, 'utf-8').toString('base64') + }); + + expect(config.api_parameters.max_buckets_per_connection).toBe(1); + }); +}); diff --git a/packages/types/src/config/PowerSyncConfig.ts b/packages/types/src/config/PowerSyncConfig.ts index 1cc37287c..cd9e288ef 100644 --- a/packages/types/src/config/PowerSyncConfig.ts +++ b/packages/types/src/config/PowerSyncConfig.ts @@ -477,6 +477,34 @@ export const powerSyncConfig = t }) .optional(), + healthcheck: t + .object({ + probes: t + .object({ + use_filesystem: t.boolean + .meta({ + description: `Enables exposing healthcheck status via filesystem files.` + }) + .optional(), + use_http: t.boolean + .meta({ + description: `Enables exposing healthcheck status via HTTP endpoints.` + }) + .optional(), + use_legacy: t.boolean + .meta({ + description: dedent` + Deprecated. + Enables HTTP probes for both API and UNIFIED service modes. FileSystem probes are always enabled. + ` + }) + .optional() + }) + .meta({ description: 'Mechanisms for exposing health check data.' 
}) + .optional() + }) + .optional(), + parameters: t .record(t.number.or(t.string).or(t.boolean).or(t.Null)) .meta({ diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index 68f8349bf..6f0c20caa 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -158,6 +158,27 @@ importers: specifier: ^3.0.5 version: 3.0.5(@types/node@22.13.1)(yaml@2.5.0) + modules/module-core: + dependencies: + '@fastify/cors': + specifier: 8.4.1 + version: 8.4.1 + '@powersync/lib-services-framework': + specifier: workspace:* + version: link:../../libs/lib-services + '@powersync/service-core': + specifier: workspace:* + version: link:../../packages/service-core + '@powersync/service-rsocket-router': + specifier: workspace:* + version: link:../../packages/rsocket-router + '@powersync/service-types': + specifier: workspace:* + version: link:../../packages/types + fastify: + specifier: 4.23.2 + version: 4.23.2 + modules/module-mongodb: dependencies: '@powersync/lib-service-mongodb': @@ -654,15 +675,15 @@ importers: service: dependencies: - '@fastify/cors': - specifier: 8.4.1 - version: 8.4.1 '@powersync/lib-services-framework': specifier: workspace:* version: link:../libs/lib-services '@powersync/service-core': specifier: workspace:* version: link:../packages/service-core + '@powersync/service-module-core': + specifier: workspace:* + version: link:../modules/module-core '@powersync/service-module-mongodb': specifier: workspace:* version: link:../modules/module-mongodb @@ -684,9 +705,6 @@ importers: '@sentry/node': specifier: ^8.9.2 version: 8.17.0 - fastify: - specifier: 4.23.2 - version: 4.23.2 devDependencies: '@sentry/types': specifier: ^8.9.2 diff --git a/service/Dockerfile b/service/Dockerfile index 8c8fb33f9..7b5e3247d 100644 --- a/service/Dockerfile +++ b/service/Dockerfile @@ -16,6 +16,7 @@ COPY libs/lib-services/package.json libs/lib-services/tsconfig.json libs/lib-ser COPY libs/lib-mongodb/package.json libs/lib-mongodb/tsconfig.json libs/lib-mongodb/ COPY libs/lib-postgres/package.json libs/lib-postgres/tsconfig.json libs/lib-postgres/ +COPY modules/module-core/package.json modules/module-core/tsconfig.json modules/module-core/ COPY modules/module-postgres/package.json modules/module-postgres/tsconfig.json modules/module-postgres/ COPY modules/module-postgres-storage/package.json modules/module-postgres-storage/tsconfig.json modules/module-postgres-storage/ COPY modules/module-mongodb/package.json modules/module-mongodb/tsconfig.json modules/module-mongodb/ @@ -41,6 +42,7 @@ COPY libs/lib-services/src libs/lib-services/src/ COPY libs/lib-mongodb/src libs/lib-mongodb/src/ COPY libs/lib-postgres/src libs/lib-postgres/src/ +COPY modules/module-core/src modules/module-core/src/ COPY modules/module-postgres/src modules/module-postgres/src/ COPY modules/module-postgres-storage/src modules/module-postgres-storage/src/ COPY modules/module-mongodb/src modules/module-mongodb/src/ diff --git a/service/package.json b/service/package.json index b9a5f0188..2a82cecc3 100644 --- a/service/package.json +++ b/service/package.json @@ -10,7 +10,6 @@ "clean": "rm -rf ./lib && tsc -b --clean" }, "dependencies": { - "@fastify/cors": "8.4.1", "@powersync/service-core": "workspace:*", "@powersync/lib-services-framework": "workspace:*", "@powersync/service-module-postgres": "workspace:*", @@ -19,8 +18,8 @@ "@powersync/service-module-mongodb-storage": "workspace:*", "@powersync/service-module-mysql": "workspace:*", "@powersync/service-rsocket-router": "workspace:*", - "@sentry/node": "^8.9.2", - "fastify": "4.23.2" + 
"@powersync/service-module-core": "workspace:*", + "@sentry/node": "^8.9.2" }, "devDependencies": { "@sentry/types": "^8.9.2", diff --git a/service/src/entry.ts b/service/src/entry.ts index 06a062a5b..61b943f17 100644 --- a/service/src/entry.ts +++ b/service/src/entry.ts @@ -1,6 +1,7 @@ import { container, ContainerImplementation } from '@powersync/lib-services-framework'; import * as core from '@powersync/service-core'; +import { CoreModule } from '@powersync/service-module-core'; import { MongoModule } from '@powersync/service-module-mongodb'; import { MongoStorageModule } from '@powersync/service-module-mongodb-storage'; import { MySQLModule } from '@powersync/service-module-mysql'; @@ -17,10 +18,11 @@ container.register(ContainerImplementation.REPORTER, createSentryReporter()); const moduleManager = new core.modules.ModuleManager(); moduleManager.register([ - new PostgresModule(), - new MySQLModule(), + new CoreModule(), new MongoModule(), new MongoStorageModule(), + new MySQLModule(), + new PostgresModule(), new PostgresStorageModule() ]); // This is a bit of a hack. Commands such as the teardown command or even migrations might diff --git a/service/src/runners/server.ts b/service/src/runners/server.ts index 6eff0cfce..5876a6977 100644 --- a/service/src/runners/server.ts +++ b/service/src/runners/server.ts @@ -1,76 +1,7 @@ -import cors from '@fastify/cors'; -import fastify from 'fastify'; - import { container, logger } from '@powersync/lib-services-framework'; import * as core from '@powersync/service-core'; - -import { ReactiveSocketRouter } from '@powersync/service-rsocket-router'; import { logBooting } from '../util/version.js'; -/** - * Configures the server portion on a {@link ServiceContext} - */ -export function registerServerServices(serviceContext: core.system.ServiceContextContainer) { - serviceContext.register(core.routes.RouterEngine, new core.routes.RouterEngine()); - serviceContext.lifeCycleEngine.withLifecycle(serviceContext.routerEngine!, { - start: async (routerEngine) => { - await routerEngine!.start(async (routes) => { - const server = fastify.fastify(); - - server.register(cors, { - origin: '*', - allowedHeaders: ['Content-Type', 'Authorization', 'User-Agent', 'X-User-Agent'], - exposedHeaders: ['Content-Type'], - // Cache time for preflight response - maxAge: 3600 - }); - - core.routes.configureFastifyServer(server, { - service_context: serviceContext, - routes: { - api: { routes: routes.api_routes }, - sync_stream: { - routes: routes.stream_routes, - queue_options: { - concurrency: serviceContext.configuration.api_parameters.max_concurrent_connections, - max_queue_depth: 0 - } - } - } - }); - - const socketRouter = new ReactiveSocketRouter({ - max_concurrent_connections: serviceContext.configuration.api_parameters.max_concurrent_connections - }); - - core.routes.configureRSocket(socketRouter, { - server: server.server, - service_context: serviceContext, - route_generators: routes.socket_routes - }); - - const { port } = serviceContext.configuration; - - await server.listen({ - host: '0.0.0.0', - port - }); - - logger.info(`Running on port ${port}`); - - return { - onShutdown: async () => { - logger.info('Shutting down HTTP server...'); - await server.close(); - logger.info('HTTP server stopped'); - } - }; - }); - }, - stop: (routerEngine) => routerEngine!.shutDown() - }); -} - /** * Starts an API server */ @@ -78,14 +9,9 @@ export async function startServer(runnerConfig: core.utils.RunnerConfig) { logBooting('API Container'); const config = await 
   const config = await core.utils.loadConfig(runnerConfig);
-  core.utils.setTags(config.metadata);
-  const serviceContext = new core.system.ServiceContextContainer(config);
-
-  registerServerServices(serviceContext);
-
-  await core.metrics.registerMetrics({
-    service_context: serviceContext,
-    modes: [core.metrics.MetricModes.API]
+  const serviceContext = new core.system.ServiceContextContainer({
+    serviceMode: core.system.ServiceContextMode.API,
+    configuration: config
   });

   const moduleManager = container.getImplementation(core.modules.ModuleManager);
diff --git a/service/src/runners/stream-worker.ts b/service/src/runners/stream-worker.ts
index ed27fef71..e9197a376 100644
--- a/service/src/runners/stream-worker.ts
+++ b/service/src/runners/stream-worker.ts
@@ -20,18 +20,14 @@ export const startStreamRunner = async (runnerConfig: core.utils.RunnerConfig) =
   logBooting('Replication Container');

   const config = await core.utils.loadConfig(runnerConfig);
-  core.utils.setTags(config.metadata);
-
-  // Self-hosted version allows for automatic migrations
-  const serviceContext = new core.system.ServiceContextContainer(config);
+  const serviceContext = new core.system.ServiceContextContainer({
+    serviceMode: core.system.ServiceContextMode.SYNC,
+    configuration: config
+  });

   registerReplicationServices(serviceContext);

-  await core.metrics.registerMetrics({
-    service_context: serviceContext,
-    modes: [core.metrics.MetricModes.REPLICATION, core.metrics.MetricModes.STORAGE]
-  });
-
   const moduleManager = container.getImplementation(core.modules.ModuleManager);
   await moduleManager.initialize(serviceContext);

diff --git a/service/src/runners/unified-runner.ts b/service/src/runners/unified-runner.ts
index b5767b5a4..c008286c9 100644
--- a/service/src/runners/unified-runner.ts
+++ b/service/src/runners/unified-runner.ts
@@ -1,9 +1,8 @@
 import { container, logger } from '@powersync/lib-services-framework';
 import * as core from '@powersync/service-core';
-import { registerServerServices } from './server.js';
-import { registerReplicationServices } from './stream-worker.js';
 import { logBooting } from '../util/version.js';
+import { registerReplicationServices } from './stream-worker.js';

 /**
  * Starts an API server
  */
@@ -12,17 +11,13 @@ export const startUnifiedRunner = async (runnerConfig: core.utils.RunnerConfig) =
   logBooting('Unified Container');

   const config = await core.utils.loadConfig(runnerConfig);
-  core.utils.setTags(config.metadata);
-  const serviceContext = new core.system.ServiceContextContainer(config);
+  const serviceContext = new core.system.ServiceContextContainer({
+    serviceMode: core.system.ServiceContextMode.UNIFIED,
+    configuration: config
+  });

-  registerServerServices(serviceContext);
   registerReplicationServices(serviceContext);

-  await core.metrics.registerMetrics({
-    service_context: serviceContext,
-    modes: [core.metrics.MetricModes.API, core.metrics.MetricModes.REPLICATION, core.metrics.MetricModes.STORAGE]
-  });
-
   const moduleManager = container.getImplementation(core.modules.ModuleManager);
   await moduleManager.initialize(serviceContext);

diff --git a/service/src/util/version.ts b/service/src/util/version.ts
index 887b86921..ecbdcebf8 100644
--- a/service/src/util/version.ts
+++ b/service/src/util/version.ts
@@ -1,8 +1,9 @@
 import { logger } from '@powersync/lib-services-framework';

+import pkg from '@powersync/service-core/package.json' with { type: 'json' };
+
 export function logBooting(runner: string) {
-  const version = process.env.POWERSYNC_VERSION ?? '-dev';
-  const isCloud = process.env.MICRO_SERVICE_NAME == 'powersync';
-  const edition = isCloud ? 'Cloud Edition' : 'Enterprise Edition';
+  const version = pkg.version;
+  const edition = 'Open Edition';
   logger.info(`Booting PowerSync Service v${version}, ${runner}, ${edition}`, { version, edition, runner });
 }
diff --git a/service/tsconfig.json b/service/tsconfig.json
index e92cccf9c..6a9560f45 100644
--- a/service/tsconfig.json
+++ b/service/tsconfig.json
@@ -30,6 +30,9 @@
     {
       "path": "../libs/lib-services"
     },
+    {
+      "path": "../modules/module-core"
+    },
     {
       "path": "../modules/module-postgres"
     },