Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
87 changes: 85 additions & 2 deletions dbos/system_database.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ type SystemDatabase interface {
Recv(ctx context.Context, input WorkflowRecvInput) (any, error)
SetEvent(ctx context.Context, input WorkflowSetEventInput) error
GetEvent(ctx context.Context, input WorkflowGetEventInput) (any, error)
Sleep(ctx context.Context, duration time.Duration) (time.Duration, error)
}

type systemDatabase struct {
Expand Down Expand Up @@ -712,10 +713,12 @@ func (s *systemDatabase) RecordOperationResult(ctx context.Context, input record
getLogger().Debug("RecordOperationResult SQL", "sql", commandTag.String())
*/

// TODO return DBOSWorkflowConflictIDError(result["workflow_uuid"]) on 23505 conflict ID error
if err != nil {
getLogger().Error("RecordOperationResult Error occurred", "error", err)
return fmt.Errorf("failed to record operation result: %w", err)
if pgErr, ok := err.(*pgconn.PgError); ok && pgErr.Code == "23505" {
return newWorkflowConflictIDError(input.workflowID)
}
return err
}

if commandTag.RowsAffected() == 0 {
Expand Down Expand Up @@ -980,6 +983,86 @@ func (s *systemDatabase) GetWorkflowSteps(ctx context.Context, workflowID string
return steps, nil
}

// Sleep is a special type of step that sleeps for a specified duration
// A wakeup time is computed and recorded in the database
// If we sleep is re-executed, it will only sleep for the remaining duration until the wakeup time
func (s *systemDatabase) Sleep(ctx context.Context, duration time.Duration) (time.Duration, error) {
functionName := "DBOS.sleep"

// Get workflow state from context
wfState, ok := ctx.Value(workflowStateKey).(*workflowState)
if !ok || wfState == nil {
return 0, newStepExecutionError("", functionName, "workflow state not found in context: are you running this step within a workflow?")
}

if wfState.isWithinStep {
return 0, newStepExecutionError(wfState.workflowID, functionName, "cannot call Sleep within a step")
}

stepID := wfState.NextStepID()

// Check if operation was already executed
checkInput := checkOperationExecutionDBInput{
workflowID: wfState.workflowID,
stepID: stepID,
stepName: functionName,
}
recordedResult, err := s.CheckOperationExecution(ctx, checkInput)
if err != nil {
return 0, fmt.Errorf("failed to check operation execution: %w", err)
}

var endTime time.Time

if recordedResult != nil {
if recordedResult.output == nil { // This should never happen
return 0, fmt.Errorf("no recorded end time for recorded sleep operation")
}

// The output should be a time.Time representing the end time
endTimeInterface, ok := recordedResult.output.(time.Time)
if !ok {
return 0, fmt.Errorf("recorded output is not a time.Time: %T", recordedResult.output)
}
endTime = endTimeInterface

if recordedResult.err != nil { // This should never happen
return 0, recordedResult.err
}
} else {
// First execution: calculate and record the end time
getLogger().Debug("Durable sleep", "stepID", stepID, "duration", duration)

endTime = time.Now().Add(duration)

// Record the operation result with the calculated end time
recordInput := recordOperationResultDBInput{
workflowID: wfState.workflowID,
stepID: stepID,
stepName: functionName,
output: endTime,
err: nil,
}

err = s.RecordOperationResult(ctx, recordInput)
if err != nil {
// Check if this is a ConflictingWorkflowError (operation already recorded by another process)
if dbosErr, ok := err.(*DBOSError); ok && dbosErr.Code == ConflictingIDError {
} else {
return 0, fmt.Errorf("failed to record sleep operation result: %w", err)
}
}
}

// Calculate remaining duration until wake up time
remainingDuration := max(0, time.Until(endTime))

// Actually sleep for the remaining duration
time.Sleep(remainingDuration)

return remainingDuration, nil
}

/****************************************/
/******* WORKFLOW COMMUNICATIONS ********/
/****************************************/
Expand Down
4 changes: 4 additions & 0 deletions dbos/workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -775,6 +775,10 @@ func GetEvent[R any](ctx context.Context, input WorkflowGetEventInput) (R, error
return typedValue, nil
}

func Sleep(ctx context.Context, duration time.Duration) (time.Duration, error) {
return dbos.systemDB.Sleep(ctx, duration)
}

/***********************************/
/******* WORKFLOW MANAGEMENT *******/
/***********************************/
Expand Down
102 changes: 100 additions & 2 deletions dbos/workflows_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -352,7 +352,7 @@ func TestSteps(t *testing.T) {
}

// Test the specific message from the 3rd argument
expectedMessagePart := "workflow state not found in context"
expectedMessagePart := "workflow state not found in context: are you running this step within a workflow?"
if !strings.Contains(err.Error(), expectedMessagePart) {
t.Fatalf("expected error message to contain %q, but got %q", expectedMessagePart, err.Error())
}
Expand Down Expand Up @@ -1205,7 +1205,7 @@ func TestSendRecv(t *testing.T) {
}

// Test the specific message from the error
expectedMessagePart := "workflow state not found in context"
expectedMessagePart := "workflow state not found in context: are you running this step within a workflow?"
if !strings.Contains(err.Error(), expectedMessagePart) {
t.Fatalf("expected error message to contain %q, but got %q", expectedMessagePart, err.Error())
}
Expand Down Expand Up @@ -1737,3 +1737,101 @@ func TestSetGetEvent(t *testing.T) {
}
})
}

var (
sleepRecoveryWf = WithWorkflow(sleepRecoveryWorkflow)
sleepStartEvent *Event
sleepStopEvent *Event
)

func sleepRecoveryWorkflow(ctx context.Context, duration time.Duration) (time.Duration, error) {
result, err := Sleep(ctx, duration)
if err != nil {
return 0, err
}
// Block after sleep so we can recover a pending workflow
sleepStartEvent.Set()
sleepStopEvent.Wait()
return result, nil
}

func TestSleep(t *testing.T) {
setupDBOS(t)

t.Run("SleepDurableRecovery", func(t *testing.T) {
sleepStartEvent = NewEvent()
sleepStopEvent = NewEvent()

// Start a workflow that sleeps for 2 seconds then blocks
sleepDuration := 2 * time.Second

handle, err := sleepRecoveryWf(context.Background(), sleepDuration)
if err != nil {
t.Fatalf("failed to start sleep recovery workflow: %v", err)
}

sleepStartEvent.Wait()
sleepStartEvent.Clear()

// Run the workflow again and check the return time was less than the durable sleep
startTime := time.Now()
_, err = sleepRecoveryWf(context.Background(), sleepDuration, WithWorkflowID(handle.GetWorkflowID()))
if err != nil {
t.Fatalf("failed to start second sleep recovery workflow: %v", err)
}

sleepStartEvent.Wait()
// Time elapsed should be at most the sleep duration
elapsed := time.Since(startTime)
if elapsed >= sleepDuration {
t.Fatalf("expected elapsed time to be less than %v, got %v", sleepDuration, elapsed)
}

// Verify the sleep step was recorded correctly
steps, err := dbos.systemDB.GetWorkflowSteps(context.Background(), handle.GetWorkflowID())
if err != nil {
t.Fatalf("failed to get workflow steps: %v", err)
}

if len(steps) != 1 {
t.Fatalf("expected 1 step (the sleep), got %d", len(steps))
}

step := steps[0]
if step.FunctionName != "DBOS.sleep" {
t.Fatalf("expected step name to be 'DBOS.sleep', got '%s'", step.FunctionName)
}

if step.Error != nil {
t.Fatalf("expected step to have no error, got %v", step.Error)
}

sleepStopEvent.Set()
})

t.Run("SleepCannotBeCalledOutsideWorkflow", func(t *testing.T) {
ctx := context.Background()

// Attempt to call Sleep outside of a workflow context
_, err := Sleep(ctx, 1*time.Second)
if err == nil {
t.Fatal("expected error when calling Sleep outside of workflow context, but got none")
}

// Check the error type
dbosErr, ok := err.(*DBOSError)
if !ok {
t.Fatalf("expected error to be of type *DBOSError, got %T", err)
}

if dbosErr.Code != StepExecutionError {
t.Fatalf("expected error code to be StepExecutionError, got %v", dbosErr.Code)
}

// Test the specific message from the error
expectedMessagePart := "workflow state not found in context: are you running this step within a workflow?"
if !strings.Contains(err.Error(), expectedMessagePart) {
t.Fatalf("expected error message to contain %q, but got %q", expectedMessagePart, err.Error())
}
})
}
Loading