Skip to content

Commit ad1ea02

Browse files
Create GetWorkflow method allowing to wait on workflow completion (#706)
1 parent 7d946c2 commit ad1ea02

File tree

5 files changed

+135
-39
lines changed

5 files changed

+135
-39
lines changed

client/client.go

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -78,7 +78,6 @@ type (
7878
// The errors it can return:
7979
// - EntityNotExistsError, if domain does not exists
8080
// - BadRequestError
81-
// - WorkflowExecutionAlreadyStartedError
8281
// - InternalServiceError
8382
//
8483
// WorkflowRun has 2 methods:
@@ -95,6 +94,23 @@ type (
9594
// NOTE: DO NOT USE THIS API INSIDE A WORKFLOW, USE workflow.ExecuteChildWorkflow instead
9695
ExecuteWorkflow(ctx context.Context, options StartWorkflowOptions, workflow interface{}, args ...interface{}) (WorkflowRun, error)
9796

97+
// GetWorkfow retrieves a workflow execution and return a WorkflowRun instance (described above)
98+
// - workflow ID of the workflow.
99+
// - runID can be default(empty string). if empty string then it will pick the last running execution of that workflow ID.
100+
//
101+
// WorkflowRun has 2 methods:
102+
// - GetRunID() string: which return the first started workflow run ID (please see below)
103+
// - Get(ctx context.Context, valuePtr interface{}) error: which will fill the workflow
104+
// execution result to valuePtr, if workflow execution is a success, or return corresponding
105+
// error. This is a blocking API.
106+
// NOTE: if the started workflow return ContinueAsNewError during the workflow execution, the
107+
// return result of GetRunID() will be the started workflow run ID, not the new run ID caused by ContinueAsNewError,
108+
// however, Get(ctx context.Context, valuePtr interface{}) will return result from the run which did not return ContinueAsNewError.
109+
// Say ExecuteWorkflow started a workflow, in its first run, has run ID "run ID 1", and returned ContinueAsNewError,
110+
// the second run has run ID "run ID 2" and return some result other than ContinueAsNewError:
111+
// GetRunID() will always return "run ID 1" and Get(ctx context.Context, valuePtr interface{}) will return the result of second run.
112+
GetWorkflow(ctx context.Context, workflowID string, runID string) WorkflowRun
113+
98114
// SignalWorkflow sends a signals to a workflow in execution
99115
// - workflow ID of the workflow.
100116
// - runID can be default(empty string). if empty string then it will pick the running execution of that workflow ID.

internal/client.go

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,6 @@ type (
6565
// The errors it can return:
6666
// - EntityNotExistsError, if domain does not exists
6767
// - BadRequestError
68-
// - WorkflowExecutionAlreadyStartedError
6968
// - InternalServiceError
7069
//
7170
// The current timeout resolution implementation is in seconds and uses math.Ceil(d.Seconds()) as the duration. But is
@@ -86,6 +85,21 @@ type (
8685
// NOTE: DO NOT USE THIS API INSIDE A WORKFLOW, USE workflow.ExecuteChildWorkflow instead
8786
ExecuteWorkflow(ctx context.Context, options StartWorkflowOptions, workflow interface{}, args ...interface{}) (WorkflowRun, error)
8887

88+
// GetWorkfow retrieves a workflow execution and return a WorkflowRun instance
89+
// - workflow ID of the workflow.
90+
// - runID can be default(empty string). if empty string then it will pick the last running execution of that workflow ID.
91+
//
92+
// WorkflowRun has three methods:
93+
// - GetID() string: which return workflow ID (which is same as StartWorkflowOptions.ID if provided)
94+
// - GetRunID() string: which return the first started workflow run ID (please see below)
95+
// - Get(ctx context.Context, valuePtr interface{}) error: which will fill the workflow
96+
// execution result to valuePtr, if workflow execution is a success, or return corresponding
97+
// error. This is a blocking API.
98+
// NOTE: if the retrieved workflow returned ContinueAsNewError during the workflow execution, the
99+
// return result of GetRunID() will be the retrieved workflow run ID, not the new run ID caused by ContinueAsNewError,
100+
// however, Get(ctx context.Context, valuePtr interface{}) will return result from the run which did not return ContinueAsNewError.
101+
GetWorkflow(ctx context.Context, workflowID string, runID string) WorkflowRun
102+
89103
// SignalWorkflow sends a signals to a workflow in execution
90104
// - workflow ID of the workflow.
91105
// - runID can be default(empty string). if empty string then it will pick the running execution of that workflow ID.

internal/internal_workflow_client.go

Lines changed: 23 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -216,13 +216,13 @@ func (wc *workflowClient) StartWorkflow(
216216
return executionInfo, nil
217217
}
218218

219-
// ExecuteWorkflow starts a workflow execution and wait until this workflow reaches the end state, such as
220-
// workflow finished successfully or timeout.
219+
// ExecuteWorkflow starts a workflow execution and returns a WorkflowRun that will allow you to wait until this workflow
220+
// reaches the end state, such as workflow finished successfully or timeout.
221221
// The user can use this to start using a functor like below and get the workflow execution result, as encoded.Value
222222
// Either by
223-
// RunWorkflow(options, "workflowTypeName", arg1, arg2, arg3)
223+
// ExecuteWorkflow(options, "workflowTypeName", arg1, arg2, arg3)
224224
// or
225-
// RunWorkflow(options, workflowExecuteFn, arg1, arg2, arg3)
225+
// ExecuteWorkflow(options, workflowExecuteFn, arg1, arg2, arg3)
226226
// The current timeout resolution implementation is in seconds and uses math.Ceil(d.Seconds()) as the duration. But is
227227
// subjected to change in the future.
228228
// NOTE: the context.Context should have a fairly large timeout, since workflow execution may take a while to be finished
@@ -260,6 +260,25 @@ func (wc *workflowClient) ExecuteWorkflow(ctx context.Context, options StartWork
260260
}, nil
261261
}
262262

263+
// GetWorkflow gets a workflow execution and returns a WorkflowRun that will allow you to wait until this workflow
264+
// reaches the end state, such as workflow finished successfully or timeout.
265+
// The current timeout resolution implementation is in seconds and uses math.Ceil(d.Seconds()) as the duration. But is
266+
// subjected to change in the future.
267+
func (wc *workflowClient) GetWorkflow(ctx context.Context, workflowID string, runID string) WorkflowRun {
268+
269+
iterFn := func(fnCtx context.Context, fnRunID string) HistoryEventIterator {
270+
return wc.GetWorkflowHistory(fnCtx, workflowID, fnRunID, true, s.HistoryEventFilterTypeCloseEvent)
271+
}
272+
273+
return &workflowRunImpl{
274+
workflowID: workflowID,
275+
firstRunID: runID,
276+
currentRunID: runID,
277+
iterFn: iterFn,
278+
dataConverter: wc.dataConverter,
279+
}
280+
}
281+
263282
// SignalWorkflow signals a workflow in execution.
264283
func (wc *workflowClient) SignalWorkflow(ctx context.Context, workflowID string, runID string, signalName string, arg interface{}) error {
265284
input, err := encodeArg(wc.dataConverter, arg)

internal/internal_workflow_client_test.go

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -625,6 +625,43 @@ func (s *workflowRunSuite) TestExecuteWorkflow_NoDup_ContinueAsNew() {
625625
s.Equal(workflowResult, decodedResult)
626626
}
627627

628+
func (s *workflowRunSuite) TestGetWorkflow() {
629+
filterType := shared.HistoryEventFilterTypeCloseEvent
630+
eventType := shared.EventTypeWorkflowExecutionCompleted
631+
workflowResult := time.Hour * 59
632+
encodedResult, _ := encodeArg(getDefaultDataConverter(), workflowResult)
633+
getRequest := getGetWorkflowExecutionHistoryRequest(filterType)
634+
getResponse := &shared.GetWorkflowExecutionHistoryResponse{
635+
History: &shared.History{
636+
Events: []*shared.HistoryEvent{
637+
&shared.HistoryEvent{
638+
EventType: &eventType,
639+
WorkflowExecutionCompletedEventAttributes: &shared.WorkflowExecutionCompletedEventAttributes{
640+
Result: encodedResult,
641+
},
642+
},
643+
},
644+
},
645+
NextPageToken: nil,
646+
}
647+
s.workflowServiceClient.EXPECT().GetWorkflowExecutionHistory(gomock.Any(), getRequest, gomock.Any(), gomock.Any(), gomock.Any()).Return(getResponse, nil).Times(1)
648+
649+
workflowID := workflowID
650+
runID := runID
651+
652+
workflowRun := s.workflowClient.GetWorkflow(
653+
context.Background(),
654+
workflowID,
655+
runID,
656+
)
657+
s.Equal(workflowRun.GetID(), workflowID)
658+
s.Equal(workflowRun.GetRunID(), runID)
659+
decodedResult := time.Minute
660+
err := workflowRun.Get(context.Background(), &decodedResult)
661+
s.Nil(err)
662+
s.Equal(workflowResult, decodedResult)
663+
}
664+
628665
func getGetWorkflowExecutionHistoryRequest(filterType shared.HistoryEventFilterType) *shared.GetWorkflowExecutionHistoryRequest {
629666
isLongPoll := true
630667

mocks/Client.go

Lines changed: 43 additions & 33 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

0 commit comments

Comments
 (0)