Skip to content

Commit 855c07e

Browse files
committed
cancellable get result
1 parent 28d50f8 commit 855c07e

File tree

2 files changed

+148
-13
lines changed

2 files changed

+148
-13
lines changed

dbos/workflow.go

Lines changed: 63 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -82,16 +82,32 @@ type workflowOutcome[R any] struct {
8282
// The type parameter R represents the expected return type of the workflow.
8383
// Handles can be used to wait for workflow completion, check status, and retrieve results.
8484
type WorkflowHandle[R any] interface {
85-
GetResult() (R, error) // Wait for workflow completion and return the result
86-
GetStatus() (WorkflowStatus, error) // Get current workflow status without waiting
87-
GetWorkflowID() string // Get the unique workflow identifier
85+
GetResult(opts ...GetResultOption) (R, error) // Wait for workflow completion and return the result
86+
GetStatus() (WorkflowStatus, error) // Get current workflow status without waiting
87+
GetWorkflowID() string // Get the unique workflow identifier
8888
}
8989

9090
type baseWorkflowHandle struct {
9191
workflowID string
9292
dbosContext DBOSContext
9393
}
9494

95+
// GetResultOption is a functional option for configuring GetResult behavior.
96+
type GetResultOption func(*getResultOptions)
97+
98+
// getResultOptions holds the configuration for GetResult execution.
99+
type getResultOptions struct {
100+
timeout time.Duration
101+
}
102+
103+
// WithHandleTimeout sets a timeout for the GetResult operation.
104+
// If the timeout is reached before the workflow completes, GetResult will return a timeout error.
105+
func WithHandleTimeout(timeout time.Duration) GetResultOption {
106+
return func(opts *getResultOptions) {
107+
opts.timeout = timeout
108+
}
109+
}
110+
95111
// GetStatus returns the current status of the workflow from the database
96112
// If the DBOSContext is running in client mode, do not load input and outputs
97113
func (h *baseWorkflowHandle) GetStatus() (WorkflowStatus, error) {
@@ -162,12 +178,33 @@ type workflowHandle[R any] struct {
162178
outcomeChan chan workflowOutcome[R]
163179
}
164180

165-
func (h *workflowHandle[R]) GetResult() (R, error) {
166-
outcome, ok := <-h.outcomeChan // Blocking read
167-
if !ok {
168-
// Return an error if the channel was closed. In normal operations this would happen if GetResul() is called twice on a handler. The first call should get the buffered result, the second call find zero values (channel is empty and closed).
169-
return *new(R), errors.New("workflow result channel is already closed. Did you call GetResult() twice on the same workflow handle?")
181+
func (h *workflowHandle[R]) GetResult(opts ...GetResultOption) (R, error) {
182+
options := &getResultOptions{}
183+
for _, opt := range opts {
184+
opt(options)
170185
}
186+
187+
var timeoutChan <-chan time.Time
188+
if options.timeout > 0 {
189+
timeoutChan = time.After(options.timeout)
190+
}
191+
192+
select {
193+
case outcome, ok := <-h.outcomeChan:
194+
if !ok {
195+
// Return error if channel closed (happens when GetResult() called twice)
196+
return *new(R), errors.New("workflow result channel is already closed. Did you call GetResult() twice on the same workflow handle?")
197+
}
198+
return h.processOutcome(outcome)
199+
case <-h.dbosContext.Done():
200+
return *new(R), h.dbosContext.Err()
201+
case <-timeoutChan:
202+
return *new(R), fmt.Errorf("workflow result timeout after %v", options.timeout)
203+
}
204+
}
205+
206+
// processOutcome handles the common logic for processing workflow outcomes
207+
func (h *workflowHandle[R]) processOutcome(outcome workflowOutcome[R]) (R, error) {
171208
// If we are calling GetResult inside a workflow, record the result as a step result
172209
workflowState, ok := h.dbosContext.Value(workflowStateKey).(*workflowState)
173210
isWithinWorkflow := ok && workflowState != nil
@@ -198,9 +235,22 @@ type workflowPollingHandle[R any] struct {
198235
baseWorkflowHandle
199236
}
200237

201-
func (h *workflowPollingHandle[R]) GetResult() (R, error) {
202-
result, err := retryWithResult(h.dbosContext, func() (any, error) {
203-
return h.dbosContext.(*dbosContext).systemDB.awaitWorkflowResult(h.dbosContext, h.workflowID)
238+
func (h *workflowPollingHandle[R]) GetResult(opts ...GetResultOption) (R, error) {
239+
options := &getResultOptions{}
240+
for _, opt := range opts {
241+
opt(options)
242+
}
243+
244+
// Use timeout if specified, otherwise use DBOS context directly
245+
ctx := h.dbosContext
246+
var cancel context.CancelFunc
247+
if options.timeout > 0 {
248+
ctx, cancel = WithTimeout(h.dbosContext, options.timeout)
249+
defer cancel()
250+
}
251+
252+
result, err := retryWithResult(ctx, func() (any, error) {
253+
return h.dbosContext.(*dbosContext).systemDB.awaitWorkflowResult(ctx, h.workflowID)
204254
}, withRetrierLogger(h.dbosContext.(*dbosContext).logger))
205255
if result != nil {
206256
typedResult, ok := result.(R)
@@ -240,8 +290,8 @@ type workflowHandleProxy[R any] struct {
240290
wrappedHandle WorkflowHandle[any]
241291
}
242292

243-
func (h *workflowHandleProxy[R]) GetResult() (R, error) {
244-
result, err := h.wrappedHandle.GetResult()
293+
func (h *workflowHandleProxy[R]) GetResult(opts ...GetResultOption) (R, error) {
294+
result, err := h.wrappedHandle.GetResult(opts...)
245295
if err != nil {
246296
var zero R
247297
return zero, err

dbos/workflows_test.go

Lines changed: 85 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,12 @@ func simpleStep(_ context.Context) (string, error) {
3838
return "from step", nil
3939
}
4040

41+
func slowWorkflow(dbosCtx DBOSContext, input string) (string, error) {
42+
// Simulate a slow workflow that takes time to complete
43+
time.Sleep(500 * time.Millisecond)
44+
return input, nil
45+
}
46+
4147
func simpleStepError(_ context.Context) (string, error) {
4248
return "", fmt.Errorf("step failure")
4349
}
@@ -4523,3 +4529,82 @@ func TestWorkflowIdentity(t *testing.T) {
45234529
assert.Equal(t, []string{"reader", "writer"}, status.AuthenticatedRoles)
45244530
})
45254531
}
4532+
4533+
func TestWorkflowHandleTimeout(t *testing.T) {
4534+
dbosCtx := setupDBOS(t, true, true)
4535+
RegisterWorkflow(dbosCtx, simpleWorkflow)
4536+
4537+
t.Run("WorkflowHandleTimeout", func(t *testing.T) {
4538+
// Test timeout on workflowHandle (channel-based)
4539+
handle, err := RunWorkflow(dbosCtx, simpleWorkflow, "test")
4540+
require.NoError(t, err, "failed to start workflow")
4541+
4542+
// Test with a very short timeout - should timeout
4543+
start := time.Now()
4544+
_, err = handle.GetResult(WithHandleTimeout(1 * time.Millisecond))
4545+
duration := time.Since(start)
4546+
4547+
require.Error(t, err, "expected timeout error")
4548+
assert.Contains(t, err.Error(), "workflow result timeout")
4549+
assert.True(t, duration < 100*time.Millisecond, "timeout should occur quickly")
4550+
})
4551+
4552+
t.Run("WorkflowHandleNoTimeout", func(t *testing.T) {
4553+
// Test without timeout - should work normally
4554+
handle, err := RunWorkflow(dbosCtx, simpleWorkflow, "test")
4555+
require.NoError(t, err, "failed to start workflow")
4556+
4557+
result, err := handle.GetResult()
4558+
require.NoError(t, err, "GetResult without timeout should succeed")
4559+
assert.Equal(t, "test", result)
4560+
})
4561+
4562+
t.Run("WorkflowHandleGetResultAfterChannelClose", func(t *testing.T) {
4563+
// Test getting result after the outcome channel would be closed
4564+
handle, err := RunWorkflow(dbosCtx, simpleWorkflow, "test")
4565+
require.NoError(t, err, "failed to start workflow")
4566+
4567+
// Get result first time - this will close the outcome channel
4568+
result1, err := handle.GetResult()
4569+
require.NoError(t, err, "first GetResult should succeed")
4570+
assert.Equal(t, "test", result1)
4571+
4572+
// Sleep briefly to ensure channel is closed
4573+
time.Sleep(10 * time.Millisecond)
4574+
4575+
// Get result second time - should fail since channel is closed
4576+
_, err = handle.GetResult()
4577+
require.Error(t, err, "second GetResult should fail")
4578+
assert.Contains(t, err.Error(), "workflow result channel is already closed")
4579+
})
4580+
}
4581+
4582+
func TestWorkflowPollingHandleTimeout(t *testing.T) {
4583+
dbosCtx := setupDBOS(t, true, true)
4584+
RegisterWorkflow(dbosCtx, simpleWorkflow)
4585+
4586+
t.Run("WorkflowPollingHandleTimeout", func(t *testing.T) {
4587+
// Test timeout on workflowPollingHandle (database polling)
4588+
handle, err := RunWorkflow(dbosCtx, simpleWorkflow, "test")
4589+
require.NoError(t, err, "failed to start workflow")
4590+
4591+
// Test with a very short timeout - should timeout
4592+
start := time.Now()
4593+
_, err = handle.GetResult(WithHandleTimeout(1 * time.Millisecond))
4594+
duration := time.Since(start)
4595+
4596+
require.Error(t, err, "expected timeout error")
4597+
assert.Contains(t, err.Error(), "workflow result timeout after 1ms")
4598+
assert.True(t, duration < 100*time.Millisecond, "timeout should occur quickly")
4599+
})
4600+
4601+
t.Run("WorkflowPollingHandleNoTimeout", func(t *testing.T) {
4602+
// Test without timeout - should work normally
4603+
handle, err := RunWorkflow(dbosCtx, simpleWorkflow, "test")
4604+
require.NoError(t, err, "failed to start workflow")
4605+
4606+
result, err := handle.GetResult()
4607+
require.NoError(t, err, "GetResult without timeout should succeed")
4608+
assert.Equal(t, "test", result)
4609+
})
4610+
}

0 commit comments

Comments
 (0)