Skip to content
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
102 changes: 102 additions & 0 deletions dbos/dbos_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package dbos

import (
"context"
"testing"
"time"

Expand Down Expand Up @@ -166,4 +167,105 @@ func TestConfig(t *testing.T) {
assert.Equal(t, "env-only-executor", ctx.GetExecutorID())
})
})

t.Run("SystemDBMigration", func(t *testing.T) {
t.Setenv("DBOS__APPVERSION", "v1.0.0")
t.Setenv("DBOS__APPID", "test-migration")
t.Setenv("DBOS__VMID", "test-executor-id")

ctx, err := NewDBOSContext(Config{
DatabaseURL: databaseURL,
AppName: "test-migration",
})
require.NoError(t, err)
defer func() {
if ctx != nil {
ctx.Shutdown(1 * time.Minute)
}
}()

require.NotNil(t, ctx)

// Get the internal systemDB instance to check tables directly
dbosCtx, ok := ctx.(*dbosContext)
require.True(t, ok, "expected dbosContext")
require.NotNil(t, dbosCtx.systemDB)

sysDB, ok := dbosCtx.systemDB.(*sysDB)
require.True(t, ok, "expected sysDB")

// Verify all expected tables exist and have correct structure
dbCtx := context.Background()

// Test workflow_status table
var exists bool
err = sysDB.pool.QueryRow(dbCtx, "SELECT EXISTS (SELECT FROM information_schema.tables WHERE table_schema = 'dbos' AND table_name = 'workflow_status')").Scan(&exists)
require.NoError(t, err)
assert.True(t, exists, "workflow_status table should exist")

// Test operation_outputs table
err = sysDB.pool.QueryRow(dbCtx, "SELECT EXISTS (SELECT FROM information_schema.tables WHERE table_schema = 'dbos' AND table_name = 'operation_outputs')").Scan(&exists)
require.NoError(t, err)
assert.True(t, exists, "operation_outputs table should exist")

// Test workflow_events table
err = sysDB.pool.QueryRow(dbCtx, "SELECT EXISTS (SELECT FROM information_schema.tables WHERE table_schema = 'dbos' AND table_name = 'workflow_events')").Scan(&exists)
require.NoError(t, err)
assert.True(t, exists, "workflow_events table should exist")

// Test notifications table
err = sysDB.pool.QueryRow(dbCtx, "SELECT EXISTS (SELECT FROM information_schema.tables WHERE table_schema = 'dbos' AND table_name = 'notifications')").Scan(&exists)
require.NoError(t, err)
assert.True(t, exists, "notifications table should exist")

// Test that all tables can be queried (empty results expected)
rows, err := sysDB.pool.Query(dbCtx, "SELECT workflow_uuid FROM dbos.workflow_status LIMIT 1")
require.NoError(t, err)
rows.Close()

rows, err = sysDB.pool.Query(dbCtx, "SELECT workflow_uuid FROM dbos.operation_outputs LIMIT 1")
require.NoError(t, err)
rows.Close()

rows, err = sysDB.pool.Query(dbCtx, "SELECT workflow_uuid FROM dbos.workflow_events LIMIT 1")
require.NoError(t, err)
rows.Close()

rows, err = sysDB.pool.Query(dbCtx, "SELECT destination_uuid FROM dbos.notifications LIMIT 1")
require.NoError(t, err)
rows.Close()

// Check that the dbos_migrations table exists and has one row with the correct version
err = sysDB.pool.QueryRow(dbCtx, "SELECT EXISTS (SELECT FROM information_schema.tables WHERE table_schema = 'dbos' AND table_name = 'dbos_migrations')").Scan(&exists)
require.NoError(t, err)
assert.True(t, exists, "dbos_migrations table should exist")

// Verify migration version is 1 (after initial migration)
var version int64
var count int
err = sysDB.pool.QueryRow(dbCtx, "SELECT COUNT(*) FROM dbos.dbos_migrations").Scan(&count)
require.NoError(t, err)
assert.Equal(t, 1, count, "dbos_migrations table should have exactly one row")

err = sysDB.pool.QueryRow(dbCtx, "SELECT version FROM dbos.dbos_migrations").Scan(&version)
require.NoError(t, err)
assert.Equal(t, int64(1), version, "migration version should be 1 (after initial migration)")

// Test manual shutdown and recreate
ctx.Shutdown(1 * time.Minute)

// Recreate context - should have no error since DB is already migrated
ctx2, err := NewDBOSContext(Config{
DatabaseURL: databaseURL,
AppName: "test-migration-recreate",
})
require.NoError(t, err)
defer func() {
if ctx2 != nil {
ctx2.Shutdown(1 * time.Minute)
}
}()

require.NotNil(t, ctx2)
})
}
18 changes: 0 additions & 18 deletions dbos/migrations/000001_initial_dbos_schema.down.sql

This file was deleted.

Original file line number Diff line number Diff line change
@@ -1,19 +1,14 @@
-- 001_initial_dbos_schema.up.sql

-- Create the dbos schema
CREATE SCHEMA IF NOT EXISTS dbos;

-- Enable uuid extension for generating UUIDs
CREATE EXTENSION IF NOT EXISTS "uuid-ossp";

-- Create workflow_status table
CREATE TABLE IF NOT EXISTS dbos.workflow_status (
CREATE TABLE dbos.workflow_status (
workflow_uuid TEXT PRIMARY KEY,
status TEXT,
name TEXT,
authenticated_user TEXT,
assumed_role TEXT,
authenticated_roles TEXT,
request TEXT,
output TEXT,
error TEXT,
executor_id TEXT,
Expand All @@ -27,34 +22,21 @@ CREATE TABLE IF NOT EXISTS dbos.workflow_status (
queue_name TEXT,
workflow_timeout_ms BIGINT,
workflow_deadline_epoch_ms BIGINT,
inputs TEXT,
started_at_epoch_ms BIGINT,
deduplication_id TEXT,
inputs TEXT,
priority INTEGER NOT NULL DEFAULT 0
);

-- Create indexes for workflow_status
CREATE INDEX IF NOT EXISTS workflow_status_created_at_index ON dbos.workflow_status (created_at);
CREATE INDEX IF NOT EXISTS workflow_status_executor_id_index ON dbos.workflow_status (executor_id);
CREATE INDEX IF NOT EXISTS workflow_status_status_index ON dbos.workflow_status (status);
CREATE INDEX workflow_status_created_at_index ON dbos.workflow_status (created_at);
CREATE INDEX workflow_status_executor_id_index ON dbos.workflow_status (executor_id);
CREATE INDEX workflow_status_status_index ON dbos.workflow_status (status);

-- Create unique constraint for queue_name and deduplication_id
DO $$
BEGIN
IF NOT EXISTS (
SELECT 1 FROM information_schema.table_constraints
WHERE constraint_name = 'uq_workflow_status_queue_name_dedup_id'
AND table_name = 'workflow_status'
AND table_schema = 'dbos'
) THEN
ALTER TABLE dbos.workflow_status
ADD CONSTRAINT uq_workflow_status_queue_name_dedup_id
UNIQUE (queue_name, deduplication_id);
END IF;
END $$;
ALTER TABLE dbos.workflow_status
ADD CONSTRAINT uq_workflow_status_queue_name_dedup_id
UNIQUE (queue_name, deduplication_id);

-- Create operation_outputs table
CREATE TABLE IF NOT EXISTS dbos.operation_outputs (
CREATE TABLE dbos.operation_outputs (
workflow_uuid TEXT NOT NULL,
function_id INTEGER NOT NULL,
function_name TEXT NOT NULL DEFAULT '',
Expand All @@ -66,7 +48,7 @@ CREATE TABLE IF NOT EXISTS dbos.operation_outputs (
ON UPDATE CASCADE ON DELETE CASCADE
);

CREATE TABLE IF NOT EXISTS dbos.notifications (
CREATE TABLE dbos.notifications (
destination_uuid TEXT NOT NULL,
topic TEXT,
message TEXT NOT NULL,
Expand All @@ -75,8 +57,7 @@ CREATE TABLE IF NOT EXISTS dbos.notifications (
FOREIGN KEY (destination_uuid) REFERENCES dbos.workflow_status(workflow_uuid)
ON UPDATE CASCADE ON DELETE CASCADE
);
-- Create index for notifications
CREATE INDEX IF NOT EXISTS idx_workflow_topic ON dbos.notifications (destination_uuid, topic);
CREATE INDEX idx_workflow_topic ON dbos.notifications (destination_uuid, topic);

-- Create notification function
CREATE OR REPLACE FUNCTION dbos.notifications_function() RETURNS TRIGGER AS $$
Expand All @@ -89,22 +70,11 @@ END;
$$ LANGUAGE plpgsql;

-- Create notification trigger
DO $$
BEGIN
IF NOT EXISTS (
SELECT 1 FROM information_schema.triggers
WHERE trigger_name = 'dbos_notifications_trigger'
AND event_object_table = 'notifications'
AND event_object_schema = 'dbos'
) THEN
CREATE TRIGGER dbos_notifications_trigger
AFTER INSERT ON dbos.notifications
FOR EACH ROW EXECUTE FUNCTION dbos.notifications_function();
END IF;
END $$;
CREATE TRIGGER dbos_notifications_trigger
AFTER INSERT ON dbos.notifications
FOR EACH ROW EXECUTE FUNCTION dbos.notifications_function();

-- Create workflow_events table
CREATE TABLE IF NOT EXISTS dbos.workflow_events (
CREATE TABLE dbos.workflow_events (
workflow_uuid TEXT NOT NULL,
key TEXT NOT NULL,
value TEXT NOT NULL,
Expand All @@ -124,16 +94,26 @@ END;
$$ LANGUAGE plpgsql;

-- Create events trigger
DO $$
BEGIN
IF NOT EXISTS (
SELECT 1 FROM information_schema.triggers
WHERE trigger_name = 'dbos_workflow_events_trigger'
AND event_object_table = 'workflow_events'
AND event_object_schema = 'dbos'
) THEN
CREATE TRIGGER dbos_workflow_events_trigger
AFTER INSERT ON dbos.workflow_events
FOR EACH ROW EXECUTE FUNCTION dbos.workflow_events_function();
END IF;
END $$;
CREATE TRIGGER dbos_workflow_events_trigger
AFTER INSERT ON dbos.workflow_events
FOR EACH ROW EXECUTE FUNCTION dbos.workflow_events_function();

CREATE TABLE dbos.streams (
workflow_uuid TEXT NOT NULL,
key TEXT NOT NULL,
value TEXT NOT NULL,
"offset" INTEGER NOT NULL,
PRIMARY KEY (workflow_uuid, key, "offset"),
FOREIGN KEY (workflow_uuid) REFERENCES dbos.workflow_status(workflow_uuid)
ON UPDATE CASCADE ON DELETE CASCADE
);

CREATE TABLE dbos.event_dispatch_kv (
service_name TEXT NOT NULL,
workflow_fn_name TEXT NOT NULL,
key TEXT NOT NULL,
value TEXT,
update_seq NUMERIC(38,0),
update_time NUMERIC(38,15),
PRIMARY KEY (service_name, workflow_fn_name, key)
);
Loading
Loading