Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ jobs:
run: go install gotest.tools/gotestsum@latest

- name: Run tests
run: go vet ./... && gotestsum --format github-action -- -race ./...
run: go vet ./... && gotestsum --format github-action -- -race -v -count=1 ./...
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This disables "caching" for the tests...

working-directory: ./dbos
env:
PGPASSWORD: a!b@c$d()e*_,/:;=?@ff[]22
Expand Down
2 changes: 1 addition & 1 deletion dbos/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ func TestEnqueue(t *testing.T) {
require.Error(t, err, "expected timeout error, but got none")

dbosErr, ok := err.(*DBOSError)
require.True(t, ok, "expected error to be of type *DBOSError, got %T", err)
require.True(t, ok, "expected error to be of type *DBOSError, got %T (%v)", err, err)

assert.Equal(t, AwaitedWorkflowCancelled, dbosErr.Code)

Expand Down
4 changes: 3 additions & 1 deletion dbos/dbos.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ type DBOSContext interface {
Cancel() // Gracefully shutdown the DBOS runtime, waiting for workflows to complete and cleaning up resources

// Workflow operations
RunAsStep(_ DBOSContext, fn StepFunc, opts ...StepOption) (any, error) // Execute a function as a durable step within a workflow
RunAsStep(_ DBOSContext, fn StepFunc, opts ...StepOption) (any, error) // Execute a function as a durable step within a workflow
RunAsWorkflow(_ DBOSContext, fn WorkflowFunc, input any, opts ...WorkflowOption) (WorkflowHandle[any], error) // Start a new workflow execution
Send(_ DBOSContext, input WorkflowSendInput) error // Send a message to another workflow
Recv(_ DBOSContext, input WorkflowRecvInput) (any, error) // Receive a message sent to this workflow
Expand Down Expand Up @@ -363,6 +363,8 @@ func (c *dbosContext) Launch() error {
}
if len(recoveryHandles) > 0 {
c.logger.Info("Recovered pending workflows", "count", len(recoveryHandles))
} else {
c.logger.Info("No pending workflows to recover")
}

c.logger.Info("DBOS initialized", "app_version", c.applicationVersion, "executor_id", c.executorID)
Expand Down
69 changes: 47 additions & 22 deletions dbos/queues_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -213,7 +213,7 @@ func TestWorkflowQueues(t *testing.T) {

// Check that the workflow hits DLQ after re-running max retries
handles := make([]WorkflowHandle[any], 0, dlqMaxRetries+1)
for i := range dlqMaxRetries {
for range dlqMaxRetries {
recoveryHandles, err := recoverPendingWorkflows(dbosCtx.(*dbosContext), []string{"local"})
require.NoError(t, err, "failed to recover pending workflows")
assert.Len(t, recoveryHandles, 1, "expected 1 handle")
Expand All @@ -223,10 +223,25 @@ func TestWorkflowQueues(t *testing.T) {
handles = append(handles, handle)
status, err := handle.GetStatus()
require.NoError(t, err, "failed to get status of recovered workflow handle")
if i == dlqMaxRetries {
// On the last retry, the workflow should be in DLQ
assert.Equal(t, WorkflowStatusRetriesExceeded, status.Status, "expected workflow status to be %s", WorkflowStatusRetriesExceeded)
assert.Equal(t, WorkflowStatusPending, status.Status, "expected workflow to be in PENDING status after recovery")
}

dlqHandle, err := recoverPendingWorkflows(dbosCtx.(*dbosContext), []string{"local"})
require.NoError(t, err, "failed to recover pending workflows")
assert.Len(t, dlqHandle, 1, "expected 1 handle in DLQ")
retries := 0
for {
dlqStatus, err := dlqHandle[0].GetStatus()
require.NoError(t, err, "failed to get status of DLQ workflow handle")
if dlqStatus.Status != WorkflowStatusRetriesExceeded && retries < 10 {
time.Sleep(1 * time.Second) // Wait a bit before checking again
retries++
continue
}
require.NoError(t, err, "failed to get status of DLQ workflow handle")
assert.Equal(t, WorkflowStatusRetriesExceeded, dlqStatus.Status, "expected workflow to be in DLQ after max retries exceeded")
handles = append(handles, dlqHandle[0])
break
Comment on lines +226 to +244
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we were not actually testing the DLQ behavior in this test -- now we do

}

// Check the workflow completes
Expand Down Expand Up @@ -258,8 +273,8 @@ func TestWorkflowQueues(t *testing.T) {
require.Error(t, err, "expected ConflictingWorkflowError when enqueueing same workflow ID on different queue, but got none")

// Check that it's the correct error type
dbosErr, ok := err.(*DBOSError)
require.True(t, ok, "expected error to be of type *DBOSError, got %T", err)
var dbosErr *DBOSError
require.ErrorAs(t, err, &dbosErr, "expected error to be of type *DBOSError, got %T", err)

assert.Equal(t, ConflictingWorkflowError, dbosErr.Code, "expected error code to be ConflictingWorkflowError")

Expand Down Expand Up @@ -301,8 +316,8 @@ func TestWorkflowQueues(t *testing.T) {
require.Error(t, err, "expected error when enqueueing workflow with same deduplication ID")

// Check that it's the correct error type and message
dbosErr, ok := err.(*DBOSError)
require.True(t, ok, "expected error to be of type *DBOSError, got %T", err)
var dbosErr *DBOSError
require.ErrorAs(t, err, &dbosErr, "expected error to be of type *DBOSError, got %T", err)
assert.Equal(t, QueueDeduplicated, dbosErr.Code, "expected error code to be QueueDeduplicated")

expectedMsgPart := fmt.Sprintf("Workflow %s was deduplicated due to an existing workflow in queue %s with deduplication ID %s", wfid2, dedupQueue.Name, dedupID)
Expand Down Expand Up @@ -416,14 +431,14 @@ func TestQueueRecovery(t *testing.T) {
castedResult, ok := result.([]int)
require.True(t, ok, "expected result to be of type []int for root workflow, got %T", result)
expectedResult := []int{0, 1, 2, 3, 4}
assert.True(t, equal(castedResult, expectedResult), "expected result %v, got %v", expectedResult, castedResult)
assert.Equal(t, expectedResult, castedResult, "expected result %v, got %v", expectedResult, castedResult)
}
}

result, err := handle.GetResult()
require.NoError(t, err, "failed to get result from original handle")
expectedResult := []int{0, 1, 2, 3, 4}
assert.True(t, equal(result, expectedResult), "expected result %v, got %v", expectedResult, result)
assert.Equal(t, expectedResult, result, "expected result %v, got %v", expectedResult, result)

assert.Equal(t, int64(queuedSteps*2), atomic.LoadInt64(&recoveryStepCounter), "expected recoveryStepCounter to be %d", queuedSteps*2)

Expand All @@ -432,7 +447,7 @@ func TestQueueRecovery(t *testing.T) {
require.NoError(t, err, "failed to rerun workflow")
rerunResult, err := rerunHandle.GetResult()
require.NoError(t, err, "failed to get result from rerun handle")
assert.True(t, equal(rerunResult, expectedResult), "expected result %v, got %v", expectedResult, rerunResult)
assert.Equal(t, expectedResult, rerunResult, "expected result %v, got %v", expectedResult, rerunResult)

assert.Equal(t, int64(queuedSteps*2), atomic.LoadInt64(&recoveryStepCounter), "expected recoveryStepCounter to remain %d", queuedSteps*2)

Expand Down Expand Up @@ -794,9 +809,7 @@ func TestQueueTimeouts(t *testing.T) {
queuedWaitForCancelWorkflow := func(ctx DBOSContext, _ string) (string, error) {
// This workflow will wait indefinitely until it is cancelled
<-ctx.Done()
if !errors.Is(ctx.Err(), context.Canceled) && !errors.Is(ctx.Err(), context.DeadlineExceeded) {
assert.True(t, errors.Is(ctx.Err(), context.Canceled) || errors.Is(ctx.Err(), context.DeadlineExceeded), "workflow was cancelled, but context error is not context.Canceled nor context.DeadlineExceeded: %v", ctx.Err())
}
assert.True(t, errors.Is(ctx.Err(), context.Canceled) || errors.Is(ctx.Err(), context.DeadlineExceeded), "workflow was cancelled, but context error is not context.Canceled nor context.DeadlineExceeded: %v", ctx.Err())
return "", ctx.Err()
}
RegisterWorkflow(dbosCtx, queuedWaitForCancelWorkflow)
Expand All @@ -808,8 +821,8 @@ func TestQueueTimeouts(t *testing.T) {
// Workflow should get AwaitedWorkflowCancelled DBOSError
_, err = handle.GetResult()
require.Error(t, err, "expected error when waiting for enqueued workflow to complete, but got none")
dbosErr, ok := err.(*DBOSError)
require.True(t, ok, "expected error to be of type *DBOSError, got %T", err)
var dbosErr *DBOSError
require.ErrorAs(t, err, &dbosErr, "expected error to be of type *DBOSError, got %T", err)
assert.Equal(t, AwaitedWorkflowCancelled, dbosErr.Code, "expected error code to be AwaitedWorkflowCancelled")

// enqueud workflow should have been cancelled
Expand Down Expand Up @@ -877,8 +890,8 @@ func TestQueueTimeouts(t *testing.T) {
require.Error(t, err, "expected error but got none")

// Check the error type
dbosErr, ok := err.(*DBOSError)
require.True(t, ok, "expected error to be of type *DBOSError, got %T", err)
var dbosErr *DBOSError
require.ErrorAs(t, err, &dbosErr, "expected error to be of type *DBOSError, got %T", err)

assert.Equal(t, AwaitedWorkflowCancelled, dbosErr.Code, "expected error code to be AwaitedWorkflowCancelled")

Expand All @@ -905,8 +918,8 @@ func TestQueueTimeouts(t *testing.T) {
require.Error(t, err, "expected error but got none")

// Check the error type
dbosErr, ok := err.(*DBOSError)
require.True(t, ok, "expected error to be of type *DBOSError, got %T", err)
var dbosErr *DBOSError
require.ErrorAs(t, err, &dbosErr, "expected error to be of type *DBOSError, got %T", err)

assert.Equal(t, AwaitedWorkflowCancelled, dbosErr.Code, "expected error code to be AwaitedWorkflowCancelled")

Expand Down Expand Up @@ -934,8 +947,8 @@ func TestQueueTimeouts(t *testing.T) {
require.Error(t, err, "expected error but got none")

// Check the error type
dbosErr, ok := err.(*DBOSError)
require.True(t, ok, "expected error to be of type *DBOSError, got %T", err)
var dbosErr *DBOSError
require.ErrorAs(t, err, &dbosErr, "expected error to be of type *DBOSError, got %T", err)

assert.Equal(t, AwaitedWorkflowCancelled, dbosErr.Code, "expected error code to be AwaitedWorkflowCancelled")

Expand Down Expand Up @@ -1051,6 +1064,7 @@ func TestPriorityQueue(t *testing.T) {
require.NoError(t, err)
wfHandles = append(wfHandles, handle6)

time.Sleep(10 * time.Millisecond) // Avoid collisions in created_at...
handle7, err := RunAsWorkflow(dbosCtx, testWorkflow, 7, WithQueue(priorityQueue.Name))
require.NoError(t, err)
wfHandles = append(wfHandles, handle7)
Expand All @@ -1070,5 +1084,16 @@ func TestPriorityQueue(t *testing.T) {
assert.Equal(t, expectedOrder, wfPriorityList, "expected workflow execution order %v, got %v", expectedOrder, wfPriorityList)
mu.Unlock()

// Verify that handle6 and handle7 workflows were dequeued in FIFO order
// by checking that their StartedAt time is in the correct order (6 is before 7)
status6, err := handle6.GetStatus()
require.NoError(t, err, "failed to get status for workflow 6")
status7, err := handle7.GetStatus()
require.NoError(t, err, "failed to get status for workflow 7")

assert.True(t, status6.StartedAt.Before(status7.StartedAt),
"expected workflow 6 to be dequeued before workflow 7, but got 6 started at %v (created at %v) and 7 started at %v (created at %v)",
status6.StartedAt, status6.CreatedAt, status7.StartedAt, status7.CreatedAt)

require.True(t, queueEntriesAreCleanedUp(dbosCtx), "expected queue entries to be cleaned up after priority queue test")
}
5 changes: 2 additions & 3 deletions dbos/system_database.go
Original file line number Diff line number Diff line change
Expand Up @@ -373,7 +373,7 @@ func (s *sysDB) insertWorkflowStatus(ctx context.Context, input insertWorkflowSt
input.status.ExecutorID,
applicationVersion,
input.status.ApplicationID,
input.status.CreatedAt.UnixMilli(),
input.status.CreatedAt.Round(time.Millisecond).UnixMilli(), // slightly reduce the likelihood of collisions
attempts,
updatedAt.UnixMilli(),
timeoutMs,
Expand Down Expand Up @@ -1882,7 +1882,6 @@ func (s *sysDB) dequeueWorkflows(ctx context.Context, input dequeueWorkflowsInpu
}

// First check the rate limiter
startTimeMs := time.Now().UnixMilli()
var numRecentQueries int
if input.queue.RateLimit != nil {
limiterPeriod := time.Duration(input.queue.RateLimit.Period * float64(time.Second))
Expand Down Expand Up @@ -2064,7 +2063,7 @@ func (s *sysDB) dequeueWorkflows(ctx context.Context, input dequeueWorkflowsInpu
WorkflowStatusPending,
input.applicationVersion,
input.executorID,
startTimeMs,
time.Now().UnixMilli(),
id).Scan(&retWorkflow.name, &inputString)

if inputString != nil && len(*inputString) > 0 {
Expand Down
2 changes: 1 addition & 1 deletion dbos/workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -572,7 +572,7 @@ func RunAsWorkflow[P any, R any](ctx DBOSContext, fn GenericWorkflowFunc[P, R],
}
}()

typedHandle := newWorkflowHandle[R](handle.dbosContext, handle.workflowID, typedOutcomeChan)
typedHandle := newWorkflowHandle(handle.dbosContext, handle.workflowID, typedOutcomeChan)
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

not necessary because inferred from typedOutcomeChan


return typedHandle, nil
}
Expand Down
Loading
Loading