Skip to content
Merged
Show file tree
Hide file tree
Changes from 22 commits
Commits
Show all changes
40 commits
Select commit Hold shift + click to select a range
b65e443
Schema
chuck-dbos Jan 26, 2026
d03c0fe
Schemas
chuck-dbos Jan 26, 2026
cc242c4
Merge remote-tracking branch 'origin/main' into chuck/interop
chuck-dbos Jan 26, 2026
4dc8584
Separate concerns
chuck-dbos Jan 26, 2026
1f6450f
Add new serializer
chuck-dbos Jan 27, 2026
eaf584e
More s11n
chuck-dbos Jan 27, 2026
ac214db
Deserializer
chuck-dbos Jan 27, 2026
9671aae
Consolidate
chuck-dbos Jan 28, 2026
42067c4
Consolidate error parse
chuck-dbos Jan 28, 2026
1bba73d
Portable send
chuck-dbos Jan 28, 2026
cf07796
Event
chuck-dbos Jan 28, 2026
afc32e1
Improve factoring
chuck-dbos Jan 28, 2026
bad4cb8
Factoring
chuck-dbos Jan 28, 2026
2d6b7d3
WF serialized too
chuck-dbos Jan 28, 2026
2b27fe0
Cleanups
chuck-dbos Jan 28, 2026
12b0575
First client test
chuck-dbos Jan 28, 2026
8c6cd1f
Fix issues found by inspection
chuck-dbos Jan 29, 2026
748d299
Forgot to retrieve
chuck-dbos Jan 29, 2026
816c387
Include serialization in more spots
chuck-dbos Jan 29, 2026
296b3ec
Run 2 ways
chuck-dbos Jan 29, 2026
8242ed3
Test direct insert
chuck-dbos Jan 29, 2026
ec7b516
Verify event serialization
chuck-dbos Jan 29, 2026
4336711
Check stream serialization
chuck-dbos Jan 29, 2026
cd7afff
Use TEXT
chuck-dbos Jan 29, 2026
3804338
Test WF C+P
chuck-dbos Jan 29, 2026
7ea0685
Merge remote-tracking branch 'origin/main' into chuck/interop
chuck-dbos Jan 29, 2026
4d9d180
Portable workflow error
chuck-dbos Jan 29, 2026
d10ea9c
Snagged by code reformatter; improved
chuck-dbos Jan 29, 2026
40c02ec
Drive w/ client
chuck-dbos Jan 30, 2026
d7108f1
Merge remote-tracking branch 'origin/main' into chuck/interop
chuck-dbos Jan 31, 2026
e539d97
Portable enqueue (not type safe)
chuck-dbos Feb 3, 2026
4c81ccb
Merge remote-tracking branch 'origin/main' into chuck/interop
chuck-dbos Feb 4, 2026
cc4961c
Merge remote-tracking branch 'origin/main' into chuck/interop
chuck-dbos Feb 5, 2026
4b0292f
Portable enqueue via DBOS (vs client)
chuck-dbos Feb 5, 2026
14f3e96
Throw deserialized error
chuck-dbos Feb 5, 2026
c1db94c
Fix output mode for portable WFs
chuck-dbos Feb 5, 2026
b37ccc3
Throw the portable error in the portability case
chuck-dbos Feb 6, 2026
6559b47
Merge remote-tracking branch 'origin/main' into chuck/interop
chuck-dbos Feb 9, 2026
2d5f7bc
Remove `DBOS.enqueuePortable`
chuck-dbos Feb 9, 2026
42e5718
Forgot to remove unused includes
chuck-dbos Feb 9, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 47 additions & 0 deletions schemas/system_db_schema.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
export type SysDBSerializationFormat = string;

export interface workflow_status {
workflow_uuid: string;
status: string;
Expand Down Expand Up @@ -26,18 +28,21 @@ export interface workflow_status {
queue_partition_key?: string; // Partition key for partitioned queues.
forked_from?: string;
owner_xid?: string;
serialization: SysDBSerializationFormat | null;
}

export interface notifications {
destination_uuid: string;
topic: string;
message: string;
serialization: SysDBSerializationFormat | null;
}

export interface workflow_events {
workflow_uuid: string;
key: string;
value: string;
serialization: SysDBSerializationFormat | null;
}

export interface operation_outputs {
Expand All @@ -49,6 +54,7 @@ export interface operation_outputs {
function_name?: string;
started_at_epoch_ms?: number;
completed_at_epoch_ms?: number;
serialization: SysDBSerializationFormat | null; // Relevant only to getEvent / recv / etc.
}

export interface event_dispatch_kv {
Expand All @@ -69,13 +75,15 @@ export interface streams {
value: string;
offset: number;
function_id: number;
serialization: SysDBSerializationFormat | null;
}

export interface workflow_events_history {
workflow_uuid: string;
function_id: number;
key: string;
value: string;
serialization: SysDBSerializationFormat | null;
}

// This is the deserialized version of operation_outputs
Expand All @@ -88,3 +96,42 @@ export interface step_info {
started_at_epoch_ms?: number;
completed_at_epoch_ms?: number;
}

// This is system DB schema for portable inputs / outputs / messages / events / errors

// ---------- Canonical JSON value space ----------
// Note the absensce of "Date", etc.
// Canonical Date = RFC 3339 / ISO-8601 UTC string: YYYY-MM-DDTHH:mm:ss(.sss)Z
// This can be fixed with AJV (applied later)
export type JsonPrimitive = null | boolean | number | string;
export type JsonValue = JsonPrimitive | JsonObject | JsonArray;
export type JsonObject = { [k: string]: JsonValue };
export type JsonArray = JsonValue[];

// ---------- Workflow args + result ----------
export type JsonWorkflowArgs = {
positionalArgs?: JsonArray;
namedArgs?: JsonObject;
};
export type JsonWorkflowResult = JsonValue;
export interface JsonWorkflowErrorData {
name: string; // Error name
message: string; // Human-readable ;-) string
code?: number | string;
data?: JsonValue; // structured details (retryable, origin, etc.)
}

export class PortableWorkflowError extends Error {
constructor(
message: string,
readonly name: string,
readonly code?: number | string,
readonly data?: JsonValue,
) {
super(message);
}
}

// --------- Notification(Message) and WF event
export type JsonMessage = JsonValue;
export type JsonEvent = JsonValue;
65 changes: 56 additions & 9 deletions src/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,18 @@ import {
StatusString,
type StepInfo,
type WorkflowHandle,
WorkflowSerializationFormat,
type WorkflowStatus,
} from './workflow';
import { sleepms } from './utils';
import { DBOSJSON, DBOSSerializer } from './serialization';
import {
DBOSJSON,
DBOSSerializer,
deserializePositionalArgs,
deserializeValue,
serializeArgs,
serializeValue,
} from './serialization';
import {
forkWorkflow,
getWorkflow,
Expand Down Expand Up @@ -75,6 +83,14 @@ interface ClientEnqueueOptions {
*/
deduplicationID?: string;

/**
* Serialization to use for enqueued request
* Default is to use the serialization for JS/TS, as this is the most flexible
* If `portable_json` is specified, a more limited JSON serialization is used,
* allowing cross-language enqueues of workflows with simple semantics
*/
serializationType?: WorkflowSerializationFormat;

/**
* An optional priority for the workflow.
* Workflows with higher priority will be dequeued first.
Expand All @@ -87,6 +103,19 @@ interface ClientEnqueueOptions {
queuePartitionKey?: string;
}

/**
* Options for client send
*/
interface ClientSendOptions {
/**
* Serialization to use for sent message
* Default is to use the serialization for TS/JS, as this is the most flexible
* If `portable_json` is specified, a more limited JSON serialization is used,
* allowing cross-language message sends
*/
serializationType?: WorkflowSerializationFormat;
}

export class ClientHandle<R> implements WorkflowHandle<R> {
constructor(
readonly systemDatabase: SystemDatabase,
Expand Down Expand Up @@ -116,7 +145,7 @@ export class ClientHandle<R> implements WorkflowHandle<R> {

async getWorkflowInputs<T extends unknown[]>(): Promise<T> {
const status = (await this.systemDatabase.getWorkflowStatus(this.workflowUUID)) as WorkflowStatusInternal;
return this.systemDatabase.getSerializer().parse(status.input) as T;
return deserializePositionalArgs(status.input, status.serialization, this.systemDatabase.getSerializer()) as T;
}
}

Expand Down Expand Up @@ -184,6 +213,7 @@ export class DBOSClient {
const { workflowName, workflowClassName, workflowConfigName, queueName, appVersion } = options;
const workflowUUID = options.workflowID ?? randomUUID();

const serparam = serializeArgs(args, undefined, this.serializer, options?.serializationType);
const internalStatus: WorkflowStatusInternal = {
workflowUUID: workflowUUID,
status: StatusString.ENQUEUED,
Expand All @@ -203,10 +233,11 @@ export class DBOSClient {
createdAt: Date.now(),
timeoutMS: options.workflowTimeoutMS,
deadlineEpochMS: undefined,
input: this.serializer.stringify(args),
input: serparam.serializedValue,
deduplicationID: options.deduplicationID,
priority: options.priority ?? 0,
queuePartitionKey: options.queuePartitionKey,
serialization: serparam.serialization,
};

await this.systemDatabase.initWorkflowStatus(internalStatus, null);
Expand All @@ -222,8 +253,21 @@ export class DBOSClient {
* @param idempotencyKey - An optional idempotency key to ensure that the message is only sent once.
* @returns A Promise that resolves when the message has been sent.
*/
async send<T>(destinationID: string, message: T, topic?: string, idempotencyKey?: string): Promise<void> {
async send<T>(
destinationID: string,
message: T,
topic?: string,
idempotencyKey?: string,
options?: ClientSendOptions,
): Promise<void> {
idempotencyKey ??= randomUUID();
const sermsg = serializeValue(message, this.serializer, options?.serializationType);
const srwfp = serializeArgs(
[destinationID, message, topic, options?.serializationType],
undefined,
this.serializer,
options?.serializationType,
);
const internalStatus: WorkflowStatusInternal = {
workflowUUID: `${destinationID}-${idempotencyKey}`,
status: StatusString.SUCCESS,
Expand All @@ -239,18 +283,20 @@ export class DBOSClient {
executorId: '',
applicationID: '',
createdAt: Date.now(),
input: this.serializer.stringify([destinationID, message, topic]),
input: srwfp.serializedValue,
deduplicationID: undefined,
priority: 0,
queuePartitionKey: undefined,
serialization: srwfp.serialization,
};
await this.systemDatabase.initWorkflowStatus(internalStatus, null);
await this.systemDatabase.send(
internalStatus.workflowUUID,
0,
destinationID,
this.serializer.stringify(message),
sermsg.serializedValue,
topic,
sermsg.serialization,
);
}

Expand All @@ -262,7 +308,8 @@ export class DBOSClient {
* @returns A Promise that resolves with the event payload.
*/
async getEvent<T>(workflowID: string, key: string, timeoutSeconds?: number): Promise<T | null> {
return this.serializer.parse(await this.systemDatabase.getEvent(workflowID, key, timeoutSeconds ?? 60)) as T;
const evt = await this.systemDatabase.getEvent(workflowID, key, timeoutSeconds ?? 60);
return deserializeValue(evt.serializedValue, evt.serialization, this.serializer) as T;
}

/**
Expand Down Expand Up @@ -320,10 +367,10 @@ export class DBOSClient {
while (true) {
try {
const value = await this.systemDatabase.readStream(workflowID, key, offset);
if (value === DBOS_STREAM_CLOSED_SENTINEL) {
if (value.serializedValue === DBOS_STREAM_CLOSED_SENTINEL) {
break;
}
yield value as T;
yield deserializeValue(value.serializedValue, value.serialization, this.serializer) as T;
offset += 1;
} catch (error: unknown) {
if (error instanceof Error && error.message.includes('No value found')) {
Expand Down
2 changes: 2 additions & 0 deletions src/context.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import { AsyncLocalStorage } from 'async_hooks';
import { DBOSInvalidWorkflowTransitionError } from './error';
import Koa from 'koa';
import { DBOSExecutor } from './dbos-executor';
import { WorkflowSerializationFormat } from './workflow';

export interface StepStatus {
stepID: number;
Expand All @@ -23,6 +24,7 @@ export interface DBOSContextOptions {
operationType?: string; // A custom helper for users to set a operation type of their choice. Intended for functions setting a pctx to run DBOS operations from.
operationCaller?: string; // This is made to pass through the operationName to DBOS contexts, and potentially the caller span name.
workflowTimeoutMS?: number | null;
serializationType?: WorkflowSerializationFormat;
}

export interface DBOSLocalCtx extends DBOSContextOptions {
Expand Down
Loading