From 7c5e70bf4c924fdabcaf1b0ebb092e4108fc53db Mon Sep 17 00:00:00 2001 From: maxdml Date: Mon, 11 Aug 2025 18:12:49 -0700 Subject: [PATCH 01/20] client enqueue --- dbos/client_test.go | 174 +++++++++++++++++++++++++++++++++++++ dbos/dbos.go | 8 +- dbos/queue.go | 10 ++- dbos/recovery.go | 8 +- dbos/serialization_test.go | 4 +- dbos/system_database.go | 36 ++++++-- dbos/workflow.go | 137 ++++++++++++++++++++++++++--- dbos/workflows_test.go | 28 +++--- 8 files changed, 362 insertions(+), 43 deletions(-) create mode 100644 dbos/client_test.go diff --git a/dbos/client_test.go b/dbos/client_test.go new file mode 100644 index 00000000..ba77efd3 --- /dev/null +++ b/dbos/client_test.go @@ -0,0 +1,174 @@ +package dbos + +import ( + "fmt" + "testing" + "time" +) + +// Simple workflow that will be executed on the server +func serverWorkflow(ctx DBOSContext, input string) (string, error) { + if input != "test-input" { + return "", fmt.Errorf("unexpected input: %s", input) + } + return "processed: " + input, nil +} + +func TestClientEnqueue(t *testing.T) { + // Setup server context - this will process tasks + serverCtx := setupDBOS(t, true, true) + + // Create queue for communication between client and server + queue := NewWorkflowQueue(serverCtx, "client-enqueue-queue") + + // Register workflows with custom names so client can reference them + RegisterWorkflow(serverCtx, serverWorkflow, WithWorkflowName("ServerWorkflow")) + + // Workflow that blocks until cancelled (for timeout test) + blockingWorkflow := func(ctx DBOSContext, _ string) (string, error) { + select { + case <-ctx.Done(): + return "", ctx.Err() + case <-time.After(10 * time.Second): + return "should-never-complete", nil + } + } + RegisterWorkflow(serverCtx, blockingWorkflow, WithWorkflowName("BlockingWorkflow")) + + // Launch the server context to start processing tasks + err := serverCtx.Launch() + if err != nil { + t.Fatalf("failed to launch server DBOS instance: %v", err) + } + + // Setup client context - this will enqueue tasks + clientCtx := setupDBOS(t, false, false) // Don't drop DB, don't check for leaks + + t.Run("EnqueueAndGetResult", func(t *testing.T) { + // Client enqueues a task using the new Enqueue method + handle, err := Enqueue[string, string](clientCtx, GenericEnqueueOptions[string]{ + WorkflowName: "ServerWorkflow", + QueueName: queue.Name, + WorkflowInput: "test-input", + ApplicationVersion: serverCtx.GetApplicationVersion(), + }) + if err != nil { + t.Fatalf("failed to enqueue workflow from client: %v", err) + } + + // Verify we got a polling handle + _, ok := handle.(*workflowPollingHandle[string]) + if !ok { + t.Fatalf("expected handle to be of type workflowPollingHandle, got %T", handle) + } + + // Client retrieves the result + result, err := handle.GetResult() + if err != nil { + t.Fatalf("failed to get result from enqueued workflow: %v", err) + } + + expectedResult := "processed: test-input" + if result != expectedResult { + t.Fatalf("expected result to be '%s', got '%s'", expectedResult, result) + } + + // Verify the workflow status + status, err := handle.GetStatus() + if err != nil { + t.Fatalf("failed to get workflow status: %v", err) + } + + if status.Status != WorkflowStatusSuccess { + t.Fatalf("expected workflow status to be SUCCESS, got %v", status.Status) + } + + if status.Name != "ServerWorkflow" { + t.Fatalf("expected workflow name to be 'ServerWorkflow', got '%s'", status.Name) + } + + if status.QueueName != queue.Name { + t.Fatalf("expected queue name to be '%s', got '%s'", queue.Name, status.QueueName) + } + + if !queueEntriesAreCleanedUp(serverCtx) { + t.Fatal("expected queue entries to be cleaned up after global concurrency test") + } + }) + + t.Run("EnqueueWithCustomWorkflowID", func(t *testing.T) { + customWorkflowID := "custom-client-workflow-id" + + // Client enqueues a task with a custom workflow ID + _, err := Enqueue[string, string](clientCtx, GenericEnqueueOptions[string]{ + WorkflowName: "ServerWorkflow", + QueueName: queue.Name, + WorkflowID: customWorkflowID, + WorkflowInput: "test-input", + }) + if err != nil { + t.Fatalf("failed to enqueue workflow with custom ID: %v", err) + } + + // Verify the workflow ID is what we set + retrieveHandle, err := RetrieveWorkflow[string](clientCtx, customWorkflowID) + if err != nil { + t.Fatalf("failed to retrieve workflow by custom ID: %v", err) + } + + result, err := retrieveHandle.GetResult() + if err != nil { + t.Fatalf("failed to get result from retrieved workflow: %v", err) + } + + if result != "processed: test-input" { + t.Fatalf("expected retrieved workflow result to be 'processed: test-input', got '%s'", result) + } + + if !queueEntriesAreCleanedUp(serverCtx) { + t.Fatal("expected queue entries to be cleaned up after global concurrency test") + } + }) + + t.Run("EnqueueWithTimeout", func(t *testing.T) { + handle, err := Enqueue[string, string](clientCtx, GenericEnqueueOptions[string]{ + WorkflowName: "BlockingWorkflow", + QueueName: queue.Name, + WorkflowInput: "blocking-input", + WorkflowTimeout: 500 * time.Millisecond, + }) + if err != nil { + t.Fatalf("failed to enqueue blocking workflow: %v", err) + } + + // Should timeout when trying to get result + _, err = handle.GetResult() + if err == nil { + t.Fatal("expected timeout error, but got none") + } + + dbosErr, ok := err.(*DBOSError) + if !ok { + t.Fatalf("expected error to be of type *DBOSError, got %T", err) + } + + if dbosErr.Code != AwaitedWorkflowCancelled { + t.Fatalf("expected error code to be AwaitedWorkflowCancelled, got %v", dbosErr.Code) + } + + // Verify workflow is cancelled + status, err := handle.GetStatus() + if err != nil { + t.Fatalf("failed to get workflow status: %v", err) + } + + if status.Status != WorkflowStatusCancelled { + t.Fatalf("expected workflow status to be CANCELLED, got %v", status.Status) + } + }) + + // Verify all queue entries are cleaned up + if !queueEntriesAreCleanedUp(serverCtx) { + t.Fatal("expected queue entries to be cleaned up after client tests") + } +} diff --git a/dbos/dbos.go b/dbos/dbos.go index fbdc6a72..f7758bdc 100644 --- a/dbos/dbos.go +++ b/dbos/dbos.go @@ -76,7 +76,7 @@ type DBOSContext interface { // Workflow operations RunAsStep(_ DBOSContext, fn StepFunc) (any, error) // Execute a function as a durable step within a workflow RunAsWorkflow(_ DBOSContext, fn WorkflowFunc, input any, opts ...WorkflowOption) (WorkflowHandle[any], error) // Start a new workflow execution - Send(_ DBOSContext, input WorkflowSendInputInternal) error // Send a message to another workflow + Send(_ DBOSContext, input WorkflowSendInput) error // Send a message to another workflow Recv(_ DBOSContext, input WorkflowRecvInput) (any, error) // Receive a message sent to this workflow SetEvent(_ DBOSContext, input WorkflowSetEventInput) error // Set a key-value event for this workflow GetEvent(_ DBOSContext, input WorkflowGetEventInput) (any, error) // Get a key-value event from a target workflow @@ -85,6 +85,7 @@ type DBOSContext interface { // Workflow management RetrieveWorkflow(_ DBOSContext, workflowID string) (WorkflowHandle[any], error) // Get a handle to an existing workflow + Enqueue(_ DBOSContext, params EnqueueOptions) (WorkflowHandle[any], error) // Enqueue a new workflow with parameters // Accessors GetApplicationVersion() string // Get the application version for this context @@ -114,8 +115,9 @@ type dbosContext struct { workflowsWg *sync.WaitGroup // Workflow registry - workflowRegistry map[string]workflowRegistryEntry - workflowRegMutex *sync.RWMutex + workflowRegistry map[string]workflowRegistryEntry + workflowRegMutex *sync.RWMutex + workflowCustomNametoFQN sync.Map // Maps fully qualified workflow names to custom names. Usefor when client enqueues a workflow by name because registry is indexed by FQN. // Workflow scheduler workflowScheduler *cron.Cron diff --git a/dbos/queue.go b/dbos/queue.go index 8b0846fd..320c1562 100644 --- a/dbos/queue.go +++ b/dbos/queue.go @@ -199,10 +199,16 @@ func (qr *queueRunner) run(ctx *dbosContext) { } for _, workflow := range dequeuedWorkflows { // Find the workflow in the registry - registeredWorkflow, exists := ctx.workflowRegistry[workflow.name] + + wfName, ok := ctx.workflowCustomNametoFQN.Load(workflow.name) + if !ok { + ctx.logger.Error("Workflow not found in registry", "workflow_name", workflow.name) + continue + } + + registeredWorkflow, exists := ctx.workflowRegistry[wfName.(string)] if !exists { ctx.logger.Error("workflow function not found in registry", "workflow_name", workflow.name) - continue } // Deserialize input diff --git a/dbos/recovery.go b/dbos/recovery.go index 63b48182..f3db7c00 100644 --- a/dbos/recovery.go +++ b/dbos/recovery.go @@ -36,7 +36,13 @@ func recoverPendingWorkflows(ctx *dbosContext, executorIDs []string) ([]Workflow continue } - registeredWorkflow, exists := ctx.workflowRegistry[workflow.Name] + wfName, ok := ctx.workflowCustomNametoFQN.Load(workflow.Name) + if !ok { + ctx.logger.Error("Workflow not found in registry", "workflow_name", workflow.Name) + continue + } + + registeredWorkflow, exists := ctx.workflowRegistry[wfName.(string)] if !exists { ctx.logger.Error("Workflow function not found in registry", "workflow_id", workflow.ID, "name", workflow.Name) continue diff --git a/dbos/serialization_test.go b/dbos/serialization_test.go index 1071899a..4349a1ad 100644 --- a/dbos/serialization_test.go +++ b/dbos/serialization_test.go @@ -317,7 +317,7 @@ func setEventUserDefinedTypeWorkflow(ctx DBOSContext, input string) (string, err }, } - err := SetEvent(ctx, WorkflowSetEventInputGeneric[UserDefinedEventData]{Key: input, Message: eventData}) + err := SetEvent(ctx, GenericWorkflowSetEventInput[UserDefinedEventData]{Key: input, Message: eventData}) if err != nil { return "", err } @@ -394,7 +394,7 @@ func sendUserDefinedTypeWorkflow(ctx DBOSContext, destinationID string) (string, // Send should automatically register this type with gob // Note the explicit type parameter since compiler cannot infer UserDefinedEventData from string input - err := Send(ctx, WorkflowSendInput[UserDefinedEventData]{ + err := Send(ctx, GenericWorkflowSendInput[UserDefinedEventData]{ DestinationID: destinationID, Topic: "user-defined-topic", Message: sendData, diff --git a/dbos/system_database.go b/dbos/system_database.go index a577b390..3329cf20 100644 --- a/dbos/system_database.go +++ b/dbos/system_database.go @@ -47,7 +47,7 @@ type systemDatabase interface { getWorkflowSteps(ctx context.Context, workflowID string) ([]stepInfo, error) // Communication (special steps) - send(ctx context.Context, input WorkflowSendInputInternal) error + send(ctx context.Context, input WorkflowSendInput) error recv(ctx context.Context, input WorkflowRecvInput) (any, error) setEvent(ctx context.Context, input WorkflowSetEventInput) error getEvent(ctx context.Context, input WorkflowGetEventInput) (any, error) @@ -297,6 +297,17 @@ func (s *sysDB) insertWorkflowStatus(ctx context.Context, input insertWorkflowSt return nil, fmt.Errorf("failed to serialize input: %w", err) } + // Our DB works with NULL values + var applicationVersion *string + if len(input.status.ApplicationVersion) > 0 { + applicationVersion = &input.status.ApplicationVersion + } + + var deduplicationID *string + if len(input.status.DeduplicationID) > 0 { + deduplicationID = &input.status.DeduplicationID + } + query := `INSERT INTO dbos.workflow_status ( workflow_uuid, status, @@ -342,7 +353,7 @@ func (s *sysDB) insertWorkflowStatus(ctx context.Context, input insertWorkflowSt input.status.AssumedRole, input.status.AuthenticatedRoles, input.status.ExecutorID, - input.status.ApplicationVersion, + applicationVersion, input.status.ApplicationID, input.status.CreatedAt.UnixMilli(), attempts, @@ -350,7 +361,7 @@ func (s *sysDB) insertWorkflowStatus(ctx context.Context, input insertWorkflowSt timeoutMs, deadline, inputString, - input.status.DeduplicationID, + deduplicationID, input.status.Priority, WorkflowStatusEnqueued, WorkflowStatusEnqueued, @@ -526,13 +537,15 @@ func (s *sysDB) listWorkflows(ctx context.Context, input listWorkflowsDBInput) ( var deadlineMs, startedAtMs *int64 var outputString, inputString *string var errorStr *string + var deduplicationID *string + var applicationVersion *string err := rows.Scan( &wf.ID, &wf.Status, &wf.Name, &wf.AuthenticatedUser, &wf.AssumedRole, &wf.AuthenticatedRoles, &outputString, &errorStr, &wf.ExecutorID, &createdAtMs, - &updatedAtMs, &wf.ApplicationVersion, &wf.ApplicationID, + &updatedAtMs, &applicationVersion, &wf.ApplicationID, &wf.Attempts, &queueName, &timeoutMs, - &deadlineMs, &startedAtMs, &wf.DeduplicationID, + &deadlineMs, &startedAtMs, &deduplicationID, &inputString, &wf.Priority, ) if err != nil { @@ -543,6 +556,15 @@ func (s *sysDB) listWorkflows(ctx context.Context, input listWorkflowsDBInput) ( wf.QueueName = *queueName } + // We work with strings -- the DB could return NULL values + if applicationVersion != nil && len(*applicationVersion) > 0 { + wf.ApplicationVersion = *applicationVersion + } + + if deduplicationID != nil && len(*deduplicationID) > 0 { + wf.DeduplicationID = *deduplicationID + } + // Convert milliseconds to time.Time wf.CreatedAt = time.Unix(0, createdAtMs*int64(time.Millisecond)) wf.UpdatedAt = time.Unix(0, updatedAtMs*int64(time.Millisecond)) @@ -1166,7 +1188,7 @@ func (s *sysDB) notificationListenerLoop(ctx context.Context) { const _DBOS_NULL_TOPIC = "__null__topic__" -type WorkflowSendInputInternal struct { +type WorkflowSendInput struct { DestinationID string Message any Topic string @@ -1175,7 +1197,7 @@ type WorkflowSendInputInternal struct { // Send is a special type of step that sends a message to another workflow. // Can be called both within a workflow (as a step) or outside a workflow (directly). // When called within a workflow: durability and the function run in the same transaction, and we forbid nested step execution -func (s *sysDB) send(ctx context.Context, input WorkflowSendInputInternal) error { +func (s *sysDB) send(ctx context.Context, input WorkflowSendInput) error { functionName := "DBOS.send" // Get workflow state from context (optional for Send as we can send from outside a workflow) diff --git a/dbos/workflow.go b/dbos/workflow.go index 3dc2f966..40b617ec 100644 --- a/dbos/workflow.go +++ b/dbos/workflow.go @@ -52,7 +52,7 @@ type WorkflowStatus struct { Timeout time.Duration `json:"timeout"` // Workflow timeout duration Deadline time.Time `json:"deadline"` // Absolute deadline for workflow completion StartedAt time.Time `json:"started_at"` // When the workflow execution actually started - DeduplicationID *string `json:"deduplication_id"` // Deduplication identifier (if applicable) + DeduplicationID string `json:"deduplication_id"` // Deduplication identifier (if applicable) Input any `json:"input"` // Input parameters passed to the workflow Priority int `json:"priority"` // Execution priority (lower numbers have higher priority) } @@ -234,11 +234,19 @@ func registerWorkflow(ctx DBOSContext, workflowFQN string, fn WrappedWorkflowFun panic(newConflictingRegistrationError(workflowFQN)) } + // We must keep the registry indexed by FQN (because RunAsWorkflow uses reflection to find the function name and uses that to look it up in the registry) c.workflowRegistry[workflowFQN] = workflowRegistryEntry{ wrappedFunction: fn, maxRetries: maxRetries, name: customName, } + + // We need to get a mapping from custom name to FQN for registry lookups that might not know the FQN (queue, recovery) + if len(customName) > 0 { + c.workflowCustomNametoFQN.Store(customName, workflowFQN) + } else { + c.workflowCustomNametoFQN.Store(workflowFQN, workflowFQN) // Store the FQN as the custom name if none was provided + } } func registerScheduledWorkflow(ctx DBOSContext, workflowName string, fn WorkflowFunc, cronSchedule string) { @@ -1000,14 +1008,14 @@ func (c *dbosContext) RunAsStep(_ DBOSContext, fn StepFunc) (any, error) { /******* WORKFLOW COMMUNICATIONS ********/ /****************************************/ -// WorkflowSendInput defines the parameters for sending a message to another workflow. -type WorkflowSendInput[R any] struct { +// GenericWorkflowSendInput defines the parameters for sending a message to another workflow. +type GenericWorkflowSendInput[P any] struct { DestinationID string // Workflow ID to send the message to - Message R // Message payload (must be gob-encodable) + Message P // Message payload (must be gob-encodable) Topic string // Optional topic for message filtering } -func (c *dbosContext) Send(_ DBOSContext, input WorkflowSendInputInternal) error { +func (c *dbosContext) Send(_ DBOSContext, input WorkflowSendInput) error { return c.systemDB.send(c, input) } @@ -1024,13 +1032,13 @@ func (c *dbosContext) Send(_ DBOSContext, input WorkflowSendInputInternal) error // Message: "Hello from sender", // Topic: "notifications", // }) -func Send[R any](ctx DBOSContext, input WorkflowSendInput[R]) error { +func Send[P any](ctx DBOSContext, input GenericWorkflowSendInput[P]) error { if ctx == nil { return errors.New("ctx cannot be nil") } - var typedMessage R + var typedMessage P gob.Register(typedMessage) - return ctx.Send(ctx, WorkflowSendInputInternal{ + return ctx.Send(ctx, WorkflowSendInput{ DestinationID: input.DestinationID, Message: input.Message, Topic: input.Topic, @@ -1085,10 +1093,10 @@ func Recv[R any](ctx DBOSContext, input WorkflowRecvInput) (R, error) { return typedMessage, nil } -// WorkflowSetEventInputGeneric defines the parameters for setting a workflow event. -type WorkflowSetEventInputGeneric[R any] struct { +// GenericWorkflowSetEventInput defines the parameters for setting a workflow event. +type GenericWorkflowSetEventInput[P any] struct { Key string // Event key identifier - Message R // Event value (must be gob-encodable) + Message P // Event value (must be gob-encodable) } func (c *dbosContext) SetEvent(_ DBOSContext, input WorkflowSetEventInput) error { @@ -1108,11 +1116,11 @@ func (c *dbosContext) SetEvent(_ DBOSContext, input WorkflowSetEventInput) error // Key: "status", // Message: "processing-complete", // }) -func SetEvent[R any](ctx DBOSContext, input WorkflowSetEventInputGeneric[R]) error { +func SetEvent[P any](ctx DBOSContext, input GenericWorkflowSetEventInput[P]) error { if ctx == nil { return errors.New("ctx cannot be nil") } - var typedMessage R + var typedMessage P gob.Register(typedMessage) return ctx.SetEvent(ctx, WorkflowSetEventInput{ Key: input.Key, @@ -1151,7 +1159,7 @@ func (c *dbosContext) GetEvent(_ DBOSContext, input WorkflowGetEventInput) (any, // log.Printf("Status: %s", status) func GetEvent[R any](ctx DBOSContext, input WorkflowGetEventInput) (R, error) { if ctx == nil { - return *new(R), errors.New("dbosCtx cannot be nil") + return *new(R), errors.New("ctx cannot be nil") } value, err := ctx.GetEvent(ctx, input) if err != nil { @@ -1239,3 +1247,104 @@ func RetrieveWorkflow[R any](ctx DBOSContext, workflowID string) (workflowPollin } return workflowPollingHandle[R]{workflowID: workflowID, dbosContext: ctx}, nil } + +type EnqueueOptions struct { + WorkflowName string + QueueName string + WorkflowID string + ApplicationVersion string + DeduplicationID string + WorkflowTimeout time.Duration + WorkflowInput any +} + +func (c *dbosContext) Enqueue(_ DBOSContext, params EnqueueOptions) (WorkflowHandle[any], error) { + workflowID := params.WorkflowID + if workflowID == "" { + workflowID = uuid.New().String() + } + + var deadline time.Time + if params.WorkflowTimeout > 0 { + deadline = time.Now().Add(params.WorkflowTimeout) + } + + status := WorkflowStatus{ + Name: params.WorkflowName, + ApplicationVersion: params.ApplicationVersion, + Status: WorkflowStatusEnqueued, + ID: workflowID, + CreatedAt: time.Now(), + Deadline: deadline, + Timeout: params.WorkflowTimeout, + Input: params.WorkflowInput, + QueueName: params.QueueName, + DeduplicationID: params.DeduplicationID, + } + + uncancellableCtx := WithoutCancel(c) + + tx, err := c.systemDB.(*sysDB).pool.Begin(uncancellableCtx) + if err != nil { + return nil, newWorkflowExecutionError(workflowID, fmt.Sprintf("failed to begin transaction: %v", err)) + } + defer tx.Rollback(uncancellableCtx) // Rollback if not committed + + // Insert workflow status with transaction + insertInput := insertWorkflowStatusDBInput{ + status: status, + tx: tx, + } + _, err = c.systemDB.insertWorkflowStatus(uncancellableCtx, insertInput) + if err != nil { + c.logger.Error("failed to insert workflow status", "error", err, "workflow_id", workflowID) + return nil, err + } + + if err := tx.Commit(uncancellableCtx); err != nil { + return nil, fmt.Errorf("failed to commit transaction: %w", err) + } + + return &workflowPollingHandle[any]{ + workflowID: workflowID, + dbosContext: uncancellableCtx, + }, nil +} + +type GenericEnqueueOptions[P any] struct { + WorkflowName string + QueueName string + WorkflowID string + ApplicationVersion string + DeduplicationID string + WorkflowTimeout time.Duration + WorkflowInput P +} + +func Enqueue[P any, R any](ctx DBOSContext, params GenericEnqueueOptions[P]) (WorkflowHandle[R], error) { + if ctx == nil { + return nil, errors.New("ctx cannot be nil") + } + + // Register the input and outputs for gob encoding + var typedInput P + gob.Register(typedInput) + var typedOutput R + gob.Register(typedOutput) + + // Call typed erased enqueue + handle, err := ctx.Enqueue(ctx, EnqueueOptions{ + WorkflowName: params.WorkflowName, + QueueName: params.QueueName, + WorkflowID: params.WorkflowID, + ApplicationVersion: params.ApplicationVersion, + DeduplicationID: params.DeduplicationID, + WorkflowInput: params.WorkflowInput, + WorkflowTimeout: params.WorkflowTimeout, + }) + + return &workflowPollingHandle[R]{ + workflowID: handle.GetWorkflowID(), + dbosContext: ctx, + }, err +} diff --git a/dbos/workflows_test.go b/dbos/workflows_test.go index 816bab48..153b0fcf 100644 --- a/dbos/workflows_test.go +++ b/dbos/workflows_test.go @@ -1346,7 +1346,7 @@ type sendWorkflowInput struct { } func sendWorkflow(ctx DBOSContext, input sendWorkflowInput) (string, error) { - err := Send(ctx, WorkflowSendInput[string]{ + err := Send(ctx, GenericWorkflowSendInput[string]{ DestinationID: input.DestinationID, Topic: input.Topic, Message: "message1", @@ -1354,11 +1354,11 @@ func sendWorkflow(ctx DBOSContext, input sendWorkflowInput) (string, error) { if err != nil { return "", err } - err = Send(ctx, WorkflowSendInput[string]{DestinationID: input.DestinationID, Topic: input.Topic, Message: "message2"}) + err = Send(ctx, GenericWorkflowSendInput[string]{DestinationID: input.DestinationID, Topic: input.Topic, Message: "message2"}) if err != nil { return "", err } - err = Send(ctx, WorkflowSendInput[string]{DestinationID: input.DestinationID, Topic: input.Topic, Message: "message3"}) + err = Send(ctx, GenericWorkflowSendInput[string]{DestinationID: input.DestinationID, Topic: input.Topic, Message: "message3"}) if err != nil { return "", err } @@ -1402,7 +1402,7 @@ func receiveWorkflowCoordinated(ctx DBOSContext, input struct { func sendStructWorkflow(ctx DBOSContext, input sendWorkflowInput) (string, error) { testStruct := sendRecvType{Value: "test-struct-value"} - err := Send(ctx, WorkflowSendInput[sendRecvType]{DestinationID: input.DestinationID, Topic: input.Topic, Message: testStruct}) + err := Send(ctx, GenericWorkflowSendInput[sendRecvType]{DestinationID: input.DestinationID, Topic: input.Topic, Message: testStruct}) return "", err } @@ -1411,7 +1411,7 @@ func receiveStructWorkflow(ctx DBOSContext, topic string) (sendRecvType, error) } func sendIdempotencyWorkflow(ctx DBOSContext, input sendWorkflowInput) (string, error) { - err := Send(ctx, WorkflowSendInput[string]{DestinationID: input.DestinationID, Topic: input.Topic, Message: "m1"}) + err := Send(ctx, GenericWorkflowSendInput[string]{DestinationID: input.DestinationID, Topic: input.Topic, Message: "m1"}) if err != nil { return "", err } @@ -1432,7 +1432,7 @@ func receiveIdempotencyWorkflow(ctx DBOSContext, topic string) (string, error) { } func stepThatCallsSend(ctx context.Context, input sendWorkflowInput) (string, error) { - err := Send(ctx.(DBOSContext), WorkflowSendInput[string]{ + err := Send(ctx.(DBOSContext), GenericWorkflowSendInput[string]{ DestinationID: input.DestinationID, Topic: input.Topic, Message: "message-from-step", @@ -1678,7 +1678,7 @@ func TestSendRecv(t *testing.T) { // Send messages from outside a workflow context (should work now) for i := range 3 { - err = Send(dbosCtx, WorkflowSendInput[string]{ + err = Send(dbosCtx, GenericWorkflowSendInput[string]{ DestinationID: receiveHandle.GetWorkflowID(), Topic: "outside-workflow-topic", Message: fmt.Sprintf("message%d", i+1), @@ -1935,7 +1935,7 @@ type setEventWorkflowInput struct { } func setEventWorkflow(ctx DBOSContext, input setEventWorkflowInput) (string, error) { - err := SetEvent(ctx, WorkflowSetEventInputGeneric[string]{Key: input.Key, Message: input.Message}) + err := SetEvent(ctx, GenericWorkflowSetEventInput[string]{Key: input.Key, Message: input.Message}) if err != nil { return "", err } @@ -1956,7 +1956,7 @@ func getEventWorkflow(ctx DBOSContext, input setEventWorkflowInput) (string, err func setTwoEventsWorkflow(ctx DBOSContext, input setEventWorkflowInput) (string, error) { // Set the first event - err := SetEvent(ctx, WorkflowSetEventInputGeneric[string]{Key: "event1", Message: "first-event-message"}) + err := SetEvent(ctx, GenericWorkflowSetEventInput[string]{Key: "event1", Message: "first-event-message"}) if err != nil { return "", err } @@ -1965,7 +1965,7 @@ func setTwoEventsWorkflow(ctx DBOSContext, input setEventWorkflowInput) (string, setSecondEventSignal.Wait() // Set the second event - err = SetEvent(ctx, WorkflowSetEventInputGeneric[string]{Key: "event2", Message: "second-event-message"}) + err = SetEvent(ctx, GenericWorkflowSetEventInput[string]{Key: "event2", Message: "second-event-message"}) if err != nil { return "", err } @@ -1974,7 +1974,7 @@ func setTwoEventsWorkflow(ctx DBOSContext, input setEventWorkflowInput) (string, } func setEventIdempotencyWorkflow(ctx DBOSContext, input setEventWorkflowInput) (string, error) { - err := SetEvent(ctx, WorkflowSetEventInputGeneric[string]{Key: input.Key, Message: input.Message}) + err := SetEvent(ctx, GenericWorkflowSetEventInput[string]{Key: input.Key, Message: input.Message}) if err != nil { return "", err } @@ -2342,7 +2342,7 @@ func TestSetGetEvent(t *testing.T) { t.Run("SetGetEventMustRunInsideWorkflows", func(t *testing.T) { // Attempt to run SetEvent outside of a workflow context - err := SetEvent(dbosCtx, WorkflowSetEventInputGeneric[string]{Key: "test-key", Message: "test-message"}) + err := SetEvent(dbosCtx, GenericWorkflowSetEventInput[string]{Key: "test-key", Message: "test-message"}) if err == nil { t.Fatal("expected error when running SetEvent outside of workflow context, but got none") } @@ -3018,7 +3018,7 @@ func notificationWaiterWorkflow(ctx DBOSContext, pairID int) (string, error) { } func notificationSetterWorkflow(ctx DBOSContext, pairID int) (string, error) { - err := SetEvent(ctx, WorkflowSetEventInputGeneric[string]{ + err := SetEvent(ctx, GenericWorkflowSetEventInput[string]{ Key: "event-key", Message: fmt.Sprintf("notification-message-%d", pairID), }) @@ -3040,7 +3040,7 @@ func sendRecvReceiverWorkflow(ctx DBOSContext, pairID int) (string, error) { } func sendRecvSenderWorkflow(ctx DBOSContext, pairID int) (string, error) { - err := Send(ctx, WorkflowSendInput[string]{ + err := Send(ctx, GenericWorkflowSendInput[string]{ DestinationID: fmt.Sprintf("send-recv-receiver-%d", pairID), Topic: "send-recv-topic", Message: fmt.Sprintf("send-recv-message-%d", pairID), From 9263eb11a83edde29f3f6cc4130799a9d4094c3e Mon Sep 17 00:00:00 2001 From: maxdml Date: Tue, 12 Aug 2025 11:08:14 -0700 Subject: [PATCH 02/20] check udf sez/dez on client enqueue path --- dbos/client_test.go | 25 +++++++++++++------------ dbos/workflow.go | 2 ++ 2 files changed, 15 insertions(+), 12 deletions(-) diff --git a/dbos/client_test.go b/dbos/client_test.go index ba77efd3..51a6aaca 100644 --- a/dbos/client_test.go +++ b/dbos/client_test.go @@ -6,14 +6,6 @@ import ( "time" ) -// Simple workflow that will be executed on the server -func serverWorkflow(ctx DBOSContext, input string) (string, error) { - if input != "test-input" { - return "", fmt.Errorf("unexpected input: %s", input) - } - return "processed: " + input, nil -} - func TestClientEnqueue(t *testing.T) { // Setup server context - this will process tasks serverCtx := setupDBOS(t, true, true) @@ -22,6 +14,15 @@ func TestClientEnqueue(t *testing.T) { queue := NewWorkflowQueue(serverCtx, "client-enqueue-queue") // Register workflows with custom names so client can reference them + type wfInput struct { + Input string + } + serverWorkflow := func(ctx DBOSContext, input wfInput) (string, error) { + if input.Input != "test-input" { + return "", fmt.Errorf("unexpected input: %s", input.Input) + } + return "processed: " + input.Input, nil + } RegisterWorkflow(serverCtx, serverWorkflow, WithWorkflowName("ServerWorkflow")) // Workflow that blocks until cancelled (for timeout test) @@ -46,10 +47,10 @@ func TestClientEnqueue(t *testing.T) { t.Run("EnqueueAndGetResult", func(t *testing.T) { // Client enqueues a task using the new Enqueue method - handle, err := Enqueue[string, string](clientCtx, GenericEnqueueOptions[string]{ + handle, err := Enqueue[wfInput, string](clientCtx, GenericEnqueueOptions[wfInput]{ WorkflowName: "ServerWorkflow", QueueName: queue.Name, - WorkflowInput: "test-input", + WorkflowInput: wfInput{Input: "test-input"}, ApplicationVersion: serverCtx.GetApplicationVersion(), }) if err != nil { @@ -100,11 +101,11 @@ func TestClientEnqueue(t *testing.T) { customWorkflowID := "custom-client-workflow-id" // Client enqueues a task with a custom workflow ID - _, err := Enqueue[string, string](clientCtx, GenericEnqueueOptions[string]{ + _, err := Enqueue[wfInput, string](clientCtx, GenericEnqueueOptions[wfInput]{ WorkflowName: "ServerWorkflow", QueueName: queue.Name, WorkflowID: customWorkflowID, - WorkflowInput: "test-input", + WorkflowInput: wfInput{Input: "test-input"}, }) if err != nil { t.Fatalf("failed to enqueue workflow with custom ID: %v", err) diff --git a/dbos/workflow.go b/dbos/workflow.go index 40b617ec..77babdd5 100644 --- a/dbos/workflow.go +++ b/dbos/workflow.go @@ -379,6 +379,7 @@ func RegisterWorkflow[P any, R any](ctx DBOSContext, fn GenericWorkflowFunc[P, R // This type check is redundant with the one in the wrapper, but I'd better be safe than sorry typedInput, ok := input.(P) if !ok { + // FIXME: we need to record the error in the database here return nil, newWorkflowUnexpectedInputType(fqn, fmt.Sprintf("%T", typedInput), fmt.Sprintf("%T", input)) } return fn(ctx, typedInput) @@ -387,6 +388,7 @@ func RegisterWorkflow[P any, R any](ctx DBOSContext, fn GenericWorkflowFunc[P, R typeErasedWrapper := WrappedWorkflowFunc(func(ctx DBOSContext, input any, opts ...WorkflowOption) (WorkflowHandle[any], error) { typedInput, ok := input.(P) if !ok { + // FIXME: we need to record the error in the database here return nil, newWorkflowUnexpectedInputType(fqn, fmt.Sprintf("%T", typedInput), fmt.Sprintf("%T", input)) } From 3b82d401879ed4a4455312326bbf3700d004062f Mon Sep 17 00:00:00 2001 From: maxdml Date: Tue, 12 Aug 2025 11:51:21 -0700 Subject: [PATCH 03/20] cancel/resume --- dbos/client_test.go | 210 +++++++++++++++++++++++++++++++++++++++- dbos/dbos.go | 2 + dbos/system_database.go | 57 +++++++++++ dbos/workflow.go | 51 ++++++++++ 4 files changed, 319 insertions(+), 1 deletion(-) diff --git a/dbos/client_test.go b/dbos/client_test.go index 51a6aaca..420e61ab 100644 --- a/dbos/client_test.go +++ b/dbos/client_test.go @@ -1,12 +1,13 @@ package dbos import ( + "context" "fmt" "testing" "time" ) -func TestClientEnqueue(t *testing.T) { +func TestEnqueue(t *testing.T) { // Setup server context - this will process tasks serverCtx := setupDBOS(t, true, true) @@ -173,3 +174,210 @@ func TestClientEnqueue(t *testing.T) { t.Fatal("expected queue entries to be cleaned up after client tests") } } + +func TestCancelResume(t *testing.T) { + var stepsCompleted int + + // Setup server context - this will process tasks + serverCtx := setupDBOS(t, true, true) + + // Create queue for communication between client and server + queue := NewWorkflowQueue(serverCtx, "cancel-resume-queue") + + // Step functions + step := func(ctx context.Context) (string, error) { + stepsCompleted++ + return "step-complete", nil + } + + // Events for synchronization + workflowStarted := NewEvent() + proceedSignal := NewEvent() + + // Workflow that executes steps with blocking behavior + cancelResumeWorkflow := func(ctx DBOSContext, input int) (int, error) { + // Execute step one + _, err := RunAsStep(ctx, step) + if err != nil { + return 0, err + } + + // Signal that workflow has started and step one completed + workflowStarted.Set() + + // Wait for signal from main test to proceed + proceedSignal.Wait() + + // Execute step two (will only happen if not cancelled) + _, err = RunAsStep(ctx, step) + if err != nil { + return 0, err + } + + return input, nil + } + RegisterWorkflow(serverCtx, cancelResumeWorkflow, WithWorkflowName("CancelResumeWorkflow")) + + // Launch the server context to start processing tasks + err := serverCtx.Launch() + if err != nil { + t.Fatalf("failed to launch server DBOS instance: %v", err) + } + + // Setup client context - this will enqueue tasks + clientCtx := setupDBOS(t, false, false) // Don't drop DB, don't check for leaks + + t.Run("CancelAndResume", func(t *testing.T) { + // Reset the global counter + stepsCompleted = 0 + input := 5 + workflowID := "test-cancel-resume-workflow" + + // Start the workflow - it will execute step one and then wait + handle, err := Enqueue[int, int](clientCtx, GenericEnqueueOptions[int]{ + WorkflowName: "CancelResumeWorkflow", + QueueName: queue.Name, + WorkflowID: workflowID, + WorkflowInput: input, + ApplicationVersion: serverCtx.GetApplicationVersion(), + }) + if err != nil { + t.Fatalf("failed to enqueue workflow from client: %v", err) + } + + // Wait for workflow to signal it has started and step one completed + workflowStarted.Wait() + + // Verify step one completed but step two hasn't + if stepsCompleted != 1 { + t.Fatalf("expected steps completed to be 1, got %d", stepsCompleted) + } + + // Cancel the workflow + err = clientCtx.CancelWorkflow(workflowID) + if err != nil { + t.Fatalf("failed to cancel workflow: %v", err) + } + + // Verify workflow is cancelled + cancelStatus, err := handle.GetStatus() + if err != nil { + t.Fatalf("failed to get workflow status: %v", err) + } + + if cancelStatus.Status != WorkflowStatusCancelled { + t.Fatalf("expected workflow status to be CANCELLED, got %v", cancelStatus.Status) + } + + // Resume the workflow + resumeHandle, err := ResumeWorkflow[int](clientCtx, workflowID) + if err != nil { + t.Fatalf("failed to resume workflow: %v", err) + } + + // Wait for workflow completion + proceedSignal.Set() // Allow the workflow to proceed to step two + result, err := resumeHandle.GetResult() + if err != nil { + t.Fatalf("failed to get result from resumed workflow: %v", err) + } + + // Verify the result + if result != input { + t.Fatalf("expected result to be %d, got %d", input, result) + } + + // Verify both steps completed + if stepsCompleted != 2 { + t.Fatalf("expected steps completed to be 2, got %d", stepsCompleted) + } + + // Check final status + finalStatus, err := resumeHandle.GetStatus() + if err != nil { + t.Fatalf("failed to get final workflow status: %v", err) + } + + if finalStatus.Status != WorkflowStatusSuccess { + t.Fatalf("expected final workflow status to be SUCCESS, got %v", finalStatus.Status) + } + + // After resume, the queue name should change to the internal queue name + if finalStatus.QueueName != _DBOS_INTERNAL_QUEUE_NAME { + t.Fatalf("expected queue name to be %s, got '%s'", _DBOS_INTERNAL_QUEUE_NAME, finalStatus.QueueName) + } + + // Resume the workflow again - should not run again + resumeAgainHandle, err := ResumeWorkflow[int](clientCtx, workflowID) + if err != nil { + t.Fatalf("failed to resume workflow again: %v", err) + } + + resultAgain, err := resumeAgainHandle.GetResult() + if err != nil { + t.Fatalf("failed to get result from second resume: %v", err) + } + + if resultAgain != input { + t.Fatalf("expected second resume result to be %d, got %d", input, resultAgain) + } + + // Verify steps didn't run again + if stepsCompleted != 2 { + t.Fatalf("expected steps completed to remain 2 after second resume, got %d", stepsCompleted) + } + + if !queueEntriesAreCleanedUp(serverCtx) { + t.Fatal("expected queue entries to be cleaned up after cancel/resume test") + } + }) + + t.Run("CancelNonExistentWorkflow", func(t *testing.T) { + nonExistentWorkflowID := "non-existent-workflow-id" + + // Try to cancel a non-existent workflow + err := clientCtx.CancelWorkflow(nonExistentWorkflowID) + if err == nil { + t.Fatal("expected error when canceling non-existent workflow, but got none") + } + + // Verify error type and code + dbosErr, ok := err.(*DBOSError) + if !ok { + t.Fatalf("expected error to be of type *DBOSError, got %T", err) + } + + if dbosErr.Code != NonExistentWorkflowError { + t.Fatalf("expected error code to be NonExistentWorkflowError, got %v", dbosErr.Code) + } + + if dbosErr.DestinationID != nonExistentWorkflowID { + t.Fatalf("expected DestinationID to be %s, got %s", nonExistentWorkflowID, dbosErr.DestinationID) + } + }) + + t.Run("ResumeNonExistentWorkflow", func(t *testing.T) { + nonExistentWorkflowID := "non-existent-resume-workflow-id" + + // Try to resume a non-existent workflow + _, err := ResumeWorkflow[int](clientCtx, nonExistentWorkflowID) + fmt.Println(err) + if err == nil { + t.Fatal("expected error when resuming non-existent workflow, but got none") + } + + // Verify error type and code + dbosErr, ok := err.(*DBOSError) + if !ok { + t.Fatalf("expected error to be of type *DBOSError, got %T", err) + } + + if dbosErr.Code != NonExistentWorkflowError { + t.Fatalf("expected error code to be NonExistentWorkflowError, got %v", dbosErr.Code) + } + + if dbosErr.DestinationID != nonExistentWorkflowID { + t.Fatalf("expected DestinationID to be %s, got %s", nonExistentWorkflowID, dbosErr.DestinationID) + } + }) +} diff --git a/dbos/dbos.go b/dbos/dbos.go index f7758bdc..ee4f94dd 100644 --- a/dbos/dbos.go +++ b/dbos/dbos.go @@ -86,6 +86,8 @@ type DBOSContext interface { // Workflow management RetrieveWorkflow(_ DBOSContext, workflowID string) (WorkflowHandle[any], error) // Get a handle to an existing workflow Enqueue(_ DBOSContext, params EnqueueOptions) (WorkflowHandle[any], error) // Enqueue a new workflow with parameters + CancelWorkflow(workflowID string) error // Cancel a workflow by setting its status to CANCELLED + ResumeWorkflow(_ DBOSContext, workflowID string) (WorkflowHandle[any], error) // Resume a cancelled workflow // Accessors GetApplicationVersion() string // Get the application version for this context diff --git a/dbos/system_database.go b/dbos/system_database.go index 3329cf20..70cd3f40 100644 --- a/dbos/system_database.go +++ b/dbos/system_database.go @@ -35,6 +35,7 @@ type systemDatabase interface { updateWorkflowOutcome(ctx context.Context, input updateWorkflowOutcomeDBInput) error awaitWorkflowResult(ctx context.Context, workflowID string) (any, error) cancelWorkflow(ctx context.Context, workflowID string) error + resumeWorkflow(ctx context.Context, workflowID string) error // Child workflows recordChildWorkflow(ctx context.Context, input recordChildWorkflowDBInput) error @@ -693,6 +694,62 @@ func (s *sysDB) cancelWorkflow(ctx context.Context, workflowID string) error { return nil } +func (s *sysDB) resumeWorkflow(ctx context.Context, workflowID string) error { + tx, err := s.pool.Begin(ctx) + if err != nil { + return fmt.Errorf("failed to begin transaction: %w", err) + } + defer tx.Rollback(ctx) + + // Execute with snapshot isolation in case of concurrent calls on the same workflow + _, err = tx.Exec(ctx, "SET TRANSACTION ISOLATION LEVEL REPEATABLE READ") + if err != nil { + return fmt.Errorf("failed to set transaction isolation level: %w", err) + } + + // Check the status of the workflow. If it is complete, do nothing. + listInput := listWorkflowsDBInput{ + workflowIDs: []string{workflowID}, + tx: tx, + } + wfs, err := s.listWorkflows(ctx, listInput) + if err != nil { + s.logger.Error("ResumeWorkflow: failed to list workflows", "error", err) + return err + } + if len(wfs) == 0 { + return newNonExistentWorkflowError(workflowID) + } + + wf := wfs[0] + if wf.Status == WorkflowStatusSuccess || wf.Status == WorkflowStatusError { + return nil // Workflow is complete, do nothing + } + + // Set the workflow's status to ENQUEUED and clear its recovery attempts and deadline + updateStatusQuery := `UPDATE dbos.workflow_status + SET status = $1, queue_name = $2, recovery_attempts = $3, + workflow_deadline_epoch_ms = NULL, deduplication_id = NULL, + started_at_epoch_ms = NULL, updated_at = $4 + WHERE workflow_uuid = $5` + + _, err = tx.Exec(ctx, updateStatusQuery, + WorkflowStatusEnqueued, + _DBOS_INTERNAL_QUEUE_NAME, + 0, + time.Now().UnixMilli(), + workflowID) + if err != nil { + return fmt.Errorf("failed to update workflow status to ENQUEUED: %w", err) + } + + if err := tx.Commit(ctx); err != nil { + return fmt.Errorf("failed to commit transaction: %w", err) + } + + return nil +} + func (s *sysDB) awaitWorkflowResult(ctx context.Context, workflowID string) (any, error) { query := `SELECT status, output, error FROM dbos.workflow_status WHERE workflow_uuid = $1` var status WorkflowStatusType diff --git a/dbos/workflow.go b/dbos/workflow.go index 77babdd5..291a7e11 100644 --- a/dbos/workflow.go +++ b/dbos/workflow.go @@ -1350,3 +1350,54 @@ func Enqueue[P any, R any](ctx DBOSContext, params GenericEnqueueOptions[P]) (Wo dbosContext: ctx, }, err } + +// CancelWorkflow cancels a running or enqueued workflow by setting its status to CANCELLED. +// Once cancelled, the workflow will stop executing and cannot be resumed. +// If the workflow has already completed (SUCCESS or ERROR), this operation has no effect. +// The workflow's final status and any partial results remain accessible through its handle. +// +// Parameters: +// - workflowID: The unique identifier of the workflow to cancel +// +// Returns an error if the workflow does not exist or if the cancellation operation fails. +func (c *dbosContext) CancelWorkflow(workflowID string) error { + return c.systemDB.cancelWorkflow(c, workflowID) +} + +func (c *dbosContext) ResumeWorkflow(_ DBOSContext, workflowID string) (WorkflowHandle[any], error) { + err := c.systemDB.resumeWorkflow(c, workflowID) + if err != nil { + return nil, err + } + return &workflowPollingHandle[any]{workflowID: workflowID, dbosContext: c}, nil +} + +// ResumeWorkflow resumes a cancelled workflow by setting its status back to ENQUEUED. +// The workflow will be picked up by the queue processor and execution will continue +// from where it left off. If the workflow is already completed, this is a no-op. +// Returns a handle that can be used to wait for completion and retrieve results. +// Returns an error if the workflow does not exist or if the cancellation operation fails. +// +// Example: +// +// handle, err := dbos.ResumeWorkflow[int](ctx, "workflow-id") +// if err != nil { +// log.Printf("Failed to resume workflow: %v", err) +// } else { +// result, err := handle.GetResult() // blocks until completion +// if err != nil { +// log.Printf("Workflow failed: %v", err) +// } else { +// log.Printf("Result: %d", result) +// } +// } +func ResumeWorkflow[R any](ctx DBOSContext, workflowID string) (WorkflowHandle[R], error) { + if ctx == nil { + return nil, errors.New("ctx cannot be nil") + } + _, err := ctx.ResumeWorkflow(ctx, workflowID) + if err != nil { + return nil, err + } + return &workflowPollingHandle[R]{workflowID: workflowID, dbosContext: ctx}, nil +} From 82faad32657f463899e310be65d86feab7216b5d Mon Sep 17 00:00:00 2001 From: maxdml Date: Tue, 12 Aug 2025 14:53:43 -0700 Subject: [PATCH 04/20] fork --- dbos/client_test.go | 251 ++++++++++++++++++++++++++++++++++++++++ dbos/dbos.go | 9 +- dbos/system_database.go | 127 +++++++++++++++++++- dbos/workflow.go | 134 +++++++++++++++++++++ 4 files changed, 516 insertions(+), 5 deletions(-) diff --git a/dbos/client_test.go b/dbos/client_test.go index 420e61ab..35566f2b 100644 --- a/dbos/client_test.go +++ b/dbos/client_test.go @@ -3,6 +3,7 @@ package dbos import ( "context" "fmt" + "strings" "testing" "time" ) @@ -381,3 +382,253 @@ func TestCancelResume(t *testing.T) { } }) } + +func TestForkWorkflow(t *testing.T) { + // Global counters for tracking execution (no mutex needed since workflows run solo) + var ( + stepCount1 int + stepCount2 int + child1Count int + child2Count int + ) + + // Setup server context - this will process tasks + serverCtx := setupDBOS(t, true, true) + + // Create queue for communication between client and server + queue := NewWorkflowQueue(serverCtx, "fork-workflow-queue") + + // Simple child workflows (no steps, just increment counters) + childWorkflow1 := func(ctx DBOSContext, input string) (string, error) { + child1Count++ + return "child1-" + input, nil + } + RegisterWorkflow(serverCtx, childWorkflow1, WithWorkflowName("ChildWorkflow1")) + + childWorkflow2 := func(ctx DBOSContext, input string) (string, error) { + child2Count++ + return "child2-" + input, nil + } + RegisterWorkflow(serverCtx, childWorkflow2, WithWorkflowName("ChildWorkflow2")) + + // Parent workflow with 2 steps and 2 child workflows + parentWorkflow := func(ctx DBOSContext, input string) (string, error) { + // Step 1 + step1Result, err := RunAsStep(ctx, func(ctx context.Context) (string, error) { + stepCount1++ + return "step1-" + input, nil + }) + if err != nil { + return "", err + } + + // Child workflow 1 + child1Handle, err := RunAsWorkflow(ctx, childWorkflow1, input) + if err != nil { + return "", err + } + child1Result, err := child1Handle.GetResult() + if err != nil { + return "", err + } + + // Step 2 + step2Result, err := RunAsStep(ctx, func(ctx context.Context) (string, error) { + stepCount2++ + return "step2-" + input, nil + }) + if err != nil { + return "", err + } + + // Child workflow 2 + child2Handle, err := RunAsWorkflow(ctx, childWorkflow2, input) + if err != nil { + return "", err + } + child2Result, err := child2Handle.GetResult() + if err != nil { + return "", err + } + + return step1Result + "+" + step2Result + "+" + child1Result + "+" + child2Result, nil + } + RegisterWorkflow(serverCtx, parentWorkflow, WithWorkflowName("ParentWorkflow")) + + // Launch the server context to start processing tasks + err := serverCtx.Launch() + if err != nil { + t.Fatalf("failed to launch server DBOS instance: %v", err) + } + + // Setup client context + clientCtx := setupDBOS(t, false, false) + + t.Run("ForkAtAllSteps", func(t *testing.T) { + // Reset counters + stepCount1, stepCount2, child1Count, child2Count = 0, 0, 0, 0 + + originalWorkflowID := "original-workflow-fork-test" + + // 1. Run the entire workflow first and check counters are 1 + handle, err := Enqueue[string, string](clientCtx, GenericEnqueueOptions[string]{ + WorkflowName: "ParentWorkflow", + QueueName: queue.Name, + WorkflowID: originalWorkflowID, + WorkflowInput: "test", + ApplicationVersion: serverCtx.GetApplicationVersion(), + }) + if err != nil { + t.Fatalf("failed to enqueue original workflow: %v", err) + } + + // Wait for the original workflow to complete + result, err := handle.GetResult() + if err != nil { + t.Fatalf("failed to get result from original workflow: %v", err) + } + + expectedResult := "step1-test+step2-test+child1-test+child2-test" + if result != expectedResult { + t.Fatalf("expected result to be '%s', got '%s'", expectedResult, result) + } + + // Verify all counters are 1 after original workflow + if stepCount1 != 1 || stepCount2 != 1 || child1Count != 1 || child2Count != 1 { + t.Fatalf("expected counters to be (step1:1, step2:1, child1:1, child2:1), got (step1:%d, step2:%d, child1:%d, child2:%d)", stepCount1, stepCount2, child1Count, child2Count) + } + + // 2. Fork from each step 1 to 6 and verify results + // Note: there's 6 steps: 2 steps 2 children and 2 GetResults + for step := 1; step <= 6; step++ { + t.Logf("Forking at step %d", step) + + customForkedWorkflowID := fmt.Sprintf("forked-workflow-step-%d", step) + forkedHandle, err := ForkWorkflow[string](clientCtx, originalWorkflowID, WithForkWorkflowID(customForkedWorkflowID), WithForkStartStep(uint(step-1))) + if err != nil { + t.Fatalf("failed to fork workflow at step %d: %v", step, err) + } + + forkedWorkflowID := forkedHandle.GetWorkflowID() + if forkedWorkflowID != customForkedWorkflowID { + t.Fatalf("expected forked workflow ID to be '%s', got '%s'", customForkedWorkflowID, forkedWorkflowID) + } + + forkedResult, err := forkedHandle.GetResult() + if err != nil { + t.Fatalf("failed to get result from forked workflow at step %d: %v", step, err) + } + + // 1) Verify workflow result is correct + if forkedResult != expectedResult { + t.Fatalf("forked workflow at step %d: expected result '%s', got '%s'", step, expectedResult, forkedResult) + } + + // 2) Verify counters are at expected totals based on the step where we're forking + t.Logf("Step %d: actual counters - step1:%d, step2:%d, child1:%d, child2:%d", step, stepCount1, stepCount2, child1Count, child2Count) + + // First step is executed only once + if stepCount1 != 1+1 { + t.Fatalf("forked workflow at step %d: step1 counter should be 2, got %d", step, stepCount1) + } + + // First child will be executed twice + if step < 3 { + if child1Count != 1+step { + t.Fatalf("forked workflow at step %d: child1 counter should be %d, got %d", step, 1+step, child1Count) + } + } else { + if child1Count != 1+2 { + t.Fatalf("forked workflow at step %d: child2 counter should be 3, got %d", step, child1Count) + } + } + + // Second step (in reality step 4) will be executed 4 times + if step < 5 { + if stepCount2 != 1+step { + t.Fatalf("forked workflow at step %d: step2 counter should be %d, got %d", step, 1+step, stepCount2) + } + } else { + if stepCount2 != 1+4 { + t.Fatalf("forked workflow at step %d: step2 counter should be 5, got %d", step, stepCount2) + } + } + + // Second child will be executed 5 times + if step < 6 { + if child2Count != 1+step { + t.Fatalf("forked workflow at step %d: child2 counter should be %d, got %d", step, 1+step, child2Count) + } + } else { + if child2Count != 1+5 { + t.Fatalf("forked workflow at step %d: child2 counter should be 6, got %d", step, child2Count) + } + } + + t.Logf("Step %d: all counter totals verified correctly", step) + } + + t.Logf("Final counters after all forks - steps:%d, child1:%d, child2:%d", stepCount1, child1Count, child2Count) + }) + + t.Run("ForkNonExistentWorkflow", func(t *testing.T) { + nonExistentWorkflowID := "non-existent-workflow-for-fork" + + // Try to fork a non-existent workflow + _, err := clientCtx.ForkWorkflow(clientCtx, nonExistentWorkflowID, WithForkStartStep(1)) + if err == nil { + t.Fatal("expected error when forking non-existent workflow, but got none") + } + + // Verify error type + dbosErr, ok := err.(*DBOSError) + if !ok { + t.Fatalf("expected error to be of type *DBOSError, got %T", err) + } + + if dbosErr.Code != NonExistentWorkflowError { + t.Fatalf("expected error code to be NonExistentWorkflowError, got %v", dbosErr.Code) + } + + if dbosErr.DestinationID != nonExistentWorkflowID { + t.Fatalf("expected DestinationID to be %s, got %s", nonExistentWorkflowID, dbosErr.DestinationID) + } + }) + + t.Run("ForkWithInvalidStep", func(t *testing.T) { + originalWorkflowID := "original-workflow-invalid-step" + + // Create an original workflow first + handle, err := Enqueue[string, string](clientCtx, GenericEnqueueOptions[string]{ + WorkflowName: "ParentWorkflow", + QueueName: queue.Name, + WorkflowID: originalWorkflowID, + WorkflowInput: "test", + ApplicationVersion: serverCtx.GetApplicationVersion(), + }) + if err != nil { + t.Fatalf("failed to enqueue original workflow: %v", err) + } + + // Wait for completion + _, err = handle.GetResult() + if err != nil { + t.Fatalf("failed to get result from original workflow: %v", err) + } + + // Try to fork at step 999 (beyond workflow's actual steps) + _, err = clientCtx.ForkWorkflow(clientCtx, originalWorkflowID, WithForkStartStep(999)) + if err == nil { + t.Fatal("expected error when forking at step 999, but got none") + } + // Verify the error message + if !strings.Contains(err.Error(), "exceeds workflow's maximum step") { + t.Fatalf("expected error message to contain 'exceeds workflow's maximum step', got: %v", err) + } + }) + + // Verify all queue entries are cleaned up + if !queueEntriesAreCleanedUp(serverCtx) { + t.Fatal("expected queue entries to be cleaned up after fork workflow tests") + } +} diff --git a/dbos/dbos.go b/dbos/dbos.go index ee4f94dd..1d31ffb1 100644 --- a/dbos/dbos.go +++ b/dbos/dbos.go @@ -84,10 +84,11 @@ type DBOSContext interface { GetWorkflowID() (string, error) // Get the current workflow ID (only available within workflows) // Workflow management - RetrieveWorkflow(_ DBOSContext, workflowID string) (WorkflowHandle[any], error) // Get a handle to an existing workflow - Enqueue(_ DBOSContext, params EnqueueOptions) (WorkflowHandle[any], error) // Enqueue a new workflow with parameters - CancelWorkflow(workflowID string) error // Cancel a workflow by setting its status to CANCELLED - ResumeWorkflow(_ DBOSContext, workflowID string) (WorkflowHandle[any], error) // Resume a cancelled workflow + RetrieveWorkflow(_ DBOSContext, workflowID string) (WorkflowHandle[any], error) // Get a handle to an existing workflow + Enqueue(_ DBOSContext, params EnqueueOptions) (WorkflowHandle[any], error) // Enqueue a new workflow with parameters + CancelWorkflow(workflowID string) error // Cancel a workflow by setting its status to CANCELLED + ResumeWorkflow(_ DBOSContext, workflowID string) (WorkflowHandle[any], error) // Resume a cancelled workflow + ForkWorkflow(_ DBOSContext, originalWorkflowID string, opts ...ForkWorkflowOption) (WorkflowHandle[any], error) // Fork a workflow from a specific step // Accessors GetApplicationVersion() string // Get the application version for this context diff --git a/dbos/system_database.go b/dbos/system_database.go index 70cd3f40..9ab2da07 100644 --- a/dbos/system_database.go +++ b/dbos/system_database.go @@ -36,6 +36,7 @@ type systemDatabase interface { awaitWorkflowResult(ctx context.Context, workflowID string) (any, error) cancelWorkflow(ctx context.Context, workflowID string) error resumeWorkflow(ctx context.Context, workflowID string) error + forkWorkflow(ctx context.Context, input forkWorkflowDBInput) error // Child workflows recordChildWorkflow(ctx context.Context, input recordChildWorkflowDBInput) error @@ -540,10 +541,11 @@ func (s *sysDB) listWorkflows(ctx context.Context, input listWorkflowsDBInput) ( var errorStr *string var deduplicationID *string var applicationVersion *string + var executorID *string err := rows.Scan( &wf.ID, &wf.Status, &wf.Name, &wf.AuthenticatedUser, &wf.AssumedRole, - &wf.AuthenticatedRoles, &outputString, &errorStr, &wf.ExecutorID, &createdAtMs, + &wf.AuthenticatedRoles, &outputString, &errorStr, &executorID, &createdAtMs, &updatedAtMs, &applicationVersion, &wf.ApplicationID, &wf.Attempts, &queueName, &timeoutMs, &deadlineMs, &startedAtMs, &deduplicationID, @@ -557,6 +559,11 @@ func (s *sysDB) listWorkflows(ctx context.Context, input listWorkflowsDBInput) ( wf.QueueName = *queueName } + // Handle NULL executorID + if executorID != nil && len(*executorID) > 0 { + wf.ExecutorID = *executorID + } + // We work with strings -- the DB could return NULL values if applicationVersion != nil && len(*applicationVersion) > 0 { wf.ApplicationVersion = *applicationVersion @@ -750,6 +757,124 @@ func (s *sysDB) resumeWorkflow(ctx context.Context, workflowID string) error { return nil } +type forkWorkflowDBInput struct { + originalWorkflowID string + forkedWorkflowID string + startStep int + applicationVersion string +} + +func (s *sysDB) forkWorkflow(ctx context.Context, input forkWorkflowDBInput) error { + // Validate startStep + if input.startStep < 0 { + return fmt.Errorf("startStep must be >= 0, got %d", input.startStep) + } + + tx, err := s.pool.Begin(ctx) + if err != nil { + return fmt.Errorf("failed to begin transaction: %w", err) + } + defer tx.Rollback(ctx) + + // Get the original workflow status + listInput := listWorkflowsDBInput{ + workflowIDs: []string{input.originalWorkflowID}, + tx: tx, + } + wfs, err := s.listWorkflows(ctx, listInput) + if err != nil { + return fmt.Errorf("failed to list workflows: %w", err) + } + if len(wfs) == 0 { + return newNonExistentWorkflowError(input.originalWorkflowID) + } + + originalWorkflow := wfs[0] + + // Check if the workflow has completed successfully (required for forking) + if originalWorkflow.Status != WorkflowStatusSuccess { + return fmt.Errorf("cannot fork workflow %s: workflow must be in SUCCESS state, current state: %s", input.originalWorkflowID, originalWorkflow.Status) + } + + // Validate that startStep doesn't exceed the workflow's actual steps + maxStepQuery := `SELECT COALESCE(MAX(function_id), 0) FROM dbos.operation_outputs WHERE workflow_uuid = $1` + var maxStepID int + err = tx.QueryRow(ctx, maxStepQuery, input.originalWorkflowID).Scan(&maxStepID) + if err != nil { + return fmt.Errorf("failed to query max step ID: %w", err) + } + if input.startStep > maxStepID && maxStepID > 0 { + return fmt.Errorf("startStep %d exceeds workflow's maximum step %d", input.startStep, maxStepID) + } + + // Determine the application version to use + appVersion := originalWorkflow.ApplicationVersion + if input.applicationVersion != "" { + appVersion = input.applicationVersion + } + + // Create an entry for the forked workflow with the same initial values as the original + insertQuery := `INSERT INTO dbos.workflow_status ( + workflow_uuid, + status, + name, + authenticated_user, + assumed_role, + authenticated_roles, + application_version, + application_id, + queue_name, + inputs, + created_at, + updated_at, + recovery_attempts + ) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13)` + + inputString, err := serialize(originalWorkflow.Input) + if err != nil { + return fmt.Errorf("failed to serialize input: %w", err) + } + + _, err = tx.Exec(ctx, insertQuery, + input.forkedWorkflowID, + WorkflowStatusEnqueued, + originalWorkflow.Name, + originalWorkflow.AuthenticatedUser, + originalWorkflow.AssumedRole, + originalWorkflow.AuthenticatedRoles, + &appVersion, + originalWorkflow.ApplicationID, + _DBOS_INTERNAL_QUEUE_NAME, + inputString, + time.Now().UnixMilli(), + time.Now().UnixMilli(), + 0) + + if err != nil { + return fmt.Errorf("failed to insert forked workflow status: %w", err) + } + + // If startStep > 0, copy the original workflow's outputs into the forked workflow + if input.startStep > 0 { + copyOutputsQuery := `INSERT INTO dbos.operation_outputs + (workflow_uuid, function_id, output, error, function_name, child_workflow_id) + SELECT $1, function_id, output, error, function_name, child_workflow_id + FROM dbos.operation_outputs + WHERE workflow_uuid = $2 AND function_id < $3` + + _, err = tx.Exec(ctx, copyOutputsQuery, input.forkedWorkflowID, input.originalWorkflowID, input.startStep) + if err != nil { + return fmt.Errorf("failed to copy operation outputs: %w", err) + } + } + + if err := tx.Commit(ctx); err != nil { + return fmt.Errorf("failed to commit transaction: %w", err) + } + + return nil +} + func (s *sysDB) awaitWorkflowResult(ctx context.Context, workflowID string) (any, error) { query := `SELECT status, output, error FROM dbos.workflow_status WHERE workflow_uuid = $1` var status WorkflowStatusType diff --git a/dbos/workflow.go b/dbos/workflow.go index 291a7e11..fa73bbf5 100644 --- a/dbos/workflow.go +++ b/dbos/workflow.go @@ -1401,3 +1401,137 @@ func ResumeWorkflow[R any](ctx DBOSContext, workflowID string) (WorkflowHandle[R } return &workflowPollingHandle[R]{workflowID: workflowID, dbosContext: ctx}, nil } + +// forkWorkflowParams holds configuration parameters for forking workflows +type forkWorkflowParams struct { + applicationVersion string + forkedWorkflowID string + startStep uint +} + +// ForkWorkflowOption is a functional option for configuring fork workflow execution parameters. +type ForkWorkflowOption func(*forkWorkflowParams) + +// WithForkApplicationVersion overrides the application version for the forked workflow. +// If not specified, the original workflow's application version is used. +// +// Example: +// +// dbos.ForkWorkflow[Result](ctx, originalID, 1, +// dbos.WithForkApplicationVersion("v2.0.0")) +func WithForkApplicationVersion(version string) ForkWorkflowOption { + return func(p *forkWorkflowParams) { + p.applicationVersion = version + } +} + +// WithForkStartStep overrides the start step for the forked workflow. +// This is an alternative to specifying the startStep as a parameter. +// If both are specified, this option takes precedence. +// +// Example: +// +// dbos.ForkWorkflow[Result](ctx, originalID, 1, +// dbos.WithForkStartStep(5)) // Will start from step 5, not step 1 +func WithForkStartStep(startStep uint) ForkWorkflowOption { + return func(p *forkWorkflowParams) { + p.startStep = startStep + } +} + +// WithForkWorkflowID sets a custom workflow ID for the forked workflow. +// If not specified, a new UUID will be generated automatically. +// The workflow ID must be unique across all workflows. +// +// Example: +// +// dbos.ForkWorkflow[Result](ctx, originalID, 1, +// dbos.WithForkWorkflowID("my-custom-fork-id")) +func WithForkWorkflowID(workflowID string) ForkWorkflowOption { + return func(p *forkWorkflowParams) { + p.forkedWorkflowID = workflowID + } +} + +func (c *dbosContext) ForkWorkflow(_ DBOSContext, originalWorkflowID string, opts ...ForkWorkflowOption) (WorkflowHandle[any], error) { + // Parse options + params := &forkWorkflowParams{} + for _, opt := range opts { + opt(params) + } + + if originalWorkflowID == "" { + return nil, errors.New("original workflow ID cannot be empty") + } + + // Generate new workflow ID + if params.forkedWorkflowID == "" { + params.forkedWorkflowID = uuid.New().String() + } + + // Create input for system database + input := forkWorkflowDBInput{ + originalWorkflowID: originalWorkflowID, + forkedWorkflowID: params.forkedWorkflowID, + startStep: int(params.startStep), + applicationVersion: params.applicationVersion, + } + + // Call system database method + err := c.systemDB.forkWorkflow(c, input) + if err != nil { + return nil, err + } + + return &workflowPollingHandle[any]{ + workflowID: params.forkedWorkflowID, + dbosContext: c, + }, nil +} + +// ForkWorkflow creates a new workflow instance by copying an existing workflow from a specific step. +// The forked workflow will have a new UUID and will execute from the specified startStep. +// If startStep > 1, the forked workflow will have the operation outputs from steps 1 to startStep-1 +// copied from the original workflow. +// +// Parameters: +// - ctx: DBOS context for the operation +// - originalWorkflowID: The UUID of the original workflow to fork from +// - opts: Optional configuration parameters for the forked workflow using functional options +// +// Available functional options: +// - WithForkWorkflowID: Set a custom workflow ID for the forked workflow +// - WithForkStartStep: Override the start step (alternative to the startStep parameter) +// - WithForkApplicationVersion: Set a specific application version for the forked workflow +// +// Returns a typed workflow handle for the newly created forked workflow. +// +// Example usage: +// +// // Basic fork from step 5 +// handle, err := dbos.ForkWorkflow[MyResultType](ctx, "original-workflow-id", 5) +// if err != nil { +// log.Fatal(err) +// } +// +// // Fork with custom workflow ID and application version +// handle, err := dbos.ForkWorkflow[MyResultType](ctx, "original-workflow-id", 3, +// dbos.WithForkWorkflowID("my-custom-fork-id"), +// dbos.WithForkApplicationVersion("v2.0.0")) +// if err != nil { +// log.Fatal(err) +// } +func ForkWorkflow[R any](ctx DBOSContext, originalWorkflowID string, opts ...ForkWorkflowOption) (WorkflowHandle[R], error) { + if ctx == nil { + return nil, errors.New("ctx cannot be nil") + } + + handle, err := ctx.ForkWorkflow(ctx, originalWorkflowID, opts...) + if err != nil { + return nil, err + } + return &workflowPollingHandle[R]{ + workflowID: handle.GetWorkflowID(), + dbosContext: handle.(*workflowPollingHandle[any]).dbosContext, + }, nil +} From 6eafe2b3fa30448993690cd0e583961243957835 Mon Sep 17 00:00:00 2001 From: maxdml Date: Tue, 12 Aug 2025 14:55:47 -0700 Subject: [PATCH 05/20] must register R, because the client might be on another machine. Also we'll make the encoder part of the context eventually. --- dbos/workflow.go | 14 ++++++++++++++ 1 file changed, 14 insertions(+) diff --git a/dbos/workflow.go b/dbos/workflow.go index fa73bbf5..7a203c15 100644 --- a/dbos/workflow.go +++ b/dbos/workflow.go @@ -1238,6 +1238,11 @@ func RetrieveWorkflow[R any](ctx DBOSContext, workflowID string) (workflowPollin if ctx == nil { return workflowPollingHandle[R]{}, errors.New("dbosCtx cannot be nil") } + + // Register the output for gob encoding + var r R + gob.Register(r) + workflowStatus, err := ctx.(*dbosContext).systemDB.listWorkflows(ctx, listWorkflowsDBInput{ workflowIDs: []string{workflowID}, }) @@ -1395,6 +1400,11 @@ func ResumeWorkflow[R any](ctx DBOSContext, workflowID string) (WorkflowHandle[R if ctx == nil { return nil, errors.New("ctx cannot be nil") } + + // Register the output for gob encoding + var r R + gob.Register(r) + _, err := ctx.ResumeWorkflow(ctx, workflowID) if err != nil { return nil, err @@ -1526,6 +1536,10 @@ func ForkWorkflow[R any](ctx DBOSContext, originalWorkflowID string, opts ...For return nil, errors.New("ctx cannot be nil") } + // Register the output for gob encoding + var r R + gob.Register(r) + handle, err := ctx.ForkWorkflow(ctx, originalWorkflowID, opts...) if err != nil { return nil, err From 19efa484c810e21df5dea4b68b73a3ce7daf3741 Mon Sep 17 00:00:00 2001 From: maxdml Date: Tue, 12 Aug 2025 15:48:58 -0700 Subject: [PATCH 06/20] list workflows --- dbos/client_test.go | 303 ++++++++++++++++++++++++++++++++++++++++ dbos/system_database.go | 77 +++++++--- dbos/workflow.go | 272 ++++++++++++++++++++++++++++++++++++ 3 files changed, 630 insertions(+), 22 deletions(-) diff --git a/dbos/client_test.go b/dbos/client_test.go index 35566f2b..cc0a7ea0 100644 --- a/dbos/client_test.go +++ b/dbos/client_test.go @@ -632,3 +632,306 @@ func TestForkWorkflow(t *testing.T) { t.Fatal("expected queue entries to be cleaned up after fork workflow tests") } } + +func TestListWorkflows(t *testing.T) { + // Setup server context + serverCtx := setupDBOS(t, true, true) + + // Create queue for communication + queue := NewWorkflowQueue(serverCtx, "list-workflows-queue") + + // Simple test workflow + type testInput struct { + Value int + ID string + } + + simpleWorkflow := func(ctx DBOSContext, input testInput) (string, error) { + if input.Value < 0 { + return "", fmt.Errorf("negative value: %d", input.Value) + } + return fmt.Sprintf("result-%d-%s", input.Value, input.ID), nil + } + RegisterWorkflow(serverCtx, simpleWorkflow, WithWorkflowName("SimpleWorkflow")) + + // Launch server + err := serverCtx.Launch() + if err != nil { + t.Fatalf("failed to launch server DBOS instance: %v", err) + } + + // Setup client context + clientCtx := setupDBOS(t, false, false) + + t.Run("ListWorkflowsFiltering", func(t *testing.T) { + var workflowIDs []string + var handles []WorkflowHandle[string] + + // Record start time for filtering tests + testStartTime := time.Now() + + // Start 10 workflows at 100ms intervals with different patterns + for i := range 10 { + var workflowID string + var handle WorkflowHandle[string] + + if i < 5 { + // First 5 workflows: use prefix "test-batch-" and succeed + workflowID = fmt.Sprintf("test-batch-%d", i) + handle, err = Enqueue[testInput, string](clientCtx, GenericEnqueueOptions[testInput]{ + WorkflowName: "SimpleWorkflow", + QueueName: queue.Name, + WorkflowID: workflowID, + WorkflowInput: testInput{Value: i, ID: fmt.Sprintf("success-%d", i)}, + ApplicationVersion: serverCtx.GetApplicationVersion(), + }) + } else { + // Last 5 workflows: use prefix "test-other-" and some will fail + workflowID = fmt.Sprintf("test-other-%d", i) + value := i + if i >= 8 { + value = -i // These will fail + } + handle, err = Enqueue[testInput, string](clientCtx, GenericEnqueueOptions[testInput]{ + WorkflowName: "SimpleWorkflow", + QueueName: queue.Name, + WorkflowID: workflowID, + WorkflowInput: testInput{Value: value, ID: fmt.Sprintf("test-%d", i)}, + ApplicationVersion: serverCtx.GetApplicationVersion(), + }) + } + + if err != nil { + t.Fatalf("failed to enqueue workflow %d: %v", i, err) + } + + workflowIDs = append(workflowIDs, workflowID) + handles = append(handles, handle) + + // Wait 100ms between workflow starts + time.Sleep(100 * time.Millisecond) + } + + // Wait for all workflows to complete + for i, handle := range handles { + _, err := handle.GetResult() + if i < 8 { + // First 8 should succeed + if err != nil { + t.Fatalf("workflow %d should have succeeded but got error: %v", i, err) + } + } else { + // Last 2 should fail + if err == nil { + t.Fatalf("workflow %d should have failed but succeeded", i) + } + } + } + + // Test 1: List all workflows (no filters) + allWorkflows, err := ListWorkflows(clientCtx) + if err != nil { + t.Fatalf("failed to list all workflows: %v", err) + } + if len(allWorkflows) < 10 { + t.Fatalf("expected at least 10 workflows, got %d", len(allWorkflows)) + } + + // Test 2: Filter by workflow IDs + expectedIDs := workflowIDs[:3] + specificWorkflows, err := ListWorkflows(clientCtx, WithWorkflowIDs(expectedIDs)) + if err != nil { + t.Fatalf("failed to list workflows by IDs: %v", err) + } + if len(specificWorkflows) != 3 { + t.Fatalf("expected 3 workflows, got %d", len(specificWorkflows)) + } + // Verify returned workflow IDs match expected + returnedIDs := make(map[string]bool) + for _, wf := range specificWorkflows { + returnedIDs[wf.ID] = true + } + for _, expectedID := range expectedIDs { + if !returnedIDs[expectedID] { + t.Fatalf("expected workflow ID %s not found in results", expectedID) + } + } + + // Test 3: Filter by workflow ID prefix + batchWorkflows, err := ListWorkflows(clientCtx, WithWorkflowIDPrefix("test-batch-")) + if err != nil { + t.Fatalf("failed to list workflows by prefix: %v", err) + } + if len(batchWorkflows) != 5 { + t.Fatalf("expected 5 batch workflows, got %d", len(batchWorkflows)) + } + // Verify all returned workflow IDs have the correct prefix + for _, wf := range batchWorkflows { + if !strings.HasPrefix(wf.ID, "test-batch-") { + t.Fatalf("workflow ID %s does not have expected prefix 'test-batch-'", wf.ID) + } + } + + // Test 4: Filter by status - SUCCESS + successWorkflows, err := ListWorkflows(clientCtx, + WithWorkflowIDPrefix("test-"), // Only our test workflows + WithStatus([]WorkflowStatusType{WorkflowStatusSuccess})) + if err != nil { + t.Fatalf("failed to list successful workflows: %v", err) + } + if len(successWorkflows) != 8 { + t.Fatalf("expected 8 successful workflows, got %d", len(successWorkflows)) + } + // Verify all returned workflows have SUCCESS status + for _, wf := range successWorkflows { + if wf.Status != WorkflowStatusSuccess { + t.Fatalf("workflow %s has status %s, expected SUCCESS", wf.ID, wf.Status) + } + } + + // Test 5: Filter by status - ERROR + errorWorkflows, err := ListWorkflows(clientCtx, + WithWorkflowIDPrefix("test-"), + WithStatus([]WorkflowStatusType{WorkflowStatusError})) + if err != nil { + t.Fatalf("failed to list error workflows: %v", err) + } + if len(errorWorkflows) != 2 { + t.Fatalf("expected 2 error workflows, got %d", len(errorWorkflows)) + } + // Verify all returned workflows have ERROR status + for _, wf := range errorWorkflows { + if wf.Status != WorkflowStatusError { + t.Fatalf("workflow %s has status %s, expected ERROR", wf.ID, wf.Status) + } + } + + // Test 6: Filter by time range - first 5 workflows (start to start+500ms) + firstHalfTime := testStartTime.Add(500 * time.Millisecond) + firstHalfWorkflows, err := ListWorkflows(clientCtx, + WithWorkflowIDPrefix("test-"), + WithEndTime(firstHalfTime)) + if err != nil { + t.Fatalf("failed to list first half workflows by time range: %v", err) + } + if len(firstHalfWorkflows) != 5 { + t.Fatalf("expected 5 workflows in first half time range, got %d", len(firstHalfWorkflows)) + } + + // Test 6b: Filter by time range - last 5 workflows (start+500ms to end) + secondHalfWorkflows, err := ListWorkflows(clientCtx, + WithWorkflowIDPrefix("test-"), + WithStartTime(firstHalfTime)) + if err != nil { + t.Fatalf("failed to list second half workflows by time range: %v", err) + } + if len(secondHalfWorkflows) != 5 { + t.Fatalf("expected 5 workflows in second half time range, got %d", len(secondHalfWorkflows)) + } + + // Test 7: Test sorting order (ascending - default) + ascWorkflows, err := ListWorkflows(clientCtx, + WithWorkflowIDPrefix("test-"), + WithSortDesc(false)) + if err != nil { + t.Fatalf("failed to list workflows ascending: %v", err) + } + + // Test 8: Test sorting order (descending) + descWorkflows, err := ListWorkflows(clientCtx, + WithWorkflowIDPrefix("test-"), + WithSortDesc(true)) + if err != nil { + t.Fatalf("failed to list workflows descending: %v", err) + } + + // Verify sorting - workflows should be ordered by creation time + // First workflow in desc should be last in asc (latest created) + if ascWorkflows[len(ascWorkflows)-1].ID != descWorkflows[0].ID { + t.Fatalf("sorting verification failed: asc last (%s) != desc first (%s)", + ascWorkflows[len(ascWorkflows)-1].ID, descWorkflows[0].ID) + } + // Last workflow in desc should be first in asc (earliest created) + if ascWorkflows[0].ID != descWorkflows[len(descWorkflows)-1].ID { + t.Fatalf("sorting verification failed: asc first (%s) != desc last (%s)", + ascWorkflows[0].ID, descWorkflows[len(descWorkflows)-1].ID) + } + + // Verify ascending order: each workflow should be created at or after the previous + for i := 1; i < len(ascWorkflows); i++ { + if ascWorkflows[i].CreatedAt.Before(ascWorkflows[i-1].CreatedAt) { + t.Fatalf("ascending order violation: workflow at index %d created before previous", i) + } + } + + // Verify descending order: each workflow should be created at or before the previous + for i := 1; i < len(descWorkflows); i++ { + if descWorkflows[i].CreatedAt.After(descWorkflows[i-1].CreatedAt) { + t.Fatalf("descending order violation: workflow at index %d created after previous", i) + } + } + + // Test 9: Test limit and offset + limitedWorkflows, err := ListWorkflows(clientCtx, + WithWorkflowIDPrefix("test-"), + WithLimit(5)) + if err != nil { + t.Fatalf("failed to list workflows with limit: %v", err) + } + if len(limitedWorkflows) != 5 { + t.Fatalf("expected 5 workflows with limit, got %d", len(limitedWorkflows)) + } + // Verify we got the first 5 workflows (earliest created) + expectedFirstFive := ascWorkflows[:5] + for i, wf := range limitedWorkflows { + if wf.ID != expectedFirstFive[i].ID { + t.Fatalf("limited workflow at index %d: expected %s, got %s", i, expectedFirstFive[i].ID, wf.ID) + } + } + + offsetWorkflows, err := ListWorkflows(clientCtx, + WithWorkflowIDPrefix("test-"), + WithOffset(5), + WithLimit(3)) + if err != nil { + t.Fatalf("failed to list workflows with offset: %v", err) + } + if len(offsetWorkflows) != 3 { + t.Fatalf("expected 3 workflows with offset, got %d", len(offsetWorkflows)) + } + // Verify we got workflows 5, 6, 7 from the ascending list + expectedOffsetThree := ascWorkflows[5:8] + for i, wf := range offsetWorkflows { + if wf.ID != expectedOffsetThree[i].ID { + t.Fatalf("offset workflow at index %d: expected %s, got %s", i, expectedOffsetThree[i].ID, wf.ID) + } + } + + // Test 10: Test input/output loading + noDataWorkflows, err := ListWorkflows(clientCtx, + WithWorkflowIDs(workflowIDs[:2]), + WithLoadInput(false), + WithLoadOutput(false)) + if err != nil { + t.Fatalf("failed to list workflows without data: %v", err) + } + if len(noDataWorkflows) != 2 { + t.Fatalf("expected 2 workflows without data, got %d", len(noDataWorkflows)) + } + + // Verify input/output are not loaded + for _, wf := range noDataWorkflows { + if wf.Input != nil { + t.Fatalf("expected input to be nil when LoadInput=false, got %v", wf.Input) + } + if wf.Output != nil { + t.Fatalf("expected output to be nil when LoadOutput=false, got %v", wf.Output) + } + } + }) + + // Verify all queue entries are cleaned up + if !queueEntriesAreCleanedUp(serverCtx) { + t.Fatal("expected queue entries to be cleaned up after list workflows tests") + } +} diff --git a/dbos/system_database.go b/dbos/system_database.go index 9ab2da07..90a99d91 100644 --- a/dbos/system_database.go +++ b/dbos/system_database.go @@ -441,6 +441,8 @@ type listWorkflowsDBInput struct { limit *int offset *int sortDesc bool + loadInput bool + loadOutput bool tx pgx.Tx } @@ -448,12 +450,22 @@ type listWorkflowsDBInput struct { func (s *sysDB) listWorkflows(ctx context.Context, input listWorkflowsDBInput) ([]WorkflowStatus, error) { qb := newQueryBuilder() - // Build the base query - baseQuery := `SELECT workflow_uuid, status, name, authenticated_user, assumed_role, authenticated_roles, - output, error, executor_id, created_at, updated_at, application_version, application_id, - recovery_attempts, queue_name, workflow_timeout_ms, workflow_deadline_epoch_ms, started_at_epoch_ms, - deduplication_id, inputs, priority - FROM dbos.workflow_status` + // Build the base query with conditional column selection + loadColumns := []string{ + "workflow_uuid", "status", "name", "authenticated_user", "assumed_role", "authenticated_roles", + "executor_id", "created_at", "updated_at", "application_version", "application_id", + "recovery_attempts", "queue_name", "workflow_timeout_ms", "workflow_deadline_epoch_ms", "started_at_epoch_ms", + "deduplication_id", "priority", + } + + if input.loadOutput { + loadColumns = append(loadColumns, "output", "error") + } + if input.loadInput { + loadColumns = append(loadColumns, "inputs") + } + + baseQuery := fmt.Sprintf("SELECT %s FROM dbos.workflow_status", strings.Join(loadColumns, ", ")) // Add filters using query builder if input.workflowName != "" { @@ -543,14 +555,23 @@ func (s *sysDB) listWorkflows(ctx context.Context, input listWorkflowsDBInput) ( var applicationVersion *string var executorID *string - err := rows.Scan( + // Build scan arguments dynamically based on loaded columns + scanArgs := []interface{}{ &wf.ID, &wf.Status, &wf.Name, &wf.AuthenticatedUser, &wf.AssumedRole, - &wf.AuthenticatedRoles, &outputString, &errorStr, &executorID, &createdAtMs, + &wf.AuthenticatedRoles, &executorID, &createdAtMs, &updatedAtMs, &applicationVersion, &wf.ApplicationID, &wf.Attempts, &queueName, &timeoutMs, - &deadlineMs, &startedAtMs, &deduplicationID, - &inputString, &wf.Priority, - ) + &deadlineMs, &startedAtMs, &deduplicationID, &wf.Priority, + } + + if input.loadOutput { + scanArgs = append(scanArgs, &outputString, &errorStr) + } + if input.loadInput { + scanArgs = append(scanArgs, &inputString) + } + + err := rows.Scan(scanArgs...) if err != nil { return nil, fmt.Errorf("failed to scan workflow row: %w", err) } @@ -592,20 +613,26 @@ func (s *sysDB) listWorkflows(ctx context.Context, input listWorkflowsDBInput) ( wf.StartedAt = time.Unix(0, *startedAtMs*int64(time.Millisecond)) } - // Convert error string to error type if present - if errorStr != nil && *errorStr != "" { - wf.Error = errors.New(*errorStr) - } + // Handle output and error only if loadOutput is true + if input.loadOutput { + // Convert error string to error type if present + if errorStr != nil && *errorStr != "" { + wf.Error = errors.New(*errorStr) + } - // XXX maybe set wf.Output to outputString and run deserialize out of system DB - wf.Output, err = deserialize(outputString) - if err != nil { - return nil, fmt.Errorf("failed to deserialize output: %w", err) + // XXX maybe set wf.Output to outputString and run deserialize out of system DB + wf.Output, err = deserialize(outputString) + if err != nil { + return nil, fmt.Errorf("failed to deserialize output: %w", err) + } } - wf.Input, err = deserialize(inputString) - if err != nil { - return nil, fmt.Errorf("failed to deserialize input: %w", err) + // Handle input only if loadInput is true + if input.loadInput { + wf.Input, err = deserialize(inputString) + if err != nil { + return nil, fmt.Errorf("failed to deserialize input: %w", err) + } } workflows = append(workflows, wf) @@ -664,6 +691,8 @@ func (s *sysDB) cancelWorkflow(ctx context.Context, workflowID string) error { // Check if workflow exists listInput := listWorkflowsDBInput{ workflowIDs: []string{workflowID}, + loadInput: true, + loadOutput: true, tx: tx, } wfs, err := s.listWorkflows(ctx, listInput) @@ -717,6 +746,8 @@ func (s *sysDB) resumeWorkflow(ctx context.Context, workflowID string) error { // Check the status of the workflow. If it is complete, do nothing. listInput := listWorkflowsDBInput{ workflowIDs: []string{workflowID}, + loadInput: true, + loadOutput: true, tx: tx, } wfs, err := s.listWorkflows(ctx, listInput) @@ -779,6 +810,8 @@ func (s *sysDB) forkWorkflow(ctx context.Context, input forkWorkflowDBInput) err // Get the original workflow status listInput := listWorkflowsDBInput{ workflowIDs: []string{input.originalWorkflowID}, + loadInput: true, + loadOutput: true, tx: tx, } wfs, err := s.listWorkflows(ctx, listInput) diff --git a/dbos/workflow.go b/dbos/workflow.go index 7a203c15..61335e90 100644 --- a/dbos/workflow.go +++ b/dbos/workflow.go @@ -131,6 +131,8 @@ func (h *workflowHandle[R]) GetResult() (R, error) { func (h *workflowHandle[R]) GetStatus() (WorkflowStatus, error) { workflowStatuses, err := h.dbosContext.(*dbosContext).systemDB.listWorkflows(h.dbosContext, listWorkflowsDBInput{ workflowIDs: []string{h.workflowID}, + loadInput: true, + loadOutput: true, }) if err != nil { return WorkflowStatus{}, fmt.Errorf("failed to get workflow status: %w", err) @@ -188,6 +190,8 @@ func (h *workflowPollingHandle[R]) GetResult() (R, error) { func (h *workflowPollingHandle[R]) GetStatus() (WorkflowStatus, error) { workflowStatuses, err := h.dbosContext.(*dbosContext).systemDB.listWorkflows(h.dbosContext, listWorkflowsDBInput{ workflowIDs: []string{h.workflowID}, + loadInput: true, + loadOutput: true, }) if err != nil { return WorkflowStatus{}, fmt.Errorf("failed to get workflow status: %w", err) @@ -1207,6 +1211,8 @@ func (c *dbosContext) GetStepID() (int, error) { func (c *dbosContext) RetrieveWorkflow(_ DBOSContext, workflowID string) (WorkflowHandle[any], error) { workflowStatus, err := c.systemDB.listWorkflows(c, listWorkflowsDBInput{ workflowIDs: []string{workflowID}, + loadInput: true, + loadOutput: true, }) if err != nil { return nil, fmt.Errorf("failed to retrieve workflow status: %w", err) @@ -1549,3 +1555,269 @@ func ForkWorkflow[R any](ctx DBOSContext, originalWorkflowID string, opts ...For dbosContext: handle.(*workflowPollingHandle[any]).dbosContext, }, nil } + +// listWorkflowsParams holds configuration parameters for listing workflows +type listWorkflowsParams struct { + workflowIDs []string + status []WorkflowStatusType + startTime time.Time + endTime time.Time + name string + appVersion string + user string + limit *int + offset *int + sortDesc bool + workflowIDPrefix string + loadInput bool + loadOutput bool +} + +// ListWorkflowsOption is a functional option for configuring workflow listing parameters. +type ListWorkflowsOption func(*listWorkflowsParams) + +// WithWorkflowIDs filters workflows by the specified workflow IDs. +// +// Example: +// +// workflows, err := dbos.ListWorkflows(ctx, +// dbos.WithWorkflowIDs([]string{"workflow1", "workflow2"})) +func WithWorkflowIDs(workflowIDs []string) ListWorkflowsOption { + return func(p *listWorkflowsParams) { + p.workflowIDs = workflowIDs + } +} + +// WithStatus filters workflows by the specified status(es). +// Can accept a single status or a list of statuses. +// +// Example: +// +// workflows, err := dbos.ListWorkflows(ctx, +// dbos.WithStatus([]dbos.WorkflowStatusType{dbos.WorkflowStatusSuccess, dbos.WorkflowStatusError})) +func WithStatus(status []WorkflowStatusType) ListWorkflowsOption { + return func(p *listWorkflowsParams) { + p.status = status + } +} + +// WithStartTime filters workflows created after the specified time. +// +// Example: +// +// workflows, err := dbos.ListWorkflows(ctx, +// dbos.WithStartTime(time.Now().Add(-24*time.Hour))) +func WithStartTime(startTime time.Time) ListWorkflowsOption { + return func(p *listWorkflowsParams) { + p.startTime = startTime + } +} + +// WithEndTime filters workflows created before the specified time. +// +// Example: +// +// workflows, err := dbos.ListWorkflows(ctx, +// dbos.WithEndTime(time.Now())) +func WithEndTime(endTime time.Time) ListWorkflowsOption { + return func(p *listWorkflowsParams) { + p.endTime = endTime + } +} + +// WithName filters workflows by the specified workflow function name. +// +// Example: +// +// workflows, err := dbos.ListWorkflows(ctx, +// dbos.WithName("MyWorkflowFunction")) +func WithName(name string) ListWorkflowsOption { + return func(p *listWorkflowsParams) { + p.name = name + } +} + +// WithAppVersion filters workflows by the specified application version. +// +// Example: +// +// workflows, err := dbos.ListWorkflows(ctx, +// dbos.WithAppVersion("v1.0.0")) +func WithAppVersion(appVersion string) ListWorkflowsOption { + return func(p *listWorkflowsParams) { + p.appVersion = appVersion + } +} + +// WithUser filters workflows by the specified authenticated user. +// +// Example: +// +// workflows, err := dbos.ListWorkflows(ctx, +// dbos.WithUser("john.doe")) +func WithUser(user string) ListWorkflowsOption { + return func(p *listWorkflowsParams) { + p.user = user + } +} + +// WithLimit limits the number of workflows returned. +// +// Example: +// +// workflows, err := dbos.ListWorkflows(ctx, +// dbos.WithLimit(100)) +func WithLimit(limit int) ListWorkflowsOption { + return func(p *listWorkflowsParams) { + p.limit = &limit + } +} + +// WithOffset sets the offset for pagination. +// +// Example: +// +// workflows, err := dbos.ListWorkflows(ctx, +// dbos.WithOffset(50), dbos.WithLimit(25)) +func WithOffset(offset int) ListWorkflowsOption { + return func(p *listWorkflowsParams) { + p.offset = &offset + } +} + +// WithSortDesc enables descending sort by creation time (default is ascending). +// +// Example: +// +// workflows, err := dbos.ListWorkflows(ctx, +// dbos.WithSortDesc(true)) +func WithSortDesc(sortDesc bool) ListWorkflowsOption { + return func(p *listWorkflowsParams) { + p.sortDesc = sortDesc + } +} + +// WithWorkflowIDPrefix filters workflows by workflow ID prefix. +// +// Example: +// +// workflows, err := dbos.ListWorkflows(ctx, +// dbos.WithWorkflowIDPrefix("batch-")) +func WithWorkflowIDPrefix(prefix string) ListWorkflowsOption { + return func(p *listWorkflowsParams) { + p.workflowIDPrefix = prefix + } +} + +// WithLoadInput controls whether to load workflow input data (default: true). +// +// Example: +// +// workflows, err := dbos.ListWorkflows(ctx, +// dbos.WithLoadInput(false)) +func WithLoadInput(loadInput bool) ListWorkflowsOption { + return func(p *listWorkflowsParams) { + p.loadInput = loadInput + } +} + +// WithLoadOutput controls whether to load workflow output data (default: true). +// +// Example: +// +// workflows, err := dbos.ListWorkflows(ctx, +// dbos.WithLoadOutput(false)) +func WithLoadOutput(loadOutput bool) ListWorkflowsOption { + return func(p *listWorkflowsParams) { + p.loadOutput = loadOutput + } +} + +// ListWorkflows retrieves a list of workflows based on the provided filters. +// This function provides a high-level interface to query workflows with various filtering options. +// It wraps the system database's listWorkflows functionality with type-safe functional options. +// +// The function supports filtering by workflow IDs, status, time ranges, names, application versions, +// authenticated users, and more. It also supports pagination through limit/offset parameters and +// sorting control. +// +// By default, both input and output data are loaded for each workflow. This can be controlled +// using WithLoadInput(false) and WithLoadOutput(false) options for better performance when +// the data is not needed. +// +// Parameters: +// - ctx: DBOS context for the operation +// - opts: Functional options to configure the query filters and parameters +// +// Returns a slice of WorkflowStatus structs containing the workflow information. +// +// Example usage: +// +// // List all successful workflows from the last 24 hours +// workflows, err := dbos.ListWorkflows(ctx, +// dbos.WithStatus([]dbos.WorkflowStatusType{dbos.WorkflowStatusSuccess}), +// dbos.WithStartTime(time.Now().Add(-24*time.Hour)), +// dbos.WithLimit(100)) +// if err != nil { +// log.Fatal(err) +// } +// +// // List workflows by specific IDs without loading input/output data +// workflows, err := dbos.ListWorkflows(ctx, +// dbos.WithWorkflowIDs([]string{"workflow1", "workflow2"}), +// dbos.WithLoadInput(false), +// dbos.WithLoadOutput(false)) +// if err != nil { +// log.Fatal(err) +// } +// +// // List workflows with pagination +// workflows, err := dbos.ListWorkflows(ctx, +// dbos.WithUser("john.doe"), +// dbos.WithOffset(50), +// dbos.WithLimit(25), +// dbos.WithSortDesc(true)) +// if err != nil { +// log.Fatal(err) +// } +func ListWorkflows(ctx DBOSContext, opts ...ListWorkflowsOption) ([]WorkflowStatus, error) { + if ctx == nil { + return nil, errors.New("ctx cannot be nil") + } + + // Initialize parameters with defaults + params := &listWorkflowsParams{ + loadInput: true, // Default to loading input + loadOutput: true, // Default to loading output + } + + // Apply all provided options + for _, opt := range opts { + opt(params) + } + + // Convert to system database input structure + dbInput := listWorkflowsDBInput{ + workflowIDs: params.workflowIDs, + status: params.status, + startTime: params.startTime, + endTime: params.endTime, + workflowName: params.name, + applicationVersion: params.appVersion, + authenticatedUser: params.user, + limit: params.limit, + offset: params.offset, + sortDesc: params.sortDesc, + workflowIDPrefix: params.workflowIDPrefix, + loadInput: params.loadInput, + loadOutput: params.loadOutput, + } + + // Call the system database to list workflows + workflows, err := ctx.(*dbosContext).systemDB.listWorkflows(ctx, dbInput) + if err != nil { + return nil, fmt.Errorf("failed to list workflows: %w", err) + } + + return workflows, nil +} From 293deac23b8b00d410101279b6d28cab4f5dc002 Mon Sep 17 00:00:00 2001 From: maxdml Date: Tue, 12 Aug 2025 16:13:02 -0700 Subject: [PATCH 07/20] nit --- dbos/queue.go | 1 + 1 file changed, 1 insertion(+) diff --git a/dbos/queue.go b/dbos/queue.go index 320c1562..e56f123b 100644 --- a/dbos/queue.go +++ b/dbos/queue.go @@ -209,6 +209,7 @@ func (qr *queueRunner) run(ctx *dbosContext) { registeredWorkflow, exists := ctx.workflowRegistry[wfName.(string)] if !exists { ctx.logger.Error("workflow function not found in registry", "workflow_name", workflow.name) + continue } // Deserialize input From 82fc3a6ecca346e6a346e7ccc074af4dbec54de7 Mon Sep 17 00:00:00 2001 From: maxdml Date: Tue, 12 Aug 2025 16:15:59 -0700 Subject: [PATCH 08/20] simplify --- dbos/client_test.go | 16 ++++-- dbos/dbos.go | 2 +- dbos/workflow.go | 121 +++++++++++++++----------------------------- 3 files changed, 54 insertions(+), 85 deletions(-) diff --git a/dbos/client_test.go b/dbos/client_test.go index cc0a7ea0..4644e846 100644 --- a/dbos/client_test.go +++ b/dbos/client_test.go @@ -504,7 +504,11 @@ func TestForkWorkflow(t *testing.T) { t.Logf("Forking at step %d", step) customForkedWorkflowID := fmt.Sprintf("forked-workflow-step-%d", step) - forkedHandle, err := ForkWorkflow[string](clientCtx, originalWorkflowID, WithForkWorkflowID(customForkedWorkflowID), WithForkStartStep(uint(step-1))) + forkedHandle, err := ForkWorkflow[string](clientCtx, ForkWorkflowInput{ + OriginalWorkflowID: originalWorkflowID, + ForkedWorkflowID: customForkedWorkflowID, + StartStep: uint(step - 1), + }) if err != nil { t.Fatalf("failed to fork workflow at step %d: %v", step, err) } @@ -575,7 +579,10 @@ func TestForkWorkflow(t *testing.T) { nonExistentWorkflowID := "non-existent-workflow-for-fork" // Try to fork a non-existent workflow - _, err := clientCtx.ForkWorkflow(clientCtx, nonExistentWorkflowID, WithForkStartStep(1)) + _, err := clientCtx.ForkWorkflow(clientCtx, ForkWorkflowInput{ + OriginalWorkflowID: nonExistentWorkflowID, + StartStep: 1, + }) if err == nil { t.Fatal("expected error when forking non-existent workflow, but got none") } @@ -617,7 +624,10 @@ func TestForkWorkflow(t *testing.T) { } // Try to fork at step 999 (beyond workflow's actual steps) - _, err = clientCtx.ForkWorkflow(clientCtx, originalWorkflowID, WithForkStartStep(999)) + _, err = clientCtx.ForkWorkflow(clientCtx, ForkWorkflowInput{ + OriginalWorkflowID: originalWorkflowID, + StartStep: 999, + }) if err == nil { t.Fatal("expected error when forking at step 999, but got none") } diff --git a/dbos/dbos.go b/dbos/dbos.go index 1d31ffb1..08a5ed90 100644 --- a/dbos/dbos.go +++ b/dbos/dbos.go @@ -88,7 +88,7 @@ type DBOSContext interface { Enqueue(_ DBOSContext, params EnqueueOptions) (WorkflowHandle[any], error) // Enqueue a new workflow with parameters CancelWorkflow(workflowID string) error // Cancel a workflow by setting its status to CANCELLED ResumeWorkflow(_ DBOSContext, workflowID string) (WorkflowHandle[any], error) // Resume a cancelled workflow - ForkWorkflow(_ DBOSContext, originalWorkflowID string, opts ...ForkWorkflowOption) (WorkflowHandle[any], error) // Fork a workflow from a specific step + ForkWorkflow(_ DBOSContext, input ForkWorkflowInput) (WorkflowHandle[any], error) // Fork a workflow from a specific step // Accessors GetApplicationVersion() string // Get the application version for this context diff --git a/dbos/workflow.go b/dbos/workflow.go index 61335e90..a3685fef 100644 --- a/dbos/workflow.go +++ b/dbos/workflow.go @@ -1418,126 +1418,85 @@ func ResumeWorkflow[R any](ctx DBOSContext, workflowID string) (WorkflowHandle[R return &workflowPollingHandle[R]{workflowID: workflowID, dbosContext: ctx}, nil } -// forkWorkflowParams holds configuration parameters for forking workflows -type forkWorkflowParams struct { - applicationVersion string - forkedWorkflowID string - startStep uint -} - -// ForkWorkflowOption is a functional option for configuring fork workflow execution parameters. -type ForkWorkflowOption func(*forkWorkflowParams) - -// WithForkApplicationVersion overrides the application version for the forked workflow. -// If not specified, the original workflow's application version is used. -// -// Example: -// -// dbos.ForkWorkflow[Result](ctx, originalID, 1, -// dbos.WithForkApplicationVersion("v2.0.0")) -func WithForkApplicationVersion(version string) ForkWorkflowOption { - return func(p *forkWorkflowParams) { - p.applicationVersion = version - } +// ForkWorkflowInput holds configuration parameters for forking workflows. +// It replaces the functional options pattern to comply with CS-5 style guidelines. +type ForkWorkflowInput struct { + OriginalWorkflowID string // Required: The UUID of the original workflow to fork from + ForkedWorkflowID string // Optional: Custom workflow ID for the forked workflow (auto-generated if empty) + StartStep uint // Optional: Step to start the forked workflow from (default: 0) + ApplicationVersion string // Optional: Application version for the forked workflow (inherits from original if empty) } -// WithForkStartStep overrides the start step for the forked workflow. -// This is an alternative to specifying the startStep as a parameter. -// If both are specified, this option takes precedence. -// -// Example: -// -// dbos.ForkWorkflow[Result](ctx, originalID, 1, -// dbos.WithForkStartStep(5)) // Will start from step 5, not step 1 -func WithForkStartStep(startStep uint) ForkWorkflowOption { - return func(p *forkWorkflowParams) { - p.startStep = startStep - } -} - -// WithForkWorkflowID sets a custom workflow ID for the forked workflow. -// If not specified, a new UUID will be generated automatically. -// The workflow ID must be unique across all workflows. -// -// Example: -// -// dbos.ForkWorkflow[Result](ctx, originalID, 1, -// dbos.WithForkWorkflowID("my-custom-fork-id")) -func WithForkWorkflowID(workflowID string) ForkWorkflowOption { - return func(p *forkWorkflowParams) { - p.forkedWorkflowID = workflowID - } -} - -func (c *dbosContext) ForkWorkflow(_ DBOSContext, originalWorkflowID string, opts ...ForkWorkflowOption) (WorkflowHandle[any], error) { - // Parse options - params := &forkWorkflowParams{} - for _, opt := range opts { - opt(params) - } - - if originalWorkflowID == "" { +func (c *dbosContext) ForkWorkflow(_ DBOSContext, input ForkWorkflowInput) (WorkflowHandle[any], error) { + if input.OriginalWorkflowID == "" { return nil, errors.New("original workflow ID cannot be empty") } - // Generate new workflow ID - if params.forkedWorkflowID == "" { - params.forkedWorkflowID = uuid.New().String() + // Generate new workflow ID if not provided + forkedWorkflowID := input.ForkedWorkflowID + if forkedWorkflowID == "" { + forkedWorkflowID = uuid.New().String() } // Create input for system database - input := forkWorkflowDBInput{ - originalWorkflowID: originalWorkflowID, - forkedWorkflowID: params.forkedWorkflowID, - startStep: int(params.startStep), - applicationVersion: params.applicationVersion, + dbInput := forkWorkflowDBInput{ + originalWorkflowID: input.OriginalWorkflowID, + forkedWorkflowID: forkedWorkflowID, + startStep: int(input.StartStep), + applicationVersion: input.ApplicationVersion, } // Call system database method - err := c.systemDB.forkWorkflow(c, input) + err := c.systemDB.forkWorkflow(c, dbInput) if err != nil { return nil, err } return &workflowPollingHandle[any]{ - workflowID: params.forkedWorkflowID, + workflowID: forkedWorkflowID, dbosContext: c, }, nil } // ForkWorkflow creates a new workflow instance by copying an existing workflow from a specific step. -// The forked workflow will have a new UUID and will execute from the specified startStep. -// If startStep > 1, the forked workflow will have the operation outputs from steps 1 to startStep-1 +// The forked workflow will have a new UUID and will execute from the specified StartStep. +// If StartStep > 0, the forked workflow will have the operation outputs from steps 0 to StartStep-1 // copied from the original workflow. // // Parameters: // - ctx: DBOS context for the operation -// - originalWorkflowID: The UUID of the original workflow to fork from -// - opts: Optional configuration parameters for the forked workflow using functional options +// - input: Configuration parameters for the forked workflow // -// Available functional options: -// - WithForkWorkflowID: Set a custom workflow ID for the forked workflow -// - WithForkStartStep: Override the start step (alternative to the startStep parameter) -// - WithForkApplicationVersion: Set a specific application version for the forked workflow +// The input struct contains: +// - OriginalWorkflowID: The UUID of the original workflow to fork from (required) +// - ForkedWorkflowID: Custom workflow ID for the forked workflow (optional, auto-generated if empty) +// - StartStep: Step to start the forked workflow from (optional, default: 0) +// - ApplicationVersion: Application version for the forked workflow (optional, inherits from original if empty) // // Returns a typed workflow handle for the newly created forked workflow. // // Example usage: // // // Basic fork from step 5 -// handle, err := dbos.ForkWorkflow[MyResultType](ctx, "original-workflow-id", 5) +// handle, err := dbos.ForkWorkflow[MyResultType](ctx, dbos.ForkWorkflowInput{ +// OriginalWorkflowID: "original-workflow-id", +// StartStep: 5, +// }) // if err != nil { // log.Fatal(err) // } // // // Fork with custom workflow ID and application version -// handle, err := dbos.ForkWorkflow[MyResultType](ctx, "original-workflow-id", 3, -// dbos.WithForkWorkflowID("my-custom-fork-id"), -// dbos.WithForkApplicationVersion("v2.0.0")) +// handle, err := dbos.ForkWorkflow[MyResultType](ctx, dbos.ForkWorkflowInput{ +// OriginalWorkflowID: "original-workflow-id", +// ForkedWorkflowID: "my-custom-fork-id", +// StartStep: 3, +// ApplicationVersion: "v2.0.0", +// }) // if err != nil { // log.Fatal(err) // } -func ForkWorkflow[R any](ctx DBOSContext, originalWorkflowID string, opts ...ForkWorkflowOption) (WorkflowHandle[R], error) { +func ForkWorkflow[R any](ctx DBOSContext, input ForkWorkflowInput) (WorkflowHandle[R], error) { if ctx == nil { return nil, errors.New("ctx cannot be nil") } @@ -1546,7 +1505,7 @@ func ForkWorkflow[R any](ctx DBOSContext, originalWorkflowID string, opts ...For var r R gob.Register(r) - handle, err := ctx.ForkWorkflow(ctx, originalWorkflowID, opts...) + handle, err := ctx.ForkWorkflow(ctx, input) if err != nil { return nil, err } From cf6cd658d392c2997644c5f3b32a29f35676702f Mon Sep 17 00:00:00 2001 From: maxdml Date: Tue, 12 Aug 2025 16:22:08 -0700 Subject: [PATCH 09/20] nit --- dbos/system_database.go | 2 -- 1 file changed, 2 deletions(-) diff --git a/dbos/system_database.go b/dbos/system_database.go index 90a99d91..a907e020 100644 --- a/dbos/system_database.go +++ b/dbos/system_database.go @@ -580,12 +580,10 @@ func (s *sysDB) listWorkflows(ctx context.Context, input listWorkflowsDBInput) ( wf.QueueName = *queueName } - // Handle NULL executorID if executorID != nil && len(*executorID) > 0 { wf.ExecutorID = *executorID } - // We work with strings -- the DB could return NULL values if applicationVersion != nil && len(*applicationVersion) > 0 { wf.ApplicationVersion = *applicationVersion } From 4968d21abe56e076e5dc70cfff9ed114de51b28a Mon Sep 17 00:00:00 2001 From: maxdml Date: Tue, 12 Aug 2025 16:40:36 -0700 Subject: [PATCH 10/20] ListWorkflows must be a DBOSContext method --- dbos/client_test.go | 24 ++++++++++++------------ dbos/dbos.go | 9 +++++---- dbos/workflow.go | 20 ++++++++------------ 3 files changed, 25 insertions(+), 28 deletions(-) diff --git a/dbos/client_test.go b/dbos/client_test.go index 4644e846..35438e30 100644 --- a/dbos/client_test.go +++ b/dbos/client_test.go @@ -739,7 +739,7 @@ func TestListWorkflows(t *testing.T) { } // Test 1: List all workflows (no filters) - allWorkflows, err := ListWorkflows(clientCtx) + allWorkflows, err := clientCtx.ListWorkflows() if err != nil { t.Fatalf("failed to list all workflows: %v", err) } @@ -749,7 +749,7 @@ func TestListWorkflows(t *testing.T) { // Test 2: Filter by workflow IDs expectedIDs := workflowIDs[:3] - specificWorkflows, err := ListWorkflows(clientCtx, WithWorkflowIDs(expectedIDs)) + specificWorkflows, err := clientCtx.ListWorkflows(WithWorkflowIDs(expectedIDs)) if err != nil { t.Fatalf("failed to list workflows by IDs: %v", err) } @@ -768,7 +768,7 @@ func TestListWorkflows(t *testing.T) { } // Test 3: Filter by workflow ID prefix - batchWorkflows, err := ListWorkflows(clientCtx, WithWorkflowIDPrefix("test-batch-")) + batchWorkflows, err := clientCtx.ListWorkflows(WithWorkflowIDPrefix("test-batch-")) if err != nil { t.Fatalf("failed to list workflows by prefix: %v", err) } @@ -783,7 +783,7 @@ func TestListWorkflows(t *testing.T) { } // Test 4: Filter by status - SUCCESS - successWorkflows, err := ListWorkflows(clientCtx, + successWorkflows, err := clientCtx.ListWorkflows( WithWorkflowIDPrefix("test-"), // Only our test workflows WithStatus([]WorkflowStatusType{WorkflowStatusSuccess})) if err != nil { @@ -800,7 +800,7 @@ func TestListWorkflows(t *testing.T) { } // Test 5: Filter by status - ERROR - errorWorkflows, err := ListWorkflows(clientCtx, + errorWorkflows, err := clientCtx.ListWorkflows( WithWorkflowIDPrefix("test-"), WithStatus([]WorkflowStatusType{WorkflowStatusError})) if err != nil { @@ -818,7 +818,7 @@ func TestListWorkflows(t *testing.T) { // Test 6: Filter by time range - first 5 workflows (start to start+500ms) firstHalfTime := testStartTime.Add(500 * time.Millisecond) - firstHalfWorkflows, err := ListWorkflows(clientCtx, + firstHalfWorkflows, err := clientCtx.ListWorkflows( WithWorkflowIDPrefix("test-"), WithEndTime(firstHalfTime)) if err != nil { @@ -829,7 +829,7 @@ func TestListWorkflows(t *testing.T) { } // Test 6b: Filter by time range - last 5 workflows (start+500ms to end) - secondHalfWorkflows, err := ListWorkflows(clientCtx, + secondHalfWorkflows, err := clientCtx.ListWorkflows( WithWorkflowIDPrefix("test-"), WithStartTime(firstHalfTime)) if err != nil { @@ -840,7 +840,7 @@ func TestListWorkflows(t *testing.T) { } // Test 7: Test sorting order (ascending - default) - ascWorkflows, err := ListWorkflows(clientCtx, + ascWorkflows, err := clientCtx.ListWorkflows( WithWorkflowIDPrefix("test-"), WithSortDesc(false)) if err != nil { @@ -848,7 +848,7 @@ func TestListWorkflows(t *testing.T) { } // Test 8: Test sorting order (descending) - descWorkflows, err := ListWorkflows(clientCtx, + descWorkflows, err := clientCtx.ListWorkflows( WithWorkflowIDPrefix("test-"), WithSortDesc(true)) if err != nil { @@ -882,7 +882,7 @@ func TestListWorkflows(t *testing.T) { } // Test 9: Test limit and offset - limitedWorkflows, err := ListWorkflows(clientCtx, + limitedWorkflows, err := clientCtx.ListWorkflows( WithWorkflowIDPrefix("test-"), WithLimit(5)) if err != nil { @@ -899,7 +899,7 @@ func TestListWorkflows(t *testing.T) { } } - offsetWorkflows, err := ListWorkflows(clientCtx, + offsetWorkflows, err := clientCtx.ListWorkflows( WithWorkflowIDPrefix("test-"), WithOffset(5), WithLimit(3)) @@ -918,7 +918,7 @@ func TestListWorkflows(t *testing.T) { } // Test 10: Test input/output loading - noDataWorkflows, err := ListWorkflows(clientCtx, + noDataWorkflows, err := clientCtx.ListWorkflows( WithWorkflowIDs(workflowIDs[:2]), WithLoadInput(false), WithLoadOutput(false)) diff --git a/dbos/dbos.go b/dbos/dbos.go index 08a5ed90..3fa0647e 100644 --- a/dbos/dbos.go +++ b/dbos/dbos.go @@ -84,11 +84,12 @@ type DBOSContext interface { GetWorkflowID() (string, error) // Get the current workflow ID (only available within workflows) // Workflow management - RetrieveWorkflow(_ DBOSContext, workflowID string) (WorkflowHandle[any], error) // Get a handle to an existing workflow - Enqueue(_ DBOSContext, params EnqueueOptions) (WorkflowHandle[any], error) // Enqueue a new workflow with parameters - CancelWorkflow(workflowID string) error // Cancel a workflow by setting its status to CANCELLED - ResumeWorkflow(_ DBOSContext, workflowID string) (WorkflowHandle[any], error) // Resume a cancelled workflow + RetrieveWorkflow(_ DBOSContext, workflowID string) (WorkflowHandle[any], error) // Get a handle to an existing workflow + Enqueue(_ DBOSContext, params EnqueueOptions) (WorkflowHandle[any], error) // Enqueue a new workflow with parameters + CancelWorkflow(workflowID string) error // Cancel a workflow by setting its status to CANCELLED + ResumeWorkflow(_ DBOSContext, workflowID string) (WorkflowHandle[any], error) // Resume a cancelled workflow ForkWorkflow(_ DBOSContext, input ForkWorkflowInput) (WorkflowHandle[any], error) // Fork a workflow from a specific step + ListWorkflows(opts ...ListWorkflowsOption) ([]WorkflowStatus, error) // List workflows based on filtering criteria // Accessors GetApplicationVersion() string // Get the application version for this context diff --git a/dbos/workflow.go b/dbos/workflow.go index a3685fef..0ed1c1c3 100644 --- a/dbos/workflow.go +++ b/dbos/workflow.go @@ -1697,15 +1697,14 @@ func WithLoadOutput(loadOutput bool) ListWorkflowsOption { // It wraps the system database's listWorkflows functionality with type-safe functional options. // // The function supports filtering by workflow IDs, status, time ranges, names, application versions, -// authenticated users, and more. It also supports pagination through limit/offset parameters and -// sorting control. +// authenticated users, workflow ID prefixes, and more. It also supports pagination through +// limit/offset parameters and sorting control (ascending by default, or descending with WithSortDesc). // // By default, both input and output data are loaded for each workflow. This can be controlled // using WithLoadInput(false) and WithLoadOutput(false) options for better performance when // the data is not needed. // // Parameters: -// - ctx: DBOS context for the operation // - opts: Functional options to configure the query filters and parameters // // Returns a slice of WorkflowStatus structs containing the workflow information. @@ -1713,7 +1712,7 @@ func WithLoadOutput(loadOutput bool) ListWorkflowsOption { // Example usage: // // // List all successful workflows from the last 24 hours -// workflows, err := dbos.ListWorkflows(ctx, +// workflows, err := ctx.ListWorkflows( // dbos.WithStatus([]dbos.WorkflowStatusType{dbos.WorkflowStatusSuccess}), // dbos.WithStartTime(time.Now().Add(-24*time.Hour)), // dbos.WithLimit(100)) @@ -1722,7 +1721,7 @@ func WithLoadOutput(loadOutput bool) ListWorkflowsOption { // } // // // List workflows by specific IDs without loading input/output data -// workflows, err := dbos.ListWorkflows(ctx, +// workflows, err := ctx.ListWorkflows( // dbos.WithWorkflowIDs([]string{"workflow1", "workflow2"}), // dbos.WithLoadInput(false), // dbos.WithLoadOutput(false)) @@ -1731,7 +1730,7 @@ func WithLoadOutput(loadOutput bool) ListWorkflowsOption { // } // // // List workflows with pagination -// workflows, err := dbos.ListWorkflows(ctx, +// workflows, err := ctx.ListWorkflows( // dbos.WithUser("john.doe"), // dbos.WithOffset(50), // dbos.WithLimit(25), @@ -1739,10 +1738,7 @@ func WithLoadOutput(loadOutput bool) ListWorkflowsOption { // if err != nil { // log.Fatal(err) // } -func ListWorkflows(ctx DBOSContext, opts ...ListWorkflowsOption) ([]WorkflowStatus, error) { - if ctx == nil { - return nil, errors.New("ctx cannot be nil") - } +func (c *dbosContext) ListWorkflows(opts ...ListWorkflowsOption) ([]WorkflowStatus, error) { // Initialize parameters with defaults params := &listWorkflowsParams{ @@ -1772,8 +1768,8 @@ func ListWorkflows(ctx DBOSContext, opts ...ListWorkflowsOption) ([]WorkflowStat loadOutput: params.loadOutput, } - // Call the system database to list workflows - workflows, err := ctx.(*dbosContext).systemDB.listWorkflows(ctx, dbInput) + // Call the context method to list workflows + workflows, err := c.systemDB.listWorkflows(c, dbInput) if err != nil { return nil, fmt.Errorf("failed to list workflows: %w", err) } From 4aaf9934a08e7244555385601c46d93a967432b3 Mon Sep 17 00:00:00 2001 From: maxdml Date: Tue, 12 Aug 2025 16:40:43 -0700 Subject: [PATCH 11/20] no need outputs for fork --- dbos/system_database.go | 1 - 1 file changed, 1 deletion(-) diff --git a/dbos/system_database.go b/dbos/system_database.go index a907e020..09b09833 100644 --- a/dbos/system_database.go +++ b/dbos/system_database.go @@ -809,7 +809,6 @@ func (s *sysDB) forkWorkflow(ctx context.Context, input forkWorkflowDBInput) err listInput := listWorkflowsDBInput{ workflowIDs: []string{input.originalWorkflowID}, loadInput: true, - loadOutput: true, tx: tx, } wfs, err := s.listWorkflows(ctx, listInput) From 7ce58abd95860de5521c05574621c3384ffd2014 Mon Sep 17 00:00:00 2001 From: maxdml Date: Tue, 12 Aug 2025 16:40:49 -0700 Subject: [PATCH 12/20] fix --- dbos/system_database.go | 5 ----- 1 file changed, 5 deletions(-) diff --git a/dbos/system_database.go b/dbos/system_database.go index 09b09833..2b9ea73e 100644 --- a/dbos/system_database.go +++ b/dbos/system_database.go @@ -821,11 +821,6 @@ func (s *sysDB) forkWorkflow(ctx context.Context, input forkWorkflowDBInput) err originalWorkflow := wfs[0] - // Check if the workflow has completed successfully (required for forking) - if originalWorkflow.Status != WorkflowStatusSuccess { - return fmt.Errorf("cannot fork workflow %s: workflow must be in SUCCESS state, current state: %s", input.originalWorkflowID, originalWorkflow.Status) - } - // Validate that startStep doesn't exceed the workflow's actual steps maxStepQuery := `SELECT COALESCE(MAX(function_id), 0) FROM dbos.operation_outputs WHERE workflow_uuid = $1` var maxStepID int From 9a4cb1a4043e5400cee8ff23e90641a5d234b508 Mon Sep 17 00:00:00 2001 From: maxdml Date: Tue, 12 Aug 2025 17:02:05 -0700 Subject: [PATCH 13/20] docs + nit --- dbos/workflow.go | 68 +++++++++++++++++++++++++++++++++++++++++------- 1 file changed, 59 insertions(+), 9 deletions(-) diff --git a/dbos/workflow.go b/dbos/workflow.go index 0ed1c1c3..2f4743b1 100644 --- a/dbos/workflow.go +++ b/dbos/workflow.go @@ -1334,6 +1334,61 @@ type GenericEnqueueOptions[P any] struct { WorkflowInput P } +// Enqueue adds a workflow to a named queue for later execution with type safety. +// The workflow will be persisted with ENQUEUED status until picked up by a DBOS process. +// This provides asynchronous workflow execution with durability guarantees. +// +// Parameters: +// - ctx: DBOS context for the operation +// - params: Configuration parameters including workflow name, queue name, input, and options +// +// The params struct contains: +// - WorkflowName: Name of the registered workflow function to execute (required) +// - QueueName: Name of the queue to enqueue the workflow to (required) +// - WorkflowID: Custom workflow ID (optional, auto-generated if empty) +// - ApplicationVersion: Application version override (optional) +// - DeduplicationID: Deduplication identifier for idempotent enqueuing (optional) +// - WorkflowTimeout: Maximum execution time for the workflow (optional) +// - WorkflowInput: Input parameters to pass to the workflow (type P) +// +// Returns a typed workflow handle that can be used to check status and retrieve results. +// The handle uses polling to check workflow completion since the execution is asynchronous. +// +// Example usage: +// +// // Enqueue a workflow with string input and int output +// handle, err := dbos.Enqueue[string, int](ctx, dbos.GenericEnqueueOptions[string]{ +// WorkflowName: "ProcessDataWorkflow", +// QueueName: "data-processing", +// WorkflowInput: "input data", +// WorkflowTimeout: 30 * time.Minute, +// }) +// if err != nil { +// log.Fatal(err) +// } +// +// // Check status +// status, err := handle.GetStatus() +// if err != nil { +// log.Printf("Failed to get status: %v", err) +// } +// +// // Wait for completion and get result +// result, err := handle.GetResult() // blocks until completion +// if err != nil { +// log.Printf("Workflow failed: %v", err) +// } else { +// log.Printf("Result: %d", result) +// } +// +// // Enqueue with deduplication and custom workflow ID +// handle, err := dbos.Enqueue[MyInputType, MyOutputType](ctx, dbos.GenericEnqueueOptions[MyInputType]{ +// WorkflowName: "MyWorkflow", +// QueueName: "my-queue", +// WorkflowID: "custom-workflow-id", +// DeduplicationID: "unique-operation-id", +// WorkflowInput: MyInputType{Field: "value"}, +// }) func Enqueue[P any, R any](ctx DBOSContext, params GenericEnqueueOptions[P]) (WorkflowHandle[R], error) { if ctx == nil { return nil, errors.New("ctx cannot be nil") @@ -1363,9 +1418,7 @@ func Enqueue[P any, R any](ctx DBOSContext, params GenericEnqueueOptions[P]) (Wo } // CancelWorkflow cancels a running or enqueued workflow by setting its status to CANCELLED. -// Once cancelled, the workflow will stop executing and cannot be resumed. -// If the workflow has already completed (SUCCESS or ERROR), this operation has no effect. -// The workflow's final status and any partial results remain accessible through its handle. +// Once cancelled, the workflow will stop executing. Currently executing steps will not be interrupted. // // Parameters: // - workflowID: The unique identifier of the workflow to cancel @@ -1384,7 +1437,7 @@ func (c *dbosContext) ResumeWorkflow(_ DBOSContext, workflowID string) (Workflow } // ResumeWorkflow resumes a cancelled workflow by setting its status back to ENQUEUED. -// The workflow will be picked up by the queue processor and execution will continue +// The workflow will be picked up by a DBOS queue processor and execution will continue // from where it left off. If the workflow is already completed, this is a no-op. // Returns a handle that can be used to wait for completion and retrieve results. // Returns an error if the workflow does not exist or if the cancellation operation fails. @@ -1419,7 +1472,6 @@ func ResumeWorkflow[R any](ctx DBOSContext, workflowID string) (WorkflowHandle[R } // ForkWorkflowInput holds configuration parameters for forking workflows. -// It replaces the functional options pattern to comply with CS-5 style guidelines. type ForkWorkflowInput struct { OriginalWorkflowID string // Required: The UUID of the original workflow to fork from ForkedWorkflowID string // Optional: Custom workflow ID for the forked workflow (auto-generated if empty) @@ -1460,7 +1512,7 @@ func (c *dbosContext) ForkWorkflow(_ DBOSContext, input ForkWorkflowInput) (Work // ForkWorkflow creates a new workflow instance by copying an existing workflow from a specific step. // The forked workflow will have a new UUID and will execute from the specified StartStep. -// If StartStep > 0, the forked workflow will have the operation outputs from steps 0 to StartStep-1 +// If StartStep > 0, the forked workflow will reuse the operation outputs from steps 0 to StartStep-1 // copied from the original workflow. // // Parameters: @@ -1511,7 +1563,7 @@ func ForkWorkflow[R any](ctx DBOSContext, input ForkWorkflowInput) (WorkflowHand } return &workflowPollingHandle[R]{ workflowID: handle.GetWorkflowID(), - dbosContext: handle.(*workflowPollingHandle[any]).dbosContext, + dbosContext: ctx, }, nil } @@ -1693,8 +1745,6 @@ func WithLoadOutput(loadOutput bool) ListWorkflowsOption { } // ListWorkflows retrieves a list of workflows based on the provided filters. -// This function provides a high-level interface to query workflows with various filtering options. -// It wraps the system database's listWorkflows functionality with type-safe functional options. // // The function supports filtering by workflow IDs, status, time ranges, names, application versions, // authenticated users, workflow ID prefixes, and more. It also supports pagination through From 6a77281aaed74ab0a64506d27359e973b60297b4 Mon Sep 17 00:00:00 2001 From: maxdml Date: Tue, 12 Aug 2025 17:05:27 -0700 Subject: [PATCH 14/20] nit --- dbos/client_test.go | 1 - 1 file changed, 1 deletion(-) diff --git a/dbos/client_test.go b/dbos/client_test.go index 35438e30..5cbad643 100644 --- a/dbos/client_test.go +++ b/dbos/client_test.go @@ -362,7 +362,6 @@ func TestCancelResume(t *testing.T) { // Try to resume a non-existent workflow _, err := ResumeWorkflow[int](clientCtx, nonExistentWorkflowID) - fmt.Println(err) if err == nil { t.Fatal("expected error when resuming non-existent workflow, but got none") } From c1ff3c2809fe939c2eaf2cee27ffc1d1180753c5 Mon Sep 17 00:00:00 2001 From: maxdml Date: Tue, 12 Aug 2025 17:19:35 -0700 Subject: [PATCH 15/20] fix --- dbos/dbos.go | 2 +- dbos/recovery.go | 1 + 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/dbos/dbos.go b/dbos/dbos.go index 3fa0647e..7e7dd9ad 100644 --- a/dbos/dbos.go +++ b/dbos/dbos.go @@ -54,7 +54,7 @@ func processConfig(inputConfig *Config) (*Config, error) { // Load defaults if dbosConfig.Logger == nil { - dbosConfig.Logger = slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{Level: slog.LevelDebug})) + dbosConfig.Logger = slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{Level: slog.LevelInfo})) } return dbosConfig, nil diff --git a/dbos/recovery.go b/dbos/recovery.go index f3db7c00..47c3f0f7 100644 --- a/dbos/recovery.go +++ b/dbos/recovery.go @@ -11,6 +11,7 @@ func recoverPendingWorkflows(ctx *dbosContext, executorIDs []string) ([]Workflow status: []WorkflowStatusType{WorkflowStatusPending}, executorIDs: executorIDs, applicationVersion: ctx.applicationVersion, + loadInput: true, }) if err != nil { return nil, err From 7928e122a483a0626d96ecf4ad93c4b63d2ff4e1 Mon Sep 17 00:00:00 2001 From: maxdml Date: Tue, 12 Aug 2025 21:51:01 -0700 Subject: [PATCH 16/20] fix tests --- dbos/serialization_test.go | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/dbos/serialization_test.go b/dbos/serialization_test.go index 4349a1ad..0b040965 100644 --- a/dbos/serialization_test.go +++ b/dbos/serialization_test.go @@ -103,9 +103,9 @@ func TestWorkflowEncoding(t *testing.T) { } // Test results from ListWorkflows - workflows, err := executor.(*dbosContext).systemDB.listWorkflows(context.Background(), listWorkflowsDBInput{ - workflowIDs: []string{directHandle.GetWorkflowID()}, - }) + workflows, err := executor.ListWorkflows(WithWorkflowIDs( + []string{directHandle.GetWorkflowID()}, + )) if err != nil { t.Fatalf("failed to list workflows: %v", err) } @@ -220,9 +220,9 @@ func TestWorkflowEncoding(t *testing.T) { } // Test results from ListWorkflows - workflows, err := executor.(*dbosContext).systemDB.listWorkflows(context.Background(), listWorkflowsDBInput{ - workflowIDs: []string{directHandle.GetWorkflowID()}, - }) + workflows, err := executor.ListWorkflows(WithWorkflowIDs( + []string{directHandle.GetWorkflowID()}, + )) if err != nil { t.Fatalf("failed to list workflows: %v", err) } From ca7494d1b729725755ea9d19dfad62713a146f82 Mon Sep 17 00:00:00 2001 From: maxdml Date: Wed, 13 Aug 2025 09:58:24 -0700 Subject: [PATCH 17/20] recompute workflow deadline upon resumption --- dbos/client_test.go | 110 ++++++++++++++++++++++++++++++++++++++++ dbos/system_database.go | 16 ++++-- 2 files changed, 122 insertions(+), 4 deletions(-) diff --git a/dbos/client_test.go b/dbos/client_test.go index 5cbad643..85e5523d 100644 --- a/dbos/client_test.go +++ b/dbos/client_test.go @@ -219,6 +219,20 @@ func TestCancelResume(t *testing.T) { } RegisterWorkflow(serverCtx, cancelResumeWorkflow, WithWorkflowName("CancelResumeWorkflow")) + // Timeout blocking workflow that spins until context is done + timeoutBlockingWorkflow := func(ctx DBOSContext, _ string) (string, error) { + for { + select { + case <-ctx.Done(): + return "cancelled", ctx.Err() + default: + // Small sleep to avoid tight loop + time.Sleep(10 * time.Millisecond) + } + } + } + RegisterWorkflow(serverCtx, timeoutBlockingWorkflow, WithWorkflowName("TimeoutBlockingWorkflow")) + // Launch the server context to start processing tasks err := serverCtx.Launch() if err != nil { @@ -333,6 +347,102 @@ func TestCancelResume(t *testing.T) { } }) + t.Run("CancelAndResumeTimeout", func(t *testing.T) { + workflowID := "test-cancel-resume-timeout-workflow" + workflowTimeout := 2 * time.Second + + // Start the workflow with a 2-second timeout + handle, err := Enqueue[string, string](clientCtx, GenericEnqueueOptions[string]{ + WorkflowName: "TimeoutBlockingWorkflow", + QueueName: queue.Name, + WorkflowID: workflowID, + WorkflowInput: "timeout-test", + WorkflowTimeout: workflowTimeout, + ApplicationVersion: serverCtx.GetApplicationVersion(), + }) + if err != nil { + t.Fatalf("failed to enqueue timeout blocking workflow: %v", err) + } + + // Wait 500ms (well before the timeout expires) + time.Sleep(500 * time.Millisecond) + + // Cancel the workflow before timeout expires + err = clientCtx.CancelWorkflow(workflowID) + if err != nil { + t.Fatalf("failed to cancel workflow: %v", err) + } + + // Verify workflow is cancelled + cancelStatus, err := handle.GetStatus() + if err != nil { + t.Fatalf("failed to get workflow status after cancel: %v", err) + } + + if cancelStatus.Status != WorkflowStatusCancelled { + t.Fatalf("expected workflow status to be CANCELLED, got %v", cancelStatus.Status) + } + + // Record the original deadline before resume + originalDeadline := cancelStatus.Deadline + + // Resume the workflow + resumeStart := time.Now() + resumeHandle, err := ResumeWorkflow[string](clientCtx, workflowID) + if err != nil { + t.Fatalf("failed to resume workflow: %v", err) + } + + // Get status after resume to check the deadline + resumeStatus, err := resumeHandle.GetStatus() + if err != nil { + t.Fatalf("failed to get workflow status after resume: %v", err) + } + + // Verify the deadline was reset (should be different from original) + if resumeStatus.Deadline.Equal(originalDeadline) { + t.Fatalf("expected deadline to be reset after resume, but it remained the same: %v", originalDeadline) + } + + // The new deadline should be after resumeStart + workflowTimeout + expectedDeadline := resumeStart.Add(workflowTimeout) + if resumeStatus.Deadline.Before(expectedDeadline) { + t.Fatalf("deadline %v is too early (expected around %v)", resumeStatus.Deadline, expectedDeadline) + } + + // Wait for the workflow to complete + _, err = resumeHandle.GetResult() + if err == nil { + t.Fatal("expected timeout error, but got none") + } + + dbosErr, ok := err.(*DBOSError) + if !ok { + t.Fatalf("expected error to be of type *DBOSError, got %T", err) + } + + if dbosErr.Code != AwaitedWorkflowCancelled { + t.Fatalf("expected error code to be AwaitedWorkflowCancelled (8), got %v", dbosErr.Code) + } + + if !strings.Contains(dbosErr.Error(), "test-cancel-resume-timeout-workflow was cancelled") { + t.Fatalf("expected error message to contain 'test-cancel-resume-timeout-workflow was cancelled', got: %v", dbosErr.Error()) + } + + finalStatus, err := resumeHandle.GetStatus() + if err != nil { + t.Fatalf("failed to get final workflow status: %v", err) + } + + if finalStatus.Status != WorkflowStatusCancelled { + t.Fatalf("expected final workflow status to be CANCELLED, got %v", finalStatus.Status) + } + + if !queueEntriesAreCleanedUp(serverCtx) { + t.Fatal("expected queue entries to be cleaned up after cancel/resume timeout test") + } + }) + t.Run("CancelNonExistentWorkflow", func(t *testing.T) { nonExistentWorkflowID := "non-existent-workflow-id" diff --git a/dbos/system_database.go b/dbos/system_database.go index 2b9ea73e..5bd1905d 100644 --- a/dbos/system_database.go +++ b/dbos/system_database.go @@ -762,17 +762,25 @@ func (s *sysDB) resumeWorkflow(ctx context.Context, workflowID string) error { return nil // Workflow is complete, do nothing } - // Set the workflow's status to ENQUEUED and clear its recovery attempts and deadline + // If the original workflow has a timeout, let's recompute a deadline + var deadline *int64 = nil + if wf.Timeout > 0 { + deadlineMs := time.Now().Add(wf.Timeout).UnixMilli() + deadline = &deadlineMs + } + + // Set the workflow's status to ENQUEUED and clear its recovery attempts, set new deadline updateStatusQuery := `UPDATE dbos.workflow_status SET status = $1, queue_name = $2, recovery_attempts = $3, - workflow_deadline_epoch_ms = NULL, deduplication_id = NULL, - started_at_epoch_ms = NULL, updated_at = $4 - WHERE workflow_uuid = $5` + workflow_deadline_epoch_ms = $4, deduplication_id = NULL, + started_at_epoch_ms = NULL, updated_at = $5 + WHERE workflow_uuid = $6` _, err = tx.Exec(ctx, updateStatusQuery, WorkflowStatusEnqueued, _DBOS_INTERNAL_QUEUE_NAME, 0, + deadline, time.Now().UnixMilli(), workflowID) if err != nil { From 45e468d6691a01e5c5bde8fa6cc7efe45c529790 Mon Sep 17 00:00:00 2001 From: maxdml Date: Wed, 13 Aug 2025 10:01:20 -0700 Subject: [PATCH 18/20] RETRIES_EXCEEDED -> MAX_RECOVERY_ATTEMPTS_EXCEEDED --- dbos/workflow.go | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/dbos/workflow.go b/dbos/workflow.go index 2f4743b1..c75f4d08 100644 --- a/dbos/workflow.go +++ b/dbos/workflow.go @@ -23,12 +23,12 @@ import ( type WorkflowStatusType string const ( - WorkflowStatusPending WorkflowStatusType = "PENDING" // Workflow is running or ready to run - WorkflowStatusEnqueued WorkflowStatusType = "ENQUEUED" // Workflow is queued and waiting for execution - WorkflowStatusSuccess WorkflowStatusType = "SUCCESS" // Workflow completed successfully - WorkflowStatusError WorkflowStatusType = "ERROR" // Workflow completed with an error - WorkflowStatusCancelled WorkflowStatusType = "CANCELLED" // Workflow was cancelled (manually or due to timeout) - WorkflowStatusRetriesExceeded WorkflowStatusType = "RETRIES_EXCEEDED" // Workflow exceeded maximum retry attempts + WorkflowStatusPending WorkflowStatusType = "PENDING" // Workflow is running or ready to run + WorkflowStatusEnqueued WorkflowStatusType = "ENQUEUED" // Workflow is queued and waiting for execution + WorkflowStatusSuccess WorkflowStatusType = "SUCCESS" // Workflow completed successfully + WorkflowStatusError WorkflowStatusType = "ERROR" // Workflow completed with an error + WorkflowStatusCancelled WorkflowStatusType = "CANCELLED" // Workflow was cancelled (manually or due to timeout) + WorkflowStatusRetriesExceeded WorkflowStatusType = "MAX_RECOVERY_ATTEMPTS_EXCEEDED" // Workflow exceeded maximum retry attempts ) // WorkflowStatus contains comprehensive information about a workflow's current state and execution history. From 386849f437475598031ee77cbf2777a1640484ed Mon Sep 17 00:00:00 2001 From: maxdml Date: Wed, 13 Aug 2025 10:19:09 -0700 Subject: [PATCH 19/20] 1:1 mapping from DBOSContext interface and package level methods --- dbos/client_test.go | 30 +++++----- dbos/dbos.go | 1 + dbos/serialization_test.go | 4 +- dbos/workflow.go | 118 +++++++++++++++++++++++++++++++++++++ dbos/workflows_test.go | 12 ++-- 5 files changed, 142 insertions(+), 23 deletions(-) diff --git a/dbos/client_test.go b/dbos/client_test.go index 85e5523d..387736b7 100644 --- a/dbos/client_test.go +++ b/dbos/client_test.go @@ -269,7 +269,7 @@ func TestCancelResume(t *testing.T) { } // Cancel the workflow - err = clientCtx.CancelWorkflow(workflowID) + err = CancelWorkflow(clientCtx, workflowID) if err != nil { t.Fatalf("failed to cancel workflow: %v", err) } @@ -368,7 +368,7 @@ func TestCancelResume(t *testing.T) { time.Sleep(500 * time.Millisecond) // Cancel the workflow before timeout expires - err = clientCtx.CancelWorkflow(workflowID) + err = CancelWorkflow(clientCtx, workflowID) if err != nil { t.Fatalf("failed to cancel workflow: %v", err) } @@ -447,7 +447,7 @@ func TestCancelResume(t *testing.T) { nonExistentWorkflowID := "non-existent-workflow-id" // Try to cancel a non-existent workflow - err := clientCtx.CancelWorkflow(nonExistentWorkflowID) + err := CancelWorkflow(clientCtx, nonExistentWorkflowID) if err == nil { t.Fatal("expected error when canceling non-existent workflow, but got none") } @@ -848,7 +848,7 @@ func TestListWorkflows(t *testing.T) { } // Test 1: List all workflows (no filters) - allWorkflows, err := clientCtx.ListWorkflows() + allWorkflows, err := ListWorkflows(clientCtx) if err != nil { t.Fatalf("failed to list all workflows: %v", err) } @@ -858,7 +858,7 @@ func TestListWorkflows(t *testing.T) { // Test 2: Filter by workflow IDs expectedIDs := workflowIDs[:3] - specificWorkflows, err := clientCtx.ListWorkflows(WithWorkflowIDs(expectedIDs)) + specificWorkflows, err := ListWorkflows(clientCtx, WithWorkflowIDs(expectedIDs)) if err != nil { t.Fatalf("failed to list workflows by IDs: %v", err) } @@ -877,7 +877,7 @@ func TestListWorkflows(t *testing.T) { } // Test 3: Filter by workflow ID prefix - batchWorkflows, err := clientCtx.ListWorkflows(WithWorkflowIDPrefix("test-batch-")) + batchWorkflows, err := ListWorkflows(clientCtx, WithWorkflowIDPrefix("test-batch-")) if err != nil { t.Fatalf("failed to list workflows by prefix: %v", err) } @@ -892,7 +892,7 @@ func TestListWorkflows(t *testing.T) { } // Test 4: Filter by status - SUCCESS - successWorkflows, err := clientCtx.ListWorkflows( + successWorkflows, err := ListWorkflows(clientCtx, WithWorkflowIDPrefix("test-"), // Only our test workflows WithStatus([]WorkflowStatusType{WorkflowStatusSuccess})) if err != nil { @@ -909,7 +909,7 @@ func TestListWorkflows(t *testing.T) { } // Test 5: Filter by status - ERROR - errorWorkflows, err := clientCtx.ListWorkflows( + errorWorkflows, err := ListWorkflows(clientCtx, WithWorkflowIDPrefix("test-"), WithStatus([]WorkflowStatusType{WorkflowStatusError})) if err != nil { @@ -927,7 +927,7 @@ func TestListWorkflows(t *testing.T) { // Test 6: Filter by time range - first 5 workflows (start to start+500ms) firstHalfTime := testStartTime.Add(500 * time.Millisecond) - firstHalfWorkflows, err := clientCtx.ListWorkflows( + firstHalfWorkflows, err := ListWorkflows(clientCtx, WithWorkflowIDPrefix("test-"), WithEndTime(firstHalfTime)) if err != nil { @@ -938,7 +938,7 @@ func TestListWorkflows(t *testing.T) { } // Test 6b: Filter by time range - last 5 workflows (start+500ms to end) - secondHalfWorkflows, err := clientCtx.ListWorkflows( + secondHalfWorkflows, err := ListWorkflows(clientCtx, WithWorkflowIDPrefix("test-"), WithStartTime(firstHalfTime)) if err != nil { @@ -949,7 +949,7 @@ func TestListWorkflows(t *testing.T) { } // Test 7: Test sorting order (ascending - default) - ascWorkflows, err := clientCtx.ListWorkflows( + ascWorkflows, err := ListWorkflows(clientCtx, WithWorkflowIDPrefix("test-"), WithSortDesc(false)) if err != nil { @@ -957,7 +957,7 @@ func TestListWorkflows(t *testing.T) { } // Test 8: Test sorting order (descending) - descWorkflows, err := clientCtx.ListWorkflows( + descWorkflows, err := ListWorkflows(clientCtx, WithWorkflowIDPrefix("test-"), WithSortDesc(true)) if err != nil { @@ -991,7 +991,7 @@ func TestListWorkflows(t *testing.T) { } // Test 9: Test limit and offset - limitedWorkflows, err := clientCtx.ListWorkflows( + limitedWorkflows, err := ListWorkflows(clientCtx, WithWorkflowIDPrefix("test-"), WithLimit(5)) if err != nil { @@ -1008,7 +1008,7 @@ func TestListWorkflows(t *testing.T) { } } - offsetWorkflows, err := clientCtx.ListWorkflows( + offsetWorkflows, err := ListWorkflows(clientCtx, WithWorkflowIDPrefix("test-"), WithOffset(5), WithLimit(3)) @@ -1027,7 +1027,7 @@ func TestListWorkflows(t *testing.T) { } // Test 10: Test input/output loading - noDataWorkflows, err := clientCtx.ListWorkflows( + noDataWorkflows, err := ListWorkflows(clientCtx, WithWorkflowIDs(workflowIDs[:2]), WithLoadInput(false), WithLoadOutput(false)) diff --git a/dbos/dbos.go b/dbos/dbos.go index 7e7dd9ad..229480ed 100644 --- a/dbos/dbos.go +++ b/dbos/dbos.go @@ -82,6 +82,7 @@ type DBOSContext interface { GetEvent(_ DBOSContext, input WorkflowGetEventInput) (any, error) // Get a key-value event from a target workflow Sleep(duration time.Duration) (time.Duration, error) // Durable sleep that survives workflow recovery GetWorkflowID() (string, error) // Get the current workflow ID (only available within workflows) + GetStepID() (int, error) // Get the current step ID (only available within workflows) // Workflow management RetrieveWorkflow(_ DBOSContext, workflowID string) (WorkflowHandle[any], error) // Get a handle to an existing workflow diff --git a/dbos/serialization_test.go b/dbos/serialization_test.go index 0b040965..e97af44d 100644 --- a/dbos/serialization_test.go +++ b/dbos/serialization_test.go @@ -103,7 +103,7 @@ func TestWorkflowEncoding(t *testing.T) { } // Test results from ListWorkflows - workflows, err := executor.ListWorkflows(WithWorkflowIDs( + workflows, err := ListWorkflows(executor, WithWorkflowIDs( []string{directHandle.GetWorkflowID()}, )) if err != nil { @@ -220,7 +220,7 @@ func TestWorkflowEncoding(t *testing.T) { } // Test results from ListWorkflows - workflows, err := executor.ListWorkflows(WithWorkflowIDs( + workflows, err := ListWorkflows(executor, WithWorkflowIDs( []string{directHandle.GetWorkflowID()}, )) if err != nil { diff --git a/dbos/workflow.go b/dbos/workflow.go index c75f4d08..83840fad 100644 --- a/dbos/workflow.go +++ b/dbos/workflow.go @@ -1186,6 +1186,24 @@ func (c *dbosContext) Sleep(duration time.Duration) (time.Duration, error) { return c.systemDB.sleep(c, duration) } +// Sleep pauses workflow execution for the specified duration. +// This is a durable sleep - if the workflow is recovered during the sleep period, +// it will continue sleeping for the remaining time. +// Returns the actual duration slept. +// +// Example: +// +// actualDuration, err := dbos.Sleep(ctx, 5*time.Second) +// if err != nil { +// return err +// } +func Sleep(ctx DBOSContext, duration time.Duration) (time.Duration, error) { + if ctx == nil { + return 0, errors.New("ctx cannot be nil") + } + return ctx.Sleep(duration) +} + /***********************************/ /******* WORKFLOW MANAGEMENT *******/ /***********************************/ @@ -1208,6 +1226,42 @@ func (c *dbosContext) GetStepID() (int, error) { return wfState.stepID, nil } +// GetWorkflowID retrieves the workflow ID from the context if called within a DBOS workflow. +// Returns an error if not called from within a workflow context. +// +// Example: +// +// workflowID, err := dbos.GetWorkflowID(ctx) +// if err != nil { +// log.Printf("Not in a workflow context: %v", err) +// } else { +// log.Printf("Current workflow ID: %s", workflowID) +// } +func GetWorkflowID(ctx DBOSContext) (string, error) { + if ctx == nil { + return "", errors.New("ctx cannot be nil") + } + return ctx.GetWorkflowID() +} + +// GetStepID retrieves the current step ID from the context if called within a DBOS workflow. +// Returns -1 and an error if not called from within a workflow context. +// +// Example: +// +// stepID, err := dbos.GetStepID(ctx) +// if err != nil { +// log.Printf("Not in a workflow context: %v", err) +// } else { +// log.Printf("Current step ID: %d", stepID) +// } +func GetStepID(ctx DBOSContext) (int, error) { + if ctx == nil { + return -1, errors.New("ctx cannot be nil") + } + return ctx.GetStepID() +} + func (c *dbosContext) RetrieveWorkflow(_ DBOSContext, workflowID string) (WorkflowHandle[any], error) { workflowStatus, err := c.systemDB.listWorkflows(c, listWorkflowsDBInput{ workflowIDs: []string{workflowID}, @@ -1428,6 +1482,28 @@ func (c *dbosContext) CancelWorkflow(workflowID string) error { return c.systemDB.cancelWorkflow(c, workflowID) } +// CancelWorkflow cancels a running or enqueued workflow by setting its status to CANCELLED. +// Once cancelled, the workflow will stop executing. Currently executing steps will not be interrupted. +// +// Parameters: +// - ctx: DBOS context for the operation +// - workflowID: The unique identifier of the workflow to cancel +// +// Returns an error if the workflow does not exist or if the cancellation operation fails. +// +// Example: +// +// err := dbos.CancelWorkflow(ctx, "workflow-to-cancel") +// if err != nil { +// log.Printf("Failed to cancel workflow: %v", err) +// } +func CancelWorkflow(ctx DBOSContext, workflowID string) error { + if ctx == nil { + return errors.New("ctx cannot be nil") + } + return ctx.CancelWorkflow(workflowID) +} + func (c *dbosContext) ResumeWorkflow(_ DBOSContext, workflowID string) (WorkflowHandle[any], error) { err := c.systemDB.resumeWorkflow(c, workflowID) if err != nil { @@ -1826,3 +1902,45 @@ func (c *dbosContext) ListWorkflows(opts ...ListWorkflowsOption) ([]WorkflowStat return workflows, nil } + +// ListWorkflows retrieves a list of workflows based on the provided filters. +// +// The function supports filtering by workflow IDs, status, time ranges, names, application versions, +// authenticated users, workflow ID prefixes, and more. It also supports pagination through +// limit/offset parameters and sorting control (ascending by default, or descending with WithSortDesc). +// +// By default, both input and output data are loaded for each workflow. This can be controlled +// using WithLoadInput(false) and WithLoadOutput(false) options for better performance when +// the data is not needed. +// +// Parameters: +// - ctx: DBOS context for the operation +// - opts: Functional options to configure the query filters and parameters +// +// Returns a slice of WorkflowStatus structs containing the workflow information. +// +// Example usage: +// +// // List all successful workflows from the last 24 hours +// workflows, err := dbos.ListWorkflows(ctx, +// dbos.WithStatus([]dbos.WorkflowStatusType{dbos.WorkflowStatusSuccess}), +// dbos.WithStartTime(time.Now().Add(-24*time.Hour)), +// dbos.WithLimit(100)) +// if err != nil { +// log.Fatal(err) +// } +// +// // List workflows by specific IDs without loading input/output data +// workflows, err := dbos.ListWorkflows(ctx, +// dbos.WithWorkflowIDs([]string{"workflow1", "workflow2"}), +// dbos.WithLoadInput(false), +// dbos.WithLoadOutput(false)) +// if err != nil { +// log.Fatal(err) +// } +func ListWorkflows(ctx DBOSContext, opts ...ListWorkflowsOption) ([]WorkflowStatus, error) { + if ctx == nil { + return nil, errors.New("ctx cannot be nil") + } + return ctx.ListWorkflows(opts...) +} diff --git a/dbos/workflows_test.go b/dbos/workflows_test.go index 153b0fcf..cd87a58e 100644 --- a/dbos/workflows_test.go +++ b/dbos/workflows_test.go @@ -541,7 +541,7 @@ func TestChildWorkflow(t *testing.T) { // Create child workflows with executor childWf := func(dbosCtx DBOSContext, input Inheritance) (string, error) { - workflowID, err := dbosCtx.GetWorkflowID() + workflowID, err := GetWorkflowID(dbosCtx) if err != nil { return "", fmt.Errorf("failed to get workflow ID: %w", err) } @@ -557,7 +557,7 @@ func TestChildWorkflow(t *testing.T) { RegisterWorkflow(dbosCtx, childWf) parentWf := func(ctx DBOSContext, input Inheritance) (string, error) { - workflowID, err := ctx.GetWorkflowID() + workflowID, err := GetWorkflowID(ctx) if err != nil { return "", fmt.Errorf("failed to get workflow ID: %w", err) } @@ -624,7 +624,7 @@ func TestChildWorkflow(t *testing.T) { RegisterWorkflow(dbosCtx, parentWf) grandParentWf := func(ctx DBOSContext, r int) (string, error) { - workflowID, err := ctx.GetWorkflowID() + workflowID, err := GetWorkflowID(ctx) if err != nil { return "", fmt.Errorf("failed to get workflow ID: %w", err) } @@ -1064,7 +1064,7 @@ var ( func deadLetterQueueWorkflow(ctx DBOSContext, input string) (int, error) { recoveryCount++ - wfid, err := ctx.GetWorkflowID() + wfid, err := GetWorkflowID(ctx) if err != nil { return 0, fmt.Errorf("failed to get workflow ID: %v", err) } @@ -2528,7 +2528,7 @@ var ( ) func sleepRecoveryWorkflow(dbosCtx DBOSContext, duration time.Duration) (time.Duration, error) { - result, err := dbosCtx.Sleep(duration) + result, err := Sleep(dbosCtx, duration) if err != nil { return 0, err } @@ -2603,7 +2603,7 @@ func TestSleep(t *testing.T) { t.Run("SleepCannotBeCalledOutsideWorkflow", func(t *testing.T) { // Attempt to call Sleep outside of a workflow context - _, err := dbosCtx.Sleep(1 * time.Second) + _, err := Sleep(dbosCtx, 1*time.Second) if err == nil { t.Fatal("expected error when calling Sleep outside of workflow context, but got none") } From 24298b0a928dc7df9ceaab3a259a721d708fd664 Mon Sep 17 00:00:00 2001 From: maxdml Date: Wed, 13 Aug 2025 10:20:45 -0700 Subject: [PATCH 20/20] allow forking at a step larger than the workflow max number of steps. This will fallback to the latest step --- dbos/client_test.go | 35 ----------------------------------- dbos/system_database.go | 11 ----------- 2 files changed, 46 deletions(-) diff --git a/dbos/client_test.go b/dbos/client_test.go index 387736b7..6741492a 100644 --- a/dbos/client_test.go +++ b/dbos/client_test.go @@ -711,41 +711,6 @@ func TestForkWorkflow(t *testing.T) { } }) - t.Run("ForkWithInvalidStep", func(t *testing.T) { - originalWorkflowID := "original-workflow-invalid-step" - - // Create an original workflow first - handle, err := Enqueue[string, string](clientCtx, GenericEnqueueOptions[string]{ - WorkflowName: "ParentWorkflow", - QueueName: queue.Name, - WorkflowID: originalWorkflowID, - WorkflowInput: "test", - ApplicationVersion: serverCtx.GetApplicationVersion(), - }) - if err != nil { - t.Fatalf("failed to enqueue original workflow: %v", err) - } - - // Wait for completion - _, err = handle.GetResult() - if err != nil { - t.Fatalf("failed to get result from original workflow: %v", err) - } - - // Try to fork at step 999 (beyond workflow's actual steps) - _, err = clientCtx.ForkWorkflow(clientCtx, ForkWorkflowInput{ - OriginalWorkflowID: originalWorkflowID, - StartStep: 999, - }) - if err == nil { - t.Fatal("expected error when forking at step 999, but got none") - } - // Verify the error message - if !strings.Contains(err.Error(), "exceeds workflow's maximum step") { - t.Fatalf("expected error message to contain 'exceeds workflow's maximum step', got: %v", err) - } - }) - // Verify all queue entries are cleaned up if !queueEntriesAreCleanedUp(serverCtx) { t.Fatal("expected queue entries to be cleaned up after fork workflow tests") diff --git a/dbos/system_database.go b/dbos/system_database.go index 5bd1905d..8de034ab 100644 --- a/dbos/system_database.go +++ b/dbos/system_database.go @@ -829,17 +829,6 @@ func (s *sysDB) forkWorkflow(ctx context.Context, input forkWorkflowDBInput) err originalWorkflow := wfs[0] - // Validate that startStep doesn't exceed the workflow's actual steps - maxStepQuery := `SELECT COALESCE(MAX(function_id), 0) FROM dbos.operation_outputs WHERE workflow_uuid = $1` - var maxStepID int - err = tx.QueryRow(ctx, maxStepQuery, input.originalWorkflowID).Scan(&maxStepID) - if err != nil { - return fmt.Errorf("failed to query max step ID: %w", err) - } - if input.startStep > maxStepID && maxStepID > 0 { - return fmt.Errorf("startStep %d exceeds workflow's maximum step %d", input.startStep, maxStepID) - } - // Determine the application version to use appVersion := originalWorkflow.ApplicationVersion if input.applicationVersion != "" {