Skip to content

Commit e9398ca

Browse files
committed
refactor: change Go function to return a channel of stepOutcome and simplify result handling
1 parent a478228 commit e9398ca

File tree

1 file changed

+13
-45
lines changed

1 file changed

+13
-45
lines changed

dbos/workflow.go

Lines changed: 13 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -914,6 +914,12 @@ func WithNextStepID(stepID int) StepOption {
914914
}
915915
}
916916

917+
// StepOutcome holds the result and error from a step execution
918+
type stepOutcome[R any] struct {
919+
result R
920+
err error
921+
}
922+
917923
// RunAsStep executes a function as a durable step within a workflow.
918924
// Steps provide at-least-once execution guarantees and automatic retry capabilities.
919925
// If a step has already been executed (e.g., during workflow recovery), its recorded
@@ -1102,65 +1108,27 @@ func (c *dbosContext) RunAsStep(_ DBOSContext, fn StepFunc, opts ...StepOption)
11021108
}
11031109

11041110
// TODO: Add docs --- will add once I get the implementation right
1105-
func Go[R any](ctx DBOSContext, fn Step[R], opts ...StepOption) (R, error) {
1106-
if ctx == nil {
1107-
return *new(R), newStepExecutionError("", "", "ctx cannot be nil")
1108-
}
1109-
1110-
if fn == nil {
1111-
return *new(R), newStepExecutionError("", "", "step function cannot be nil")
1112-
}
1113-
1114-
// Append WithStepName option to ensure the step name is set. This will not erase a user-provided step name
1115-
stepName := runtime.FuncForPC(reflect.ValueOf(fn).Pointer()).Name()
1116-
opts = append(opts, WithStepName(stepName))
1117-
1111+
func Go[R any](ctx DBOSContext, fn Step[R], opts ...StepOption) (chan stepOutcome[R], error) {
11181112
// create a determistic step ID
1113+
// can we refactor this too?
11191114
wfState, ok := ctx.Value(workflowStateKey).(*workflowState)
11201115
if !ok || wfState == nil {
1121-
return *new(R), newStepExecutionError("", stepName, "workflow state not found in context: are you running this step within a workflow?")
1116+
return nil, newStepExecutionError("", "", "workflow state not found in context: are you running this step within a workflow?")
11221117
}
11231118
stepID := wfState.NextStepID()
11241119
opts = append(opts, WithNextStepID(stepID))
11251120

1126-
// Type-erase the function
1127-
typeErasedFn := StepFunc(func(ctx context.Context) (any, error) { return fn(ctx) })
1128-
11291121
// run step inside a Go routine by passing stepID
1130-
result, err := ctx.Go(ctx, typeErasedFn, opts...)
1131-
1132-
// Step function could return a nil result
1133-
if result == nil {
1134-
return *new(R), err
1135-
}
1136-
// Otherwise type-check and cast the result
1137-
typedResult, ok := result.(R)
1138-
if !ok {
1139-
return *new(R), fmt.Errorf("unexpected result type: expected %T, got %T", *new(R), result)
1140-
}
1141-
return typedResult, err
1142-
}
1143-
1144-
// TODO: move type above -- keeping it here for in case I need to modify it quickly
1145-
type stepResultChan struct {
1146-
result any
1147-
err error
1148-
}
1149-
1150-
// TODO: Add docs --- will add once I get the implementation right
1151-
func (c *dbosContext) Go(ctx DBOSContext, fn StepFunc, opts ...StepOption) (any, error) {
1152-
result := make(chan stepResultChan, 1)
1122+
result := make(chan stepOutcome[R], 1)
11531123
go func() {
1154-
res, err := c.RunAsStep(ctx, fn, opts...)
1155-
result <- stepResultChan{
1124+
res, err := RunAsStep(ctx, fn, opts...)
1125+
result <- stepOutcome[R]{
11561126
result: res,
11571127
err: err,
11581128
}
11591129
}()
11601130

1161-
resultChan := <-result
1162-
close(result)
1163-
return resultChan.result, resultChan.err
1131+
return result, nil
11641132
}
11651133

11661134
/****************************************/

0 commit comments

Comments
 (0)