Skip to content
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,15 @@
import dev.dbos.transact.config.DBOSConfig;
import dev.dbos.transact.database.SystemDatabase;

import java.io.BufferedReader;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;

Expand Down Expand Up @@ -233,232 +238,28 @@ static void runDbosMigrations(Connection conn, String schema, List<String> migra

/**
 * Loads the ordered list of DBOS system-database migration scripts from classpath
 * resources and substitutes the target schema name into each one.
 *
 * <p>Scripts are read from {@code /db/migrations/migration<N>.sql}, starting at 1 and
 * stopping at the first missing resource, so migration files must be numbered
 * consecutively with no gaps. Each script may reference the schema through the
 * {@code %1$s} format placeholder, which is resolved via {@link String#formatted}.
 *
 * @param schema database schema name substituted into each script; must not be null
 * @return migration SQL strings in execution order (empty if no resources are found)
 * @throws NullPointerException if {@code schema} is null
 * @throws RuntimeException if a migration resource exists but cannot be read
 */
public static List<String> getMigrations(String schema) {
  Objects.requireNonNull(schema);
  List<String> migrations = new ArrayList<>();
  int migrationNum = 1;
  while (true) {
    String resourceName = "/db/migrations/migration" + migrationNum + ".sql";
    try (InputStream is = MigrationManager.class.getResourceAsStream(resourceName)) {
      if (is == null) {
        // First gap in the numbering means the migration list is complete.
        break;
      }
      try (BufferedReader reader =
          new BufferedReader(new InputStreamReader(is, StandardCharsets.UTF_8))) {
        StringBuilder sb = new StringBuilder();
        String line;
        while ((line = reader.readLine()) != null) {
          sb.append(line).append("\n");
        }
        // Resolve the schema placeholder before handing the SQL to the runner.
        migrations.add(sb.toString().formatted(schema));
      }
    } catch (Exception e) {
      throw new RuntimeException("Failed to load migration resource: " + resourceName, e);
    }
    migrationNum++;
  }
  return migrations;
}

// Migration 1: bootstraps the entire DBOS system schema — workflow_status plus its
// indexes and dedup constraint, operation_outputs, notifications (with pg_notify
// trigger), workflow_events (with pg_notify trigger), streams, and event_dispatch_kv.
// The "%1$s" placeholder is replaced with the target schema name by getMigrations.
static final String migration1 =
"""
CREATE EXTENSION IF NOT EXISTS "uuid-ossp";

CREATE TABLE "%1$s".workflow_status (
workflow_uuid TEXT PRIMARY KEY,
status TEXT,
name TEXT,
authenticated_user TEXT,
assumed_role TEXT,
authenticated_roles TEXT,
request TEXT,
output TEXT,
error TEXT,
executor_id TEXT,
created_at BIGINT NOT NULL DEFAULT (EXTRACT(epoch FROM now()) * 1000::numeric)::bigint,
updated_at BIGINT NOT NULL DEFAULT (EXTRACT(epoch FROM now()) * 1000::numeric)::bigint,
application_version TEXT,
application_id TEXT,
class_name VARCHAR(255) DEFAULT NULL,
config_name VARCHAR(255) DEFAULT NULL,
recovery_attempts BIGINT DEFAULT 0,
queue_name TEXT,
workflow_timeout_ms BIGINT,
workflow_deadline_epoch_ms BIGINT,
inputs TEXT,
started_at_epoch_ms BIGINT,
deduplication_id TEXT,
priority INT4 NOT NULL DEFAULT 0
);

CREATE INDEX workflow_status_created_at_index ON "%1$s".workflow_status (created_at);
CREATE INDEX workflow_status_executor_id_index ON "%1$s".workflow_status (executor_id);
CREATE INDEX workflow_status_status_index ON "%1$s".workflow_status (status);

ALTER TABLE "%1$s".workflow_status
ADD CONSTRAINT uq_workflow_status_queue_name_dedup_id
UNIQUE (queue_name, deduplication_id);

CREATE TABLE "%1$s".operation_outputs (
workflow_uuid TEXT NOT NULL,
function_id INT4 NOT NULL,
function_name TEXT NOT NULL DEFAULT '',
output TEXT,
error TEXT,
child_workflow_id TEXT,
PRIMARY KEY (workflow_uuid, function_id),
FOREIGN KEY (workflow_uuid) REFERENCES "%1$s".workflow_status(workflow_uuid)
ON UPDATE CASCADE ON DELETE CASCADE
);

CREATE TABLE "%1$s".notifications (
message_uuid TEXT NOT NULL DEFAULT gen_random_uuid() PRIMARY KEY, -- Built-in function
destination_uuid TEXT NOT NULL,
topic TEXT,
message TEXT NOT NULL,
created_at_epoch_ms BIGINT NOT NULL DEFAULT (EXTRACT(epoch FROM now()) * 1000::numeric)::bigint,
FOREIGN KEY (destination_uuid) REFERENCES "%1$s".workflow_status(workflow_uuid)
ON UPDATE CASCADE ON DELETE CASCADE
);
CREATE INDEX idx_workflow_topic ON "%1$s".notifications (destination_uuid, topic);

-- Create notification function
CREATE OR REPLACE FUNCTION "%1$s".notifications_function() RETURNS TRIGGER AS $$
DECLARE
payload text := NEW.destination_uuid || '::' || NEW.topic;
BEGIN
PERFORM pg_notify('dbos_notifications_channel', payload);
RETURN NEW;
END;
$$ LANGUAGE plpgsql;

-- Create notification trigger
CREATE TRIGGER dbos_notifications_trigger
AFTER INSERT ON "%1$s".notifications
FOR EACH ROW EXECUTE FUNCTION "%1$s".notifications_function();

CREATE TABLE "%1$s".workflow_events (
workflow_uuid TEXT NOT NULL,
key TEXT NOT NULL,
value TEXT NOT NULL,
PRIMARY KEY (workflow_uuid, key),
FOREIGN KEY (workflow_uuid) REFERENCES "%1$s".workflow_status(workflow_uuid)
ON UPDATE CASCADE ON DELETE CASCADE
);

-- Create events function
CREATE OR REPLACE FUNCTION "%1$s".workflow_events_function() RETURNS TRIGGER AS $$
DECLARE
payload text := NEW.workflow_uuid || '::' || NEW.key;
BEGIN
PERFORM pg_notify('dbos_workflow_events_channel', payload);
RETURN NEW;
END;
$$ LANGUAGE plpgsql;

-- Create events trigger
CREATE TRIGGER dbos_workflow_events_trigger
AFTER INSERT ON "%1$s".workflow_events
FOR EACH ROW EXECUTE FUNCTION "%1$s".workflow_events_function();

CREATE TABLE "%1$s".streams (
workflow_uuid TEXT NOT NULL,
key TEXT NOT NULL,
value TEXT NOT NULL,
"offset" INT4 NOT NULL,
PRIMARY KEY (workflow_uuid, key, "offset"),
FOREIGN KEY (workflow_uuid) REFERENCES "%1$s".workflow_status(workflow_uuid)
ON UPDATE CASCADE ON DELETE CASCADE
);

CREATE TABLE "%1$s".event_dispatch_kv (
service_name TEXT NOT NULL,
workflow_fn_name TEXT NOT NULL,
key TEXT NOT NULL,
value TEXT,
update_seq NUMERIC(38,0),
update_time NUMERIC(38,15),
PRIMARY KEY (service_name, workflow_fn_name, key)
);
""";

// Migration 2: adds queue_partition_key to workflow_status.
static final String migration2 =
"""
ALTER TABLE "%1$s".workflow_status ADD COLUMN queue_partition_key TEXT;
""";

// Migration 3: composite index to speed up queue polling by (queue_name, status, started_at).
static final String migration3 =
"""
create index "idx_workflow_status_queue_status_started" on "%1$s"."workflow_status" ("queue_name", "status", "started_at_epoch_ms")
""";

// Migration 4: adds forked_from to workflow_status, with an index on it.
static final String migration4 =
"""
ALTER TABLE "%1$s".workflow_status ADD COLUMN forked_from TEXT;
CREATE INDEX "idx_workflow_status_forked_from" ON "%1$s"."workflow_status" ("forked_from");
""";

// Migration 5: adds per-step start/completion timestamps to operation_outputs.
static final String migration5 =
"""
ALTER TABLE "%1$s".operation_outputs ADD COLUMN started_at_epoch_ms BIGINT, ADD COLUMN completed_at_epoch_ms BIGINT;
""";

// Migration 6: creates workflow_events_history (keyed by workflow, function, key) and
// adds a function_id column (default 0) to streams.
static final String migration6 =
"""
CREATE TABLE "%1$s".workflow_events_history (
workflow_uuid TEXT NOT NULL,
function_id INT4 NOT NULL,
key TEXT NOT NULL,
value TEXT NOT NULL,
PRIMARY KEY (workflow_uuid, function_id, key),
FOREIGN KEY (workflow_uuid) REFERENCES "%1$s".workflow_status(workflow_uuid)
ON UPDATE CASCADE ON DELETE CASCADE
);
ALTER TABLE "%1$s".streams ADD COLUMN function_id INT4 NOT NULL DEFAULT 0;
""";

// Migration 7: adds owner_xid (nullable VARCHAR(40)) to workflow_status.
static final String migration7 =
"""
ALTER TABLE "%1$s"."workflow_status" ADD COLUMN "owner_xid" VARCHAR(40) DEFAULT NULL
""";

// Migration 8: adds parent_workflow_id to workflow_status, with an index on it.
static final String migration8 =
"""
ALTER TABLE "%1$s"."workflow_status" ADD COLUMN "parent_workflow_id" TEXT DEFAULT NULL;
CREATE INDEX "idx_workflow_status_parent_workflow_id" ON "%1$s"."workflow_status" ("parent_workflow_id");
""";

// Migration 9: creates workflow_schedules for scheduled workflows; status defaults to
// 'ACTIVE' and schedule_name is unique.
static final String migration9 =
"""
CREATE TABLE "%1$s".workflow_schedules (
schedule_id TEXT PRIMARY KEY,
schedule_name TEXT NOT NULL UNIQUE,
workflow_name TEXT NOT NULL,
workflow_class_name TEXT,
schedule TEXT NOT NULL,
status TEXT NOT NULL DEFAULT 'ACTIVE',
context TEXT NOT NULL
);
""";

// Migration 10: idempotently adds a primary key on notifications(message_uuid) —
// guarded by an information_schema check so reruns and pre-keyed databases are safe.
static final String migration10 =
"""
DO $$
BEGIN
IF NOT EXISTS (
SELECT 1 FROM information_schema.table_constraints
WHERE table_schema = '%1$s'
AND table_name = 'notifications'
AND constraint_type = 'PRIMARY KEY'
) THEN
ALTER TABLE "%1$s".notifications ADD PRIMARY KEY (message_uuid);
END IF;
END $$;
""";

// Migration 11: adds a nullable serialization column to every payload-bearing table.
static final String migration11 =
"""
ALTER TABLE "%1$s"."workflow_status" ADD COLUMN "serialization" TEXT DEFAULT NULL;
ALTER TABLE "%1$s"."notifications" ADD COLUMN "serialization" TEXT DEFAULT NULL;
ALTER TABLE "%1$s"."workflow_events" ADD COLUMN "serialization" TEXT DEFAULT NULL;
ALTER TABLE "%1$s"."workflow_events_history" ADD COLUMN "serialization" TEXT DEFAULT NULL;
ALTER TABLE "%1$s"."operation_outputs" ADD COLUMN "serialization" TEXT DEFAULT NULL;
ALTER TABLE "%1$s"."streams" ADD COLUMN "serialization" TEXT DEFAULT NULL;
""";

// Migration 12: adds a consumed flag to notifications plus a partial index covering
// only unconsumed rows for (destination_uuid, topic) lookups.
static final String migration12 =
"""
ALTER TABLE "%1$s"."notifications" ADD COLUMN "consumed" BOOLEAN NOT NULL DEFAULT FALSE;
CREATE INDEX "idx_notifications_unconsumed" ON "%1$s"."notifications" ("destination_uuid", "topic") WHERE consumed = FALSE;
""";
}
118 changes: 118 additions & 0 deletions transact/src/main/resources/db/migrations/migration1.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
-- DBOS system-database migration 1: bootstraps all core tables, indexes, and
-- LISTEN/NOTIFY triggers. This file is a template: the schema-name format
-- placeholder is substituted by the Java migration loader before execution.
CREATE EXTENSION IF NOT EXISTS "uuid-ossp";

-- One row per workflow execution; tracks status, identity, timing, and queue state.
CREATE TABLE "%1$s".workflow_status (
workflow_uuid TEXT PRIMARY KEY,
status TEXT,
name TEXT,
authenticated_user TEXT,
assumed_role TEXT,
authenticated_roles TEXT,
request TEXT,
output TEXT,
error TEXT,
executor_id TEXT,
created_at BIGINT NOT NULL DEFAULT (EXTRACT(epoch FROM now()) * 1000::numeric)::bigint,
updated_at BIGINT NOT NULL DEFAULT (EXTRACT(epoch FROM now()) * 1000::numeric)::bigint,
application_version TEXT,
application_id TEXT,
class_name VARCHAR(255) DEFAULT NULL,
config_name VARCHAR(255) DEFAULT NULL,
recovery_attempts BIGINT DEFAULT 0,
queue_name TEXT,
workflow_timeout_ms BIGINT,
workflow_deadline_epoch_ms BIGINT,
inputs TEXT,
started_at_epoch_ms BIGINT,
deduplication_id TEXT,
priority INT4 NOT NULL DEFAULT 0
);

CREATE INDEX workflow_status_created_at_index ON "%1$s".workflow_status (created_at);
CREATE INDEX workflow_status_executor_id_index ON "%1$s".workflow_status (executor_id);
CREATE INDEX workflow_status_status_index ON "%1$s".workflow_status (status);

-- At most one workflow per (queue, deduplication id) pair.
ALTER TABLE "%1$s".workflow_status
ADD CONSTRAINT uq_workflow_status_queue_name_dedup_id
UNIQUE (queue_name, deduplication_id);

-- Recorded step outputs, enabling deterministic workflow replay.
CREATE TABLE "%1$s".operation_outputs (
workflow_uuid TEXT NOT NULL,
function_id INT4 NOT NULL,
function_name TEXT NOT NULL DEFAULT '',
output TEXT,
error TEXT,
child_workflow_id TEXT,
PRIMARY KEY (workflow_uuid, function_id),
FOREIGN KEY (workflow_uuid) REFERENCES "%1$s".workflow_status(workflow_uuid)
ON UPDATE CASCADE ON DELETE CASCADE
);

-- Messages sent between workflows; inserts fan out via the trigger below.
CREATE TABLE "%1$s".notifications (
message_uuid TEXT NOT NULL DEFAULT gen_random_uuid() PRIMARY KEY, -- Built-in function
destination_uuid TEXT NOT NULL,
topic TEXT,
message TEXT NOT NULL,
created_at_epoch_ms BIGINT NOT NULL DEFAULT (EXTRACT(epoch FROM now()) * 1000::numeric)::bigint,
FOREIGN KEY (destination_uuid) REFERENCES "%1$s".workflow_status(workflow_uuid)
ON UPDATE CASCADE ON DELETE CASCADE
);
CREATE INDEX idx_workflow_topic ON "%1$s".notifications (destination_uuid, topic);

-- Create notification function
CREATE OR REPLACE FUNCTION "%1$s".notifications_function() RETURNS TRIGGER AS $$
DECLARE
payload text := NEW.destination_uuid || '::' || NEW.topic;
BEGIN
PERFORM pg_notify('dbos_notifications_channel', payload);
RETURN NEW;
END;
$$ LANGUAGE plpgsql;

-- Create notification trigger
CREATE TRIGGER dbos_notifications_trigger
AFTER INSERT ON "%1$s".notifications
FOR EACH ROW EXECUTE FUNCTION "%1$s".notifications_function();

-- Key/value events published by workflows; inserts fan out via the trigger below.
CREATE TABLE "%1$s".workflow_events (
workflow_uuid TEXT NOT NULL,
key TEXT NOT NULL,
value TEXT NOT NULL,
PRIMARY KEY (workflow_uuid, key),
FOREIGN KEY (workflow_uuid) REFERENCES "%1$s".workflow_status(workflow_uuid)
ON UPDATE CASCADE ON DELETE CASCADE
);

-- Create events function
CREATE OR REPLACE FUNCTION "%1$s".workflow_events_function() RETURNS TRIGGER AS $$
DECLARE
payload text := NEW.workflow_uuid || '::' || NEW.key;
BEGIN
PERFORM pg_notify('dbos_workflow_events_channel', payload);
RETURN NEW;
END;
$$ LANGUAGE plpgsql;

-- Create events trigger
CREATE TRIGGER dbos_workflow_events_trigger
AFTER INSERT ON "%1$s".workflow_events
FOR EACH ROW EXECUTE FUNCTION "%1$s".workflow_events_function();

-- Ordered stream values per (workflow, key); "offset" is quoted (reserved word).
CREATE TABLE "%1$s".streams (
workflow_uuid TEXT NOT NULL,
key TEXT NOT NULL,
value TEXT NOT NULL,
"offset" INT4 NOT NULL,
PRIMARY KEY (workflow_uuid, key, "offset"),
FOREIGN KEY (workflow_uuid) REFERENCES "%1$s".workflow_status(workflow_uuid)
ON UPDATE CASCADE ON DELETE CASCADE
);

-- Key/value store for event-dispatch bookkeeping per service and workflow function.
CREATE TABLE "%1$s".event_dispatch_kv (
service_name TEXT NOT NULL,
workflow_fn_name TEXT NOT NULL,
key TEXT NOT NULL,
value TEXT,
update_seq NUMERIC(38,0),
update_time NUMERIC(38,15),
PRIMARY KEY (service_name, workflow_fn_name, key)
);
Loading
Loading