Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,16 +18,6 @@
public class MigrationManager {

private static final Logger logger = LoggerFactory.getLogger(MigrationManager.class);
private static final List<String> IGNORABLE_SQL_STATES =
List.of(
// Relation / object already exists
"42P07", // duplicate_table
"42710", // duplicate_object (e.g., index)
"42701", // duplicate_column
"42P06", // duplicate_schema
// Uniqueness (e.g., insert seed rows twice)
"23505" // unique_violation
);

public static void runMigrations(DBOSConfig config) {
Objects.requireNonNull(config, "DBOS Config must not be null");
Expand Down Expand Up @@ -197,28 +187,19 @@ static void runDbosMigrations(Connection conn, String schema, List<String> migra
try (var stmt = conn.createStatement()) {
stmt.execute(migrations.get(i));
} catch (SQLException e) {
if (IGNORABLE_SQL_STATES.contains(e.getSQLState())) {
logger.warn(
"Ignoring migration {} error; Migration was likely already applied. Occurred while executing {}",
migrationIndex,
migrations.get(i));
} else {
throw new RuntimeException("Failed to run migration %d".formatted(migrationIndex), e);
}
throw new RuntimeException("Failed to run migration %d".formatted(migrationIndex), e);
}

try {
int rowCount = 0;
var updateSQL = "UPDATE \"%s\".dbos_migrations SET version = ?".formatted(schema);
try (var stmt = conn.prepareStatement(updateSQL)) {
stmt.setLong(1, migrationIndex);
rowCount = stmt.executeUpdate();
}

if (rowCount == 0) {
var insertSql =
"INSERT INTO \"%s\".dbos_migrations (version) VALUES (?)".formatted(schema);
try (var stmt = conn.prepareStatement(insertSql)) {
if (lastApplied == 0) {
var sql = "INSERT INTO \"%s\".dbos_migrations (version) VALUES (?)".formatted(schema);
try (var stmt = conn.prepareStatement(sql)) {
stmt.setLong(1, migrationIndex);
stmt.executeUpdate();
}
} else {
var sql = "UPDATE \"%s\".dbos_migrations SET version = ?".formatted(schema);
try (var stmt = conn.prepareStatement(sql)) {
stmt.setLong(1, migrationIndex);
stmt.executeUpdate();
}
Expand Down Expand Up @@ -246,7 +227,9 @@ public static List<String> getMigrations(String schema) {
migration9,
migration10,
migration11,
migration12);
migration12,
migration13,
migration14);
return migrations.stream().map(m -> m.formatted(schema)).toList();
}

Expand Down Expand Up @@ -461,4 +444,141 @@ IF NOT EXISTS (
ALTER TABLE "%1$s"."notifications" ADD COLUMN "consumed" BOOLEAN NOT NULL DEFAULT FALSE;
CREATE INDEX "idx_notifications_unconsumed" ON "%1$s"."notifications" ("destination_uuid", "topic") WHERE consumed = FALSE;
""";

static final String migration13 =
"""
CREATE TABLE "%1$s".application_versions (
version_id TEXT NOT NULL PRIMARY KEY,
version_name TEXT NOT NULL UNIQUE,
version_timestamp BIGINT NOT NULL DEFAULT (EXTRACT(epoch FROM now()) * 1000.0)::bigint,
created_at BIGINT NOT NULL DEFAULT (EXTRACT(epoch FROM now()) * 1000.0)::bigint
);
""";

static final String migration14 =
"""
CREATE FUNCTION "%1$s".enqueue_workflow(
workflow_name TEXT,
queue_name TEXT,
positional_args JSON[] DEFAULT ARRAY[]::JSON[],
named_args JSON DEFAULT '{}'::JSON,
class_name TEXT DEFAULT NULL,
config_name TEXT DEFAULT NULL,
workflow_id TEXT DEFAULT NULL,
app_version TEXT DEFAULT NULL,
timeout_ms BIGINT DEFAULT NULL,
deadline_epoch_ms BIGINT DEFAULT NULL,
deduplication_id TEXT DEFAULT NULL,
priority INTEGER DEFAULT NULL,
queue_partition_key TEXT DEFAULT NULL
) RETURNS TEXT AS $$
DECLARE
v_workflow_id TEXT;
v_serialized_inputs TEXT;
v_owner_xid TEXT;
v_now BIGINT;
v_recovery_attempts INTEGER := 0;
v_priority INTEGER;
v_existing_name TEXT;
v_existing_class_name TEXT;
v_existing_config_name TEXT;
BEGIN

-- Validate required parameters
IF workflow_name IS NULL OR workflow_name = '' THEN
RAISE EXCEPTION 'Workflow name cannot be null or empty';
END IF;
IF queue_name IS NULL OR queue_name = '' THEN
RAISE EXCEPTION 'Queue name cannot be null or empty';
END IF;
IF named_args IS NOT NULL AND jsonb_typeof(named_args::jsonb) != 'object' THEN
RAISE EXCEPTION 'Named args must be a JSON object';
END IF;
IF workflow_id IS NOT NULL AND workflow_id = '' THEN
RAISE EXCEPTION 'Workflow ID cannot be an empty string if provided.';
END IF;

v_workflow_id := COALESCE(workflow_id, gen_random_uuid()::TEXT);
v_owner_xid := gen_random_uuid()::TEXT;
v_priority := COALESCE(priority, 0);
v_serialized_inputs := json_build_object(
'positionalArgs', positional_args,
'namedArgs', named_args
)::TEXT;
v_now := EXTRACT(epoch FROM now()) * 1000;

INSERT INTO "%1$s".workflow_status (
workflow_uuid, status, inputs,
name, class_name, config_name,
queue_name, deduplication_id, priority, queue_partition_key,
application_version,
created_at, updated_at, recovery_attempts,
workflow_timeout_ms, workflow_deadline_epoch_ms,
parent_workflow_id, owner_xid, serialization
) VALUES (
v_workflow_id, 'ENQUEUED', v_serialized_inputs,
workflow_name, class_name, config_name,
queue_name, deduplication_id, v_priority, queue_partition_key,
app_version,
v_now, v_now, v_recovery_attempts,
timeout_ms, deadline_epoch_ms,
NULL, v_owner_xid, 'portable_json'
)
ON CONFLICT (workflow_uuid)
DO UPDATE SET
updated_at = EXCLUDED.updated_at
RETURNING workflow_status.name, workflow_status.class_name, workflow_status.config_name
INTO v_existing_name, v_existing_class_name, v_existing_config_name;

-- Validate workflow metadata matches
IF v_existing_name IS DISTINCT FROM workflow_name THEN
RAISE EXCEPTION 'Conflicting DBOS workflow name'
USING DETAIL = format('Workflow %%s exists with name %%s, but the provided workflow name is: %%s', v_workflow_id, v_existing_name, workflow_name),
ERRCODE = 'invalid_parameter_value';
END IF;
IF v_existing_class_name IS DISTINCT FROM class_name THEN
RAISE EXCEPTION 'Conflicting DBOS workflow class_name'
USING DETAIL = format('Workflow %%s exists with class_name %%s, but the provided class_name is: %%s', v_workflow_id, v_existing_class_name, class_name),
ERRCODE = 'invalid_parameter_value';
END IF;
IF v_existing_config_name IS DISTINCT FROM config_name THEN
RAISE EXCEPTION 'Conflicting DBOS workflow config_name'
USING DETAIL = format('Workflow %%s exists with config_name %%s, but the provided config_name is: %%s', v_workflow_id, v_existing_config_name, config_name),
ERRCODE = 'invalid_parameter_value';
END IF;

RETURN v_workflow_id;

EXCEPTION
WHEN unique_violation THEN
RAISE EXCEPTION 'DBOS queue duplicated'
USING DETAIL = format('Workflow %%s with queue %%s and deduplication ID %%s already exists', v_workflow_id, queue_name, deduplication_id),
ERRCODE = 'unique_violation';
END;
$$ LANGUAGE plpgsql;

CREATE FUNCTION "%1$s".send_message(
destination_id TEXT,
message JSON,
topic TEXT DEFAULT NULL,
idempotency_key TEXT DEFAULT NULL
) RETURNS VOID AS $$
DECLARE
v_topic TEXT := COALESCE(topic, '__null__topic__');
v_message_id TEXT := COALESCE(idempotency_key, gen_random_uuid()::TEXT);
BEGIN
INSERT INTO "%1$s".notifications (
destination_uuid, topic, message, message_uuid, serialization
) VALUES (
destination_id, v_topic, message, v_message_id, 'portable_json'
)
ON CONFLICT (message_uuid) DO NOTHING;
EXCEPTION
WHEN foreign_key_violation THEN
RAISE EXCEPTION 'DBOS non-existent workflow'
USING DETAIL = format('Destination workflow %%s does not exist', destination_id),
ERRCODE = 'foreign_key_violation';
END;
$$ LANGUAGE plpgsql;
""";
}
14 changes: 14 additions & 0 deletions transact/src/test/java/dev/dbos/transact/client/ClientTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import dev.dbos.transact.config.DBOSConfig;
import dev.dbos.transact.database.SystemDatabase;
import dev.dbos.transact.exceptions.DBOSAwaitedWorkflowCancelledException;
import dev.dbos.transact.exceptions.DBOSNonExistentWorkflowException;
import dev.dbos.transact.utils.DBUtils;
import dev.dbos.transact.workflow.Queue;

Expand Down Expand Up @@ -192,4 +193,17 @@ public void clientEnqueueTimeouts() throws Exception {
stat2.orElseThrow(() -> new AssertionError("Workflow status not found")).status());
}
}

@Test
public void invalidSend() throws Exception {
var invalidWorkflowId = UUID.randomUUID().toString();

try (var client = new DBOSClient(dbUrl, dbUser, dbPassword)) {
var ex =
assertThrows(
DBOSNonExistentWorkflowException.class,
() -> client.send(invalidWorkflowId, "test.message", null, null));
assertTrue(ex.getMessage().contains(invalidWorkflowId));
}
}
}
Loading