Skip to content

Commit 991f060

Browse files
committed
fix timeouts: set timeout on dequeue + handle sub millisecond timouts + handle negative timeouts
1 parent e2c38a9 commit 991f060

File tree

4 files changed

+78
-25
lines changed

4 files changed

+78
-25
lines changed

dbos/client_test.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -473,10 +473,6 @@ func TestCancelResume(t *testing.T) {
473473
// Verify the deadline was reset (should be different from original)
474474
assert.False(t, resumeStatus.Deadline.Equal(originalDeadline), "expected deadline to be reset after resume, but it remained the same: %v", originalDeadline)
475475

476-
// The new deadline should be after resumeStart + workflowTimeout
477-
expectedDeadline := resumeStart.Add(workflowTimeout - 100*time.Millisecond) // Allow some leeway for processing time
478-
assert.True(t, resumeStatus.Deadline.After(expectedDeadline), "deadline %v is too early (expected around %v)", resumeStatus.Deadline, expectedDeadline)
479-
480476
// Wait for the workflow to complete
481477
_, err = resumeHandle.GetResult()
482478
require.Error(t, err, "expected timeout error, but got none")
@@ -491,6 +487,10 @@ func TestCancelResume(t *testing.T) {
491487
finalStatus, err := resumeHandle.GetStatus()
492488
require.NoError(t, err, "failed to get final workflow status")
493489

490+
// The new deadline should have been set after resumeStart + workflowTimeout
491+
expectedDeadline := resumeStart.Add(workflowTimeout - 100*time.Millisecond) // Allow some leeway for processing time
492+
assert.True(t, finalStatus.Deadline.After(expectedDeadline), "deadline %v is too early (expected around %v)", resumeStatus.Deadline, expectedDeadline)
493+
494494
assert.Equal(t, WorkflowStatusCancelled, finalStatus.Status, "expected final workflow status to be CANCELLED")
495495

496496
require.True(t, queueEntriesAreCleanedUp(serverCtx), "expected queue entries to be cleaned up after cancel/resume timeout test")

dbos/queues_test.go

Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -850,6 +850,18 @@ func TestQueueTimeouts(t *testing.T) {
850850
RegisterWorkflow(dbosCtx, detachedWorkflow)
851851
RegisterWorkflow(dbosCtx, enqueuedWorkflowEnqueuesADetachedWorkflow)
852852

853+
timeoutOnDequeueQueue := NewWorkflowQueue(dbosCtx, "timeout-on-dequeue-queue", WithGlobalConcurrency(1))
854+
blockingEvent := NewEvent()
855+
blockingWorkflow := func(ctx DBOSContext, _ string) (string, error) {
856+
blockingEvent.Wait()
857+
return "blocking-done", nil
858+
}
859+
RegisterWorkflow(dbosCtx, blockingWorkflow)
860+
fastWorkflow := func(ctx DBOSContext, _ string) (string, error) {
861+
return "done", nil
862+
}
863+
RegisterWorkflow(dbosCtx, fastWorkflow)
864+
853865
dbosCtx.Launch()
854866

855867
t.Run("EnqueueWorkflowTimeout", func(t *testing.T) {
@@ -936,6 +948,46 @@ func TestQueueTimeouts(t *testing.T) {
936948

937949
require.True(t, queueEntriesAreCleanedUp(dbosCtx), "expected queue entries to be cleaned up after workflow cancellation, but they are not")
938950
})
951+
952+
t.Run("TimeoutOnlySetOnDequeue", func(t *testing.T) {
953+
// Test that deadline is only set when workflow is dequeued, not when enqueued
954+
955+
// Enqueue blocking workflow first
956+
blockingHandle, err := RunAsWorkflow(dbosCtx, blockingWorkflow, "blocking", WithQueue(timeoutOnDequeueQueue.Name))
957+
require.NoError(t, err, "failed to enqueue blocking workflow")
958+
959+
// Set a timeout that would expire if set on enqueue
960+
timeout := 2 * time.Second
961+
timeoutCtx, cancelFunc := WithTimeout(dbosCtx, timeout)
962+
defer cancelFunc()
963+
964+
// Enqueue second workflow with timeout
965+
handle, err := RunAsWorkflow(timeoutCtx, fastWorkflow, "timeout-test", WithQueue(timeoutOnDequeueQueue.Name))
966+
require.NoError(t, err, "failed to enqueue timeout workflow")
967+
968+
// Sleep for duration exceeding the timeout
969+
time.Sleep(timeout * 2)
970+
971+
// Signal the blocking workflow to complete
972+
blockingEvent.Set()
973+
974+
// Wait for blocking workflow to complete
975+
blockingResult, err := blockingHandle.GetResult()
976+
require.NoError(t, err, "failed to get result from blocking workflow")
977+
assert.Equal(t, "blocking-done", blockingResult, "expected blocking workflow result")
978+
979+
// Now the second workflow should dequeue and complete successfully (timeout should be much longer than execution time)
980+
// Note: this might be flaky if we hit the dequeue interval 1sec boundary...
981+
_, err = handle.GetResult()
982+
require.NoError(t, err, "unexpected error from workflow")
983+
984+
// Check the workflow status: should be success
985+
finalStatus, err := handle.GetStatus()
986+
require.NoError(t, err, "failed to get final status of timeout workflow")
987+
assert.Equal(t, WorkflowStatusSuccess, finalStatus.Status, "expected timeout workflow status to be WorkflowStatusSuccess")
988+
989+
require.True(t, queueEntriesAreCleanedUp(dbosCtx), "expected queue entries to be cleaned up after test")
990+
})
939991
}
940992

941993
func TestPriorityQueue(t *testing.T) {

dbos/system_database.go

Lines changed: 4 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -307,7 +307,7 @@ func (s *sysDB) insertWorkflowStatus(ctx context.Context, input insertWorkflowSt
307307

308308
var timeoutMs *int64 = nil
309309
if input.status.Timeout > 0 {
310-
millis := input.status.Timeout.Milliseconds()
310+
millis := input.status.Timeout.Round(time.Millisecond).Milliseconds()
311311
timeoutMs = &millis
312312
}
313313

@@ -787,25 +787,17 @@ func (s *sysDB) resumeWorkflow(ctx context.Context, workflowID string) error {
787787
return nil // Workflow is complete, do nothing
788788
}
789789

790-
// If the original workflow has a timeout, let's recompute a deadline
791-
var deadline *int64 = nil
792-
if wf.Timeout > 0 {
793-
deadlineMs := time.Now().Add(wf.Timeout).UnixMilli()
794-
deadline = &deadlineMs
795-
}
796-
797790
// Set the workflow's status to ENQUEUED and clear its recovery attempts, set new deadline
798791
updateStatusQuery := `UPDATE dbos.workflow_status
799792
SET status = $1, queue_name = $2, recovery_attempts = $3,
800-
workflow_deadline_epoch_ms = $4, deduplication_id = NULL,
801-
started_at_epoch_ms = NULL, updated_at = $5
802-
WHERE workflow_uuid = $6`
793+
workflow_deadline_epoch_ms = NULL, deduplication_id = NULL,
794+
started_at_epoch_ms = NULL, updated_at = $4
795+
WHERE workflow_uuid = $5`
803796

804797
_, err = tx.Exec(ctx, updateStatusQuery,
805798
WorkflowStatusEnqueued,
806799
_DBOS_INTERNAL_QUEUE_NAME,
807800
0,
808-
deadline,
809801
time.Now().UnixMilli(),
810802
workflowID)
811803
if err != nil {

dbos/workflow.go

Lines changed: 18 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -643,21 +643,22 @@ func (c *dbosContext) RunAsWorkflow(_ DBOSContext, fn WorkflowFunc, input any, o
643643
status = WorkflowStatusPending
644644
}
645645

646-
// Check if the user-provided context has a deadline
646+
// Compute the timeout based on the context deadline, if any
647647
deadline, ok := c.Deadline()
648648
if !ok {
649649
deadline = time.Time{} // No deadline set
650650
}
651-
652-
// Compute the timeout based on the deadline
653651
var timeout time.Duration
654652
if !deadline.IsZero() {
655653
timeout = time.Until(deadline)
656-
/* unclear to me if this is a real use case:
654+
// The timeout could be in the past, for small deadlines, to propagation delays. If so set it to a minimal value
657655
if timeout < 0 {
658-
return nil, newWorkflowExecutionError(workflowID, "deadline is in the past")
656+
timeout = 1 * time.Millisecond
659657
}
660-
*/
658+
}
659+
// When enqueuing, we do not set a deadline. It'll be computed with the timeout during dequeue.
660+
if status == WorkflowStatusEnqueued {
661+
deadline = time.Time{} // No deadline for enqueued workflows
661662
}
662663

663664
if params.priority > uint(math.MaxInt) {
@@ -737,11 +738,19 @@ func (c *dbosContext) RunAsWorkflow(_ DBOSContext, fn WorkflowFunc, input any, o
737738

738739
workflowCtx := WithValue(c, workflowStateKey, wfState)
739740

740-
// If the workflow has a durable deadline, set it in the context.
741+
// If the workflow has a timeout but no deadline, compute the deadline from the timeout.
742+
// Else use the durable deadline.
743+
durableDeadline := time.Time{}
744+
if insertStatusResult.timeout > 0 && insertStatusResult.workflowDeadline.IsZero() {
745+
durableDeadline = time.Now().Add(insertStatusResult.timeout)
746+
} else if !insertStatusResult.workflowDeadline.IsZero() {
747+
durableDeadline = insertStatusResult.workflowDeadline
748+
}
749+
741750
var stopFunc func() bool
742751
cancelFuncCompleted := make(chan struct{})
743-
if !insertStatusResult.workflowDeadline.IsZero() {
744-
workflowCtx, _ = WithTimeout(workflowCtx, time.Until(insertStatusResult.workflowDeadline))
752+
if !durableDeadline.IsZero() {
753+
workflowCtx, _ = WithTimeout(workflowCtx, time.Until(durableDeadline))
745754
// Register a cancel function that cancels the workflow in the DB as soon as the context is cancelled
746755
dbosCancelFunction := func() {
747756
c.logger.Info("Cancelling workflow", "workflow_id", workflowID)

0 commit comments

Comments
 (0)