Skip to content

Commit af4073d

Browse files
authored
Fix child steps ids and get result recording (#48)
- Fix child step ID generation - Fix test - Rename some dangling names - Fix recording of getResult as a step in a polling handle
1 parent e60bb0d commit af4073d

File tree

3 files changed

+233
-72
lines changed

3 files changed

+233
-72
lines changed

dbos/system_database.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -975,8 +975,8 @@ func (s *systemDatabase) CheckOperationExecution(ctx context.Context, input chec
975975
}
976976

977977
type stepInfo struct {
978-
FunctionID int
979-
FunctionName string
978+
StepID int
979+
StepName string
980980
Output any
981981
Error error
982982
ChildWorkflowID string
@@ -1000,7 +1000,7 @@ func (s *systemDatabase) GetWorkflowSteps(ctx context.Context, workflowID string
10001000
var errorString *string
10011001
var childWorkflowID *string
10021002

1003-
err := rows.Scan(&step.FunctionID, &step.FunctionName, &outputString, &errorString, &childWorkflowID)
1003+
err := rows.Scan(&step.StepID, &step.StepName, &outputString, &errorString, &childWorkflowID)
10041004
if err != nil {
10051005
return nil, fmt.Errorf("failed to scan step row: %w", err)
10061006
}

dbos/workflow.go

Lines changed: 23 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -97,24 +97,24 @@ func (h *workflowHandle[R]) GetResult() (R, error) {
9797
return *new(R), errors.New("workflow result channel is already closed. Did you call GetResult() twice on the same workflow handle?")
9898
}
9999
// If we are calling GetResult inside a workflow, record the result as a step result
100-
parentWorkflowState, ok := h.dbosContext.Value(workflowStateKey).(*workflowState)
101-
isChildWorkflow := ok && parentWorkflowState != nil
102-
if isChildWorkflow {
100+
workflowState, ok := h.dbosContext.Value(workflowStateKey).(*workflowState)
101+
isWithinWorkflow := ok && workflowState != nil
102+
if isWithinWorkflow {
103103
encodedOutput, encErr := serialize(outcome.result)
104104
if encErr != nil {
105-
return *new(R), newWorkflowExecutionError(parentWorkflowState.workflowID, fmt.Sprintf("serializing child workflow result: %v", encErr))
105+
return *new(R), newWorkflowExecutionError(workflowState.workflowID, fmt.Sprintf("serializing child workflow result: %v", encErr))
106106
}
107107
recordGetResultInput := recordChildGetResultDBInput{
108-
parentWorkflowID: parentWorkflowState.workflowID,
108+
parentWorkflowID: workflowState.workflowID,
109109
childWorkflowID: h.workflowID,
110-
stepID: parentWorkflowState.NextStepID(),
110+
stepID: workflowState.NextStepID(),
111111
output: encodedOutput,
112112
err: outcome.err,
113113
}
114114
recordResultErr := h.dbosContext.(*dbosContext).systemDB.RecordChildGetResult(h.dbosContext, recordGetResultInput)
115115
if recordResultErr != nil {
116116
h.dbosContext.(*dbosContext).logger.Error("failed to record get result", "error", recordResultErr)
117-
return *new(R), newWorkflowExecutionError(parentWorkflowState.workflowID, fmt.Sprintf("recording child workflow result: %v", recordResultErr))
117+
return *new(R), newWorkflowExecutionError(workflowState.workflowID, fmt.Sprintf("recording child workflow result: %v", recordResultErr))
118118
}
119119
}
120120
return outcome.result, outcome.err
@@ -144,8 +144,6 @@ type workflowPollingHandle[R any] struct {
144144
}
145145

146146
func (h *workflowPollingHandle[R]) GetResult() (R, error) {
147-
// FIXME this should use a context available to the user, so they can cancel it instead of infinite waiting
148-
ctx := context.Background()
149147
result, err := h.dbosContext.(*dbosContext).systemDB.AwaitWorkflowResult(h.dbosContext, h.workflowID)
150148
if result != nil {
151149
typedResult, ok := result.(R)
@@ -154,17 +152,17 @@ func (h *workflowPollingHandle[R]) GetResult() (R, error) {
154152
return *new(R), newWorkflowUnexpectedResultType(h.workflowID, fmt.Sprintf("%T", new(R)), fmt.Sprintf("%T", result))
155153
}
156154
// If we are calling GetResult inside a workflow, record the result as a step result
157-
parentWorkflowState, ok := ctx.Value(workflowStateKey).(*workflowState)
158-
isChildWorkflow := ok && parentWorkflowState != nil
159-
if isChildWorkflow {
155+
workflowState, ok := h.dbosContext.Value(workflowStateKey).(*workflowState)
156+
isWithinWorkflow := ok && workflowState != nil
157+
if isWithinWorkflow {
160158
encodedOutput, encErr := serialize(typedResult)
161159
if encErr != nil {
162-
return *new(R), newWorkflowExecutionError(parentWorkflowState.workflowID, fmt.Sprintf("serializing child workflow result: %v", encErr))
160+
return *new(R), newWorkflowExecutionError(workflowState.workflowID, fmt.Sprintf("serializing child workflow result: %v", encErr))
163161
}
164162
recordGetResultInput := recordChildGetResultDBInput{
165-
parentWorkflowID: parentWorkflowState.workflowID,
163+
parentWorkflowID: workflowState.workflowID,
166164
childWorkflowID: h.workflowID,
167-
stepID: parentWorkflowState.NextStepID(),
165+
stepID: workflowState.NextStepID(),
168166
output: encodedOutput,
169167
err: err,
170168
}
@@ -491,11 +489,16 @@ func (c *dbosContext) RunAsWorkflow(_ DBOSContext, fn WorkflowFunc, input any, o
491489
parentWorkflowState, ok := c.Value(workflowStateKey).(*workflowState)
492490
isChildWorkflow := ok && parentWorkflowState != nil
493491

492+
if isChildWorkflow {
493+
// Advance step ID if we are a child workflow
494+
parentWorkflowState.NextStepID()
495+
}
496+
494497
// Generate an ID for the workflow if not provided
495498
var workflowID string
496499
if params.workflowID == "" {
497500
if isChildWorkflow {
498-
stepID := parentWorkflowState.NextStepID()
501+
stepID := parentWorkflowState.stepID
499502
workflowID = fmt.Sprintf("%s-%d", parentWorkflowState.workflowID, stepID)
500503
} else {
501504
workflowID = uuid.New().String()
@@ -588,12 +591,11 @@ func (c *dbosContext) RunAsWorkflow(_ DBOSContext, fn WorkflowFunc, input any, o
588591
// Record child workflow relationship if this is a child workflow
589592
if isChildWorkflow {
590593
// Get the step ID that was used for generating the child workflow ID
591-
stepID := parentWorkflowState.stepID
592594
childInput := recordChildWorkflowDBInput{
593595
parentWorkflowID: parentWorkflowState.workflowID,
594-
childWorkflowID: workflowStatus.ID,
596+
childWorkflowID: workflowID,
595597
stepName: params.workflowName,
596-
stepID: stepID,
598+
stepID: parentWorkflowState.stepID,
597599
tx: tx,
598600
}
599601
err = c.systemDB.RecordChildWorkflow(uncancellableCtx, childInput)
@@ -611,12 +613,12 @@ func (c *dbosContext) RunAsWorkflow(_ DBOSContext, fn WorkflowFunc, input any, o
611613
// Create workflow state to track step execution
612614
wfState := &workflowState{
613615
workflowID: workflowID,
614-
stepID: -1,
616+
stepID: -1, // Steps are O-indexed
615617
}
616618

617619
workflowCtx := WithValue(c, workflowStateKey, wfState)
618620

619-
// If the workflow has a deadline, set it in the context. We use what was returned by InsertWorkflowStatus
621+
// If the workflow has a durable deadline, set it in the context.
620622
var stopFunc func() bool
621623
cancelFuncCompleted := make(chan struct{})
622624
if !insertStatusResult.workflowDeadline.IsZero() {

0 commit comments

Comments
 (0)