From 0a801e5c6c26905e16840bd354bd90daae5494a5 Mon Sep 17 00:00:00 2001 From: maxdml Date: Thu, 2 Oct 2025 14:23:23 -0700 Subject: [PATCH 1/6] add client GetWorkflowSteps --- dbos/client.go | 6 ++++ dbos/client_test.go | 66 +++++++++++++++++++++++++++++++++++++++++ dbos/system_database.go | 15 ++++++---- dbos/workflow.go | 15 ++++++++-- 4 files changed, 95 insertions(+), 7 deletions(-) diff --git a/dbos/client.go b/dbos/client.go index f9ad206..c04130b 100644 --- a/dbos/client.go +++ b/dbos/client.go @@ -31,6 +31,7 @@ type Client interface { CancelWorkflow(workflowID string) error ResumeWorkflow(workflowID string) (WorkflowHandle[any], error) ForkWorkflow(input ForkWorkflowInput) (WorkflowHandle[any], error) + GetWorkflowSteps(workflowID string) ([]StepInfo, error) Shutdown(timeout time.Duration) // Simply close the system DB connection pool } @@ -295,6 +296,11 @@ func (c *client) ForkWorkflow(input ForkWorkflowInput) (WorkflowHandle[any], err return c.dbosCtx.ForkWorkflow(c.dbosCtx, input) } +// GetWorkflowSteps retrieves the execution steps of a workflow. +func (c *client) GetWorkflowSteps(workflowID string) ([]StepInfo, error) { + return c.dbosCtx.GetWorkflowSteps(c.dbosCtx, workflowID) +} + // Shutdown gracefully shuts down the client and closes the system database connection. func (c *client) Shutdown(timeout time.Duration) { // Get the concrete dbosContext to access internal fields diff --git a/dbos/client_test.go b/dbos/client_test.go index ab4d287..cfa9a88 100644 --- a/dbos/client_test.go +++ b/dbos/client_test.go @@ -928,3 +928,69 @@ func TestListWorkflows(t *testing.T) { // Verify all queue entries are cleaned up require.True(t, queueEntriesAreCleanedUp(serverCtx), "expected queue entries to be cleaned up after list workflows tests") } + +func TestGetWorkflowSteps(t *testing.T) { + // Setup server context + serverCtx := setupDBOS(t, true, true) + + // Create queue for communication + queue := NewWorkflowQueue(serverCtx, "get-workflow-steps-queue") + + // Workflow with one step + stepFunction := func(ctx context.Context) (string, error) { + return "abc", nil + } + + testWorkflow := func(ctx DBOSContext, input string) (string, error) { + result, err := RunAsStep(ctx, stepFunction) + if err != nil { + return "", err + } + return result, nil + } + RegisterWorkflow(serverCtx, testWorkflow, WithWorkflowName("TestWorkflow")) + + // Launch server + err := Launch(serverCtx) + require.NoError(t, err) + + // Setup client + databaseURL := getDatabaseURL() + config := ClientConfig{ + DatabaseURL: databaseURL, + } + client, err := NewClient(context.Background(), config) + require.NoError(t, err) + t.Cleanup(func() { + if client != nil { + client.Shutdown(30 * time.Second) + } + }) + + // Enqueue and run the workflow + workflowID := "test-get-workflow-steps" + handle, err := Enqueue[string, string](client, queue.Name, "TestWorkflow", "test-input", WithEnqueueWorkflowID(workflowID)) + require.NoError(t, err) + + // Wait for workflow to complete + result, err := handle.GetResult() + require.NoError(t, err) + assert.Equal(t, "abc", result) + + // Test GetWorkflowSteps with loadOutput = true + stepsWithOutput, err := client.GetWorkflowSteps(workflowID) + require.NoError(t, err) + require.Len(t, stepsWithOutput, 1, "expected exactly 1 step") + + step := stepsWithOutput[0] + assert.Equal(t, 0, step.StepID, "expected step ID to be 0") + assert.NotEmpty(t, step.StepName, "expected step name to be set") + assert.Nil(t, step.Error, "expected no error in step") + assert.Equal(t, "", step.ChildWorkflowID, "expected no child workflow ID") + + // Verify the output wasn't loaded + require.Nil(t, step.Output, "expected output not to be loaded") + + // Verify all queue entries are cleaned up + require.True(t, queueEntriesAreCleanedUp(serverCtx), "expected queue entries to be cleaned up after get workflow steps test") +} diff --git a/dbos/system_database.go b/dbos/system_database.go index 7c68e73..01a4ef1 100644 --- a/dbos/system_database.go +++ b/dbos/system_database.go @@ -51,7 +51,7 @@ type systemDatabase interface { // Steps recordOperationResult(ctx context.Context, input recordOperationResultDBInput) error checkOperationExecution(ctx context.Context, input checkOperationExecutionDBInput) (*recordedResult, error) - getWorkflowSteps(ctx context.Context, workflowID string) ([]StepInfo, error) + getWorkflowSteps(ctx context.Context, input getWorkflowStepsInput) ([]StepInfo, error) // Communication (special steps) send(ctx context.Context, input WorkflowSendInput) error @@ -1457,13 +1457,18 @@ type StepInfo struct { ChildWorkflowID string // The ID of a child workflow spawned by this step (if applicable) } -func (s *sysDB) getWorkflowSteps(ctx context.Context, workflowID string) ([]StepInfo, error) { +type getWorkflowStepsInput struct { + workflowID string + loadOutput bool +} + +func (s *sysDB) getWorkflowSteps(ctx context.Context, input getWorkflowStepsInput) ([]StepInfo, error) { query := fmt.Sprintf(`SELECT function_id, function_name, output, error, child_workflow_id FROM %s.operation_outputs WHERE workflow_uuid = $1 ORDER BY function_id ASC`, pgx.Identifier{s.schema}.Sanitize()) - rows, err := s.pool.Query(ctx, query, workflowID) + rows, err := s.pool.Query(ctx, query, input.workflowID) if err != nil { return nil, fmt.Errorf("failed to query workflow steps: %w", err) } @@ -1481,8 +1486,8 @@ func (s *sysDB) getWorkflowSteps(ctx context.Context, workflowID string) ([]Step return nil, fmt.Errorf("failed to scan step row: %w", err) } - // Deserialize output if present - if outputString != nil { + // Deserialize output if present and loadOutput is true + if input.loadOutput && outputString != nil { output, err := deserialize(outputString) if err != nil { return nil, fmt.Errorf("failed to deserialize output: %w", err) diff --git a/dbos/workflow.go b/dbos/workflow.go index f62c0be..c590dc8 100644 --- a/dbos/workflow.go +++ b/dbos/workflow.go @@ -1956,14 +1956,25 @@ func ListWorkflows(ctx DBOSContext, opts ...ListWorkflowsOption) ([]WorkflowStat } func (c *dbosContext) GetWorkflowSteps(_ DBOSContext, workflowID string) ([]StepInfo, error) { + var loadOutput bool + if c.launched.Load() { + loadOutput = true + } else { + loadOutput = false + } + getWorkflowStepsInput := getWorkflowStepsInput{ + workflowID: workflowID, + loadOutput: loadOutput, + } + workflowState, ok := c.Value(workflowStateKey).(*workflowState) isWithinWorkflow := ok && workflowState != nil if isWithinWorkflow { return RunAsStep(c, func(ctx context.Context) ([]StepInfo, error) { - return c.systemDB.getWorkflowSteps(ctx, workflowID) + return c.systemDB.getWorkflowSteps(ctx, getWorkflowStepsInput) }, WithStepName("DBOS.getWorkflowSteps")) } else { - return c.systemDB.getWorkflowSteps(c, workflowID) + return c.systemDB.getWorkflowSteps(c, getWorkflowStepsInput) } } From 173c850be561ab0e18dfdc3204620d01c4de882c Mon Sep 17 00:00:00 2001 From: maxdml Date: Thu, 2 Oct 2025 15:01:49 -0700 Subject: [PATCH 2/6] pass around launched when creating child contexts --- dbos/dbos.go | 14 ++++++++++++++ 1 file changed, 14 insertions(+) diff --git a/dbos/dbos.go b/dbos/dbos.go index aa72ec5..334e056 100644 --- a/dbos/dbos.go +++ b/dbos/dbos.go @@ -194,6 +194,9 @@ func WithValue(ctx DBOSContext, key, val any) DBOSContext { } // Will do nothing if the concrete type is not dbosContext if dbosCtx, ok := ctx.(*dbosContext); ok { + launched := dbosCtx.launched.Load() + childCtxLaunched := atomic.Bool{} + childCtxLaunched.Store(launched) return &dbosContext{ ctx: context.WithValue(dbosCtx.ctx, key, val), // Spawn a new child context with the value set logger: dbosCtx.logger, @@ -204,6 +207,7 @@ func WithValue(ctx DBOSContext, key, val any) DBOSContext { applicationVersion: dbosCtx.applicationVersion, executorID: dbosCtx.executorID, applicationID: dbosCtx.applicationID, + launched: childCtxLaunched, } } return nil @@ -217,6 +221,11 @@ func WithoutCancel(ctx DBOSContext) DBOSContext { return nil } if dbosCtx, ok := ctx.(*dbosContext); ok { + launched := dbosCtx.launched.Load() + childCtxLaunched := atomic.Bool{} + childCtxLaunched.Store(launched) + // Create a new context that is not canceled when the parent is canceled + // but retains all other values return &dbosContext{ ctx: context.WithoutCancel(dbosCtx.ctx), logger: dbosCtx.logger, @@ -227,6 +236,7 @@ func WithoutCancel(ctx DBOSContext) DBOSContext { applicationVersion: dbosCtx.applicationVersion, executorID: dbosCtx.executorID, applicationID: dbosCtx.applicationID, + launched: childCtxLaunched, } } return nil @@ -240,6 +250,9 @@ func WithTimeout(ctx DBOSContext, timeout time.Duration) (DBOSContext, context.C return nil, func() {} } if dbosCtx, ok := ctx.(*dbosContext); ok { + launched := dbosCtx.launched.Load() + childCtxLaunched := atomic.Bool{} + childCtxLaunched.Store(launched) newCtx, cancelFunc := context.WithTimeoutCause(dbosCtx.ctx, timeout, errors.New("DBOS context timeout")) return &dbosContext{ ctx: newCtx, @@ -251,6 +264,7 @@ func WithTimeout(ctx DBOSContext, timeout time.Duration) (DBOSContext, context.C applicationVersion: dbosCtx.applicationVersion, executorID: dbosCtx.executorID, applicationID: dbosCtx.applicationID, + launched: childCtxLaunched, }, cancelFunc } return nil, func() {} From 910cdd5de69223c51fed55c70e793e5ccbefaf47 Mon Sep 17 00:00:00 2001 From: maxdml Date: Thu, 2 Oct 2025 15:02:23 -0700 Subject: [PATCH 3/6] launch ctx in tests where needed --- dbos/serialization_test.go | 12 +- dbos/workflows_test.go | 399 +++++++++++++++++++------------------ 2 files changed, 209 insertions(+), 202 deletions(-) diff --git a/dbos/serialization_test.go b/dbos/serialization_test.go index fb896a5..22418ef 100644 --- a/dbos/serialization_test.go +++ b/dbos/serialization_test.go @@ -11,15 +11,6 @@ import ( "github.com/stretchr/testify/require" ) -/** Test serialization and deserialization -[x] Built in types -[x] User defined types (structs) -[x] Workflow inputs/outputs -[x] Step inputs/outputs -[x] Direct handlers, polling handler, list workflows results, get step infos -[x] Set/get event with user defined types -*/ - // Builtin types func encodingStepBuiltinTypes(_ context.Context, input int) (int, error) { return input, errors.New("step error") @@ -76,6 +67,9 @@ func TestWorkflowEncoding(t *testing.T) { RegisterWorkflow(executor, encodingWorkflowBuiltinTypes) RegisterWorkflow(executor, encodingWorkflowStruct) + err := Launch(executor) + require.NoError(t, err) + t.Run("BuiltinTypes", func(t *testing.T) { // Test a workflow that uses a built-in type (string) directHandle, err := RunWorkflow(executor, encodingWorkflowBuiltinTypes, "test") diff --git a/dbos/workflows_test.go b/dbos/workflows_test.go index 44ea377..730f157 100644 --- a/dbos/workflows_test.go +++ b/dbos/workflows_test.go @@ -590,6 +590,105 @@ func TestSteps(t *testing.T) { RegisterWorkflow(dbosCtx, stepRetryWorkflow) RegisterWorkflow(dbosCtx, testStepWf1) RegisterWorkflow(dbosCtx, testStepWf2) + // Create a workflow that uses custom step names + customNameWorkflow := func(dbosCtx DBOSContext, input string) (string, error) { + // Run a step with a custom name + result1, err := RunAsStep(dbosCtx, func(ctx context.Context) (string, error) { + return "custom-step-1-result", nil + }, WithStepName("MyCustomStep1")) + if err != nil { + return "", err + } + + // Run another step with a different custom name + result2, err := RunAsStep(dbosCtx, func(ctx context.Context) (string, error) { + return "custom-step-2-result", nil + }, WithStepName("MyCustomStep2")) + if err != nil { + return "", err + } + + return result1 + "-" + result2, nil + } + + RegisterWorkflow(dbosCtx, customNameWorkflow) + + // Define user-defined types for testing serialization + type StepInput struct { + Name string `json:"name"` + Count int `json:"count"` + Active bool `json:"active"` + Metadata map[string]string `json:"metadata"` + CreatedAt time.Time `json:"created_at"` + } + + type StepOutput struct { + ProcessedName string `json:"processed_name"` + TotalCount int `json:"total_count"` + Success bool `json:"success"` + ProcessedAt time.Time `json:"processed_at"` + Details []string `json:"details"` + } + + // Create a step function that accepts StepInput and returns StepOutput + processUserObjectStep := func(_ context.Context, input StepInput) (StepOutput, error) { + // Process the input and create output + output := StepOutput{ + ProcessedName: fmt.Sprintf("Processed_%s", input.Name), + TotalCount: input.Count * 2, + Success: input.Active, + ProcessedAt: time.Now(), + Details: []string{"step1", "step2", "step3"}, + } + + // Verify input was correctly deserialized + if input.Metadata == nil { + return StepOutput{}, fmt.Errorf("metadata map was not properly deserialized") + } + + return output, nil + } + + // Create a workflow that uses the step with user-defined objects + userObjectWorkflow := func(dbosCtx DBOSContext, workflowInput string) (string, error) { + // Create input for the step + stepInput := StepInput{ + Name: workflowInput, + Count: 42, + Active: true, + Metadata: map[string]string{ + "key1": "value1", + "key2": "value2", + }, + CreatedAt: time.Now(), + } + + // Run the step with user-defined input and output + output, err := RunAsStep(dbosCtx, func(ctx context.Context) (StepOutput, error) { + return processUserObjectStep(ctx, stepInput) + }) + if err != nil { + return "", fmt.Errorf("step failed: %w", err) + } + + // Verify the output was correctly returned + if output.ProcessedName == "" { + return "", fmt.Errorf("output ProcessedName is empty") + } + if output.TotalCount != 84 { + return "", fmt.Errorf("expected TotalCount to be 84, got %d", output.TotalCount) + } + if len(output.Details) != 3 { + return "", fmt.Errorf("expected 3 details, got %d", len(output.Details)) + } + + return "", nil + } + // Register the workflow + RegisterWorkflow(dbosCtx, userObjectWorkflow) + + err := Launch(dbosCtx) + require.NoError(t, err, "failed to launch DBOS") t.Run("StepsMustRunInsideWorkflows", func(t *testing.T) { // Attempt to run a step outside of a workflow context @@ -699,28 +798,6 @@ func TestSteps(t *testing.T) { }) t.Run("customStepNames", func(t *testing.T) { - // Create a workflow that uses custom step names - customNameWorkflow := func(dbosCtx DBOSContext, input string) (string, error) { - // Run a step with a custom name - result1, err := RunAsStep(dbosCtx, func(ctx context.Context) (string, error) { - return "custom-step-1-result", nil - }, WithStepName("MyCustomStep1")) - if err != nil { - return "", err - } - - // Run another step with a different custom name - result2, err := RunAsStep(dbosCtx, func(ctx context.Context) (string, error) { - return "custom-step-2-result", nil - }, WithStepName("MyCustomStep2")) - if err != nil { - return "", err - } - - return result1 + "-" + result2, nil - } - - RegisterWorkflow(dbosCtx, customNameWorkflow) // Execute the workflow handle, err := RunWorkflow(dbosCtx, customNameWorkflow, "test-input") @@ -745,80 +822,6 @@ func TestSteps(t *testing.T) { }) t.Run("stepsOutputEncoding", func(t *testing.T) { - // Define user-defined types for testing serialization - type StepInput struct { - Name string `json:"name"` - Count int `json:"count"` - Active bool `json:"active"` - Metadata map[string]string `json:"metadata"` - CreatedAt time.Time `json:"created_at"` - } - - type StepOutput struct { - ProcessedName string `json:"processed_name"` - TotalCount int `json:"total_count"` - Success bool `json:"success"` - ProcessedAt time.Time `json:"processed_at"` - Details []string `json:"details"` - } - - // Create a step function that accepts StepInput and returns StepOutput - processUserObjectStep := func(_ context.Context, input StepInput) (StepOutput, error) { - // Process the input and create output - output := StepOutput{ - ProcessedName: fmt.Sprintf("Processed_%s", input.Name), - TotalCount: input.Count * 2, - Success: input.Active, - ProcessedAt: time.Now(), - Details: []string{"step1", "step2", "step3"}, - } - - // Verify input was correctly deserialized - if input.Metadata == nil { - return StepOutput{}, fmt.Errorf("metadata map was not properly deserialized") - } - - return output, nil - } - - // Create a workflow that uses the step with user-defined objects - userObjectWorkflow := func(dbosCtx DBOSContext, workflowInput string) (string, error) { - // Create input for the step - stepInput := StepInput{ - Name: workflowInput, - Count: 42, - Active: true, - Metadata: map[string]string{ - "key1": "value1", - "key2": "value2", - }, - CreatedAt: time.Now(), - } - - // Run the step with user-defined input and output - output, err := RunAsStep(dbosCtx, func(ctx context.Context) (StepOutput, error) { - return processUserObjectStep(ctx, stepInput) - }) - if err != nil { - return "", fmt.Errorf("step failed: %w", err) - } - - // Verify the output was correctly returned - if output.ProcessedName == "" { - return "", fmt.Errorf("output ProcessedName is empty") - } - if output.TotalCount != 84 { - return "", fmt.Errorf("expected TotalCount to be 84, got %d", output.TotalCount) - } - if len(output.Details) != 3 { - return "", fmt.Errorf("expected 3 details, got %d", len(output.Details)) - } - - return "", nil - } - - // Register the workflow - RegisterWorkflow(dbosCtx, userObjectWorkflow) // Execute the workflow handle, err := RunWorkflow(dbosCtx, userObjectWorkflow, "TestObject") @@ -861,8 +864,8 @@ func TestChildWorkflow(t *testing.T) { } // Create child workflows with executor - childWf := func(dbosCtx DBOSContext, input Inheritance) (string, error) { - workflowID, err := GetWorkflowID(dbosCtx) + childWf := func(ctx DBOSContext, input Inheritance) (string, error) { + workflowID, err := GetWorkflowID(ctx) if err != nil { return "", fmt.Errorf("failed to get workflow ID: %w", err) } @@ -871,7 +874,7 @@ func TestChildWorkflow(t *testing.T) { return "", fmt.Errorf("expected childWf workflow ID to be %s, got %s", expectedCurrentID, workflowID) } // Steps of a child workflow start with an incremented step ID, because the first step ID is allocated to the child workflow - return RunAsStep(dbosCtx, func(ctx context.Context) (string, error) { + return RunAsStep(ctx, func(ctx context.Context) (string, error) { return simpleStep(ctx) }) } @@ -1031,41 +1034,115 @@ func TestChildWorkflow(t *testing.T) { } RegisterWorkflow(dbosCtx, grandParentWf) - t.Run("ChildWorkflowIDGeneration", func(t *testing.T) { - r := 3 - h, err := RunWorkflow(dbosCtx, grandParentWf, r) - require.NoError(t, err, "failed to execute grand parent workflow") - _, err = h.GetResult() - require.NoError(t, err, "failed to get result from grand parent workflow") - }) + // Register workflows needed for ChildWorkflowWithCustomID test + simpleChildWf := func(dbosCtx DBOSContext, input string) (string, error) { + return RunAsStep(dbosCtx, func(ctx context.Context) (string, error) { + return simpleStep(ctx) + }) + } + RegisterWorkflow(dbosCtx, simpleChildWf) - t.Run("ChildWorkflowWithCustomID", func(t *testing.T) { - customChildID := uuid.NewString() + // Register workflows needed for RecoveredChildWorkflowPollingHandle test + var pollingHandleCompleteEvent *Event + pollingHandleChildWf := func(dbosCtx DBOSContext, input string) (string, error) { + // Wait if event is set + if pollingHandleCompleteEvent != nil { + pollingHandleCompleteEvent.Wait() + } + return input + "-result", nil + } + RegisterWorkflow(dbosCtx, pollingHandleChildWf) - simpleChildWf := func(dbosCtx DBOSContext, input string) (string, error) { - return RunAsStep(dbosCtx, func(ctx context.Context) (string, error) { - return simpleStep(ctx) - }) + var pollingCounter int + var pollingHandleStartEvent *Event + pollingHandleParentWf := func(ctx DBOSContext, input string) (string, error) { + pollingCounter++ + + // Run child workflow with a known ID + childHandle, err := RunWorkflow(ctx, pollingHandleChildWf, "child-input", WithWorkflowID("known-child-workflow-id")) + if err != nil { + return "", fmt.Errorf("failed to run child workflow: %w", err) } - RegisterWorkflow(dbosCtx, simpleChildWf) - // Simple parent that starts one child with a custom workflow ID - parentWf := func(ctx DBOSContext, input string) (string, error) { - childHandle, err := RunWorkflow(ctx, simpleChildWf, "test-child-input", WithWorkflowID(customChildID)) - if err != nil { - return "", fmt.Errorf("failed to run child workflow: %w", err) + switch pollingCounter { + case 1: + // First handle will be a direct handle + _, ok := childHandle.(*workflowHandle[string]) + if !ok { + return "", fmt.Errorf("expected child handle to be of type workflowDirectHandle, got %T", childHandle) + } + // Signal the child workflow is started + if pollingHandleStartEvent != nil { + pollingHandleStartEvent.Set() } result, err := childHandle.GetResult() if err != nil { return "", fmt.Errorf("failed to get result from child workflow: %w", err) } - return result, nil + case 2: + // Second handle will be a polling handle + _, ok := childHandle.(*workflowPollingHandle[string]) + if !ok { + return "", fmt.Errorf("expected recovered child handle to be of type workflowPollingHandle, got %T", childHandle) + } + } + return "", nil + } + RegisterWorkflow(dbosCtx, pollingHandleParentWf) + + // Register workflows needed for ChildWorkflowCannotBeSpawnedFromStep test + childWfForStepTest := func(dbosCtx DBOSContext, input string) (string, error) { + return "child-result", nil + } + RegisterWorkflow(dbosCtx, childWfForStepTest) + + parentWfForStepTest := func(ctx DBOSContext, input string) (string, error) { + return RunAsStep(ctx, func(context context.Context) (string, error) { + dbosCtx := context.(DBOSContext) + _, err := RunWorkflow(dbosCtx, childWfForStepTest, input) + if err != nil { + return "", err + } + return "should-not-reach", nil + }) + } + RegisterWorkflow(dbosCtx, parentWfForStepTest) + // Simple parent that starts one child with a custom workflow ID + simpleParentWf := func(ctx DBOSContext, customChildID string) (string, error) { + childHandle, err := RunWorkflow(ctx, simpleChildWf, "test-child-input", WithWorkflowID(customChildID)) + if err != nil { + return "", fmt.Errorf("failed to run child workflow: %w", err) + } + + result, err := childHandle.GetResult() + if err != nil { + return "", fmt.Errorf("failed to get result from child workflow: %w", err) } - RegisterWorkflow(dbosCtx, parentWf) - parentHandle, err := RunWorkflow(dbosCtx, parentWf, "test-input") + return result, nil + } + + RegisterWorkflow(dbosCtx, simpleParentWf) + + // Launch the context once for all subtests + err := Launch(dbosCtx) + require.NoError(t, err, "failed to launch DBOS") + + t.Run("ChildWorkflowIDGeneration", func(t *testing.T) { + + r := 3 + h, err := RunWorkflow(dbosCtx, grandParentWf, r) + require.NoError(t, err, "failed to execute grand parent workflow") + _, err = h.GetResult() + require.NoError(t, err, "failed to get result from grand parent workflow") + }) + + t.Run("ChildWorkflowWithCustomID", func(t *testing.T) { + customChildID := uuid.NewString() + + parentHandle, err := RunWorkflow(dbosCtx, simpleParentWf, customChildID) require.NoError(t, err, "failed to start parent workflow") result, err := parentHandle.GetResult() @@ -1089,55 +1166,12 @@ func TestChildWorkflow(t *testing.T) { }) t.Run("RecoveredChildWorkflowPollingHandle", func(t *testing.T) { - pollingHandleStartEvent := NewEvent() - pollingHandleCompleteEvent := NewEvent() + // Reset counter and set up events for this test + pollingCounter = 0 + pollingHandleStartEvent = NewEvent() + pollingHandleCompleteEvent = NewEvent() knownChildID := "known-child-workflow-id" knownParentID := "known-parent-workflow-id" - counter := 0 - - // Simple child workflow that returns a result - pollingHandleChildWf := func(dbosCtx DBOSContext, input string) (string, error) { - // Wait - pollingHandleCompleteEvent.Wait() - return input + "-result", nil - } - RegisterWorkflow(dbosCtx, pollingHandleChildWf) - - pollingHandleParentWf := func(ctx DBOSContext, input string) (string, error) { - counter++ - - // Run child workflow with a known ID - childHandle, err := RunWorkflow(ctx, pollingHandleChildWf, "child-input", WithWorkflowID(knownChildID)) - if err != nil { - return "", fmt.Errorf("failed to run child workflow: %w", err) - } - - switch counter { - case 1: - // First handle will be a direct handle - _, ok := childHandle.(*workflowHandle[string]) - if !ok { - return "", fmt.Errorf("expected child handle to be of type workflowDirectHandle, got %T", childHandle) - } - // Signal the child workflow is started - pollingHandleStartEvent.Set() - - result, err := childHandle.GetResult() - if err != nil { - return "", fmt.Errorf("failed to get result from child workflow: %w", err) - } - return result, nil - case 2: - // Second handle will be a polling handle - _, ok := childHandle.(*workflowPollingHandle[string]) - if !ok { - return "", fmt.Errorf("expected recovered child handle to be of type workflowPollingHandle, got %T", childHandle) - } - } - return "", nil - } - - RegisterWorkflow(dbosCtx, pollingHandleParentWf) // Execute parent workflow - it will block after starting the child parentHandle, err := RunWorkflow(dbosCtx, pollingHandleParentWf, "parent-input", WithWorkflowID(knownParentID)) @@ -1175,32 +1209,8 @@ func TestChildWorkflow(t *testing.T) { }) t.Run("ChildWorkflowCannotBeSpawnedFromStep", func(t *testing.T) { - // Child workflow for testing - childWf := func(dbosCtx DBOSContext, input string) (string, error) { - return "child-result", nil - } - RegisterWorkflow(dbosCtx, childWf) - - // Step that tries to spawn a child workflow - this should fail - stepThatSpawnsChild := func(ctx context.Context, input string) (string, error) { - dbosCtx := ctx.(DBOSContext) - _, err := RunWorkflow(dbosCtx, childWf, input) - if err != nil { - return "", err - } - return "should-not-reach", nil - } - - // Workflow that calls the step - parentWf := func(ctx DBOSContext, input string) (string, error) { - return RunAsStep(ctx, func(context context.Context) (string, error) { - return stepThatSpawnsChild(context, input) - }) - } - RegisterWorkflow(dbosCtx, parentWf) - // Execute the workflow - should fail when step tries to spawn child workflow - handle, err := RunWorkflow(dbosCtx, parentWf, "test-input") + handle, err := RunWorkflow(dbosCtx, parentWfForStepTest, "test-input") require.NoError(t, err, "failed to start parent workflow") // Expect the workflow to fail @@ -1299,6 +1309,9 @@ func TestWorkflowRecovery(t *testing.T) { RegisterWorkflow(dbosCtx, recoveryWorkflow) + err := Launch(dbosCtx) + require.NoError(t, err, "failed to launch DBOS") + t.Run("WorkflowRecovery", func(t *testing.T) { const numWorkflows = 5 From 49c76969a1f8c5b5329456256b0c3c45fbcf18c4 Mon Sep 17 00:00:00 2001 From: maxdml Date: Thu, 2 Oct 2025 15:04:57 -0700 Subject: [PATCH 4/6] nit --- dbos/client_test.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/dbos/client_test.go b/dbos/client_test.go index cfa9a88..cca7fa8 100644 --- a/dbos/client_test.go +++ b/dbos/client_test.go @@ -942,7 +942,7 @@ func TestGetWorkflowSteps(t *testing.T) { } testWorkflow := func(ctx DBOSContext, input string) (string, error) { - result, err := RunAsStep(ctx, stepFunction) + result, err := RunAsStep(ctx, stepFunction, WithStepName("TestStep")) if err != nil { return "", err } @@ -984,7 +984,7 @@ func TestGetWorkflowSteps(t *testing.T) { step := stepsWithOutput[0] assert.Equal(t, 0, step.StepID, "expected step ID to be 0") - assert.NotEmpty(t, step.StepName, "expected step name to be set") + assert.Equal(t, "TestStep", step.StepName, "expected step name to be set") assert.Nil(t, step.Error, "expected no error in step") assert.Equal(t, "", step.ChildWorkflowID, "expected no child workflow ID") From df58a0788022aaa61ffa66335a5628203ed06a01 Mon Sep 17 00:00:00 2001 From: maxdml Date: Thu, 2 Oct 2025 15:38:39 -0700 Subject: [PATCH 5/6] fix --- dbos/dbos.go | 23 ++++++++++------------- 1 file changed, 10 insertions(+), 13 deletions(-) diff --git a/dbos/dbos.go b/dbos/dbos.go index 334e056..f9aa829 100644 --- a/dbos/dbos.go +++ b/dbos/dbos.go @@ -195,9 +195,7 @@ func WithValue(ctx DBOSContext, key, val any) DBOSContext { // Will do nothing if the concrete type is not dbosContext if dbosCtx, ok := ctx.(*dbosContext); ok { launched := dbosCtx.launched.Load() - childCtxLaunched := atomic.Bool{} - childCtxLaunched.Store(launched) - return &dbosContext{ + childCtx := &dbosContext{ ctx: context.WithValue(dbosCtx.ctx, key, val), // Spawn a new child context with the value set logger: dbosCtx.logger, systemDB: dbosCtx.systemDB, @@ -207,8 +205,9 @@ func WithValue(ctx DBOSContext, key, val any) DBOSContext { applicationVersion: dbosCtx.applicationVersion, executorID: dbosCtx.executorID, applicationID: dbosCtx.applicationID, - launched: childCtxLaunched, } + childCtx.launched.Store(launched) + return childCtx } return nil } @@ -222,11 +221,9 @@ func WithoutCancel(ctx DBOSContext) DBOSContext { } if dbosCtx, ok := ctx.(*dbosContext); ok { launched := dbosCtx.launched.Load() - childCtxLaunched := atomic.Bool{} - childCtxLaunched.Store(launched) // Create a new context that is not canceled when the parent is canceled // but retains all other values - return &dbosContext{ + childCtx := &dbosContext{ ctx: context.WithoutCancel(dbosCtx.ctx), logger: dbosCtx.logger, systemDB: dbosCtx.systemDB, @@ -236,8 +233,9 @@ func WithoutCancel(ctx DBOSContext) DBOSContext { applicationVersion: dbosCtx.applicationVersion, executorID: dbosCtx.executorID, applicationID: dbosCtx.applicationID, - launched: childCtxLaunched, } + childCtx.launched.Store(launched) + return childCtx } return nil } @@ -251,10 +249,8 @@ func WithTimeout(ctx DBOSContext, timeout time.Duration) (DBOSContext, context.C } if dbosCtx, ok := ctx.(*dbosContext); ok { launched := dbosCtx.launched.Load() - childCtxLaunched := atomic.Bool{} - childCtxLaunched.Store(launched) newCtx, cancelFunc := context.WithTimeoutCause(dbosCtx.ctx, timeout, errors.New("DBOS context timeout")) - return &dbosContext{ + childCtx := &dbosContext{ ctx: newCtx, logger: dbosCtx.logger, systemDB: dbosCtx.systemDB, @@ -264,8 +260,9 @@ func WithTimeout(ctx DBOSContext, timeout time.Duration) (DBOSContext, context.C applicationVersion: dbosCtx.applicationVersion, executorID: dbosCtx.executorID, applicationID: dbosCtx.applicationID, - launched: childCtxLaunched, - }, cancelFunc + } + childCtx.launched.Store(launched) + return childCtx, cancelFunc } return nil, func() {} } From b5eac4e1098abc59cd76855cc9d94b7e156bfad1 Mon Sep 17 00:00:00 2001 From: maxdml Date: Thu, 2 Oct 2025 15:52:47 -0700 Subject: [PATCH 6/6] flaky test --- dbos/workflows_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dbos/workflows_test.go b/dbos/workflows_test.go index 730f157..56b8cf8 100644 --- a/dbos/workflows_test.go +++ b/dbos/workflows_test.go @@ -3075,7 +3075,7 @@ func TestWorkflowTimeout(t *testing.T) { t.Run("WorkflowWithStepTimeout", func(t *testing.T) { // Start a workflow that will run a step that triggers cancellation - cancelCtx, cancelFunc := WithTimeout(dbosCtx, 1*time.Millisecond) + cancelCtx, cancelFunc := WithTimeout(dbosCtx, 100*time.Millisecond) defer cancelFunc() // Ensure we clean up the context handle, err := RunWorkflow(cancelCtx, waitForCancelWorkflowWithStep, "wf-with-step-timeout") require.NoError(t, err, "failed to start workflow with step timeout")