From df940b4304da7303ec4659d9bdc8a6a9d5ed8754 Mon Sep 17 00:00:00 2001 From: maxdml Date: Mon, 29 Sep 2025 11:48:20 -0700 Subject: [PATCH 01/12] test --- cmd/dbos/cli_integration_test.go | 137 +++++++++++++++++++++---------- 1 file changed, 93 insertions(+), 44 deletions(-) diff --git a/cmd/dbos/cli_integration_test.go b/cmd/dbos/cli_integration_test.go index f7c8826..ced12c3 100644 --- a/cmd/dbos/cli_integration_test.go +++ b/cmd/dbos/cli_integration_test.go @@ -20,6 +20,7 @@ import ( "github.com/dbos-inc/dbos-transact-golang/dbos" "github.com/google/uuid" + "github.com/jackc/pgx/v5" _ "github.com/jackc/pgx/v5/stdlib" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -38,16 +39,12 @@ const ( ) // getDatabaseURL returns a default database URL if none is configured, following dbos/utils_test.go pattern -func getDatabaseURL() string { - databaseURL := os.Getenv("DBOS_SYSTEM_DATABASE_URL") - if databaseURL == "" { - password := os.Getenv("PGPASSWORD") - if password == "" { - password = "dbos" - } - databaseURL = fmt.Sprintf("postgres://postgres:%s@localhost:5432/dbos?sslmode=disable", url.QueryEscape(password)) +func getDatabaseURL(dbRole string) string { + password := os.Getenv("PGPASSWORD") + if password == "" { + password = "dbos" } - return databaseURL + return fmt.Sprintf("postgres://%s:%s@localhost:5432/dbos?sslmode=disable", dbRole, url.QueryEscape(password)) } // TestCLIWorkflow provides comprehensive integration testing of the DBOS CLI @@ -69,16 +66,25 @@ func TestCLIWorkflow(t *testing.T) { name string schemaName string schemaArgs []string + dbRole string }{ { name: "CustomSchema", schemaName: "test_schema", schemaArgs: []string{"--schema", "test_schema"}, + dbRole: "postgres", }, { name: "DefaultSchema", schemaName: "dbos", schemaArgs: []string{}, // No schema argument, use default + dbRole: "postgres", + }, + { + name: "FunnySchema", + schemaName: "F8nny_sCHem@-n@m3", + schemaArgs: []string{"--schema", "F8nny_sCHem@-n@m3"}, + dbRole: "notpostgres", }, } @@ -99,34 +105,52 @@ func TestCLIWorkflow(t *testing.T) { t.Run("ResetDatabase", func(t *testing.T) { args := append([]string{"reset", "-y"}, config.schemaArgs...) cmd := exec.Command(cliPath, args...) - cmd.Env = append(os.Environ(), "DBOS_SYSTEM_DATABASE_URL="+getDatabaseURL()) + cmd.Env = append(os.Environ(), "DBOS_SYSTEM_DATABASE_URL="+getDatabaseURL("postgres")) output, err := cmd.CombinedOutput() require.NoError(t, err, "Reset database command failed: %s", string(output)) assert.Contains(t, string(output), "System database has been reset successfully", "Output should confirm database reset") - assert.Contains(t, string(output), "database\":\"dbos", "Output should confirm database reset") - // log in the database and ensure the schema does not exist anymore - db, err := sql.Open("pgx", getDatabaseURL()) + // If db role is specified, attempt to remove it from postgres + if config.dbRole != "postgres" { + db, err := sql.Open("pgx", getDatabaseURL("postgres")) + require.NoError(t, err) + defer db.Close() + + _, err = db.Exec(fmt.Sprintf("DROP ROLE IF EXISTS %s", pgx.Identifier{config.dbRole}.Sanitize())) + require.NoError(t, err) + } + + // log in the database and ensure schema and role do not exist anymore + db, err := sql.Open("pgx", getDatabaseURL("postgres")) require.NoError(t, err) defer db.Close() var exists bool err = db.QueryRow("SELECT EXISTS(SELECT 1 FROM information_schema.schemata WHERE schema_name = $1)", config.schemaName).Scan(&exists) require.NoError(t, err) - assert.False(t, exists, fmt.Sprintf("Schema %s should not exist", config.schemaName)) + + if config.dbRole != "postgres" { + err = db.QueryRow("SELECT EXISTS(SELECT 1 FROM pg_roles WHERE rolname = $1)", config.dbRole).Scan(&exists) + require.NoError(t, err) + assert.False(t, exists, fmt.Sprintf("Role %s should not exist", config.dbRole)) + } }) t.Run("ProjectInitialization", func(t *testing.T) { testProjectInitialization(t, cliPath) }) + t.Run("MigrateCommand", func(t *testing.T) { + testMigrateCommand(t, cliPath, config.schemaArgs, config.dbRole) + }) + // Start a test application using dbos start startArgs := append([]string{"start"}, config.schemaArgs...) cmd := exec.CommandContext(context.Background(), cliPath, startArgs...) - envVars := append(os.Environ(), "DBOS_SYSTEM_DATABASE_URL="+getDatabaseURL()) + envVars := append(os.Environ(), "DBOS_SYSTEM_DATABASE_URL="+getDatabaseURL(config.dbRole)) // Pass the schema to the test app if using custom schema if config.schemaName != "dbos" { envVars = append(envVars, "DBOS_SCHEMA="+config.schemaName) @@ -152,11 +176,11 @@ func TestCLIWorkflow(t *testing.T) { }) t.Run("WorkflowCommands", func(t *testing.T) { - testWorkflowCommands(t, cliPath, config.schemaArgs) + testWorkflowCommands(t, cliPath, config.schemaArgs, config.dbRole) }) t.Run("ErrorHandling", func(t *testing.T) { - testErrorHandling(t, cliPath, config.schemaArgs) + testErrorHandling(t, cliPath, config.schemaArgs, config.dbRole) }) }) } @@ -222,32 +246,57 @@ func testProjectInitialization(t *testing.T, cliPath string) { require.NoError(t, err, "go mod tidy failed: %s", string(modOutput)) } +func testMigrateCommand(t *testing.T, cliPath string, schemaArgs []string, dbRole string) { + // If the role is set, create it in Postgres first + if dbRole != "postgres" { + db, err := sql.Open("pgx", getDatabaseURL("postgres")) + require.NoError(t, err) + defer db.Close() + password := os.Getenv("PGPASSWORD") + if password == "" { + password = "dbos" + } + _, err = db.Exec(fmt.Sprintf("CREATE ROLE %s LOGIN PASSWORD '%s'", dbRole, password)) + require.NoError(t, err) + } + + args := append([]string{"--verbose", "migrate"}, schemaArgs...) + if dbRole != "postgres" { + args = append(args, "--app-role", dbRole) + } + cmd := exec.Command(cliPath, args...) + cmd.Env = append(os.Environ(), "DBOS_SYSTEM_DATABASE_URL="+getDatabaseURL("postgres")) + output, err := cmd.CombinedOutput() + require.NoError(t, err, "Migrate command failed: %s", string(output)) + assert.Contains(t, string(output), "DBOS migrations completed successfully", "Output should confirm migration") +} + // testWorkflowCommands comprehensively tests all workflow CLI commands -func testWorkflowCommands(t *testing.T, cliPath string, schemaArgs []string) { +func testWorkflowCommands(t *testing.T, cliPath string, schemaArgs []string, dbRole string) { t.Run("ListWorkflows", func(t *testing.T) { - testListWorkflows(t, cliPath, schemaArgs) + testListWorkflows(t, cliPath, schemaArgs, dbRole) }) t.Run("GetWorkflow", func(t *testing.T) { - testGetWorkflow(t, cliPath, schemaArgs) + testGetWorkflow(t, cliPath, schemaArgs, dbRole) }) t.Run("CancelResumeWorkflow", func(t *testing.T) { - testCancelResumeWorkflow(t, cliPath, schemaArgs) + testCancelResumeWorkflow(t, cliPath, schemaArgs, dbRole) }) t.Run("ForkWorkflow", func(t *testing.T) { - testForkWorkflow(t, cliPath, schemaArgs) + testForkWorkflow(t, cliPath, schemaArgs, dbRole) }) t.Run("GetWorkflowSteps", func(t *testing.T) { - testGetWorkflowSteps(t, cliPath, schemaArgs) + testGetWorkflowSteps(t, cliPath, schemaArgs, dbRole) }) } // testListWorkflows tests various workflow listing scenarios -func testListWorkflows(t *testing.T, cliPath string, schemaArgs []string) { +func testListWorkflows(t *testing.T, cliPath string, schemaArgs []string, dbRole string) { // Create some test workflows first to ensure we have data to filter resp, err := http.Get("http://localhost:" + testServerPort + "/workflow") require.NoError(t, err, "Failed to trigger workflow") @@ -395,7 +444,7 @@ func testListWorkflows(t *testing.T, cliPath string, schemaArgs []string) { t.Run(tc.name, func(t *testing.T) { args := append(tc.args, schemaArgs...) cmd := exec.Command(cliPath, args...) - cmd.Env = append(os.Environ(), "DBOS_SYSTEM_DATABASE_URL="+getDatabaseURL()) + cmd.Env = append(os.Environ(), "DBOS_SYSTEM_DATABASE_URL="+getDatabaseURL(dbRole)) output, err := cmd.CombinedOutput() require.NoError(t, err, "List command failed: %s", string(output)) @@ -437,7 +486,7 @@ func testListWorkflows(t *testing.T, cliPath string, schemaArgs []string) { } // testGetWorkflow tests retrieving individual workflow details -func testGetWorkflow(t *testing.T, cliPath string, schemaArgs []string) { +func testGetWorkflow(t *testing.T, cliPath string, schemaArgs []string, dbRole string) { resp, err := http.Get("http://localhost:" + testServerPort + "/queue") require.NoError(t, err, "Failed to trigger queue workflow") defer resp.Body.Close() @@ -459,7 +508,7 @@ func testGetWorkflow(t *testing.T, cliPath string, schemaArgs []string) { t.Run("GetWorkflowJSON", func(t *testing.T) { args := append([]string{"workflow", "get", workflowID}, schemaArgs...) cmd := exec.Command(cliPath, args...) - cmd.Env = append(os.Environ(), "DBOS_SYSTEM_DATABASE_URL="+getDatabaseURL()) + cmd.Env = append(os.Environ(), "DBOS_SYSTEM_DATABASE_URL="+getDatabaseURL(dbRole)) output, err := cmd.CombinedOutput() require.NoError(t, err, "Get workflow JSON command failed: %s", string(output)) @@ -473,7 +522,7 @@ func testGetWorkflow(t *testing.T, cliPath string, schemaArgs []string) { assert.NotEmpty(t, status.Name, "Should have workflow name") // Redo the test with the db url in flags - args2 := append([]string{"workflow", "get", workflowID, "--db-url", getDatabaseURL()}, schemaArgs...) + args2 := append([]string{"workflow", "get", workflowID, "--db-url", getDatabaseURL(dbRole)}, schemaArgs...) cmd2 := exec.Command(cliPath, args2...) output2, err2 := cmd2.CombinedOutput() @@ -509,7 +558,7 @@ func testGetWorkflow(t *testing.T, cliPath string, schemaArgs []string) { // Test with environment variable D args3 := append([]string{"workflow", "get", workflowID}, schemaArgs...) cmd3 := exec.Command(cliPath, args3...) - cmd3.Env = append(os.Environ(), "D="+getDatabaseURL()) + cmd3.Env = append(os.Environ(), "D="+getDatabaseURL(dbRole)) output3, err3 := cmd3.CombinedOutput() require.NoError(t, err3, "Get workflow JSON command with config env var failed: %s", string(output3)) @@ -525,7 +574,7 @@ func testGetWorkflow(t *testing.T, cliPath string, schemaArgs []string) { } // testCancelResumeWorkflow tests workflow state management -func testCancelResumeWorkflow(t *testing.T, cliPath string, schemaArgs []string) { +func testCancelResumeWorkflow(t *testing.T, cliPath string, schemaArgs []string, dbRole string) { resp, err := http.Get("http://localhost:" + testServerPort + "/queue") require.NoError(t, err, "Failed to trigger queue workflow") defer resp.Body.Close() @@ -547,7 +596,7 @@ func testCancelResumeWorkflow(t *testing.T, cliPath string, schemaArgs []string) t.Run("CancelWorkflow", func(t *testing.T) { args := append([]string{"workflow", "cancel", workflowID}, schemaArgs...) cmd := exec.Command(cliPath, args...) - cmd.Env = append(os.Environ(), "DBOS_SYSTEM_DATABASE_URL="+getDatabaseURL()) + cmd.Env = append(os.Environ(), "DBOS_SYSTEM_DATABASE_URL="+getDatabaseURL(dbRole)) output, err := cmd.CombinedOutput() require.NoError(t, err, "Cancel workflow command failed: %s", string(output)) @@ -557,7 +606,7 @@ func testCancelResumeWorkflow(t *testing.T, cliPath string, schemaArgs []string) // Verify workflow is actually cancelled getArgs := append([]string{"workflow", "get", workflowID}, schemaArgs...) getCmd := exec.Command(cliPath, getArgs...) - getCmd.Env = append(os.Environ(), "DBOS_SYSTEM_DATABASE_URL="+getDatabaseURL()) + getCmd.Env = append(os.Environ(), "DBOS_SYSTEM_DATABASE_URL="+getDatabaseURL(dbRole)) getOutput, err := getCmd.CombinedOutput() require.NoError(t, err, "Get workflow status failed: %s", string(getOutput)) @@ -571,7 +620,7 @@ func testCancelResumeWorkflow(t *testing.T, cliPath string, schemaArgs []string) t.Run("ResumeWorkflow", func(t *testing.T) { args := append([]string{"workflow", "resume", workflowID}, schemaArgs...) cmd := exec.Command(cliPath, args...) - cmd.Env = append(os.Environ(), "DBOS_SYSTEM_DATABASE_URL="+getDatabaseURL()) + cmd.Env = append(os.Environ(), "DBOS_SYSTEM_DATABASE_URL="+getDatabaseURL(dbRole)) output, err := cmd.CombinedOutput() require.NoError(t, err, "Resume workflow command failed: %s", string(output)) @@ -588,7 +637,7 @@ func testCancelResumeWorkflow(t *testing.T, cliPath string, schemaArgs []string) } // testForkWorkflow tests workflow forking functionality -func testForkWorkflow(t *testing.T, cliPath string, schemaArgs []string) { +func testForkWorkflow(t *testing.T, cliPath string, schemaArgs []string, dbRole string) { resp, err := http.Get("http://localhost:" + testServerPort + "/queue") require.NoError(t, err, "Failed to trigger queue workflow") defer resp.Body.Close() @@ -612,7 +661,7 @@ func testForkWorkflow(t *testing.T, cliPath string, schemaArgs []string) { targetVersion := "1.0.0" args := append([]string{"workflow", "fork", workflowID, "--forked-workflow-id", newID, "--application-version", targetVersion}, schemaArgs...) cmd := exec.Command(cliPath, args...) - cmd.Env = append(os.Environ(), "DBOS_SYSTEM_DATABASE_URL="+getDatabaseURL()) + cmd.Env = append(os.Environ(), "DBOS_SYSTEM_DATABASE_URL="+getDatabaseURL(dbRole)) output, err := cmd.CombinedOutput() require.NoError(t, err, "Fork workflow command failed: %s", string(output)) @@ -632,7 +681,7 @@ func testForkWorkflow(t *testing.T, cliPath string, schemaArgs []string) { t.Run("ForkWorkflowFromStep", func(t *testing.T) { args := append([]string{"workflow", "fork", workflowID, "--step", "2"}, schemaArgs...) cmd := exec.Command(cliPath, args...) - cmd.Env = append(os.Environ(), "DBOS_SYSTEM_DATABASE_URL="+getDatabaseURL()) + cmd.Env = append(os.Environ(), "DBOS_SYSTEM_DATABASE_URL="+getDatabaseURL(dbRole)) output, err := cmd.CombinedOutput() require.NoError(t, err, "Fork workflow from step command failed: %s", string(output)) @@ -651,7 +700,7 @@ func testForkWorkflow(t *testing.T, cliPath string, schemaArgs []string) { // Test fork with invalid step number (0 should be converted to 1) args := append([]string{"workflow", "fork", workflowID, "--step", "-1"}, schemaArgs...) cmd := exec.Command(cliPath, args...) - cmd.Env = append(os.Environ(), "DBOS_SYSTEM_DATABASE_URL="+getDatabaseURL()) + cmd.Env = append(os.Environ(), "DBOS_SYSTEM_DATABASE_URL="+getDatabaseURL(dbRole)) output, err := cmd.CombinedOutput() require.NoError(t, err, "Fork workflow from step command failed: %s", string(output)) @@ -668,7 +717,7 @@ func testForkWorkflow(t *testing.T, cliPath string, schemaArgs []string) { } // testGetWorkflowSteps tests retrieving workflow steps -func testGetWorkflowSteps(t *testing.T, cliPath string, schemaArgs []string) { +func testGetWorkflowSteps(t *testing.T, cliPath string, schemaArgs []string, dbRole string) { resp, err := http.Get("http://localhost:" + testServerPort + "/queue") require.NoError(t, err, "Failed to trigger queue workflow") defer resp.Body.Close() @@ -690,7 +739,7 @@ func testGetWorkflowSteps(t *testing.T, cliPath string, schemaArgs []string) { t.Run("GetStepsJSON", func(t *testing.T) { args := append([]string{"workflow", "steps", workflowID}, schemaArgs...) cmd := exec.Command(cliPath, args...) - cmd.Env = append(os.Environ(), "DBOS_SYSTEM_DATABASE_URL="+getDatabaseURL()) + cmd.Env = append(os.Environ(), "DBOS_SYSTEM_DATABASE_URL="+getDatabaseURL(dbRole)) output, err := cmd.CombinedOutput() require.NoError(t, err, "Get workflow steps JSON command failed: %s", string(output)) @@ -712,14 +761,14 @@ func testGetWorkflowSteps(t *testing.T, cliPath string, schemaArgs []string) { } // testErrorHandling tests various error conditions and edge cases -func testErrorHandling(t *testing.T, cliPath string, schemaArgs []string) { +func testErrorHandling(t *testing.T, cliPath string, schemaArgs []string, dbRole string) { t.Run("InvalidWorkflowID", func(t *testing.T) { invalidID := "invalid-workflow-id-12345" // Test get with invalid ID args := append([]string{"workflow", "get", invalidID}, schemaArgs...) cmd := exec.Command(cliPath, args...) - cmd.Env = append(os.Environ(), "DBOS_SYSTEM_DATABASE_URL="+getDatabaseURL()) + cmd.Env = append(os.Environ(), "DBOS_SYSTEM_DATABASE_URL="+getDatabaseURL(dbRole)) output, err := cmd.CombinedOutput() assert.Error(t, err, "Should fail with invalid workflow ID") @@ -730,7 +779,7 @@ func testErrorHandling(t *testing.T, cliPath string, schemaArgs []string) { // Test get without workflow ID args := append([]string{"workflow", "get"}, schemaArgs...) cmd := exec.Command(cliPath, args...) - cmd.Env = append(os.Environ(), "DBOS_SYSTEM_DATABASE_URL="+getDatabaseURL()) + cmd.Env = append(os.Environ(), "DBOS_SYSTEM_DATABASE_URL="+getDatabaseURL(dbRole)) output, err := cmd.CombinedOutput() assert.Error(t, err, "Should fail without workflow ID") @@ -740,7 +789,7 @@ func testErrorHandling(t *testing.T, cliPath string, schemaArgs []string) { t.Run("InvalidStatusFilter", func(t *testing.T) { args := append([]string{"workflow", "list", "--status", "INVALID_STATUS"}, schemaArgs...) cmd := exec.Command(cliPath, args...) - cmd.Env = append(os.Environ(), "DBOS_SYSTEM_DATABASE_URL="+getDatabaseURL()) + cmd.Env = append(os.Environ(), "DBOS_SYSTEM_DATABASE_URL="+getDatabaseURL(dbRole)) output, err := cmd.CombinedOutput() assert.Error(t, err, "Should fail with invalid status") @@ -750,7 +799,7 @@ func testErrorHandling(t *testing.T, cliPath string, schemaArgs []string) { t.Run("InvalidTimeFormat", func(t *testing.T) { args := append([]string{"workflow", "list", "--start-time", "invalid-time-format"}, schemaArgs...) cmd := exec.Command(cliPath, args...) - cmd.Env = append(os.Environ(), "DBOS_SYSTEM_DATABASE_URL="+getDatabaseURL()) + cmd.Env = append(os.Environ(), "DBOS_SYSTEM_DATABASE_URL="+getDatabaseURL(dbRole)) output, err := cmd.CombinedOutput() assert.Error(t, err, "Should fail with invalid time format") From 4a3a1e52fdf4aea9bf912b278219b8d66cf8012b Mon Sep 17 00:00:00 2001 From: maxdml Date: Mon, 29 Sep 2025 11:48:40 -0700 Subject: [PATCH 02/12] sanitize schema/role input when granting permissions --- cmd/dbos/migrate.go | 20 ++++++++++++-------- 1 file changed, 12 insertions(+), 8 deletions(-) diff --git a/cmd/dbos/migrate.go b/cmd/dbos/migrate.go index 89c9d82..d855e72 100644 --- a/cmd/dbos/migrate.go +++ b/cmd/dbos/migrate.go @@ -8,6 +8,7 @@ import ( "runtime" "time" + "github.com/jackc/pgx/v5" _ "github.com/jackc/pgx/v5/stdlib" "github.com/spf13/cobra" ) @@ -38,7 +39,7 @@ func runMigrate(cmd *cobra.Command, args []string) error { // Create DBOS context which will run migrations automatically for the system DB _, err = createDBOSContext(ctx, dbURL) if err != nil { - return fmt.Errorf("failed to create DBOS context: %w", err) + return err } // Determine the schema to use (from flag or default) @@ -93,14 +94,17 @@ func grantDBOSSchemaPermissions(databaseURL, roleName, schemaName string) error defer cancel() // Grant usage on the specified schema + schemaSQL := pgx.Identifier{schemaName}.Sanitize() + roleSQL := pgx.Identifier{roleName}.Sanitize() + queries := []string{ - fmt.Sprintf(`GRANT USAGE ON SCHEMA %s TO "%s"`, schemaName, roleName), - fmt.Sprintf(`GRANT ALL PRIVILEGES ON ALL TABLES IN SCHEMA %s TO "%s"`, schemaName, roleName), - fmt.Sprintf(`GRANT ALL PRIVILEGES ON ALL SEQUENCES IN SCHEMA %s TO "%s"`, schemaName, roleName), - fmt.Sprintf(`GRANT EXECUTE ON ALL FUNCTIONS IN SCHEMA %s TO "%s"`, schemaName, roleName), - fmt.Sprintf(`ALTER DEFAULT PRIVILEGES IN SCHEMA %s GRANT ALL ON TABLES TO "%s"`, schemaName, roleName), - fmt.Sprintf(`ALTER DEFAULT PRIVILEGES IN SCHEMA %s GRANT ALL ON SEQUENCES TO "%s"`, schemaName, roleName), - fmt.Sprintf(`ALTER DEFAULT PRIVILEGES IN SCHEMA %s GRANT EXECUTE ON FUNCTIONS TO "%s"`, schemaName, roleName), + fmt.Sprintf(`GRANT USAGE ON SCHEMA %s TO %s`, schemaSQL, roleSQL), + fmt.Sprintf(`GRANT ALL PRIVILEGES ON ALL TABLES IN SCHEMA %s TO %s`, schemaSQL, roleSQL), + fmt.Sprintf(`GRANT ALL PRIVILEGES ON ALL SEQUENCES IN SCHEMA %s TO %s`, schemaSQL, roleSQL), + fmt.Sprintf(`GRANT EXECUTE ON ALL FUNCTIONS IN SCHEMA %s TO %s`, schemaSQL, roleSQL), + fmt.Sprintf(`ALTER DEFAULT PRIVILEGES IN SCHEMA %s GRANT ALL ON TABLES TO %s`, schemaSQL, roleSQL), + fmt.Sprintf(`ALTER DEFAULT PRIVILEGES IN SCHEMA %s GRANT ALL ON SEQUENCES TO %s`, schemaSQL, roleSQL), + fmt.Sprintf(`ALTER DEFAULT PRIVILEGES IN SCHEMA %s GRANT EXECUTE ON FUNCTIONS TO %s`, schemaSQL, roleSQL), } for _, query := range queries { From ec76d834e2af3cc83a79b61724f262e30119ef96 Mon Sep 17 00:00:00 2001 From: maxdml Date: Mon, 29 Sep 2025 11:49:04 -0700 Subject: [PATCH 03/12] better erroring --- dbos/dbos.go | 2 +- dbos/system_database.go | 8 ++++---- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/dbos/dbos.go b/dbos/dbos.go index 63b1fef..b290bec 100644 --- a/dbos/dbos.go +++ b/dbos/dbos.go @@ -339,7 +339,7 @@ func NewDBOSContext(ctx context.Context, inputConfig Config) (DBOSContext, error // Create the system database systemDB, err := newSystemDatabase(initExecutor, newSystemDatabaseInputs) if err != nil { - return nil, newInitializationError(fmt.Sprintf("failed to create system database: %v", err)) + return nil, newInitializationError(err.Error()) } initExecutor.systemDB = systemDB initExecutor.logger.Debug("System database initialized") diff --git a/dbos/system_database.go b/dbos/system_database.go index 6cf659f..366bada 100644 --- a/dbos/system_database.go +++ b/dbos/system_database.go @@ -90,7 +90,7 @@ func createDatabaseIfNotExists(ctx context.Context, pool *pgxpool.Pool, logger * poolConfig := pool.Config() dbName := poolConfig.ConnConfig.Database if dbName == "" { - return newInitializationError("database name not found in pool configuration") + return errors.New("database name not found in pool configuration") } // Create a connection to the postgres database to create the target database @@ -98,7 +98,7 @@ func createDatabaseIfNotExists(ctx context.Context, pool *pgxpool.Pool, logger * serverConfig.Database = "postgres" conn, err := pgx.ConnectConfig(ctx, serverConfig) if err != nil { - return newInitializationError(fmt.Sprintf("failed to connect to PostgreSQL server: %v", err)) + return fmt.Errorf("failed to connect to PostgreSQL server: %v", err) } defer conn.Close(ctx) @@ -107,13 +107,13 @@ func createDatabaseIfNotExists(ctx context.Context, pool *pgxpool.Pool, logger * err = conn.QueryRow(ctx, "SELECT EXISTS(SELECT 1 FROM pg_database WHERE datname = $1)", dbName).Scan(&exists) if err != nil { - return newInitializationError(fmt.Sprintf("failed to check if database exists: %v", err)) + return fmt.Errorf("failed to check if database exists: %v", err) } if !exists { createSQL := fmt.Sprintf("CREATE DATABASE %s", pgx.Identifier{dbName}.Sanitize()) _, err = conn.Exec(ctx, createSQL) if err != nil { - return newInitializationError(fmt.Sprintf("failed to create database %s: %v", dbName, err)) + return fmt.Errorf("failed to create database %s: %v", dbName, err) } logger.Debug("Database created", "name", dbName) } From 91f1756e44865069ca974eac56d8329298388cd2 Mon Sep 17 00:00:00 2001 From: maxdml Date: Mon, 29 Sep 2025 11:49:30 -0700 Subject: [PATCH 04/12] sanitize schema and split create migration table from checking its existence when running migrations --- dbos/system_database.go | 20 +++++++++++++------- 1 file changed, 13 insertions(+), 7 deletions(-) diff --git a/dbos/system_database.go b/dbos/system_database.go index 366bada..9233341 100644 --- a/dbos/system_database.go +++ b/dbos/system_database.go @@ -173,8 +173,8 @@ func runMigrations(pool *pgxpool.Pool, schema string) error { // Check if the schema exists var schemaExists bool - checkSchemaQuery := fmt.Sprintf("SELECT EXISTS(SELECT 1 FROM information_schema.schemata WHERE schema_name = '%s')", schema) - err = tx.QueryRow(ctx, checkSchemaQuery).Scan(&schemaExists) + checkSchemaQuery := `SELECT EXISTS(SELECT 1 FROM information_schema.schemata WHERE schema_name = $1)` + err = tx.QueryRow(ctx, checkSchemaQuery, schema).Scan(&schemaExists) if err != nil { return fmt.Errorf("failed to check if schema %s exists: %v", schema, err) } @@ -189,13 +189,19 @@ func runMigrations(pool *pgxpool.Pool, schema string) error { } // Create the migrations table if it doesn't exist - createTableQuery := fmt.Sprintf(`CREATE TABLE IF NOT EXISTS %s.%s ( - version BIGINT NOT NULL PRIMARY KEY - )`, pgx.Identifier{schema}.Sanitize(), _DBOS_MIGRATION_TABLE) + checkMigrationTableExistsQuery := `SELECT EXISTS(SELECT 1 FROM information_schema.tables WHERE table_schema = $1 AND table_name = $2)` - _, err = tx.Exec(ctx, createTableQuery) + var migrationTableExists bool + err = tx.QueryRow(ctx, checkMigrationTableExistsQuery, schema, _DBOS_MIGRATION_TABLE).Scan(&migrationTableExists) if err != nil { - return fmt.Errorf("failed to create migrations table: %v", err) + return fmt.Errorf("failed to check if migration table exists: %v", err) + } + if !migrationTableExists { + createTableQuery := fmt.Sprintf(`CREATE TABLE %s.%s (version BIGINT NOT NULL PRIMARY KEY)`, pgx.Identifier{schema}.Sanitize(), _DBOS_MIGRATION_TABLE) + _, err = tx.Exec(ctx, createTableQuery) + if err != nil { + return fmt.Errorf("failed to create migrations table: %v", err) + } } // Get current migration version From 779c91a7b209614726796abfb3ba4c1adfe8f957 Mon Sep 17 00:00:00 2001 From: maxdml Date: Mon, 29 Sep 2025 13:13:03 -0700 Subject: [PATCH 05/12] sanitize in test --- cmd/dbos/cli_integration_test.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/cmd/dbos/cli_integration_test.go b/cmd/dbos/cli_integration_test.go index ced12c3..6b91335 100644 --- a/cmd/dbos/cli_integration_test.go +++ b/cmd/dbos/cli_integration_test.go @@ -256,7 +256,8 @@ func testMigrateCommand(t *testing.T, cliPath string, schemaArgs []string, dbRol if password == "" { password = "dbos" } - _, err = db.Exec(fmt.Sprintf("CREATE ROLE %s LOGIN PASSWORD '%s'", dbRole, password)) + query := fmt.Sprintf("CREATE ROLE %s LOGIN PASSWORD '%s'", pgx.Identifier{dbRole}.Sanitize(), password) + _, err = db.Exec(query) require.NoError(t, err) } From 79491b08181d5f63cc696733dfb3e3339895cd58 Mon Sep 17 00:00:00 2001 From: maxdml Date: Mon, 29 Sep 2025 13:13:34 -0700 Subject: [PATCH 06/12] query escape --- cmd/dbos/cli_integration_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cmd/dbos/cli_integration_test.go b/cmd/dbos/cli_integration_test.go index 6b91335..4df264f 100644 --- a/cmd/dbos/cli_integration_test.go +++ b/cmd/dbos/cli_integration_test.go @@ -256,7 +256,7 @@ func testMigrateCommand(t *testing.T, cliPath string, schemaArgs []string, dbRol if password == "" { password = "dbos" } - query := fmt.Sprintf("CREATE ROLE %s LOGIN PASSWORD '%s'", pgx.Identifier{dbRole}.Sanitize(), password) + query := fmt.Sprintf("CREATE ROLE %s LOGIN PASSWORD '%s'", pgx.Identifier{dbRole}.Sanitize(), url.QueryEscape(password)) _, err = db.Exec(query) require.NoError(t, err) } From bf0754ca7b778502d8b23fcf04dde809fc2136cc Mon Sep 17 00:00:00 2001 From: maxdml Date: Mon, 29 Sep 2025 13:28:02 -0700 Subject: [PATCH 07/12] one args --- cmd/dbos/cli_integration_test.go | 129 +++++++++++++++++++++---------- 1 file changed, 90 insertions(+), 39 deletions(-) diff --git a/cmd/dbos/cli_integration_test.go b/cmd/dbos/cli_integration_test.go index 4df264f..f7fff93 100644 --- a/cmd/dbos/cli_integration_test.go +++ b/cmd/dbos/cli_integration_test.go @@ -65,26 +65,26 @@ func TestCLIWorkflow(t *testing.T) { testConfigs := []struct { name string schemaName string - schemaArgs []string dbRole string + args []string }{ { name: "CustomSchema", schemaName: "test_schema", - schemaArgs: []string{"--schema", "test_schema"}, dbRole: "postgres", + args: []string{"--schema", "test_schema"}, }, { name: "DefaultSchema", schemaName: "dbos", - schemaArgs: []string{}, // No schema argument, use default dbRole: "postgres", + args: []string{}, }, { name: "FunnySchema", schemaName: "F8nny_sCHem@-n@m3", - schemaArgs: []string{"--schema", "F8nny_sCHem@-n@m3"}, dbRole: "notpostgres", + args: []string{"--schema", "F8nny_sCHem@-n@m3"}, }, } @@ -103,7 +103,7 @@ func TestCLIWorkflow(t *testing.T) { }) t.Run("ResetDatabase", func(t *testing.T) { - args := append([]string{"reset", "-y"}, config.schemaArgs...) + args := append([]string{"reset", "-y"}, config.args...) cmd := exec.Command(cliPath, args...) cmd.Env = append(os.Environ(), "DBOS_SYSTEM_DATABASE_URL="+getDatabaseURL("postgres")) @@ -144,11 +144,14 @@ func TestCLIWorkflow(t *testing.T) { }) t.Run("MigrateCommand", func(t *testing.T) { - testMigrateCommand(t, cliPath, config.schemaArgs, config.dbRole) + testMigrateCommand(t, cliPath, config.args, config.dbRole) }) // Start a test application using dbos start - startArgs := append([]string{"start"}, config.schemaArgs...) + startArgs := append([]string{"start"}, config.args...) + if config.dbRole != "postgres" { + startArgs = append(startArgs, "--app-role", config.dbRole) + } cmd := exec.CommandContext(context.Background(), cliPath, startArgs...) envVars := append(os.Environ(), "DBOS_SYSTEM_DATABASE_URL="+getDatabaseURL(config.dbRole)) // Pass the schema to the test app if using custom schema @@ -176,11 +179,11 @@ func TestCLIWorkflow(t *testing.T) { }) t.Run("WorkflowCommands", func(t *testing.T) { - testWorkflowCommands(t, cliPath, config.schemaArgs, config.dbRole) + testWorkflowCommands(t, cliPath, config.args, config.dbRole) }) t.Run("ErrorHandling", func(t *testing.T) { - testErrorHandling(t, cliPath, config.schemaArgs, config.dbRole) + testErrorHandling(t, cliPath, config.args, config.dbRole) }) }) } @@ -246,7 +249,7 @@ func testProjectInitialization(t *testing.T, cliPath string) { require.NoError(t, err, "go mod tidy failed: %s", string(modOutput)) } -func testMigrateCommand(t *testing.T, cliPath string, schemaArgs []string, dbRole string) { +func testMigrateCommand(t *testing.T, cliPath string, baseArgs []string, dbRole string) { // If the role is set, create it in Postgres first if dbRole != "postgres" { db, err := sql.Open("pgx", getDatabaseURL("postgres")) @@ -261,7 +264,7 @@ func testMigrateCommand(t *testing.T, cliPath string, schemaArgs []string, dbRol require.NoError(t, err) } - args := append([]string{"--verbose", "migrate"}, schemaArgs...) + args := append([]string{"--verbose", "migrate"}, baseArgs...) if dbRole != "postgres" { args = append(args, "--app-role", dbRole) } @@ -273,31 +276,31 @@ func testMigrateCommand(t *testing.T, cliPath string, schemaArgs []string, dbRol } // testWorkflowCommands comprehensively tests all workflow CLI commands -func testWorkflowCommands(t *testing.T, cliPath string, schemaArgs []string, dbRole string) { +func testWorkflowCommands(t *testing.T, cliPath string, baseArgs []string, dbRole string) { t.Run("ListWorkflows", func(t *testing.T) { - testListWorkflows(t, cliPath, schemaArgs, dbRole) + testListWorkflows(t, cliPath, baseArgs, dbRole) }) t.Run("GetWorkflow", func(t *testing.T) { - testGetWorkflow(t, cliPath, schemaArgs, dbRole) + testGetWorkflow(t, cliPath, baseArgs, dbRole) }) t.Run("CancelResumeWorkflow", func(t *testing.T) { - testCancelResumeWorkflow(t, cliPath, schemaArgs, dbRole) + testCancelResumeWorkflow(t, cliPath, baseArgs, dbRole) }) t.Run("ForkWorkflow", func(t *testing.T) { - testForkWorkflow(t, cliPath, schemaArgs, dbRole) + testForkWorkflow(t, cliPath, baseArgs, dbRole) }) t.Run("GetWorkflowSteps", func(t *testing.T) { - testGetWorkflowSteps(t, cliPath, schemaArgs, dbRole) + testGetWorkflowSteps(t, cliPath, baseArgs, dbRole) }) } // testListWorkflows tests various workflow listing scenarios -func testListWorkflows(t *testing.T, cliPath string, schemaArgs []string, dbRole string) { +func testListWorkflows(t *testing.T, cliPath string, baseArgs []string, dbRole string) { // Create some test workflows first to ensure we have data to filter resp, err := http.Get("http://localhost:" + testServerPort + "/workflow") require.NoError(t, err, "Failed to trigger workflow") @@ -443,7 +446,10 @@ func testListWorkflows(t *testing.T, cliPath string, schemaArgs []string, dbRole for _, tc := range testCases { t.Run(tc.name, func(t *testing.T) { - args := append(tc.args, schemaArgs...) + args := append(tc.args, baseArgs...) + if dbRole != "postgres" { + args = append(args, "--app-role", dbRole) + } cmd := exec.Command(cliPath, args...) cmd.Env = append(os.Environ(), "DBOS_SYSTEM_DATABASE_URL="+getDatabaseURL(dbRole)) @@ -487,7 +493,7 @@ func testListWorkflows(t *testing.T, cliPath string, schemaArgs []string, dbRole } // testGetWorkflow tests retrieving individual workflow details -func testGetWorkflow(t *testing.T, cliPath string, schemaArgs []string, dbRole string) { +func testGetWorkflow(t *testing.T, cliPath string, baseArgs []string, dbRole string) { resp, err := http.Get("http://localhost:" + testServerPort + "/queue") require.NoError(t, err, "Failed to trigger queue workflow") defer resp.Body.Close() @@ -507,7 +513,10 @@ func testGetWorkflow(t *testing.T, cliPath string, schemaArgs []string, dbRole s assert.NotEmpty(t, workflowID, "Workflow ID should not be empty") t.Run("GetWorkflowJSON", func(t *testing.T) { - args := append([]string{"workflow", "get", workflowID}, schemaArgs...) + args := append([]string{"workflow", "get", workflowID}, baseArgs...) + if dbRole != "postgres" { + args = append(args, "--app-role", dbRole) + } cmd := exec.Command(cliPath, args...) cmd.Env = append(os.Environ(), "DBOS_SYSTEM_DATABASE_URL="+getDatabaseURL(dbRole)) @@ -523,7 +532,10 @@ func testGetWorkflow(t *testing.T, cliPath string, schemaArgs []string, dbRole s assert.NotEmpty(t, status.Name, "Should have workflow name") // Redo the test with the db url in flags - args2 := append([]string{"workflow", "get", workflowID, "--db-url", getDatabaseURL(dbRole)}, schemaArgs...) + args2 := append([]string{"workflow", "get", workflowID, "--db-url", getDatabaseURL(dbRole)}, baseArgs...) + if dbRole != "postgres" { + args2 = append(args2, "--app-role", dbRole) + } cmd2 := exec.Command(cliPath, args2...) output2, err2 := cmd2.CombinedOutput() @@ -557,7 +569,10 @@ func testGetWorkflow(t *testing.T, cliPath string, schemaArgs []string, dbRole s }) // Test with environment variable D - args3 := append([]string{"workflow", "get", workflowID}, schemaArgs...) + args3 := append([]string{"workflow", "get", workflowID}, baseArgs...) + if dbRole != "postgres" { + args3 = append(args3, "--app-role", dbRole) + } cmd3 := exec.Command(cliPath, args3...) cmd3.Env = append(os.Environ(), "D="+getDatabaseURL(dbRole)) @@ -575,7 +590,7 @@ func testGetWorkflow(t *testing.T, cliPath string, schemaArgs []string, dbRole s } // testCancelResumeWorkflow tests workflow state management -func testCancelResumeWorkflow(t *testing.T, cliPath string, schemaArgs []string, dbRole string) { +func testCancelResumeWorkflow(t *testing.T, cliPath string, baseArgs []string, dbRole string) { resp, err := http.Get("http://localhost:" + testServerPort + "/queue") require.NoError(t, err, "Failed to trigger queue workflow") defer resp.Body.Close() @@ -595,7 +610,10 @@ func testCancelResumeWorkflow(t *testing.T, cliPath string, schemaArgs []string, assert.NotEmpty(t, workflowID, "Workflow ID should not be empty") t.Run("CancelWorkflow", func(t *testing.T) { - args := append([]string{"workflow", "cancel", workflowID}, schemaArgs...) + args := append([]string{"workflow", "cancel", workflowID}, baseArgs...) + if dbRole != "postgres" { + args = append(args, "--app-role", dbRole) + } cmd := exec.Command(cliPath, args...) cmd.Env = append(os.Environ(), "DBOS_SYSTEM_DATABASE_URL="+getDatabaseURL(dbRole)) @@ -605,7 +623,10 @@ func testCancelResumeWorkflow(t *testing.T, cliPath string, schemaArgs []string, assert.Contains(t, string(output), "Successfully cancelled", "Should confirm cancellation") // Verify workflow is actually cancelled - getArgs := append([]string{"workflow", "get", workflowID}, schemaArgs...) + getArgs := append([]string{"workflow", "get", workflowID}, baseArgs...) + if dbRole != "postgres" { + getArgs = append(getArgs, "--app-role", dbRole) + } getCmd := exec.Command(cliPath, getArgs...) getCmd.Env = append(os.Environ(), "DBOS_SYSTEM_DATABASE_URL="+getDatabaseURL(dbRole)) @@ -619,7 +640,10 @@ func testCancelResumeWorkflow(t *testing.T, cliPath string, schemaArgs []string, }) t.Run("ResumeWorkflow", func(t *testing.T) { - args := append([]string{"workflow", "resume", workflowID}, schemaArgs...) + args := append([]string{"workflow", "resume", workflowID}, baseArgs...) + if dbRole != "postgres" { + args = append(args, "--app-role", dbRole) + } cmd := exec.Command(cliPath, args...) cmd.Env = append(os.Environ(), "DBOS_SYSTEM_DATABASE_URL="+getDatabaseURL(dbRole)) @@ -638,7 +662,7 @@ func testCancelResumeWorkflow(t *testing.T, cliPath string, schemaArgs []string, } // testForkWorkflow tests workflow forking functionality -func testForkWorkflow(t *testing.T, cliPath string, schemaArgs []string, dbRole string) { +func testForkWorkflow(t *testing.T, cliPath string, baseArgs []string, dbRole string) { resp, err := http.Get("http://localhost:" + testServerPort + "/queue") require.NoError(t, err, "Failed to trigger queue workflow") defer resp.Body.Close() @@ -660,7 +684,10 @@ func testForkWorkflow(t *testing.T, cliPath string, schemaArgs []string, dbRole t.Run("ForkWorkflow", func(t *testing.T) { newID := uuid.NewString() targetVersion := "1.0.0" - args := append([]string{"workflow", "fork", workflowID, "--forked-workflow-id", newID, "--application-version", targetVersion}, schemaArgs...) + args := append([]string{"workflow", "fork", workflowID, "--forked-workflow-id", newID, "--application-version", targetVersion}, baseArgs...) + if dbRole != "postgres" { + args = append(args, "--app-role", dbRole) + } cmd := exec.Command(cliPath, args...) cmd.Env = append(os.Environ(), "DBOS_SYSTEM_DATABASE_URL="+getDatabaseURL(dbRole)) @@ -680,7 +707,10 @@ func testForkWorkflow(t *testing.T, cliPath string, schemaArgs []string, dbRole }) t.Run("ForkWorkflowFromStep", func(t *testing.T) { - args := append([]string{"workflow", "fork", workflowID, "--step", "2"}, schemaArgs...) + args := append([]string{"workflow", "fork", workflowID, "--step", "2"}, baseArgs...) + if dbRole != "postgres" { + args = append(args, "--app-role", dbRole) + } cmd := exec.Command(cliPath, args...) cmd.Env = append(os.Environ(), "DBOS_SYSTEM_DATABASE_URL="+getDatabaseURL(dbRole)) @@ -699,7 +729,10 @@ func testForkWorkflow(t *testing.T, cliPath string, schemaArgs []string, dbRole t.Run("ForkWorkflowFromNegativeStep", func(t *testing.T) { // Test fork with invalid step number (0 should be converted to 1) - args := append([]string{"workflow", "fork", workflowID, "--step", "-1"}, schemaArgs...) + args := append([]string{"workflow", "fork", workflowID, "--step", "-1"}, baseArgs...) + if dbRole != "postgres" { + args = append(args, "--app-role", dbRole) + } cmd := exec.Command(cliPath, args...) cmd.Env = append(os.Environ(), "DBOS_SYSTEM_DATABASE_URL="+getDatabaseURL(dbRole)) @@ -718,7 +751,7 @@ func testForkWorkflow(t *testing.T, cliPath string, schemaArgs []string, dbRole } // testGetWorkflowSteps tests retrieving workflow steps -func testGetWorkflowSteps(t *testing.T, cliPath string, schemaArgs []string, dbRole string) { +func testGetWorkflowSteps(t *testing.T, cliPath string, baseArgs []string, dbRole string) { resp, err := http.Get("http://localhost:" + testServerPort + "/queue") require.NoError(t, err, "Failed to trigger queue workflow") defer resp.Body.Close() @@ -738,7 +771,10 @@ func testGetWorkflowSteps(t *testing.T, cliPath string, schemaArgs []string, dbR assert.NotEmpty(t, workflowID, "Workflow ID should not be empty") t.Run("GetStepsJSON", func(t *testing.T) { - args := append([]string{"workflow", "steps", workflowID}, schemaArgs...) + args := append([]string{"workflow", "steps", workflowID}, baseArgs...) + if dbRole != "postgres" { + args = append(args, "--app-role", dbRole) + } cmd := exec.Command(cliPath, args...) cmd.Env = append(os.Environ(), "DBOS_SYSTEM_DATABASE_URL="+getDatabaseURL(dbRole)) @@ -762,12 +798,15 @@ func testGetWorkflowSteps(t *testing.T, cliPath string, schemaArgs []string, dbR } // testErrorHandling tests various error conditions and edge cases -func testErrorHandling(t *testing.T, cliPath string, schemaArgs []string, dbRole string) { +func testErrorHandling(t *testing.T, cliPath string, baseArgs []string, dbRole string) { t.Run("InvalidWorkflowID", func(t *testing.T) { invalidID := "invalid-workflow-id-12345" // Test get with invalid ID - args := append([]string{"workflow", "get", invalidID}, schemaArgs...) + args := append([]string{"workflow", "get", invalidID}, baseArgs...) + if dbRole != "postgres" { + args = append(args, "--app-role", dbRole) + } cmd := exec.Command(cliPath, args...) cmd.Env = append(os.Environ(), "DBOS_SYSTEM_DATABASE_URL="+getDatabaseURL(dbRole)) @@ -778,7 +817,10 @@ func testErrorHandling(t *testing.T, cliPath string, schemaArgs []string, dbRole t.Run("MissingWorkflowID", func(t *testing.T) { // Test get without workflow ID - args := append([]string{"workflow", "get"}, schemaArgs...) + args := append([]string{"workflow", "get"}, baseArgs...) + if dbRole != "postgres" { + args = append(args, "--app-role", dbRole) + } cmd := exec.Command(cliPath, args...) cmd.Env = append(os.Environ(), "DBOS_SYSTEM_DATABASE_URL="+getDatabaseURL(dbRole)) @@ -788,7 +830,10 @@ func testErrorHandling(t *testing.T, cliPath string, schemaArgs []string, dbRole }) t.Run("InvalidStatusFilter", func(t *testing.T) { - args := append([]string{"workflow", "list", "--status", "INVALID_STATUS"}, schemaArgs...) + args := append([]string{"workflow", "list", "--status", "INVALID_STATUS"}, baseArgs...) + if dbRole != "postgres" { + args = append(args, "--app-role", dbRole) + } cmd := exec.Command(cliPath, args...) cmd.Env = append(os.Environ(), "DBOS_SYSTEM_DATABASE_URL="+getDatabaseURL(dbRole)) @@ -798,7 +843,10 @@ func testErrorHandling(t *testing.T, cliPath string, schemaArgs []string, dbRole }) t.Run("InvalidTimeFormat", func(t *testing.T) { - args := append([]string{"workflow", "list", "--start-time", "invalid-time-format"}, schemaArgs...) + args := append([]string{"workflow", "list", "--start-time", "invalid-time-format"}, baseArgs...) + if dbRole != "postgres" { + args = append(args, "--app-role", dbRole) + } cmd := exec.Command(cliPath, args...) cmd.Env = append(os.Environ(), "DBOS_SYSTEM_DATABASE_URL="+getDatabaseURL(dbRole)) @@ -809,7 +857,10 @@ func testErrorHandling(t *testing.T, cliPath string, schemaArgs []string, dbRole t.Run("MissingDatabaseURL", func(t *testing.T) { // Test without system DB url in the flags or env var - args := append([]string{"workflow", "list"}, schemaArgs...) + args := append([]string{"workflow", "list"}, baseArgs...) + if dbRole != "postgres" { + args = append(args, "--app-role", dbRole) + } cmd := exec.Command(cliPath, args...) output, err := cmd.CombinedOutput() From ee64cb6d26b117c9b52c1c458c4d95eb041c80cb Mon Sep 17 00:00:00 2001 From: maxdml Date: Mon, 29 Sep 2025 13:46:58 -0700 Subject: [PATCH 08/12] fix + be verbose --- cmd/dbos/cli_integration_test.go | 71 ++++++++------------------------ 1 file changed, 18 insertions(+), 53 deletions(-) diff --git a/cmd/dbos/cli_integration_test.go b/cmd/dbos/cli_integration_test.go index f7fff93..49e6d1a 100644 --- a/cmd/dbos/cli_integration_test.go +++ b/cmd/dbos/cli_integration_test.go @@ -1,6 +1,7 @@ package main import ( + "bufio" "context" "database/sql" _ "embed" @@ -149,9 +150,6 @@ func TestCLIWorkflow(t *testing.T) { // Start a test application using dbos start startArgs := append([]string{"start"}, config.args...) - if config.dbRole != "postgres" { - startArgs = append(startArgs, "--app-role", config.dbRole) - } cmd := exec.CommandContext(context.Background(), cliPath, startArgs...) envVars := append(os.Environ(), "DBOS_SYSTEM_DATABASE_URL="+getDatabaseURL(config.dbRole)) // Pass the schema to the test app if using custom schema @@ -159,8 +157,22 @@ func TestCLIWorkflow(t *testing.T) { envVars = append(envVars, "DBOS_SCHEMA="+config.schemaName) } cmd.Env = envVars - err = cmd.Start() - require.NoError(t, err, "Failed to start application") + stdout, _ := cmd.StdoutPipe() + stderr, _ := cmd.StderrPipe() + require.NoError(t, cmd.Start(), "Failed to start application") + go func() { + scanner := bufio.NewScanner(stdout) + for scanner.Scan() { + t.Logf("[app stdout] %s", scanner.Text()) + } + }() + go func() { + scanner := bufio.NewScanner(stderr) + for scanner.Scan() { + t.Logf("[app stderr] %s", scanner.Text()) + } + }() + // Wait for server to be ready require.Eventually(t, func() bool { resp, err := http.Get("http://localhost:" + testServerPort) @@ -273,6 +285,7 @@ func testMigrateCommand(t *testing.T, cliPath string, baseArgs []string, dbRole output, err := cmd.CombinedOutput() require.NoError(t, err, "Migrate command failed: %s", string(output)) assert.Contains(t, string(output), "DBOS migrations completed successfully", "Output should confirm migration") + fmt.Println(string(output)) } // testWorkflowCommands comprehensively tests all workflow CLI commands @@ -447,9 +460,6 @@ func testListWorkflows(t *testing.T, cliPath string, baseArgs []string, dbRole s for _, tc := range testCases { t.Run(tc.name, func(t *testing.T) { args := append(tc.args, baseArgs...) - if dbRole != "postgres" { - args = append(args, "--app-role", dbRole) - } cmd := exec.Command(cliPath, args...) cmd.Env = append(os.Environ(), "DBOS_SYSTEM_DATABASE_URL="+getDatabaseURL(dbRole)) @@ -514,9 +524,6 @@ func testGetWorkflow(t *testing.T, cliPath string, baseArgs []string, dbRole str t.Run("GetWorkflowJSON", func(t *testing.T) { args := append([]string{"workflow", "get", workflowID}, baseArgs...) - if dbRole != "postgres" { - args = append(args, "--app-role", dbRole) - } cmd := exec.Command(cliPath, args...) cmd.Env = append(os.Environ(), "DBOS_SYSTEM_DATABASE_URL="+getDatabaseURL(dbRole)) @@ -533,9 +540,6 @@ func testGetWorkflow(t *testing.T, cliPath string, baseArgs []string, dbRole str // Redo the test with the db url in flags args2 := append([]string{"workflow", "get", workflowID, "--db-url", getDatabaseURL(dbRole)}, baseArgs...) - if dbRole != "postgres" { - args2 = append(args2, "--app-role", dbRole) - } cmd2 := exec.Command(cliPath, args2...) output2, err2 := cmd2.CombinedOutput() @@ -570,9 +574,6 @@ func testGetWorkflow(t *testing.T, cliPath string, baseArgs []string, dbRole str // Test with environment variable D args3 := append([]string{"workflow", "get", workflowID}, baseArgs...) - if dbRole != "postgres" { - args3 = append(args3, "--app-role", dbRole) - } cmd3 := exec.Command(cliPath, args3...) cmd3.Env = append(os.Environ(), "D="+getDatabaseURL(dbRole)) @@ -611,9 +612,6 @@ func testCancelResumeWorkflow(t *testing.T, cliPath string, baseArgs []string, d t.Run("CancelWorkflow", func(t *testing.T) { args := append([]string{"workflow", "cancel", workflowID}, baseArgs...) - if dbRole != "postgres" { - args = append(args, "--app-role", dbRole) - } cmd := exec.Command(cliPath, args...) cmd.Env = append(os.Environ(), "DBOS_SYSTEM_DATABASE_URL="+getDatabaseURL(dbRole)) @@ -624,9 +622,6 @@ func testCancelResumeWorkflow(t *testing.T, cliPath string, baseArgs []string, d // Verify workflow is actually cancelled getArgs := append([]string{"workflow", "get", workflowID}, baseArgs...) - if dbRole != "postgres" { - getArgs = append(getArgs, "--app-role", dbRole) - } getCmd := exec.Command(cliPath, getArgs...) getCmd.Env = append(os.Environ(), "DBOS_SYSTEM_DATABASE_URL="+getDatabaseURL(dbRole)) @@ -641,9 +636,6 @@ func testCancelResumeWorkflow(t *testing.T, cliPath string, baseArgs []string, d t.Run("ResumeWorkflow", func(t *testing.T) { args := append([]string{"workflow", "resume", workflowID}, baseArgs...) - if dbRole != "postgres" { - args = append(args, "--app-role", dbRole) - } cmd := exec.Command(cliPath, args...) cmd.Env = append(os.Environ(), "DBOS_SYSTEM_DATABASE_URL="+getDatabaseURL(dbRole)) @@ -685,9 +677,6 @@ func testForkWorkflow(t *testing.T, cliPath string, baseArgs []string, dbRole st newID := uuid.NewString() targetVersion := "1.0.0" args := append([]string{"workflow", "fork", workflowID, "--forked-workflow-id", newID, "--application-version", targetVersion}, baseArgs...) - if dbRole != "postgres" { - args = append(args, "--app-role", dbRole) - } cmd := exec.Command(cliPath, args...) cmd.Env = append(os.Environ(), "DBOS_SYSTEM_DATABASE_URL="+getDatabaseURL(dbRole)) @@ -708,9 +697,6 @@ func testForkWorkflow(t *testing.T, cliPath string, baseArgs []string, dbRole st t.Run("ForkWorkflowFromStep", func(t *testing.T) { args := append([]string{"workflow", "fork", workflowID, "--step", "2"}, baseArgs...) - if dbRole != "postgres" { - args = append(args, "--app-role", dbRole) - } cmd := exec.Command(cliPath, args...) cmd.Env = append(os.Environ(), "DBOS_SYSTEM_DATABASE_URL="+getDatabaseURL(dbRole)) @@ -730,9 +716,6 @@ func testForkWorkflow(t *testing.T, cliPath string, baseArgs []string, dbRole st t.Run("ForkWorkflowFromNegativeStep", func(t *testing.T) { // Test fork with invalid step number (0 should be converted to 1) args := append([]string{"workflow", "fork", workflowID, "--step", "-1"}, baseArgs...) - if dbRole != "postgres" { - args = append(args, "--app-role", dbRole) - } cmd := exec.Command(cliPath, args...) cmd.Env = append(os.Environ(), "DBOS_SYSTEM_DATABASE_URL="+getDatabaseURL(dbRole)) @@ -772,9 +755,6 @@ func testGetWorkflowSteps(t *testing.T, cliPath string, baseArgs []string, dbRol t.Run("GetStepsJSON", func(t *testing.T) { args := append([]string{"workflow", "steps", workflowID}, baseArgs...) - if dbRole != "postgres" { - args = append(args, "--app-role", dbRole) - } cmd := exec.Command(cliPath, args...) cmd.Env = append(os.Environ(), "DBOS_SYSTEM_DATABASE_URL="+getDatabaseURL(dbRole)) @@ -804,9 +784,6 @@ func testErrorHandling(t *testing.T, cliPath string, baseArgs []string, dbRole s // Test get with invalid ID args := append([]string{"workflow", "get", invalidID}, baseArgs...) - if dbRole != "postgres" { - args = append(args, "--app-role", dbRole) - } cmd := exec.Command(cliPath, args...) cmd.Env = append(os.Environ(), "DBOS_SYSTEM_DATABASE_URL="+getDatabaseURL(dbRole)) @@ -818,9 +795,6 @@ func testErrorHandling(t *testing.T, cliPath string, baseArgs []string, dbRole s t.Run("MissingWorkflowID", func(t *testing.T) { // Test get without workflow ID args := append([]string{"workflow", "get"}, baseArgs...) - if dbRole != "postgres" { - args = append(args, "--app-role", dbRole) - } cmd := exec.Command(cliPath, args...) cmd.Env = append(os.Environ(), "DBOS_SYSTEM_DATABASE_URL="+getDatabaseURL(dbRole)) @@ -831,9 +805,6 @@ func testErrorHandling(t *testing.T, cliPath string, baseArgs []string, dbRole s t.Run("InvalidStatusFilter", func(t *testing.T) { args := append([]string{"workflow", "list", "--status", "INVALID_STATUS"}, baseArgs...) - if dbRole != "postgres" { - args = append(args, "--app-role", dbRole) - } cmd := exec.Command(cliPath, args...) cmd.Env = append(os.Environ(), "DBOS_SYSTEM_DATABASE_URL="+getDatabaseURL(dbRole)) @@ -844,9 +815,6 @@ func testErrorHandling(t *testing.T, cliPath string, baseArgs []string, dbRole s t.Run("InvalidTimeFormat", func(t *testing.T) { args := append([]string{"workflow", "list", "--start-time", "invalid-time-format"}, baseArgs...) - if dbRole != "postgres" { - args = append(args, "--app-role", dbRole) - } cmd := exec.Command(cliPath, args...) cmd.Env = append(os.Environ(), "DBOS_SYSTEM_DATABASE_URL="+getDatabaseURL(dbRole)) @@ -858,9 +826,6 @@ func testErrorHandling(t *testing.T, cliPath string, baseArgs []string, dbRole s t.Run("MissingDatabaseURL", func(t *testing.T) { // Test without system DB url in the flags or env var args := append([]string{"workflow", "list"}, baseArgs...) - if dbRole != "postgres" { - args = append(args, "--app-role", dbRole) - } cmd := exec.Command(cliPath, args...) output, err := cmd.CombinedOutput() From 63f5786be4500d0f18de052e930c1471c8aa6fcf Mon Sep 17 00:00:00 2001 From: maxdml Date: Mon, 29 Sep 2025 13:58:18 -0700 Subject: [PATCH 09/12] no need to escape here --- cmd/dbos/cli_integration_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cmd/dbos/cli_integration_test.go b/cmd/dbos/cli_integration_test.go index 49e6d1a..5d45598 100644 --- a/cmd/dbos/cli_integration_test.go +++ b/cmd/dbos/cli_integration_test.go @@ -271,7 +271,7 @@ func testMigrateCommand(t *testing.T, cliPath string, baseArgs []string, dbRole if password == "" { password = "dbos" } - query := fmt.Sprintf("CREATE ROLE %s LOGIN PASSWORD '%s'", pgx.Identifier{dbRole}.Sanitize(), url.QueryEscape(password)) + query := fmt.Sprintf("CREATE ROLE %s LOGIN PASSWORD '%s'", pgx.Identifier{dbRole}.Sanitize(), password) _, err = db.Exec(query) require.NoError(t, err) } From 15bed197f60042676ce0285f156c1810abfe6876 Mon Sep 17 00:00:00 2001 From: maxdml Date: Mon, 29 Sep 2025 13:58:28 -0700 Subject: [PATCH 10/12] simpler erroring --- dbos/system_database.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dbos/system_database.go b/dbos/system_database.go index 9233341..7c68e73 100644 --- a/dbos/system_database.go +++ b/dbos/system_database.go @@ -2468,7 +2468,7 @@ func (s *sysDB) resetSystemDB(ctx context.Context) error { // Connect to the postgres database conn, err := pgx.ConnectConfig(ctx, postgresConfig) if err != nil { - return fmt.Errorf("failed to connect to PostgreSQL server: %w", err) + return err } defer conn.Close(ctx) From c3d2b832784ba0ff38a25d4049b21ae0fb41dbe6 Mon Sep 17 00:00:00 2001 From: maxdml Date: Mon, 29 Sep 2025 14:05:39 -0700 Subject: [PATCH 11/12] fix --- dbos/dbos_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dbos/dbos_test.go b/dbos/dbos_test.go index e924aff..068a993 100644 --- a/dbos/dbos_test.go +++ b/dbos/dbos_test.go @@ -670,7 +670,7 @@ func TestCustomPool(t *testing.T) { dbosErr, ok := err.(*DBOSError) require.True(t, ok, "expected DBOSError, got %T", err) assert.Equal(t, InitializationError, dbosErr.Code) - expectedMsg := "Error initializing DBOS Transact: failed to create system database" + expectedMsg := "Error initializing DBOS Transact: failed to validate custom pool" assert.Contains(t, dbosErr.Message, expectedMsg) }) From 04500512a925476574cb28f75bc7d8e5259746d6 Mon Sep 17 00:00:00 2001 From: maxdml Date: Mon, 29 Sep 2025 14:51:17 -0700 Subject: [PATCH 12/12] use an interesting pg_role --- cmd/dbos/cli_integration_test.go | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) diff --git a/cmd/dbos/cli_integration_test.go b/cmd/dbos/cli_integration_test.go index 5d45598..a72b21d 100644 --- a/cmd/dbos/cli_integration_test.go +++ b/cmd/dbos/cli_integration_test.go @@ -45,7 +45,14 @@ func getDatabaseURL(dbRole string) string { if password == "" { password = "dbos" } - return fmt.Sprintf("postgres://%s:%s@localhost:5432/dbos?sslmode=disable", dbRole, url.QueryEscape(password)) + dsn := &url.URL{ + Scheme: "postgres", + Host: "localhost:5432", + Path: "/dbos", + RawQuery: "sslmode=disable", + } + dsn.User = url.UserPassword(dbRole, password) + return dsn.String() } // TestCLIWorkflow provides comprehensive integration testing of the DBOS CLI @@ -84,7 +91,7 @@ func TestCLIWorkflow(t *testing.T) { { name: "FunnySchema", schemaName: "F8nny_sCHem@-n@m3", - dbRole: "notpostgres", + dbRole: "User Name-123@acme.com#$%&!", args: []string{"--schema", "F8nny_sCHem@-n@m3"}, }, }