Skip to content
Merged
34 changes: 34 additions & 0 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,12 @@ type (
// QueryWorkflowWithOptionsResponse defines the response to QueryWorkflowWithOptions
QueryWorkflowWithOptionsResponse = internal.QueryWorkflowWithOptionsResponse

// GetWorkflowHistoryWithOptionsRequest defines the request to GetWorkflowHistoryWithOptions
GetWorkflowHistoryWithOptionsRequest = internal.GetWorkflowHistoryWithOptionsRequest

// DescribeWorkflowExecutionWithOptionsRequest defines the request to DescribeWorkflowExecutionWithOptions
DescribeWorkflowExecutionWithOptionsRequest = internal.DescribeWorkflowExecutionWithOptionsRequest

// ParentClosePolicy defines the behavior performed on a child workflow when its parent is closed
ParentClosePolicy = internal.ParentClosePolicy

Expand Down Expand Up @@ -226,6 +232,26 @@ type (
// }
GetWorkflowHistory(ctx context.Context, workflowID string, runID string, isLongPoll bool, filterType s.HistoryEventFilterType) HistoryEventIterator

// GetWorkflowHistoryWithOptions gets history events of a particular workflow.
// See GetWorkflowHistoryWithOptionsRequest for more information.
// Returns an iterator of HistoryEvents - see shared.HistoryEvent for more details.
// Example:-
// To iterate all events,
// iter := GetWorkflowHistory(ctx, workflowID, runID, isLongPoll, filterType)
// events := []*shared.HistoryEvent{}
// for iter.HasNext() {
// event, err := iter.Next()
// if err != nil {
// return err
// }
// events = append(events, event)
// }
// Returns the following errors:
// - EntityNotExistsError
// - BadRequestError
// - InternalServiceError
GetWorkflowHistoryWithOptions(ctx context.Context, request *GetWorkflowHistoryWithOptionsRequest) (HistoryEventIterator, error)

// CompleteActivity reports activity completed.
// activity Execute method can return activity.ErrResultPending to
// indicate the activity is not completed when it's Execute method returns. In that case, this CompleteActivity() method
Expand Down Expand Up @@ -383,6 +409,14 @@ type (
// - EntityNotExistError
DescribeWorkflowExecution(ctx context.Context, workflowID, runID string) (*s.DescribeWorkflowExecutionResponse, error)

// DescribeWorkflowExecutionWithOptions returns information about the specified workflow execution.
// See DescribeWorkflowExecutionWithOptionsRequest for more information.
// The errors it can return:
// - BadRequestError
// - InternalServiceError
// - EntityNotExistError
DescribeWorkflowExecutionWithOptions(ctx context.Context, request *DescribeWorkflowExecutionWithOptionsRequest) (*s.DescribeWorkflowExecutionResponse, error)

// DescribeTaskList returns information about the target tasklist, right now this API returns the
// pollers which polled this tasklist in last few minutes.
// The errors it can return:
Expand Down
17 changes: 17 additions & 0 deletions internal/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -219,6 +219,15 @@ type (
// }
GetWorkflowHistory(ctx context.Context, workflowID string, runID string, isLongPoll bool, filterType s.HistoryEventFilterType) HistoryEventIterator

// GetWorkflowHistoryWithOptions gets history events of a particular workflow.
// See GetWorkflowHistoryWithOptionsRequest for more information.
// Returns an iterator of HistoryEvents - see shared.HistoryEvent for more details.
// The errors it can return:
// - EntityNotExistsError
// - BadRequestError
// - InternalServiceError
GetWorkflowHistoryWithOptions(ctx context.Context, request *GetWorkflowHistoryWithOptionsRequest) (HistoryEventIterator, error)

// CompleteActivity reports activity completed.
// activity Execute method can return acitivity.activity.ErrResultPending to
// indicate the activity is not completed when it's Execute method returns. In that case, this CompleteActivity() method
Expand Down Expand Up @@ -367,6 +376,14 @@ type (
// - EntityNotExistError
DescribeWorkflowExecution(ctx context.Context, workflowID, runID string) (*s.DescribeWorkflowExecutionResponse, error)

// DescribeWorkflowExecutionWithOptions returns information about the specified workflow execution.
// See DescribeWorkflowExecutionWithOptionsRequest for more information.
// The errors it can return:
// - BadRequestError
// - InternalServiceError
// - EntityNotExistError
DescribeWorkflowExecutionWithOptions(ctx context.Context, request *DescribeWorkflowExecutionWithOptionsRequest) (*s.DescribeWorkflowExecutionResponse, error)

// DescribeTaskList returns information about the target tasklist, right now this API returns the
// pollers which polled this tasklist in last few minutes.
// The errors it can return:
Expand Down
99 changes: 83 additions & 16 deletions internal/internal_workflow_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -445,6 +445,26 @@ func (wc *workflowClient) TerminateWorkflow(ctx context.Context, workflowID stri
return err
}

// GetWorkflowHistoryWithOptionsRequest is the request to GetWorkflowHistoryWithOptions
type GetWorkflowHistoryWithOptionsRequest struct {
// WorkflowID specifies the workflow to request. Required.
WorkflowID string
// RunID is an optional field used to identify a specific run of the workflow.
// If RunID is not provided the latest run will be used.
RunID string
// IsLongPoll is an optional field indicating whether to continue polling for new events until the workflow is terminal.
// If IsLongPoll is true, the client can continue to iterate on new events that occur after the initial request.
// Note that this means the client request will remain open until the workflow is terminal.
IsLongPoll bool
// FilterType is an optional field used to specify which events to return.
// CloseEvent will only return the last event in the workflow history.
FilterType s.HistoryEventFilterType
// QueryConsistencyLevel is an optional field used to specify the consistency level for the query.
// QueryConsistencyLevelStrong will query the currently active cluster for this workflow - at the potential cost of additional latency.
// If not set, server will use the default consistency level.
QueryConsistencyLevel *s.QueryConsistencyLevel
}

// GetWorkflowHistory return a channel which contains the history events of a given workflow
func (wc *workflowClient) GetWorkflowHistory(
ctx context.Context,
Expand All @@ -453,19 +473,37 @@ func (wc *workflowClient) GetWorkflowHistory(
isLongPoll bool,
filterType s.HistoryEventFilterType,
) HistoryEventIterator {
request := &GetWorkflowHistoryWithOptionsRequest{
WorkflowID: workflowID,
RunID: runID,
IsLongPoll: isLongPoll,
FilterType: filterType,
QueryConsistencyLevel: nil, // Use server default
}
iter, _ := wc.GetWorkflowHistoryWithOptions(ctx, request)
return iter
}

// GetWorkflowHistoryWithOptions gets history events with additional options including query consistency level.
// See GetWorkflowHistoryWithOptionsRequest for more information.
// The errors it can return:
// - EntityNotExistsError
// - BadRequestError
// - InternalServiceError
func (wc *workflowClient) GetWorkflowHistoryWithOptions(ctx context.Context, request *GetWorkflowHistoryWithOptionsRequest) (HistoryEventIterator, error) {
domain := wc.domain
paginate := func(nextToken []byte) (*s.GetWorkflowExecutionHistoryResponse, error) {
request := &s.GetWorkflowExecutionHistoryRequest{
req := &s.GetWorkflowExecutionHistoryRequest{
Domain: common.StringPtr(domain),
Execution: &s.WorkflowExecution{
WorkflowId: common.StringPtr(workflowID),
RunId: getRunID(runID),
WorkflowId: common.StringPtr(request.WorkflowID),
RunId: getRunID(request.RunID),
},
WaitForNewEvent: common.BoolPtr(isLongPoll),
HistoryEventFilterType: &filterType,
WaitForNewEvent: common.BoolPtr(request.IsLongPoll),
HistoryEventFilterType: &request.FilterType,
NextPageToken: nextToken,
SkipArchival: common.BoolPtr(isLongPoll),
SkipArchival: common.BoolPtr(request.IsLongPoll),
QueryConsistencyLevel: request.QueryConsistencyLevel,
}

var response *s.GetWorkflowExecutionHistoryResponse
Expand All @@ -477,7 +515,7 @@ func (wc *workflowClient) GetWorkflowHistory(
func() error {
var err1 error
tchCtx, cancel, opt := newChannelContext(ctx, wc.featureFlags, func(builder *contextBuilder) {
if isLongPoll {
if request.IsLongPoll {
builder.Timeout = defaultGetHistoryTimeoutInSecs * time.Second
deadline, ok := ctx.Deadline()
if ok && deadline.Before(time.Now().Add(builder.Timeout)) {
Expand All @@ -487,14 +525,14 @@ func (wc *workflowClient) GetWorkflowHistory(
}
})
defer cancel()
response, err1 = wc.workflowService.GetWorkflowExecutionHistory(tchCtx, request, opt...)
response, err1 = wc.workflowService.GetWorkflowExecutionHistory(tchCtx, req, opt...)

if err1 != nil {
return err1
}

if response.RawHistory != nil {
history, err := serializer.DeserializeBlobDataToHistoryEvents(response.RawHistory, filterType)
history, err := serializer.DeserializeBlobDataToHistoryEvents(response.RawHistory, request.FilterType)
if err != nil {
return err
}
Expand All @@ -511,15 +549,15 @@ func (wc *workflowClient) GetWorkflowHistory(
if err != nil {
return nil, err
}
if isLongPoll && len(response.History.Events) == 0 && len(response.NextPageToken) != 0 {
if request.IsLongPoll && len(response.History.Events) == 0 && len(response.NextPageToken) != 0 {
if isFinalLongPoll {
// essentially a deadline exceeded, the last attempt did not get a result.
// this is necessary because the server does not know if we are able to try again,
// so it returns an empty result slightly before a timeout occurs, so the next
// attempt's token can be returned if it wishes to retry.
return nil, fmt.Errorf("timed out waiting for the workflow to finish: %w", context.DeadlineExceeded)
}
request.NextPageToken = response.NextPageToken
req.NextPageToken = response.NextPageToken
continue Loop
}
break Loop
Expand All @@ -529,7 +567,7 @@ func (wc *workflowClient) GetWorkflowHistory(

return &historyEventIteratorImpl{
paginate: paginate,
}
}, nil
}

func isEntityNonExistFromPassive(err error) bool {
Expand Down Expand Up @@ -782,26 +820,55 @@ func (wc *workflowClient) GetSearchAttributes(ctx context.Context) (*s.GetSearch
return response, nil
}

// DescribeWorkflowExecutionWithOptionsRequest is the request to DescribeWorkflowExecutionWithOptions
type DescribeWorkflowExecutionWithOptionsRequest struct {
// WorkflowID specifies the workflow to request. Required.
WorkflowID string
// RunID is an optional field used to identify a specific run of the workflow.
// If RunID is not provided the latest run will be used.
RunID string
// QueryConsistencyLevel is an optional field used to specify the consistency level for the query.
// QueryConsistencyLevelStrong will query the currently active cluster for this workflow - at the potential cost of additional latency.
// If not set, server will use the default consistency level.
QueryConsistencyLevel *s.QueryConsistencyLevel
}

// DescribeWorkflowExecution returns information about the specified workflow execution.
// The errors it can return:
// - BadRequestError
// - InternalServiceError
// - EntityNotExistError
func (wc *workflowClient) DescribeWorkflowExecution(ctx context.Context, workflowID, runID string) (*s.DescribeWorkflowExecutionResponse, error) {
request := &s.DescribeWorkflowExecutionRequest{
request := &DescribeWorkflowExecutionWithOptionsRequest{
WorkflowID: workflowID,
RunID: runID,
QueryConsistencyLevel: nil, // Use server default
}
return wc.DescribeWorkflowExecutionWithOptions(ctx, request)
}

// DescribeWorkflowExecutionWithOptions returns information about workflow execution with additional options including query consistency level.
// See DescribeWorkflowExecutionWithOptionsRequest for more information.
// The errors it can return:
// - BadRequestError
// - InternalServiceError
// - EntityNotExistError
func (wc *workflowClient) DescribeWorkflowExecutionWithOptions(ctx context.Context, request *DescribeWorkflowExecutionWithOptionsRequest) (*s.DescribeWorkflowExecutionResponse, error) {
req := &s.DescribeWorkflowExecutionRequest{
Domain: common.StringPtr(wc.domain),
Execution: &s.WorkflowExecution{
WorkflowId: common.StringPtr(workflowID),
RunId: common.StringPtr(runID),
WorkflowId: common.StringPtr(request.WorkflowID),
RunId: common.StringPtr(request.RunID),
},
QueryConsistencyLevel: request.QueryConsistencyLevel,
}
var response *s.DescribeWorkflowExecutionResponse
err := backoff.Retry(ctx,
func() error {
var err1 error
tchCtx, cancel, opt := newChannelContext(ctx, wc.featureFlags)
defer cancel()
response, err1 = wc.workflowService.DescribeWorkflowExecution(tchCtx, request, opt...)
response, err1 = wc.workflowService.DescribeWorkflowExecution(tchCtx, req, opt...)
return err1
}, createDynamicServiceRetryPolicy(ctx), isServiceTransientError)
if err != nil {
Expand Down
Loading