Skip to content

Commit 068a6f8

Browse files
authored
Merge branch 'main' into feat/display_url
2 parents ef4f4aa + 5eb5ab1 commit 068a6f8

File tree

7 files changed

+359
-131
lines changed

7 files changed

+359
-131
lines changed

dbos/client.go

Lines changed: 9 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -14,9 +14,10 @@ import (
1414
)
1515

1616
type ClientConfig struct {
17-
DatabaseURL string // Connection URL for the PostgreSQL database
18-
Logger *slog.Logger // Optional custom logger
19-
SystemDBPool *pgxpool.Pool // Optional existing connection pool for the system database
17+
DatabaseURL string // Connection URL for the PostgreSQL database
18+
DatabaseSchema string // Database schema name (defaults to "dbos")
19+
Logger *slog.Logger // Optional custom logger
20+
SystemDBPool *pgxpool.Pool // Optional existing connection pool for the system database
2021
}
2122

2223
// Client provides a programmatic way to interact with your DBOS application from external code.
@@ -52,10 +53,11 @@ type client struct {
5253
// }
5354
func NewClient(ctx context.Context, config ClientConfig) (Client, error) {
5455
dbosCtx, err := NewDBOSContext(ctx, Config{
55-
DatabaseURL: config.DatabaseURL,
56-
AppName: "dbos-client",
57-
Logger: config.Logger,
58-
SystemDBPool: config.SystemDBPool,
56+
DatabaseURL: config.DatabaseURL,
57+
DatabaseSchema: config.DatabaseSchema,
58+
AppName: "dbos-client",
59+
Logger: config.Logger,
60+
SystemDBPool: config.SystemDBPool,
5961
})
6062
if err != nil {
6163
return nil, err

dbos/client_test.go

Lines changed: 23 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -690,8 +690,25 @@ func TestForkWorkflow(t *testing.T) {
690690
}
691691

692692
func TestListWorkflows(t *testing.T) {
693-
// Setup server context
694-
serverCtx := setupDBOS(t, true, true)
693+
// Setup server context with custom schema
694+
databaseURL := getDatabaseURL()
695+
resetTestDatabase(t, databaseURL)
696+
697+
customSchema := "dbos_list_test"
698+
serverCtx, err := NewDBOSContext(context.Background(), Config{
699+
DatabaseURL: databaseURL,
700+
AppName: "test-list-workflows",
701+
DatabaseSchema: customSchema,
702+
})
703+
require.NoError(t, err)
704+
require.NotNil(t, serverCtx)
705+
706+
// Register cleanup for server context
707+
t.Cleanup(func() {
708+
if serverCtx != nil {
709+
serverCtx.Shutdown(30 * time.Second)
710+
}
711+
})
695712

696713
// Create queue for communication
697714
queue := NewWorkflowQueue(serverCtx, "list-workflows-queue")
@@ -711,13 +728,13 @@ func TestListWorkflows(t *testing.T) {
711728
RegisterWorkflow(serverCtx, simpleWorkflow, WithWorkflowName("SimpleWorkflow"))
712729

713730
// Launch server
714-
err := serverCtx.Launch()
731+
err = serverCtx.Launch()
715732
require.NoError(t, err)
716733

717-
// Setup client
718-
databaseURL := getDatabaseURL()
734+
// Setup client with same custom schema
719735
config := ClientConfig{
720-
DatabaseURL: databaseURL,
736+
DatabaseURL: databaseURL,
737+
DatabaseSchema: customSchema,
721738
}
722739
client, err := NewClient(context.Background(), config)
723740
require.NoError(t, err)
@@ -908,7 +925,6 @@ func TestListWorkflows(t *testing.T) {
908925
assert.Nil(t, wf.Output, "expected output to be nil when LoadOutput=false")
909926
}
910927
})
911-
912928
// Verify all queue entries are cleaned up
913929
require.True(t, queueEntriesAreCleanedUp(serverCtx), "expected queue entries to be cleaned up after list workflows tests")
914930
}

dbos/dbos.go

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ import (
2323

2424
const (
2525
_DEFAULT_ADMIN_SERVER_PORT = 3001
26+
_DEFAULT_SYSTEM_DB_SCHEMA = "dbos"
2627
_DBOS_DOMAIN = "cloud.dbos.dev"
2728
)
2829

@@ -31,6 +32,7 @@ const (
3132
type Config struct {
3233
DatabaseURL string // PostgreSQL connection string (required)
3334
AppName string // Application name for identification (required)
35+
DatabaseSchema string // Database schema name (defaults to "dbos")
3436
Logger *slog.Logger // Custom logger instance (defaults to a new slog logger)
3537
AdminServer bool // Enable Transact admin HTTP server (disabled by default)
3638
AdminServerPort int // Port for the admin HTTP server (default: 3001)
@@ -57,6 +59,7 @@ func processConfig(inputConfig *Config) (*Config, error) {
5759
dbosConfig := &Config{
5860
DatabaseURL: inputConfig.DatabaseURL,
5961
AppName: inputConfig.AppName,
62+
DatabaseSchema: inputConfig.DatabaseSchema,
6063
Logger: inputConfig.Logger,
6164
AdminServer: inputConfig.AdminServer,
6265
AdminServerPort: inputConfig.AdminServerPort,
@@ -71,6 +74,9 @@ func processConfig(inputConfig *Config) (*Config, error) {
7174
if dbosConfig.Logger == nil {
7275
dbosConfig.Logger = slog.New(slog.NewTextHandler(os.Stdout, &slog.HandlerOptions{Level: slog.LevelInfo}))
7376
}
77+
if dbosConfig.DatabaseSchema == "" {
78+
dbosConfig.DatabaseSchema = _DEFAULT_SYSTEM_DB_SCHEMA
79+
}
7480

7581
// Override with environment variables if set
7682
if envAppVersion := os.Getenv("DBOS__APPVERSION"); envAppVersion != "" {
@@ -325,9 +331,10 @@ func NewDBOSContext(ctx context.Context, inputConfig Config) (DBOSContext, error
325331
initExecutor.applicationID = os.Getenv("DBOS__APPID")
326332

327333
newSystemDatabaseInputs := newSystemDatabaseInput{
328-
databaseURL: config.DatabaseURL,
329-
customPool: config.SystemDBPool,
330-
logger: initExecutor.logger,
334+
databaseURL: config.DatabaseURL,
335+
databaseSchema: config.DatabaseSchema,
336+
customPool: config.SystemDBPool,
337+
logger: initExecutor.logger,
331338
}
332339

333340
// Create the system database

dbos/dbos_test.go

Lines changed: 192 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,10 +3,12 @@ package dbos
33
import (
44
"bytes"
55
"context"
6+
"fmt"
67
"log/slog"
78
"testing"
89
"time"
910

11+
"github.com/google/uuid"
1012
"github.com/jackc/pgx/v5/pgxpool"
1113
"github.com/stretchr/testify/assert"
1214
"github.com/stretchr/testify/require"
@@ -99,7 +101,7 @@ func TestConfig(t *testing.T) {
99101
dbosCtx, ok := customdbosContext.(*dbosContext)
100102
defer dbosCtx.Shutdown(10 * time.Second)
101103
require.True(t, ok)
102-
104+
103105
sysDB, ok := dbosCtx.systemDB.(*sysDB)
104106
require.True(t, ok)
105107
assert.Same(t, pool, sysDB.pool, "The pool in dbosContext should be the same as the custom pool provided")
@@ -326,3 +328,192 @@ func TestConfig(t *testing.T) {
326328
require.NotNil(t, ctx2)
327329
})
328330
}
331+
332+
func TestCustomSystemDBSchema(t *testing.T) {
333+
t.Setenv("DBOS__APPVERSION", "v1.0.0")
334+
t.Setenv("DBOS__APPID", "test-custom-schema")
335+
t.Setenv("DBOS__VMID", "test-executor-id")
336+
337+
databaseURL := getDatabaseURL()
338+
customSchema := "dbos_custom_test"
339+
340+
ctx, err := NewDBOSContext(context.Background(), Config{
341+
DatabaseURL: databaseURL,
342+
AppName: "test-custom-schema-migration",
343+
DatabaseSchema: customSchema,
344+
})
345+
require.NoError(t, err)
346+
defer func() {
347+
if ctx != nil {
348+
ctx.Shutdown(1 * time.Minute)
349+
}
350+
}()
351+
352+
require.NotNil(t, ctx)
353+
354+
t.Run("CustomSchemaSetup", func(t *testing.T) {
355+
// Get the internal systemDB instance to check tables directly
356+
dbosCtx, ok := ctx.(*dbosContext)
357+
require.True(t, ok, "expected dbosContext")
358+
require.NotNil(t, dbosCtx.systemDB)
359+
360+
sysDB, ok := dbosCtx.systemDB.(*sysDB)
361+
require.True(t, ok, "expected sysDB")
362+
363+
// Verify schema name was set correctly
364+
assert.Equal(t, customSchema, sysDB.schema, "schema name should match custom schema")
365+
366+
// Verify all expected tables exist in the custom schema
367+
dbCtx := context.Background()
368+
369+
// Test workflow_status table in custom schema
370+
var exists bool
371+
err = sysDB.pool.QueryRow(dbCtx, "SELECT EXISTS (SELECT FROM information_schema.tables WHERE table_schema = $1 AND table_name = 'workflow_status')", customSchema).Scan(&exists)
372+
require.NoError(t, err)
373+
assert.True(t, exists, "workflow_status table should exist in custom schema")
374+
375+
// Test operation_outputs table in custom schema
376+
err = sysDB.pool.QueryRow(dbCtx, "SELECT EXISTS (SELECT FROM information_schema.tables WHERE table_schema = $1 AND table_name = 'operation_outputs')", customSchema).Scan(&exists)
377+
require.NoError(t, err)
378+
assert.True(t, exists, "operation_outputs table should exist in custom schema")
379+
380+
// Test workflow_events table in custom schema
381+
err = sysDB.pool.QueryRow(dbCtx, "SELECT EXISTS (SELECT FROM information_schema.tables WHERE table_schema = $1 AND table_name = 'workflow_events')", customSchema).Scan(&exists)
382+
require.NoError(t, err)
383+
assert.True(t, exists, "workflow_events table should exist in custom schema")
384+
385+
// Test notifications table in custom schema
386+
err = sysDB.pool.QueryRow(dbCtx, "SELECT EXISTS (SELECT FROM information_schema.tables WHERE table_schema = $1 AND table_name = 'notifications')", customSchema).Scan(&exists)
387+
require.NoError(t, err)
388+
assert.True(t, exists, "notifications table should exist in custom schema")
389+
390+
// Test that all tables can be queried using custom schema (empty results expected)
391+
rows, err := sysDB.pool.Query(dbCtx, fmt.Sprintf("SELECT workflow_uuid FROM %s.workflow_status LIMIT 1", customSchema))
392+
require.NoError(t, err)
393+
rows.Close()
394+
395+
rows, err = sysDB.pool.Query(dbCtx, fmt.Sprintf("SELECT workflow_uuid FROM %s.operation_outputs LIMIT 1", customSchema))
396+
require.NoError(t, err)
397+
rows.Close()
398+
399+
rows, err = sysDB.pool.Query(dbCtx, fmt.Sprintf("SELECT workflow_uuid FROM %s.workflow_events LIMIT 1", customSchema))
400+
require.NoError(t, err)
401+
rows.Close()
402+
403+
rows, err = sysDB.pool.Query(dbCtx, fmt.Sprintf("SELECT destination_uuid FROM %s.notifications LIMIT 1", customSchema))
404+
require.NoError(t, err)
405+
rows.Close()
406+
407+
// Check that the dbos_migrations table exists in custom schema
408+
err = sysDB.pool.QueryRow(dbCtx, "SELECT EXISTS (SELECT FROM information_schema.tables WHERE table_schema = $1 AND table_name = 'dbos_migrations')", customSchema).Scan(&exists)
409+
require.NoError(t, err)
410+
assert.True(t, exists, "dbos_migrations table should exist in custom schema")
411+
412+
// Verify migration version is 1 (after initial migration)
413+
var version int64
414+
var count int
415+
err = sysDB.pool.QueryRow(dbCtx, fmt.Sprintf("SELECT COUNT(*) FROM %s.dbos_migrations", customSchema)).Scan(&count)
416+
require.NoError(t, err)
417+
assert.Equal(t, 1, count, "dbos_migrations table should have exactly one row")
418+
419+
err = sysDB.pool.QueryRow(dbCtx, fmt.Sprintf("SELECT version FROM %s.dbos_migrations", customSchema)).Scan(&version)
420+
require.NoError(t, err)
421+
assert.Equal(t, int64(1), version, "migration version should be 1 (after initial migration)")
422+
})
423+
424+
// Test workflows for exercising Send/Recv and SetEvent/GetEvent
425+
type testWorkflowInput struct {
426+
PartnerWorkflowID string
427+
Message string
428+
}
429+
430+
// Workflow A: Uses Send() and GetEvent() - waits for workflow B
431+
sendGetEventWorkflow := func(ctx DBOSContext, input testWorkflowInput) (string, error) {
432+
// Send a message to the partner workflow
433+
err := Send(ctx, input.PartnerWorkflowID, input.Message, "test-topic")
434+
if err != nil {
435+
return "", err
436+
}
437+
438+
// Wait for an event from the partner workflow
439+
result, err := GetEvent[string](ctx, input.PartnerWorkflowID, "response-key", 5*time.Hour)
440+
if err != nil {
441+
return "", err
442+
}
443+
444+
return result, nil
445+
}
446+
447+
// Workflow B: Uses Recv() and SetEvent() - waits for workflow A
448+
recvSetEventWorkflow := func(ctx DBOSContext, input testWorkflowInput) (string, error) {
449+
// Receive a message from the partner workflow
450+
receivedMsg, err := Recv[string](ctx, "test-topic", 5*time.Hour)
451+
if err != nil {
452+
return "", err
453+
}
454+
455+
time.Sleep(1 * time.Second)
456+
457+
// Set an event for the partner workflow
458+
err = SetEvent(ctx, "response-key", "response-from-workflow-b")
459+
if err != nil {
460+
return "", err
461+
}
462+
463+
return receivedMsg, nil
464+
}
465+
466+
t.Run("CustomSchemaUsage", func(t *testing.T) {
467+
// Register the test workflows
468+
RegisterWorkflow(ctx, sendGetEventWorkflow)
469+
RegisterWorkflow(ctx, recvSetEventWorkflow)
470+
471+
// Launch the DBOS context
472+
ctx.Launch()
473+
474+
// Test RunWorkflow - start both workflows that will communicate with each other
475+
workflowAID := uuid.NewString()
476+
workflowBID := uuid.NewString()
477+
478+
// Start workflow B first (receiver)
479+
handleB, err := RunWorkflow(ctx, recvSetEventWorkflow, testWorkflowInput{
480+
PartnerWorkflowID: workflowAID,
481+
Message: "test-message-from-b",
482+
}, WithWorkflowID(workflowBID))
483+
require.NoError(t, err, "failed to start recvSetEventWorkflow")
484+
485+
// Small delay to ensure workflow B is ready to receive
486+
time.Sleep(100 * time.Millisecond)
487+
488+
// Start workflow A (sender)
489+
handleA, err := RunWorkflow(ctx, sendGetEventWorkflow, testWorkflowInput{
490+
PartnerWorkflowID: workflowBID,
491+
Message: "test-message-from-a",
492+
}, WithWorkflowID(workflowAID))
493+
require.NoError(t, err, "failed to start sendGetEventWorkflow")
494+
495+
// Wait for both workflows to complete
496+
resultA, err := handleA.GetResult()
497+
require.NoError(t, err, "failed to get result from workflow A")
498+
assert.Equal(t, "response-from-workflow-b", resultA, "workflow A should receive response from workflow B")
499+
500+
resultB, err := handleB.GetResult()
501+
require.NoError(t, err, "failed to get result from workflow B")
502+
assert.Equal(t, "test-message-from-a", resultB, "workflow B should receive message from workflow A")
503+
504+
// Test GetWorkflowSteps
505+
stepsA, err := GetWorkflowSteps(ctx, workflowAID)
506+
require.NoError(t, err, "failed to get workflow A steps")
507+
require.Len(t, stepsA, 3, "workflow A should have 3 steps (Send + GetEvent + Sleep)")
508+
assert.Equal(t, "DBOS.send", stepsA[0].StepName, "first step should be Send")
509+
assert.Equal(t, "DBOS.getEvent", stepsA[1].StepName, "second step should be GetEvent")
510+
assert.Equal(t, "DBOS.sleep", stepsA[2].StepName, "third step should be Sleep")
511+
512+
stepsB, err := GetWorkflowSteps(ctx, workflowBID)
513+
require.NoError(t, err, "failed to get workflow B steps")
514+
require.Len(t, stepsB, 3, "workflow B should have 3 steps (Recv + Sleep + SetEvent)")
515+
assert.Equal(t, "DBOS.recv", stepsB[0].StepName, "first step should be Recv")
516+
assert.Equal(t, "DBOS.sleep", stepsB[1].StepName, "second step should be Sleep")
517+
assert.Equal(t, "DBOS.setEvent", stepsB[2].StepName, "third step should be SetEvent")
518+
})
519+
}

0 commit comments

Comments
 (0)