Skip to content

Commit 82faad3

Browse files
committed
fork
1 parent 3b82d40 commit 82faad3

File tree

4 files changed

+516
-5
lines changed

4 files changed

+516
-5
lines changed

dbos/client_test.go

Lines changed: 251 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package dbos
33
import (
44
"context"
55
"fmt"
6+
"strings"
67
"testing"
78
"time"
89
)
@@ -381,3 +382,253 @@ func TestCancelResume(t *testing.T) {
381382
}
382383
})
383384
}
385+
386+
func TestForkWorkflow(t *testing.T) {
387+
// Global counters for tracking execution (no mutex needed since workflows run solo)
388+
var (
389+
stepCount1 int
390+
stepCount2 int
391+
child1Count int
392+
child2Count int
393+
)
394+
395+
// Setup server context - this will process tasks
396+
serverCtx := setupDBOS(t, true, true)
397+
398+
// Create queue for communication between client and server
399+
queue := NewWorkflowQueue(serverCtx, "fork-workflow-queue")
400+
401+
// Simple child workflows (no steps, just increment counters)
402+
childWorkflow1 := func(ctx DBOSContext, input string) (string, error) {
403+
child1Count++
404+
return "child1-" + input, nil
405+
}
406+
RegisterWorkflow(serverCtx, childWorkflow1, WithWorkflowName("ChildWorkflow1"))
407+
408+
childWorkflow2 := func(ctx DBOSContext, input string) (string, error) {
409+
child2Count++
410+
return "child2-" + input, nil
411+
}
412+
RegisterWorkflow(serverCtx, childWorkflow2, WithWorkflowName("ChildWorkflow2"))
413+
414+
// Parent workflow with 2 steps and 2 child workflows
415+
parentWorkflow := func(ctx DBOSContext, input string) (string, error) {
416+
// Step 1
417+
step1Result, err := RunAsStep(ctx, func(ctx context.Context) (string, error) {
418+
stepCount1++
419+
return "step1-" + input, nil
420+
})
421+
if err != nil {
422+
return "", err
423+
}
424+
425+
// Child workflow 1
426+
child1Handle, err := RunAsWorkflow(ctx, childWorkflow1, input)
427+
if err != nil {
428+
return "", err
429+
}
430+
child1Result, err := child1Handle.GetResult()
431+
if err != nil {
432+
return "", err
433+
}
434+
435+
// Step 2
436+
step2Result, err := RunAsStep(ctx, func(ctx context.Context) (string, error) {
437+
stepCount2++
438+
return "step2-" + input, nil
439+
})
440+
if err != nil {
441+
return "", err
442+
}
443+
444+
// Child workflow 2
445+
child2Handle, err := RunAsWorkflow(ctx, childWorkflow2, input)
446+
if err != nil {
447+
return "", err
448+
}
449+
child2Result, err := child2Handle.GetResult()
450+
if err != nil {
451+
return "", err
452+
}
453+
454+
return step1Result + "+" + step2Result + "+" + child1Result + "+" + child2Result, nil
455+
}
456+
RegisterWorkflow(serverCtx, parentWorkflow, WithWorkflowName("ParentWorkflow"))
457+
458+
// Launch the server context to start processing tasks
459+
err := serverCtx.Launch()
460+
if err != nil {
461+
t.Fatalf("failed to launch server DBOS instance: %v", err)
462+
}
463+
464+
// Setup client context
465+
clientCtx := setupDBOS(t, false, false)
466+
467+
t.Run("ForkAtAllSteps", func(t *testing.T) {
468+
// Reset counters
469+
stepCount1, stepCount2, child1Count, child2Count = 0, 0, 0, 0
470+
471+
originalWorkflowID := "original-workflow-fork-test"
472+
473+
// 1. Run the entire workflow first and check counters are 1
474+
handle, err := Enqueue[string, string](clientCtx, GenericEnqueueOptions[string]{
475+
WorkflowName: "ParentWorkflow",
476+
QueueName: queue.Name,
477+
WorkflowID: originalWorkflowID,
478+
WorkflowInput: "test",
479+
ApplicationVersion: serverCtx.GetApplicationVersion(),
480+
})
481+
if err != nil {
482+
t.Fatalf("failed to enqueue original workflow: %v", err)
483+
}
484+
485+
// Wait for the original workflow to complete
486+
result, err := handle.GetResult()
487+
if err != nil {
488+
t.Fatalf("failed to get result from original workflow: %v", err)
489+
}
490+
491+
expectedResult := "step1-test+step2-test+child1-test+child2-test"
492+
if result != expectedResult {
493+
t.Fatalf("expected result to be '%s', got '%s'", expectedResult, result)
494+
}
495+
496+
// Verify all counters are 1 after original workflow
497+
if stepCount1 != 1 || stepCount2 != 1 || child1Count != 1 || child2Count != 1 {
498+
t.Fatalf("expected counters to be (step1:1, step2:1, child1:1, child2:1), got (step1:%d, step2:%d, child1:%d, child2:%d)", stepCount1, stepCount2, child1Count, child2Count)
499+
}
500+
501+
// 2. Fork from each step 1 to 6 and verify results
502+
// Note: there's 6 steps: 2 steps 2 children and 2 GetResults
503+
for step := 1; step <= 6; step++ {
504+
t.Logf("Forking at step %d", step)
505+
506+
customForkedWorkflowID := fmt.Sprintf("forked-workflow-step-%d", step)
507+
forkedHandle, err := ForkWorkflow[string](clientCtx, originalWorkflowID, WithForkWorkflowID(customForkedWorkflowID), WithForkStartStep(uint(step-1)))
508+
if err != nil {
509+
t.Fatalf("failed to fork workflow at step %d: %v", step, err)
510+
}
511+
512+
forkedWorkflowID := forkedHandle.GetWorkflowID()
513+
if forkedWorkflowID != customForkedWorkflowID {
514+
t.Fatalf("expected forked workflow ID to be '%s', got '%s'", customForkedWorkflowID, forkedWorkflowID)
515+
}
516+
517+
forkedResult, err := forkedHandle.GetResult()
518+
if err != nil {
519+
t.Fatalf("failed to get result from forked workflow at step %d: %v", step, err)
520+
}
521+
522+
// 1) Verify workflow result is correct
523+
if forkedResult != expectedResult {
524+
t.Fatalf("forked workflow at step %d: expected result '%s', got '%s'", step, expectedResult, forkedResult)
525+
}
526+
527+
// 2) Verify counters are at expected totals based on the step where we're forking
528+
t.Logf("Step %d: actual counters - step1:%d, step2:%d, child1:%d, child2:%d", step, stepCount1, stepCount2, child1Count, child2Count)
529+
530+
// First step is executed only once
531+
if stepCount1 != 1+1 {
532+
t.Fatalf("forked workflow at step %d: step1 counter should be 2, got %d", step, stepCount1)
533+
}
534+
535+
// First child will be executed twice
536+
if step < 3 {
537+
if child1Count != 1+step {
538+
t.Fatalf("forked workflow at step %d: child1 counter should be %d, got %d", step, 1+step, child1Count)
539+
}
540+
} else {
541+
if child1Count != 1+2 {
542+
t.Fatalf("forked workflow at step %d: child2 counter should be 3, got %d", step, child1Count)
543+
}
544+
}
545+
546+
// Second step (in reality step 4) will be executed 4 times
547+
if step < 5 {
548+
if stepCount2 != 1+step {
549+
t.Fatalf("forked workflow at step %d: step2 counter should be %d, got %d", step, 1+step, stepCount2)
550+
}
551+
} else {
552+
if stepCount2 != 1+4 {
553+
t.Fatalf("forked workflow at step %d: step2 counter should be 5, got %d", step, stepCount2)
554+
}
555+
}
556+
557+
// Second child will be executed 5 times
558+
if step < 6 {
559+
if child2Count != 1+step {
560+
t.Fatalf("forked workflow at step %d: child2 counter should be %d, got %d", step, 1+step, child2Count)
561+
}
562+
} else {
563+
if child2Count != 1+5 {
564+
t.Fatalf("forked workflow at step %d: child2 counter should be 6, got %d", step, child2Count)
565+
}
566+
}
567+
568+
t.Logf("Step %d: all counter totals verified correctly", step)
569+
}
570+
571+
t.Logf("Final counters after all forks - steps:%d, child1:%d, child2:%d", stepCount1, child1Count, child2Count)
572+
})
573+
574+
t.Run("ForkNonExistentWorkflow", func(t *testing.T) {
575+
nonExistentWorkflowID := "non-existent-workflow-for-fork"
576+
577+
// Try to fork a non-existent workflow
578+
_, err := clientCtx.ForkWorkflow(clientCtx, nonExistentWorkflowID, WithForkStartStep(1))
579+
if err == nil {
580+
t.Fatal("expected error when forking non-existent workflow, but got none")
581+
}
582+
583+
// Verify error type
584+
dbosErr, ok := err.(*DBOSError)
585+
if !ok {
586+
t.Fatalf("expected error to be of type *DBOSError, got %T", err)
587+
}
588+
589+
if dbosErr.Code != NonExistentWorkflowError {
590+
t.Fatalf("expected error code to be NonExistentWorkflowError, got %v", dbosErr.Code)
591+
}
592+
593+
if dbosErr.DestinationID != nonExistentWorkflowID {
594+
t.Fatalf("expected DestinationID to be %s, got %s", nonExistentWorkflowID, dbosErr.DestinationID)
595+
}
596+
})
597+
598+
t.Run("ForkWithInvalidStep", func(t *testing.T) {
599+
originalWorkflowID := "original-workflow-invalid-step"
600+
601+
// Create an original workflow first
602+
handle, err := Enqueue[string, string](clientCtx, GenericEnqueueOptions[string]{
603+
WorkflowName: "ParentWorkflow",
604+
QueueName: queue.Name,
605+
WorkflowID: originalWorkflowID,
606+
WorkflowInput: "test",
607+
ApplicationVersion: serverCtx.GetApplicationVersion(),
608+
})
609+
if err != nil {
610+
t.Fatalf("failed to enqueue original workflow: %v", err)
611+
}
612+
613+
// Wait for completion
614+
_, err = handle.GetResult()
615+
if err != nil {
616+
t.Fatalf("failed to get result from original workflow: %v", err)
617+
}
618+
619+
// Try to fork at step 999 (beyond workflow's actual steps)
620+
_, err = clientCtx.ForkWorkflow(clientCtx, originalWorkflowID, WithForkStartStep(999))
621+
if err == nil {
622+
t.Fatal("expected error when forking at step 999, but got none")
623+
}
624+
// Verify the error message
625+
if !strings.Contains(err.Error(), "exceeds workflow's maximum step") {
626+
t.Fatalf("expected error message to contain 'exceeds workflow's maximum step', got: %v", err)
627+
}
628+
})
629+
630+
// Verify all queue entries are cleaned up
631+
if !queueEntriesAreCleanedUp(serverCtx) {
632+
t.Fatal("expected queue entries to be cleaned up after fork workflow tests")
633+
}
634+
}

dbos/dbos.go

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -84,10 +84,11 @@ type DBOSContext interface {
8484
GetWorkflowID() (string, error) // Get the current workflow ID (only available within workflows)
8585

8686
// Workflow management
87-
RetrieveWorkflow(_ DBOSContext, workflowID string) (WorkflowHandle[any], error) // Get a handle to an existing workflow
88-
Enqueue(_ DBOSContext, params EnqueueOptions) (WorkflowHandle[any], error) // Enqueue a new workflow with parameters
89-
CancelWorkflow(workflowID string) error // Cancel a workflow by setting its status to CANCELLED
90-
ResumeWorkflow(_ DBOSContext, workflowID string) (WorkflowHandle[any], error) // Resume a cancelled workflow
87+
RetrieveWorkflow(_ DBOSContext, workflowID string) (WorkflowHandle[any], error) // Get a handle to an existing workflow
88+
Enqueue(_ DBOSContext, params EnqueueOptions) (WorkflowHandle[any], error) // Enqueue a new workflow with parameters
89+
CancelWorkflow(workflowID string) error // Cancel a workflow by setting its status to CANCELLED
90+
ResumeWorkflow(_ DBOSContext, workflowID string) (WorkflowHandle[any], error) // Resume a cancelled workflow
91+
ForkWorkflow(_ DBOSContext, originalWorkflowID string, opts ...ForkWorkflowOption) (WorkflowHandle[any], error) // Fork a workflow from a specific step
9192

9293
// Accessors
9394
GetApplicationVersion() string // Get the application version for this context

0 commit comments

Comments
 (0)