Skip to content

Commit 219a647

Browse files
committed
internal stepInfo with *string step output
1 parent 5443e4f commit 219a647

File tree

2 files changed

+35
-20
lines changed

2 files changed

+35
-20
lines changed

dbos/system_database.go

Lines changed: 12 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,7 @@ type systemDatabase interface {
5151
// Steps
5252
recordOperationResult(ctx context.Context, input recordOperationResultDBInput) error
5353
checkOperationExecution(ctx context.Context, input checkOperationExecutionDBInput) (*recordedResult, error)
54-
getWorkflowSteps(ctx context.Context, input getWorkflowStepsInput) ([]StepInfo, error)
54+
getWorkflowSteps(ctx context.Context, input getWorkflowStepsInput) ([]stepInfo, error)
5555

5656
// Communication (special steps)
5757
send(ctx context.Context, input WorkflowSendInput) error
@@ -1238,8 +1238,8 @@ type recordChildWorkflowDBInput struct {
12381238

12391239
func (s *sysDB) recordChildWorkflow(ctx context.Context, input recordChildWorkflowDBInput) error {
12401240
query := fmt.Sprintf(`INSERT INTO %s.operation_outputs
1241-
(workflow_uuid, function_id, function_name, child_workflow_id, output)
1242-
VALUES ($1, $2, $3, $4, '')`, pgx.Identifier{s.schema}.Sanitize())
1241+
(workflow_uuid, function_id, function_name, child_workflow_id)
1242+
VALUES ($1, $2, $3, $4)`, pgx.Identifier{s.schema}.Sanitize())
12431243

12441244
var commandTag pgconn.CommandTag
12451245
var err error
@@ -1415,20 +1415,20 @@ func (s *sysDB) checkOperationExecution(ctx context.Context, input checkOperatio
14151415
}
14161416

14171417
// StepInfo contains information about a workflow step execution.
1418-
type StepInfo struct {
1419-
StepID int // The sequential ID of the step within the workflow
1420-
StepName string // The name of the step function
1421-
Output any // The output returned by the step (if any)
1422-
Error error // The error returned by the step (if any)
1423-
ChildWorkflowID string // The ID of a child workflow spawned by this step (if applicable)
1418+
type stepInfo struct {
1419+
StepID int // The sequential ID of the step within the workflow
1420+
StepName string // The name of the step function
1421+
Output *string // The output returned by the step (if any)
1422+
Error error // The error returned by the step (if any)
1423+
ChildWorkflowID string // The ID of a child workflow spawned by this step (if applicable)
14241424
}
14251425

14261426
type getWorkflowStepsInput struct {
14271427
workflowID string
14281428
loadOutput bool
14291429
}
14301430

1431-
func (s *sysDB) getWorkflowSteps(ctx context.Context, input getWorkflowStepsInput) ([]StepInfo, error) {
1431+
func (s *sysDB) getWorkflowSteps(ctx context.Context, input getWorkflowStepsInput) ([]stepInfo, error) {
14321432
query := fmt.Sprintf(`SELECT function_id, function_name, output, error, child_workflow_id
14331433
FROM %s.operation_outputs
14341434
WHERE workflow_uuid = $1
@@ -1440,9 +1440,9 @@ func (s *sysDB) getWorkflowSteps(ctx context.Context, input getWorkflowStepsInpu
14401440
}
14411441
defer rows.Close()
14421442

1443-
var steps []StepInfo
1443+
var steps []stepInfo
14441444
for rows.Next() {
1445-
var step StepInfo
1445+
var step stepInfo
14461446
var outputString *string
14471447
var errorString *string
14481448
var childWorkflowID *string

dbos/workflow.go

Lines changed: 23 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -2144,6 +2144,14 @@ func ListWorkflows(ctx DBOSContext, opts ...ListWorkflowsOption) ([]WorkflowStat
21442144
return ctx.ListWorkflows(ctx, opts...)
21452145
}
21462146

2147+
type StepInfo struct {
2148+
StepID int // The sequential ID of the step within the workflow
2149+
StepName string // The name of the step function
2150+
Output any // The output returned by the step (if any)
2151+
Error error // The error returned by the step (if any)
2152+
ChildWorkflowID string // The ID of a child workflow spawned by this step (if applicable)
2153+
}
2154+
21472155
func (c *dbosContext) GetWorkflowSteps(_ DBOSContext, workflowID string) ([]StepInfo, error) {
21482156
var loadOutput bool
21492157
if c.launched.Load() {
@@ -2156,12 +2164,12 @@ func (c *dbosContext) GetWorkflowSteps(_ DBOSContext, workflowID string) ([]Step
21562164
loadOutput: loadOutput,
21572165
}
21582166

2159-
var steps []StepInfo
2167+
var steps []stepInfo
21602168
var err error
21612169
workflowState, ok := c.Value(workflowStateKey).(*workflowState)
21622170
isWithinWorkflow := ok && workflowState != nil
21632171
if isWithinWorkflow {
2164-
steps, err = RunAsStep(c, func(ctx context.Context) ([]StepInfo, error) {
2172+
steps, err = RunAsStep(c, func(ctx context.Context) ([]stepInfo, error) {
21652173
return c.systemDB.getWorkflowSteps(ctx, getWorkflowStepsInput)
21662174
}, WithStepName("DBOS.getWorkflowSteps"))
21672175
} else {
@@ -2170,24 +2178,31 @@ func (c *dbosContext) GetWorkflowSteps(_ DBOSContext, workflowID string) ([]Step
21702178
if err != nil {
21712179
return nil, err
21722180
}
2181+
stepInfos := make([]StepInfo, len(steps))
2182+
for i, step := range steps {
2183+
stepInfos[i] = StepInfo{
2184+
StepID: step.StepID,
2185+
StepName: step.StepName,
2186+
Output: step.Output,
2187+
Error: step.Error,
2188+
ChildWorkflowID: step.ChildWorkflowID,
2189+
}
2190+
}
21732191

21742192
// Deserialize outputs if asked to
21752193
if loadOutput {
21762194
serializer := newGobSerializer[any]()
21772195
for i := range steps {
2178-
encodedOutput, ok := steps[i].Output.(*string)
2179-
if !ok {
2180-
return nil, fmt.Errorf("step output must be encoded string, got %T", steps[i].Output)
2181-
}
2196+
encodedOutput := steps[i].Output
21822197
decodedOutput, err := serializer.Decode(encodedOutput)
21832198
if err != nil {
21842199
return nil, fmt.Errorf("failed to deserialize step output for step %d: %w", steps[i].StepID, err)
21852200
}
2186-
steps[i].Output = decodedOutput
2201+
stepInfos[i].Output = decodedOutput
21872202
}
21882203
}
21892204

2190-
return steps, nil
2205+
return stepInfos, nil
21912206
}
21922207

21932208
// GetWorkflowSteps retrieves the execution steps of a workflow.

0 commit comments

Comments
 (0)