Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion dbos/admin_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -412,7 +412,7 @@ func newAdminServer(ctx *dbosContext, port int) *adminServer {
mux.HandleFunc(_WORKFLOW_STEPS_PATTERN, func(w http.ResponseWriter, r *http.Request) {
workflowID := r.PathValue("id")

steps, err := ctx.systemDB.getWorkflowSteps(ctx, workflowID)
steps, err := GetWorkflowSteps(ctx, workflowID)
if err != nil {
ctx.logger.Error("Failed to list workflow steps", "workflow_id", workflowID, "error", err)
http.Error(w, fmt.Sprintf("Failed to list steps: %v", err), http.StatusInternalServerError)
Expand Down
4 changes: 2 additions & 2 deletions dbos/conductor.go
Original file line number Diff line number Diff line change
Expand Up @@ -719,8 +719,8 @@ func (c *Conductor) handleListStepsRequest(data []byte, requestID string) error
}
c.logger.Debug("Handling list steps request", "request", req)

// Get workflow steps using the existing systemDB method
steps, err := c.dbosCtx.systemDB.getWorkflowSteps(c.dbosCtx, req.WorkflowID)
// Get workflow steps using the public GetWorkflowSteps method
steps, err := GetWorkflowSteps(c.dbosCtx, req.WorkflowID)
if err != nil {
c.logger.Error("Failed to list workflow steps", "workflow_id", req.WorkflowID, "error", err)
errorMsg := fmt.Sprintf("failed to list workflow steps: %v", err)
Expand Down
4 changes: 2 additions & 2 deletions dbos/conductor_protocol.go
Original file line number Diff line number Diff line change
Expand Up @@ -190,8 +190,8 @@ type listStepsConductorResponse struct {
Output *[]workflowStepsConductorResponseBody `json:"output,omitempty"`
}

// formatWorkflowStepsResponseBody converts stepInfo to workflowStepsConductorResponseBody for the conductor protocol
func formatWorkflowStepsResponseBody(step stepInfo) workflowStepsConductorResponseBody {
// formatWorkflowStepsResponseBody converts StepInfo to workflowStepsConductorResponseBody for the conductor protocol
func formatWorkflowStepsResponseBody(step StepInfo) workflowStepsConductorResponseBody {
output := workflowStepsConductorResponseBody{
FunctionID: step.StepID,
FunctionName: step.StepName,
Expand Down
19 changes: 10 additions & 9 deletions dbos/dbos.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,15 +101,15 @@ type DBOSContext interface {
Shutdown(timeout time.Duration) // Gracefully shutdown all DBOS runtime components with ordered cleanup sequence

// Workflow operations
RunAsStep(_ DBOSContext, fn StepFunc, opts ...StepOption) (any, error) // Execute a function as a durable step within a workflow
RunAsStep(_ DBOSContext, fn StepFunc, opts ...StepOption) (any, error) // Execute a function as a durable step within a workflow
RunWorkflow(_ DBOSContext, fn WorkflowFunc, input any, opts ...WorkflowOption) (WorkflowHandle[any], error) // Start a new workflow execution
Send(_ DBOSContext, destinationID string, message any, topic string) error // Send a message to another workflow
Recv(_ DBOSContext, topic string, timeout time.Duration) (any, error) // Receive a message sent to this workflow
SetEvent(_ DBOSContext, key string, message any) error // Set a key-value event for this workflow
GetEvent(_ DBOSContext, targetWorkflowID string, key string, timeout time.Duration) (any, error) // Get a key-value event from a target workflow
Sleep(_ DBOSContext, duration time.Duration) (time.Duration, error) // Durable sleep that survives workflow recovery
GetWorkflowID() (string, error) // Get the current workflow ID (only available within workflows)
GetStepID() (int, error) // Get the current step ID (only available within workflows)
Send(_ DBOSContext, destinationID string, message any, topic string) error // Send a message to another workflow
Recv(_ DBOSContext, topic string, timeout time.Duration) (any, error) // Receive a message sent to this workflow
SetEvent(_ DBOSContext, key string, message any) error // Set a key-value event for this workflow
GetEvent(_ DBOSContext, targetWorkflowID string, key string, timeout time.Duration) (any, error) // Get a key-value event from a target workflow
Sleep(_ DBOSContext, duration time.Duration) (time.Duration, error) // Durable sleep that survives workflow recovery
GetWorkflowID() (string, error) // Get the current workflow ID (only available within workflows)
GetStepID() (int, error) // Get the current step ID (only available within workflows)

// Workflow management
RetrieveWorkflow(_ DBOSContext, workflowID string) (WorkflowHandle[any], error) // Get a handle to an existing workflow
Expand All @@ -118,6 +118,7 @@ type DBOSContext interface {
ResumeWorkflow(_ DBOSContext, workflowID string) (WorkflowHandle[any], error) // Resume a cancelled workflow
ForkWorkflow(_ DBOSContext, input ForkWorkflowInput) (WorkflowHandle[any], error) // Fork a workflow from a specific step
ListWorkflows(_ DBOSContext, opts ...ListWorkflowsOption) ([]WorkflowStatus, error) // List workflows based on filtering criteria
GetWorkflowSteps(_ DBOSContext, workflowID string) ([]StepInfo, error) // Get the execution steps of a workflow

// Accessors
GetApplicationVersion() string // Get the application version for this context
Expand Down Expand Up @@ -328,7 +329,6 @@ func NewDBOSContext(inputConfig Config) (DBOSContext, error) {

// Initialize the queue runner and register DBOS internal queue
initExecutor.queueRunner = newQueueRunner(initExecutor.logger)
NewWorkflowQueue(initExecutor, _DBOS_INTERNAL_QUEUE_NAME)

// Initialize conductor if API key is provided
if config.ConductorAPIKey != "" {
Expand Down Expand Up @@ -383,6 +383,7 @@ func (c *dbosContext) Launch() error {
}

// Start the queue runner in a goroutine
NewWorkflowQueue(c, _DBOS_INTERNAL_QUEUE_NAME)
go func() {
c.queueRunner.run(c)
}()
Expand Down
64 changes: 47 additions & 17 deletions dbos/migrations/000001_initial_dbos_schema.up.sql
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ CREATE SCHEMA IF NOT EXISTS dbos;
CREATE EXTENSION IF NOT EXISTS "uuid-ossp";

-- Create workflow_status table
CREATE TABLE dbos.workflow_status (
CREATE TABLE IF NOT EXISTS dbos.workflow_status (
Copy link
Member

@kraftp kraftp Sep 2, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should replace this entirely with the new migration system. Doesn't have to be in this PR, but should be done before release.

workflow_uuid TEXT PRIMARY KEY,
status TEXT,
name TEXT,
Expand All @@ -34,17 +34,27 @@ CREATE TABLE dbos.workflow_status (
);

-- Create indexes for workflow_status
CREATE INDEX workflow_status_created_at_index ON dbos.workflow_status (created_at);
CREATE INDEX workflow_status_executor_id_index ON dbos.workflow_status (executor_id);
CREATE INDEX workflow_status_status_index ON dbos.workflow_status (status);
CREATE INDEX IF NOT EXISTS workflow_status_created_at_index ON dbos.workflow_status (created_at);
CREATE INDEX IF NOT EXISTS workflow_status_executor_id_index ON dbos.workflow_status (executor_id);
CREATE INDEX IF NOT EXISTS workflow_status_status_index ON dbos.workflow_status (status);

-- Create unique constraint for queue_name and deduplication_id
ALTER TABLE dbos.workflow_status
ADD CONSTRAINT uq_workflow_status_queue_name_dedup_id
UNIQUE (queue_name, deduplication_id);
DO $$
BEGIN
IF NOT EXISTS (
SELECT 1 FROM information_schema.table_constraints
WHERE constraint_name = 'uq_workflow_status_queue_name_dedup_id'
AND table_name = 'workflow_status'
AND table_schema = 'dbos'
) THEN
ALTER TABLE dbos.workflow_status
ADD CONSTRAINT uq_workflow_status_queue_name_dedup_id
UNIQUE (queue_name, deduplication_id);
END IF;
END $$;

-- Create operation_outputs table
CREATE TABLE dbos.operation_outputs (
CREATE TABLE IF NOT EXISTS dbos.operation_outputs (
workflow_uuid TEXT NOT NULL,
function_id INTEGER NOT NULL,
function_name TEXT NOT NULL DEFAULT '',
Expand All @@ -56,7 +66,7 @@ CREATE TABLE dbos.operation_outputs (
ON UPDATE CASCADE ON DELETE CASCADE
);

CREATE TABLE dbos.notifications (
CREATE TABLE IF NOT EXISTS dbos.notifications (
destination_uuid TEXT NOT NULL,
topic TEXT,
message TEXT NOT NULL,
Expand All @@ -66,7 +76,7 @@ CREATE TABLE dbos.notifications (
ON UPDATE CASCADE ON DELETE CASCADE
);
-- Create index for notifications
CREATE INDEX idx_workflow_topic ON dbos.notifications (destination_uuid, topic);
CREATE INDEX IF NOT EXISTS idx_workflow_topic ON dbos.notifications (destination_uuid, topic);

-- Create notification function
CREATE OR REPLACE FUNCTION dbos.notifications_function() RETURNS TRIGGER AS $$
Expand All @@ -79,12 +89,22 @@ END;
$$ LANGUAGE plpgsql;

-- Create notification trigger
CREATE TRIGGER dbos_notifications_trigger
AFTER INSERT ON dbos.notifications
FOR EACH ROW EXECUTE FUNCTION dbos.notifications_function();
DO $$
BEGIN
IF NOT EXISTS (
SELECT 1 FROM information_schema.triggers
WHERE trigger_name = 'dbos_notifications_trigger'
AND event_object_table = 'notifications'
AND event_object_schema = 'dbos'
) THEN
CREATE TRIGGER dbos_notifications_trigger
AFTER INSERT ON dbos.notifications
FOR EACH ROW EXECUTE FUNCTION dbos.notifications_function();
END IF;
END $$;

-- Create workflow_events table
CREATE TABLE dbos.workflow_events (
CREATE TABLE IF NOT EXISTS dbos.workflow_events (
workflow_uuid TEXT NOT NULL,
key TEXT NOT NULL,
value TEXT NOT NULL,
Expand All @@ -104,6 +124,16 @@ END;
$$ LANGUAGE plpgsql;

-- Create events trigger
CREATE TRIGGER dbos_workflow_events_trigger
AFTER INSERT ON dbos.workflow_events
FOR EACH ROW EXECUTE FUNCTION dbos.workflow_events_function();
DO $$
BEGIN
IF NOT EXISTS (
SELECT 1 FROM information_schema.triggers
WHERE trigger_name = 'dbos_workflow_events_trigger'
AND event_object_table = 'workflow_events'
AND event_object_schema = 'dbos'
) THEN
CREATE TRIGGER dbos_workflow_events_trigger
AFTER INSERT ON dbos.workflow_events
FOR EACH ROW EXECUTE FUNCTION dbos.workflow_events_function();
END IF;
END $$;
10 changes: 5 additions & 5 deletions dbos/queues_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,7 @@ func TestWorkflowQueues(t *testing.T) {
assert.Equal(t, "test-input", res)

// List steps: the workflow should have 1 step
steps, err := dbosCtx.(*dbosContext).systemDB.getWorkflowSteps(dbosCtx, handle.GetWorkflowID())
steps, err := GetWorkflowSteps(dbosCtx, handle.GetWorkflowID())
require.NoError(t, err)
assert.Len(t, steps, 1)
assert.Equal(t, 0, steps[0].StepID)
Expand Down Expand Up @@ -207,7 +207,7 @@ func TestWorkflowQueues(t *testing.T) {
assert.Equal(t, expectedResult, res)

// List steps: the workflow should have 2 steps (Start the child and GetResult)
steps, err := dbosCtx.(*dbosContext).systemDB.getWorkflowSteps(dbosCtx, handle.GetWorkflowID())
steps, err := GetWorkflowSteps(dbosCtx, handle.GetWorkflowID())
require.NoError(t, err)
assert.Len(t, steps, 2)
assert.Equal(t, runtime.FuncForPC(reflect.ValueOf(queueWorkflow).Pointer()).Name(), steps[0].StepName)
Expand All @@ -230,7 +230,7 @@ func TestWorkflowQueues(t *testing.T) {
assert.Equal(t, expectedResult, res)

// List steps: the workflow should have 2 steps (Start the child and GetResult)
steps, err := dbosCtx.(*dbosContext).systemDB.getWorkflowSteps(dbosCtx, handle.GetWorkflowID())
steps, err := GetWorkflowSteps(dbosCtx, handle.GetWorkflowID())
require.NoError(t, err)
assert.Len(t, steps, 2)
assert.Equal(t, runtime.FuncForPC(reflect.ValueOf(queueWorkflow).Pointer()).Name(), steps[0].StepName)
Expand All @@ -253,7 +253,7 @@ func TestWorkflowQueues(t *testing.T) {
assert.Equal(t, expectedResult, res)

// List steps: the workflow should have 2 steps (Start the child and GetResult)
steps, err := dbosCtx.(*dbosContext).systemDB.getWorkflowSteps(dbosCtx, handle.GetWorkflowID())
steps, err := GetWorkflowSteps(dbosCtx, handle.GetWorkflowID())
require.NoError(t, err)
assert.Len(t, steps, 2)
assert.Equal(t, "custom-name", steps[0].StepName)
Expand All @@ -279,7 +279,7 @@ func TestWorkflowQueues(t *testing.T) {

// Check that the parent workflow (the one we ran directly) has 2 steps:
// one for enqueueing the child and one for calling GetResult
steps, err := dbosCtx.(*dbosContext).systemDB.getWorkflowSteps(dbosCtx, handle.GetWorkflowID())
steps, err := GetWorkflowSteps(dbosCtx, handle.GetWorkflowID())
require.NoError(t, err)
assert.Len(t, steps, 2)
assert.Equal(t, runtime.FuncForPC(reflect.ValueOf(queueWorkflow).Pointer()).Name(), steps[0].StepName)
Expand Down
4 changes: 2 additions & 2 deletions dbos/serialization_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ func TestWorkflowEncoding(t *testing.T) {
assert.Equal(t, "workflow error: step error", workflow.Error.Error())

// Test results from GetWorkflowSteps
steps, err := executor.(*dbosContext).systemDB.getWorkflowSteps(context.Background(), directHandle.GetWorkflowID())
steps, err := GetWorkflowSteps(executor, directHandle.GetWorkflowID())
require.NoError(t, err)
require.Len(t, steps, 1)
step := steps[0]
Expand Down Expand Up @@ -175,7 +175,7 @@ func TestWorkflowEncoding(t *testing.T) {
assert.Equal(t, "processed by encodingStepStruct", workflowOutput.B)

// Test results from GetWorkflowSteps
steps, err := executor.(*dbosContext).systemDB.getWorkflowSteps(context.Background(), directHandle.GetWorkflowID())
steps, err := GetWorkflowSteps(executor, directHandle.GetWorkflowID())
require.NoError(t, err)
require.Len(t, steps, 1)
step := steps[0]
Expand Down
25 changes: 15 additions & 10 deletions dbos/system_database.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ type systemDatabase interface {
// Steps
recordOperationResult(ctx context.Context, input recordOperationResultDBInput) error
checkOperationExecution(ctx context.Context, input checkOperationExecutionDBInput) (*recordedResult, error)
getWorkflowSteps(ctx context.Context, workflowID string) ([]stepInfo, error)
getWorkflowSteps(ctx context.Context, workflowID string) ([]StepInfo, error)

// Communication (special steps)
send(ctx context.Context, input WorkflowSendInput) error
Expand Down Expand Up @@ -1326,15 +1326,16 @@ func (s *sysDB) checkOperationExecution(ctx context.Context, input checkOperatio
return result, nil
}

type stepInfo struct {
StepID int
StepName string
Output any
Error error
ChildWorkflowID string
// StepInfo contains information about a workflow step execution.
type StepInfo struct {
StepID int // The sequential ID of the step within the workflow
StepName string // The name of the step function
Output any // The output returned by the step (if any)
Error error // The error returned by the step (if any)
ChildWorkflowID string // The ID of a child workflow spawned by this step (if applicable)
}

func (s *sysDB) getWorkflowSteps(ctx context.Context, workflowID string) ([]stepInfo, error) {
func (s *sysDB) getWorkflowSteps(ctx context.Context, workflowID string) ([]StepInfo, error) {
query := `SELECT function_id, function_name, output, error, child_workflow_id
FROM dbos.operation_outputs
WHERE workflow_uuid = $1
Expand All @@ -1346,9 +1347,9 @@ func (s *sysDB) getWorkflowSteps(ctx context.Context, workflowID string) ([]step
}
defer rows.Close()

var steps []stepInfo
var steps []StepInfo
for rows.Next() {
var step stepInfo
var step StepInfo
var outputString *string
var errorString *string
var childWorkflowID *string
Expand Down Expand Up @@ -2078,6 +2079,10 @@ func (s *sysDB) dequeueWorkflows(ctx context.Context, input dequeueWorkflowsInpu
}
}

if maxTasks <= 0 {
return nil, nil
}

// Build the query to select workflows for dequeueing
// Use SKIP LOCKED when no global concurrency is set to avoid blocking,
// otherwise use NOWAIT to ensure consistent view across processes
Expand Down
31 changes: 30 additions & 1 deletion dbos/workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -2015,7 +2015,7 @@ func (c *dbosContext) ListWorkflows(_ DBOSContext, opts ...ListWorkflowsOption)
// Call the context method to list workflows
workflows, err := c.systemDB.listWorkflows(c, dbInput)
if err != nil {
return nil, fmt.Errorf("failed to list workflows: %w", err)
return nil, err
}

return workflows, nil
Expand Down Expand Up @@ -2062,3 +2062,32 @@ func ListWorkflows(ctx DBOSContext, opts ...ListWorkflowsOption) ([]WorkflowStat
}
return ctx.ListWorkflows(ctx, opts...)
}

func (c *dbosContext) GetWorkflowSteps(_ DBOSContext, workflowID string) ([]StepInfo, error) {
return c.systemDB.getWorkflowSteps(c, workflowID)
}

// GetWorkflowSteps retrieves the execution steps of a workflow.
// Returns a list of step information including step IDs, names, outputs, errors, and child workflow IDs.
//
// Parameters:
// - ctx: DBOS context for the operation
// - workflowID: The unique identifier of the workflow
//
// Returns a slice of StepInfo structs containing information about each executed step.
//
// Example:
//
// steps, err := dbos.GetWorkflowSteps(ctx, "workflow-id")
// if err != nil {
// log.Fatal(err)
// }
// for _, step := range steps {
// log.Printf("Step %d: %s", step.StepID, step.StepName)
// }
func GetWorkflowSteps(ctx DBOSContext, workflowID string) ([]StepInfo, error) {
if ctx == nil {
return nil, errors.New("ctx cannot be nil")
}
return ctx.GetWorkflowSteps(ctx, workflowID)
}
Loading
Loading