From d15de6ea92eae6e20f399f43f0a949ce7526d097 Mon Sep 17 00:00:00 2001 From: maxdml Date: Mon, 18 Aug 2025 18:26:13 -0700 Subject: [PATCH 01/12] add contributing.md file and licence readme badge --- CONTRIBUTING.md | 45 +++++++++++++++++++++++++++++++++++++++++++++ README.md | 1 + 2 files changed, 46 insertions(+) create mode 100644 CONTRIBUTING.md diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md new file mode 100644 index 00000000..12e75f6d --- /dev/null +++ b/CONTRIBUTING.md @@ -0,0 +1,45 @@ +# Contributing to DBOS Transact Python + +Thank you for considering contributing to DBOS Transact. We welcome contributions from everyone, including bug fixes, feature enhancements, documentation improvements, or any other form of contribution. + +## How to Contribute + +To get started with DBOS Transact, please read the [README](README.md). + +You can contribute in many ways. Some simple ways are: +* Use the SDK and open issues to report any bugs, questions, concern with the SDK, samples or documentation. +* Respond to issues with advice or suggestions. +* Participate in discussions in our [Discord](https://discord.gg/fMwQjeW5zg) channel. +* Contribute fixes and improvement to code, samples or documentation. + +### To contribute code, please follow these steps: + +1. Fork this github repository to your own account. + +2. Clone the forked repository to your local machine. + +3. Create a branch. + +4. Make the necessary change to code, samples or documentation. + +5. Write tests. + +6. Commit the changes to your forked repository. + +7. Submit a pull request to this repository. +In the PR description please include: +* Description of the fix/feature. +* Brief description of implementation. +* Description of how you tested the fix. + +## Requesting features + +If you have a feature request or an idea for an enhancement, feel free to open an issue on GitHub. Describe the feature or enhancement you'd like to see and why it would be valuable. Discuss it with the community on the [Discord](https://discord.gg/fMwQjeW5zg) channel. + +## Discuss with the community + +If you are stuck, need help, or wondering if a certain contribution will be welcome, please ask! You can reach out to us on [Discord](https://discord.gg/fMwQjeW5zg) or Github discussions. + +## Code of conduct + +It is important to us that contributing to DBOS will be a pleasant experience, if necessary, please refer to our [code of conduct](CODE_OF_CONDUCT.md) for participation guidelines. \ No newline at end of file diff --git a/README.md b/README.md index 39e773ed..bbee5a12 100644 --- a/README.md +++ b/README.md @@ -4,6 +4,7 @@ [![Go Report Card](https://goreportcard.com/badge/github.com/dbos-inc/dbos-transact-go)](https://goreportcard.com/report/github.com/dbos-inc/dbos-transact-go) [![GitHub release (latest SemVer)](https://img.shields.io/github/v/release/dbos-inc/dbos-transact-go?sort=semver)](https://github.com/dbos-inc/dbos-transact-go/releases) [![Join Discord](https://img.shields.io/badge/Discord-Join%20Chat-5865F2?logo=discord&logoColor=white)](https://discord.com/invite/jsmC6pXGgX) +[![License: MIT](https://img.shields.io/badge/License-MIT-yellow.svg)](LICENSE) # DBOS Transact: Lightweight Durable Workflow Orchestration with Postgres From 7f0217d32aa28d6a1b6badaeaaf0daadcb8ab0c4 Mon Sep 17 00:00:00 2001 From: maxdml Date: Mon, 18 Aug 2025 18:26:26 -0700 Subject: [PATCH 02/12] casting checks --- dbos/admin_server_test.go | 3 ++- dbos/logger_test.go | 3 ++- dbos/utils_test.go | 6 +++++- dbos/workflow.go | 2 +- 4 files changed, 10 insertions(+), 4 deletions(-) diff --git a/dbos/admin_server_test.go b/dbos/admin_server_test.go index dc169ace..9ba3c24c 100644 --- a/dbos/admin_server_test.go +++ b/dbos/admin_server_test.go @@ -44,7 +44,8 @@ func TestAdminServer(t *testing.T) { // Verify the DBOS executor doesn't have an admin server instance require.NotNil(t, ctx, "Expected DBOS instance to be created") - exec := ctx.(*dbosContext) + exec, ok := ctx.(*dbosContext) + require.True(t, ok, "Expected ctx to be of type *dbosContext") require.Nil(t, exec.adminServer, "Expected admin server to be nil when not configured") }) diff --git a/dbos/logger_test.go b/dbos/logger_test.go index 62bffa31..4921b294 100644 --- a/dbos/logger_test.go +++ b/dbos/logger_test.go @@ -26,7 +26,8 @@ func TestLogger(t *testing.T) { } }) - ctx := dbosCtx.(*dbosContext) + ctx, ok := dbosCtx.(*dbosContext) + require.True(t, ok, "Expected dbosCtx to be of type *dbosContext") require.NotNil(t, ctx.logger) // Test logger access diff --git a/dbos/utils_test.go b/dbos/utils_test.go index a1a7eaf9..9a8bb20c 100644 --- a/dbos/utils_test.go +++ b/dbos/utils_test.go @@ -134,7 +134,11 @@ func queueEntriesAreCleanedUp(ctx DBOSContext) bool { success := false for range maxTries { // Begin transaction - exec := ctx.(*dbosContext) + exec, ok := ctx.(*dbosContext) + if !ok { + fmt.Println("Expected ctx to be of type *dbosContext in queueEntriesAreCleanedUp") + return false + } tx, err := exec.systemDB.(*sysDB).pool.Begin(ctx) if err != nil { return false diff --git a/dbos/workflow.go b/dbos/workflow.go index d13fcefe..2d45bb04 100644 --- a/dbos/workflow.go +++ b/dbos/workflow.go @@ -536,7 +536,7 @@ func RunAsWorkflow[P any, R any](ctx DBOSContext, fn GenericWorkflowFunc[P, R], return fn(ctx, input.(P)) }) - handle, err := ctx.(*dbosContext).RunAsWorkflow(ctx, typedErasedWorkflow, input, opts...) + handle, err := ctx.RunAsWorkflow(ctx, typedErasedWorkflow, input, opts...) if err != nil { return nil, err } From 84c28275b00ca65d08ecb09ade234c1a6a38bd09 Mon Sep 17 00:00:00 2001 From: maxdml Date: Mon, 18 Aug 2025 18:40:25 -0700 Subject: [PATCH 03/12] privatize a few types --- dbos/dbos.go | 8 ++++---- dbos/workflow.go | 34 +++++++++++++++++----------------- 2 files changed, 21 insertions(+), 21 deletions(-) diff --git a/dbos/dbos.go b/dbos/dbos.go index 501dca60..ab9adc20 100644 --- a/dbos/dbos.go +++ b/dbos/dbos.go @@ -75,12 +75,12 @@ type DBOSContext interface { Cancel() // Gracefully shutdown the DBOS runtime, waiting for workflows to complete and cleaning up resources // Workflow operations - RunAsStep(_ DBOSContext, fn StepFunc, opts ...StepOption) (any, error) // Execute a function as a durable step within a workflow - RunAsWorkflow(_ DBOSContext, fn WorkflowFunc, input any, opts ...WorkflowOption) (WorkflowHandle[any], error) // Start a new workflow execution + RunAsStep(_ DBOSContext, fn stepFunc, opts ...StepOption) (any, error) // Execute a function as a durable step within a workflow + RunAsWorkflow(_ DBOSContext, fn workflowFunc, input any, opts ...WorkflowOption) (WorkflowHandle[any], error) // Start a new workflow execution Send(_ DBOSContext, destinationID string, message any, topic string) error // Send a message to another workflow - Recv(_ DBOSContext, topic string, timeout time.Duration) (any, error) // Receive a message sent to this workflow + Recv(_ DBOSContext, topic string, timeout time.Duration) (any, error) // Receive a message sent to this workflow SetEvent(_ DBOSContext, key string, message any) error // Set a key-value event for this workflow - GetEvent(_ DBOSContext, targetWorkflowID string, key string, timeout time.Duration) (any, error) // Get a key-value event from a target workflow + GetEvent(_ DBOSContext, targetWorkflowID string, key string, timeout time.Duration) (any, error) // Get a key-value event from a target workflow Sleep(_ DBOSContext, duration time.Duration) (time.Duration, error) // Durable sleep that survives workflow recovery GetWorkflowID() (string, error) // Get the current workflow ID (only available within workflows) GetStepID() (int, error) // Get the current step ID (only available within workflows) diff --git a/dbos/workflow.go b/dbos/workflow.go index 2d45bb04..c80f43f0 100644 --- a/dbos/workflow.go +++ b/dbos/workflow.go @@ -257,7 +257,7 @@ func registerWorkflow(ctx DBOSContext, workflowFQN string, fn WrappedWorkflowFun } } -func registerScheduledWorkflow(ctx DBOSContext, workflowName string, fn WorkflowFunc, cronSchedule string) { +func registerScheduledWorkflow(ctx DBOSContext, workflowName string, fn workflowFunc, cronSchedule string) { // Skip if we don't have a concrete dbosContext c, ok := ctx.(*dbosContext) if !ok { @@ -365,7 +365,7 @@ func WithWorkflowName(name string) workflowRegistrationOption { // dbos.WithMaxRetries(5), // dbos.WithSchedule("0 0 * * *")) // daily at midnight // dbos.WithWorkflowName("MyCustomWorkflowName") // Custom name for the workflow -func RegisterWorkflow[P any, R any](ctx DBOSContext, fn GenericWorkflowFunc[P, R], opts ...workflowRegistrationOption) { +func RegisterWorkflow[P any, R any](ctx DBOSContext, fn WorkflowFunc[P, R], opts ...workflowRegistrationOption) { if ctx == nil { panic("ctx cannot be nil") } @@ -391,7 +391,7 @@ func RegisterWorkflow[P any, R any](ctx DBOSContext, fn GenericWorkflowFunc[P, R gob.Register(r) // Register a type-erased version of the durable workflow for recovery - typedErasedWorkflow := WorkflowFunc(func(ctx DBOSContext, input any) (any, error) { + typedErasedWorkflow := workflowFunc(func(ctx DBOSContext, input any) (any, error) { typedInput, ok := input.(P) if !ok { wfID, err := ctx.GetWorkflowID() @@ -438,13 +438,13 @@ type DBOSContextKey string const workflowStateKey DBOSContextKey = "workflowState" -// GenericWorkflowFunc represents a type-safe workflow function with specific input and output types. +// WorkflowFunc represents a type-safe workflow function with specific input and output types. // P is the input parameter type and R is the return type. // All workflow functions must accept a DBOSContext as their first parameter. -type GenericWorkflowFunc[P any, R any] func(ctx DBOSContext, input P) (R, error) +type WorkflowFunc[P any, R any] func(ctx DBOSContext, input P) (R, error) -// WorkflowFunc represents a type-erased workflow function used internally. -type WorkflowFunc func(ctx DBOSContext, input any) (any, error) +// workflowFunc represents a type-erased workflow function used internally. +type workflowFunc func(ctx DBOSContext, input any) (any, error) type workflowParams struct { workflowName string @@ -524,7 +524,7 @@ func withWorkflowName(name string) WorkflowOption { // } else { // log.Printf("Result: %v", result) // } -func RunAsWorkflow[P any, R any](ctx DBOSContext, fn GenericWorkflowFunc[P, R], input P, opts ...WorkflowOption) (WorkflowHandle[R], error) { +func RunAsWorkflow[P any, R any](ctx DBOSContext, fn WorkflowFunc[P, R], input P, opts ...WorkflowOption) (WorkflowHandle[R], error) { if ctx == nil { return nil, fmt.Errorf("ctx cannot be nil") } @@ -532,7 +532,7 @@ func RunAsWorkflow[P any, R any](ctx DBOSContext, fn GenericWorkflowFunc[P, R], // Add the fn name to the options so we can communicate it with DBOSContext.RunAsWorkflow opts = append(opts, withWorkflowName(runtime.FuncForPC(reflect.ValueOf(fn).Pointer()).Name())) - typedErasedWorkflow := WorkflowFunc(func(ctx DBOSContext, input any) (any, error) { + typedErasedWorkflow := workflowFunc(func(ctx DBOSContext, input any) (any, error) { return fn(ctx, input.(P)) }) @@ -581,7 +581,7 @@ func RunAsWorkflow[P any, R any](ctx DBOSContext, fn GenericWorkflowFunc[P, R], return nil, fmt.Errorf("unexpected workflow handle type: %T", handle) } -func (c *dbosContext) RunAsWorkflow(_ DBOSContext, fn WorkflowFunc, input any, opts ...WorkflowOption) (WorkflowHandle[any], error) { +func (c *dbosContext) RunAsWorkflow(_ DBOSContext, fn workflowFunc, input any, opts ...WorkflowOption) (WorkflowHandle[any], error) { // Apply options to build params params := workflowParams{ applicationVersion: c.GetApplicationVersion(), @@ -825,11 +825,11 @@ func (c *dbosContext) RunAsWorkflow(_ DBOSContext, fn WorkflowFunc, input any, o /******* STEP FUNCTIONS *******/ /******************************/ -// StepFunc represents a type-erased step function used internally. -type StepFunc func(ctx context.Context) (any, error) +// stepFunc represents a type-erased step function used internally. +type stepFunc func(ctx context.Context) (any, error) -// GenericStepFunc represents a type-safe step function with a specific output type R. -type GenericStepFunc[R any] func(ctx context.Context) (R, error) +// StepFunc represents a type-safe step function with a specific output type R. +type StepFunc[R any] func(ctx context.Context) (R, error) // stepOptions holds the configuration for step execution using functional options pattern. type stepOptions struct { @@ -940,7 +940,7 @@ func WithMaxInterval(interval time.Duration) StepOption { // Note that the function passed to RunAsStep must accept a context.Context as its first parameter // and this context *must* be the one specified in the function's signature (not the context passed to RunAsStep). // Under the hood, DBOS will augment the step's context and pass it to the function when executing it durably. -func RunAsStep[R any](ctx DBOSContext, fn GenericStepFunc[R], opts ...StepOption) (R, error) { +func RunAsStep[R any](ctx DBOSContext, fn StepFunc[R], opts ...StepOption) (R, error) { if ctx == nil { return *new(R), newStepExecutionError("", "", "ctx cannot be nil") } @@ -954,7 +954,7 @@ func RunAsStep[R any](ctx DBOSContext, fn GenericStepFunc[R], opts ...StepOption opts = append(opts, WithStepName(stepName)) // Type-erase the function - typeErasedFn := StepFunc(func(ctx context.Context) (any, error) { return fn(ctx) }) + typeErasedFn := stepFunc(func(ctx context.Context) (any, error) { return fn(ctx) }) result, err := ctx.RunAsStep(ctx, typeErasedFn, opts...) // Step function could return a nil result @@ -969,7 +969,7 @@ func RunAsStep[R any](ctx DBOSContext, fn GenericStepFunc[R], opts ...StepOption return typedResult, err } -func (c *dbosContext) RunAsStep(_ DBOSContext, fn StepFunc, opts ...StepOption) (any, error) { +func (c *dbosContext) RunAsStep(_ DBOSContext, fn stepFunc, opts ...StepOption) (any, error) { // Process functional options stepOpts := &stepOptions{} for _, opt := range opts { From 4db4d341ccc74565c0be09363adfe45746c2b5b4 Mon Sep 17 00:00:00 2001 From: maxdml Date: Mon, 18 Aug 2025 18:40:40 -0700 Subject: [PATCH 04/12] fix package level RetrieveWorkflow (now calls interface RetrieveWorkflow) --- dbos/workflow.go | 14 ++++++-------- 1 file changed, 6 insertions(+), 8 deletions(-) diff --git a/dbos/workflow.go b/dbos/workflow.go index c80f43f0..ba08aebe 100644 --- a/dbos/workflow.go +++ b/dbos/workflow.go @@ -1354,16 +1354,14 @@ func RetrieveWorkflow[R any](ctx DBOSContext, workflowID string) (*workflowPolli var r R gob.Register(r) - workflowStatus, err := ctx.(*dbosContext).systemDB.listWorkflows(ctx, listWorkflowsDBInput{ - workflowIDs: []string{workflowID}, - }) + // Call the interface method + handle, err := ctx.RetrieveWorkflow(ctx, workflowID) if err != nil { - return nil, fmt.Errorf("failed to retrieve workflow status: %w", err) - } - if len(workflowStatus) == 0 { - return nil, newNonExistentWorkflowError(workflowID) + return nil, err } - return newWorkflowPollingHandle[R](ctx, workflowID), nil + + // Convert to typed polling handle + return newWorkflowPollingHandle[R](ctx, handle.GetWorkflowID()), nil } type EnqueueOptions struct { From d4ff36a93b3586216a95e6ba0eacf2058db7ae3f Mon Sep 17 00:00:00 2001 From: maxdml Date: Mon, 18 Aug 2025 18:50:08 -0700 Subject: [PATCH 05/12] more typing --- dbos/workflow.go | 30 +++++++++++++++--------------- 1 file changed, 15 insertions(+), 15 deletions(-) diff --git a/dbos/workflow.go b/dbos/workflow.go index ba08aebe..570d75ec 100644 --- a/dbos/workflow.go +++ b/dbos/workflow.go @@ -212,17 +212,17 @@ func (h *workflowPollingHandle[R]) GetResult() (R, error) { /**********************************/ /******* WORKFLOW REGISTRY *******/ /**********************************/ -type GenericWrappedWorkflowFunc[P any, R any] func(ctx DBOSContext, input P, opts ...WorkflowOption) (WorkflowHandle[R], error) -type WrappedWorkflowFunc func(ctx DBOSContext, input any, opts ...WorkflowOption) (WorkflowHandle[any], error) +type WrappedWorkflowFunc[P any, R any] func(ctx DBOSContext, input P, opts ...WorkflowOption) (WorkflowHandle[R], error) +type wrappedWorkflowFunc func(ctx DBOSContext, input any, opts ...WorkflowOption) (WorkflowHandle[any], error) type workflowRegistryEntry struct { - wrappedFunction WrappedWorkflowFunc + wrappedFunction wrappedWorkflowFunc maxRetries int name string } // Register adds a workflow function to the registry (thread-safe, only once per name) -func registerWorkflow(ctx DBOSContext, workflowFQN string, fn WrappedWorkflowFunc, maxRetries int, customName string) { +func registerWorkflow(ctx DBOSContext, workflowFQN string, fn wrappedWorkflowFunc, maxRetries int, customName string) { // Skip if we don't have a concrete dbosContext c, ok := ctx.(*dbosContext) if !ok { @@ -411,7 +411,7 @@ func RegisterWorkflow[P any, R any](ctx DBOSContext, fn WorkflowFunc[P, R], opts return fn(ctx, typedInput) }) - typeErasedWrapper := WrappedWorkflowFunc(func(ctx DBOSContext, input any, opts ...WorkflowOption) (WorkflowHandle[any], error) { + typeErasedWrapper := wrappedWorkflowFunc(func(ctx DBOSContext, input any, opts ...WorkflowOption) (WorkflowHandle[any], error) { opts = append(opts, withWorkflowName(fqn)) // Append the name so ctx.RunAsWorkflow can look it up from the registry to apply registration-time options handle, err := ctx.RunAsWorkflow(ctx, typedErasedWorkflow, input, opts...) if err != nil { @@ -831,8 +831,8 @@ type stepFunc func(ctx context.Context) (any, error) // StepFunc represents a type-safe step function with a specific output type R. type StepFunc[R any] func(ctx context.Context) (R, error) -// stepOptions holds the configuration for step execution using functional options pattern. -type stepOptions struct { +// StepOptions holds the configuration for step execution using functional options pattern. +type StepOptions struct { MaxRetries int // Maximum number of retry attempts (0 = no retries) BackoffFactor float64 // Exponential backoff multiplier between retries (default: 2.0) BaseInterval time.Duration // Initial delay between retries (default: 100ms) @@ -841,7 +841,7 @@ type stepOptions struct { } // setDefaults applies default values to stepOptions -func (opts *stepOptions) setDefaults() { +func (opts *StepOptions) setDefaults() { if opts.BackoffFactor == 0 { opts.BackoffFactor = _DEFAULT_STEP_BACKOFF_FACTOR } @@ -854,13 +854,13 @@ func (opts *stepOptions) setDefaults() { } // StepOption is a functional option for configuring step execution parameters. -type StepOption func(*stepOptions) +type StepOption func(*StepOptions) // WithStepName sets a custom name for the step. If the step name has already been set // by a previous call to WithStepName, this option will be ignored to allow // multiple WithStepName calls without overriding the first one. func WithStepName(name string) StepOption { - return func(opts *stepOptions) { + return func(opts *StepOptions) { if opts.StepName == "" { opts.StepName = name } @@ -870,7 +870,7 @@ func WithStepName(name string) StepOption { // WithStepMaxRetries sets the maximum number of retry attempts for the step. // A value of 0 means no retries (default behavior). func WithStepMaxRetries(maxRetries int) StepOption { - return func(opts *stepOptions) { + return func(opts *StepOptions) { opts.MaxRetries = maxRetries } } @@ -879,7 +879,7 @@ func WithStepMaxRetries(maxRetries int) StepOption { // The delay between retries is calculated as: BaseInterval * (BackoffFactor^(retry-1)) // Default value is 2.0. func WithBackoffFactor(factor float64) StepOption { - return func(opts *stepOptions) { + return func(opts *StepOptions) { opts.BackoffFactor = factor } } @@ -887,7 +887,7 @@ func WithBackoffFactor(factor float64) StepOption { // WithBaseInterval sets the initial delay between retries. // Default value is 100ms. func WithBaseInterval(interval time.Duration) StepOption { - return func(opts *stepOptions) { + return func(opts *StepOptions) { opts.BaseInterval = interval } } @@ -895,7 +895,7 @@ func WithBaseInterval(interval time.Duration) StepOption { // WithMaxInterval sets the maximum delay between retries. // Default value is 5s. func WithMaxInterval(interval time.Duration) StepOption { - return func(opts *stepOptions) { + return func(opts *StepOptions) { opts.MaxInterval = interval } } @@ -971,7 +971,7 @@ func RunAsStep[R any](ctx DBOSContext, fn StepFunc[R], opts ...StepOption) (R, e func (c *dbosContext) RunAsStep(_ DBOSContext, fn stepFunc, opts ...StepOption) (any, error) { // Process functional options - stepOpts := &stepOptions{} + stepOpts := &StepOptions{} for _, opt := range opts { opt(stepOpts) } From 02f570f7d99fe136ae49a2cffdaabdccf7321c47 Mon Sep 17 00:00:00 2001 From: maxdml Date: Tue, 19 Aug 2025 10:52:35 -0700 Subject: [PATCH 06/12] cleaner EnqueueOptions --- dbos/client_test.go | 158 +++++++++++++------------------------------- dbos/dbos.go | 12 ++-- dbos/workflow.go | 156 ++++++++++++++++++++++++------------------- 3 files changed, 141 insertions(+), 185 deletions(-) diff --git a/dbos/client_test.go b/dbos/client_test.go index 31870189..a076ae1e 100644 --- a/dbos/client_test.go +++ b/dbos/client_test.go @@ -68,12 +68,8 @@ func TestEnqueue(t *testing.T) { t.Run("EnqueueAndGetResult", func(t *testing.T) { // Client enqueues a task using the new Enqueue method - handle, err := Enqueue[wfInput, string](clientCtx, GenericEnqueueOptions[wfInput]{ - WorkflowName: "ServerWorkflow", - QueueName: queue.Name, - WorkflowInput: wfInput{Input: "test-input"}, - ApplicationVersion: serverCtx.GetApplicationVersion(), - }) + handle, err := Enqueue[wfInput, string](clientCtx, queue.Name, "ServerWorkflow", wfInput{Input: "test-input"}, + WithEnqueueApplicationVersion(serverCtx.GetApplicationVersion())) require.NoError(t, err) // Verify we got a polling handle @@ -102,12 +98,8 @@ func TestEnqueue(t *testing.T) { customWorkflowID := "custom-client-workflow-id" // Client enqueues a task with a custom workflow ID - _, err := Enqueue[wfInput, string](clientCtx, GenericEnqueueOptions[wfInput]{ - WorkflowName: "ServerWorkflow", - QueueName: queue.Name, - WorkflowID: customWorkflowID, - WorkflowInput: wfInput{Input: "test-input"}, - }) + _, err := Enqueue[wfInput, string](clientCtx, queue.Name, "ServerWorkflow", wfInput{Input: "test-input"}, + WithEnqueueWorkflowID(customWorkflowID)) require.NoError(t, err) // Verify the workflow ID is what we set @@ -123,12 +115,8 @@ func TestEnqueue(t *testing.T) { }) t.Run("EnqueueWithTimeout", func(t *testing.T) { - handle, err := Enqueue[string, string](clientCtx, GenericEnqueueOptions[string]{ - WorkflowName: "BlockingWorkflow", - QueueName: queue.Name, - WorkflowInput: "blocking-input", - WorkflowTimeout: 500 * time.Millisecond, - }) + handle, err := Enqueue[string, string](clientCtx, queue.Name, "BlockingWorkflow", "blocking-input", + WithEnqueueTimeout(500*time.Millisecond)) require.NoError(t, err) // Should timeout when trying to get result @@ -154,32 +142,20 @@ func TestEnqueue(t *testing.T) { mu.Unlock() // Enqueue workflow without priority (will use default priority of 0) - handle1, err := Enqueue[string, string](clientCtx, GenericEnqueueOptions[string]{ - WorkflowName: "PriorityWorkflow", - QueueName: priorityQueue.Name, - WorkflowInput: "abc", - ApplicationVersion: serverCtx.GetApplicationVersion(), - }) + handle1, err := Enqueue[string, string](clientCtx, priorityQueue.Name, "PriorityWorkflow", "abc", + WithEnqueueApplicationVersion(serverCtx.GetApplicationVersion())) require.NoError(t, err, "failed to enqueue workflow without priority") // Enqueue with a lower priority (higher number = lower priority) - handle2, err := Enqueue[string, string](clientCtx, GenericEnqueueOptions[string]{ - WorkflowName: "PriorityWorkflow", - QueueName: priorityQueue.Name, - WorkflowInput: "def", - Priority: 5, - ApplicationVersion: serverCtx.GetApplicationVersion(), - }) + handle2, err := Enqueue[string, string](clientCtx, priorityQueue.Name, "PriorityWorkflow", "def", + WithEnqueuePriority(5), + WithEnqueueApplicationVersion(serverCtx.GetApplicationVersion())) require.NoError(t, err, "failed to enqueue workflow with priority 5") // Enqueue with a higher priority (lower number = higher priority) - handle3, err := Enqueue[string, string](clientCtx, GenericEnqueueOptions[string]{ - WorkflowName: "PriorityWorkflow", - QueueName: priorityQueue.Name, - WorkflowInput: "ghi", - Priority: 1, - ApplicationVersion: serverCtx.GetApplicationVersion(), - }) + handle3, err := Enqueue[string, string](clientCtx, priorityQueue.Name, "PriorityWorkflow", "ghi", + WithEnqueuePriority(1), + WithEnqueueApplicationVersion(serverCtx.GetApplicationVersion())) require.NoError(t, err, "failed to enqueue workflow with priority 1") // Get results @@ -212,25 +188,17 @@ func TestEnqueue(t *testing.T) { wfid2 := "client-dedup-wf2" // First workflow with deduplication ID - should succeed - handle1, err := Enqueue[wfInput, string](clientCtx, GenericEnqueueOptions[wfInput]{ - WorkflowName: "ServerWorkflow", - QueueName: queue.Name, - WorkflowID: wfid1, - DeduplicationID: dedupID, - WorkflowInput: wfInput{Input: "test-input"}, - ApplicationVersion: serverCtx.GetApplicationVersion(), - }) + handle1, err := Enqueue[wfInput, string](clientCtx, queue.Name, "ServerWorkflow", wfInput{Input: "test-input"}, + WithEnqueueWorkflowID(wfid1), + WithEnqueueDeduplicationID(dedupID), + WithEnqueueApplicationVersion(serverCtx.GetApplicationVersion())) require.NoError(t, err, "failed to enqueue first workflow with deduplication ID") // Second workflow with same deduplication ID but different workflow ID - should fail - _, err = Enqueue[wfInput, string](clientCtx, GenericEnqueueOptions[wfInput]{ - WorkflowName: "ServerWorkflow", - QueueName: queue.Name, - WorkflowID: wfid2, - DeduplicationID: dedupID, - WorkflowInput: wfInput{Input: "test-input"}, - ApplicationVersion: serverCtx.GetApplicationVersion(), - }) + _, err = Enqueue[wfInput, string](clientCtx, queue.Name, "ServerWorkflow", wfInput{Input: "test-input"}, + WithEnqueueWorkflowID(wfid2), + WithEnqueueDeduplicationID(dedupID), + WithEnqueueApplicationVersion(serverCtx.GetApplicationVersion())) require.Error(t, err, "expected error when enqueueing workflow with same deduplication ID") // Check that it's the correct error type and message @@ -242,22 +210,14 @@ func TestEnqueue(t *testing.T) { assert.Contains(t, err.Error(), expectedMsgPart, "expected error message to contain deduplication information") // Third workflow with different deduplication ID - should succeed - handle3, err := Enqueue[wfInput, string](clientCtx, GenericEnqueueOptions[wfInput]{ - WorkflowName: "ServerWorkflow", - QueueName: queue.Name, - DeduplicationID: "different-dedup-id", - WorkflowInput: wfInput{Input: "test-input"}, - ApplicationVersion: serverCtx.GetApplicationVersion(), - }) + handle3, err := Enqueue[wfInput, string](clientCtx, queue.Name, "ServerWorkflow", wfInput{Input: "test-input"}, + WithEnqueueDeduplicationID("different-dedup-id"), + WithEnqueueApplicationVersion(serverCtx.GetApplicationVersion())) require.NoError(t, err, "failed to enqueue workflow with different deduplication ID") // Fourth workflow without deduplication ID - should succeed - handle4, err := Enqueue[wfInput, string](clientCtx, GenericEnqueueOptions[wfInput]{ - WorkflowName: "ServerWorkflow", - QueueName: queue.Name, - WorkflowInput: wfInput{Input: "test-input"}, - ApplicationVersion: serverCtx.GetApplicationVersion(), - }) + handle4, err := Enqueue[wfInput, string](clientCtx, queue.Name, "ServerWorkflow", wfInput{Input: "test-input"}, + WithEnqueueApplicationVersion(serverCtx.GetApplicationVersion())) require.NoError(t, err, "failed to enqueue workflow without deduplication ID") // Wait for all successful workflows to complete @@ -274,14 +234,10 @@ func TestEnqueue(t *testing.T) { assert.Equal(t, "processed: test-input", result4) // After first workflow completes, we should be able to enqueue with same deduplication ID - handle5, err := Enqueue[wfInput, string](clientCtx, GenericEnqueueOptions[wfInput]{ - WorkflowName: "ServerWorkflow", - QueueName: queue.Name, - WorkflowID: wfid2, // Reuse the workflow ID that failed before - DeduplicationID: dedupID, // Same deduplication ID as first workflow - WorkflowInput: wfInput{Input: "test-input"}, - ApplicationVersion: serverCtx.GetApplicationVersion(), - }) + handle5, err := Enqueue[wfInput, string](clientCtx, queue.Name, "ServerWorkflow", wfInput{Input: "test-input"}, + WithEnqueueWorkflowID(wfid2), // Reuse the workflow ID that failed before + WithEnqueueDeduplicationID(dedupID), // Same deduplication ID as first workflow + WithEnqueueApplicationVersion(serverCtx.GetApplicationVersion())) require.NoError(t, err, "failed to enqueue workflow with same dedup ID after completion") result5, err := handle5.GetResult() @@ -366,13 +322,9 @@ func TestCancelResume(t *testing.T) { workflowID := "test-cancel-resume-workflow" // Start the workflow - it will execute step one and then wait - handle, err := Enqueue[int, int](clientCtx, GenericEnqueueOptions[int]{ - WorkflowName: "CancelResumeWorkflow", - QueueName: queue.Name, - WorkflowID: workflowID, - WorkflowInput: input, - ApplicationVersion: serverCtx.GetApplicationVersion(), - }) + handle, err := Enqueue[int, int](clientCtx, queue.Name, "CancelResumeWorkflow", input, + WithEnqueueWorkflowID(workflowID), + WithEnqueueApplicationVersion(serverCtx.GetApplicationVersion())) require.NoError(t, err, "failed to enqueue workflow from client") // Wait for workflow to signal it has started and step one completed @@ -435,14 +387,10 @@ func TestCancelResume(t *testing.T) { workflowTimeout := 2 * time.Second // Start the workflow with a 2-second timeout - handle, err := Enqueue[string, string](clientCtx, GenericEnqueueOptions[string]{ - WorkflowName: "TimeoutBlockingWorkflow", - QueueName: queue.Name, - WorkflowID: workflowID, - WorkflowInput: "timeout-test", - WorkflowTimeout: workflowTimeout, - ApplicationVersion: serverCtx.GetApplicationVersion(), - }) + handle, err := Enqueue[string, string](clientCtx, queue.Name, "TimeoutBlockingWorkflow", "timeout-test", + WithEnqueueWorkflowID(workflowID), + WithEnqueueTimeout(workflowTimeout), + WithEnqueueApplicationVersion(serverCtx.GetApplicationVersion())) require.NoError(t, err, "failed to enqueue timeout blocking workflow") // Wait 500ms (well before the timeout expires) @@ -615,13 +563,9 @@ func TestForkWorkflow(t *testing.T) { originalWorkflowID := "original-workflow-fork-test" // 1. Run the entire workflow first and check counters are 1 - handle, err := Enqueue[string, string](clientCtx, GenericEnqueueOptions[string]{ - WorkflowName: "ParentWorkflow", - QueueName: queue.Name, - WorkflowID: originalWorkflowID, - WorkflowInput: "test", - ApplicationVersion: serverCtx.GetApplicationVersion(), - }) + handle, err := Enqueue[string, string](clientCtx, queue.Name, "ParentWorkflow", "test", + WithEnqueueWorkflowID(originalWorkflowID), + WithEnqueueApplicationVersion(serverCtx.GetApplicationVersion())) require.NoError(t, err, "failed to enqueue original workflow") // Wait for the original workflow to complete @@ -758,13 +702,9 @@ func TestListWorkflows(t *testing.T) { if i < 5 { // First 5 workflows: use prefix "test-batch-" and succeed workflowID = fmt.Sprintf("test-batch-%d", i) - handle, err = Enqueue[testInput, string](clientCtx, GenericEnqueueOptions[testInput]{ - WorkflowName: "SimpleWorkflow", - QueueName: queue.Name, - WorkflowID: workflowID, - WorkflowInput: testInput{Value: i, ID: fmt.Sprintf("success-%d", i)}, - ApplicationVersion: serverCtx.GetApplicationVersion(), - }) + handle, err = Enqueue[testInput, string](clientCtx, queue.Name, "SimpleWorkflow", testInput{Value: i, ID: fmt.Sprintf("success-%d", i)}, + WithEnqueueWorkflowID(workflowID), + WithEnqueueApplicationVersion(serverCtx.GetApplicationVersion())) } else { // Last 5 workflows: use prefix "test-other-" and some will fail workflowID = fmt.Sprintf("test-other-%d", i) @@ -772,13 +712,9 @@ func TestListWorkflows(t *testing.T) { if i >= 8 { value = -i // These will fail } - handle, err = Enqueue[testInput, string](clientCtx, GenericEnqueueOptions[testInput]{ - WorkflowName: "SimpleWorkflow", - QueueName: queue.Name, - WorkflowID: workflowID, - WorkflowInput: testInput{Value: value, ID: fmt.Sprintf("test-%d", i)}, - ApplicationVersion: serverCtx.GetApplicationVersion(), - }) + handle, err = Enqueue[testInput, string](clientCtx, queue.Name, "SimpleWorkflow", testInput{Value: value, ID: fmt.Sprintf("test-%d", i)}, + WithEnqueueWorkflowID(workflowID), + WithEnqueueApplicationVersion(serverCtx.GetApplicationVersion())) } require.NoError(t, err, "failed to enqueue workflow %d", i) diff --git a/dbos/dbos.go b/dbos/dbos.go index ab9adc20..72227c80 100644 --- a/dbos/dbos.go +++ b/dbos/dbos.go @@ -86,12 +86,12 @@ type DBOSContext interface { GetStepID() (int, error) // Get the current step ID (only available within workflows) // Workflow management - RetrieveWorkflow(_ DBOSContext, workflowID string) (WorkflowHandle[any], error) // Get a handle to an existing workflow - Enqueue(_ DBOSContext, params EnqueueOptions) (WorkflowHandle[any], error) // Enqueue a new workflow with parameters - CancelWorkflow(_ DBOSContext, workflowID string) error // Cancel a workflow by setting its status to CANCELLED - ResumeWorkflow(_ DBOSContext, workflowID string) (WorkflowHandle[any], error) // Resume a cancelled workflow - ForkWorkflow(_ DBOSContext, input ForkWorkflowInput) (WorkflowHandle[any], error) // Fork a workflow from a specific step - ListWorkflows(opts ...ListWorkflowsOption) ([]WorkflowStatus, error) // List workflows based on filtering criteria + RetrieveWorkflow(_ DBOSContext, workflowID string) (WorkflowHandle[any], error) // Get a handle to an existing workflow + Enqueue(_ DBOSContext, queueName, workflowName string, input any, opts ...EnqueueOption) (WorkflowHandle[any], error) // Enqueue a workflow to a named queue + CancelWorkflow(_ DBOSContext, workflowID string) error // Cancel a workflow by setting its status to CANCELLED + ResumeWorkflow(_ DBOSContext, workflowID string) (WorkflowHandle[any], error) // Resume a cancelled workflow + ForkWorkflow(_ DBOSContext, input ForkWorkflowInput) (WorkflowHandle[any], error) // Fork a workflow from a specific step + ListWorkflows(opts ...ListWorkflowsOption) ([]WorkflowStatus, error) // List workflows based on filtering criteria // Accessors GetApplicationVersion() string // Get the application version for this context diff --git a/dbos/workflow.go b/dbos/workflow.go index 570d75ec..0a17007f 100644 --- a/dbos/workflow.go +++ b/dbos/workflow.go @@ -1365,42 +1365,89 @@ func RetrieveWorkflow[R any](ctx DBOSContext, workflowID string) (*workflowPolli } type EnqueueOptions struct { - WorkflowName string - QueueName string - WorkflowID string - ApplicationVersion string - DeduplicationID string - Priority uint - WorkflowTimeout time.Duration - WorkflowInput any -} - -func (c *dbosContext) Enqueue(_ DBOSContext, params EnqueueOptions) (WorkflowHandle[any], error) { - workflowID := params.WorkflowID + workflowName string + workflowID string + applicationVersion string + deduplicationID string + priority uint + workflowTimeout time.Duration + workflowInput any +} + +// EnqueueOption is a functional option for configuring workflow enqueue parameters. +type EnqueueOption func(*EnqueueOptions) + +// WithEnqueueWorkflowID sets a custom workflow ID instead of generating one automatically. +func WithEnqueueWorkflowID(id string) EnqueueOption { + return func(opts *EnqueueOptions) { + opts.workflowID = id + } +} + +// WithEnqueueApplicationVersion overrides the application version for the enqueued workflow. +func WithEnqueueApplicationVersion(version string) EnqueueOption { + return func(opts *EnqueueOptions) { + opts.applicationVersion = version + } +} + +// WithEnqueueDeduplicationID sets a deduplication ID for the enqueued workflow. +func WithEnqueueDeduplicationID(id string) EnqueueOption { + return func(opts *EnqueueOptions) { + opts.deduplicationID = id + } +} + +// WithEnqueuePriority sets the execution priority for the enqueued workflow. +func WithEnqueuePriority(priority uint) EnqueueOption { + return func(opts *EnqueueOptions) { + opts.priority = priority + } +} + +// WithEnqueueTimeout sets the maximum execution time for the enqueued workflow. +func WithEnqueueTimeout(timeout time.Duration) EnqueueOption { + return func(opts *EnqueueOptions) { + opts.workflowTimeout = timeout + } +} + +func (c *dbosContext) Enqueue(_ DBOSContext, queueName, workflowName string, input any, opts ...EnqueueOption) (WorkflowHandle[any], error) { + // Process options + params := &EnqueueOptions{ + workflowName: workflowName, + applicationVersion: c.GetApplicationVersion(), + workflowInput: input, + } + for _, opt := range opts { + opt(params) + } + + workflowID := params.workflowID if workflowID == "" { workflowID = uuid.New().String() } var deadline time.Time - if params.WorkflowTimeout > 0 { - deadline = time.Now().Add(params.WorkflowTimeout) + if params.workflowTimeout > 0 { + deadline = time.Now().Add(params.workflowTimeout) } - if params.Priority > uint(math.MaxInt) { - return nil, fmt.Errorf("priority %d exceeds maximum allowed value %d", params.Priority, math.MaxInt) + if params.priority > uint(math.MaxInt) { + return nil, fmt.Errorf("priority %d exceeds maximum allowed value %d", params.priority, math.MaxInt) } status := WorkflowStatus{ - Name: params.WorkflowName, - ApplicationVersion: params.ApplicationVersion, + Name: params.workflowName, + ApplicationVersion: params.applicationVersion, Status: WorkflowStatusEnqueued, ID: workflowID, CreatedAt: time.Now(), Deadline: deadline, - Timeout: params.WorkflowTimeout, - Input: params.WorkflowInput, - QueueName: params.QueueName, - DeduplicationID: params.DeduplicationID, - Priority: int(params.Priority), + Timeout: params.workflowTimeout, + Input: params.workflowInput, + QueueName: queueName, + DeduplicationID: params.deduplicationID, + Priority: int(params.priority), } uncancellableCtx := WithoutCancel(c) @@ -1429,33 +1476,23 @@ func (c *dbosContext) Enqueue(_ DBOSContext, params EnqueueOptions) (WorkflowHan return newWorkflowPollingHandle[any](uncancellableCtx, workflowID), nil } -type GenericEnqueueOptions[P any] struct { - WorkflowName string - QueueName string - WorkflowID string - ApplicationVersion string - DeduplicationID string - Priority uint - WorkflowTimeout time.Duration - WorkflowInput P -} - // Enqueue adds a workflow to a named queue for later execution with type safety. // The workflow will be persisted with ENQUEUED status until picked up by a DBOS process. // This provides asynchronous workflow execution with durability guarantees. // // Parameters: // - ctx: DBOS context for the operation -// - params: Configuration parameters including workflow name, queue name, input, and options +// - queueName: Name of the queue to enqueue the workflow to +// - workflowName: Name of the registered workflow function to execute +// - input: Input parameters to pass to the workflow (type P) +// - opts: Optional configuration options // -// The params struct contains: -// - WorkflowName: Name of the registered workflow function to execute (required) -// - QueueName: Name of the queue to enqueue the workflow to (required) -// - WorkflowID: Custom workflow ID (optional, auto-generated if empty) -// - ApplicationVersion: Application version override (optional) -// - DeduplicationID: Deduplication identifier for idempotent enqueuing (optional) -// - WorkflowTimeout: Maximum execution time for the workflow (optional) -// - WorkflowInput: Input parameters to pass to the workflow (type P) +// Available options: +// - WithEnqueueWorkflowID: Custom workflow ID (auto-generated if not provided) +// - WithEnqueueApplicationVersion: Application version override +// - WithEnqueueDeduplicationID: Deduplication identifier for idempotent enqueuing +// - WithEnqueuePriority: Execution priority +// - WithEnqueueTimeout: Maximum execution time for the workflow // // Returns a typed workflow handle that can be used to check status and retrieve results. // The handle uses polling to check workflow completion since the execution is asynchronous. @@ -1463,12 +1500,8 @@ type GenericEnqueueOptions[P any] struct { // Example usage: // // // Enqueue a workflow with string input and int output -// handle, err := dbos.Enqueue[string, int](ctx, dbos.GenericEnqueueOptions[string]{ -// WorkflowName: "ProcessDataWorkflow", -// QueueName: "data-processing", -// WorkflowInput: "input data", -// WorkflowTimeout: 30 * time.Minute, -// }) +// handle, err := dbos.Enqueue[string, int](ctx, "data-processing", "ProcessDataWorkflow", "input data", +// dbos.WithEnqueueTimeout(30 * time.Minute)) // if err != nil { // log.Fatal(err) // } @@ -1488,14 +1521,10 @@ type GenericEnqueueOptions[P any] struct { // } // // // Enqueue with deduplication and custom workflow ID -// handle, err := dbos.Enqueue[MyInputType, MyOutputType](ctx, dbos.GenericEnqueueOptions[MyInputType]{ -// WorkflowName: "MyWorkflow", -// QueueName: "my-queue", -// WorkflowID: "custom-workflow-id", -// DeduplicationID: "unique-operation-id", -// WorkflowInput: MyInputType{Field: "value"}, -// }) -func Enqueue[P any, R any](ctx DBOSContext, params GenericEnqueueOptions[P]) (WorkflowHandle[R], error) { +// handle, err := dbos.Enqueue[MyInputType, MyOutputType](ctx, "my-queue", "MyWorkflow", MyInputType{Field: "value"}, +// dbos.WithEnqueueWorkflowID("custom-workflow-id"), +// dbos.WithEnqueueDeduplicationID("unique-operation-id")) +func Enqueue[P any, R any](ctx DBOSContext, queueName, workflowName string, input P, opts ...EnqueueOption) (WorkflowHandle[R], error) { if ctx == nil { return nil, errors.New("ctx cannot be nil") } @@ -1506,17 +1535,8 @@ func Enqueue[P any, R any](ctx DBOSContext, params GenericEnqueueOptions[P]) (Wo var typedOutput R gob.Register(typedOutput) - // Call typed erased enqueue - handle, err := ctx.Enqueue(ctx, EnqueueOptions{ - WorkflowName: params.WorkflowName, - QueueName: params.QueueName, - WorkflowID: params.WorkflowID, - ApplicationVersion: params.ApplicationVersion, - DeduplicationID: params.DeduplicationID, - Priority: params.Priority, - WorkflowInput: params.WorkflowInput, - WorkflowTimeout: params.WorkflowTimeout, - }) + // Call the interface method with the same signature + handle, err := ctx.Enqueue(ctx, queueName, workflowName, input, opts...) if err != nil { return nil, err } From 2510be30d5f74a36729d296faf8120b616b9009b Mon Sep 17 00:00:00 2001 From: maxdml Date: Tue, 19 Aug 2025 11:00:10 -0700 Subject: [PATCH 07/12] cleanup ListWorkflows --- dbos/admin_server.go | 6 +++--- dbos/dbos.go | 2 +- dbos/workflow.go | 40 ++++++++++++++++++++-------------------- 3 files changed, 24 insertions(+), 24 deletions(-) diff --git a/dbos/admin_server.go b/dbos/admin_server.go index a95dc62a..4953a247 100644 --- a/dbos/admin_server.go +++ b/dbos/admin_server.go @@ -287,7 +287,7 @@ func newAdminServer(ctx *dbosContext, port int) *adminServer { } } - workflows, err := ctx.ListWorkflows(req.toListWorkflowsOptions()...) + workflows, err := ListWorkflows(ctx, req.toListWorkflowsOptions()...) if err != nil { ctx.logger.Error("Failed to list workflows", "error", err) http.Error(w, fmt.Sprintf("Failed to list workflows: %v", err), http.StatusInternalServerError) @@ -313,7 +313,7 @@ func newAdminServer(ctx *dbosContext, port int) *adminServer { // Use ListWorkflows with the specific workflow ID filter opts := []ListWorkflowsOption{WithWorkflowIDs([]string{workflowID})} - workflows, err := ctx.ListWorkflows(opts...) + workflows, err := ListWorkflows(ctx, opts...) if err != nil { ctx.logger.Error("Failed to get workflow", "workflow_id", workflowID, "error", err) http.Error(w, fmt.Sprintf("Failed to get workflow: %v", err), http.StatusInternalServerError) @@ -346,7 +346,7 @@ func newAdminServer(ctx *dbosContext, port int) *adminServer { } } - workflows, err := ctx.ListWorkflows(req.toListWorkflowsOptions()...) + workflows, err := ListWorkflows(ctx, req.toListWorkflowsOptions()...) if err != nil { ctx.logger.Error("Failed to list queued workflows", "error", err) http.Error(w, fmt.Sprintf("Failed to list queued workflows: %v", err), http.StatusInternalServerError) diff --git a/dbos/dbos.go b/dbos/dbos.go index 72227c80..4539a2a2 100644 --- a/dbos/dbos.go +++ b/dbos/dbos.go @@ -91,7 +91,7 @@ type DBOSContext interface { CancelWorkflow(_ DBOSContext, workflowID string) error // Cancel a workflow by setting its status to CANCELLED ResumeWorkflow(_ DBOSContext, workflowID string) (WorkflowHandle[any], error) // Resume a cancelled workflow ForkWorkflow(_ DBOSContext, input ForkWorkflowInput) (WorkflowHandle[any], error) // Fork a workflow from a specific step - ListWorkflows(opts ...ListWorkflowsOption) ([]WorkflowStatus, error) // List workflows based on filtering criteria + ListWorkflows(_ DBOSContext, opts ...ListWorkflowsOption) ([]WorkflowStatus, error) // List workflows based on filtering criteria // Accessors GetApplicationVersion() string // Get the application version for this context diff --git a/dbos/workflow.go b/dbos/workflow.go index 0a17007f..b08eceaf 100644 --- a/dbos/workflow.go +++ b/dbos/workflow.go @@ -1713,8 +1713,8 @@ func ForkWorkflow[R any](ctx DBOSContext, input ForkWorkflowInput) (WorkflowHand return newWorkflowPollingHandle[R](ctx, handle.GetWorkflowID()), nil } -// listWorkflowsParams holds configuration parameters for listing workflows -type listWorkflowsParams struct { +// ListWorkflowsOptions holds configuration parameters for listing workflows +type ListWorkflowsOptions struct { workflowIDs []string status []WorkflowStatusType startTime time.Time @@ -1732,7 +1732,7 @@ type listWorkflowsParams struct { } // ListWorkflowsOption is a functional option for configuring workflow listing parameters. -type ListWorkflowsOption func(*listWorkflowsParams) +type ListWorkflowsOption func(*ListWorkflowsOptions) // WithWorkflowIDs filters workflows by the specified workflow IDs. // @@ -1741,7 +1741,7 @@ type ListWorkflowsOption func(*listWorkflowsParams) // workflows, err := dbos.ListWorkflows(ctx, // dbos.WithWorkflowIDs([]string{"workflow1", "workflow2"})) func WithWorkflowIDs(workflowIDs []string) ListWorkflowsOption { - return func(p *listWorkflowsParams) { + return func(p *ListWorkflowsOptions) { p.workflowIDs = workflowIDs } } @@ -1754,7 +1754,7 @@ func WithWorkflowIDs(workflowIDs []string) ListWorkflowsOption { // workflows, err := dbos.ListWorkflows(ctx, // dbos.WithStatus([]dbos.WorkflowStatusType{dbos.WorkflowStatusSuccess, dbos.WorkflowStatusError})) func WithStatus(status []WorkflowStatusType) ListWorkflowsOption { - return func(p *listWorkflowsParams) { + return func(p *ListWorkflowsOptions) { p.status = status } } @@ -1766,7 +1766,7 @@ func WithStatus(status []WorkflowStatusType) ListWorkflowsOption { // workflows, err := dbos.ListWorkflows(ctx, // dbos.WithStartTime(time.Now().Add(-24*time.Hour))) func WithStartTime(startTime time.Time) ListWorkflowsOption { - return func(p *listWorkflowsParams) { + return func(p *ListWorkflowsOptions) { p.startTime = startTime } } @@ -1778,7 +1778,7 @@ func WithStartTime(startTime time.Time) ListWorkflowsOption { // workflows, err := dbos.ListWorkflows(ctx, // dbos.WithEndTime(time.Now())) func WithEndTime(endTime time.Time) ListWorkflowsOption { - return func(p *listWorkflowsParams) { + return func(p *ListWorkflowsOptions) { p.endTime = endTime } } @@ -1790,7 +1790,7 @@ func WithEndTime(endTime time.Time) ListWorkflowsOption { // workflows, err := dbos.ListWorkflows(ctx, // dbos.WithName("MyWorkflowFunction")) func WithName(name string) ListWorkflowsOption { - return func(p *listWorkflowsParams) { + return func(p *ListWorkflowsOptions) { p.name = name } } @@ -1802,7 +1802,7 @@ func WithName(name string) ListWorkflowsOption { // workflows, err := dbos.ListWorkflows(ctx, // dbos.WithAppVersion("v1.0.0")) func WithAppVersion(appVersion string) ListWorkflowsOption { - return func(p *listWorkflowsParams) { + return func(p *ListWorkflowsOptions) { p.appVersion = appVersion } } @@ -1814,7 +1814,7 @@ func WithAppVersion(appVersion string) ListWorkflowsOption { // workflows, err := dbos.ListWorkflows(ctx, // dbos.WithUser("john.doe")) func WithUser(user string) ListWorkflowsOption { - return func(p *listWorkflowsParams) { + return func(p *ListWorkflowsOptions) { p.user = user } } @@ -1826,7 +1826,7 @@ func WithUser(user string) ListWorkflowsOption { // workflows, err := dbos.ListWorkflows(ctx, // dbos.WithLimit(100)) func WithLimit(limit int) ListWorkflowsOption { - return func(p *listWorkflowsParams) { + return func(p *ListWorkflowsOptions) { p.limit = &limit } } @@ -1838,7 +1838,7 @@ func WithLimit(limit int) ListWorkflowsOption { // workflows, err := dbos.ListWorkflows(ctx, // dbos.WithOffset(50), dbos.WithLimit(25)) func WithOffset(offset int) ListWorkflowsOption { - return func(p *listWorkflowsParams) { + return func(p *ListWorkflowsOptions) { p.offset = &offset } } @@ -1850,7 +1850,7 @@ func WithOffset(offset int) ListWorkflowsOption { // workflows, err := dbos.ListWorkflows(ctx, // dbos.WithSortDesc(true)) func WithSortDesc(sortDesc bool) ListWorkflowsOption { - return func(p *listWorkflowsParams) { + return func(p *ListWorkflowsOptions) { p.sortDesc = sortDesc } } @@ -1862,7 +1862,7 @@ func WithSortDesc(sortDesc bool) ListWorkflowsOption { // workflows, err := dbos.ListWorkflows(ctx, // dbos.WithWorkflowIDPrefix("batch-")) func WithWorkflowIDPrefix(prefix string) ListWorkflowsOption { - return func(p *listWorkflowsParams) { + return func(p *ListWorkflowsOptions) { p.workflowIDPrefix = prefix } } @@ -1874,7 +1874,7 @@ func WithWorkflowIDPrefix(prefix string) ListWorkflowsOption { // workflows, err := dbos.ListWorkflows(ctx, // dbos.WithLoadInput(false)) func WithLoadInput(loadInput bool) ListWorkflowsOption { - return func(p *listWorkflowsParams) { + return func(p *ListWorkflowsOptions) { p.loadInput = loadInput } } @@ -1886,7 +1886,7 @@ func WithLoadInput(loadInput bool) ListWorkflowsOption { // workflows, err := dbos.ListWorkflows(ctx, // dbos.WithLoadOutput(false)) func WithLoadOutput(loadOutput bool) ListWorkflowsOption { - return func(p *listWorkflowsParams) { + return func(p *ListWorkflowsOptions) { p.loadOutput = loadOutput } } @@ -1899,7 +1899,7 @@ func WithLoadOutput(loadOutput bool) ListWorkflowsOption { // workflows, err := dbos.ListWorkflows(ctx, // dbos.WithQueueName("data-processing")) func WithQueueName(queueName string) ListWorkflowsOption { - return func(p *listWorkflowsParams) { + return func(p *ListWorkflowsOptions) { p.queueName = queueName } } @@ -1948,10 +1948,10 @@ func WithQueueName(queueName string) ListWorkflowsOption { // if err != nil { // log.Fatal(err) // } -func (c *dbosContext) ListWorkflows(opts ...ListWorkflowsOption) ([]WorkflowStatus, error) { +func (c *dbosContext) ListWorkflows(_ DBOSContext, opts ...ListWorkflowsOption) ([]WorkflowStatus, error) { // Initialize parameters with defaults - params := &listWorkflowsParams{ + params := &ListWorkflowsOptions{ loadInput: true, // Default to loading input loadOutput: true, // Default to loading output } @@ -2027,5 +2027,5 @@ func ListWorkflows(ctx DBOSContext, opts ...ListWorkflowsOption) ([]WorkflowStat if ctx == nil { return nil, errors.New("ctx cannot be nil") } - return ctx.ListWorkflows(opts...) + return ctx.ListWorkflows(ctx, opts...) } From f8439b0b5b24b396b451ee0593e374d337cb5bb6 Mon Sep 17 00:00:00 2001 From: maxdml Date: Tue, 19 Aug 2025 11:00:30 -0700 Subject: [PATCH 08/12] cleanup WorkflowRegistrationOptions --- dbos/workflow.go | 20 ++++++++++---------- 1 file changed, 10 insertions(+), 10 deletions(-) diff --git a/dbos/workflow.go b/dbos/workflow.go index b08eceaf..4f4187f4 100644 --- a/dbos/workflow.go +++ b/dbos/workflow.go @@ -299,13 +299,13 @@ func registerScheduledWorkflow(ctx DBOSContext, workflowName string, fn workflow c.logger.Info("Registered scheduled workflow", "fqn", workflowName, "cron_schedule", cronSchedule) } -type workflowRegistrationParams struct { +type WorkflowRegistrationOptions struct { cronSchedule string maxRetries int name string } -type workflowRegistrationOption func(*workflowRegistrationParams) +type WorkflowRegistrationOption func(*WorkflowRegistrationOptions) const ( _DEFAULT_MAX_RECOVERY_ATTEMPTS = 100 @@ -319,8 +319,8 @@ const ( // WithMaxRetries sets the maximum number of retry attempts for workflow recovery. // If a workflow fails or is interrupted, it will be retried up to this many times. // After exceeding max retries, the workflow status becomes RETRIES_EXCEEDED. -func WithMaxRetries(maxRetries int) workflowRegistrationOption { - return func(p *workflowRegistrationParams) { +func WithMaxRetries(maxRetries int) WorkflowRegistrationOption { + return func(p *WorkflowRegistrationOptions) { p.maxRetries = maxRetries } } @@ -328,14 +328,14 @@ func WithMaxRetries(maxRetries int) workflowRegistrationOption { // WithSchedule registers the workflow as a scheduled workflow using cron syntax. // The schedule string follows standard cron format with second precision. // Scheduled workflows automatically receive a time.Time input parameter. -func WithSchedule(schedule string) workflowRegistrationOption { - return func(p *workflowRegistrationParams) { +func WithSchedule(schedule string) WorkflowRegistrationOption { + return func(p *WorkflowRegistrationOptions) { p.cronSchedule = schedule } } -func WithWorkflowName(name string) workflowRegistrationOption { - return func(p *workflowRegistrationParams) { +func WithWorkflowName(name string) WorkflowRegistrationOption { + return func(p *WorkflowRegistrationOptions) { p.name = name } } @@ -365,7 +365,7 @@ func WithWorkflowName(name string) workflowRegistrationOption { // dbos.WithMaxRetries(5), // dbos.WithSchedule("0 0 * * *")) // daily at midnight // dbos.WithWorkflowName("MyCustomWorkflowName") // Custom name for the workflow -func RegisterWorkflow[P any, R any](ctx DBOSContext, fn WorkflowFunc[P, R], opts ...workflowRegistrationOption) { +func RegisterWorkflow[P any, R any](ctx DBOSContext, fn WorkflowFunc[P, R], opts ...WorkflowRegistrationOption) { if ctx == nil { panic("ctx cannot be nil") } @@ -374,7 +374,7 @@ func RegisterWorkflow[P any, R any](ctx DBOSContext, fn WorkflowFunc[P, R], opts panic("workflow function cannot be nil") } - registrationParams := workflowRegistrationParams{ + registrationParams := WorkflowRegistrationOptions{ maxRetries: _DEFAULT_MAX_RECOVERY_ATTEMPTS, } From df98273f26990b97cafc55fdd76fd5e21c8c238a Mon Sep 17 00:00:00 2001 From: maxdml Date: Tue, 19 Aug 2025 11:01:02 -0700 Subject: [PATCH 09/12] cleanup WorkflowOptions and StepOptions --- dbos/workflow.go | 82 ++++++++++++++++++++++++------------------------ 1 file changed, 41 insertions(+), 41 deletions(-) diff --git a/dbos/workflow.go b/dbos/workflow.go index 4f4187f4..217b001e 100644 --- a/dbos/workflow.go +++ b/dbos/workflow.go @@ -446,7 +446,7 @@ type WorkflowFunc[P any, R any] func(ctx DBOSContext, input P) (R, error) // workflowFunc represents a type-erased workflow function used internally. type workflowFunc func(ctx DBOSContext, input any) (any, error) -type workflowParams struct { +type WorkflowOptions struct { workflowName string workflowID string queueName string @@ -457,12 +457,12 @@ type workflowParams struct { } // WorkflowOption is a functional option for configuring workflow execution parameters. -type WorkflowOption func(*workflowParams) +type WorkflowOption func(*WorkflowOptions) // WithWorkflowID sets a custom workflow ID instead of generating one automatically. // This is useful for idempotent workflow execution and workflow retrieval. func WithWorkflowID(id string) WorkflowOption { - return func(p *workflowParams) { + return func(p *WorkflowOptions) { p.workflowID = id } } @@ -470,7 +470,7 @@ func WithWorkflowID(id string) WorkflowOption { // WithQueue enqueues the workflow to the specified queue instead of executing immediately. // Queued workflows will be processed by the queue runner according to the queue's configuration. func WithQueue(queueName string) WorkflowOption { - return func(p *workflowParams) { + return func(p *WorkflowOptions) { p.queueName = queueName } } @@ -478,28 +478,28 @@ func WithQueue(queueName string) WorkflowOption { // WithApplicationVersion overrides the DBOS Context application version for this workflow. // This affects workflow recovery. func WithApplicationVersion(version string) WorkflowOption { - return func(p *workflowParams) { + return func(p *WorkflowOptions) { p.applicationVersion = version } } // WithDeduplicationID sets a deduplication ID for the workflow. func WithDeduplicationID(id string) WorkflowOption { - return func(p *workflowParams) { + return func(p *WorkflowOptions) { p.deduplicationID = id } } // WithPriority sets the execution priority for the workflow. func WithPriority(priority uint) WorkflowOption { - return func(p *workflowParams) { + return func(p *WorkflowOptions) { p.priority = priority } } // An internal option we use to map the reflection function name to the registration options. func withWorkflowName(name string) WorkflowOption { - return func(p *workflowParams) { + return func(p *WorkflowOptions) { p.workflowName = name } } @@ -583,7 +583,7 @@ func RunAsWorkflow[P any, R any](ctx DBOSContext, fn WorkflowFunc[P, R], input P func (c *dbosContext) RunAsWorkflow(_ DBOSContext, fn workflowFunc, input any, opts ...WorkflowOption) (WorkflowHandle[any], error) { // Apply options to build params - params := workflowParams{ + params := WorkflowOptions{ applicationVersion: c.GetApplicationVersion(), } for _, opt := range opts { @@ -833,23 +833,23 @@ type StepFunc[R any] func(ctx context.Context) (R, error) // StepOptions holds the configuration for step execution using functional options pattern. type StepOptions struct { - MaxRetries int // Maximum number of retry attempts (0 = no retries) - BackoffFactor float64 // Exponential backoff multiplier between retries (default: 2.0) - BaseInterval time.Duration // Initial delay between retries (default: 100ms) - MaxInterval time.Duration // Maximum delay between retries (default: 5s) - StepName string // Custom name for the step (defaults to function name) + maxRetries int // Maximum number of retry attempts (0 = no retries) + backoffFactor float64 // Exponential backoff multiplier between retries (default: 2.0) + baseInterval time.Duration // Initial delay between retries (default: 100ms) + maxInterval time.Duration // Maximum delay between retries (default: 5s) + stepName string // Custom name for the step (defaults to function name) } // setDefaults applies default values to stepOptions func (opts *StepOptions) setDefaults() { - if opts.BackoffFactor == 0 { - opts.BackoffFactor = _DEFAULT_STEP_BACKOFF_FACTOR + if opts.backoffFactor == 0 { + opts.backoffFactor = _DEFAULT_STEP_BACKOFF_FACTOR } - if opts.BaseInterval == 0 { - opts.BaseInterval = _DEFAULT_STEP_BASE_INTERVAL + if opts.baseInterval == 0 { + opts.baseInterval = _DEFAULT_STEP_BASE_INTERVAL } - if opts.MaxInterval == 0 { - opts.MaxInterval = _DEFAULT_STEP_MAX_INTERVAL + if opts.maxInterval == 0 { + opts.maxInterval = _DEFAULT_STEP_MAX_INTERVAL } } @@ -861,8 +861,8 @@ type StepOption func(*StepOptions) // multiple WithStepName calls without overriding the first one. func WithStepName(name string) StepOption { return func(opts *StepOptions) { - if opts.StepName == "" { - opts.StepName = name + if opts.stepName == "" { + opts.stepName = name } } } @@ -871,7 +871,7 @@ func WithStepName(name string) StepOption { // A value of 0 means no retries (default behavior). func WithStepMaxRetries(maxRetries int) StepOption { return func(opts *StepOptions) { - opts.MaxRetries = maxRetries + opts.maxRetries = maxRetries } } @@ -880,7 +880,7 @@ func WithStepMaxRetries(maxRetries int) StepOption { // Default value is 2.0. func WithBackoffFactor(factor float64) StepOption { return func(opts *StepOptions) { - opts.BackoffFactor = factor + opts.backoffFactor = factor } } @@ -888,7 +888,7 @@ func WithBackoffFactor(factor float64) StepOption { // Default value is 100ms. func WithBaseInterval(interval time.Duration) StepOption { return func(opts *StepOptions) { - opts.BaseInterval = interval + opts.baseInterval = interval } } @@ -896,7 +896,7 @@ func WithBaseInterval(interval time.Duration) StepOption { // Default value is 5s. func WithMaxInterval(interval time.Duration) StepOption { return func(opts *StepOptions) { - opts.MaxInterval = interval + opts.maxInterval = interval } } @@ -980,12 +980,12 @@ func (c *dbosContext) RunAsStep(_ DBOSContext, fn stepFunc, opts ...StepOption) // Get workflow state from context wfState, ok := c.Value(workflowStateKey).(*workflowState) if !ok || wfState == nil { - return nil, newStepExecutionError("", stepOpts.StepName, "workflow state not found in context: are you running this step within a workflow?") + return nil, newStepExecutionError("", stepOpts.stepName, "workflow state not found in context: are you running this step within a workflow?") } // This should not happen when called from the package-level RunAsStep if fn == nil { - return nil, newStepExecutionError(wfState.workflowID, stepOpts.StepName, "step function cannot be nil") + return nil, newStepExecutionError(wfState.workflowID, stepOpts.stepName, "step function cannot be nil") } // If within a step, just run the function directly @@ -1007,10 +1007,10 @@ func (c *dbosContext) RunAsStep(_ DBOSContext, fn stepFunc, opts ...StepOption) recordedOutput, err := c.systemDB.checkOperationExecution(uncancellableCtx, checkOperationExecutionDBInput{ workflowID: stepState.workflowID, stepID: stepState.stepID, - stepName: stepOpts.StepName, + stepName: stepOpts.stepName, }) if err != nil { - return nil, newStepExecutionError(stepState.workflowID, stepOpts.StepName, fmt.Sprintf("checking operation execution: %v", err)) + return nil, newStepExecutionError(stepState.workflowID, stepOpts.stepName, fmt.Sprintf("checking operation execution: %v", err)) } if recordedOutput != nil { return recordedOutput.output, recordedOutput.err @@ -1023,23 +1023,23 @@ func (c *dbosContext) RunAsStep(_ DBOSContext, fn stepFunc, opts ...StepOption) // Retry if MaxRetries > 0 and the first execution failed var joinedErrors error - if stepError != nil && stepOpts.MaxRetries > 0 { + if stepError != nil && stepOpts.maxRetries > 0 { joinedErrors = errors.Join(joinedErrors, stepError) - for retry := 1; retry <= stepOpts.MaxRetries; retry++ { + for retry := 1; retry <= stepOpts.maxRetries; retry++ { // Calculate delay for exponential backoff - delay := stepOpts.BaseInterval + delay := stepOpts.baseInterval if retry > 1 { - exponentialDelay := float64(stepOpts.BaseInterval) * math.Pow(stepOpts.BackoffFactor, float64(retry-1)) - delay = time.Duration(math.Min(exponentialDelay, float64(stepOpts.MaxInterval))) + exponentialDelay := float64(stepOpts.baseInterval) * math.Pow(stepOpts.backoffFactor, float64(retry-1)) + delay = time.Duration(math.Min(exponentialDelay, float64(stepOpts.maxInterval))) } - c.logger.Error("step failed, retrying", "step_name", stepOpts.StepName, "retry", retry, "max_retries", stepOpts.MaxRetries, "delay", delay, "error", stepError) + c.logger.Error("step failed, retrying", "step_name", stepOpts.stepName, "retry", retry, "max_retries", stepOpts.maxRetries, "delay", delay, "error", stepError) // Wait before retry select { case <-c.Done(): - return nil, newStepExecutionError(stepState.workflowID, stepOpts.StepName, fmt.Sprintf("context cancelled during retry: %v", c.Err())) + return nil, newStepExecutionError(stepState.workflowID, stepOpts.stepName, fmt.Sprintf("context cancelled during retry: %v", c.Err())) case <-time.After(delay): // Continue to retry } @@ -1056,8 +1056,8 @@ func (c *dbosContext) RunAsStep(_ DBOSContext, fn stepFunc, opts ...StepOption) joinedErrors = errors.Join(joinedErrors, stepError) // If max retries reached, create MaxStepRetriesExceeded error - if retry == stepOpts.MaxRetries { - stepError = newMaxStepRetriesExceededError(stepState.workflowID, stepOpts.StepName, stepOpts.MaxRetries, joinedErrors) + if retry == stepOpts.maxRetries { + stepError = newMaxStepRetriesExceededError(stepState.workflowID, stepOpts.stepName, stepOpts.maxRetries, joinedErrors) break } } @@ -1066,14 +1066,14 @@ func (c *dbosContext) RunAsStep(_ DBOSContext, fn stepFunc, opts ...StepOption) // Record the final result dbInput := recordOperationResultDBInput{ workflowID: stepState.workflowID, - stepName: stepOpts.StepName, + stepName: stepOpts.stepName, stepID: stepState.stepID, err: stepError, output: stepOutput, } recErr := c.systemDB.recordOperationResult(uncancellableCtx, dbInput) if recErr != nil { - return nil, newStepExecutionError(stepState.workflowID, stepOpts.StepName, fmt.Sprintf("recording step outcome: %v", recErr)) + return nil, newStepExecutionError(stepState.workflowID, stepOpts.stepName, fmt.Sprintf("recording step outcome: %v", recErr)) } return stepOutput, stepError From af650e47d30d68c77d2eb8d6ae5a5d4918c04b0f Mon Sep 17 00:00:00 2001 From: maxdml Date: Tue, 19 Aug 2025 11:01:11 -0700 Subject: [PATCH 10/12] nit --- dbos/workflow.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/dbos/workflow.go b/dbos/workflow.go index 217b001e..65df0c76 100644 --- a/dbos/workflow.go +++ b/dbos/workflow.go @@ -1191,7 +1191,7 @@ type getEventInput struct { Timeout time.Duration // Maximum time to wait for the event to be set } -func (c *dbosContext) GetEvent(_ DBOSContext, targetWorkflowID string, key string, timeout time.Duration) (any, error) { +func (c *dbosContext) GetEvent(_ DBOSContext, targetWorkflowID, key string, timeout time.Duration) (any, error) { input := getEventInput{ TargetWorkflowID: targetWorkflowID, Key: key, @@ -1214,7 +1214,7 @@ func (c *dbosContext) GetEvent(_ DBOSContext, targetWorkflowID string, key strin // return err // } // log.Printf("Status: %s", status) -func GetEvent[R any](ctx DBOSContext, targetWorkflowID string, key string, timeout time.Duration) (R, error) { +func GetEvent[R any](ctx DBOSContext, targetWorkflowID, key string, timeout time.Duration) (R, error) { if ctx == nil { return *new(R), errors.New("ctx cannot be nil") } From f36da0242ce3398ae53a761ae48a7be4b7e2b55b Mon Sep 17 00:00:00 2001 From: maxdml Date: Tue, 19 Aug 2025 12:36:36 -0700 Subject: [PATCH 11/12] stepfunc /wf func must be exported in the interface --- dbos/dbos.go | 4 ++-- dbos/workflow.go | 34 +++++++++++++++++----------------- 2 files changed, 19 insertions(+), 19 deletions(-) diff --git a/dbos/dbos.go b/dbos/dbos.go index 4539a2a2..f3c35e08 100644 --- a/dbos/dbos.go +++ b/dbos/dbos.go @@ -75,8 +75,8 @@ type DBOSContext interface { Cancel() // Gracefully shutdown the DBOS runtime, waiting for workflows to complete and cleaning up resources // Workflow operations - RunAsStep(_ DBOSContext, fn stepFunc, opts ...StepOption) (any, error) // Execute a function as a durable step within a workflow - RunAsWorkflow(_ DBOSContext, fn workflowFunc, input any, opts ...WorkflowOption) (WorkflowHandle[any], error) // Start a new workflow execution + RunAsStep(_ DBOSContext, fn StepFunc, opts ...StepOption) (any, error) // Execute a function as a durable step within a workflow + RunAsWorkflow(_ DBOSContext, fn WorkflowFunc, input any, opts ...WorkflowOption) (WorkflowHandle[any], error) // Start a new workflow execution Send(_ DBOSContext, destinationID string, message any, topic string) error // Send a message to another workflow Recv(_ DBOSContext, topic string, timeout time.Duration) (any, error) // Receive a message sent to this workflow SetEvent(_ DBOSContext, key string, message any) error // Set a key-value event for this workflow diff --git a/dbos/workflow.go b/dbos/workflow.go index 65df0c76..fae163e9 100644 --- a/dbos/workflow.go +++ b/dbos/workflow.go @@ -257,7 +257,7 @@ func registerWorkflow(ctx DBOSContext, workflowFQN string, fn wrappedWorkflowFun } } -func registerScheduledWorkflow(ctx DBOSContext, workflowName string, fn workflowFunc, cronSchedule string) { +func registerScheduledWorkflow(ctx DBOSContext, workflowName string, fn WorkflowFunc, cronSchedule string) { // Skip if we don't have a concrete dbosContext c, ok := ctx.(*dbosContext) if !ok { @@ -365,7 +365,7 @@ func WithWorkflowName(name string) WorkflowRegistrationOption { // dbos.WithMaxRetries(5), // dbos.WithSchedule("0 0 * * *")) // daily at midnight // dbos.WithWorkflowName("MyCustomWorkflowName") // Custom name for the workflow -func RegisterWorkflow[P any, R any](ctx DBOSContext, fn WorkflowFunc[P, R], opts ...WorkflowRegistrationOption) { +func RegisterWorkflow[P any, R any](ctx DBOSContext, fn Workflow[P, R], opts ...WorkflowRegistrationOption) { if ctx == nil { panic("ctx cannot be nil") } @@ -391,7 +391,7 @@ func RegisterWorkflow[P any, R any](ctx DBOSContext, fn WorkflowFunc[P, R], opts gob.Register(r) // Register a type-erased version of the durable workflow for recovery - typedErasedWorkflow := workflowFunc(func(ctx DBOSContext, input any) (any, error) { + typedErasedWorkflow := WorkflowFunc(func(ctx DBOSContext, input any) (any, error) { typedInput, ok := input.(P) if !ok { wfID, err := ctx.GetWorkflowID() @@ -438,13 +438,13 @@ type DBOSContextKey string const workflowStateKey DBOSContextKey = "workflowState" -// WorkflowFunc represents a type-safe workflow function with specific input and output types. +// Workflow represents a type-safe workflow function with specific input and output types. // P is the input parameter type and R is the return type. // All workflow functions must accept a DBOSContext as their first parameter. -type WorkflowFunc[P any, R any] func(ctx DBOSContext, input P) (R, error) +type Workflow[P any, R any] func(ctx DBOSContext, input P) (R, error) -// workflowFunc represents a type-erased workflow function used internally. -type workflowFunc func(ctx DBOSContext, input any) (any, error) +// WorkflowFunc represents a type-erased workflow function used internally. +type WorkflowFunc func(ctx DBOSContext, input any) (any, error) type WorkflowOptions struct { workflowName string @@ -524,7 +524,7 @@ func withWorkflowName(name string) WorkflowOption { // } else { // log.Printf("Result: %v", result) // } -func RunAsWorkflow[P any, R any](ctx DBOSContext, fn WorkflowFunc[P, R], input P, opts ...WorkflowOption) (WorkflowHandle[R], error) { +func RunAsWorkflow[P any, R any](ctx DBOSContext, fn Workflow[P, R], input P, opts ...WorkflowOption) (WorkflowHandle[R], error) { if ctx == nil { return nil, fmt.Errorf("ctx cannot be nil") } @@ -532,7 +532,7 @@ func RunAsWorkflow[P any, R any](ctx DBOSContext, fn WorkflowFunc[P, R], input P // Add the fn name to the options so we can communicate it with DBOSContext.RunAsWorkflow opts = append(opts, withWorkflowName(runtime.FuncForPC(reflect.ValueOf(fn).Pointer()).Name())) - typedErasedWorkflow := workflowFunc(func(ctx DBOSContext, input any) (any, error) { + typedErasedWorkflow := WorkflowFunc(func(ctx DBOSContext, input any) (any, error) { return fn(ctx, input.(P)) }) @@ -581,7 +581,7 @@ func RunAsWorkflow[P any, R any](ctx DBOSContext, fn WorkflowFunc[P, R], input P return nil, fmt.Errorf("unexpected workflow handle type: %T", handle) } -func (c *dbosContext) RunAsWorkflow(_ DBOSContext, fn workflowFunc, input any, opts ...WorkflowOption) (WorkflowHandle[any], error) { +func (c *dbosContext) RunAsWorkflow(_ DBOSContext, fn WorkflowFunc, input any, opts ...WorkflowOption) (WorkflowHandle[any], error) { // Apply options to build params params := WorkflowOptions{ applicationVersion: c.GetApplicationVersion(), @@ -825,11 +825,11 @@ func (c *dbosContext) RunAsWorkflow(_ DBOSContext, fn workflowFunc, input any, o /******* STEP FUNCTIONS *******/ /******************************/ -// stepFunc represents a type-erased step function used internally. -type stepFunc func(ctx context.Context) (any, error) +// StepFunc represents a type-erased step function used internally. +type StepFunc func(ctx context.Context) (any, error) -// StepFunc represents a type-safe step function with a specific output type R. -type StepFunc[R any] func(ctx context.Context) (R, error) +// Step represents a type-safe step function with a specific output type R. +type Step[R any] func(ctx context.Context) (R, error) // StepOptions holds the configuration for step execution using functional options pattern. type StepOptions struct { @@ -940,7 +940,7 @@ func WithMaxInterval(interval time.Duration) StepOption { // Note that the function passed to RunAsStep must accept a context.Context as its first parameter // and this context *must* be the one specified in the function's signature (not the context passed to RunAsStep). // Under the hood, DBOS will augment the step's context and pass it to the function when executing it durably. -func RunAsStep[R any](ctx DBOSContext, fn StepFunc[R], opts ...StepOption) (R, error) { +func RunAsStep[R any](ctx DBOSContext, fn Step[R], opts ...StepOption) (R, error) { if ctx == nil { return *new(R), newStepExecutionError("", "", "ctx cannot be nil") } @@ -954,7 +954,7 @@ func RunAsStep[R any](ctx DBOSContext, fn StepFunc[R], opts ...StepOption) (R, e opts = append(opts, WithStepName(stepName)) // Type-erase the function - typeErasedFn := stepFunc(func(ctx context.Context) (any, error) { return fn(ctx) }) + typeErasedFn := StepFunc(func(ctx context.Context) (any, error) { return fn(ctx) }) result, err := ctx.RunAsStep(ctx, typeErasedFn, opts...) // Step function could return a nil result @@ -969,7 +969,7 @@ func RunAsStep[R any](ctx DBOSContext, fn StepFunc[R], opts ...StepOption) (R, e return typedResult, err } -func (c *dbosContext) RunAsStep(_ DBOSContext, fn stepFunc, opts ...StepOption) (any, error) { +func (c *dbosContext) RunAsStep(_ DBOSContext, fn StepFunc, opts ...StepOption) (any, error) { // Process functional options stepOpts := &StepOptions{} for _, opt := range opts { From 45912ba0f0fe9a9acc2a6b28fea5e2cf7728ca6f Mon Sep 17 00:00:00 2001 From: maxdml Date: Tue, 19 Aug 2025 12:41:01 -0700 Subject: [PATCH 12/12] remove unused type --- dbos/workflow.go | 1 - 1 file changed, 1 deletion(-) diff --git a/dbos/workflow.go b/dbos/workflow.go index fae163e9..dcf47596 100644 --- a/dbos/workflow.go +++ b/dbos/workflow.go @@ -212,7 +212,6 @@ func (h *workflowPollingHandle[R]) GetResult() (R, error) { /**********************************/ /******* WORKFLOW REGISTRY *******/ /**********************************/ -type WrappedWorkflowFunc[P any, R any] func(ctx DBOSContext, input P, opts ...WorkflowOption) (WorkflowHandle[R], error) type wrappedWorkflowFunc func(ctx DBOSContext, input any, opts ...WorkflowOption) (WorkflowHandle[any], error) type workflowRegistryEntry struct {