Skip to content
Merged
Show file tree
Hide file tree
Changes from 9 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ jobs:
run: go install gotest.tools/gotestsum@latest

- name: Run tests
run: go vet ./... && gotestsum --format github-action -- -race ./...
run: go vet ./... && gotestsum --format github-action -- -race -v -count=1 ./...
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Adding `-count=1` disables Go's test result caching, so the tests are actually re-executed on every run.

working-directory: ./dbos
env:
PGPASSWORD: a!b@c$d()e*_,/:;=?@ff[]22
Expand Down
2 changes: 1 addition & 1 deletion dbos/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ func TestEnqueue(t *testing.T) {
require.Error(t, err, "expected timeout error, but got none")

dbosErr, ok := err.(*DBOSError)
require.True(t, ok, "expected error to be of type *DBOSError, got %T", err)
require.True(t, ok, "expected error to be of type *DBOSError, got %T (%v)", err, err)

assert.Equal(t, AwaitedWorkflowCancelled, dbosErr.Code)

Expand Down
4 changes: 3 additions & 1 deletion dbos/dbos.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ type DBOSContext interface {
Cancel() // Gracefully shutdown the DBOS runtime, waiting for workflows to complete and cleaning up resources

// Workflow operations
RunAsStep(_ DBOSContext, fn StepFunc, opts ...StepOption) (any, error) // Execute a function as a durable step within a workflow
RunAsStep(_ DBOSContext, fn StepFunc, opts ...StepOption) (any, error) // Execute a function as a durable step within a workflow
RunAsWorkflow(_ DBOSContext, fn WorkflowFunc, input any, opts ...WorkflowOption) (WorkflowHandle[any], error) // Start a new workflow execution
Send(_ DBOSContext, input WorkflowSendInput) error // Send a message to another workflow
Recv(_ DBOSContext, input WorkflowRecvInput) (any, error) // Receive a message sent to this workflow
Expand Down Expand Up @@ -363,6 +363,8 @@ func (c *dbosContext) Launch() error {
}
if len(recoveryHandles) > 0 {
c.logger.Info("Recovered pending workflows", "count", len(recoveryHandles))
} else {
c.logger.Info("No pending workflows to recover")
}

c.logger.Info("DBOS initialized", "app_version", c.applicationVersion, "executor_id", c.executorID)
Expand Down
87 changes: 63 additions & 24 deletions dbos/queues_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"errors"
"fmt"
"os"
"reflect"
"sync"
"sync/atomic"
"testing"
Expand Down Expand Up @@ -213,7 +214,7 @@ func TestWorkflowQueues(t *testing.T) {

// Check that the workflow hits DLQ after re-running max retries
handles := make([]WorkflowHandle[any], 0, dlqMaxRetries+1)
for i := range dlqMaxRetries {
for range dlqMaxRetries {
recoveryHandles, err := recoverPendingWorkflows(dbosCtx.(*dbosContext), []string{"local"})
require.NoError(t, err, "failed to recover pending workflows")
assert.Len(t, recoveryHandles, 1, "expected 1 handle")
Expand All @@ -223,10 +224,25 @@ func TestWorkflowQueues(t *testing.T) {
handles = append(handles, handle)
status, err := handle.GetStatus()
require.NoError(t, err, "failed to get status of recovered workflow handle")
if i == dlqMaxRetries {
// On the last retry, the workflow should be in DLQ
assert.Equal(t, WorkflowStatusRetriesExceeded, status.Status, "expected workflow status to be %s", WorkflowStatusRetriesExceeded)
assert.Equal(t, WorkflowStatusPending, status.Status, "expected workflow to be in PENDING status after recovery")
}

dlqHandle, err := recoverPendingWorkflows(dbosCtx.(*dbosContext), []string{"local"})
require.NoError(t, err, "failed to recover pending workflows")
assert.Len(t, dlqHandle, 1, "expected 1 handle in DLQ")
retries := 0
for {
dlqStatus, err := dlqHandle[0].GetStatus()
require.NoError(t, err, "failed to get status of DLQ workflow handle")
if dlqStatus.Status != WorkflowStatusRetriesExceeded && retries < 10 {
time.Sleep(1 * time.Second) // Wait a bit before checking again
retries++
continue
}
require.NoError(t, err, "failed to get status of DLQ workflow handle")
assert.Equal(t, WorkflowStatusRetriesExceeded, dlqStatus.Status, "expected workflow to be in DLQ after max retries exceeded")
handles = append(handles, dlqHandle[0])
break
Comment on lines +226 to +244
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We were not actually testing the DLQ behavior in this test: `i` only ranges over 0..dlqMaxRetries-1, so the `i == dlqMaxRetries` branch was never reached. Now we do.

}

// Check the workflow completes
Expand Down Expand Up @@ -258,8 +274,8 @@ func TestWorkflowQueues(t *testing.T) {
require.Error(t, err, "expected ConflictingWorkflowError when enqueueing same workflow ID on different queue, but got none")

// Check that it's the correct error type
dbosErr, ok := err.(*DBOSError)
require.True(t, ok, "expected error to be of type *DBOSError, got %T", err)
var dbosErr *DBOSError
require.ErrorAs(t, err, &dbosErr, "expected error to be of type *DBOSError, got %T", err)

assert.Equal(t, ConflictingWorkflowError, dbosErr.Code, "expected error code to be ConflictingWorkflowError")

Expand Down Expand Up @@ -301,8 +317,8 @@ func TestWorkflowQueues(t *testing.T) {
require.Error(t, err, "expected error when enqueueing workflow with same deduplication ID")

// Check that it's the correct error type and message
dbosErr, ok := err.(*DBOSError)
require.True(t, ok, "expected error to be of type *DBOSError, got %T", err)
var dbosErr *DBOSError
require.ErrorAs(t, err, &dbosErr, "expected error to be of type *DBOSError, got %T", err)
assert.Equal(t, QueueDeduplicated, dbosErr.Code, "expected error code to be QueueDeduplicated")

expectedMsgPart := fmt.Sprintf("Workflow %s was deduplicated due to an existing workflow in queue %s with deduplication ID %s", wfid2, dedupQueue.Name, dedupID)
Expand Down Expand Up @@ -416,14 +432,14 @@ func TestQueueRecovery(t *testing.T) {
castedResult, ok := result.([]int)
require.True(t, ok, "expected result to be of type []int for root workflow, got %T", result)
expectedResult := []int{0, 1, 2, 3, 4}
assert.True(t, equal(castedResult, expectedResult), "expected result %v, got %v", expectedResult, castedResult)
assert.Equal(t, expectedResult, castedResult, "expected result %v, got %v", expectedResult, castedResult)
}
}

result, err := handle.GetResult()
require.NoError(t, err, "failed to get result from original handle")
expectedResult := []int{0, 1, 2, 3, 4}
assert.True(t, equal(result, expectedResult), "expected result %v, got %v", expectedResult, result)
assert.Equal(t, expectedResult, result, "expected result %v, got %v", expectedResult, result)

assert.Equal(t, int64(queuedSteps*2), atomic.LoadInt64(&recoveryStepCounter), "expected recoveryStepCounter to be %d", queuedSteps*2)

Expand All @@ -432,7 +448,7 @@ func TestQueueRecovery(t *testing.T) {
require.NoError(t, err, "failed to rerun workflow")
rerunResult, err := rerunHandle.GetResult()
require.NoError(t, err, "failed to get result from rerun handle")
assert.True(t, equal(rerunResult, expectedResult), "expected result %v, got %v", expectedResult, rerunResult)
assert.Equal(t, expectedResult, rerunResult, "expected result %v, got %v", expectedResult, rerunResult)

assert.Equal(t, int64(queuedSteps*2), atomic.LoadInt64(&recoveryStepCounter), "expected recoveryStepCounter to remain %d", queuedSteps*2)

Expand Down Expand Up @@ -794,9 +810,7 @@ func TestQueueTimeouts(t *testing.T) {
queuedWaitForCancelWorkflow := func(ctx DBOSContext, _ string) (string, error) {
// This workflow will wait indefinitely until it is cancelled
<-ctx.Done()
if !errors.Is(ctx.Err(), context.Canceled) && !errors.Is(ctx.Err(), context.DeadlineExceeded) {
assert.True(t, errors.Is(ctx.Err(), context.Canceled) || errors.Is(ctx.Err(), context.DeadlineExceeded), "workflow was cancelled, but context error is not context.Canceled nor context.DeadlineExceeded: %v", ctx.Err())
}
assert.True(t, errors.Is(ctx.Err(), context.Canceled) || errors.Is(ctx.Err(), context.DeadlineExceeded), "workflow was cancelled, but context error is not context.Canceled nor context.DeadlineExceeded: %v", ctx.Err())
return "", ctx.Err()
}
RegisterWorkflow(dbosCtx, queuedWaitForCancelWorkflow)
Expand All @@ -808,8 +822,8 @@ func TestQueueTimeouts(t *testing.T) {
// Workflow should get AwaitedWorkflowCancelled DBOSError
_, err = handle.GetResult()
require.Error(t, err, "expected error when waiting for enqueued workflow to complete, but got none")
dbosErr, ok := err.(*DBOSError)
require.True(t, ok, "expected error to be of type *DBOSError, got %T", err)
var dbosErr *DBOSError
require.ErrorAs(t, err, &dbosErr, "expected error to be of type *DBOSError, got %T", err)
assert.Equal(t, AwaitedWorkflowCancelled, dbosErr.Code, "expected error code to be AwaitedWorkflowCancelled")

// enqueued workflow should have been cancelled
Expand Down Expand Up @@ -877,8 +891,8 @@ func TestQueueTimeouts(t *testing.T) {
require.Error(t, err, "expected error but got none")

// Check the error type
dbosErr, ok := err.(*DBOSError)
require.True(t, ok, "expected error to be of type *DBOSError, got %T", err)
var dbosErr *DBOSError
require.ErrorAs(t, err, &dbosErr, "expected error to be of type *DBOSError, got %T", err)

assert.Equal(t, AwaitedWorkflowCancelled, dbosErr.Code, "expected error code to be AwaitedWorkflowCancelled")

Expand All @@ -905,8 +919,8 @@ func TestQueueTimeouts(t *testing.T) {
require.Error(t, err, "expected error but got none")

// Check the error type
dbosErr, ok := err.(*DBOSError)
require.True(t, ok, "expected error to be of type *DBOSError, got %T", err)
var dbosErr *DBOSError
require.ErrorAs(t, err, &dbosErr, "expected error to be of type *DBOSError, got %T", err)

assert.Equal(t, AwaitedWorkflowCancelled, dbosErr.Code, "expected error code to be AwaitedWorkflowCancelled")

Expand Down Expand Up @@ -934,8 +948,8 @@ func TestQueueTimeouts(t *testing.T) {
require.Error(t, err, "expected error but got none")

// Check the error type
dbosErr, ok := err.(*DBOSError)
require.True(t, ok, "expected error to be of type *DBOSError, got %T", err)
var dbosErr *DBOSError
require.ErrorAs(t, err, &dbosErr, "expected error to be of type *DBOSError, got %T", err)

assert.Equal(t, AwaitedWorkflowCancelled, dbosErr.Code, "expected error code to be AwaitedWorkflowCancelled")

Expand Down Expand Up @@ -1051,6 +1065,7 @@ func TestPriorityQueue(t *testing.T) {
require.NoError(t, err)
wfHandles = append(wfHandles, handle6)

time.Sleep(10 * time.Millisecond) // Avoid collisions in created_at...
handle7, err := RunAsWorkflow(dbosCtx, testWorkflow, 7, WithQueue(priorityQueue.Name))
require.NoError(t, err)
wfHandles = append(wfHandles, handle7)
Expand All @@ -1066,9 +1081,33 @@ func TestPriorityQueue(t *testing.T) {
}

mu.Lock()
expectedOrder := []int{0, 6, 7, 1, 2, 3, 4, 5}
assert.Equal(t, expectedOrder, wfPriorityList, "expected workflow execution order %v, got %v", expectedOrder, wfPriorityList)
// Check if the expected order is either {0, 6, 7, ...} or {0, 7, 6, ...}
// This is because while tasks are dequeued in order, they can run asynchronously
// and one could set a value in wfPriorityList before the other
expectedOrder1 := []int{0, 6, 7, 1, 2, 3, 4, 5}
expectedOrder2 := []int{0, 7, 6, 1, 2, 3, 4, 5}

validOrder := false
if reflect.DeepEqual(wfPriorityList, expectedOrder1) {
validOrder = true
} else if reflect.DeepEqual(wfPriorityList, expectedOrder2) {
validOrder = true
}

assert.True(t, validOrder, "expected workflow execution order to be either %v or %v, got %v",
expectedOrder1, expectedOrder2, wfPriorityList)
mu.Unlock()

// Verify that handle6 and handle7 workflows were dequeued in FIFO order
// by checking that their StartedAt time is in the correct order (6 is before 7)
status6, err := handle6.GetStatus()
require.NoError(t, err, "failed to get status for workflow 6")
status7, err := handle7.GetStatus()
require.NoError(t, err, "failed to get status for workflow 7")

assert.True(t, status6.StartedAt.Before(status7.StartedAt),
"expected workflow 6 to be dequeued before workflow 7, but got 6 started at %v (created at %v) and 7 started at %v (created at %v)",
status6.StartedAt, status6.CreatedAt, status7.StartedAt, status7.CreatedAt)

require.True(t, queueEntriesAreCleanedUp(dbosCtx), "expected queue entries to be cleaned up after priority queue test")
}
5 changes: 2 additions & 3 deletions dbos/system_database.go
Original file line number Diff line number Diff line change
Expand Up @@ -373,7 +373,7 @@ func (s *sysDB) insertWorkflowStatus(ctx context.Context, input insertWorkflowSt
input.status.ExecutorID,
applicationVersion,
input.status.ApplicationID,
input.status.CreatedAt.UnixMilli(),
input.status.CreatedAt.Round(time.Millisecond).UnixMilli(), // slightly reduce the likelihood of collisions
attempts,
updatedAt.UnixMilli(),
timeoutMs,
Expand Down Expand Up @@ -1882,7 +1882,6 @@ func (s *sysDB) dequeueWorkflows(ctx context.Context, input dequeueWorkflowsInpu
}

// First check the rate limiter
startTimeMs := time.Now().UnixMilli()
var numRecentQueries int
if input.queue.RateLimit != nil {
limiterPeriod := time.Duration(input.queue.RateLimit.Period * float64(time.Second))
Expand Down Expand Up @@ -2064,7 +2063,7 @@ func (s *sysDB) dequeueWorkflows(ctx context.Context, input dequeueWorkflowsInpu
WorkflowStatusPending,
input.applicationVersion,
input.executorID,
startTimeMs,
time.Now().UnixMilli(),
id).Scan(&retWorkflow.name, &inputString)

if inputString != nil && len(*inputString) > 0 {
Expand Down
2 changes: 1 addition & 1 deletion dbos/workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -572,7 +572,7 @@ func RunAsWorkflow[P any, R any](ctx DBOSContext, fn GenericWorkflowFunc[P, R],
}
}()

typedHandle := newWorkflowHandle[R](handle.dbosContext, handle.workflowID, typedOutcomeChan)
typedHandle := newWorkflowHandle(handle.dbosContext, handle.workflowID, typedOutcomeChan)
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The explicit type argument `[R]` is not necessary because it is inferred from `typedOutcomeChan`.


return typedHandle, nil
}
Expand Down
Loading
Loading