Skip to content

Commit f347f7d

Browse files
committed
make sys db schema configurable
1 parent 0c8e174 commit f347f7d

File tree

6 files changed

+264
-110
lines changed

6 files changed

+264
-110
lines changed

dbos/client.go

Lines changed: 9 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -14,9 +14,10 @@ import (
1414
)
1515

1616
type ClientConfig struct {
17-
DatabaseURL string // Connection URL for the PostgreSQL database
18-
Logger *slog.Logger // Optional custom logger
19-
SystemDBPool *pgxpool.Pool // Optional existing connection pool for the system database
17+
DatabaseURL string // Connection URL for the PostgreSQL database
18+
DatabaseSchema string // Database schema name (defaults to "dbos")
19+
Logger *slog.Logger // Optional custom logger
20+
SystemDBPool *pgxpool.Pool // Optional existing connection pool for the system database
2021
}
2122

2223
// Client provides a programmatic way to interact with your DBOS application from external code.
@@ -52,10 +53,11 @@ type client struct {
5253
// }
5354
func NewClient(ctx context.Context, config ClientConfig) (Client, error) {
5455
dbosCtx, err := NewDBOSContext(ctx, Config{
55-
DatabaseURL: config.DatabaseURL,
56-
AppName: "dbos-client",
57-
Logger: config.Logger,
58-
SystemDBPool: config.SystemDBPool,
56+
DatabaseURL: config.DatabaseURL,
57+
DatabaseSchema: config.DatabaseSchema,
58+
AppName: "dbos-client",
59+
Logger: config.Logger,
60+
SystemDBPool: config.SystemDBPool,
5961
})
6062
if err != nil {
6163
return nil, err

dbos/client_test.go

Lines changed: 23 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -690,8 +690,25 @@ func TestForkWorkflow(t *testing.T) {
690690
}
691691

692692
func TestListWorkflows(t *testing.T) {
693-
// Setup server context
694-
serverCtx := setupDBOS(t, true, true)
693+
// Setup server context with custom schema
694+
databaseURL := getDatabaseURL()
695+
resetTestDatabase(t, databaseURL)
696+
697+
customSchema := "dbos_list_test"
698+
serverCtx, err := NewDBOSContext(context.Background(), Config{
699+
DatabaseURL: databaseURL,
700+
AppName: "test-list-workflows",
701+
DatabaseSchema: customSchema,
702+
})
703+
require.NoError(t, err)
704+
require.NotNil(t, serverCtx)
705+
706+
// Register cleanup for server context
707+
t.Cleanup(func() {
708+
if serverCtx != nil {
709+
serverCtx.Shutdown(30 * time.Second)
710+
}
711+
})
695712

696713
// Create queue for communication
697714
queue := NewWorkflowQueue(serverCtx, "list-workflows-queue")
@@ -711,13 +728,13 @@ func TestListWorkflows(t *testing.T) {
711728
RegisterWorkflow(serverCtx, simpleWorkflow, WithWorkflowName("SimpleWorkflow"))
712729

713730
// Launch server
714-
err := serverCtx.Launch()
731+
err = serverCtx.Launch()
715732
require.NoError(t, err)
716733

717-
// Setup client
718-
databaseURL := getDatabaseURL()
734+
// Setup client with same custom schema
719735
config := ClientConfig{
720-
DatabaseURL: databaseURL,
736+
DatabaseURL: databaseURL,
737+
DatabaseSchema: customSchema,
721738
}
722739
client, err := NewClient(context.Background(), config)
723740
require.NoError(t, err)
@@ -908,7 +925,6 @@ func TestListWorkflows(t *testing.T) {
908925
assert.Nil(t, wf.Output, "expected output to be nil when LoadOutput=false")
909926
}
910927
})
911-
912928
// Verify all queue entries are cleaned up
913929
require.True(t, queueEntriesAreCleanedUp(serverCtx), "expected queue entries to be cleaned up after list workflows tests")
914930
}

dbos/dbos.go

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ import (
2323

2424
const (
2525
_DEFAULT_ADMIN_SERVER_PORT = 3001
26+
_DEFAULT_SYSTEM_DB_SCHEMA = "dbos"
2627
_DBOS_DOMAIN = "cloud.dbos.dev"
2728
)
2829

@@ -31,6 +32,7 @@ const (
3132
type Config struct {
3233
DatabaseURL string // PostgreSQL connection string (required)
3334
AppName string // Application name for identification (required)
35+
DatabaseSchema string // Database schema name (defaults to "dbos")
3436
Logger *slog.Logger // Custom logger instance (defaults to a new slog logger)
3537
AdminServer bool // Enable Transact admin HTTP server (disabled by default)
3638
AdminServerPort int // Port for the admin HTTP server (default: 3001)
@@ -57,6 +59,7 @@ func processConfig(inputConfig *Config) (*Config, error) {
5759
dbosConfig := &Config{
5860
DatabaseURL: inputConfig.DatabaseURL,
5961
AppName: inputConfig.AppName,
62+
DatabaseSchema: inputConfig.DatabaseSchema,
6063
Logger: inputConfig.Logger,
6164
AdminServer: inputConfig.AdminServer,
6265
AdminServerPort: inputConfig.AdminServerPort,
@@ -71,6 +74,9 @@ func processConfig(inputConfig *Config) (*Config, error) {
7174
if dbosConfig.Logger == nil {
7275
dbosConfig.Logger = slog.New(slog.NewTextHandler(os.Stdout, &slog.HandlerOptions{Level: slog.LevelInfo}))
7376
}
77+
if dbosConfig.DatabaseSchema == "" {
78+
dbosConfig.DatabaseSchema = _DEFAULT_SYSTEM_DB_SCHEMA
79+
}
7480

7581
// Override with environment variables if set
7682
if envAppVersion := os.Getenv("DBOS__APPVERSION"); envAppVersion != "" {
@@ -325,9 +331,10 @@ func NewDBOSContext(ctx context.Context, inputConfig Config) (DBOSContext, error
325331
initExecutor.applicationID = os.Getenv("DBOS__APPID")
326332

327333
newSystemDatabaseInputs := newSystemDatabaseInput{
328-
databaseURL: config.DatabaseURL,
329-
customPool: config.SystemDBPool,
330-
logger: initExecutor.logger,
334+
databaseURL: config.DatabaseURL,
335+
databaseSchema: config.DatabaseSchema,
336+
customPool: config.SystemDBPool,
337+
logger: initExecutor.logger,
331338
}
332339

333340
// Create the system database

dbos/dbos_test.go

Lines changed: 109 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package dbos
33
import (
44
"bytes"
55
"context"
6+
"fmt"
67
"log/slog"
78
"testing"
89
"time"
@@ -99,7 +100,7 @@ func TestConfig(t *testing.T) {
99100
dbosCtx, ok := customdbosContext.(*dbosContext)
100101
defer dbosCtx.Shutdown(10 * time.Second)
101102
require.True(t, ok)
102-
103+
103104
sysDB, ok := dbosCtx.systemDB.(*sysDB)
104105
require.True(t, ok)
105106
assert.Same(t, pool, sysDB.pool, "The pool in dbosContext should be the same as the custom pool provided")
@@ -325,4 +326,111 @@ func TestConfig(t *testing.T) {
325326

326327
require.NotNil(t, ctx2)
327328
})
329+
330+
t.Run("SystemDBMigrationWithCustomSchema", func(t *testing.T) {
331+
t.Setenv("DBOS__APPVERSION", "v1.0.0")
332+
t.Setenv("DBOS__APPID", "test-custom-schema")
333+
t.Setenv("DBOS__VMID", "test-executor-id")
334+
335+
customSchema := "dbos_custom_test"
336+
ctx, err := NewDBOSContext(context.Background(), Config{
337+
DatabaseURL: databaseURL,
338+
AppName: "test-custom-schema-migration",
339+
DatabaseSchema: customSchema,
340+
})
341+
require.NoError(t, err)
342+
defer func() {
343+
if ctx != nil {
344+
ctx.Shutdown(1 * time.Minute)
345+
}
346+
}()
347+
348+
require.NotNil(t, ctx)
349+
350+
// Get the internal systemDB instance to check tables directly
351+
dbosCtx, ok := ctx.(*dbosContext)
352+
require.True(t, ok, "expected dbosContext")
353+
require.NotNil(t, dbosCtx.systemDB)
354+
355+
sysDB, ok := dbosCtx.systemDB.(*sysDB)
356+
require.True(t, ok, "expected sysDB")
357+
358+
// Verify schema name was set correctly
359+
assert.Equal(t, customSchema, sysDB.schema, "schema name should match custom schema")
360+
361+
// Verify all expected tables exist in the custom schema
362+
dbCtx := context.Background()
363+
364+
// Test workflow_status table in custom schema
365+
var exists bool
366+
err = sysDB.pool.QueryRow(dbCtx, "SELECT EXISTS (SELECT FROM information_schema.tables WHERE table_schema = $1 AND table_name = 'workflow_status')", customSchema).Scan(&exists)
367+
require.NoError(t, err)
368+
assert.True(t, exists, "workflow_status table should exist in custom schema")
369+
370+
// Test operation_outputs table in custom schema
371+
err = sysDB.pool.QueryRow(dbCtx, "SELECT EXISTS (SELECT FROM information_schema.tables WHERE table_schema = $1 AND table_name = 'operation_outputs')", customSchema).Scan(&exists)
372+
require.NoError(t, err)
373+
assert.True(t, exists, "operation_outputs table should exist in custom schema")
374+
375+
// Test workflow_events table in custom schema
376+
err = sysDB.pool.QueryRow(dbCtx, "SELECT EXISTS (SELECT FROM information_schema.tables WHERE table_schema = $1 AND table_name = 'workflow_events')", customSchema).Scan(&exists)
377+
require.NoError(t, err)
378+
assert.True(t, exists, "workflow_events table should exist in custom schema")
379+
380+
// Test notifications table in custom schema
381+
err = sysDB.pool.QueryRow(dbCtx, "SELECT EXISTS (SELECT FROM information_schema.tables WHERE table_schema = $1 AND table_name = 'notifications')", customSchema).Scan(&exists)
382+
require.NoError(t, err)
383+
assert.True(t, exists, "notifications table should exist in custom schema")
384+
385+
// Test that all tables can be queried using custom schema (empty results expected)
386+
rows, err := sysDB.pool.Query(dbCtx, fmt.Sprintf("SELECT workflow_uuid FROM %s.workflow_status LIMIT 1", customSchema))
387+
require.NoError(t, err)
388+
rows.Close()
389+
390+
rows, err = sysDB.pool.Query(dbCtx, fmt.Sprintf("SELECT workflow_uuid FROM %s.operation_outputs LIMIT 1", customSchema))
391+
require.NoError(t, err)
392+
rows.Close()
393+
394+
rows, err = sysDB.pool.Query(dbCtx, fmt.Sprintf("SELECT workflow_uuid FROM %s.workflow_events LIMIT 1", customSchema))
395+
require.NoError(t, err)
396+
rows.Close()
397+
398+
rows, err = sysDB.pool.Query(dbCtx, fmt.Sprintf("SELECT destination_uuid FROM %s.notifications LIMIT 1", customSchema))
399+
require.NoError(t, err)
400+
rows.Close()
401+
402+
// Check that the dbos_migrations table exists in custom schema
403+
err = sysDB.pool.QueryRow(dbCtx, "SELECT EXISTS (SELECT FROM information_schema.tables WHERE table_schema = $1 AND table_name = 'dbos_migrations')", customSchema).Scan(&exists)
404+
require.NoError(t, err)
405+
assert.True(t, exists, "dbos_migrations table should exist in custom schema")
406+
407+
// Verify migration version is 1 (after initial migration)
408+
var version int64
409+
var count int
410+
err = sysDB.pool.QueryRow(dbCtx, fmt.Sprintf("SELECT COUNT(*) FROM %s.dbos_migrations", customSchema)).Scan(&count)
411+
require.NoError(t, err)
412+
assert.Equal(t, 1, count, "dbos_migrations table should have exactly one row")
413+
414+
err = sysDB.pool.QueryRow(dbCtx, fmt.Sprintf("SELECT version FROM %s.dbos_migrations", customSchema)).Scan(&version)
415+
require.NoError(t, err)
416+
assert.Equal(t, int64(1), version, "migration version should be 1 (after initial migration)")
417+
418+
// Test manual shutdown and recreate with same custom schema
419+
ctx.Shutdown(1 * time.Minute)
420+
421+
// Recreate context with same custom schema - should have no error since DB is already migrated
422+
ctx2, err := NewDBOSContext(context.Background(), Config{
423+
DatabaseURL: databaseURL,
424+
AppName: "test-custom-schema-recreate",
425+
DatabaseSchema: customSchema,
426+
})
427+
require.NoError(t, err)
428+
defer func() {
429+
if ctx2 != nil {
430+
ctx2.Shutdown(1 * time.Minute)
431+
}
432+
}()
433+
434+
require.NotNil(t, ctx2)
435+
})
328436
}

0 commit comments

Comments
 (0)