Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions dbos/dbos_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -251,7 +251,7 @@ func TestConfig(t *testing.T) {
require.NoError(t, err)
assert.True(t, exists, "dbos_migrations table should exist")

// Verify migration version is 1 (after initial migration)
// Verify migration version is 3 (after initial migration, queue partition key migration, and workflow status index migration)
var version int64
var count int
err = sysDB.pool.QueryRow(dbCtx, "SELECT COUNT(*) FROM dbos.dbos_migrations").Scan(&count)
Expand All @@ -260,7 +260,7 @@ func TestConfig(t *testing.T) {

err = sysDB.pool.QueryRow(dbCtx, "SELECT version FROM dbos.dbos_migrations").Scan(&version)
require.NoError(t, err)
assert.Equal(t, int64(2), version, "migration version should be 1 (after initial migration)")
assert.Equal(t, int64(3), version, "migration version should be 3 (after initial migration, queue partition key migration, and workflow status index migration)")

// Test manual shutdown and recreate
Shutdown(ctx, 1*time.Minute)
Expand Down Expand Up @@ -459,7 +459,7 @@ func TestCustomSystemDBSchema(t *testing.T) {
require.NoError(t, err)
assert.True(t, exists, "dbos_migrations table should exist in custom schema")

// Verify migration version is 1 (after initial migration)
// Verify migration version is 3 (after initial migration, queue partition key migration, and workflow status index migration)
var version int64
var count int
err = sysDB.pool.QueryRow(dbCtx, fmt.Sprintf("SELECT COUNT(*) FROM %s.dbos_migrations", customSchema)).Scan(&count)
Expand All @@ -468,7 +468,7 @@ func TestCustomSystemDBSchema(t *testing.T) {

err = sysDB.pool.QueryRow(dbCtx, fmt.Sprintf("SELECT version FROM %s.dbos_migrations", customSchema)).Scan(&version)
require.NoError(t, err)
assert.Equal(t, int64(2), version, "migration version should be 2 (after initial migration and queue partition key migration)")
assert.Equal(t, int64(3), version, "migration version should be 3 (after initial migration, queue partition key migration, and workflow status index migration)")
})

// Test workflows for exercising Send/Recv and SetEvent/GetEvent
Expand Down
5 changes: 5 additions & 0 deletions dbos/migrations/3_add_workflow_status_index.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
-- Migration 3: Add index on workflow_status for queue, status, and started_at_epoch_ms
-- This index improves query performance for queue operations that filter by queue_name, status, and started_at_epoch_ms

CREATE INDEX "idx_workflow_status_queue_status_started" ON %s."workflow_status" ("queue_name", "status", "started_at_epoch_ms");

6 changes: 6 additions & 0 deletions dbos/system_database.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,9 @@ var migration1SQL string
//go:embed migrations/2_add_queue_partition_key.sql
var migration2SQL string

//go:embed migrations/3_add_workflow_status_index.sql
var migration3SQL string

type migrationFile struct {
version int64
sql string
Expand Down Expand Up @@ -165,10 +168,13 @@ func runMigrations(pool *pgxpool.Pool, schema string) error {

migration2SQLProcessed := fmt.Sprintf(migration2SQL, sanitizedSchema)

migration3SQLProcessed := fmt.Sprintf(migration3SQL, sanitizedSchema)

// Build migrations list with processed SQL
migrations := []migrationFile{
{version: 1, sql: migration1SQLProcessed},
{version: 2, sql: migration2SQLProcessed},
{version: 3, sql: migration3SQLProcessed},
}

// Begin transaction for atomic migration execution
Expand Down
Loading