Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
104 changes: 104 additions & 0 deletions src/sysdb_migrations/internal/migrations.ts
Original file line number Diff line number Diff line change
Expand Up @@ -268,5 +268,109 @@ export function allMigrations(
)`,
],
},
    {
      // Migration: SQL helper functions that let external (non-TypeScript)
      // database clients interact with the DBOS system tables directly.
      pg: [
        // enqueue_workflow: validates its inputs, fills in defaults (generated
        // UUID workflow ID, priority 0), and inserts an 'ENQUEUED' row into
        // workflow_status. Re-inserting an existing workflow_uuid only bumps
        // updated_at; a duplicate (queue_name, deduplication_id) pair raises
        // 'DBOS queue duplicated'. Returns the workflow ID.
        `CREATE FUNCTION "${schemaName}".enqueue_workflow(
        workflow_name TEXT,
        queue_name TEXT,
        positional_args JSON[] DEFAULT ARRAY[]::JSON[],
        named_args JSON DEFAULT '{}'::JSON,
        class_name TEXT DEFAULT NULL,
        config_name TEXT DEFAULT NULL,
        workflow_id TEXT DEFAULT NULL,
        app_version TEXT DEFAULT NULL,
        timeout_ms BIGINT DEFAULT NULL,
        deadline_epoch_ms BIGINT DEFAULT NULL,
        deduplication_id TEXT DEFAULT NULL,
        priority INTEGER DEFAULT NULL,
        queue_partition_key TEXT DEFAULT NULL
      ) RETURNS TEXT AS $$
      DECLARE
        v_workflow_id TEXT;
        v_serialized_inputs TEXT;
        v_owner_xid TEXT;
        v_now BIGINT;
        v_recovery_attempts INTEGER := 0;
        v_priority INTEGER;
      BEGIN

        -- Validate required parameters
        IF workflow_name IS NULL OR workflow_name = '' THEN
          RAISE EXCEPTION 'Workflow name cannot be null or empty';
        END IF;
        IF queue_name IS NULL OR queue_name = '' THEN
          RAISE EXCEPTION 'Queue name cannot be null or empty';
        END IF;
        IF named_args IS NOT NULL AND jsonb_typeof(named_args::jsonb) != 'object' THEN
          RAISE EXCEPTION 'Named args must be a JSON object';
        END IF;
        IF workflow_id IS NOT NULL AND workflow_id = '' THEN
          RAISE EXCEPTION 'Workflow ID cannot be an empty string if provided.';
        END IF;

        v_workflow_id := COALESCE(workflow_id, gen_random_uuid()::TEXT);
        v_owner_xid := gen_random_uuid()::TEXT;
        v_priority := COALESCE(priority, 0);
        v_serialized_inputs := json_build_object(
          'positionalArgs', positional_args,
          'namedArgs', named_args
        )::TEXT;
        v_now := EXTRACT(epoch FROM now()) * 1000;

        INSERT INTO "${schemaName}".workflow_status (
          workflow_uuid, status, inputs,
          name, class_name, config_name,
          queue_name, deduplication_id, priority, queue_partition_key,
          application_version,
          created_at, updated_at, recovery_attempts,
          workflow_timeout_ms, workflow_deadline_epoch_ms,
          parent_workflow_id, owner_xid, serialization
        ) VALUES (
          v_workflow_id, 'ENQUEUED', v_serialized_inputs,
          workflow_name, class_name, config_name,
          queue_name, deduplication_id, v_priority, queue_partition_key,
          app_version,
          v_now, v_now, v_recovery_attempts,
          timeout_ms, deadline_epoch_ms,
          NULL, v_owner_xid, 'portable_json'
        )
        ON CONFLICT (workflow_uuid)
        DO UPDATE SET
          updated_at = EXCLUDED.updated_at;

        RETURN v_workflow_id;

      EXCEPTION
        WHEN unique_violation THEN
          RAISE EXCEPTION 'DBOS queue duplicated'
            USING DETAIL = format('Workflow %s with queue %s and deduplication ID %s already exists', v_workflow_id, queue_name, deduplication_id),
                  ERRCODE = 'unique_violation';
      END;
      $$ LANGUAGE plpgsql;`,
        // send_message: inserts a notification row for the destination
        // workflow (duplicate message IDs are silently ignored via
        // ON CONFLICT DO NOTHING). A missing destination workflow surfaces
        // as 'DBOS non-existent workflow' (foreign_key_violation).
        `CREATE FUNCTION "${schemaName}".send_message(
        destination_id TEXT,
        message JSON,
        topic TEXT DEFAULT NULL,
        message_id TEXT DEFAULT NULL
      ) RETURNS VOID AS $$
      DECLARE
        v_topic TEXT := COALESCE(topic, '__null__topic__');
        v_message_id TEXT := COALESCE(message_id, gen_random_uuid()::TEXT);
      BEGIN
        INSERT INTO "${schemaName}".notifications (
          destination_uuid, topic, message, message_uuid, serialization
        ) VALUES (
          destination_id, v_topic, message, v_message_id, 'portable_json'
        )
        ON CONFLICT (message_uuid) DO NOTHING;
      EXCEPTION
        WHEN foreign_key_violation THEN
          RAISE EXCEPTION 'DBOS non-existent workflow'
            USING DETAIL = format('Destination workflow %s does not exist', destination_id),
                  ERRCODE = 'foreign_key_violation';
      END;
      $$ LANGUAGE plpgsql;`,
      ],
    },
];
}
52 changes: 52 additions & 0 deletions tests/cockroachdb.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -146,4 +146,56 @@ describeIf('cockroachdb', () => {
}
expect(readValues).toEqual(testValues);
});

test('pg-enqueue', async () => {
const client = new Client(config.systemDatabaseUrl);
let wfid: string;

try {
await client.connect();

// Use PostgreSQL function to enqueue
const enqueueResult = await client.query<{ enqueue_workflow: string }>(
`
SELECT dbos.enqueue_workflow(
'testWorkflow',
'crdb-test-queue',
ARRAY[$1::JSON],
'{}'::JSON,
'CRDBTestClass'
)
`,
[JSON.stringify('queued')],
);

expect(enqueueResult.rowCount).toEqual(1);
wfid = enqueueResult.rows[0].enqueue_workflow;
} finally {
await client.end();
}

const handle = DBOS.retrieveWorkflow<string>(wfid);
const status = await handle.getStatus();
expect(status).toBeDefined();

const result = await handle.getResult();
expect(result).toBe('QUEUED');
});

test('pg-send', async () => {
const handle = await DBOS.startWorkflow(CRDBTestClass).receiveWorkflow();

const client = new Client(config.systemDatabaseUrl);
try {
await client.connect();

await client.query<{ enqueue_workflow: string }>(`SELECT dbos.send_message($1, $2)`, [
handle.workflowID,
JSON.stringify('hello'),
]);
} finally {
await client.end();
}
expect(await handle.getResult()).toBe('hello');
}, 10000);
});
208 changes: 208 additions & 0 deletions tests/pg-client-functions.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,208 @@
import { workflow_status } from '../schemas/system_db_schema';
import { DBOS, WorkflowQueue } from '../src';
import { generateDBOSTestConfig, setUpDBOSTestSysDb } from './helpers';
import { Client, PoolConfig } from 'pg';
import { DBOSConfig } from '../src/dbos-executor';

// Queue used by the enqueue tests; its name is what gets passed to
// dbos.enqueue_workflow() as queue_name.
const testQueue = new WorkflowQueue('test-queue');

/**
 * Workflows exercised by the PostgreSQL client-function tests.
 */
class ClientTest {
  /**
   * Renders its arguments into a deterministic string so tests can assert
   * on the exact workflow result.
   */
  @DBOS.workflow()
  static async enqueueTest(
    numVal: number,
    strVal: string,
    objVal: { first: string; last: string; age: number },
  ): Promise<string> {
    const rendered = `${numVal}-${strVal}-${JSON.stringify(objVal)}`;
    return rendered;
  }

  /** Blocks (up to 60s) until a message arrives on the given topic, then returns it. */
  @DBOS.workflow()
  static async sendTest(topic?: string) {
    const received = await DBOS.recv<string>(topic, 60);
    return received;
  }
}

describe('PostgreSQL Client Functions', () => {
let config: DBOSConfig;
let poolConfig: PoolConfig;

beforeAll(async () => {
config = generateDBOSTestConfig();
expect(config.systemDatabaseUrl).toBeDefined();
poolConfig = { connectionString: config.systemDatabaseUrl };
await setUpDBOSTestSysDb(config);
DBOS.setConfig(config);
});

// Test PostgreSQL send message function
test('pg-send-message-function', async () => {
const message = `Hello from PG function! ${Date.now()}`;
const topic = `test-topic-${Date.now()}`;

try {
await DBOS.launch();

const handle = await DBOS.startWorkflow(ClientTest).sendTest(topic);

// Send message using PostgreSQL function
const dbClient = new Client(poolConfig);
await dbClient.connect();

await dbClient.query(
`
SELECT dbos.send_message(
destination_id => $1,
message => $2::JSON,
topic => $3
)
`,
[handle.workflowID, JSON.stringify(message), topic],
);

await dbClient.end();

const result = await handle.getResult();
expect(result).toBe(message);
} finally {
await DBOS.shutdown();
}
}, 20000);

// Test error handling for non-existent workflow
test('pg-send-to-nonexistent-workflow', async () => {
const nonExistentID = `nonexistent-${Date.now()}`;
const message = 'Test message';
const dbClient = new Client(poolConfig);

try {
await dbClient.connect();

await expect(
dbClient.query(
`
SELECT dbos.send_message(
destination_id => $1,
message => $2::JSON
)
`,
[nonExistentID, JSON.stringify(message)],
),
).rejects.toThrow();
} finally {
await dbClient.end();
}
});

// Test PostgreSQL enqueue function
test('pg-enqueue', async () => {
const wfid = `pg-db-test-${Date.now()}`;
const dbClient = new Client(poolConfig);

try {
await dbClient.connect();

// Use PostgreSQL function to enqueue
const enqueueResult = await dbClient.query<{ enqueue_workflow: string }>(
`
SELECT dbos.enqueue_workflow(
workflow_name => 'enqueueTest',
queue_name => $5,
class_name => 'ClientTest',
workflow_id => $1,
positional_args => ARRAY[$2::JSON, $3::JSON, $4::JSON]
)
`,
[
wfid,
JSON.stringify(42),
JSON.stringify('test'),
JSON.stringify({ first: 'John', last: 'Doe', age: 30 }),
testQueue.name,
],
);

expect(enqueueResult.rowCount).toEqual(1);
expect(enqueueResult.rows[0].enqueue_workflow).toEqual(wfid);

// Verify workflow was enqueued in database
const checkResult = await dbClient.query<workflow_status>(
'SELECT * FROM dbos.workflow_status WHERE workflow_uuid = $1',
[wfid],
);

expect(checkResult.rows).toHaveLength(1);
expect(checkResult.rows[0].workflow_uuid).toBe(wfid);
expect(checkResult.rows[0].status).toBe('ENQUEUED');
} finally {
await dbClient.end();
}

try {
await DBOS.launch();

const handle = DBOS.retrieveWorkflow<string>(wfid);
const status = await handle.getStatus();
expect(status).toBeDefined();

const result = await handle.getResult();
expect(result).toBe('42-test-{"first":"John","last":"Doe","age":30}');
} finally {
await DBOS.shutdown();
}
}, 20000);

test('pg-enqueue-gen-wfid', async () => {
const dbClient = new Client(poolConfig);
let wfid: string;

try {
await dbClient.connect();

// Use PostgreSQL function to enqueue
const enqueueResult = await dbClient.query<{ enqueue_workflow: string }>(
`
SELECT dbos.enqueue_workflow(
workflow_name => 'enqueueTest',
queue_name => $4,
class_name => 'ClientTest',
positional_args => ARRAY[$1::JSON, $2::JSON, $3::JSON]
)
`,
[
JSON.stringify(42),
JSON.stringify('test'),
JSON.stringify({ first: 'John', last: 'Doe', age: 30 }),
testQueue.name,
],
);

expect(enqueueResult.rowCount).toEqual(1);
wfid = enqueueResult.rows[0].enqueue_workflow;

// Verify workflow was enqueued in database
const checkResult = await dbClient.query<workflow_status>(
'SELECT * FROM dbos.workflow_status WHERE workflow_uuid = $1',
[wfid],
);

expect(checkResult.rows).toHaveLength(1);
expect(checkResult.rows[0].workflow_uuid).toBe(wfid);
expect(checkResult.rows[0].status).toBe('ENQUEUED');
} finally {
await dbClient.end();
}

try {
await DBOS.launch();

const handle = DBOS.retrieveWorkflow<string>(wfid);
const status = await handle.getStatus();
expect(status).toBeDefined();

const result = await handle.getResult();
expect(result).toBe('42-test-{"first":"John","last":"Doe","age":30}');
} finally {
await DBOS.shutdown();
}
}, 20000);
});