Skip to content

Commit 6e1c5f5

Browse files
authored
Merge branch 'main' into admin-server-p2
2 parents dba359f + e5044c5 commit 6e1c5f5

File tree

7 files changed

+319
-836
lines changed

7 files changed

+319
-836
lines changed

.github/workflows/tests.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,7 @@ jobs:
6161
run: go install gotest.tools/gotestsum@latest
6262

6363
- name: Run tests
64-
run: go vet ./... && gotestsum --format github-action -- -race ./...
64+
run: go vet ./... && gotestsum --format github-action -- -race -v -count=1 ./...
6565
working-directory: ./dbos
6666
env:
6767
PGPASSWORD: a!b@c$d()e*_,/:;=?@ff[]22

dbos/client_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -136,7 +136,7 @@ func TestEnqueue(t *testing.T) {
136136
require.Error(t, err, "expected timeout error, but got none")
137137

138138
dbosErr, ok := err.(*DBOSError)
139-
require.True(t, ok, "expected error to be of type *DBOSError, got %T", err)
139+
require.True(t, ok, "expected error to be of type *DBOSError, got %T (%v)", err, err)
140140

141141
assert.Equal(t, AwaitedWorkflowCancelled, dbosErr.Code)
142142

dbos/dbos.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -75,7 +75,7 @@ type DBOSContext interface {
7575
Cancel() // Gracefully shutdown the DBOS runtime, waiting for workflows to complete and cleaning up resources
7676

7777
// Workflow operations
78-
RunAsStep(_ DBOSContext, fn StepFunc, opts ...StepOption) (any, error) // Execute a function as a durable step within a workflow
78+
RunAsStep(_ DBOSContext, fn StepFunc, opts ...StepOption) (any, error) // Execute a function as a durable step within a workflow
7979
RunAsWorkflow(_ DBOSContext, fn WorkflowFunc, input any, opts ...WorkflowOption) (WorkflowHandle[any], error) // Start a new workflow execution
8080
Send(_ DBOSContext, input WorkflowSendInput) error // Send a message to another workflow
8181
Recv(_ DBOSContext, input WorkflowRecvInput) (any, error) // Receive a message sent to this workflow
@@ -363,6 +363,8 @@ func (c *dbosContext) Launch() error {
363363
}
364364
if len(recoveryHandles) > 0 {
365365
c.logger.Info("Recovered pending workflows", "count", len(recoveryHandles))
366+
} else {
367+
c.logger.Info("No pending workflows to recover")
366368
}
367369

368370
c.logger.Info("DBOS initialized", "app_version", c.applicationVersion, "executor_id", c.executorID)

dbos/queues_test.go

Lines changed: 47 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -213,7 +213,7 @@ func TestWorkflowQueues(t *testing.T) {
213213

214214
// Check that the workflow hits DLQ after re-running max retries
215215
handles := make([]WorkflowHandle[any], 0, dlqMaxRetries+1)
216-
for i := range dlqMaxRetries {
216+
for range dlqMaxRetries {
217217
recoveryHandles, err := recoverPendingWorkflows(dbosCtx.(*dbosContext), []string{"local"})
218218
require.NoError(t, err, "failed to recover pending workflows")
219219
assert.Len(t, recoveryHandles, 1, "expected 1 handle")
@@ -223,10 +223,25 @@ func TestWorkflowQueues(t *testing.T) {
223223
handles = append(handles, handle)
224224
status, err := handle.GetStatus()
225225
require.NoError(t, err, "failed to get status of recovered workflow handle")
226-
if i == dlqMaxRetries {
227-
// On the last retry, the workflow should be in DLQ
228-
assert.Equal(t, WorkflowStatusRetriesExceeded, status.Status, "expected workflow status to be %s", WorkflowStatusRetriesExceeded)
226+
assert.Equal(t, WorkflowStatusPending, status.Status, "expected workflow to be in PENDING status after recovery")
227+
}
228+
229+
dlqHandle, err := recoverPendingWorkflows(dbosCtx.(*dbosContext), []string{"local"})
230+
require.NoError(t, err, "failed to recover pending workflows")
231+
assert.Len(t, dlqHandle, 1, "expected 1 handle in DLQ")
232+
retries := 0
233+
for {
234+
dlqStatus, err := dlqHandle[0].GetStatus()
235+
require.NoError(t, err, "failed to get status of DLQ workflow handle")
236+
if dlqStatus.Status != WorkflowStatusRetriesExceeded && retries < 10 {
237+
time.Sleep(1 * time.Second) // Wait a bit before checking again
238+
retries++
239+
continue
229240
}
241+
require.NoError(t, err, "failed to get status of DLQ workflow handle")
242+
assert.Equal(t, WorkflowStatusRetriesExceeded, dlqStatus.Status, "expected workflow to be in DLQ after max retries exceeded")
243+
handles = append(handles, dlqHandle[0])
244+
break
230245
}
231246

232247
// Check the workflow completes
@@ -258,8 +273,8 @@ func TestWorkflowQueues(t *testing.T) {
258273
require.Error(t, err, "expected ConflictingWorkflowError when enqueueing same workflow ID on different queue, but got none")
259274

260275
// Check that it's the correct error type
261-
dbosErr, ok := err.(*DBOSError)
262-
require.True(t, ok, "expected error to be of type *DBOSError, got %T", err)
276+
var dbosErr *DBOSError
277+
require.ErrorAs(t, err, &dbosErr, "expected error to be of type *DBOSError, got %T", err)
263278

264279
assert.Equal(t, ConflictingWorkflowError, dbosErr.Code, "expected error code to be ConflictingWorkflowError")
265280

@@ -301,8 +316,8 @@ func TestWorkflowQueues(t *testing.T) {
301316
require.Error(t, err, "expected error when enqueueing workflow with same deduplication ID")
302317

303318
// Check that it's the correct error type and message
304-
dbosErr, ok := err.(*DBOSError)
305-
require.True(t, ok, "expected error to be of type *DBOSError, got %T", err)
319+
var dbosErr *DBOSError
320+
require.ErrorAs(t, err, &dbosErr, "expected error to be of type *DBOSError, got %T", err)
306321
assert.Equal(t, QueueDeduplicated, dbosErr.Code, "expected error code to be QueueDeduplicated")
307322

308323
expectedMsgPart := fmt.Sprintf("Workflow %s was deduplicated due to an existing workflow in queue %s with deduplication ID %s", wfid2, dedupQueue.Name, dedupID)
@@ -416,14 +431,14 @@ func TestQueueRecovery(t *testing.T) {
416431
castedResult, ok := result.([]int)
417432
require.True(t, ok, "expected result to be of type []int for root workflow, got %T", result)
418433
expectedResult := []int{0, 1, 2, 3, 4}
419-
assert.True(t, equal(castedResult, expectedResult), "expected result %v, got %v", expectedResult, castedResult)
434+
assert.Equal(t, expectedResult, castedResult, "expected result %v, got %v", expectedResult, castedResult)
420435
}
421436
}
422437

423438
result, err := handle.GetResult()
424439
require.NoError(t, err, "failed to get result from original handle")
425440
expectedResult := []int{0, 1, 2, 3, 4}
426-
assert.True(t, equal(result, expectedResult), "expected result %v, got %v", expectedResult, result)
441+
assert.Equal(t, expectedResult, result, "expected result %v, got %v", expectedResult, result)
427442

428443
assert.Equal(t, int64(queuedSteps*2), atomic.LoadInt64(&recoveryStepCounter), "expected recoveryStepCounter to be %d", queuedSteps*2)
429444

@@ -432,7 +447,7 @@ func TestQueueRecovery(t *testing.T) {
432447
require.NoError(t, err, "failed to rerun workflow")
433448
rerunResult, err := rerunHandle.GetResult()
434449
require.NoError(t, err, "failed to get result from rerun handle")
435-
assert.True(t, equal(rerunResult, expectedResult), "expected result %v, got %v", expectedResult, rerunResult)
450+
assert.Equal(t, expectedResult, rerunResult, "expected result %v, got %v", expectedResult, rerunResult)
436451

437452
assert.Equal(t, int64(queuedSteps*2), atomic.LoadInt64(&recoveryStepCounter), "expected recoveryStepCounter to remain %d", queuedSteps*2)
438453

@@ -794,9 +809,7 @@ func TestQueueTimeouts(t *testing.T) {
794809
queuedWaitForCancelWorkflow := func(ctx DBOSContext, _ string) (string, error) {
795810
// This workflow will wait indefinitely until it is cancelled
796811
<-ctx.Done()
797-
if !errors.Is(ctx.Err(), context.Canceled) && !errors.Is(ctx.Err(), context.DeadlineExceeded) {
798-
assert.True(t, errors.Is(ctx.Err(), context.Canceled) || errors.Is(ctx.Err(), context.DeadlineExceeded), "workflow was cancelled, but context error is not context.Canceled nor context.DeadlineExceeded: %v", ctx.Err())
799-
}
812+
assert.True(t, errors.Is(ctx.Err(), context.Canceled) || errors.Is(ctx.Err(), context.DeadlineExceeded), "workflow was cancelled, but context error is not context.Canceled nor context.DeadlineExceeded: %v", ctx.Err())
800813
return "", ctx.Err()
801814
}
802815
RegisterWorkflow(dbosCtx, queuedWaitForCancelWorkflow)
@@ -808,8 +821,8 @@ func TestQueueTimeouts(t *testing.T) {
808821
// Workflow should get AwaitedWorkflowCancelled DBOSError
809822
_, err = handle.GetResult()
810823
require.Error(t, err, "expected error when waiting for enqueued workflow to complete, but got none")
811-
dbosErr, ok := err.(*DBOSError)
812-
require.True(t, ok, "expected error to be of type *DBOSError, got %T", err)
824+
var dbosErr *DBOSError
825+
require.ErrorAs(t, err, &dbosErr, "expected error to be of type *DBOSError, got %T", err)
813826
assert.Equal(t, AwaitedWorkflowCancelled, dbosErr.Code, "expected error code to be AwaitedWorkflowCancelled")
814827

815828
// enqueud workflow should have been cancelled
@@ -877,8 +890,8 @@ func TestQueueTimeouts(t *testing.T) {
877890
require.Error(t, err, "expected error but got none")
878891

879892
// Check the error type
880-
dbosErr, ok := err.(*DBOSError)
881-
require.True(t, ok, "expected error to be of type *DBOSError, got %T", err)
893+
var dbosErr *DBOSError
894+
require.ErrorAs(t, err, &dbosErr, "expected error to be of type *DBOSError, got %T", err)
882895

883896
assert.Equal(t, AwaitedWorkflowCancelled, dbosErr.Code, "expected error code to be AwaitedWorkflowCancelled")
884897

@@ -905,8 +918,8 @@ func TestQueueTimeouts(t *testing.T) {
905918
require.Error(t, err, "expected error but got none")
906919

907920
// Check the error type
908-
dbosErr, ok := err.(*DBOSError)
909-
require.True(t, ok, "expected error to be of type *DBOSError, got %T", err)
921+
var dbosErr *DBOSError
922+
require.ErrorAs(t, err, &dbosErr, "expected error to be of type *DBOSError, got %T", err)
910923

911924
assert.Equal(t, AwaitedWorkflowCancelled, dbosErr.Code, "expected error code to be AwaitedWorkflowCancelled")
912925

@@ -934,8 +947,8 @@ func TestQueueTimeouts(t *testing.T) {
934947
require.Error(t, err, "expected error but got none")
935948

936949
// Check the error type
937-
dbosErr, ok := err.(*DBOSError)
938-
require.True(t, ok, "expected error to be of type *DBOSError, got %T", err)
950+
var dbosErr *DBOSError
951+
require.ErrorAs(t, err, &dbosErr, "expected error to be of type *DBOSError, got %T", err)
939952

940953
assert.Equal(t, AwaitedWorkflowCancelled, dbosErr.Code, "expected error code to be AwaitedWorkflowCancelled")
941954

@@ -1051,6 +1064,7 @@ func TestPriorityQueue(t *testing.T) {
10511064
require.NoError(t, err)
10521065
wfHandles = append(wfHandles, handle6)
10531066

1067+
time.Sleep(10 * time.Millisecond) // Avoid collisions in created_at...
10541068
handle7, err := RunAsWorkflow(dbosCtx, testWorkflow, 7, WithQueue(priorityQueue.Name))
10551069
require.NoError(t, err)
10561070
wfHandles = append(wfHandles, handle7)
@@ -1070,5 +1084,16 @@ func TestPriorityQueue(t *testing.T) {
10701084
assert.Equal(t, expectedOrder, wfPriorityList, "expected workflow execution order %v, got %v", expectedOrder, wfPriorityList)
10711085
mu.Unlock()
10721086

1087+
// Verify that handle6 and handle7 workflows were dequeued in FIFO order
1088+
// by checking that their StartedAt time is in the correct order (6 is before 7)
1089+
status6, err := handle6.GetStatus()
1090+
require.NoError(t, err, "failed to get status for workflow 6")
1091+
status7, err := handle7.GetStatus()
1092+
require.NoError(t, err, "failed to get status for workflow 7")
1093+
1094+
assert.True(t, status6.StartedAt.Before(status7.StartedAt),
1095+
"expected workflow 6 to be dequeued before workflow 7, but got 6 started at %v (created at %v) and 7 started at %v (created at %v)",
1096+
status6.StartedAt, status6.CreatedAt, status7.StartedAt, status7.CreatedAt)
1097+
10731098
require.True(t, queueEntriesAreCleanedUp(dbosCtx), "expected queue entries to be cleaned up after priority queue test")
10741099
}

dbos/system_database.go

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -373,7 +373,7 @@ func (s *sysDB) insertWorkflowStatus(ctx context.Context, input insertWorkflowSt
373373
input.status.ExecutorID,
374374
applicationVersion,
375375
input.status.ApplicationID,
376-
input.status.CreatedAt.UnixMilli(),
376+
input.status.CreatedAt.Round(time.Millisecond).UnixMilli(), // slightly reduce the likelihood of collisions
377377
attempts,
378378
updatedAt.UnixMilli(),
379379
timeoutMs,
@@ -1883,7 +1883,6 @@ func (s *sysDB) dequeueWorkflows(ctx context.Context, input dequeueWorkflowsInpu
18831883
}
18841884

18851885
// First check the rate limiter
1886-
startTimeMs := time.Now().UnixMilli()
18871886
var numRecentQueries int
18881887
if input.queue.RateLimit != nil {
18891888
limiterPeriod := time.Duration(input.queue.RateLimit.Period * float64(time.Second))
@@ -2065,7 +2064,7 @@ func (s *sysDB) dequeueWorkflows(ctx context.Context, input dequeueWorkflowsInpu
20652064
WorkflowStatusPending,
20662065
input.applicationVersion,
20672066
input.executorID,
2068-
startTimeMs,
2067+
time.Now().UnixMilli(),
20692068
id).Scan(&retWorkflow.name, &inputString)
20702069

20712070
if inputString != nil && len(*inputString) > 0 {

dbos/workflow.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -572,7 +572,7 @@ func RunAsWorkflow[P any, R any](ctx DBOSContext, fn GenericWorkflowFunc[P, R],
572572
}
573573
}()
574574

575-
typedHandle := newWorkflowHandle[R](handle.dbosContext, handle.workflowID, typedOutcomeChan)
575+
typedHandle := newWorkflowHandle(handle.dbosContext, handle.workflowID, typedOutcomeChan)
576576

577577
return typedHandle, nil
578578
}

0 commit comments

Comments
 (0)