Skip to content

Commit 6ffb0d7

Browse files
cevianclaude
andcommitted
refactor: centralize schema resolution, create connections table during scaffolding
- Add getAppSchema() that reads DATABASE_SCHEMA from .env (single source of truth) - Replace all ad-hoc schema lookups (dev-server, MCP tool, CLI) with getAppSchema() - Move ensureConnectionsTable call into setupAppSchema (scaffolding) only - Remove redundant ensureConnectionsTable from factory.ts and dev-server.ts - Remove ensureConnectionsTable from public API exports and factory test mock Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
1 parent d003224 commit 6ffb0d7

File tree

8 files changed

+51
-20
lines changed

8 files changed

+51
-20
lines changed

packages/core/src/__tests__/factory.test.ts

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -5,11 +5,6 @@ import { create0pflow } from "../factory.js";
55
import { Workflow } from "../workflow.js";
66
import { Node } from "../node.js";
77

8-
// Mock connections schema (avoids pg connection in tests)
9-
vi.mock("../connections/schema.js", () => ({
10-
ensureConnectionsTable: vi.fn().mockResolvedValue(undefined),
11-
}));
12-
138
// Mock DBOS
149
vi.mock("@dbos-inc/dbos-sdk", () => ({
1510
DBOS: {

packages/core/src/cli/app.ts

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
// packages/cli/src/app.ts
22
import fs from "fs";
33
import path from "path";
4+
import * as dotenv from "dotenv";
45

56
/**
67
* Get the app name from package.json in cwd
@@ -20,3 +21,36 @@ export function getAppName(): string | undefined {
2021
return undefined;
2122
}
2223
}
24+
25+
/**
26+
* Get the app schema name from DATABASE_SCHEMA in the project's .env file.
27+
* This is the canonical source — written by setup_app_schema.
28+
* @param projectRoot Directory containing the .env file (defaults to cwd)
29+
*/
30+
export function getAppSchema(projectRoot?: string): string {
31+
const envPath = path.join(projectRoot ?? process.cwd(), ".env");
32+
33+
if (!fs.existsSync(envPath)) {
34+
throw new Error(
35+
"DATABASE_SCHEMA not found in .env file. " +
36+
"Run setup_app_schema to configure the database.",
37+
);
38+
}
39+
40+
try {
41+
const env = dotenv.parse(fs.readFileSync(envPath, "utf-8"));
42+
if (!env.DATABASE_SCHEMA) {
43+
throw new Error(
44+
"DATABASE_SCHEMA not found in .env file. " +
45+
"Run setup_app_schema to configure the database.",
46+
);
47+
}
48+
return env.DATABASE_SCHEMA;
49+
} catch (err) {
50+
if (err instanceof Error && err.message.includes("DATABASE_SCHEMA")) throw err;
51+
throw new Error(
52+
"DATABASE_SCHEMA not found in .env file. " +
53+
"Run setup_app_schema to configure the database.",
54+
);
55+
}
56+
}

packages/core/src/cli/index.ts

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ import { discoverWorkflows, discoverNodes } from "./discovery.js";
1212
import { resolveEnv } from "./env.js";
1313
import { listRuns, getRun } from "./runs.js";
1414
import { getTrace, printTrace } from "./trace.js";
15-
import { getAppName } from "./app.js";
15+
import { getAppName, getAppSchema } from "./app.js";
1616
import { startMcpServer } from "./mcp/server.js";
1717
import { runInstall, runUninstall } from "./install.js";
1818
import { runRun } from "./run.js";
@@ -209,7 +209,7 @@ workflow
209209

210210
const pflow = await create0pflow({
211211
databaseUrl: process.env.DATABASE_URL!,
212-
appName: getAppName() ?? "opflow",
212+
appName: getAppSchema(),
213213
workflows: workflowRegistry,
214214
nodes,
215215
});
@@ -347,7 +347,7 @@ node
347347

348348
const pflow = await create0pflow({
349349
databaseUrl: process.env.DATABASE_URL!,
350-
appName: getAppName() ?? "opflow",
350+
appName: getAppSchema(),
351351
nodes,
352352
});
353353

packages/core/src/cli/mcp/lib/scaffolding.ts

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import * as dotenv from "dotenv";
77
import pg from "pg";
88
import { packageRoot, version } from "../config.js";
99
import { writeAppTemplates, create0pflowDirectories } from "../lib/templates.js";
10+
import { ensureConnectionsTable } from "../../../connections/schema.js";
1011

1112
const execAsync = (cmd: string, cwd?: string) =>
1213
new Promise<{ stdout: string; stderr: string }>((resolve, reject) => {
@@ -319,6 +320,9 @@ export async function setupAppSchema({
319320
.join("\n");
320321

321322
await writeFile(envPath, `${newEnvContent}\n`);
323+
324+
// Create the opflow_connections table so it's ready before the dev UI launches
325+
await ensureConnectionsTable(appDatabaseUrl, appName);
322326
} catch (err) {
323327
const error = err as Error;
324328
return {

packages/core/src/cli/mcp/tools/getConnectionInfo.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ import pg from "pg";
77
import type { ServerContext } from "../types.js";
88
import { createIntegrationProvider } from "../../../connections/integration-provider.js";
99
import { resolveConnectionId } from "../../../connections/resolver.js";
10-
import { getAppName } from "../../app.js";
10+
import { getAppSchema } from "../../app.js";
1111

1212
const inputSchema = {
1313
integration_id: z
@@ -100,7 +100,7 @@ export const getConnectionInfoFactory: ApiFactory<
100100

101101
// Look up connection_id using the same resolution as runtime:
102102
// exact (workflow_name, node_name) match first, then global (* / *) fallback
103-
const appSchema = env.DATABASE_SCHEMA ?? getAppName();
103+
const appSchema = getAppSchema();
104104
let connectionId: string | null = null;
105105
const pool = new pg.Pool({ connectionString: databaseUrl, max: 1 });
106106
try {

packages/core/src/dev-ui/dev-server.ts

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -7,9 +7,9 @@ import { createWSServer, type WSClientMessage } from "./ws.js";
77
import { createWatcher } from "./watcher.js";
88
import type { PtyManager } from "./pty.js";
99
import { handleApiRequest } from "./api.js";
10-
import { ensureConnectionsTable } from "../connections/index.js";
1110
import { createIntegrationProvider } from "../connections/integration-provider.js";
1211
import { getSchemaName } from "../dbos.js";
12+
import { getAppSchema } from "../cli/app.js";
1313
import pg from "pg";
1414

1515
const MIME: Record<string, string> = {
@@ -49,20 +49,21 @@ export async function startDevServer(options: DevServerOptions) {
4949
}
5050
const clientDir = resolve(pkgRoot, "dist/dev-ui-client");
5151

52-
// Resolve DBOS schema name from project's package.json
52+
// Read app schema from project's .env (DATABASE_SCHEMA, written by setup_app_schema)
5353
const projectPkgPath = resolve(projectRoot, "package.json");
54-
let appName: string | undefined;
54+
let pkgName: string | undefined;
5555
try {
56-
appName = JSON.parse(readFileSync(projectPkgPath, "utf-8")).name;
56+
pkgName = JSON.parse(readFileSync(projectPkgPath, "utf-8")).name;
5757
} catch { /* use default */ }
58-
const dbosSchema = getSchemaName(appName);
58+
59+
const appSchema = getAppSchema(projectRoot);
60+
const dbosSchema = getSchemaName(pkgName);
5961

6062
// Set up API context if database is configured
6163
const hasApi = !!(options.databaseUrl);
6264
let pool: pg.Pool | null = null;
6365

6466
if (hasApi) {
65-
await ensureConnectionsTable(options.databaseUrl!, appName ?? "opflow");
6667
pool = new pg.Pool({ connectionString: options.databaseUrl! });
6768
}
6869

@@ -82,7 +83,7 @@ export async function startDevServer(options: DevServerOptions) {
8283
pool,
8384
integrationProvider: integrationProvider!,
8485
schema: dbosSchema,
85-
appSchema: appName ?? "opflow",
86+
appSchema,
8687
});
8788
if (handled) return;
8889
} catch (err) {

packages/core/src/factory.ts

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,6 @@ import { initializeDBOS, shutdownDBOS } from "./dbos.js";
55
import { NodeRegistry } from "./nodes/registry.js";
66
import { configureAgentRuntime } from "./agent.js";
77
import { Workflow, configureWorkflowRuntime, type NodeWrapper } from "./workflow.js";
8-
import { ensureConnectionsTable } from "./connections/index.js";
98
import { createIntegrationProvider } from "./connections/integration-provider.js";
109
import pg from "pg";
1110

@@ -43,7 +42,6 @@ export async function create0pflow(config: PflowConfig): Promise<Pflow> {
4342
// Create shared pg pool for connection management (needed for local connection mapping)
4443
const pool = new pg.Pool({ connectionString: config.databaseUrl });
4544
const appSchema = config.appName;
46-
await ensureConnectionsTable(config.databaseUrl, appSchema);
4745

4846
// Configure workflow runtime with pool + integration provider
4947
configureWorkflowRuntime(pool, integrationProvider, appSchema);

packages/core/src/index.ts

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,6 @@ export type {
4949

5050
// Connection management
5151
export {
52-
ensureConnectionsTable,
5352
resolveConnectionId,
5453
upsertConnection,
5554
listConnections,

0 commit comments

Comments
 (0)