Skip to content

Commit 0659870

Browse files
committed
handle DBOS operations as special, idempotent steps when done within workflows
1 parent 468e579 commit 0659870

File tree

5 files changed

+532
-39
lines changed

5 files changed

+532
-39
lines changed

dbos/dbos.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -318,6 +318,10 @@ func NewDBOSContext(ctx context.Context, inputConfig Config) (DBOSContext, error
318318
// Register types we serialize with gob
319319
var t time.Time
320320
gob.Register(t)
321+
var ws []WorkflowStatus
322+
gob.Register(ws)
323+
var si []StepInfo
324+
gob.Register(si)
321325

322326
// Initialize global variables from processed config (already handles env vars and defaults)
323327
initExecutor.applicationVersion = config.ApplicationVersion

dbos/queues_test.go

Lines changed: 38 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -926,23 +926,17 @@ func TestQueueTimeouts(t *testing.T) {
926926
}
927927
RegisterWorkflow(dbosCtx, queuedWaitForCancelWorkflow)
928928

929-
enqueuedWorkflowEnqueuesATimeoutWorkflow := func(ctx DBOSContext, _ string) (string, error) {
929+
enqueuedWorkflowEnqueuesATimeoutWorkflow := func(ctx DBOSContext, childWorkflowID string) (string, error) {
930930
// This workflow will enqueue a workflow that waits indefinitely until it is cancelled
931-
handle, err := RunWorkflow(ctx, queuedWaitForCancelWorkflow, "enqueued-wait-for-cancel", WithQueue(timeoutQueue.Name))
931+
handle, err := RunWorkflow(ctx, queuedWaitForCancelWorkflow, "enqueued-wait-for-cancel", WithQueue(timeoutQueue.Name), WithWorkflowID(childWorkflowID))
932932
require.NoError(t, err, "failed to start enqueued wait for cancel workflow")
933933
// Workflow should get AwaitedWorkflowCancelled DBOSError
934934
_, err = handle.GetResult()
935935
require.Error(t, err, "expected error when waiting for enqueued workflow to complete, but got none")
936936
var dbosErr *DBOSError
937937
require.ErrorAs(t, err, &dbosErr, "expected error to be of type *DBOSError, got %T", err)
938938
assert.Equal(t, AwaitedWorkflowCancelled, dbosErr.Code, "expected error code to be AwaitedWorkflowCancelled")
939-
940-
// enqueud workflow should have been cancelled
941-
status, err := handle.GetStatus()
942-
require.NoError(t, err, "failed to get status of enqueued workflow")
943-
assert.Equal(t, WorkflowStatusCancelled, status.Status, "expected enqueued workflow status to be WorkflowStatusCancelled")
944-
945-
return "should-never-see-this", nil
939+
return "", nil
946940
}
947941
RegisterWorkflow(dbosCtx, enqueuedWorkflowEnqueuesATimeoutWorkflow)
948942

@@ -956,19 +950,20 @@ func TestQueueTimeouts(t *testing.T) {
956950
}
957951

958952
enqueuedWorkflowEnqueuesADetachedWorkflow := func(ctx DBOSContext, timeout time.Duration) (string, error) {
953+
myId, err := GetWorkflowID(ctx)
954+
if err != nil {
955+
return "", fmt.Errorf("failed to get workflow ID: %v", err)
956+
}
957+
childID := fmt.Sprintf("%s-child", myId)
959958
// This workflow will enqueue a workflow that is not cancelable
960959
childCtx := WithoutCancel(ctx)
961-
handle, err := RunWorkflow(childCtx, detachedWorkflow, timeout*2, WithQueue(timeoutQueue.Name))
960+
handle, err := RunWorkflow(childCtx, detachedWorkflow, timeout*2, WithQueue(timeoutQueue.Name), WithWorkflowID(childID))
962961
require.NoError(t, err, "failed to start enqueued detached workflow")
963962

964963
// Wait for the enqueued workflow to complete
965964
result, err := handle.GetResult()
966965
require.NoError(t, err, "failed to get result from enqueued detached workflow")
967966
assert.Equal(t, "detached-workflow-completed", result, "expected result to be 'detached-workflow-completed'")
968-
// Check the workflow status: should be success
969-
status, err := handle.GetStatus()
970-
require.NoError(t, err, "failed to get enqueued detached workflow status")
971-
assert.Equal(t, WorkflowStatusSuccess, status.Status, "expected enqueued detached workflow status to be WorkflowStatusSuccess")
972967
return result, nil
973968
}
974969

@@ -1022,7 +1017,8 @@ func TestQueueTimeouts(t *testing.T) {
10221017
cancelCtx, cancelFunc := WithTimeout(dbosCtx, 1*time.Millisecond)
10231018
defer cancelFunc() // Ensure we clean up the context
10241019

1025-
handle, err := RunWorkflow(cancelCtx, enqueuedWorkflowEnqueuesATimeoutWorkflow, "enqueue-timeout-workflow", WithQueue(timeoutQueue.Name))
1020+
childWorkflowID := uuid.NewString()
1021+
handle, err := RunWorkflow(cancelCtx, enqueuedWorkflowEnqueuesATimeoutWorkflow, childWorkflowID, WithQueue(timeoutQueue.Name))
10261022
require.NoError(t, err, "failed to start enqueued workflow")
10271023

10281024
// Wait for the workflow to complete and get the result
@@ -1042,6 +1038,19 @@ func TestQueueTimeouts(t *testing.T) {
10421038
require.NoError(t, err, "failed to get workflow status")
10431039
assert.Equal(t, WorkflowStatusCancelled, status.Status, "expected workflow status to be WorkflowStatusCancelled")
10441040

1041+
// Check the child's status: should be cancelled too
1042+
childHandle, err := RetrieveWorkflow[string](dbosCtx, childWorkflowID)
1043+
require.NoError(t, err, "failed to retrieve child workflow")
1044+
1045+
// Wait for the child workflow status to become cancelled
1046+
require.Eventually(t, func() bool {
1047+
status, err := childHandle.GetStatus()
1048+
if err != nil {
1049+
return false
1050+
}
1051+
return status.Status == WorkflowStatusCancelled
1052+
}, 5*time.Second, 100*time.Millisecond, "expected enqueued workflow status to be WorkflowStatusCancelled")
1053+
10451054
require.True(t, queueEntriesAreCleanedUp(dbosCtx), "expected queue entries to be cleaned up after workflow cancellation, but they are not")
10461055
})
10471056

@@ -1071,6 +1080,20 @@ func TestQueueTimeouts(t *testing.T) {
10711080
require.NoError(t, err, "failed to get enqueued detached workflow status")
10721081
assert.Equal(t, WorkflowStatusCancelled, status.Status, "expected enqueued detached workflow status to be WorkflowStatusCancelled")
10731082

1083+
// Check the child's status: should be success because it is detached
1084+
childID := fmt.Sprintf("%s-child", handle.GetWorkflowID())
1085+
childHandle, err := RetrieveWorkflow[string](dbosCtx, childID)
1086+
require.NoError(t, err, "failed to retrieve detached workflow")
1087+
1088+
// Wait for the child workflow status to become success
1089+
require.Eventually(t, func() bool {
1090+
status, err := childHandle.GetStatus()
1091+
if err != nil {
1092+
return false
1093+
}
1094+
return status.Status == WorkflowStatusSuccess
1095+
}, 5*time.Second, 100*time.Millisecond, "expected detached workflow status to be WorkflowStatusSuccess")
1096+
10741097
require.True(t, queueEntriesAreCleanedUp(dbosCtx), "expected queue entries to be cleaned up after workflow cancellation, but they are not")
10751098
})
10761099

0 commit comments

Comments
 (0)