Skip to content

Commit b97bc74

Browse files
authored
Add List/Scan/Count Workflow APIs (#716)
* Add List/Scan/Count Workflow APIs * Address comments * Address comments
1 parent ad1ea02 commit b97bc74

File tree

5 files changed

+257
-1
lines changed

5 files changed

+257
-1
lines changed

client/client.go

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -236,6 +236,38 @@ type (
236236
// - EntityNotExistError
237237
ListOpenWorkflow(ctx context.Context, request *s.ListOpenWorkflowExecutionsRequest) (*s.ListOpenWorkflowExecutionsResponse, error)
238238

239+
// ListWorkflow gets workflow executions based on query. This API only works with ElasticSearch,
240+
// and will return BadRequestError when using Cassandra or MySQL. The query is basically the SQL WHERE clause,
241+
// examples:
242+
// - "(WorkflowID = 'wid1' or (WorkflowType = 'type2' and WorkflowID = 'wid2'))".
243+
// - "CloseTime between '2019-08-27T15:04:05+00:00' and '2019-08-28T15:04:05+00:00'".
244+
// - to list only open workflow use "CloseTime = missing"
245+
// Retrieved workflow executions are sorted by StartTime in descending order when list open workflow,
246+
// and sorted by CloseTime in descending order for other queries.
247+
// The errors it can return:
248+
// - BadRequestError
249+
// - InternalServiceError
250+
ListWorkflow(ctx context.Context, request *s.ListWorkflowExecutionsRequest) (*s.ListWorkflowExecutionsResponse, error)
251+
252+
// ScanWorkflow gets workflow executions based on query. This API only works with ElasticSearch,
253+
// and will return BadRequestError when using Cassandra or MySQL. The query is basically the SQL WHERE clause
254+
// (see ListWorkflow for query examples).
255+
// ScanWorkflow should be used when retrieving large amount of workflows and order is not needed.
256+
// It will use more ElasticSearch resources than ListWorkflow, but will be several times faster
257+
// when retrieving millions of workflows.
258+
// The errors it can return:
259+
// - BadRequestError
260+
// - InternalServiceError
261+
ScanWorkflow(ctx context.Context, request *s.ListWorkflowExecutionsRequest) (*s.ListWorkflowExecutionsResponse, error)
262+
263+
// CountWorkflow gets number of workflow executions based on query. This API only works with ElasticSearch,
264+
// and will return BadRequestError when using Cassandra or MySQL. The query is basically the SQL WHERE clause
265+
// (see ListWorkflow for query examples).
266+
// The errors it can return:
267+
// - BadRequestError
268+
// - InternalServiceError
269+
CountWorkflow(ctx context.Context, request *s.CountWorkflowExecutionsRequest) (*s.CountWorkflowExecutionsResponse, error)
270+
239271
// QueryWorkflow queries a given workflow's last execution and returns the query result synchronously. Parameter workflowID
240272
// and queryType are required, other parameters are optional. The workflowID and runID (optional) identify the
241273
// target workflow execution that this query will be send to. If runID is not specified (empty string), server will

internal/client.go

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -219,6 +219,38 @@ type (
219219
// - EntityNotExistError
220220
ListOpenWorkflow(ctx context.Context, request *s.ListOpenWorkflowExecutionsRequest) (*s.ListOpenWorkflowExecutionsResponse, error)
221221

222+
// ListWorkflow gets workflow executions based on query. This API only works with ElasticSearch,
223+
// and will return BadRequestError when using Cassandra or MySQL. The query is basically the SQL WHERE clause,
224+
// examples:
225+
// - "(WorkflowID = 'wid1' or (WorkflowType = 'type2' and WorkflowID = 'wid2'))".
226+
// - "CloseTime between '2019-08-27T15:04:05+00:00' and '2019-08-28T15:04:05+00:00'".
227+
// - to list only open workflow use "CloseTime = missing"
228+
// Retrieved workflow executions are sorted by StartTime in descending order when list open workflow,
229+
// and sorted by CloseTime in descending order for other queries.
230+
// The errors it can return:
231+
// - BadRequestError
232+
// - InternalServiceError
233+
ListWorkflow(ctx context.Context, request *s.ListWorkflowExecutionsRequest) (*s.ListWorkflowExecutionsResponse, error)
234+
235+
// ScanWorkflow gets workflow executions based on query. This API only works with ElasticSearch,
236+
// and will return BadRequestError when using Cassandra or MySQL. The query is basically the SQL WHERE clause
237+
// (see ListWorkflow for query examples).
238+
// ScanWorkflow should be used when retrieving large amount of workflows and order is not needed.
239+
// It will use more ElasticSearch resources than ListWorkflow, but will be several times faster
240+
// when retrieving millions of workflows.
241+
// The errors it can return:
242+
// - BadRequestError
243+
// - InternalServiceError
244+
ScanWorkflow(ctx context.Context, request *s.ListWorkflowExecutionsRequest) (*s.ListWorkflowExecutionsResponse, error)
245+
246+
// CountWorkflow gets number of workflow executions based on query. This API only works with ElasticSearch,
247+
// and will return BadRequestError when using Cassandra or MySQL. The query is basically the SQL WHERE clause
248+
// (see ListWorkflow for query examples).
249+
// The errors it can return:
250+
// - BadRequestError
251+
// - InternalServiceError
252+
CountWorkflow(ctx context.Context, request *s.CountWorkflowExecutionsRequest) (*s.CountWorkflowExecutionsResponse, error)
253+
222254
// QueryWorkflow queries a given workflow execution and returns the query result synchronously. Parameter workflowID
223255
// and queryType are required, other parameters are optional. The workflowID and runID (optional) identify the
224256
// target workflow execution that this query will be send to. If runID is not specified (empty string), server will

internal/internal_workflow_client.go

Lines changed: 61 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -576,7 +576,7 @@ func (wc *workflowClient) ListClosedWorkflow(ctx context.Context, request *s.Lis
576576
return response, nil
577577
}
578578

579-
// ListClosedWorkflow gets open workflow executions based on request filters
579+
// ListOpenWorkflow gets open workflow executions based on request filters
580580
// The errors it can throw:
581581
// - BadRequestError
582582
// - InternalServiceError
@@ -600,6 +600,66 @@ func (wc *workflowClient) ListOpenWorkflow(ctx context.Context, request *s.ListO
600600
return response, nil
601601
}
602602

603+
// ListWorkflow implementation
604+
func (wc *workflowClient) ListWorkflow(ctx context.Context, request *s.ListWorkflowExecutionsRequest) (*s.ListWorkflowExecutionsResponse, error) {
605+
if len(request.GetDomain()) == 0 {
606+
request.Domain = common.StringPtr(wc.domain)
607+
}
608+
var response *s.ListWorkflowExecutionsResponse
609+
err := backoff.Retry(ctx,
610+
func() error {
611+
var err1 error
612+
tchCtx, cancel, opt := newChannelContext(ctx)
613+
defer cancel()
614+
response, err1 = wc.workflowService.ListWorkflowExecutions(tchCtx, request, opt...)
615+
return err1
616+
}, createDynamicServiceRetryPolicy(ctx), isServiceTransientError)
617+
if err != nil {
618+
return nil, err
619+
}
620+
return response, nil
621+
}
622+
623+
// ScanWorkflow implementation
624+
func (wc *workflowClient) ScanWorkflow(ctx context.Context, request *s.ListWorkflowExecutionsRequest) (*s.ListWorkflowExecutionsResponse, error) {
625+
if len(request.GetDomain()) == 0 {
626+
request.Domain = common.StringPtr(wc.domain)
627+
}
628+
var response *s.ListWorkflowExecutionsResponse
629+
err := backoff.Retry(ctx,
630+
func() error {
631+
var err1 error
632+
tchCtx, cancel, opt := newChannelContext(ctx)
633+
defer cancel()
634+
response, err1 = wc.workflowService.ScanWorkflowExecutions(tchCtx, request, opt...)
635+
return err1
636+
}, createDynamicServiceRetryPolicy(ctx), isServiceTransientError)
637+
if err != nil {
638+
return nil, err
639+
}
640+
return response, nil
641+
}
642+
643+
// CountWorkflow implementation
644+
func (wc *workflowClient) CountWorkflow(ctx context.Context, request *s.CountWorkflowExecutionsRequest) (*s.CountWorkflowExecutionsResponse, error) {
645+
if len(request.GetDomain()) == 0 {
646+
request.Domain = common.StringPtr(wc.domain)
647+
}
648+
var response *s.CountWorkflowExecutionsResponse
649+
err := backoff.Retry(ctx,
650+
func() error {
651+
var err1 error
652+
tchCtx, cancel, opt := newChannelContext(ctx)
653+
defer cancel()
654+
response, err1 = wc.workflowService.CountWorkflowExecutions(tchCtx, request, opt...)
655+
return err1
656+
}, createDynamicServiceRetryPolicy(ctx), isServiceTransientError)
657+
if err != nil {
658+
return nil, err
659+
}
660+
return response, nil
661+
}
662+
603663
// DescribeWorkflowExecution returns information about the specified workflow execution.
604664
// The errors it can return:
605665
// - BadRequestError

internal/internal_workflow_client_test.go

Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -845,3 +845,66 @@ func (s *workflowClientTestSuite) TestGetWorkflowMemo() {
845845
_, err = getWorkflowMemo(input1, nil)
846846
s.Error(err)
847847
}
848+
849+
func (s *workflowClientTestSuite) TestListWorkflow() {
850+
request := &shared.ListWorkflowExecutionsRequest{}
851+
response := &shared.ListWorkflowExecutionsResponse{}
852+
s.service.EXPECT().ListWorkflowExecutions(gomock.Any(), gomock.Any(), gomock.Any()).Return(response, nil).
853+
Do(func(_ interface{}, req *shared.ListWorkflowExecutionsRequest, _ ...interface{}) {
854+
s.Equal(domain, request.GetDomain())
855+
})
856+
resp, err := s.client.ListWorkflow(context.Background(), request)
857+
s.Nil(err)
858+
s.Equal(response, resp)
859+
860+
responseErr := &shared.BadRequestError{}
861+
request.Domain = common.StringPtr("another")
862+
s.service.EXPECT().ListWorkflowExecutions(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil, responseErr).
863+
Do(func(_ interface{}, req *shared.ListWorkflowExecutionsRequest, _ ...interface{}) {
864+
s.Equal("another", request.GetDomain())
865+
})
866+
resp, err = s.client.ListWorkflow(context.Background(), request)
867+
s.Equal(responseErr, err)
868+
}
869+
870+
func (s *workflowClientTestSuite) TestScanWorkflow() {
871+
request := &shared.ListWorkflowExecutionsRequest{}
872+
response := &shared.ListWorkflowExecutionsResponse{}
873+
s.service.EXPECT().ScanWorkflowExecutions(gomock.Any(), gomock.Any(), gomock.Any()).Return(response, nil).
874+
Do(func(_ interface{}, req *shared.ListWorkflowExecutionsRequest, _ ...interface{}) {
875+
s.Equal(domain, request.GetDomain())
876+
})
877+
resp, err := s.client.ScanWorkflow(context.Background(), request)
878+
s.Nil(err)
879+
s.Equal(response, resp)
880+
881+
responseErr := &shared.BadRequestError{}
882+
request.Domain = common.StringPtr("another")
883+
s.service.EXPECT().ScanWorkflowExecutions(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil, responseErr).
884+
Do(func(_ interface{}, req *shared.ListWorkflowExecutionsRequest, _ ...interface{}) {
885+
s.Equal("another", request.GetDomain())
886+
})
887+
resp, err = s.client.ScanWorkflow(context.Background(), request)
888+
s.Equal(responseErr, err)
889+
}
890+
891+
func (s *workflowClientTestSuite) TestCountWorkflow() {
892+
request := &shared.CountWorkflowExecutionsRequest{}
893+
response := &shared.CountWorkflowExecutionsResponse{}
894+
s.service.EXPECT().CountWorkflowExecutions(gomock.Any(), gomock.Any(), gomock.Any()).Return(response, nil).
895+
Do(func(_ interface{}, req *shared.CountWorkflowExecutionsRequest, _ ...interface{}) {
896+
s.Equal(domain, request.GetDomain())
897+
})
898+
resp, err := s.client.CountWorkflow(context.Background(), request)
899+
s.Nil(err)
900+
s.Equal(response, resp)
901+
902+
responseErr := &shared.BadRequestError{}
903+
request.Domain = common.StringPtr("another")
904+
s.service.EXPECT().CountWorkflowExecutions(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil, responseErr).
905+
Do(func(_ interface{}, req *shared.CountWorkflowExecutionsRequest, _ ...interface{}) {
906+
s.Equal("another", request.GetDomain())
907+
})
908+
resp, err = s.client.CountWorkflow(context.Background(), request)
909+
s.Equal(responseErr, err)
910+
}

mocks/Client.go

Lines changed: 69 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

0 commit comments

Comments
 (0)