Skip to content

Commit 0a801e5

Browse files
committed
add client GetWorkflowSteps
1 parent 3a00f5a commit 0a801e5

File tree

4 files changed

+95
-7
lines changed

4 files changed

+95
-7
lines changed

dbos/client.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ type Client interface {
3131
CancelWorkflow(workflowID string) error
3232
ResumeWorkflow(workflowID string) (WorkflowHandle[any], error)
3333
ForkWorkflow(input ForkWorkflowInput) (WorkflowHandle[any], error)
34+
GetWorkflowSteps(workflowID string) ([]StepInfo, error)
3435
Shutdown(timeout time.Duration) // Simply close the system DB connection pool
3536
}
3637

@@ -295,6 +296,11 @@ func (c *client) ForkWorkflow(input ForkWorkflowInput) (WorkflowHandle[any], err
295296
return c.dbosCtx.ForkWorkflow(c.dbosCtx, input)
296297
}
297298

299+
// GetWorkflowSteps retrieves the execution steps of a workflow.
300+
func (c *client) GetWorkflowSteps(workflowID string) ([]StepInfo, error) {
301+
return c.dbosCtx.GetWorkflowSteps(c.dbosCtx, workflowID)
302+
}
303+
298304
// Shutdown gracefully shuts down the client and closes the system database connection.
299305
func (c *client) Shutdown(timeout time.Duration) {
300306
// Get the concrete dbosContext to access internal fields

dbos/client_test.go

Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -928,3 +928,69 @@ func TestListWorkflows(t *testing.T) {
928928
// Verify all queue entries are cleaned up
929929
require.True(t, queueEntriesAreCleanedUp(serverCtx), "expected queue entries to be cleaned up after list workflows tests")
930930
}
931+
932+
func TestGetWorkflowSteps(t *testing.T) {
933+
// Setup server context
934+
serverCtx := setupDBOS(t, true, true)
935+
936+
// Create queue for communication
937+
queue := NewWorkflowQueue(serverCtx, "get-workflow-steps-queue")
938+
939+
// Workflow with one step
940+
stepFunction := func(ctx context.Context) (string, error) {
941+
return "abc", nil
942+
}
943+
944+
testWorkflow := func(ctx DBOSContext, input string) (string, error) {
945+
result, err := RunAsStep(ctx, stepFunction)
946+
if err != nil {
947+
return "", err
948+
}
949+
return result, nil
950+
}
951+
RegisterWorkflow(serverCtx, testWorkflow, WithWorkflowName("TestWorkflow"))
952+
953+
// Launch server
954+
err := Launch(serverCtx)
955+
require.NoError(t, err)
956+
957+
// Setup client
958+
databaseURL := getDatabaseURL()
959+
config := ClientConfig{
960+
DatabaseURL: databaseURL,
961+
}
962+
client, err := NewClient(context.Background(), config)
963+
require.NoError(t, err)
964+
t.Cleanup(func() {
965+
if client != nil {
966+
client.Shutdown(30 * time.Second)
967+
}
968+
})
969+
970+
// Enqueue and run the workflow
971+
workflowID := "test-get-workflow-steps"
972+
handle, err := Enqueue[string, string](client, queue.Name, "TestWorkflow", "test-input", WithEnqueueWorkflowID(workflowID))
973+
require.NoError(t, err)
974+
975+
// Wait for workflow to complete
976+
result, err := handle.GetResult()
977+
require.NoError(t, err)
978+
assert.Equal(t, "abc", result)
979+
980+
// Test GetWorkflowSteps with loadOutput = true
981+
stepsWithOutput, err := client.GetWorkflowSteps(workflowID)
982+
require.NoError(t, err)
983+
require.Len(t, stepsWithOutput, 1, "expected exactly 1 step")
984+
985+
step := stepsWithOutput[0]
986+
assert.Equal(t, 0, step.StepID, "expected step ID to be 0")
987+
assert.NotEmpty(t, step.StepName, "expected step name to be set")
988+
assert.Nil(t, step.Error, "expected no error in step")
989+
assert.Equal(t, "", step.ChildWorkflowID, "expected no child workflow ID")
990+
991+
// Verify the output wasn't loaded
992+
require.Nil(t, step.Output, "expected output not to be loaded")
993+
994+
// Verify all queue entries are cleaned up
995+
require.True(t, queueEntriesAreCleanedUp(serverCtx), "expected queue entries to be cleaned up after get workflow steps test")
996+
}

dbos/system_database.go

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,7 @@ type systemDatabase interface {
5151
// Steps
5252
recordOperationResult(ctx context.Context, input recordOperationResultDBInput) error
5353
checkOperationExecution(ctx context.Context, input checkOperationExecutionDBInput) (*recordedResult, error)
54-
getWorkflowSteps(ctx context.Context, workflowID string) ([]StepInfo, error)
54+
getWorkflowSteps(ctx context.Context, input getWorkflowStepsInput) ([]StepInfo, error)
5555

5656
// Communication (special steps)
5757
send(ctx context.Context, input WorkflowSendInput) error
@@ -1457,13 +1457,18 @@ type StepInfo struct {
14571457
ChildWorkflowID string // The ID of a child workflow spawned by this step (if applicable)
14581458
}
14591459

1460-
func (s *sysDB) getWorkflowSteps(ctx context.Context, workflowID string) ([]StepInfo, error) {
1460+
type getWorkflowStepsInput struct {
1461+
workflowID string
1462+
loadOutput bool
1463+
}
1464+
1465+
func (s *sysDB) getWorkflowSteps(ctx context.Context, input getWorkflowStepsInput) ([]StepInfo, error) {
14611466
query := fmt.Sprintf(`SELECT function_id, function_name, output, error, child_workflow_id
14621467
FROM %s.operation_outputs
14631468
WHERE workflow_uuid = $1
14641469
ORDER BY function_id ASC`, pgx.Identifier{s.schema}.Sanitize())
14651470

1466-
rows, err := s.pool.Query(ctx, query, workflowID)
1471+
rows, err := s.pool.Query(ctx, query, input.workflowID)
14671472
if err != nil {
14681473
return nil, fmt.Errorf("failed to query workflow steps: %w", err)
14691474
}
@@ -1481,8 +1486,8 @@ func (s *sysDB) getWorkflowSteps(ctx context.Context, workflowID string) ([]Step
14811486
return nil, fmt.Errorf("failed to scan step row: %w", err)
14821487
}
14831488

1484-
// Deserialize output if present
1485-
if outputString != nil {
1489+
// Deserialize output if present and loadOutput is true
1490+
if input.loadOutput && outputString != nil {
14861491
output, err := deserialize(outputString)
14871492
if err != nil {
14881493
return nil, fmt.Errorf("failed to deserialize output: %w", err)

dbos/workflow.go

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1956,14 +1956,25 @@ func ListWorkflows(ctx DBOSContext, opts ...ListWorkflowsOption) ([]WorkflowStat
19561956
}
19571957

19581958
func (c *dbosContext) GetWorkflowSteps(_ DBOSContext, workflowID string) ([]StepInfo, error) {
1959+
var loadOutput bool
1960+
if c.launched.Load() {
1961+
loadOutput = true
1962+
} else {
1963+
loadOutput = false
1964+
}
1965+
getWorkflowStepsInput := getWorkflowStepsInput{
1966+
workflowID: workflowID,
1967+
loadOutput: loadOutput,
1968+
}
1969+
19591970
workflowState, ok := c.Value(workflowStateKey).(*workflowState)
19601971
isWithinWorkflow := ok && workflowState != nil
19611972
if isWithinWorkflow {
19621973
return RunAsStep(c, func(ctx context.Context) ([]StepInfo, error) {
1963-
return c.systemDB.getWorkflowSteps(ctx, workflowID)
1974+
return c.systemDB.getWorkflowSteps(ctx, getWorkflowStepsInput)
19641975
}, WithStepName("DBOS.getWorkflowSteps"))
19651976
} else {
1966-
return c.systemDB.getWorkflowSteps(c, workflowID)
1977+
return c.systemDB.getWorkflowSteps(c, getWorkflowStepsInput)
19671978
}
19681979
}
19691980

0 commit comments

Comments
 (0)