Skip to content

Commit 73b12d5

Browse files
authored
Added an option to exclude the list of workflows by Type (#1335)
1 parent 7f81710 commit 73b12d5

File tree

4 files changed

+86
-2
lines changed

4 files changed

+86
-2
lines changed

internal/query_builder.go

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -71,10 +71,11 @@ var (
7171

7272
type (
7373
// QueryBuilder builds visibility query. It's shadower's own Query builders that processes the shadow filter
74-
// options into a query to pull t he required workflows.
74+
// options into a query to pull the required workflows.
7575

7676
QueryBuilder interface {
7777
WorkflowTypes([]string) QueryBuilder
78+
ExcludeWorkflowTypes([]string) QueryBuilder
7879
WorkflowStatus([]WorkflowStatus) QueryBuilder
7980
StartTime(time.Time, time.Time) QueryBuilder
8081
CloseTime(time.Time, time.Time) QueryBuilder
@@ -100,6 +101,18 @@ func (q *queryBuilderImpl) WorkflowTypes(types []string) QueryBuilder {
100101
return q
101102
}
102103

104+
func (q *queryBuilderImpl) ExcludeWorkflowTypes(types []string) QueryBuilder {
105+
if len(types) == 0 {
106+
return q
107+
}
108+
excludeTypeQueries := make([]string, 0, len(types))
109+
for _, workflowType := range types {
110+
excludeTypeQueries = append(excludeTypeQueries, fmt.Sprintf(keyWorkflowType+` != "%v"`, workflowType))
111+
}
112+
q.appendPartialQuery(strings.Join(excludeTypeQueries, " and "))
113+
return q
114+
}
115+
103116
func (q *queryBuilderImpl) WorkflowStatus(statuses []WorkflowStatus) QueryBuilder {
104117
workflowStatusQueries := make([]string, 0, len(statuses))
105118
for _, status := range statuses {

internal/query_builder_test.go

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -153,6 +153,38 @@ func (s *queryBuilderSuite) TestStartTimeQuery() {
153153
}
154154
}
155155

156+
func (s *queryBuilderSuite) TestExcludeWorkflowTypesQuery() {
157+
testCases := []struct {
158+
msg string
159+
excludeTypes []string
160+
expectedQuery string
161+
}{
162+
{
163+
msg: "empty excludeTypes",
164+
excludeTypes: nil,
165+
expectedQuery: "",
166+
},
167+
{
168+
msg: "single excludeType",
169+
excludeTypes: []string{"excludedWorkflowType"},
170+
expectedQuery: `(WorkflowType != "excludedWorkflowType")`,
171+
},
172+
{
173+
msg: "multiple excludeTypes",
174+
excludeTypes: []string{"excludedWorkflowType1", "excludedWorkflowType2"},
175+
expectedQuery: `(WorkflowType != "excludedWorkflowType1" and WorkflowType != "excludedWorkflowType2")`,
176+
},
177+
}
178+
179+
for _, test := range testCases {
180+
s.T().Run(test.msg, func(t *testing.T) {
181+
builder := NewQueryBuilder()
182+
builder.ExcludeWorkflowTypes(test.excludeTypes)
183+
s.Equal(test.expectedQuery, builder.Build())
184+
})
185+
}
186+
}
187+
156188
func (s *queryBuilderSuite) TestMultipleFilters() {
157189
maxStartTime := time.Now()
158190
minStartTime := maxStartTime.Add(-time.Hour)

internal/workflow_shadower.go

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,11 @@ type (
6262
// default: empty list, which matches all workflow types
6363
WorkflowTypes []string
6464

65+
// Optional: A list of workflow type names that need to be excluded in the query.
66+
// The list will be used to construct WorkflowQuery.The listed workflow types will be excluded from replay.
67+
// default: empty list, which matches all workflow types
68+
ExcludeTypes []string
69+
6570
// Optional: A list of workflow status.
6671
// The list will be used to construct WorkflowQuery. Only workflows with status listed will be replayed.
6772
// accepted values (case-insensitive): OPEN, CLOSED, ALL, COMPLETED, FAILED, CANCELED, TERMINATED, CONTINUED_AS_NEW, TIMED_OUT
@@ -310,7 +315,15 @@ func (o *ShadowOptions) validateAndPopulateFields() error {
310315
}
311316

312317
if len(o.WorkflowQuery) == 0 {
313-
queryBuilder := NewQueryBuilder().WorkflowTypes(o.WorkflowTypes)
318+
queryBuilder := NewQueryBuilder()
319+
320+
if len(o.WorkflowTypes) > 0 {
321+
queryBuilder.WorkflowTypes(o.WorkflowTypes)
322+
}
323+
324+
if len(o.ExcludeTypes) > 0 {
325+
queryBuilder.ExcludeWorkflowTypes(o.ExcludeTypes)
326+
}
314327

315328
statuses := make([]WorkflowStatus, 0, len(o.WorkflowStatus))
316329
for _, statusString := range o.WorkflowStatus {

internal/workflow_shadower_test.go

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,8 @@
2121
package internal
2222

2323
import (
24+
"context"
25+
"fmt"
2426
"sync"
2527
"testing"
2628
"time"
@@ -243,6 +245,30 @@ func (s *workflowShadowerSuite) TestShadowOptionsValidation() {
243245
}
244246
}
245247

248+
func (s *workflowShadowerSuite) TestShadowOptionsWithExcludeTypes() {
249+
excludeTypes := []string{"excludedType1", "excludedType2"}
250+
options := ShadowOptions{
251+
WorkflowTypes: []string{"includedType1", "includedType2"},
252+
ExcludeTypes: excludeTypes,
253+
Mode: ShadowModeNormal,
254+
}
255+
expectedQuery := fmt.Sprintf(
256+
`(WorkflowType = "includedType1" or WorkflowType = "includedType2") and (WorkflowType != "excludedType1" and WorkflowType != "excludedType2") and (CloseTime = missing)`,
257+
)
258+
shadower, err := NewWorkflowShadower(s.mockService, "testDomain", options, ReplayOptions{}, nil)
259+
s.NoError(err)
260+
s.mockService.EXPECT().
261+
ScanWorkflowExecutions(gomock.Any(), gomock.Any(), gomock.Any()).
262+
DoAndReturn(func(ctx context.Context, request *shared.ListWorkflowExecutionsRequest, opts ...interface{}) (*shared.ListWorkflowExecutionsResponse, error) {
263+
s.Equal(expectedQuery, *request.Query)
264+
return &shared.ListWorkflowExecutionsResponse{
265+
Executions: nil,
266+
NextPageToken: nil,
267+
}, nil
268+
}).Times(1)
269+
s.NoError(shadower.shadowWorker())
270+
}
271+
246272
func (s *workflowShadowerSuite) TestShadowWorkerExitCondition_ExpirationTime() {
247273
totalWorkflows := 50
248274
timePerWorkflow := 7 * time.Second

0 commit comments

Comments
 (0)