From ff3c91f3c3deab33ec1634be5b67f3bfa22b04c5 Mon Sep 17 00:00:00 2001 From: maxdml Date: Wed, 3 Sep 2025 18:50:03 -0700 Subject: [PATCH 1/8] dbos migration --- dbos/dbos_test.go | 102 ++++++++++++ .../000001_initial_dbos_schema.down.sql | 18 --- ...chema.up.sql => 1_initial_dbos_schema.sql} | 0 dbos/system_database.go | 153 ++++++++++++++---- go.mod | 5 +- go.sum | 19 +-- 6 files changed, 227 insertions(+), 70 deletions(-) delete mode 100644 dbos/migrations/000001_initial_dbos_schema.down.sql rename dbos/migrations/{000001_initial_dbos_schema.up.sql => 1_initial_dbos_schema.sql} (100%) diff --git a/dbos/dbos_test.go b/dbos/dbos_test.go index 55f80105..73344b27 100644 --- a/dbos/dbos_test.go +++ b/dbos/dbos_test.go @@ -1,6 +1,7 @@ package dbos import ( + "context" "testing" "time" @@ -166,4 +167,105 @@ func TestConfig(t *testing.T) { assert.Equal(t, "env-only-executor", ctx.GetExecutorID()) }) }) + + t.Run("SystemDBMigration", func(t *testing.T) { + t.Setenv("DBOS__APPVERSION", "v1.0.0") + t.Setenv("DBOS__APPID", "test-migration") + t.Setenv("DBOS__VMID", "test-executor-id") + + ctx, err := NewDBOSContext(Config{ + DatabaseURL: databaseURL, + AppName: "test-migration", + }) + require.NoError(t, err) + defer func() { + if ctx != nil { + ctx.Shutdown(1 * time.Minute) + } + }() + + require.NotNil(t, ctx) + + // Get the internal systemDB instance to check tables directly + dbosCtx, ok := ctx.(*dbosContext) + require.True(t, ok, "expected dbosContext") + require.NotNil(t, dbosCtx.systemDB) + + sysDB, ok := dbosCtx.systemDB.(*sysDB) + require.True(t, ok, "expected sysDB") + + // Verify all expected tables exist and have correct structure + dbCtx := context.Background() + + // Test workflow_status table + var exists bool + err = sysDB.pool.QueryRow(dbCtx, "SELECT EXISTS (SELECT FROM information_schema.tables WHERE table_schema = 'dbos' AND table_name = 'workflow_status')").Scan(&exists) + require.NoError(t, err) + assert.True(t, exists, "workflow_status table should exist") + + // Test operation_outputs table + err = sysDB.pool.QueryRow(dbCtx, "SELECT EXISTS (SELECT FROM information_schema.tables WHERE table_schema = 'dbos' AND table_name = 'operation_outputs')").Scan(&exists) + require.NoError(t, err) + assert.True(t, exists, "operation_outputs table should exist") + + // Test workflow_events table + err = sysDB.pool.QueryRow(dbCtx, "SELECT EXISTS (SELECT FROM information_schema.tables WHERE table_schema = 'dbos' AND table_name = 'workflow_events')").Scan(&exists) + require.NoError(t, err) + assert.True(t, exists, "workflow_events table should exist") + + // Test notifications table + err = sysDB.pool.QueryRow(dbCtx, "SELECT EXISTS (SELECT FROM information_schema.tables WHERE table_schema = 'dbos' AND table_name = 'notifications')").Scan(&exists) + require.NoError(t, err) + assert.True(t, exists, "notifications table should exist") + + // Test that all tables can be queried (empty results expected) + rows, err := sysDB.pool.Query(dbCtx, "SELECT workflow_uuid FROM dbos.workflow_status LIMIT 1") + require.NoError(t, err) + rows.Close() + + rows, err = sysDB.pool.Query(dbCtx, "SELECT workflow_uuid FROM dbos.operation_outputs LIMIT 1") + require.NoError(t, err) + rows.Close() + + rows, err = sysDB.pool.Query(dbCtx, "SELECT workflow_uuid FROM dbos.workflow_events LIMIT 1") + require.NoError(t, err) + rows.Close() + + rows, err = sysDB.pool.Query(dbCtx, "SELECT destination_uuid FROM dbos.notifications LIMIT 1") + require.NoError(t, err) + rows.Close() + + // Check that the dbos_migrations table exists 
and has one row with the correct version + err = sysDB.pool.QueryRow(dbCtx, "SELECT EXISTS (SELECT FROM information_schema.tables WHERE table_schema = 'dbos' AND table_name = 'dbos_migrations')").Scan(&exists) + require.NoError(t, err) + assert.True(t, exists, "dbos_migrations table should exist") + + // Verify migration version is 1 (after initial migration) + var version int64 + var count int + err = sysDB.pool.QueryRow(dbCtx, "SELECT COUNT(*) FROM dbos.dbos_migrations").Scan(&count) + require.NoError(t, err) + assert.Equal(t, 1, count, "dbos_migrations table should have exactly one row") + + err = sysDB.pool.QueryRow(dbCtx, "SELECT version FROM dbos.dbos_migrations").Scan(&version) + require.NoError(t, err) + assert.Equal(t, int64(1), version, "migration version should be 1 (after initial migration)") + + // Test manual shutdown and recreate + ctx.Shutdown(1 * time.Minute) + + // Recreate context - should have no error since DB is already migrated + ctx2, err := NewDBOSContext(Config{ + DatabaseURL: databaseURL, + AppName: "test-migration-recreate", + }) + require.NoError(t, err) + defer func() { + if ctx2 != nil { + ctx2.Shutdown(1 * time.Minute) + } + }() + + require.NotNil(t, ctx2) + }) } diff --git a/dbos/migrations/000001_initial_dbos_schema.down.sql b/dbos/migrations/000001_initial_dbos_schema.down.sql deleted file mode 100644 index 10526474..00000000 --- a/dbos/migrations/000001_initial_dbos_schema.down.sql +++ /dev/null @@ -1,18 +0,0 @@ --- 001_initial_dbos_schema.down.sql - --- Drop triggers first -DROP TRIGGER IF EXISTS dbos_notifications_trigger ON dbos.notifications; -DROP TRIGGER IF EXISTS dbos_workflow_events_trigger ON dbos.workflow_events; - --- Drop function -DROP FUNCTION IF EXISTS dbos.notifications_function(); -DROP FUNCTION IF EXISTS dbos.workflow_events_function(); - --- Drop tables in reverse order to respect foreign key constraints -DROP TABLE IF EXISTS dbos.workflow_events; -DROP TABLE IF EXISTS dbos.notifications; -DROP TABLE IF EXISTS dbos.operation_outputs; -DROP TABLE IF EXISTS dbos.workflow_status; - --- Drop the schema (only if empty) -DROP SCHEMA IF EXISTS dbos; \ No newline at end of file diff --git a/dbos/migrations/000001_initial_dbos_schema.up.sql b/dbos/migrations/1_initial_dbos_schema.sql similarity index 100% rename from dbos/migrations/000001_initial_dbos_schema.up.sql rename to dbos/migrations/1_initial_dbos_schema.sql diff --git a/dbos/system_database.go b/dbos/system_database.go index 03ba401b..f2c4dbde 100644 --- a/dbos/system_database.go +++ b/dbos/system_database.go @@ -5,15 +5,15 @@ import ( "embed" "errors" "fmt" + "io/fs" "log/slog" - "net/url" + "path/filepath" + "sort" + "strconv" "strings" "sync" "time" - "github.com/golang-migrate/migrate/v4" - _ "github.com/golang-migrate/migrate/v4/database/pgx/v5" - "github.com/golang-migrate/migrate/v4/source/iofs" "github.com/jackc/pgx/v5" "github.com/jackc/pgx/v5/pgconn" "github.com/jackc/pgx/v5/pgxpool" @@ -123,7 +123,7 @@ func createDatabaseIfNotExists(ctx context.Context, databaseURL string, logger * var migrationFiles embed.FS const ( - _DBOS_MIGRATION_TABLE = "dbos_schema_migrations" + _DBOS_MIGRATION_TABLE = "dbos_migrations" // PostgreSQL error codes _PG_ERROR_UNIQUE_VIOLATION = "23505" @@ -139,52 +139,137 @@ const ( ) func runMigrations(databaseURL string) error { - // Change the driver to pgx5 - parsedURL, err := url.Parse(databaseURL) + // Connect to the database + pool, err := pgxpool.New(context.Background(), databaseURL) if err != nil { - return fmt.Errorf("failed to parse database 
URL: %v", err) + return fmt.Errorf("failed to create connection pool: %v", err) } - // Handle various PostgreSQL URL schemes - switch parsedURL.Scheme { - case "postgres", "postgresql": - parsedURL.Scheme = "pgx5" - case "pgx5": - // Already in correct format - default: - return fmt.Errorf("unsupported database URL scheme: %s", parsedURL.Scheme) + defer pool.Close() + + // Begin transaction for atomic migration execution + ctx := context.Background() + tx, err := pool.Begin(ctx) + if err != nil { + return fmt.Errorf("failed to begin transaction: %v", err) } - databaseURL = parsedURL.String() + defer tx.Rollback(ctx) - // Add custom migration table name to avoid conflicts with user migrations - // Check if query parameters already exist - separator := "?" - if parsedURL.RawQuery != "" { - separator = "&" + // Create the DBOS schema if it doesn't exist + _, err = tx.Exec(ctx, "CREATE SCHEMA IF NOT EXISTS dbos") + if err != nil { + return fmt.Errorf("failed to create DBOS schema: %v", err) } - databaseURL += separator + "x-migrations-table=" + _DBOS_MIGRATION_TABLE - // Create migration source from embedded files - d, err := iofs.New(migrationFiles, "migrations") + // Create the migrations table if it doesn't exist + createTableQuery := fmt.Sprintf(`CREATE TABLE IF NOT EXISTS dbos.%s ( + version BIGINT NOT NULL PRIMARY KEY + )`, _DBOS_MIGRATION_TABLE) + + _, err = tx.Exec(ctx, createTableQuery) if err != nil { - return fmt.Errorf("failed to create migration source: %v", err) + return fmt.Errorf("failed to create migrations table: %v", err) + } + + // Get current migration version + var currentVersion int64 = 0 + query := fmt.Sprintf("SELECT version FROM dbos.%s LIMIT 1", _DBOS_MIGRATION_TABLE) + err = tx.QueryRow(ctx, query).Scan(¤tVersion) + if err != nil && err != pgx.ErrNoRows { + return fmt.Errorf("failed to get current migration version: %v", err) } - // Create migrator - m, err := migrate.NewWithSourceInstance("iofs", d, databaseURL) + // Read and parse migration files + migrations, err := parseMigrationFiles() if err != nil { - return fmt.Errorf("failed to create migrator: %v", err) + return fmt.Errorf("failed to parse migration files: %v", err) } - defer m.Close() - // Run migrations - // FIXME: tolerate errors when the migration is bcz we run an older version of transact - if err := m.Up(); err != nil && err != migrate.ErrNoChange { - return fmt.Errorf("failed to run migrations: %v", err) + // Apply migrations starting from the next version + for _, migration := range migrations { + if migration.version <= currentVersion { + continue + } + + // Execute the migration SQL + _, err = tx.Exec(ctx, migration.sql) + if err != nil { + return fmt.Errorf("failed to execute migration %d: %v", migration.version, err) + } + + // Update the migration version + if currentVersion == 0 { + // Insert first migration record + insertQuery := fmt.Sprintf("INSERT INTO dbos.%s (version) VALUES ($1)", _DBOS_MIGRATION_TABLE) + _, err = tx.Exec(ctx, insertQuery, migration.version) + } else { + // Update existing migration record + updateQuery := fmt.Sprintf("UPDATE dbos.%s SET version = $1", _DBOS_MIGRATION_TABLE) + _, err = tx.Exec(ctx, updateQuery, migration.version) + } + if err != nil { + return fmt.Errorf("failed to update migration version to %d: %v", migration.version, err) + } + + currentVersion = migration.version + } + + // Commit the transaction + if err := tx.Commit(ctx); err != nil { + return fmt.Errorf("failed to commit migration transaction: %v", err) } return nil } +type migrationFile struct 
{ + version int64 + sql string +} + +func parseMigrationFiles() ([]migrationFile, error) { + var migrations []migrationFile + + entries, err := fs.ReadDir(migrationFiles, "migrations") + if err != nil { + return nil, fmt.Errorf("failed to read migration directory: %v", err) + } + + for _, entry := range entries { + if entry.IsDir() || !strings.HasSuffix(entry.Name(), ".sql") { + continue + } + + // Extract version from filename (e.g., "1_initial_dbos_schema.sql" -> 1) + parts := strings.SplitN(entry.Name(), "_", 2) + if len(parts) < 2 { + continue + } + + version, err := strconv.ParseInt(parts[0], 10, 64) + if err != nil { + continue // Skip files with invalid version format + } + + // Read migration SQL content + content, err := fs.ReadFile(migrationFiles, filepath.Join("migrations", entry.Name())) + if err != nil { + return nil, fmt.Errorf("failed to read migration file %s: %v", entry.Name(), err) + } + + migrations = append(migrations, migrationFile{ + version: version, + sql: string(content), + }) + } + + // Sort migrations by version + sort.Slice(migrations, func(i, j int) bool { + return migrations[i].version < migrations[j].version + }) + + return migrations, nil +} + // New creates a new SystemDatabase instance and runs migrations func newSystemDatabase(ctx context.Context, databaseURL string, logger *slog.Logger) (systemDatabase, error) { // Create the database if it doesn't exist diff --git a/go.mod b/go.mod index 871bf91c..c3d2057c 100644 --- a/go.mod +++ b/go.mod @@ -7,7 +7,6 @@ toolchain go1.25.0 require ( github.com/docker/docker v28.3.3+incompatible github.com/docker/go-connections v0.5.0 - github.com/golang-migrate/migrate/v4 v4.18.3 github.com/google/uuid v1.6.0 github.com/gorilla/websocket v1.5.3 github.com/jackc/pgerrcode v0.0.0-20220416144525-469b46aa5efa @@ -33,14 +32,14 @@ require ( github.com/go-logr/stdr v1.2.2 // indirect github.com/go-viper/mapstructure/v2 v2.2.1 // indirect github.com/gogo/protobuf v1.3.2 // indirect - github.com/hashicorp/errwrap v1.1.0 // indirect - github.com/hashicorp/go-multierror v1.1.1 // indirect github.com/inconshreveable/mousetrap v1.1.0 // indirect github.com/jackc/pgpassfile v1.0.0 // indirect github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761 // indirect github.com/jackc/puddle/v2 v2.2.2 // indirect github.com/moby/docker-image-spec v1.3.1 // indirect github.com/moby/sys/atomicwriter v0.1.0 // indirect + github.com/moby/term v0.5.2 // indirect + github.com/morikuni/aec v1.0.0 // indirect github.com/opencontainers/go-digest v1.0.0 // indirect github.com/opencontainers/image-spec v1.1.0 // indirect github.com/pelletier/go-toml/v2 v2.2.3 // indirect diff --git a/go.sum b/go.sum index 6e25a0b3..ef743551 100644 --- a/go.sum +++ b/go.sum @@ -1,5 +1,5 @@ -github.com/Azure/go-ansiterm v0.0.0-20230124172434-306776ec8161 h1:L/gRVlceqvL25UVaW/CKtUDjefjrs0SPonmDGUVOYP0= -github.com/Azure/go-ansiterm v0.0.0-20230124172434-306776ec8161/go.mod h1:xomTg63KZ2rFqZQzSB4Vz2SUXa1BpHTVz9L5PTmPC4E= +github.com/Azure/go-ansiterm v0.0.0-20250102033503-faa5f7b0171c h1:udKWzYgxTojEKWjV8V+WSxDXJ4NFATAsZjh8iIbsQIg= +github.com/Azure/go-ansiterm v0.0.0-20250102033503-faa5f7b0171c/go.mod h1:xomTg63KZ2rFqZQzSB4Vz2SUXa1BpHTVz9L5PTmPC4E= github.com/Microsoft/go-winio v0.6.2 h1:F2VQgta7ecxGYO8k3ZZz3RS8fVIXVxONVUPlNERoyfY= github.com/Microsoft/go-winio v0.6.2/go.mod h1:yd8OoFMLzJbo9gZq8j5qaps8bJ9aShtEA8Ipt1oGCvU= github.com/cenkalti/backoff/v5 v5.0.2 h1:rIfFVxEf1QsI7E1ZHfp/B4DF/6QBAUhmgkxc0H7Zss8= @@ -14,8 +14,6 @@ github.com/cpuguy83/go-md2man/v2 
v2.0.6/go.mod h1:oOW0eioCTA6cOiMLiUPZOpcVxMig6N github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= -github.com/dhui/dktest v0.4.5 h1:uUfYBIVREmj/Rw6MvgmqNAYzTiKOHJak+enB5Di73MM= -github.com/dhui/dktest v0.4.5/go.mod h1:tmcyeHDKagvlDrz7gDKq4UAJOLIfVZYkfD5OnHDwcCo= github.com/distribution/reference v0.6.0 h1:0IXCQ5g4/QMHHkarYzh5l+u8T3t73zM5QvfrDyIgxBk= github.com/distribution/reference v0.6.0/go.mod h1:BbU0aIcezP1/5jX/8MP0YiH4SdvB5Y4f/wlDRiLyi3E= github.com/docker/docker v28.3.3+incompatible h1:Dypm25kh4rmk49v1eiVbsAtpAsYURjYkaKubwuBdxEI= @@ -39,8 +37,6 @@ github.com/go-viper/mapstructure/v2 v2.2.1 h1:ZAaOCxANMuZx5RCeg0mBdEZk7DZasvvZIx github.com/go-viper/mapstructure/v2 v2.2.1/go.mod h1:oJDH3BJKyqBA2TXFhDsKDGDTlndYOZ6rGS0BRZIxGhM= github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q= github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69NZV8Q= -github.com/golang-migrate/migrate/v4 v4.18.3 h1:EYGkoOsvgHHfm5U/naS1RP/6PL/Xv3S4B/swMiAmDLs= -github.com/golang-migrate/migrate/v4 v4.18.3/go.mod h1:99BKpIi6ruaaXRM1A77eqZ+FWPQ3cfRa+ZVy5bmWMaY= github.com/google/go-cmp v0.7.0 h1:wk8382ETsv4JYUZwIsn6YpYiWiBsYLSJiTsyBybVuN8= github.com/google/go-cmp v0.7.0/go.mod h1:pXiqmnSA92OHEEa9HXL2W4E7lf9JzCmGVUdgjX3N/iU= github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= @@ -49,11 +45,6 @@ github.com/gorilla/websocket v1.5.3 h1:saDtZ6Pbx/0u+bgYQ3q96pZgCzfhKXGPqt7kZ72aN github.com/gorilla/websocket v1.5.3/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= github.com/grpc-ecosystem/grpc-gateway/v2 v2.27.1 h1:X5VWvz21y3gzm9Nw/kaUeku/1+uBhcekkmy4IkffJww= github.com/grpc-ecosystem/grpc-gateway/v2 v2.27.1/go.mod h1:Zanoh4+gvIgluNqcfMVTJueD4wSS5hT7zTt4Mrutd90= -github.com/hashicorp/errwrap v1.0.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4= -github.com/hashicorp/errwrap v1.1.0 h1:OxrOeh75EUXMY8TBjag2fzXGZ40LB6IKw45YeGUDY2I= -github.com/hashicorp/errwrap v1.1.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4= -github.com/hashicorp/go-multierror v1.1.1 h1:H5DkEtf6CXdFp0N0Em5UCwQpXMWke8IA0+lD48awMYo= -github.com/hashicorp/go-multierror v1.1.1/go.mod h1:iw975J/qwKPdAO1clOe2L8331t/9/fmwbPZ6JB6eMoM= github.com/inconshreveable/mousetrap v1.1.0 h1:wN+x4NVGpMsO7ErUn/mUI3vEoE6Jt13X2s0bqwp9tc8= github.com/inconshreveable/mousetrap v1.1.0/go.mod h1:vpF70FUmC8bwa3OWnCshd2FqLfsEA9PFc4w1p2J65bw= github.com/jackc/pgerrcode v0.0.0-20220416144525-469b46aa5efa h1:s+4MhCQ6YrzisK6hFJUX53drDT4UsSW3DEhKn0ifuHw= @@ -72,16 +63,14 @@ github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE= github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk= github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= -github.com/lib/pq v1.10.9 h1:YXG7RB+JIjhP29X+OtkiDnYaXQwpS4JEWq7dtCCRUEw= -github.com/lib/pq v1.10.9/go.mod h1:AlVN5x4E4T544tWzH6hKfbfQvm3HdbOxrmggDNAPY9o= github.com/moby/docker-image-spec v1.3.1 h1:jMKff3w6PgbfSa69GfNg+zN/XLhfXJGnEx3Nl2EsFP0= github.com/moby/docker-image-spec v1.3.1/go.mod h1:eKmb5VW8vQEh/BAr2yvVNvuiJuY6UIocYsFu/DxxRpo= github.com/moby/sys/atomicwriter v0.1.0 h1:kw5D/EqkBwsBFi0ss9v1VG3wIkVhzGvLklJ+w3A14Sw= github.com/moby/sys/atomicwriter v0.1.0/go.mod 
h1:Ul8oqv2ZMNHOceF643P6FKPXeCmYtlQMvpizfsSoaWs= github.com/moby/sys/sequential v0.6.0 h1:qrx7XFUd/5DxtqcoH1h438hF5TmOvzC/lspjy7zgvCU= github.com/moby/sys/sequential v0.6.0/go.mod h1:uyv8EUTrca5PnDsdMGXhZe6CCe8U/UiTWd+lL+7b/Ko= -github.com/moby/term v0.5.0 h1:xt8Q1nalod/v7BqbG21f8mQPqH+xAaC9C3N3wfWbVP0= -github.com/moby/term v0.5.0/go.mod h1:8FzsFHVUBGZdbDsJw/ot+X+d5HLUbvklYLJ9uGfcI3Y= +github.com/moby/term v0.5.2 h1:6qk3FJAFDs6i/q3W/pQ97SX192qKfZgGjCQqfCJkgzQ= +github.com/moby/term v0.5.2/go.mod h1:d3djjFCrjnB+fl8NJux+EJzu0msscUP+f8it8hPkFLc= github.com/morikuni/aec v1.0.0 h1:nP9CBfwrvYnBRgY6qfDQkygYDmYwOilePFkwzv4dU8A= github.com/morikuni/aec v1.0.0/go.mod h1:BbKIizmSmc5MMPqRYbxO4ZU0S0+P200+tUnFx7PXmsc= github.com/opencontainers/go-digest v1.0.0 h1:apOUWs51W5PlhuyGyz9FCeeBIOUDA/6nW8Oi/yOhh5U= From 3a6aee06c5872561f8c9d845a657733815422831 Mon Sep 17 00:00:00 2001 From: maxdml Date: Thu, 4 Sep 2025 09:32:05 -0700 Subject: [PATCH 2/8] migration match python --- dbos/migrations/1_initial_dbos_schema.sql | 98 +++++++++-------------- 1 file changed, 39 insertions(+), 59 deletions(-) diff --git a/dbos/migrations/1_initial_dbos_schema.sql b/dbos/migrations/1_initial_dbos_schema.sql index 8d6199ae..3e54e87b 100644 --- a/dbos/migrations/1_initial_dbos_schema.sql +++ b/dbos/migrations/1_initial_dbos_schema.sql @@ -1,19 +1,14 @@ --- 001_initial_dbos_schema.up.sql - --- Create the dbos schema -CREATE SCHEMA IF NOT EXISTS dbos; - -- Enable uuid extension for generating UUIDs CREATE EXTENSION IF NOT EXISTS "uuid-ossp"; --- Create workflow_status table -CREATE TABLE IF NOT EXISTS dbos.workflow_status ( +CREATE TABLE dbos.workflow_status ( workflow_uuid TEXT PRIMARY KEY, status TEXT, name TEXT, authenticated_user TEXT, assumed_role TEXT, authenticated_roles TEXT, + request TEXT, output TEXT, error TEXT, executor_id TEXT, @@ -27,34 +22,21 @@ CREATE TABLE IF NOT EXISTS dbos.workflow_status ( queue_name TEXT, workflow_timeout_ms BIGINT, workflow_deadline_epoch_ms BIGINT, + inputs TEXT, started_at_epoch_ms BIGINT, deduplication_id TEXT, - inputs TEXT, priority INTEGER NOT NULL DEFAULT 0 ); --- Create indexes for workflow_status -CREATE INDEX IF NOT EXISTS workflow_status_created_at_index ON dbos.workflow_status (created_at); -CREATE INDEX IF NOT EXISTS workflow_status_executor_id_index ON dbos.workflow_status (executor_id); -CREATE INDEX IF NOT EXISTS workflow_status_status_index ON dbos.workflow_status (status); +CREATE INDEX workflow_status_created_at_index ON dbos.workflow_status (created_at); +CREATE INDEX workflow_status_executor_id_index ON dbos.workflow_status (executor_id); +CREATE INDEX workflow_status_status_index ON dbos.workflow_status (status); --- Create unique constraint for queue_name and deduplication_id -DO $$ -BEGIN - IF NOT EXISTS ( - SELECT 1 FROM information_schema.table_constraints - WHERE constraint_name = 'uq_workflow_status_queue_name_dedup_id' - AND table_name = 'workflow_status' - AND table_schema = 'dbos' - ) THEN - ALTER TABLE dbos.workflow_status - ADD CONSTRAINT uq_workflow_status_queue_name_dedup_id - UNIQUE (queue_name, deduplication_id); - END IF; -END $$; +ALTER TABLE dbos.workflow_status +ADD CONSTRAINT uq_workflow_status_queue_name_dedup_id +UNIQUE (queue_name, deduplication_id); --- Create operation_outputs table -CREATE TABLE IF NOT EXISTS dbos.operation_outputs ( +CREATE TABLE dbos.operation_outputs ( workflow_uuid TEXT NOT NULL, function_id INTEGER NOT NULL, function_name TEXT NOT NULL DEFAULT '', @@ -66,7 +48,7 @@ CREATE TABLE IF NOT EXISTS dbos.operation_outputs ( 
ON UPDATE CASCADE ON DELETE CASCADE ); -CREATE TABLE IF NOT EXISTS dbos.notifications ( +CREATE TABLE dbos.notifications ( destination_uuid TEXT NOT NULL, topic TEXT, message TEXT NOT NULL, @@ -75,8 +57,7 @@ CREATE TABLE IF NOT EXISTS dbos.notifications ( FOREIGN KEY (destination_uuid) REFERENCES dbos.workflow_status(workflow_uuid) ON UPDATE CASCADE ON DELETE CASCADE ); --- Create index for notifications -CREATE INDEX IF NOT EXISTS idx_workflow_topic ON dbos.notifications (destination_uuid, topic); +CREATE INDEX idx_workflow_topic ON dbos.notifications (destination_uuid, topic); -- Create notification function CREATE OR REPLACE FUNCTION dbos.notifications_function() RETURNS TRIGGER AS $$ @@ -89,22 +70,11 @@ END; $$ LANGUAGE plpgsql; -- Create notification trigger -DO $$ -BEGIN - IF NOT EXISTS ( - SELECT 1 FROM information_schema.triggers - WHERE trigger_name = 'dbos_notifications_trigger' - AND event_object_table = 'notifications' - AND event_object_schema = 'dbos' - ) THEN - CREATE TRIGGER dbos_notifications_trigger - AFTER INSERT ON dbos.notifications - FOR EACH ROW EXECUTE FUNCTION dbos.notifications_function(); - END IF; -END $$; +CREATE TRIGGER dbos_notifications_trigger +AFTER INSERT ON dbos.notifications +FOR EACH ROW EXECUTE FUNCTION dbos.notifications_function(); --- Create workflow_events table -CREATE TABLE IF NOT EXISTS dbos.workflow_events ( +CREATE TABLE dbos.workflow_events ( workflow_uuid TEXT NOT NULL, key TEXT NOT NULL, value TEXT NOT NULL, @@ -124,16 +94,26 @@ END; $$ LANGUAGE plpgsql; -- Create events trigger -DO $$ -BEGIN - IF NOT EXISTS ( - SELECT 1 FROM information_schema.triggers - WHERE trigger_name = 'dbos_workflow_events_trigger' - AND event_object_table = 'workflow_events' - AND event_object_schema = 'dbos' - ) THEN - CREATE TRIGGER dbos_workflow_events_trigger - AFTER INSERT ON dbos.workflow_events - FOR EACH ROW EXECUTE FUNCTION dbos.workflow_events_function(); - END IF; -END $$; +CREATE TRIGGER dbos_workflow_events_trigger +AFTER INSERT ON dbos.workflow_events +FOR EACH ROW EXECUTE FUNCTION dbos.workflow_events_function(); + +CREATE TABLE dbos.streams ( + workflow_uuid TEXT NOT NULL, + key TEXT NOT NULL, + value TEXT NOT NULL, + "offset" INTEGER NOT NULL, + PRIMARY KEY (workflow_uuid, key, "offset"), + FOREIGN KEY (workflow_uuid) REFERENCES dbos.workflow_status(workflow_uuid) + ON UPDATE CASCADE ON DELETE CASCADE +); + +CREATE TABLE dbos.event_dispatch_kv ( + service_name TEXT NOT NULL, + workflow_fn_name TEXT NOT NULL, + key TEXT NOT NULL, + value TEXT, + update_seq NUMERIC(38,0), + update_time NUMERIC(38,15), + PRIMARY KEY (service_name, workflow_fn_name, key) +); From 877acd0c1517712764ce3d42d58161f1e93b2af0 Mon Sep 17 00:00:00 2001 From: maxdml Date: Thu, 4 Sep 2025 09:32:27 -0700 Subject: [PATCH 3/8] store migration in strings, remove sorting logic --- dbos/system_database.go | 64 ++++++----------------------------------- 1 file changed, 9 insertions(+), 55 deletions(-) diff --git a/dbos/system_database.go b/dbos/system_database.go index f2c4dbde..46d7f404 100644 --- a/dbos/system_database.go +++ b/dbos/system_database.go @@ -2,14 +2,10 @@ package dbos import ( "context" - "embed" + _ "embed" "errors" "fmt" - "io/fs" "log/slog" - "path/filepath" - "sort" - "strconv" "strings" "sync" "time" @@ -119,8 +115,13 @@ func createDatabaseIfNotExists(ctx context.Context, databaseURL string, logger * return nil } -//go:embed migrations/*.sql -var migrationFiles embed.FS +//go:embed migrations/1_initial_dbos_schema.sql +var migration1 string + +// migrations 
contains all migration files with their version numbers +var migrations = []migrationFile{ + {version: 1, sql: migration1}, +} const ( _DBOS_MIGRATION_TABLE = "dbos_migrations" @@ -178,11 +179,7 @@ func runMigrations(databaseURL string) error { return fmt.Errorf("failed to get current migration version: %v", err) } - // Read and parse migration files - migrations, err := parseMigrationFiles() - if err != nil { - return fmt.Errorf("failed to parse migration files: %v", err) - } + // Use the embedded migrations slice // Apply migrations starting from the next version for _, migration := range migrations { @@ -226,49 +223,6 @@ type migrationFile struct { sql string } -func parseMigrationFiles() ([]migrationFile, error) { - var migrations []migrationFile - - entries, err := fs.ReadDir(migrationFiles, "migrations") - if err != nil { - return nil, fmt.Errorf("failed to read migration directory: %v", err) - } - - for _, entry := range entries { - if entry.IsDir() || !strings.HasSuffix(entry.Name(), ".sql") { - continue - } - - // Extract version from filename (e.g., "1_initial_dbos_schema.sql" -> 1) - parts := strings.SplitN(entry.Name(), "_", 2) - if len(parts) < 2 { - continue - } - - version, err := strconv.ParseInt(parts[0], 10, 64) - if err != nil { - continue // Skip files with invalid version format - } - - // Read migration SQL content - content, err := fs.ReadFile(migrationFiles, filepath.Join("migrations", entry.Name())) - if err != nil { - return nil, fmt.Errorf("failed to read migration file %s: %v", entry.Name(), err) - } - - migrations = append(migrations, migrationFile{ - version: version, - sql: string(content), - }) - } - - // Sort migrations by version - sort.Slice(migrations, func(i, j int) bool { - return migrations[i].version < migrations[j].version - }) - - return migrations, nil -} // New creates a new SystemDatabase instance and runs migrations func newSystemDatabase(ctx context.Context, databaseURL string, logger *slog.Logger) (systemDatabase, error) { From 8e7951a461dd3035b3c6c9726b438b79ea89b74b Mon Sep 17 00:00:00 2001 From: maxdml Date: Thu, 4 Sep 2025 09:35:59 -0700 Subject: [PATCH 4/8] nit --- dbos/system_database.go | 11 +++++------ 1 file changed, 5 insertions(+), 6 deletions(-) diff --git a/dbos/system_database.go b/dbos/system_database.go index 46d7f404..c79f214d 100644 --- a/dbos/system_database.go +++ b/dbos/system_database.go @@ -118,6 +118,11 @@ func createDatabaseIfNotExists(ctx context.Context, databaseURL string, logger * //go:embed migrations/1_initial_dbos_schema.sql var migration1 string +type migrationFile struct { + version int64 + sql string +} + // migrations contains all migration files with their version numbers var migrations = []migrationFile{ {version: 1, sql: migration1}, @@ -218,12 +223,6 @@ func runMigrations(databaseURL string) error { return nil } -type migrationFile struct { - version int64 - sql string -} - - // New creates a new SystemDatabase instance and runs migrations func newSystemDatabase(ctx context.Context, databaseURL string, logger *slog.Logger) (systemDatabase, error) { // Create the database if it doesn't exist From 728dd82b8058ac3bc250617d19bece67e60a72df Mon Sep 17 00:00:00 2001 From: maxdml Date: Thu, 4 Sep 2025 09:59:03 -0700 Subject: [PATCH 5/8] debug cli tests --- cmd/dbos/cli_integration_test.go | 13 +++++++++++++ 1 file changed, 13 insertions(+) diff --git a/cmd/dbos/cli_integration_test.go b/cmd/dbos/cli_integration_test.go index 173171ce..b66c788e 100644 --- a/cmd/dbos/cli_integration_test.go +++ 
b/cmd/dbos/cli_integration_test.go @@ -3,6 +3,7 @@ package main import ( "bytes" "context" + "database/sql" "encoding/json" "fmt" "io" @@ -16,6 +17,7 @@ import ( "time" "github.com/dbos-inc/dbos-transact-golang/dbos" + _ "github.com/jackc/pgx/v5/stdlib" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) @@ -68,6 +70,17 @@ func TestCLIWorkflow(t *testing.T) { require.NoError(t, err, "Reset database command failed: %s", string(output)) assert.Contains(t, string(output), "System database has been reset successfully", "Output should confirm database reset") + + // log in the database and ensure the dbos schema does not exist anymore + db, err := sql.Open("pgx", getDatabaseURL()) + require.NoError(t, err) + defer db.Close() + + var exists bool + err = db.QueryRow("SELECT EXISTS(SELECT 1 FROM information_schema.schemata WHERE schema_name = 'dbos')").Scan(&exists) + require.NoError(t, err) + + assert.False(t, exists, "DBOS schema should not exist") }) t.Run("ProjectInitialization", func(t *testing.T) { From d4f9251c3257f7cb2c815db65b347dc0e6ca32f2 Mon Sep 17 00:00:00 2001 From: maxdml Date: Thu, 4 Sep 2025 10:13:13 -0700 Subject: [PATCH 6/8] more gha debug --- cmd/dbos/cli_integration_test.go | 1 + 1 file changed, 1 insertion(+) diff --git a/cmd/dbos/cli_integration_test.go b/cmd/dbos/cli_integration_test.go index b66c788e..427b63c2 100644 --- a/cmd/dbos/cli_integration_test.go +++ b/cmd/dbos/cli_integration_test.go @@ -70,6 +70,7 @@ func TestCLIWorkflow(t *testing.T) { require.NoError(t, err, "Reset database command failed: %s", string(output)) assert.Contains(t, string(output), "System database has been reset successfully", "Output should confirm database reset") + assert.Contains(t, string(output), "database\":\"dbos", "Output should confirm database reset") // log in the database and ensure the dbos schema does not exist anymore db, err := sql.Open("pgx", getDatabaseURL()) From d46ebd11d0ccfd575c89311c68f7642d5fe60f6c Mon Sep 17 00:00:00 2001 From: maxdml Date: Thu, 4 Sep 2025 10:34:05 -0700 Subject: [PATCH 7/8] chicken, eggs, and migrations --- cmd/dbos/cli_integration_test.go | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/cmd/dbos/cli_integration_test.go b/cmd/dbos/cli_integration_test.go index 427b63c2..954d3dc7 100644 --- a/cmd/dbos/cli_integration_test.go +++ b/cmd/dbos/cli_integration_test.go @@ -159,6 +159,11 @@ func testProjectInitialization(t *testing.T, cliPath string) { modCmd := exec.Command("go", "mod", "tidy") modOutput, err := modCmd.CombinedOutput() require.NoError(t, err, "go mod tidy failed: %s", string(modOutput)) + + // TEMPORARY: go get github.com/dbos-inc/dbos-transact-golang/cmd/dbos@dbos-migration + tmpCmd := exec.Command("go", "get", "github.com/dbos-inc/dbos-transact-golang/cmd/dbos@dbos-migration") + tmpOutput, err := tmpCmd.CombinedOutput() + require.NoError(t, err, "Failed to get dbos-migration: %s", string(tmpOutput)) } // testApplicationLifecycle starts the application and triggers workflows From 6182b4f7a679b99874071a056f41d6d1b6a98a36 Mon Sep 17 00:00:00 2001 From: maxdml Date: Thu, 4 Sep 2025 10:51:27 -0700 Subject: [PATCH 8/8] make the counter atomic in TestScheduledWorkflows to handle occasional slowness in stopping the workflow scheduler --- dbos/workflows_test.go | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/dbos/workflows_test.go b/dbos/workflows_test.go index 9c8f80d0..9ccecc0f 100644 --- a/dbos/workflows_test.go +++ b/dbos/workflows_test.go @@ -7,6 +7,7 @@ import ( "reflect" "runtime" 
"sync" + "sync/atomic" "testing" "time" @@ -1269,7 +1270,7 @@ func TestWorkflowDeadLetterQueue(t *testing.T) { } var ( - counter = 0 + counter atomic.Int64 counter1Ch = make(chan time.Time, 100) ) @@ -1278,8 +1279,7 @@ func TestScheduledWorkflows(t *testing.T) { RegisterWorkflow(dbosCtx, func(ctx DBOSContext, scheduledTime time.Time) (string, error) { startTime := time.Now() - counter++ - if counter == 10 { + if counter.Add(1) == 10 { return "", fmt.Errorf("counter reached 10, stopping workflow") } select { @@ -1335,8 +1335,8 @@ func TestScheduledWorkflows(t *testing.T) { // Stop the workflowScheduler and check if it stops executing dbosCtx.(*dbosContext).getWorkflowScheduler().Stop() time.Sleep(3 * time.Second) // Wait a bit to ensure no more executions - currentCounter := counter // If more scheduled executions happen, this can also trigger a data race. If the scheduler is correct, there should be no race. - require.Less(t, counter, currentCounter+2, "Scheduled workflow continued executing after stopping scheduler") + currentCounter := counter.Load() + require.Less(t, counter.Load(), currentCounter+2, "Scheduled workflow continued executing after stopping scheduler") }) }