Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
63 changes: 57 additions & 6 deletions dbos/queues_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"errors"
"fmt"
"os"
"strings"
"sync/atomic"
"testing"
"time"
Expand All @@ -23,6 +24,7 @@ This suite tests
[x] worker concurrency (2 at a time across two "workers")
[x] worker concurrency X recovery
[x] rate limiter
[x] conflicting workflow on different queues
[] queue deduplication
[] queue priority
[x] queued workflow times out
Expand All @@ -48,6 +50,8 @@ func TestWorkflowQueues(t *testing.T) {

queue := NewWorkflowQueue(dbosCtx, "test-queue")
dlqEnqueueQueue := NewWorkflowQueue(dbosCtx, "test-successive-enqueue-queue")
conflictQueue1 := NewWorkflowQueue(dbosCtx, "conflict-queue-1")
conflictQueue2 := NewWorkflowQueue(dbosCtx, "conflict-queue-2")

dlqStartEvent := NewEvent()
dlqCompleteEvent := NewEvent()
Expand Down Expand Up @@ -172,14 +176,15 @@ func TestWorkflowQueues(t *testing.T) {
}
})

/* TODO: we will move queue registry in the new interface in a subsequent PR
t.Run("DynamicRegistration", func(t *testing.T) {
q := NewWorkflowQueue("dynamic-queue")
if len(q.name) > 0 {
t.Fatalf("expected nil queue for dynamic registration after DBOS initialization, got %v", q)
}
// Attempting to register a queue after launch should panic
defer func() {
if r := recover(); r == nil {
t.Fatal("expected panic from queue registration after launch but got none")
}
}()
NewWorkflowQueue(dbosCtx, "dynamic-queue")
})
*/

t.Run("QueueWorkflowDLQ", func(t *testing.T) {
workflowID := "blocking-workflow-test"
Expand Down Expand Up @@ -255,6 +260,52 @@ func TestWorkflowQueues(t *testing.T) {
t.Fatal("expected queue entries to be cleaned up after successive enqueues test")
}
})

t.Run("ConflictingWorkflowOnDifferentQueues", func(t *testing.T) {
workflowID := "conflicting-workflow-id"

// Enqueue the same workflow ID on the first queue
handle, err := RunAsWorkflow(dbosCtx, queueWorkflow, "test-input-1", WithQueue(conflictQueue1.Name), WithWorkflowID(workflowID))
if err != nil {
t.Fatalf("failed to enqueue workflow on first queue: %v", err)
}

// Get the result from the first workflow to ensure it completes
result, err := handle.GetResult()
if err != nil {
t.Fatalf("failed to get result from first workflow: %v", err)
}
if result != "test-input-1" {
t.Fatalf("expected 'test-input-1', got %v", result)
}

// Now try to enqueue the same workflow ID on a different queue
// This should trigger a ConflictingWorkflowError
_, err = RunAsWorkflow(dbosCtx, queueWorkflow, "test-input-2", WithQueue(conflictQueue2.Name), WithWorkflowID(workflowID))
if err == nil {
t.Fatal("expected ConflictingWorkflowError when enqueueing same workflow ID on different queue, but got none")
}

// Check that it's the correct error type
dbosErr, ok := err.(*DBOSError)
if !ok {
t.Fatalf("expected error to be of type *DBOSError, got %T", err)
}

if dbosErr.Code != ConflictingWorkflowError {
t.Fatalf("expected error code to be ConflictingWorkflowError, got %v", dbosErr.Code)
}

// Check that the error message contains queue information
expectedMsgPart := "Workflow already exists in a different queue"
if !strings.Contains(err.Error(), expectedMsgPart) {
t.Fatalf("expected error message to contain '%s', got '%s'", expectedMsgPart, err.Error())
}

if !queueEntriesAreCleanedUp(dbosCtx) {
t.Fatal("expected queue entries to be cleaned up after conflicting workflow test")
}
})
}

func TestQueueRecovery(t *testing.T) {
Expand Down
2 changes: 1 addition & 1 deletion dbos/system_database.go
Original file line number Diff line number Diff line change
Expand Up @@ -380,7 +380,7 @@ func (s *systemDatabase) InsertWorkflowStatus(ctx context.Context, input insertW
return nil, newConflictingWorkflowError(input.status.ID, fmt.Sprintf("Workflow already exists with a different name: %s, but the provided name is: %s", result.name, input.status.Name))
}
if len(input.status.QueueName) > 0 && result.queueName != nil && input.status.QueueName != *result.queueName {
s.logger.Warn("Queue name conflict for workflow", "workflow_id", input.status.ID, "result_queue", *result.queueName, "status_queue", input.status.QueueName)
return nil, newConflictingWorkflowError(input.status.ID, fmt.Sprintf("Workflow already exists in a different queue: %s, but the provided queue is: %s", *result.queueName, input.status.QueueName))
}

// Every time we start executing a workflow (and thus attempt to insert its status), we increment `recovery_attempts` by 1.
Expand Down
44 changes: 33 additions & 11 deletions dbos/workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"math"
"reflect"
"runtime"
"sync"
"time"

"github.com/google/uuid"
Expand Down Expand Up @@ -204,10 +205,11 @@ type WrappedWorkflowFunc func(ctx DBOSContext, input any, opts ...WorkflowOption
type workflowRegistryEntry struct {
wrappedFunction WrappedWorkflowFunc
maxRetries int
name string
}

// Register adds a workflow function to the registry (thread-safe, only once per name)
func registerWorkflow(ctx DBOSContext, workflowName string, fn WrappedWorkflowFunc, maxRetries int) {
func registerWorkflow(ctx DBOSContext, workflowFQN string, fn WrappedWorkflowFunc, maxRetries int, customName string) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What's going on here? What's the difference between FQN and customName?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

And the tests seem to use neither?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The FQN is the reflection name, which we get with runtime.FuncForPC(reflect.ValueOf(fn).Pointer()).Name().

The FQN is the only information we get at runtime when a user calls RunAsWorkflow.

A user can set a custom name when registering a workflow. There are now tests exercising this path.

The custom name is stored in the registry, alongside other registration-time parameters like maxRetries.

RunAsWorkflow resolve the workflow name at runtime when looking up registration options (from the registry.)

// Skip if we don't have a concrete dbosContext
c, ok := ctx.(*dbosContext)
if !ok {
Expand All @@ -221,14 +223,15 @@ func registerWorkflow(ctx DBOSContext, workflowName string, fn WrappedWorkflowFu
c.workflowRegMutex.Lock()
defer c.workflowRegMutex.Unlock()

if _, exists := c.workflowRegistry[workflowName]; exists {
c.logger.Error("workflow function already registered", "fqn", workflowName)
panic(newConflictingRegistrationError(workflowName))
if _, exists := c.workflowRegistry[workflowFQN]; exists {
c.logger.Error("workflow function already registered", "fqn", workflowFQN)
panic(newConflictingRegistrationError(workflowFQN))
}

c.workflowRegistry[workflowName] = workflowRegistryEntry{
c.workflowRegistry[workflowFQN] = workflowRegistryEntry{
wrappedFunction: fn,
maxRetries: maxRetries,
name: customName,
}
}

Expand Down Expand Up @@ -274,6 +277,7 @@ func registerScheduledWorkflow(ctx DBOSContext, workflowName string, fn Workflow
type workflowRegistrationParams struct {
cronSchedule string
maxRetries int
name string
}

type workflowRegistrationOption func(*workflowRegistrationParams)
Expand All @@ -294,6 +298,12 @@ func WithSchedule(schedule string) workflowRegistrationOption {
}
}

func WithWorkflowName(name string) workflowRegistrationOption {
return func(p *workflowRegistrationParams) {
p.name = name
}
}

// RegisterWorkflow registers the provided function as a durable workflow with the provided DBOSContext workflow registry
// If the workflow is a scheduled workflow (determined by the presence of a cron schedule), it will also register a cron job to execute it
// RegisterWorkflow is generically typed, providing compile-time type checking and allowing us to register the workflow input and output types for gob encoding
Expand Down Expand Up @@ -347,7 +357,7 @@ func RegisterWorkflow[P any, R any](ctx DBOSContext, fn GenericWorkflowFunc[P, R
}
return &workflowPollingHandle[any]{workflowID: handle.GetWorkflowID(), dbosContext: ctx}, nil // this is only used by recovery and queue runner so far -- queue runner dismisses it
})
registerWorkflow(ctx, fqn, typeErasedWrapper, registrationParams.maxRetries)
registerWorkflow(ctx, fqn, typeErasedWrapper, registrationParams.maxRetries, registrationParams.name)

// If this is a scheduled workflow, register a cron job
if registrationParams.cronSchedule != "" {
Expand Down Expand Up @@ -397,6 +407,7 @@ func WithApplicationVersion(version string) WorkflowOption {
}
}

// An internal option we use to map the reflection function name to the registration options.
func withWorkflowName(name string) WorkflowOption {
return func(p *workflowParams) {
p.workflowName = name
Expand Down Expand Up @@ -484,6 +495,9 @@ func (c *dbosContext) RunAsWorkflow(_ DBOSContext, fn WorkflowFunc, input any, o
if registeredWorkflow.maxRetries > 0 {
params.maxRetries = registeredWorkflow.maxRetries
}
if len(registeredWorkflow.name) > 0 {
params.workflowName = registeredWorkflow.name
}

// Check if we are within a workflow (and thus a child workflow)
parentWorkflowState, ok := c.Value(workflowStateKey).(*workflowState)
Expand Down Expand Up @@ -705,7 +719,12 @@ func setStepParamDefaults(params *StepParams, stepName string) *StepParams {
BackoffFactor: 2.0,
BaseInterval: 100 * time.Millisecond, // Default base interval
MaxInterval: 5 * time.Second, // Default max interval
StepName: typeErasedStepNameToStepName[stepName],
StepName: func() string {
if value, ok := typeErasedStepNameToStepName.Load(stepName); ok {
return value.(string)
}
return "" // This should never happen
}(),
}
}

Expand All @@ -719,15 +738,17 @@ func setStepParamDefaults(params *StepParams, stepName string) *StepParams {
if params.MaxInterval == 0 {
params.MaxInterval = 5 * time.Second // Default max interval
}
if params.StepName == "" {
if len(params.StepName) == 0 {
// If the step name is not provided, use the function name
params.StepName = typeErasedStepNameToStepName[stepName]
if value, ok := typeErasedStepNameToStepName.Load(stepName); ok {
params.StepName = value.(string)
}
}

return params
}

var typeErasedStepNameToStepName = make(map[string]string)
var typeErasedStepNameToStepName sync.Map
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This map should be read-mostly. The first invocation of a step will store the reflection name.


func RunAsStep[R any](ctx DBOSContext, fn GenericStepFunc[R]) (R, error) {
if ctx == nil {
Expand All @@ -742,7 +763,8 @@ func RunAsStep[R any](ctx DBOSContext, fn GenericStepFunc[R]) (R, error) {

// Type-erase the function
typeErasedFn := StepFunc(func(ctx context.Context) (any, error) { return fn(ctx) })
typeErasedStepNameToStepName[runtime.FuncForPC(reflect.ValueOf(typeErasedFn).Pointer()).Name()] = stepName
typeErasedFnName := runtime.FuncForPC(reflect.ValueOf(typeErasedFn).Pointer()).Name()
typeErasedStepNameToStepName.LoadOrStore(typeErasedFnName, stepName)

// Call the executor method and pass through the result/error
result, err := ctx.RunAsStep(ctx, typeErasedFn)
Expand Down
Loading
Loading