Skip to content

Commit 38cf444

Browse files
committed
simplify handler implementation
1 parent c88b2e5 commit 38cf444

File tree

2 files changed

+68
-82
lines changed

2 files changed

+68
-82
lines changed

dbos/recovery.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ func recoverPendingWorkflows(ctx *dbosContext, executorIDs []string) ([]Workflow
3232
continue
3333
}
3434
if cleared {
35-
workflowHandles = append(workflowHandles, &workflowPollingHandle[any]{workflowID: workflow.ID, dbosContext: ctx})
35+
workflowHandles = append(workflowHandles, newWorkflowPollingHandle[any](ctx, workflow.ID))
3636
}
3737
continue
3838
}

dbos/workflow.go

Lines changed: 67 additions & 81 deletions
Original file line numberDiff line numberDiff line change
@@ -89,11 +89,57 @@ type WorkflowHandle[R any] interface {
8989
GetWorkflowID() string // Get the unique workflow identifier
9090
}
9191

92+
// baseWorkflowHandle contains common fields and methods for workflow handles
93+
type baseWorkflowHandle struct {
94+
workflowID string
95+
dbosContext DBOSContext
96+
}
97+
98+
// GetStatus returns the current status of the workflow from the database
99+
func (h *baseWorkflowHandle) GetStatus() (WorkflowStatus, error) {
100+
workflowStatuses, err := h.dbosContext.(*dbosContext).systemDB.listWorkflows(h.dbosContext, listWorkflowsDBInput{
101+
workflowIDs: []string{h.workflowID},
102+
loadInput: true,
103+
loadOutput: true,
104+
})
105+
if err != nil {
106+
return WorkflowStatus{}, fmt.Errorf("failed to get workflow status: %w", err)
107+
}
108+
if len(workflowStatuses) == 0 {
109+
return WorkflowStatus{}, newNonExistentWorkflowError(h.workflowID)
110+
}
111+
return workflowStatuses[0], nil
112+
}
113+
114+
func (h *baseWorkflowHandle) GetWorkflowID() string {
115+
return h.workflowID
116+
}
117+
118+
// newWorkflowHandle creates a new workflowHandle with the given parameters
119+
func newWorkflowHandle[R any](ctx DBOSContext, workflowID string, outcomeChan chan workflowOutcome[R]) *workflowHandle[R] {
120+
return &workflowHandle[R]{
121+
baseWorkflowHandle: baseWorkflowHandle{
122+
workflowID: workflowID,
123+
dbosContext: ctx,
124+
},
125+
outcomeChan: outcomeChan,
126+
}
127+
}
128+
129+
// newWorkflowPollingHandle creates a new workflowPollingHandle with the given parameters
130+
func newWorkflowPollingHandle[R any](ctx DBOSContext, workflowID string) *workflowPollingHandle[R] {
131+
return &workflowPollingHandle[R]{
132+
baseWorkflowHandle: baseWorkflowHandle{
133+
workflowID: workflowID,
134+
dbosContext: ctx,
135+
},
136+
}
137+
}
138+
92139
// workflowHandle is a concrete implementation of WorkflowHandle
93140
type workflowHandle[R any] struct {
94-
workflowID string
141+
baseWorkflowHandle
95142
outcomeChan chan workflowOutcome[R]
96-
dbosContext DBOSContext
97143
}
98144

99145
// GetResult waits for the workflow to complete and returns the result
@@ -127,29 +173,8 @@ func (h *workflowHandle[R]) GetResult() (R, error) {
127173
return outcome.result, outcome.err
128174
}
129175

130-
// GetStatus returns the current status of the workflow from the database
131-
func (h *workflowHandle[R]) GetStatus() (WorkflowStatus, error) {
132-
workflowStatuses, err := h.dbosContext.(*dbosContext).systemDB.listWorkflows(h.dbosContext, listWorkflowsDBInput{
133-
workflowIDs: []string{h.workflowID},
134-
loadInput: true,
135-
loadOutput: true,
136-
})
137-
if err != nil {
138-
return WorkflowStatus{}, fmt.Errorf("failed to get workflow status: %w", err)
139-
}
140-
if len(workflowStatuses) == 0 {
141-
return WorkflowStatus{}, newNonExistentWorkflowError(h.workflowID)
142-
}
143-
return workflowStatuses[0], nil
144-
}
145-
146-
func (h *workflowHandle[R]) GetWorkflowID() string {
147-
return h.workflowID
148-
}
149-
150176
type workflowPollingHandle[R any] struct {
151-
workflowID string
152-
dbosContext DBOSContext
177+
baseWorkflowHandle
153178
}
154179

155180
func (h *workflowPollingHandle[R]) GetResult() (R, error) {
@@ -186,26 +211,6 @@ func (h *workflowPollingHandle[R]) GetResult() (R, error) {
186211
return *new(R), err
187212
}
188213

189-
// GetStatus returns the current status of the workflow from the database
190-
func (h *workflowPollingHandle[R]) GetStatus() (WorkflowStatus, error) {
191-
workflowStatuses, err := h.dbosContext.(*dbosContext).systemDB.listWorkflows(h.dbosContext, listWorkflowsDBInput{
192-
workflowIDs: []string{h.workflowID},
193-
loadInput: true,
194-
loadOutput: true,
195-
})
196-
if err != nil {
197-
return WorkflowStatus{}, fmt.Errorf("failed to get workflow status: %w", err)
198-
}
199-
if len(workflowStatuses) == 0 {
200-
return WorkflowStatus{}, newNonExistentWorkflowError(h.workflowID)
201-
}
202-
return workflowStatuses[0], nil
203-
}
204-
205-
func (h *workflowPollingHandle[R]) GetWorkflowID() string {
206-
return h.workflowID
207-
}
208-
209214
/**********************************/
210215
/******* WORKFLOW REGISTRY *******/
211216
/**********************************/
@@ -409,7 +414,7 @@ func RegisterWorkflow[P any, R any](ctx DBOSContext, fn GenericWorkflowFunc[P, R
409414
if err != nil {
410415
return nil, err
411416
}
412-
return &workflowPollingHandle[any]{workflowID: handle.GetWorkflowID(), dbosContext: ctx}, nil // this is only used by recovery and queue runner so far -- queue runner dismisses it
417+
return newWorkflowPollingHandle[any](ctx, handle.GetWorkflowID()), nil // this is only used by recovery and queue runner so far -- queue runner dismisses it
413418
})
414419
registerWorkflow(ctx, fqn, typeErasedWrapper, registrationParams.maxRetries, registrationParams.name)
415420

@@ -536,10 +541,7 @@ func RunAsWorkflow[P any, R any](ctx DBOSContext, fn GenericWorkflowFunc[P, R],
536541
// If we got a polling handle, return its typed version
537542
if pollingHandle, ok := handle.(*workflowPollingHandle[any]); ok {
538543
// We need to convert the polling handle to a typed handle
539-
typedPollingHandle := &workflowPollingHandle[R]{
540-
workflowID: pollingHandle.workflowID,
541-
dbosContext: pollingHandle.dbosContext,
542-
}
544+
typedPollingHandle := newWorkflowPollingHandle[R](pollingHandle.dbosContext, pollingHandle.workflowID)
543545
return typedPollingHandle, nil
544546
}
545547

@@ -567,11 +569,7 @@ func RunAsWorkflow[P any, R any](ctx DBOSContext, fn GenericWorkflowFunc[P, R],
567569
}
568570
}()
569571

570-
typedHandle := &workflowHandle[R]{
571-
workflowID: handle.workflowID,
572-
outcomeChan: typedOutcomeChan,
573-
dbosContext: handle.dbosContext,
574-
}
572+
typedHandle := newWorkflowHandle[R](handle.dbosContext, handle.workflowID, typedOutcomeChan)
575573

576574
return typedHandle, nil
577575
}
@@ -634,7 +632,7 @@ func (c *dbosContext) RunAsWorkflow(_ DBOSContext, fn WorkflowFunc, input any, o
634632
return nil, newWorkflowExecutionError(parentWorkflowState.workflowID, fmt.Sprintf("checking child workflow: %v", err))
635633
}
636634
if childWorkflowID != nil {
637-
return &workflowPollingHandle[any]{workflowID: *childWorkflowID, dbosContext: uncancellableCtx}, nil
635+
return newWorkflowPollingHandle[any](uncancellableCtx, *childWorkflowID), nil
638636
}
639637
}
640638

@@ -706,7 +704,7 @@ func (c *dbosContext) RunAsWorkflow(_ DBOSContext, fn WorkflowFunc, input any, o
706704
if err := tx.Commit(uncancellableCtx); err != nil {
707705
return nil, newWorkflowExecutionError(workflowID, fmt.Sprintf("failed to commit transaction: %v", err))
708706
}
709-
return &workflowPollingHandle[any]{workflowID: workflowStatus.ID, dbosContext: uncancellableCtx}, nil
707+
return newWorkflowPollingHandle[any](uncancellableCtx, workflowStatus.ID), nil
710708
}
711709

712710
// Record child workflow relationship if this is a child workflow
@@ -798,7 +796,7 @@ func (c *dbosContext) RunAsWorkflow(_ DBOSContext, fn WorkflowFunc, input any, o
798796
close(outcomeChan)
799797
}()
800798

801-
return &workflowHandle[any]{workflowID: workflowID, outcomeChan: outcomeChan, dbosContext: uncancellableCtx}, nil
799+
return newWorkflowHandle[any](uncancellableCtx, workflowID, outcomeChan), nil
802800
}
803801

804802
/******************************/
@@ -1303,7 +1301,7 @@ func (c *dbosContext) RetrieveWorkflow(_ DBOSContext, workflowID string) (Workfl
13031301
if len(workflowStatus) == 0 {
13041302
return nil, newNonExistentWorkflowError(workflowID)
13051303
}
1306-
return &workflowPollingHandle[any]{workflowID: workflowID, dbosContext: c}, nil
1304+
return newWorkflowPollingHandle[any](c, workflowID), nil
13071305
}
13081306

13091307
// RetrieveWorkflow returns a typed handle to an existing workflow.
@@ -1323,9 +1321,9 @@ func (c *dbosContext) RetrieveWorkflow(_ DBOSContext, workflowID string) (Workfl
13231321
// } else {
13241322
// log.Printf("Result: %d", result)
13251323
// }
1326-
func RetrieveWorkflow[R any](ctx DBOSContext, workflowID string) (workflowPollingHandle[R], error) {
1324+
func RetrieveWorkflow[R any](ctx DBOSContext, workflowID string) (*workflowPollingHandle[R], error) {
13271325
if ctx == nil {
1328-
return workflowPollingHandle[R]{}, errors.New("dbosCtx cannot be nil")
1326+
return nil, errors.New("dbosCtx cannot be nil")
13291327
}
13301328

13311329
// Register the output for gob encoding
@@ -1336,12 +1334,12 @@ func RetrieveWorkflow[R any](ctx DBOSContext, workflowID string) (workflowPollin
13361334
workflowIDs: []string{workflowID},
13371335
})
13381336
if err != nil {
1339-
return workflowPollingHandle[R]{}, fmt.Errorf("failed to retrieve workflow status: %w", err)
1337+
return nil, fmt.Errorf("failed to retrieve workflow status: %w", err)
13401338
}
13411339
if len(workflowStatus) == 0 {
1342-
return workflowPollingHandle[R]{}, newNonExistentWorkflowError(workflowID)
1340+
return nil, newNonExistentWorkflowError(workflowID)
13431341
}
1344-
return workflowPollingHandle[R]{workflowID: workflowID, dbosContext: ctx}, nil
1342+
return newWorkflowPollingHandle[R](ctx, workflowID), nil
13451343
}
13461344

13471345
type EnqueueOptions struct {
@@ -1406,10 +1404,7 @@ func (c *dbosContext) Enqueue(_ DBOSContext, params EnqueueOptions) (WorkflowHan
14061404
return nil, fmt.Errorf("failed to commit transaction: %w", err)
14071405
}
14081406

1409-
return &workflowPollingHandle[any]{
1410-
workflowID: workflowID,
1411-
dbosContext: uncancellableCtx,
1412-
}, nil
1407+
return newWorkflowPollingHandle[any](uncancellableCtx, workflowID), nil
14131408
}
14141409

14151410
type GenericEnqueueOptions[P any] struct {
@@ -1504,10 +1499,7 @@ func Enqueue[P any, R any](ctx DBOSContext, params GenericEnqueueOptions[P]) (Wo
15041499
return nil, err
15051500
}
15061501

1507-
return &workflowPollingHandle[R]{
1508-
workflowID: handle.GetWorkflowID(),
1509-
dbosContext: ctx,
1510-
}, nil
1502+
return newWorkflowPollingHandle[R](ctx, handle.GetWorkflowID()), nil
15111503
}
15121504

15131505
// CancelWorkflow cancels a running or enqueued workflow by setting its status to CANCELLED.
@@ -1548,7 +1540,7 @@ func (c *dbosContext) ResumeWorkflow(_ DBOSContext, workflowID string) (Workflow
15481540
if err != nil {
15491541
return nil, err
15501542
}
1551-
return &workflowPollingHandle[any]{workflowID: workflowID, dbosContext: c}, nil
1543+
return newWorkflowPollingHandle[any](c, workflowID), nil
15521544
}
15531545

15541546
// ResumeWorkflow resumes a cancelled workflow by setting its status back to ENQUEUED.
@@ -1583,7 +1575,7 @@ func ResumeWorkflow[R any](ctx DBOSContext, workflowID string) (WorkflowHandle[R
15831575
if err != nil {
15841576
return nil, err
15851577
}
1586-
return &workflowPollingHandle[R]{workflowID: workflowID, dbosContext: ctx}, nil
1578+
return newWorkflowPollingHandle[R](ctx, workflowID), nil
15871579
}
15881580

15891581
// ForkWorkflowInput holds configuration parameters for forking workflows.
@@ -1622,10 +1614,7 @@ func (c *dbosContext) ForkWorkflow(_ DBOSContext, input ForkWorkflowInput) (Work
16221614
return nil, err
16231615
}
16241616

1625-
return &workflowPollingHandle[any]{
1626-
workflowID: forkedWorkflowID,
1627-
dbosContext: c,
1628-
}, nil
1617+
return newWorkflowPollingHandle[any](c, forkedWorkflowID), nil
16291618
}
16301619

16311620
// ForkWorkflow creates a new workflow instance by copying an existing workflow from a specific step.
@@ -1679,10 +1668,7 @@ func ForkWorkflow[R any](ctx DBOSContext, input ForkWorkflowInput) (WorkflowHand
16791668
if err != nil {
16801669
return nil, err
16811670
}
1682-
return &workflowPollingHandle[R]{
1683-
workflowID: handle.GetWorkflowID(),
1684-
dbosContext: ctx,
1685-
}, nil
1671+
return newWorkflowPollingHandle[R](ctx, handle.GetWorkflowID()), nil
16861672
}
16871673

16881674
// listWorkflowsParams holds configuration parameters for listing workflows

0 commit comments

Comments
 (0)