Skip to content

Commit 9b0bb37

Browse files
committed
Add support for strong consistency on GetWorkflowHistory and DescribeWorkflowExecution
1 parent 2af19f2 commit 9b0bb37

File tree

5 files changed

+496
-16
lines changed

5 files changed

+496
-16
lines changed

client/client.go

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,12 @@ type (
7878
// QueryWorkflowWithOptionsResponse defines the response to QueryWorkflowWithOptions
7979
QueryWorkflowWithOptionsResponse = internal.QueryWorkflowWithOptionsResponse
8080

81+
// GetWorkflowHistoryWithOptionsRequest defines the request to GetWorkflowHistoryWithOptions
82+
GetWorkflowHistoryWithOptionsRequest = internal.GetWorkflowHistoryWithOptionsRequest
83+
84+
// DescribeWorkflowExecutionWithOptionsRequest defines the request to DescribeWorkflowExecutionWithOptions
85+
DescribeWorkflowExecutionWithOptionsRequest = internal.DescribeWorkflowExecutionWithOptionsRequest
86+
8187
// ParentClosePolicy defines the behavior performed on a child workflow when its parent is closed
8288
ParentClosePolicy = internal.ParentClosePolicy
8389

@@ -226,6 +232,26 @@ type (
226232
// }
227233
GetWorkflowHistory(ctx context.Context, workflowID string, runID string, isLongPoll bool, filterType s.HistoryEventFilterType) HistoryEventIterator
228234

235+
// GetWorkflowHistoryWithOptions gets history events of a particular workflow.
236+
// See GetWorkflowHistoryWithOptionsRequest for more information.
237+
// Returns an iterator of HistoryEvents - see shared.HistoryEvent for more details.
238+
// Example:-
239+
// To iterate all events,
240+
// iter := GetWorkflowHistory(ctx, workflowID, runID, isLongPoll, filterType)
241+
// events := []*shared.HistoryEvent{}
242+
// for iter.HasNext() {
243+
// event, err := iter.Next()
244+
// if err != nil {
245+
// return err
246+
// }
247+
// events = append(events, event)
248+
// }
249+
// Returns the following errors:
250+
// - EntityNotExistsError
251+
// - BadRequestError
252+
// - InternalServiceError
253+
GetWorkflowHistoryWithOptions(ctx context.Context, request *GetWorkflowHistoryWithOptionsRequest) (HistoryEventIterator, error)
254+
229255
// CompleteActivity reports activity completed.
230256
// activity Execute method can return activity.ErrResultPending to
231257
// indicate the activity is not completed when it's Execute method returns. In that case, this CompleteActivity() method
@@ -383,6 +409,14 @@ type (
383409
// - EntityNotExistError
384410
DescribeWorkflowExecution(ctx context.Context, workflowID, runID string) (*s.DescribeWorkflowExecutionResponse, error)
385411

412+
// DescribeWorkflowExecutionWithOptions returns information about the specified workflow execution.
413+
// See DescribeWorkflowExecutionWithOptionsRequest for more information.
414+
// The errors it can return:
415+
// - BadRequestError
416+
// - InternalServiceError
417+
// - EntityNotExistError
418+
DescribeWorkflowExecutionWithOptions(ctx context.Context, request *DescribeWorkflowExecutionWithOptionsRequest) (*s.DescribeWorkflowExecutionResponse, error)
419+
386420
// DescribeTaskList returns information about the target tasklist, right now this API returns the
387421
// pollers which polled this tasklist in last few minutes.
388422
// The errors it can return:

internal/client.go

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -219,6 +219,15 @@ type (
219219
// }
220220
GetWorkflowHistory(ctx context.Context, workflowID string, runID string, isLongPoll bool, filterType s.HistoryEventFilterType) HistoryEventIterator
221221

222+
// GetWorkflowHistoryWithOptions gets history events of a particular workflow.
223+
// See GetWorkflowHistoryWithOptionsRequest for more information.
224+
// Returns an iterator of HistoryEvents - see shared.HistoryEvent for more details.
225+
// The errors it can return:
226+
// - EntityNotExistsError
227+
// - BadRequestError
228+
// - InternalServiceError
229+
GetWorkflowHistoryWithOptions(ctx context.Context, request *GetWorkflowHistoryWithOptionsRequest) (HistoryEventIterator, error)
230+
222231
// CompleteActivity reports activity completed.
223232
// activity Execute method can return acitivity.activity.ErrResultPending to
224233
// indicate the activity is not completed when it's Execute method returns. In that case, this CompleteActivity() method
@@ -367,6 +376,14 @@ type (
367376
// - EntityNotExistError
368377
DescribeWorkflowExecution(ctx context.Context, workflowID, runID string) (*s.DescribeWorkflowExecutionResponse, error)
369378

379+
// DescribeWorkflowExecutionWithOptions returns information about workflow execution with additional options including query consistency level.
380+
// See DescribeWorkflowExecutionWithOptionsRequest for more information.
381+
// The errors it can return:
382+
// - BadRequestError
383+
// - InternalServiceError
384+
// - EntityNotExistError
385+
DescribeWorkflowExecutionWithOptions(ctx context.Context, request *DescribeWorkflowExecutionWithOptionsRequest) (*s.DescribeWorkflowExecutionResponse, error)
386+
370387
// DescribeTaskList returns information about the target tasklist, right now this API returns the
371388
// pollers which polled this tasklist in last few minutes.
372389
// The errors it can return:

internal/internal_workflow_client.go

Lines changed: 83 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -453,19 +453,37 @@ func (wc *workflowClient) GetWorkflowHistory(
453453
isLongPoll bool,
454454
filterType s.HistoryEventFilterType,
455455
) HistoryEventIterator {
456+
request := &GetWorkflowHistoryWithOptionsRequest{
457+
WorkflowID: workflowID,
458+
RunID: runID,
459+
IsLongPoll: isLongPoll,
460+
FilterType: filterType,
461+
QueryConsistencyLevel: nil, // Use server default
462+
}
463+
iter, _ := wc.GetWorkflowHistoryWithOptions(ctx, request)
464+
return iter
465+
}
456466

467+
// GetWorkflowHistoryWithOptions gets history events with additional options including query consistency level.
468+
// See GetWorkflowHistoryWithOptionsRequest for more information.
469+
// The errors it can return:
470+
// - EntityNotExistsError
471+
// - BadRequestError
472+
// - InternalServiceError
473+
func (wc *workflowClient) GetWorkflowHistoryWithOptions(ctx context.Context, request *GetWorkflowHistoryWithOptionsRequest) (HistoryEventIterator, error) {
457474
domain := wc.domain
458475
paginate := func(nextToken []byte) (*s.GetWorkflowExecutionHistoryResponse, error) {
459-
request := &s.GetWorkflowExecutionHistoryRequest{
476+
req := &s.GetWorkflowExecutionHistoryRequest{
460477
Domain: common.StringPtr(domain),
461478
Execution: &s.WorkflowExecution{
462-
WorkflowId: common.StringPtr(workflowID),
463-
RunId: getRunID(runID),
479+
WorkflowId: common.StringPtr(request.WorkflowID),
480+
RunId: getRunID(request.RunID),
464481
},
465-
WaitForNewEvent: common.BoolPtr(isLongPoll),
466-
HistoryEventFilterType: &filterType,
482+
WaitForNewEvent: common.BoolPtr(request.IsLongPoll),
483+
HistoryEventFilterType: &request.FilterType,
467484
NextPageToken: nextToken,
468-
SkipArchival: common.BoolPtr(isLongPoll),
485+
SkipArchival: common.BoolPtr(request.IsLongPoll),
486+
QueryConsistencyLevel: request.QueryConsistencyLevel,
469487
}
470488

471489
var response *s.GetWorkflowExecutionHistoryResponse
@@ -477,7 +495,7 @@ func (wc *workflowClient) GetWorkflowHistory(
477495
func() error {
478496
var err1 error
479497
tchCtx, cancel, opt := newChannelContext(ctx, wc.featureFlags, func(builder *contextBuilder) {
480-
if isLongPoll {
498+
if request.IsLongPoll {
481499
builder.Timeout = defaultGetHistoryTimeoutInSecs * time.Second
482500
deadline, ok := ctx.Deadline()
483501
if ok && deadline.Before(time.Now().Add(builder.Timeout)) {
@@ -487,14 +505,14 @@ func (wc *workflowClient) GetWorkflowHistory(
487505
}
488506
})
489507
defer cancel()
490-
response, err1 = wc.workflowService.GetWorkflowExecutionHistory(tchCtx, request, opt...)
508+
response, err1 = wc.workflowService.GetWorkflowExecutionHistory(tchCtx, req, opt...)
491509

492510
if err1 != nil {
493511
return err1
494512
}
495513

496514
if response.RawHistory != nil {
497-
history, err := serializer.DeserializeBlobDataToHistoryEvents(response.RawHistory, filterType)
515+
history, err := serializer.DeserializeBlobDataToHistoryEvents(response.RawHistory, request.FilterType)
498516
if err != nil {
499517
return err
500518
}
@@ -511,15 +529,15 @@ func (wc *workflowClient) GetWorkflowHistory(
511529
if err != nil {
512530
return nil, err
513531
}
514-
if isLongPoll && len(response.History.Events) == 0 && len(response.NextPageToken) != 0 {
532+
if request.IsLongPoll && len(response.History.Events) == 0 && len(response.NextPageToken) != 0 {
515533
if isFinalLongPoll {
516534
// essentially a deadline exceeded, the last attempt did not get a result.
517535
// this is necessary because the server does not know if we are able to try again,
518536
// so it returns an empty result slightly before a timeout occurs, so the next
519537
// attempt's token can be returned if it wishes to retry.
520538
return nil, fmt.Errorf("timed out waiting for the workflow to finish: %w", context.DeadlineExceeded)
521539
}
522-
request.NextPageToken = response.NextPageToken
540+
req.NextPageToken = response.NextPageToken
523541
continue Loop
524542
}
525543
break Loop
@@ -529,7 +547,7 @@ func (wc *workflowClient) GetWorkflowHistory(
529547

530548
return &historyEventIteratorImpl{
531549
paginate: paginate,
532-
}
550+
}, nil
533551
}
534552

535553
func isEntityNonExistFromPassive(err error) bool {
@@ -788,20 +806,36 @@ func (wc *workflowClient) GetSearchAttributes(ctx context.Context) (*s.GetSearch
788806
// - InternalServiceError
789807
// - EntityNotExistError
790808
func (wc *workflowClient) DescribeWorkflowExecution(ctx context.Context, workflowID, runID string) (*s.DescribeWorkflowExecutionResponse, error) {
791-
request := &s.DescribeWorkflowExecutionRequest{
809+
request := &DescribeWorkflowExecutionWithOptionsRequest{
810+
WorkflowID: workflowID,
811+
RunID: runID,
812+
QueryConsistencyLevel: nil, // Use server default
813+
}
814+
return wc.DescribeWorkflowExecutionWithOptions(ctx, request)
815+
}
816+
817+
// DescribeWorkflowExecutionWithOptions returns information about workflow execution with additional options including query consistency level.
818+
// See DescribeWorkflowExecutionWithOptionsRequest for more information.
819+
// The errors it can return:
820+
// - BadRequestError
821+
// - InternalServiceError
822+
// - EntityNotExistError
823+
func (wc *workflowClient) DescribeWorkflowExecutionWithOptions(ctx context.Context, request *DescribeWorkflowExecutionWithOptionsRequest) (*s.DescribeWorkflowExecutionResponse, error) {
824+
req := &s.DescribeWorkflowExecutionRequest{
792825
Domain: common.StringPtr(wc.domain),
793826
Execution: &s.WorkflowExecution{
794-
WorkflowId: common.StringPtr(workflowID),
795-
RunId: common.StringPtr(runID),
827+
WorkflowId: common.StringPtr(request.WorkflowID),
828+
RunId: common.StringPtr(request.RunID),
796829
},
830+
QueryConsistencyLevel: request.QueryConsistencyLevel,
797831
}
798832
var response *s.DescribeWorkflowExecutionResponse
799833
err := backoff.Retry(ctx,
800834
func() error {
801835
var err1 error
802836
tchCtx, cancel, opt := newChannelContext(ctx, wc.featureFlags)
803837
defer cancel()
804-
response, err1 = wc.workflowService.DescribeWorkflowExecution(tchCtx, request, opt...)
838+
response, err1 = wc.workflowService.DescribeWorkflowExecution(tchCtx, req, opt...)
805839
return err1
806840
}, createDynamicServiceRetryPolicy(ctx), isServiceTransientError)
807841
if err != nil {
@@ -875,6 +909,39 @@ type QueryWorkflowWithOptionsResponse struct {
875909
QueryRejected *s.QueryRejected
876910
}
877911

912+
// GetWorkflowHistoryWithOptionsRequest is the request to GetWorkflowHistoryWithOptions
913+
type GetWorkflowHistoryWithOptionsRequest struct {
914+
// WorkflowID specifies the workflow to request. Required.
915+
WorkflowID string
916+
// RunID is an optional field used to identify a specific run of the workflow.
917+
// If RunID is not provided the latest run will be used.
918+
RunID string
919+
// IsLongPoll is an optional field indicating whether to continue polling for new events until the workflow is terminal.
920+
// If IsLongPoll is true, the client can continue to iterate on new events that occur after the initial request.
921+
// Note that this means the client request will remain open until the workflow is terminal.
922+
IsLongPoll bool
923+
// FilterType is an optional field used to specify which events to return.
924+
// CloseEvent will only return the last event in the workflow history.
925+
FilterType s.HistoryEventFilterType
926+
// QueryConsistencyLevel is an optional field used to specify the consistency level for the query.
927+
// QueryConsistencyLevelStrong will query the currently active cluster for this workflow - at the potential cost of additional latency.
928+
// If not set, server will use the default consistency level.
929+
QueryConsistencyLevel *s.QueryConsistencyLevel
930+
}
931+
932+
// DescribeWorkflowExecutionWithOptionsRequest is the request to DescribeWorkflowExecutionWithOptions
933+
type DescribeWorkflowExecutionWithOptionsRequest struct {
934+
// WorkflowID specifies the workflow to request. Required.
935+
WorkflowID string
936+
// RunID is an optional field used to identify a specific run of the workflow.
937+
// If RunID is not provided the latest run will be used.
938+
RunID string
939+
// QueryConsistencyLevel is an optional field used to specify the consistency level for the query.
940+
// QueryConsistencyLevelStrong will query the currently active cluster for this workflow - at the potential cost of additional latency.
941+
// If not set, server will use the default consistency level.
942+
QueryConsistencyLevel *s.QueryConsistencyLevel
943+
}
944+
878945
// QueryWorkflowWithOptions queries a given workflow execution and returns the query result synchronously.
879946
// See QueryWorkflowWithOptionsRequest and QueryWorkflowWithOptionsResult for more information.
880947
// The errors it can return:

0 commit comments

Comments
 (0)