From 168f46986db3826229d16d714585921b04785f04 Mon Sep 17 00:00:00 2001 From: maxdml Date: Tue, 19 Aug 2025 14:53:07 -0700 Subject: [PATCH 01/13] queueOptions public --- dbos/admin_server_test.go | 2 +- dbos/queue.go | 42 +++++++++++++++++++-------------------- dbos/system_database.go | 2 +- 3 files changed, 23 insertions(+), 23 deletions(-) diff --git a/dbos/admin_server_test.go b/dbos/admin_server_test.go index 9ba3c24c..daa150c2 100644 --- a/dbos/admin_server_test.go +++ b/dbos/admin_server_test.go @@ -125,7 +125,7 @@ func TestAdminServer(t *testing.T) { endpoint: fmt.Sprintf("http://localhost:3001/%s", strings.TrimPrefix(_WORKFLOW_QUEUES_METADATA_PATTERN, "GET /")), expectedStatus: http.StatusOK, validateResp: func(t *testing.T, resp *http.Response) { - var queueMetadata []WorkflowQueue + var queueMetadata []QueueOptions err := json.NewDecoder(resp.Body).Decode(&queueMetadata) require.NoError(t, err, "Failed to decode response as QueueMetadata array") assert.NotNil(t, queueMetadata, "Expected non-nil queue metadata array") diff --git a/dbos/queue.go b/dbos/queue.go index 709dd503..eed24e11 100644 --- a/dbos/queue.go +++ b/dbos/queue.go @@ -25,9 +25,9 @@ type RateLimiter struct { Period float64 // Time period in seconds for the rate limit } -// WorkflowQueue defines a named queue for workflow execution. +// QueueOptions defines a named queue for workflow execution. // Queues provide controlled workflow execution with concurrency limits, priority scheduling, and rate limiting. -type WorkflowQueue struct { +type QueueOptions struct { Name string `json:"name"` // Unique queue name WorkerConcurrency *int `json:"workerConcurrency,omitempty"` // Max concurrent workflows per executor GlobalConcurrency *int `json:"concurrency,omitempty"` // Max concurrent workflows across all executors @@ -36,45 +36,45 @@ type WorkflowQueue struct { MaxTasksPerIteration int `json:"maxTasksPerIteration"` // Max workflows to dequeue per iteration } -// queueOption is a functional option for configuring a workflow queue -type queueOption func(*WorkflowQueue) +// QueueOption is a functional option for configuring a workflow queue +type QueueOption func(*QueueOptions) // WithWorkerConcurrency limits the number of workflows this executor can run concurrently from the queue. // This provides per-executor concurrency control. -func WithWorkerConcurrency(concurrency int) queueOption { - return func(q *WorkflowQueue) { +func WithWorkerConcurrency(concurrency int) QueueOption { + return func(q *QueueOptions) { q.WorkerConcurrency = &concurrency } } // WithGlobalConcurrency limits the total number of workflows that can run concurrently from the queue // across all executors. This provides global concurrency control. -func WithGlobalConcurrency(concurrency int) queueOption { - return func(q *WorkflowQueue) { +func WithGlobalConcurrency(concurrency int) QueueOption { + return func(q *QueueOptions) { q.GlobalConcurrency = &concurrency } } // WithPriorityEnabled enables priority-based scheduling for the queue. // When enabled, workflows with lower priority numbers are executed first. -func WithPriorityEnabled(enabled bool) queueOption { - return func(q *WorkflowQueue) { +func WithPriorityEnabled(enabled bool) QueueOption { + return func(q *QueueOptions) { q.PriorityEnabled = enabled } } // WithRateLimiter configures rate limiting for the queue to prevent overwhelming external services. // The rate limiter enforces a maximum number of workflow starts within a time period. -func WithRateLimiter(limiter *RateLimiter) queueOption { - return func(q *WorkflowQueue) { +func WithRateLimiter(limiter *RateLimiter) QueueOption { + return func(q *QueueOptions) { q.RateLimit = limiter } } // WithMaxTasksPerIteration sets the maximum number of workflows to dequeue in a single iteration. // This controls batch sizes for queue processing. -func WithMaxTasksPerIteration(maxTasks int) queueOption { - return func(q *WorkflowQueue) { +func WithMaxTasksPerIteration(maxTasks int) QueueOption { + return func(q *QueueOptions) { q.MaxTasksPerIteration = maxTasks } } @@ -96,10 +96,10 @@ func WithMaxTasksPerIteration(maxTasks int) queueOption { // // // Enqueue workflows to this queue: // handle, err := dbos.RunAsWorkflow(ctx, SendEmailWorkflow, emailData, dbos.WithQueue("email-queue")) -func NewWorkflowQueue(dbosCtx DBOSContext, name string, options ...queueOption) WorkflowQueue { +func NewWorkflowQueue(dbosCtx DBOSContext, name string, options ...QueueOption) QueueOptions { ctx, ok := dbosCtx.(*dbosContext) if !ok { - return WorkflowQueue{} // Do nothing if the concrete type is not dbosContext + return QueueOptions{} // Do nothing if the concrete type is not dbosContext } if ctx.launched.Load() { panic("Cannot register workflow queue after DBOS has launched") @@ -111,7 +111,7 @@ func NewWorkflowQueue(dbosCtx DBOSContext, name string, options ...queueOption) } // Create queue with default settings - q := WorkflowQueue{ + q := QueueOptions{ Name: name, WorkerConcurrency: nil, GlobalConcurrency: nil, @@ -142,7 +142,7 @@ type queueRunner struct { jitterMax float64 // Queue registry - workflowQueueRegistry map[string]WorkflowQueue + workflowQueueRegistry map[string]QueueOptions // Channel to signal completion back to the DBOS context completionChan chan struct{} @@ -157,13 +157,13 @@ func newQueueRunner() *queueRunner { scalebackFactor: 0.9, jitterMin: 0.95, jitterMax: 1.05, - workflowQueueRegistry: make(map[string]WorkflowQueue), + workflowQueueRegistry: make(map[string]QueueOptions), completionChan: make(chan struct{}), } } -func (qr *queueRunner) listQueues() []WorkflowQueue { - queues := make([]WorkflowQueue, 0, len(qr.workflowQueueRegistry)) +func (qr *queueRunner) listQueues() []QueueOptions { + queues := make([]QueueOptions, 0, len(qr.workflowQueueRegistry)) for _, queue := range qr.workflowQueueRegistry { queues = append(queues, queue) } diff --git a/dbos/system_database.go b/dbos/system_database.go index 4de79579..330bb57c 100644 --- a/dbos/system_database.go +++ b/dbos/system_database.go @@ -1862,7 +1862,7 @@ type dequeuedWorkflow struct { } type dequeueWorkflowsInput struct { - queue WorkflowQueue + queue QueueOptions executorID string applicationVersion string } From 503d46cb724e37a3fbf6e08dc40905157a53d322 Mon Sep 17 00:00:00 2001 From: maxdml Date: Tue, 19 Aug 2025 14:53:53 -0700 Subject: [PATCH 02/13] add a test for steps cancellation status after a wf timeout --- dbos/utils_test.go | 19 ++++++++ dbos/workflows_test.go | 98 +++++++++++++++++++++++++++++++++++------- 2 files changed, 102 insertions(+), 15 deletions(-) diff --git a/dbos/utils_test.go b/dbos/utils_test.go index 9a8bb20c..fc82954c 100644 --- a/dbos/utils_test.go +++ b/dbos/utils_test.go @@ -168,3 +168,22 @@ func queueEntriesAreCleanedUp(ctx DBOSContext) bool { return success } + +func checkWfStatus(ctx DBOSContext, expectedStatus WorkflowStatusType) (bool, error) { + wfid, err := GetWorkflowID(ctx) + if err != nil { + return false, err + } + me, err := RetrieveWorkflow[string](ctx, wfid) + if err != nil { + return false, err + } + meStatus, err := me.GetStatus() + if err != nil { + return false, err + } + if meStatus.Status == expectedStatus { + return true, nil + } + return false, nil +} diff --git a/dbos/workflows_test.go b/dbos/workflows_test.go index 12fb8288..e205ebc1 100644 --- a/dbos/workflows_test.go +++ b/dbos/workflows_test.go @@ -1308,15 +1308,15 @@ func sendWorkflow(ctx DBOSContext, input sendWorkflowInput) (string, error) { } func receiveWorkflow(ctx DBOSContext, topic string) (string, error) { - msg1, err := Recv[string](ctx, topic, 10 * time.Second) + msg1, err := Recv[string](ctx, topic, 10*time.Second) if err != nil { return "", err } - msg2, err := Recv[string](ctx, topic, 10 * time.Second) + msg2, err := Recv[string](ctx, topic, 10*time.Second) if err != nil { return "", err } - msg3, err := Recv[string](ctx, topic, 10 * time.Second) + msg3, err := Recv[string](ctx, topic, 10*time.Second) if err != nil { return "", err } @@ -1335,7 +1335,7 @@ func receiveWorkflowCoordinated(ctx DBOSContext, input struct { concurrentRecvStartEvent.Wait() // Do a single Recv call with timeout - msg, err := Recv[string](ctx, input.Topic, 3 * time.Second) + msg, err := Recv[string](ctx, input.Topic, 3*time.Second) if err != nil { return "", err } @@ -1349,7 +1349,7 @@ func sendStructWorkflow(ctx DBOSContext, input sendWorkflowInput) (string, error } func receiveStructWorkflow(ctx DBOSContext, topic string) (sendRecvType, error) { - return Recv[sendRecvType](ctx, topic, 3 * time.Second) + return Recv[sendRecvType](ctx, topic, 3*time.Second) } func sendIdempotencyWorkflow(ctx DBOSContext, input sendWorkflowInput) (string, error) { @@ -1362,7 +1362,7 @@ func sendIdempotencyWorkflow(ctx DBOSContext, input sendWorkflowInput) (string, } func receiveIdempotencyWorkflow(ctx DBOSContext, topic string) (string, error) { - msg, err := Recv[string](ctx, topic, 3 * time.Second) + msg, err := Recv[string](ctx, topic, 3*time.Second) if err != nil { // Unlock the test in this case receiveIdempotencyStartEvent.Set() @@ -1511,7 +1511,7 @@ func TestSendRecv(t *testing.T) { t.Run("RecvMustRunInsideWorkflows", func(t *testing.T) { // Attempt to run Recv outside of a workflow context - _, err := Recv[string](dbosCtx, "test-topic", 1 * time.Second) + _, err := Recv[string](dbosCtx, "test-topic", 1*time.Second) require.Error(t, err, "expected error when running Recv outside of workflow context, but got none") // Check the error type @@ -1722,7 +1722,7 @@ func setEventWorkflow(ctx DBOSContext, input setEventWorkflowInput) (string, err } func getEventWorkflow(ctx DBOSContext, input setEventWorkflowInput) (string, error) { - result, err := GetEvent[string](ctx, input.Key, input.Message, 3 * time.Second) + result, err := GetEvent[string](ctx, input.Key, input.Message, 3*time.Second) if err != nil { return "", err } @@ -1759,7 +1759,7 @@ func setEventIdempotencyWorkflow(ctx DBOSContext, input setEventWorkflowInput) ( } func getEventIdempotencyWorkflow(ctx DBOSContext, input setEventWorkflowInput) (string, error) { - result, err := GetEvent[string](ctx, input.Key, input.Message, 3 * time.Second) + result, err := GetEvent[string](ctx, input.Key, input.Message, 3*time.Second) if err != nil { return "", err } @@ -1969,7 +1969,7 @@ func TestSetGetEvent(t *testing.T) { } // Start a workflow that gets the event from outside the original workflow - message, err := GetEvent[string](dbosCtx, setHandle.GetWorkflowID(), "test-key", 3 * time.Second) + message, err := GetEvent[string](dbosCtx, setHandle.GetWorkflowID(), "test-key", 3*time.Second) if err != nil { t.Fatalf("failed to get event from outside workflow: %v", err) } @@ -1994,7 +1994,7 @@ func TestSetGetEvent(t *testing.T) { t.Run("GetEventTimeout", func(t *testing.T) { // Try to get an event from a non-existent workflow nonExistentID := uuid.NewString() - message, err := GetEvent[string](dbosCtx, nonExistentID, "test-key", 3 * time.Second) + message, err := GetEvent[string](dbosCtx, nonExistentID, "test-key", 3*time.Second) require.NoError(t, err, "failed to get event from non-existent workflow") if message != "" { t.Fatalf("expected empty result on timeout, got '%s'", message) @@ -2008,7 +2008,7 @@ func TestSetGetEvent(t *testing.T) { require.NoError(t, err, "failed to set event") _, err = setHandle.GetResult() require.NoError(t, err, "failed to get result from set event workflow") - message, err = GetEvent[string](dbosCtx, setHandle.GetWorkflowID(), "non-existent-key", 3 * time.Second) + message, err = GetEvent[string](dbosCtx, setHandle.GetWorkflowID(), "non-existent-key", 3*time.Second) require.NoError(t, err, "failed to get event with non-existent key") if message != "" { t.Fatalf("expected empty result on timeout with non-existent key, got '%s'", message) @@ -2155,7 +2155,7 @@ func TestSetGetEvent(t *testing.T) { for range numGoroutines { go func() { defer wg.Done() - res, err := GetEvent[string](dbosCtx, setHandle.GetWorkflowID(), "concurrent-event-key", 10 * time.Second) + res, err := GetEvent[string](dbosCtx, setHandle.GetWorkflowID(), "concurrent-event-key", 10*time.Second) if err != nil { errors <- fmt.Errorf("failed to get event in goroutine: %v", err) return @@ -2260,6 +2260,18 @@ func TestWorkflowTimeout(t *testing.T) { <-ctx.Done() assert.True(t, errors.Is(ctx.Err(), context.Canceled) || errors.Is(ctx.Err(), context.DeadlineExceeded), "workflow was cancelled, but context error is not context.Canceled nor context.DeadlineExceeded: %v", ctx.Err()) + // The status of this workflow should transition to cancelled + maxtries := 10 + for range maxtries { + isCancelled, err := checkWfStatus(ctx, WorkflowStatusCancelled) + if err != nil { + return "", err + } + if isCancelled { + break + } + time.Sleep(500 * time.Millisecond) + } return "", ctx.Err() } RegisterWorkflow(dbosCtx, waitForCancelWorkflow) @@ -2334,6 +2346,62 @@ func TestWorkflowTimeout(t *testing.T) { assert.Equal(t, WorkflowStatusCancelled, status.Status, "expected workflow status to be WorkflowStatusCancelled") }) + waitForCancelWorkflowWithStepAfterCancel := func(ctx DBOSContext, _ string) (string, error) { + // Wait for cancellation + <-ctx.Done() + // Check that we have the correct cancellation error + if !errors.Is(ctx.Err(), context.Canceled) && !errors.Is(ctx.Err(), context.DeadlineExceeded) { + return "", fmt.Errorf("workflow was cancelled, but context error is not context.Canceled nor context.DeadlineExceeded: %v", ctx.Err()) + } + // The status of this workflow should transition to cancelled + maxtries := 10 + for range maxtries { + isCancelled, err := checkWfStatus(ctx, WorkflowStatusCancelled) + if err != nil { + return "", err + } + if isCancelled { + break + } + time.Sleep(500 * time.Millisecond) + } + + // After cancellation, try to run a simple step + // This should return a WorkflowCancelled error + return RunAsStep(ctx, simpleStep) + } + RegisterWorkflow(dbosCtx, waitForCancelWorkflowWithStepAfterCancel) + + t.Run("WorkflowWithStepAfterTimeout", func(t *testing.T) { + // Start a workflow that waits for cancellation then tries to run a step + cancelCtx, cancelFunc := WithTimeout(dbosCtx, 1*time.Millisecond) + defer cancelFunc() // Ensure we clean up the context + handle, err := RunAsWorkflow(cancelCtx, waitForCancelWorkflowWithStepAfterCancel, "wf-with-step-after-timeout") + require.NoError(t, err, "failed to start workflow with step after timeout") + + // Wait for the workflow to complete and get the result + result, err := handle.GetResult() + fmt.Println(result) + // The workflow should return a WorkflowCancelled error from the step + require.Error(t, err, "expected error from workflow") + + // Check if the error is a DBOSError with WorkflowCancelled code + var dbosErr *DBOSError + if errors.As(err, &dbosErr) { + assert.Equal(t, WorkflowCancelled, dbosErr.Code, "expected WorkflowCancelled error code, got: %v", dbosErr.Code) + } else { + // If not a DBOSError, check if it's a context error + assert.True(t, errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded), + "expected context.Canceled or context.DeadlineExceeded error, got: %v", err) + } + assert.Equal(t, "", result, "expected result to be an empty string") + + // Check the workflow status: should be cancelled + status, err := handle.GetStatus() + require.NoError(t, err, "failed to get workflow status") + assert.Equal(t, WorkflowStatusCancelled, status.Status, "expected workflow status to be WorkflowStatusCancelled") + }) + shorterStepTimeoutWorkflow := func(ctx DBOSContext, _ string) (string, error) { // This workflow will run a step that has a shorter timeout than the workflow itself // The timeout will trigger a step error, the workflow can do whatever it wants with that error @@ -2519,7 +2587,7 @@ func TestWorkflowTimeout(t *testing.T) { } func notificationWaiterWorkflow(ctx DBOSContext, pairID int) (string, error) { - result, err := GetEvent[string](ctx, fmt.Sprintf("notification-setter-%d", pairID), "event-key", 10 * time.Second) + result, err := GetEvent[string](ctx, fmt.Sprintf("notification-setter-%d", pairID), "event-key", 10*time.Second) if err != nil { return "", err } @@ -2535,7 +2603,7 @@ func notificationSetterWorkflow(ctx DBOSContext, pairID int) (string, error) { } func sendRecvReceiverWorkflow(ctx DBOSContext, pairID int) (string, error) { - result, err := Recv[string](ctx, "send-recv-topic", 10 * time.Second) + result, err := Recv[string](ctx, "send-recv-topic", 10*time.Second) if err != nil { return "", err } From ad463269ede3e6587ba99e383f41c5f281103a53 Mon Sep 17 00:00:00 2001 From: maxdml Date: Tue, 19 Aug 2025 15:13:41 -0700 Subject: [PATCH 03/13] more exhaustive recovery test --- dbos/client_test.go | 2 +- dbos/serialization_test.go | 4 +- dbos/workflows_test.go | 165 ++++++++++++++++++++++++++++--------- 3 files changed, 131 insertions(+), 40 deletions(-) diff --git a/dbos/client_test.go b/dbos/client_test.go index a076ae1e..4b8f4112 100644 --- a/dbos/client_test.go +++ b/dbos/client_test.go @@ -235,7 +235,7 @@ func TestEnqueue(t *testing.T) { // After first workflow completes, we should be able to enqueue with same deduplication ID handle5, err := Enqueue[wfInput, string](clientCtx, queue.Name, "ServerWorkflow", wfInput{Input: "test-input"}, - WithEnqueueWorkflowID(wfid2), // Reuse the workflow ID that failed before + WithEnqueueWorkflowID(wfid2), // Reuse the workflow ID that failed before WithEnqueueDeduplicationID(dedupID), // Same deduplication ID as first workflow WithEnqueueApplicationVersion(serverCtx.GetApplicationVersion())) require.NoError(t, err, "failed to enqueue workflow with same dedup ID after completion") diff --git a/dbos/serialization_test.go b/dbos/serialization_test.go index 180117b3..98c472d1 100644 --- a/dbos/serialization_test.go +++ b/dbos/serialization_test.go @@ -236,7 +236,7 @@ func TestSetEventSerialize(t *testing.T) { assert.Equal(t, "user-defined-event-set", result) // Retrieve the event to verify it was properly serialized and can be deserialized - retrievedEvent, err := GetEvent[UserDefinedEventData](executor, setHandle.GetWorkflowID(), "user-defined-key", 3 * time.Second) + retrievedEvent, err := GetEvent[UserDefinedEventData](executor, setHandle.GetWorkflowID(), "user-defined-key", 3*time.Second) require.NoError(t, err) // Verify the retrieved data matches what we set @@ -273,7 +273,7 @@ func sendUserDefinedTypeWorkflow(ctx DBOSContext, destinationID string) (string, func recvUserDefinedTypeWorkflow(ctx DBOSContext, input string) (UserDefinedEventData, error) { // Receive the user-defined type message - result, err := Recv[UserDefinedEventData](ctx, "user-defined-topic", 3 * time.Second) + result, err := Recv[UserDefinedEventData](ctx, "user-defined-topic", 3*time.Second) return result, err } diff --git a/dbos/workflows_test.go b/dbos/workflows_test.go index e205ebc1..c0c66a6a 100644 --- a/dbos/workflows_test.go +++ b/dbos/workflows_test.go @@ -1009,57 +1009,148 @@ func TestWorkflowIdempotency(t *testing.T) { func TestWorkflowRecovery(t *testing.T) { dbosCtx := setupDBOS(t, true, true) - RegisterWorkflow(dbosCtx, idempotencyWorkflowWithStep) - t.Run("RecoveryResumeWhereItLeftOff", func(t *testing.T) { - // Reset the global counter - idempotencyCounter = 0 - // First execution - run the workflow once - input := "recovery-test" - idempotencyWorkflowWithStepEvent = NewEvent() - blockingStepStopEvent = NewEvent() - handle1, err := RunAsWorkflow(dbosCtx, idempotencyWorkflowWithStep, input) - require.NoError(t, err, "failed to execute workflow first time") + var ( + recoveryCounters []int64 + recoveryEvents []*Event + blockingEvents []*Event + ) + + recoveryWorkflow := func(dbosCtx DBOSContext, index int) (int64, error) { + // First step with custom name - increments the counter + _, err := RunAsStep(dbosCtx, func(ctx context.Context) (int64, error) { + recoveryCounters[index]++ + return recoveryCounters[index], nil + }, WithStepName(fmt.Sprintf("IncrementStep-%d", index))) + if err != nil { + return 0, err + } + + // Signal that first step is complete + recoveryEvents[index].Set() + + // Second step with custom name - blocks until signaled + _, err = RunAsStep(dbosCtx, func(ctx context.Context) (string, error) { + blockingEvents[index].Wait() + return fmt.Sprintf("completed-%d", index), nil + }, WithStepName(fmt.Sprintf("BlockingStep-%d", index))) + if err != nil { + return 0, err + } + + return recoveryCounters[index], nil + } + + RegisterWorkflow(dbosCtx, recoveryWorkflow) + + t.Run("WorkflowRecovery", func(t *testing.T) { + const numWorkflows = 5 + + // Initialize slices for multiple workflows + recoveryCounters = make([]int64, numWorkflows) + recoveryEvents = make([]*Event, numWorkflows) + blockingEvents = make([]*Event, numWorkflows) + + // Create events for each workflow + for i := range numWorkflows { + recoveryEvents[i] = NewEvent() + blockingEvents[i] = NewEvent() + } + + // Start all workflows + handles := make([]WorkflowHandle[int64], numWorkflows) + for i := range numWorkflows { + handle, err := RunAsWorkflow(dbosCtx, recoveryWorkflow, i, WithWorkflowID(fmt.Sprintf("recovery-test-%d", i))) + require.NoError(t, err, "failed to start workflow %d", i) + handles[i] = handle + } + + // Wait for all first steps to complete + for i := range numWorkflows { + recoveryEvents[i].Wait() + } + + // Verify step states before recovery + for i := range numWorkflows { + steps, err := dbosCtx.(*dbosContext).systemDB.getWorkflowSteps(dbosCtx, handles[i].GetWorkflowID()) + require.NoError(t, err, "failed to get steps for workflow %d", i) + require.Len(t, steps, 1, "expected 1 completed step for workflow %d before recovery", i) - idempotencyWorkflowWithStepEvent.Wait() // Wait for the first step to complete. The second spins forever. + // Verify first step has custom name and completed + assert.Equal(t, fmt.Sprintf("IncrementStep-%d", i), steps[0].StepName, "workflow %d first step name mismatch", i) + assert.Equal(t, 0, steps[0].StepID, "workflow %d first step ID should be 0", i) + assert.NotNil(t, steps[0].Output, "workflow %d first step should have output", i) + assert.Nil(t, steps[0].Error, "workflow %d first step should not have error", i) + } + + // Verify counters are all 1 (executed once) + for i := range numWorkflows { + require.Equal(t, int64(1), recoveryCounters[i], "workflow %d counter should be 1 before recovery", i) + } - // Run recovery for pending workflows with "local" executor + // Run recovery recoveredHandles, err := recoverPendingWorkflows(dbosCtx.(*dbosContext), []string{"local"}) require.NoError(t, err, "failed to recover pending workflows") + require.Len(t, recoveredHandles, numWorkflows, "expected %d recovered handles, got %d", numWorkflows, len(recoveredHandles)) - // Check that we have a single handle in the return list - require.Len(t, recoveredHandles, 1, "expected 1 recovered handle, got %d", len(recoveredHandles)) + // Create a map for easy lookup of recovered handles + recoveredMap := make(map[string]WorkflowHandle[any]) + for _, h := range recoveredHandles { + recoveredMap[h.GetWorkflowID()] = h + } - // Check that the workflow ID from the handle is the same as the first handle - recoveredHandle := recoveredHandles[0] - _, ok := recoveredHandle.(*workflowPollingHandle[any]) - require.True(t, ok, "expected handle to be of type workflowPollingHandle, got %T", recoveredHandle) - require.Equal(t, handle1.GetWorkflowID(), recoveredHandle.GetWorkflowID()) + // Verify all original workflows were recovered + for i := range numWorkflows { + originalID := handles[i].GetWorkflowID() + recoveredHandle, found := recoveredMap[originalID] + require.True(t, found, "workflow %d with ID %s not found in recovered handles", i, originalID) + + _, ok := recoveredHandle.(*workflowPollingHandle[any]) + require.True(t, ok, "recovered handle %d should be of type workflowPollingHandle, got %T", i, recoveredHandle) + } - idempotencyWorkflowWithStepEvent.Clear() - idempotencyWorkflowWithStepEvent.Wait() + // Verify first steps were NOT re-executed (counters should still be 1) + for i := range numWorkflows { + require.Equal(t, int64(1), recoveryCounters[i], "workflow %d counter should remain 1 after recovery (idempotent)", i) + } - // Check that the first step was *not* re-executed (idempotency counter is still 1) - require.Equal(t, int64(1), idempotencyCounter, "expected counter to remain 1 after recovery (idempotent)") + // Verify workflow attempts increased to 2 + for i := range numWorkflows { + workflows, err := dbosCtx.(*dbosContext).systemDB.listWorkflows(context.Background(), listWorkflowsDBInput{ + workflowIDs: []string{handles[i].GetWorkflowID()}, + }) + require.NoError(t, err, "failed to list workflow %d", i) + require.Len(t, workflows, 1, "expected 1 workflow entry for workflow %d", i) + assert.Equal(t, 2, workflows[0].Attempts, "workflow %d should have 2 attempts after recovery", i) + } - // Using ListWorkflows, retrieve the status of the workflow - workflows, err := dbosCtx.(*dbosContext).systemDB.listWorkflows(context.Background(), listWorkflowsDBInput{ - workflowIDs: []string{handle1.GetWorkflowID()}, - }) - require.NoError(t, err, "failed to list workflows") + // Unblock all workflows and verify they complete + for i := range numWorkflows { + blockingEvents[i].Set() + } - require.Len(t, workflows, 1, "expected 1 workflow, got %d", len(workflows)) + // Get results from all recovered workflows + for i := range numWorkflows { + recoveredHandle := recoveredMap[handles[i].GetWorkflowID()] + result, err := recoveredHandle.GetResult() + require.NoError(t, err, "failed to get result from recovered workflow %d", i) - workflow := workflows[0] + // Result should be the counter value (1) + require.Equal(t, int64(1), result, "workflow %d result should be 1", i) + } - // Ensure its number of attempts is 2 - require.Equal(t, 2, workflow.Attempts) + // Final verification of step states + for i := range numWorkflows { + steps, err := dbosCtx.(*dbosContext).systemDB.getWorkflowSteps(dbosCtx, handles[i].GetWorkflowID()) + require.NoError(t, err, "failed to get final steps for workflow %d", i) + require.Len(t, steps, 2, "expected 2 steps for workflow %d", i) - // unlock the workflow & wait for result - blockingStepStopEvent.Set() // This will allow the blocking step to complete - result, err := recoveredHandle.GetResult() - require.NoError(t, err, "failed to get result from recovered handle") - require.Equal(t, idempotencyCounter, result) + // Both steps should now be completed + assert.NotNil(t, steps[0].Output, "workflow %d first step should have output", i) + assert.NotNil(t, steps[1].Output, "workflow %d second step should have output", i) + assert.Nil(t, steps[0].Error, "workflow %d first step should not have error", i) + assert.Nil(t, steps[1].Error, "workflow %d second step should not have error", i) + } }) } From c158c43fd717f9ad7f5b04dcc4adbc47d8ba5f5b Mon Sep 17 00:00:00 2001 From: maxdml Date: Tue, 19 Aug 2025 16:07:08 -0700 Subject: [PATCH 04/13] http server explicitly stringifies input/outputs --- dbos/admin_server.go | 51 +++++++++++++++++++++++++++++++++++--------- 1 file changed, 41 insertions(+), 10 deletions(-) diff --git a/dbos/admin_server.go b/dbos/admin_server.go index 4953a247..b6449529 100644 --- a/dbos/admin_server.go +++ b/dbos/admin_server.go @@ -105,9 +105,9 @@ type adminServer struct { isDeactivated atomic.Int32 } -// workflowStatusToUTC converts a WorkflowStatus to a map with all time fields in UTC +// toListWorkflowResponse converts a WorkflowStatus to a map with all time fields in UTC // not super ergonomic but the DBOS console excepts unix timestamps -func workflowStatusToUTC(ws WorkflowStatus) map[string]any { +func toListWorkflowResponse(ws WorkflowStatus) (map[string]any, err) { result := map[string]any{ "WorkflowUUID": ws.ID, "Status": ws.Status, @@ -152,7 +152,23 @@ func workflowStatusToUTC(ws WorkflowStatus) map[string]any { result["StartedAt"] = nil } - return result + if ws.Input != nil { + bytes, err := json.Marshal(ws.Input) + if err != nil { + return nil, fmt.Errorf("failed to marshal input: %w", err) + } + result["Input"] = string(bytes) + } + + if ws.Output != nil { + bytes, err := json.Marshal(ws.Output) + if err != nil { + return nil, fmt.Errorf("failed to marshal output: %w", err) + } + result["Output"] = string(bytes) + } + + return result, nil } func newAdminServer(ctx *dbosContext, port int) *adminServer { @@ -295,13 +311,18 @@ func newAdminServer(ctx *dbosContext, port int) *adminServer { } // Transform to UTC before encoding - utcWorkflows := make([]map[string]any, len(workflows)) + responseWorkflows := make([]map[string]any, len(workflows)) for i, wf := range workflows { - utcWorkflows[i] = workflowStatusToUTC(wf) + responseWorkflows[i], err = toListWorkflowResponse(wf) + if err != nil { + ctx.logger.Error("Error transforming workflow response", "error", err) + http.Error(w, fmt.Sprintf("Failed to format workflow response: %v", err), http.StatusInternalServerError) + return + } } w.Header().Set("Content-Type", "application/json") - if err := json.NewEncoder(w).Encode(utcWorkflows); err != nil { + if err := json.NewEncoder(w).Encode(responseWorkflows); err != nil { ctx.logger.Error("Error encoding workflows response", "error", err) http.Error(w, fmt.Sprintf("Failed to encode response: %v", err), http.StatusInternalServerError) } @@ -327,7 +348,12 @@ func newAdminServer(ctx *dbosContext, port int) *adminServer { } // Return the first (and only) workflow, transformed to UTC - workflow := workflowStatusToUTC(workflows[0]) + workflow, err := toListWorkflowResponse(workflows[0]) + if err != nil { + ctx.logger.Error("Error transforming workflow response", "error", err) + http.Error(w, fmt.Sprintf("Failed to format workflow response: %v", err), http.StatusInternalServerError) + return + } w.Header().Set("Content-Type", "application/json") if err := json.NewEncoder(w).Encode(workflow); err != nil { @@ -365,13 +391,18 @@ func newAdminServer(ctx *dbosContext, port int) *adminServer { } // Transform to UNIX timestamps before encoding - utcWorkflows := make([]map[string]any, len(workflows)) + responseWorkflows := make([]map[string]any, len(workflows)) for i, wf := range workflows { - utcWorkflows[i] = workflowStatusToUTC(wf) + responseWorkflows[i], err = toListWorkflowResponse(wf) + if err != nil { + ctx.logger.Error("Error transforming workflow response", "error", err) + http.Error(w, fmt.Sprintf("Failed to format workflow response: %v", err), http.StatusInternalServerError) + return + } } w.Header().Set("Content-Type", "application/json") - if err := json.NewEncoder(w).Encode(utcWorkflows); err != nil { + if err := json.NewEncoder(w).Encode(responseWorkflows); err != nil { ctx.logger.Error("Error encoding queued workflows response", "error", err) http.Error(w, fmt.Sprintf("Failed to encode response: %v", err), http.StatusInternalServerError) } From 6f05959849962c471d9a44c93e80b1d57e83bf3b Mon Sep 17 00:00:00 2001 From: maxdml Date: Tue, 19 Aug 2025 16:08:27 -0700 Subject: [PATCH 05/13] typo --- dbos/admin_server.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dbos/admin_server.go b/dbos/admin_server.go index b6449529..d34ea73c 100644 --- a/dbos/admin_server.go +++ b/dbos/admin_server.go @@ -107,7 +107,7 @@ type adminServer struct { // toListWorkflowResponse converts a WorkflowStatus to a map with all time fields in UTC // not super ergonomic but the DBOS console excepts unix timestamps -func toListWorkflowResponse(ws WorkflowStatus) (map[string]any, err) { +func toListWorkflowResponse(ws WorkflowStatus) (map[string]any, error) { result := map[string]any{ "WorkflowUUID": ws.ID, "Status": ws.Status, From 8cee96a69dde41210baf6a858946d6cc6e954e01 Mon Sep 17 00:00:00 2001 From: maxdml Date: Tue, 19 Aug 2025 17:11:25 -0700 Subject: [PATCH 06/13] must not stringify empty strings + add a test --- dbos/admin_server.go | 4 +- dbos/admin_server_test.go | 146 ++++++++++++++++++++++++++++++++++++++ 2 files changed, 148 insertions(+), 2 deletions(-) diff --git a/dbos/admin_server.go b/dbos/admin_server.go index d34ea73c..75543a19 100644 --- a/dbos/admin_server.go +++ b/dbos/admin_server.go @@ -152,7 +152,7 @@ func toListWorkflowResponse(ws WorkflowStatus) (map[string]any, error) { result["StartedAt"] = nil } - if ws.Input != nil { + if ws.Input != nil && ws.Input != "" { bytes, err := json.Marshal(ws.Input) if err != nil { return nil, fmt.Errorf("failed to marshal input: %w", err) @@ -160,7 +160,7 @@ func toListWorkflowResponse(ws WorkflowStatus) (map[string]any, error) { result["Input"] = string(bytes) } - if ws.Output != nil { + if ws.Output != nil && ws.Output != "" { bytes, err := json.Marshal(ws.Output) if err != nil { return nil, fmt.Errorf("failed to marshal output: %w", err) diff --git a/dbos/admin_server_test.go b/dbos/admin_server_test.go index daa150c2..48a3215f 100644 --- a/dbos/admin_server_test.go +++ b/dbos/admin_server_test.go @@ -213,6 +213,152 @@ func TestAdminServer(t *testing.T) { } }) + t.Run("List workflows input/output values", func(t *testing.T) { + resetTestDatabase(t, databaseURL) + ctx, err := NewDBOSContext(Config{ + DatabaseURL: databaseURL, + AppName: "test-app", + AdminServer: true, + }) + require.NoError(t, err) + + // Define a custom struct for testing + type TestStruct struct { + Name string `json:"name"` + Value int `json:"value"` + } + + // Test workflow with int input/output + intWorkflow := func(dbosCtx DBOSContext, input int) (int, error) { + return input * 2, nil + } + RegisterWorkflow(ctx, intWorkflow) + + // Test workflow with empty string input/output + emptyStringWorkflow := func(dbosCtx DBOSContext, input string) (string, error) { + return "", nil + } + RegisterWorkflow(ctx, emptyStringWorkflow) + + // Test workflow with struct input/output + structWorkflow := func(dbosCtx DBOSContext, input TestStruct) (TestStruct, error) { + return TestStruct{Name: "output-" + input.Name, Value: input.Value * 2}, nil + } + RegisterWorkflow(ctx, structWorkflow) + + err = ctx.Launch() + require.NoError(t, err) + + // Ensure cleanup + defer func() { + if ctx != nil { + ctx.Cancel() + } + }() + + // Give the server a moment to start + time.Sleep(100 * time.Millisecond) + + client := &http.Client{Timeout: 5 * time.Second} + endpoint := fmt.Sprintf("http://localhost:3001/%s", strings.TrimPrefix(_WORKFLOWS_PATTERN, "POST /")) + + // Create workflows with different input/output types + // 1. Integer workflow + intHandle, err := RunAsWorkflow(ctx, intWorkflow, 42) + require.NoError(t, err, "Failed to create int workflow") + intResult, err := intHandle.GetResult() + require.NoError(t, err, "Failed to get int workflow result") + assert.Equal(t, 84, intResult) + + // 2. Empty string workflow + emptyStringHandle, err := RunAsWorkflow(ctx, emptyStringWorkflow, "") + require.NoError(t, err, "Failed to create empty string workflow") + emptyStringResult, err := emptyStringHandle.GetResult() + require.NoError(t, err, "Failed to get empty string workflow result") + assert.Equal(t, "", emptyStringResult) + + // 3. Struct workflow + structInput := TestStruct{Name: "test", Value: 10} + structHandle, err := RunAsWorkflow(ctx, structWorkflow, structInput) + require.NoError(t, err, "Failed to create struct workflow") + structResult, err := structHandle.GetResult() + require.NoError(t, err, "Failed to get struct workflow result") + assert.Equal(t, TestStruct{Name: "output-test", Value: 20}, structResult) + + // Query workflows with input/output loading enabled + // Filter by the workflow IDs we just created to avoid interference from other tests + reqBody := map[string]any{ + "workflow_uuids": []string{ + intHandle.GetWorkflowID(), + emptyStringHandle.GetWorkflowID(), + structHandle.GetWorkflowID(), + }, + "load_input": true, + "load_output": true, + "limit": 10, + } + req, err := http.NewRequest(http.MethodPost, endpoint, bytes.NewBuffer(mustMarshal(reqBody))) + require.NoError(t, err, "Failed to create request") + req.Header.Set("Content-Type", "application/json") + + resp, err := client.Do(req) + require.NoError(t, err, "Failed to make request") + defer resp.Body.Close() + + assert.Equal(t, http.StatusOK, resp.StatusCode) + + var workflows []map[string]any + err = json.NewDecoder(resp.Body).Decode(&workflows) + require.NoError(t, err, "Failed to decode workflows response") + + // Should have exactly 3 workflows + assert.Equal(t, 3, len(workflows), "Expected exactly 3 workflows") + + // Verify each workflow's input/output marshaling + for _, wf := range workflows { + wfID := wf["WorkflowUUID"].(string) + + // Check input and output fields exist and are strings (JSON marshaled) + if wfID == intHandle.GetWorkflowID() { + // Integer workflow: input and output should be marshaled as JSON strings + inputStr, ok := wf["Input"].(string) + require.True(t, ok, "Int workflow Input should be a string") + assert.Equal(t, "42", inputStr, "Int workflow input should be marshaled as '42'") + + outputStr, ok := wf["Output"].(string) + require.True(t, ok, "Int workflow Output should be a string") + assert.Equal(t, "84", outputStr, "Int workflow output should be marshaled as '84'") + + } else if wfID == emptyStringHandle.GetWorkflowID() { + // Empty string workflow: both input and output are empty strings + // According to the logic, empty strings should not have Input/Output fields + input, hasInput := wf["Input"] + require.Equal(t, "", input) + require.True(t, hasInput, "Empty string workflow should have Input field") + + output, hasOutput := wf["Output"] + require.True(t, hasOutput, "Empty string workflow should have Output field") + require.Equal(t, "", output) + + } else if wfID == structHandle.GetWorkflowID() { + // Struct workflow: input and output should be marshaled as JSON strings + inputStr, ok := wf["Input"].(string) + require.True(t, ok, "Struct workflow Input should be a string") + var inputStruct TestStruct + err = json.Unmarshal([]byte(inputStr), &inputStruct) + require.NoError(t, err, "Failed to unmarshal struct workflow input") + assert.Equal(t, structInput, inputStruct, "Struct workflow input should match") + + outputStr, ok := wf["Output"].(string) + require.True(t, ok, "Struct workflow Output should be a string") + var outputStruct TestStruct + err = json.Unmarshal([]byte(outputStr), &outputStruct) + require.NoError(t, err, "Failed to unmarshal struct workflow output") + assert.Equal(t, TestStruct{Name: "output-test", Value: 20}, outputStruct, "Struct workflow output should match") + } + } + }) + t.Run("List endpoints time filtering", func(t *testing.T) { ctx, err := NewDBOSContext(Config{ DatabaseURL: databaseURL, From 60fb179d28906dfb8ee9c9944a3c28f1fc62a7bf Mon Sep 17 00:00:00 2001 From: maxdml Date: Tue, 19 Aug 2025 17:14:48 -0700 Subject: [PATCH 07/13] not an option --- dbos/admin_server_test.go | 2 +- dbos/queue.go | 30 +++++++++++++++--------------- dbos/system_database.go | 2 +- 3 files changed, 17 insertions(+), 17 deletions(-) diff --git a/dbos/admin_server_test.go b/dbos/admin_server_test.go index 48a3215f..6990089a 100644 --- a/dbos/admin_server_test.go +++ b/dbos/admin_server_test.go @@ -125,7 +125,7 @@ func TestAdminServer(t *testing.T) { endpoint: fmt.Sprintf("http://localhost:3001/%s", strings.TrimPrefix(_WORKFLOW_QUEUES_METADATA_PATTERN, "GET /")), expectedStatus: http.StatusOK, validateResp: func(t *testing.T, resp *http.Response) { - var queueMetadata []QueueOptions + var queueMetadata []WorkflowQueue err := json.NewDecoder(resp.Body).Decode(&queueMetadata) require.NoError(t, err, "Failed to decode response as QueueMetadata array") assert.NotNil(t, queueMetadata, "Expected non-nil queue metadata array") diff --git a/dbos/queue.go b/dbos/queue.go index eed24e11..88035ff2 100644 --- a/dbos/queue.go +++ b/dbos/queue.go @@ -25,9 +25,9 @@ type RateLimiter struct { Period float64 // Time period in seconds for the rate limit } -// QueueOptions defines a named queue for workflow execution. +// WorkflowQueue defines a named queue for workflow execution. // Queues provide controlled workflow execution with concurrency limits, priority scheduling, and rate limiting. -type QueueOptions struct { +type WorkflowQueue struct { Name string `json:"name"` // Unique queue name WorkerConcurrency *int `json:"workerConcurrency,omitempty"` // Max concurrent workflows per executor GlobalConcurrency *int `json:"concurrency,omitempty"` // Max concurrent workflows across all executors @@ -37,12 +37,12 @@ type QueueOptions struct { } // QueueOption is a functional option for configuring a workflow queue -type QueueOption func(*QueueOptions) +type QueueOption func(*WorkflowQueue) // WithWorkerConcurrency limits the number of workflows this executor can run concurrently from the queue. // This provides per-executor concurrency control. func WithWorkerConcurrency(concurrency int) QueueOption { - return func(q *QueueOptions) { + return func(q *WorkflowQueue) { q.WorkerConcurrency = &concurrency } } @@ -50,7 +50,7 @@ func WithWorkerConcurrency(concurrency int) QueueOption { // WithGlobalConcurrency limits the total number of workflows that can run concurrently from the queue // across all executors. This provides global concurrency control. func WithGlobalConcurrency(concurrency int) QueueOption { - return func(q *QueueOptions) { + return func(q *WorkflowQueue) { q.GlobalConcurrency = &concurrency } } @@ -58,7 +58,7 @@ func WithGlobalConcurrency(concurrency int) QueueOption { // WithPriorityEnabled enables priority-based scheduling for the queue. // When enabled, workflows with lower priority numbers are executed first. func WithPriorityEnabled(enabled bool) QueueOption { - return func(q *QueueOptions) { + return func(q *WorkflowQueue) { q.PriorityEnabled = enabled } } @@ -66,7 +66,7 @@ func WithPriorityEnabled(enabled bool) QueueOption { // WithRateLimiter configures rate limiting for the queue to prevent overwhelming external services. // The rate limiter enforces a maximum number of workflow starts within a time period. func WithRateLimiter(limiter *RateLimiter) QueueOption { - return func(q *QueueOptions) { + return func(q *WorkflowQueue) { q.RateLimit = limiter } } @@ -74,7 +74,7 @@ func WithRateLimiter(limiter *RateLimiter) QueueOption { // WithMaxTasksPerIteration sets the maximum number of workflows to dequeue in a single iteration. // This controls batch sizes for queue processing. func WithMaxTasksPerIteration(maxTasks int) QueueOption { - return func(q *QueueOptions) { + return func(q *WorkflowQueue) { q.MaxTasksPerIteration = maxTasks } } @@ -96,10 +96,10 @@ func WithMaxTasksPerIteration(maxTasks int) QueueOption { // // // Enqueue workflows to this queue: // handle, err := dbos.RunAsWorkflow(ctx, SendEmailWorkflow, emailData, dbos.WithQueue("email-queue")) -func NewWorkflowQueue(dbosCtx DBOSContext, name string, options ...QueueOption) QueueOptions { +func NewWorkflowQueue(dbosCtx DBOSContext, name string, options ...QueueOption) WorkflowQueue { ctx, ok := dbosCtx.(*dbosContext) if !ok { - return QueueOptions{} // Do nothing if the concrete type is not dbosContext + return WorkflowQueue{} // Do nothing if the concrete type is not dbosContext } if ctx.launched.Load() { panic("Cannot register workflow queue after DBOS has launched") @@ -111,7 +111,7 @@ func NewWorkflowQueue(dbosCtx DBOSContext, name string, options ...QueueOption) } // Create queue with default settings - q := QueueOptions{ + q := WorkflowQueue{ Name: name, WorkerConcurrency: nil, GlobalConcurrency: nil, @@ -142,7 +142,7 @@ type queueRunner struct { jitterMax float64 // Queue registry - workflowQueueRegistry map[string]QueueOptions + workflowQueueRegistry map[string]WorkflowQueue // Channel to signal completion back to the DBOS context completionChan chan struct{} @@ -157,13 +157,13 @@ func newQueueRunner() *queueRunner { scalebackFactor: 0.9, jitterMin: 0.95, jitterMax: 1.05, - workflowQueueRegistry: make(map[string]QueueOptions), + workflowQueueRegistry: make(map[string]WorkflowQueue), completionChan: make(chan struct{}), } } -func (qr *queueRunner) listQueues() []QueueOptions { - queues := make([]QueueOptions, 0, len(qr.workflowQueueRegistry)) +func (qr *queueRunner) listQueues() []WorkflowQueue { + queues := make([]WorkflowQueue, 0, len(qr.workflowQueueRegistry)) for _, queue := range qr.workflowQueueRegistry { queues = append(queues, queue) } diff --git a/dbos/system_database.go b/dbos/system_database.go index 330bb57c..4de79579 100644 --- a/dbos/system_database.go +++ b/dbos/system_database.go @@ -1862,7 +1862,7 @@ type dequeuedWorkflow struct { } type dequeueWorkflowsInput struct { - queue QueueOptions + queue WorkflowQueue executorID string applicationVersion string } From 645726b3827d04cf94000cd25ec0b7ab03a0ef6d Mon Sep 17 00:00:00 2001 From: maxdml Date: Wed, 20 Aug 2025 08:13:59 -0700 Subject: [PATCH 08/13] fix test --- dbos/admin_server_test.go | 1 + 1 file changed, 1 insertion(+) diff --git a/dbos/admin_server_test.go b/dbos/admin_server_test.go index 6990089a..25dbffa8 100644 --- a/dbos/admin_server_test.go +++ b/dbos/admin_server_test.go @@ -360,6 +360,7 @@ func TestAdminServer(t *testing.T) { }) t.Run("List endpoints time filtering", func(t *testing.T) { + resetTestDatabase(t, databaseURL) ctx, err := NewDBOSContext(Config{ DatabaseURL: databaseURL, AppName: "test-app", From 057a409f7d544696c82a96b9807415059651baa7 Mon Sep 17 00:00:00 2001 From: maxdml Date: Wed, 20 Aug 2025 10:09:21 -0700 Subject: [PATCH 09/13] filter out workflow status when listing queue tasks --- dbos/admin_server.go | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/dbos/admin_server.go b/dbos/admin_server.go index 75543a19..d25840fc 100644 --- a/dbos/admin_server.go +++ b/dbos/admin_server.go @@ -372,7 +372,10 @@ func newAdminServer(ctx *dbosContext, port int) *adminServer { } } - workflows, err := ListWorkflows(ctx, req.toListWorkflowsOptions()...) + req.Status = "" // We are not expecting a filter here but clear just in case + filters := req.toListWorkflowsOptions() + filters = append(filters, WithStatus([]WorkflowStatusType{WorkflowStatusEnqueued, WorkflowStatusPending})) + workflows, err := ListWorkflows(ctx, filters...) if err != nil { ctx.logger.Error("Failed to list queued workflows", "error", err) http.Error(w, fmt.Sprintf("Failed to list queued workflows: %v", err), http.StatusInternalServerError) From f703fbe48a13f43306024949924ad526b4f449dc Mon Sep 17 00:00:00 2001 From: maxdml Date: Wed, 20 Aug 2025 10:22:58 -0700 Subject: [PATCH 10/13] nit --- dbos/workflows_test.go | 1 - 1 file changed, 1 deletion(-) diff --git a/dbos/workflows_test.go b/dbos/workflows_test.go index c0c66a6a..828425ab 100644 --- a/dbos/workflows_test.go +++ b/dbos/workflows_test.go @@ -2472,7 +2472,6 @@ func TestWorkflowTimeout(t *testing.T) { // Wait for the workflow to complete and get the result result, err := handle.GetResult() - fmt.Println(result) // The workflow should return a WorkflowCancelled error from the step require.Error(t, err, "expected error from workflow") From 0d571055170910f58d60660ccd4f9708a57bb525 Mon Sep 17 00:00:00 2001 From: maxdml Date: Wed, 20 Aug 2025 10:53:27 -0700 Subject: [PATCH 11/13] RetrieveWorkflow return handle interface --- dbos/workflow.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dbos/workflow.go b/dbos/workflow.go index dcf47596..27230182 100644 --- a/dbos/workflow.go +++ b/dbos/workflow.go @@ -1344,7 +1344,7 @@ func (c *dbosContext) RetrieveWorkflow(_ DBOSContext, workflowID string) (Workfl // } else { // log.Printf("Result: %d", result) // } -func RetrieveWorkflow[R any](ctx DBOSContext, workflowID string) (*workflowPollingHandle[R], error) { +func RetrieveWorkflow[R any](ctx DBOSContext, workflowID string) (WorkflowHandle[R], error) { if ctx == nil { return nil, errors.New("dbosCtx cannot be nil") } From f183e1efdc75a8242442f62d260c66f474990601 Mon Sep 17 00:00:00 2001 From: maxdml Date: Wed, 20 Aug 2025 11:10:06 -0700 Subject: [PATCH 12/13] MAX_RECOVERY_ATTEMPTS_EXCEEDED --- dbos/system_database.go | 8 ++++---- dbos/workflow.go | 2 +- dbos/workflows_test.go | 2 +- 3 files changed, 6 insertions(+), 6 deletions(-) diff --git a/dbos/system_database.go b/dbos/system_database.go index 4de79579..61afb939 100644 --- a/dbos/system_database.go +++ b/dbos/system_database.go @@ -421,11 +421,11 @@ func (s *sysDB) insertWorkflowStatus(ctx context.Context, input insertWorkflowSt } // Every time we start executing a workflow (and thus attempt to insert its status), we increment `recovery_attempts` by 1. - // When this number becomes equal to `maxRetries + 1`, we mark the workflow as `RETRIES_EXCEEDED`. + // When this number becomes equal to `maxRetries + 1`, we mark the workflow as `MAX_RECOVERY_ATTEMPTS_EXCEEDED`. if result.status != WorkflowStatusSuccess && result.status != WorkflowStatusError && input.maxRetries > 0 && result.attempts > input.maxRetries+1 { - // Update workflow status to RETRIES_EXCEEDED and clear queue-related fields + // Update workflow status to MAX_RECOVERY_ATTEMPTS_EXCEEDED and clear queue-related fields dlqQuery := `UPDATE dbos.workflow_status SET status = $1, deduplication_id = NULL, started_at_epoch_ms = NULL, queue_name = NULL WHERE workflow_uuid = $2 AND status = $3` @@ -436,12 +436,12 @@ func (s *sysDB) insertWorkflowStatus(ctx context.Context, input insertWorkflowSt WorkflowStatusPending) if err != nil { - return nil, fmt.Errorf("failed to update workflow to RETRIES_EXCEEDED: %w", err) + return nil, fmt.Errorf("failed to update workflow to %s: %w", WorkflowStatusRetriesExceeded, err) } // Commit the transaction before throwing the error if err := input.tx.Commit(ctx); err != nil { - return nil, fmt.Errorf("failed to commit transaction after marking workflow as RETRIES_EXCEEDED: %w", err) + return nil, fmt.Errorf("failed to commit transaction after marking workflow as %s: %w", WorkflowStatusRetriesExceeded, err) } return nil, newDeadLetterQueueError(input.status.ID, input.maxRetries) diff --git a/dbos/workflow.go b/dbos/workflow.go index 27230182..1523f3bf 100644 --- a/dbos/workflow.go +++ b/dbos/workflow.go @@ -317,7 +317,7 @@ const ( // WithMaxRetries sets the maximum number of retry attempts for workflow recovery. // If a workflow fails or is interrupted, it will be retried up to this many times. -// After exceeding max retries, the workflow status becomes RETRIES_EXCEEDED. +// After exceeding max retries, the workflow status becomes MAX_RECOVERY_ATTEMPTS_EXCEEDED. func WithMaxRetries(maxRetries int) WorkflowRegistrationOption { return func(p *WorkflowRegistrationOptions) { p.maxRetries = maxRetries diff --git a/dbos/workflows_test.go b/dbos/workflows_test.go index 828425ab..0d1c47e4 100644 --- a/dbos/workflows_test.go +++ b/dbos/workflows_test.go @@ -1213,7 +1213,7 @@ func TestWorkflowDeadLetterQueue(t *testing.T) { require.True(t, ok, "expected DBOSError, got %T", err) require.Equal(t, DeadLetterQueueError, dbosErr.Code) - // Verify workflow status is RETRIES_EXCEEDED + // Verify workflow status is MAX_RECOVERY_ATTEMPTS_EXCEEDED status, err := handle.GetStatus() require.NoError(t, err, "failed to get workflow status") require.Equal(t, WorkflowStatusRetriesExceeded, status.Status) From 35e559fa57dac4c5d48118f88f288b218550c2dd Mon Sep 17 00:00:00 2001 From: maxdml Date: Wed, 20 Aug 2025 14:45:16 -0700 Subject: [PATCH 13/13] WorkflowStatusMaxRecoveryAttemptsExceeded --- dbos/admin_server_test.go | 2 -- dbos/queues_test.go | 4 ++-- dbos/system_database.go | 6 +++--- dbos/workflow.go | 12 ++++++------ dbos/workflows_test.go | 2 +- 5 files changed, 12 insertions(+), 14 deletions(-) diff --git a/dbos/admin_server_test.go b/dbos/admin_server_test.go index 25dbffa8..da40359e 100644 --- a/dbos/admin_server_test.go +++ b/dbos/admin_server_test.go @@ -455,7 +455,6 @@ func TestAdminServer(t *testing.T) { "start_time": timeBetween.Format(time.RFC3339Nano), "limit": 10, } - fmt.Println("Request body 2:", reqBody2, "timebetween", timeBetween.UnixMilli()) req2, err := http.NewRequest(http.MethodPost, endpoint, bytes.NewBuffer(mustMarshal(reqBody2))) require.NoError(t, err, "Failed to create request 2") req2.Header.Set("Content-Type", "application/json") @@ -469,7 +468,6 @@ func TestAdminServer(t *testing.T) { var workflows2 []map[string]any err = json.NewDecoder(resp2.Body).Decode(&workflows2) require.NoError(t, err, "Failed to decode workflows response 2") - fmt.Println(workflows2) // Should have exactly 1 workflow (the second one) assert.Equal(t, 1, len(workflows2), "Expected exactly 1 workflow with start_time after timeBetween") diff --git a/dbos/queues_test.go b/dbos/queues_test.go index 78bcc16f..8b28fe6d 100644 --- a/dbos/queues_test.go +++ b/dbos/queues_test.go @@ -233,13 +233,13 @@ func TestWorkflowQueues(t *testing.T) { for { dlqStatus, err := dlqHandle[0].GetStatus() require.NoError(t, err, "failed to get status of DLQ workflow handle") - if dlqStatus.Status != WorkflowStatusRetriesExceeded && retries < 10 { + if dlqStatus.Status != WorkflowStatusMaxRecoveryAttemptsExceeded && retries < 10 { time.Sleep(1 * time.Second) // Wait a bit before checking again retries++ continue } require.NoError(t, err, "failed to get status of DLQ workflow handle") - assert.Equal(t, WorkflowStatusRetriesExceeded, dlqStatus.Status, "expected workflow to be in DLQ after max retries exceeded") + assert.Equal(t, WorkflowStatusMaxRecoveryAttemptsExceeded, dlqStatus.Status, "expected workflow to be in DLQ after max retries exceeded") handles = append(handles, dlqHandle[0]) break } diff --git a/dbos/system_database.go b/dbos/system_database.go index 61afb939..94496daa 100644 --- a/dbos/system_database.go +++ b/dbos/system_database.go @@ -431,17 +431,17 @@ func (s *sysDB) insertWorkflowStatus(ctx context.Context, input insertWorkflowSt WHERE workflow_uuid = $2 AND status = $3` _, err = input.tx.Exec(ctx, dlqQuery, - WorkflowStatusRetriesExceeded, + WorkflowStatusMaxRecoveryAttemptsExceeded, input.status.ID, WorkflowStatusPending) if err != nil { - return nil, fmt.Errorf("failed to update workflow to %s: %w", WorkflowStatusRetriesExceeded, err) + return nil, fmt.Errorf("failed to update workflow to %s: %w", WorkflowStatusMaxRecoveryAttemptsExceeded, err) } // Commit the transaction before throwing the error if err := input.tx.Commit(ctx); err != nil { - return nil, fmt.Errorf("failed to commit transaction after marking workflow as %s: %w", WorkflowStatusRetriesExceeded, err) + return nil, fmt.Errorf("failed to commit transaction after marking workflow as %s: %w", WorkflowStatusMaxRecoveryAttemptsExceeded, err) } return nil, newDeadLetterQueueError(input.status.ID, input.maxRetries) diff --git a/dbos/workflow.go b/dbos/workflow.go index 1523f3bf..c26469a0 100644 --- a/dbos/workflow.go +++ b/dbos/workflow.go @@ -22,12 +22,12 @@ import ( type WorkflowStatusType string const ( - WorkflowStatusPending WorkflowStatusType = "PENDING" // Workflow is running or ready to run - WorkflowStatusEnqueued WorkflowStatusType = "ENQUEUED" // Workflow is queued and waiting for execution - WorkflowStatusSuccess WorkflowStatusType = "SUCCESS" // Workflow completed successfully - WorkflowStatusError WorkflowStatusType = "ERROR" // Workflow completed with an error - WorkflowStatusCancelled WorkflowStatusType = "CANCELLED" // Workflow was cancelled (manually or due to timeout) - WorkflowStatusRetriesExceeded WorkflowStatusType = "MAX_RECOVERY_ATTEMPTS_EXCEEDED" // Workflow exceeded maximum retry attempts + WorkflowStatusPending WorkflowStatusType = "PENDING" // Workflow is running or ready to run + WorkflowStatusEnqueued WorkflowStatusType = "ENQUEUED" // Workflow is queued and waiting for execution + WorkflowStatusSuccess WorkflowStatusType = "SUCCESS" // Workflow completed successfully + WorkflowStatusError WorkflowStatusType = "ERROR" // Workflow completed with an error + WorkflowStatusCancelled WorkflowStatusType = "CANCELLED" // Workflow was cancelled (manually or due to timeout) + WorkflowStatusMaxRecoveryAttemptsExceeded WorkflowStatusType = "MAX_RECOVERY_ATTEMPTS_EXCEEDED" // Workflow exceeded maximum retry attempts ) // WorkflowStatus contains comprehensive information about a workflow's current state and execution history. diff --git a/dbos/workflows_test.go b/dbos/workflows_test.go index 0d1c47e4..19eceac4 100644 --- a/dbos/workflows_test.go +++ b/dbos/workflows_test.go @@ -1216,7 +1216,7 @@ func TestWorkflowDeadLetterQueue(t *testing.T) { // Verify workflow status is MAX_RECOVERY_ATTEMPTS_EXCEEDED status, err := handle.GetStatus() require.NoError(t, err, "failed to get workflow status") - require.Equal(t, WorkflowStatusRetriesExceeded, status.Status) + require.Equal(t, WorkflowStatusMaxRecoveryAttemptsExceeded, status.Status) // Verify that attempting to start a workflow with the same ID throws a DLQ error _, err = RunAsWorkflow(dbosCtx, deadLetterQueueWorkflow, "test", WithWorkflowID(wfID))