-
Notifications
You must be signed in to change notification settings - Fork 41
Toward executor objects #44
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
kraftp
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is getting better. Really want to see what it looks like in code. One thing that jumps out is that we're now passing name into RunAsStep and RunAsWorkflow. That's clunky and probably not necessary--even if the type-erased function has the wrong name, we can extract the right name in the wrapper and pass it through the context.
Correct and I've been looking into doing exactly this. However, setting it in the context means using the The solution is likely to pass it in the options -- thus allowing user to optionally set workflow and step names, which we planned to do eventually. This works well for workflows but, won't work with steps if we want to remove functional step options for a revisited step interface accepting a variadic |
There has to be a workaround for this--worst case, an in-memory map from type-erased functions to names. |
Yup, that's one option, but it won't work when we move all the functional options to the context. Anything the users -- or us, inside the Generic function -- set through the context has to go through values like |
I'm sure there's a way to circumvent that, but we don't need to figure it out right now--let's get this interface built out, then fix steps, then we'll clean up the details. |
Fair enough -- the topics are sufficiently intertwined though that I took the time to find a basic solution that we will improve on. Specifically:
This seems to work well and I can now almost call steps without parameters. Workflow names are passed through a new functional option. I did have to fix the way I implemented the Generic RunAsWorkflow. To return a typed handle to the user, this unfortunately as to create a new goroutine to pass in the workflow outcome from an untyped channel to a typed one. Maybe there's way to improve on that too. Overall the solution is coming together and I am now updating the tests. |
… do not have the right concrete type
…all interface method directly during recovery / scheduled wfs
…cancellable context
| // FIXME: cancellation now has to go through the DBOSContext | ||
| ctx, cancel := context.WithCancel(c.ctx) | ||
| c.queueRunnerCtx = ctx | ||
| c.queueRunnerCancelFunc = cancel | ||
| c.queueRunnerDone = make(chan struct{}) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In a next PR I'll tighten up the usage of DBOSContext to manage its owned resources
| getLogger().Warn("NewWorkflowQueue called after DBOS initialization, dynamic registration is not supported") | ||
| return WorkflowQueue{} | ||
| } | ||
| // TODO: Add runtime check for post-initialization registration if needed |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Will add this in a next PR + prevention of post Launch wf registration
| } | ||
| } | ||
|
|
||
| // XXX this demonstrate why contexts cannot be used globally -- the task does not inherit the context used in the program that enqueued it |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
here ctx is the root context created by the user.
|
|
||
| // Queues | ||
| DequeueWorkflows(ctx context.Context, queue WorkflowQueue) ([]dequeuedWorkflow, error) | ||
| DequeueWorkflows(ctx context.Context, queue WorkflowQueue, executorID, applicationVersion string) ([]dequeuedWorkflow, error) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
will cleanup inputs (for a struct) in later PRs
| select { | ||
| case <-ctx.Done(): | ||
| return nil, ctx.Err() | ||
| default: | ||
| } | ||
|
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
AwaitWorkflowResult should be cancellable -- work for another PR but placing this here in the meantime.
| destinationID string | ||
| message any | ||
| topic string | ||
| type WorkflowSendInputInternal struct { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Terrible name that must be made public for the DBOSContext to be implementable (DBOSContext.Send must take a typeless input)
| err = Launch() | ||
| if err != nil { | ||
| t.Fatalf("failed to launch DBOS instance: %v", err) | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Most tests don't need Launch(). Only useful for queues, recovery thread, and notifications
| // RegisterWorkflow is generically typed, allowing us to register the workflow input and output types for gob encoding | ||
| // The registered workflow is wrapped in a typed-erased wrapper which performs runtime type checks and conversions | ||
| // To execute the workflow, use DBOSContext.RunAsWorkflow | ||
| func RegisterWorkflow[P any, R any](ctx DBOSContext, fn GenericWorkflowFunc[P, R], opts ...workflowRegistrationOption) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
One drawback of not wrapping anymore and returning the wrapped function, is that we can't automatically inject, at runtime, the registration options (today only max retries), when the user calls RunAsWorkflow.
This is very annoying because it means that, if we want the users to not have to pass it for every workflow, we must add registration option to the registry and allow RunAsWorkflow, to, at runtime, fetch the registration option (from the registry).
The registry is keyed by the provided workflow name, here the FQN -- maybe a user provided name in the future.
Now, consider that only the package-level RunAsWorkflow nows the FQN. The interface version receives a typeless wrapper that has another name. The registry is attached to the interface, so the package-level object cannot make any assumption about it.
The way we solve this is by adding a private withWorkflowName option to the WorkflowOptions. The package-level RunAsWorkflow injects that option, such that the Interface version of RunAsWorkflow can always retrieve its registered name from the parameters and get its register-time options. The wrappers we create during registration (for recovery and scheduled workflows) does the same (they could inject the the max retry option, but this way we have a unified way to get the name from the interface method.)
| // Create a typed channel for the user to get a typed handle | ||
| if handle, ok := handle.(*workflowHandle[any]); ok { | ||
| typedOutcomeChan := make(chan workflowOutcome[R], 1) | ||
|
|
||
| go func() { | ||
| defer close(typedOutcomeChan) | ||
| outcome := <-handle.outcomeChan | ||
|
|
||
| resultErr := outcome.err | ||
| var typedResult R | ||
| if typedRes, ok := outcome.result.(R); ok { | ||
| typedResult = typedRes | ||
| } else { // This should never happen | ||
| typedResult = *new(R) | ||
| typeErr := fmt.Errorf("unexpected result type: expected %T, got %T", *new(R), outcome.result) | ||
| resultErr = errors.Join(resultErr, typeErr) | ||
| } | ||
|
|
||
| typedOutcomeChan <- workflowOutcome[R]{ | ||
| result: typedResult, | ||
| err: resultErr, | ||
| } | ||
| }() | ||
|
|
||
| typedHandle := &workflowHandle[R]{ | ||
| workflowID: handle.workflowID, | ||
| outcomeChan: typedOutcomeChan, | ||
| dbosContext: handle.dbosContext, | ||
| } | ||
|
|
||
| return typedHandle, nil | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We cannot return a generically typed handle directly from the interface RunAsWorkflow. This is because the outcome channel itself is typed. This solution has an intermediate goroutine pipe the untyped result from the interface RunAsWorfklow to a typed channel, so we can create & return a typed handle to the user. Would love to be suggested alternative ideas.
|
|
||
| // Check if we are within a workflow (and thus a child workflow) | ||
| parentWorkflowState, ok := ctx.Value(workflowStateKey).(*workflowState) | ||
| parentWorkflowState, ok := c.Value(workflowStateKey).(*workflowState) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I am still in the process of figuring of whether this will work during child workflows recovery. Because the recovery thread will use the root context, which today doesn't not have a workflowStateKey. The recovery thread might be able to reconstruct this, however.
| type StepFunc func(ctx context.Context, input any) (any, error) | ||
| type GenericStepFunc[P any, R any] func(ctx context.Context, input P) (R, error) | ||
|
|
||
| const StepParamsKey DBOSContextKey = "stepParams" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is how users can set step parameters, without passing functional options nor a new input parameter.
| func WithMaxInterval(maxInterval time.Duration) stepOption { | ||
| return func(p *StepParams) { | ||
| p.MaxInterval = maxInterval | ||
| var typeErasedStepNameToStepName = make(map[string]string) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Again some pain:
- The user might or might not provide a custom step name
- The package level method knows the true name of the function
- The interface method sees only the typed erased wrapper name
- We want package-level and interface methods to have the same signature
- No functional options, no "step params" to smooth the step UX
--> We use an in-memory map to pass the function name to the interface method, which will use it for a default name if the user didn't provide any.
| } | ||
|
|
||
| type WorkflowSetEventInput[R any] struct { | ||
| type WorkflowSetEventInputGeneric[R any] struct { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
sadge
| stepCtx := WithValue(dbosCtx, StepParamsKey, &StepParams{ | ||
| MaxRetries: 5, | ||
| BaseInterval: 1 * time.Millisecond, | ||
| MaxInterval: 10 * time.Millisecond, | ||
| }) | ||
|
|
||
| var ( | ||
| stepWithinAStepWf = WithWorkflow(stepWithinAStepWorkflow) | ||
| stepRetryWf = WithWorkflow(stepRetryWorkflow) | ||
| ) | ||
| return RunAsStep(stepCtx, stepRetryAlwaysFailsStep, input) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
step params in use
This PR:
DBOSContexttype which holds both DBOS executor functionalities and extends the nativecontext.Contextinterface.The mirror functions are made with the same signature so it is more intuitive for users when mocking.
Few things to note/improve/think about
DBOSContextinstead of acontext.Context. This is because all operations need a context. Steps on the other hand can accept a normal context.Context if they wish to, and upcast it toDBOSContextif they need to do DBOS stuff.context.Contextfunc checkoutEndpoint(c *gin.Context, dbosCtx dbos.DBOSContext, logger *logrus.Logger) {Another thing this PR does, on the path of improving the step interface, is allowing users to pass step parameters through the context, using the
WithValuemethod. We'll have to improve on this as well: right nowWithValuedoes nothing if the provided interface is not a concretedbosContext(our internal struct implementingDBOSContext), so a user cannot rely on this in their tests if they mockDBOSContext.Immediate next PRs:
DBOSContextexposes all the DBOS methods an end-user is expected to write in their code:New library usage:
Starting workflows:
The testing can be done as: