Skip to content

Commit c7ee7cc

Browse files
committed
fix a bug in recording dequeued workflows as steps of the parent
1 parent 79ef8ba commit c7ee7cc

File tree

2 files changed

+140
-10
lines changed

2 files changed

+140
-10
lines changed

dbos/queues_test.go

Lines changed: 131 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,8 @@ import (
55
"errors"
66
"fmt"
77
"os"
8+
"reflect"
9+
"runtime"
810
"sync"
911
"sync/atomic"
1012
"testing"
@@ -63,6 +65,29 @@ func TestWorkflowQueues(t *testing.T) {
6365
// Register workflows with dbosContext
6466
RegisterWorkflow(dbosCtx, queueWorkflow)
6567

68+
// Custom name workflows
69+
queueWorkflowCustomName := func(ctx DBOSContext, input string) (string, error) {
70+
return input, nil
71+
}
72+
RegisterWorkflow(dbosCtx, queueWorkflowCustomName, WithWorkflowName("custom-name"))
73+
74+
queueWorkflowCustomNameEnqueingAnotherCustomNameWorkflow := func(ctx DBOSContext, input string) (string, error) {
75+
// Start a child workflow
76+
childHandle, err := RunAsWorkflow(ctx, queueWorkflowCustomName, input+"-enqueued", WithQueue(queue.Name))
77+
if err != nil {
78+
return "", fmt.Errorf("failed to start child workflow: %v", err)
79+
}
80+
81+
// Get result from child workflow
82+
childResult, err := childHandle.GetResult()
83+
if err != nil {
84+
return "", fmt.Errorf("failed to get child result: %v", err)
85+
}
86+
87+
return childResult, nil
88+
}
89+
RegisterWorkflow(dbosCtx, queueWorkflowCustomNameEnqueingAnotherCustomNameWorkflow, WithWorkflowName("custom-name-enqueuing"))
90+
6691
// Queue deduplication test workflows
6792
var dedupWorkflowEvent *Event
6893
childWorkflow := func(ctx DBOSContext, var1 string) (string, error) {
@@ -133,6 +158,24 @@ func TestWorkflowQueues(t *testing.T) {
133158
}
134159
RegisterWorkflow(dbosCtx, enqueueWorkflowDLQ, WithMaxRetries(dlqMaxRetries))
135160

161+
// Create a workflow that enqueues another workflow to test step tracking
162+
workflowEnqueuesAnother := func(ctx DBOSContext, input string) (string, error) {
163+
// Enqueue a child workflow
164+
childHandle, err := RunAsWorkflow(ctx, queueWorkflow, input+"-child", WithQueue(queue.Name))
165+
if err != nil {
166+
return "", fmt.Errorf("failed to enqueue child workflow: %v", err)
167+
}
168+
169+
// Get result from the child workflow
170+
childResult, err := childHandle.GetResult()
171+
if err != nil {
172+
return "", fmt.Errorf("failed to get child result: %v", err)
173+
}
174+
175+
return childResult, nil
176+
}
177+
RegisterWorkflow(dbosCtx, workflowEnqueuesAnother)
178+
136179
err := dbosCtx.Launch()
137180
require.NoError(t, err)
138181

@@ -147,6 +190,26 @@ func TestWorkflowQueues(t *testing.T) {
147190
require.NoError(t, err)
148191
assert.Equal(t, "test-input", res)
149192

193+
// List steps: the workflow should have 1 step
194+
steps, err := dbosCtx.(*dbosContext).systemDB.getWorkflowSteps(dbosCtx, handle.GetWorkflowID())
195+
require.NoError(t, err)
196+
assert.Len(t, steps, 1)
197+
assert.Equal(t, 0, steps[0].StepID)
198+
199+
require.True(t, queueEntriesAreCleanedUp(dbosCtx), "expected queue entries to be cleaned up after global concurrency test")
200+
})
201+
202+
t.Run("EnqueueWorkflowCustomName", func(t *testing.T) {
203+
handle, err := RunAsWorkflow(dbosCtx, queueWorkflowCustomName, "test-input", WithQueue(queue.Name))
204+
require.NoError(t, err)
205+
206+
_, ok := handle.(*workflowPollingHandle[string])
207+
require.True(t, ok, "expected handle to be of type workflowPollingHandle, got %T", handle)
208+
209+
res, err := handle.GetResult()
210+
require.NoError(t, err)
211+
assert.Equal(t, "test-input", res)
212+
150213
require.True(t, queueEntriesAreCleanedUp(dbosCtx), "expected queue entries to be cleaned up after global concurrency test")
151214
})
152215

@@ -161,10 +224,19 @@ func TestWorkflowQueues(t *testing.T) {
161224
expectedResult := "test-input-child"
162225
assert.Equal(t, expectedResult, res)
163226

227+
// List steps: the workflow should have 2 steps (Start the child and GetResult)
228+
steps, err := dbosCtx.(*dbosContext).systemDB.getWorkflowSteps(dbosCtx, handle.GetWorkflowID())
229+
require.NoError(t, err)
230+
assert.Len(t, steps, 2)
231+
assert.Equal(t, runtime.FuncForPC(reflect.ValueOf(queueWorkflow).Pointer()).Name(), steps[0].StepName)
232+
assert.Equal(t, 0, steps[0].StepID)
233+
assert.Equal(t, "DBOS.getResult", steps[1].StepName)
234+
assert.Equal(t, 1, steps[1].StepID)
235+
164236
require.True(t, queueEntriesAreCleanedUp(dbosCtx), "expected queue entries to be cleaned up after global concurrency test")
165237
})
166238

167-
t.Run("WorkflowEnqueuesAnotherWorkflow", func(t *testing.T) {
239+
t.Run("WorkflowEnqueuesAnother", func(t *testing.T) {
168240
handle, err := RunAsWorkflow(dbosCtx, queueWorkflowThatEnqueues, "test-input", WithQueue(queue.Name))
169241
require.NoError(t, err)
170242

@@ -175,9 +247,67 @@ func TestWorkflowQueues(t *testing.T) {
175247
expectedResult := "test-input-enqueued"
176248
assert.Equal(t, expectedResult, res)
177249

250+
// List steps: the workflow should have 2 steps (Start the child and GetResult)
251+
steps, err := dbosCtx.(*dbosContext).systemDB.getWorkflowSteps(dbosCtx, handle.GetWorkflowID())
252+
require.NoError(t, err)
253+
assert.Len(t, steps, 2)
254+
assert.Equal(t, runtime.FuncForPC(reflect.ValueOf(queueWorkflow).Pointer()).Name(), steps[0].StepName)
255+
assert.Equal(t, 0, steps[0].StepID)
256+
assert.Equal(t, "DBOS.getResult", steps[1].StepName)
257+
assert.Equal(t, 1, steps[1].StepID)
258+
178259
require.True(t, queueEntriesAreCleanedUp(dbosCtx), "expected queue entries to be cleaned up after global concurrency test")
179260
})
180261

262+
t.Run("CustomNameWorkflowEnqueuesAnotherCustomNameWorkflow", func(t *testing.T) {
263+
handle, err := RunAsWorkflow(dbosCtx, queueWorkflowCustomNameEnqueingAnotherCustomNameWorkflow, "test-input", WithQueue(queue.Name))
264+
require.NoError(t, err)
265+
266+
res, err := handle.GetResult()
267+
require.NoError(t, err)
268+
269+
// Expected result: enqueued workflow returns "test-input-enqueued"
270+
expectedResult := "test-input-enqueued"
271+
assert.Equal(t, expectedResult, res)
272+
273+
// List steps: the workflow should have 2 steps (Start the child and GetResult)
274+
steps, err := dbosCtx.(*dbosContext).systemDB.getWorkflowSteps(dbosCtx, handle.GetWorkflowID())
275+
require.NoError(t, err)
276+
assert.Len(t, steps, 2)
277+
assert.Equal(t, "custom-name", steps[0].StepName)
278+
assert.Equal(t, 0, steps[0].StepID)
279+
assert.Equal(t, "DBOS.getResult", steps[1].StepName)
280+
assert.Equal(t, 1, steps[1].StepID)
281+
282+
require.True(t, queueEntriesAreCleanedUp(dbosCtx), "expected queue entries to be cleaned up after global concurrency test")
283+
})
284+
285+
t.Run("EnqueuedWorkflowEnqueuesAnother", func(t *testing.T) {
286+
// Run the pre-registered workflow that enqueues another workflow
287+
// Enqueue the parent workflow to a queue
288+
handle, err := RunAsWorkflow(dbosCtx, workflowEnqueuesAnother, "test-input", WithQueue(queue.Name))
289+
require.NoError(t, err)
290+
291+
res, err := handle.GetResult()
292+
require.NoError(t, err)
293+
294+
// Expected result: child workflow returns "test-input-child"
295+
expectedResult := "test-input-child"
296+
assert.Equal(t, expectedResult, res)
297+
298+
// Check that the parent workflow (the one we ran directly) has 2 steps:
299+
// one for enqueueing the child and one for calling GetResult
300+
steps, err := dbosCtx.(*dbosContext).systemDB.getWorkflowSteps(dbosCtx, handle.GetWorkflowID())
301+
require.NoError(t, err)
302+
assert.Len(t, steps, 2)
303+
assert.Equal(t, runtime.FuncForPC(reflect.ValueOf(queueWorkflow).Pointer()).Name(), steps[0].StepName)
304+
assert.Equal(t, 0, steps[0].StepID)
305+
assert.Equal(t, "DBOS.getResult", steps[1].StepName)
306+
assert.Equal(t, 1, steps[1].StepID)
307+
308+
require.True(t, queueEntriesAreCleanedUp(dbosCtx), "expected queue entries to be cleaned up after workflow enqueues another workflow test")
309+
})
310+
181311
t.Run("DynamicRegistration", func(t *testing.T) {
182312
// Attempting to register a queue after launch should panic
183313
defer func() {

dbos/workflow.go

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -710,15 +710,6 @@ func (c *dbosContext) RunAsWorkflow(_ DBOSContext, fn WorkflowFunc, input any, o
710710
return nil, err
711711
}
712712

713-
// Return a polling handle if: we are enqueueing, the workflow is already in a terminal state (success or error),
714-
if len(params.queueName) > 0 || insertStatusResult.status == WorkflowStatusSuccess || insertStatusResult.status == WorkflowStatusError {
715-
// Commit the transaction to update the number of attempts and/or enact the enqueue
716-
if err := tx.Commit(uncancellableCtx); err != nil {
717-
return nil, newWorkflowExecutionError(workflowID, fmt.Sprintf("failed to commit transaction: %v", err))
718-
}
719-
return newWorkflowPollingHandle[any](uncancellableCtx, workflowStatus.ID), nil
720-
}
721-
722713
// Record child workflow relationship if this is a child workflow
723714
if isChildWorkflow {
724715
// Get the step ID that was used for generating the child workflow ID
@@ -736,6 +727,15 @@ func (c *dbosContext) RunAsWorkflow(_ DBOSContext, fn WorkflowFunc, input any, o
736727
}
737728
}
738729

730+
// Return a polling handle if: we are enqueueing, the workflow is already in a terminal state (success or error),
731+
if len(params.queueName) > 0 || insertStatusResult.status == WorkflowStatusSuccess || insertStatusResult.status == WorkflowStatusError {
732+
// Commit the transaction to update the number of attempts and/or enact the enqueue
733+
if err := tx.Commit(uncancellableCtx); err != nil {
734+
return nil, newWorkflowExecutionError(workflowID, fmt.Sprintf("failed to commit transaction: %v", err))
735+
}
736+
return newWorkflowPollingHandle[any](uncancellableCtx, workflowStatus.ID), nil
737+
}
738+
739739
// Channel to receive the outcome from the goroutine
740740
// The buffer size of 1 allows the goroutine to send the outcome without blocking
741741
// In addition it allows the channel to be garbage collected

0 commit comments

Comments
 (0)