Skip to content

Commit ae9b964

Browse files
authored
Visibility query builder for workflow shadower (#1055)
1 parent e462133 commit ae9b964

File tree

2 files changed

+329
-0
lines changed

2 files changed

+329
-0
lines changed

internal/query_builder.go

Lines changed: 138 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,138 @@
1+
package internal
2+
3+
import (
4+
"fmt"
5+
"math"
6+
"strings"
7+
"time"
8+
9+
"go.uber.org/cadence/.gen/go/shared"
10+
)
11+
12+
type (
13+
// WorkflowStatus represents the status of a workflow
14+
WorkflowStatus string
15+
)
16+
17+
const (
18+
// WorkflowStatusOpen is the WorkflowStatus for open workflows
19+
WorkflowStatusOpen WorkflowStatus = "OPEN"
20+
// WorkflowStatusClosed is the WorkflowStatus for closed workflows
21+
WorkflowStatusClosed WorkflowStatus = "CLOSED"
22+
)
23+
24+
var (
25+
// WorkflowStatusCompleted is the WorkflowStatus for completed workflow
26+
WorkflowStatusCompleted = WorkflowStatus(shared.WorkflowExecutionCloseStatusCompleted.String())
27+
// WorkflowStatusFailed is the WorkflowStatus for failed workflows
28+
WorkflowStatusFailed = WorkflowStatus(shared.WorkflowExecutionCloseStatusFailed.String())
29+
// WorkflowStatusCanceled is the WorkflowStatus for canceled workflows
30+
WorkflowStatusCanceled = WorkflowStatus(shared.WorkflowExecutionCloseStatusCanceled.String())
31+
// WorkflowStatusTerminated is the WorkflowStatus for terminated workflows
32+
WorkflowStatusTerminated = WorkflowStatus(shared.WorkflowExecutionCloseStatusTerminated.String())
33+
// WorkflowStatusContinuedAsNew is the WorkflowStatus for continuedAsNew workflows
34+
WorkflowStatusContinuedAsNew = WorkflowStatus(shared.WorkflowExecutionCloseStatusContinuedAsNew.String())
35+
// WorkflowStatusTimedOut is the WorkflowStatus for timedout workflows
36+
WorkflowStatusTimedOut = WorkflowStatus(shared.WorkflowExecutionCloseStatusTimedOut.String())
37+
)
38+
39+
const (
40+
keyWorkflowType = "WorkflowType"
41+
keyCloseStatus = "CloseStatus"
42+
keyStartTime = "StartTime"
43+
keyCloseTime = "CloseTime"
44+
)
45+
46+
var (
47+
maxTimestamp = time.Unix(0, math.MaxInt64)
48+
)
49+
50+
type (
51+
// QueryBuilder builds visibility query
52+
QueryBuilder interface {
53+
WorkflowTypes([]string) QueryBuilder
54+
WorkflowStatus([]WorkflowStatus) QueryBuilder
55+
StartTime(time.Time, time.Time) QueryBuilder
56+
Build() string
57+
}
58+
59+
queryBuilderImpl struct {
60+
builder strings.Builder
61+
}
62+
)
63+
64+
// NewQueryBuilder creates a new visibility QueryBuilder
65+
func NewQueryBuilder() QueryBuilder {
66+
return &queryBuilderImpl{}
67+
}
68+
69+
func (q *queryBuilderImpl) WorkflowTypes(types []string) QueryBuilder {
70+
workflowTypeQueries := make([]string, 0, len(types))
71+
for _, workflowType := range types {
72+
workflowTypeQueries = append(workflowTypeQueries, fmt.Sprintf(keyWorkflowType+` = "%v"`, workflowType))
73+
}
74+
q.appendPartialQuery(strings.Join(workflowTypeQueries, " or "))
75+
return q
76+
}
77+
78+
func (q *queryBuilderImpl) WorkflowStatus(statuses []WorkflowStatus) QueryBuilder {
79+
workflowStatusQueries := make([]string, 0, len(statuses))
80+
for _, status := range statuses {
81+
var statusQuery string
82+
switch status {
83+
case WorkflowStatusOpen:
84+
statusQuery = keyCloseTime + " = missing"
85+
case WorkflowStatusClosed:
86+
statusQuery = keyCloseTime + " != missing"
87+
default:
88+
statusQuery = keyCloseStatus + ` = "` + string(status) + `"`
89+
}
90+
workflowStatusQueries = append(workflowStatusQueries, statusQuery)
91+
}
92+
q.appendPartialQuery(strings.Join(workflowStatusQueries, " or "))
93+
return q
94+
}
95+
96+
func (q *queryBuilderImpl) StartTime(minStartTime, maxStartTime time.Time) QueryBuilder {
97+
startTimeQueries := make([]string, 0, 2)
98+
if !minStartTime.IsZero() {
99+
startTimeQueries = append(startTimeQueries, fmt.Sprintf(keyStartTime+` >= %v`, minStartTime.UnixNano()))
100+
}
101+
if !maxStartTime.Equal(maxTimestamp) {
102+
startTimeQueries = append(startTimeQueries, fmt.Sprintf(keyStartTime+` <= %v`, maxStartTime.UnixNano()))
103+
}
104+
105+
q.appendPartialQuery(strings.Join(startTimeQueries, " and "))
106+
return q
107+
}
108+
109+
func (q *queryBuilderImpl) Build() string {
110+
return q.builder.String()
111+
}
112+
113+
func (q *queryBuilderImpl) appendPartialQuery(query string) {
114+
if len(query) == 0 {
115+
return
116+
}
117+
118+
if q.builder.Len() != 0 {
119+
q.builder.WriteString(" and ")
120+
}
121+
122+
q.builder.WriteRune('(')
123+
q.builder.WriteString(query)
124+
q.builder.WriteRune(')')
125+
}
126+
127+
// ToWorkflowStatus converts workflow status from string type to WorkflowStatus type
128+
func ToWorkflowStatus(statusString string) (WorkflowStatus, error) {
129+
status := WorkflowStatus(strings.ToUpper(statusString))
130+
switch status {
131+
case WorkflowStatusOpen, WorkflowStatusClosed, WorkflowStatusCompleted,
132+
WorkflowStatusFailed, WorkflowStatusCanceled, WorkflowStatusTerminated,
133+
WorkflowStatusContinuedAsNew, WorkflowStatusTimedOut:
134+
return status, nil
135+
default:
136+
return "", fmt.Errorf("unknown workflow status: %v", statusString)
137+
}
138+
}

internal/query_builder_test.go

Lines changed: 191 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,191 @@
1+
package internal
2+
3+
import (
4+
"fmt"
5+
"testing"
6+
"time"
7+
8+
"github.com/stretchr/testify/require"
9+
"github.com/stretchr/testify/suite"
10+
)
11+
12+
type queryBuilderSuite struct {
13+
*require.Assertions
14+
suite.Suite
15+
}
16+
17+
func TestQueryBuilderSuite(t *testing.T) {
18+
s := new(queryBuilderSuite)
19+
suite.Run(t, s)
20+
}
21+
22+
func (s *queryBuilderSuite) SetupTest() {
23+
s.Assertions = require.New(s.T())
24+
}
25+
26+
func (s *queryBuilderSuite) TestWorkflowTypeQuery() {
27+
testCases := []struct {
28+
msg string
29+
workflowTypes []string
30+
expectedQuery string
31+
}{
32+
{
33+
msg: "empty workflowTypes",
34+
workflowTypes: nil,
35+
expectedQuery: "",
36+
},
37+
{
38+
msg: "single workflowType",
39+
workflowTypes: []string{"testWorkflowType"},
40+
expectedQuery: `(WorkflowType = "testWorkflowType")`,
41+
},
42+
{
43+
msg: "multiple workflowTypes",
44+
workflowTypes: []string{"testWorkflowType1", "testWorkflowType2"},
45+
expectedQuery: `(WorkflowType = "testWorkflowType1" or WorkflowType = "testWorkflowType2")`,
46+
},
47+
}
48+
49+
for _, test := range testCases {
50+
s.T().Run(test.msg, func(t *testing.T) {
51+
builder := NewQueryBuilder()
52+
builder.WorkflowTypes(test.workflowTypes)
53+
s.Equal(test.expectedQuery, builder.Build())
54+
})
55+
}
56+
}
57+
58+
func (s *queryBuilderSuite) TestWorkflowStatusQuery() {
59+
testCases := []struct {
60+
msg string
61+
workflowStatuses []WorkflowStatus
62+
expectedQuery string
63+
}{
64+
{
65+
msg: "empty workflow status",
66+
workflowStatuses: []WorkflowStatus{},
67+
expectedQuery: "",
68+
},
69+
{
70+
msg: "open workflow",
71+
workflowStatuses: []WorkflowStatus{WorkflowStatusOpen},
72+
expectedQuery: "(CloseTime = missing)",
73+
},
74+
{
75+
msg: "closed workflow",
76+
workflowStatuses: []WorkflowStatus{WorkflowStatusClosed},
77+
expectedQuery: "(CloseTime != missing)",
78+
},
79+
{
80+
msg: "multiple workflow statuses",
81+
workflowStatuses: []WorkflowStatus{WorkflowStatusFailed, WorkflowStatusTimedOut},
82+
expectedQuery: `(CloseStatus = "FAILED" or CloseStatus = "TIMED_OUT")`,
83+
},
84+
}
85+
86+
for _, test := range testCases {
87+
s.T().Run(test.msg, func(t *testing.T) {
88+
builder := NewQueryBuilder()
89+
builder.WorkflowStatus(test.workflowStatuses)
90+
s.Equal(test.expectedQuery, builder.Build())
91+
})
92+
}
93+
}
94+
95+
func (s *queryBuilderSuite) TestStartTimeQuery() {
96+
testTimestamp := time.Now()
97+
testCases := []struct {
98+
msg string
99+
minStartTime time.Time
100+
maxStartTime time.Time
101+
expectedQuery string
102+
}{
103+
{
104+
msg: "empty minTimestamp",
105+
maxStartTime: testTimestamp,
106+
expectedQuery: fmt.Sprintf("(StartTime <= %v)", testTimestamp.UnixNano()),
107+
},
108+
{
109+
msg: "max maxTimestamp",
110+
minStartTime: testTimestamp,
111+
maxStartTime: maxTimestamp,
112+
expectedQuery: fmt.Sprintf("(StartTime >= %v)", testTimestamp.UnixNano()),
113+
},
114+
{
115+
msg: "both timestamps are used",
116+
minStartTime: testTimestamp.Add(-time.Hour),
117+
maxStartTime: testTimestamp,
118+
expectedQuery: fmt.Sprintf("(StartTime >= %v and StartTime <= %v)", testTimestamp.Add(-time.Hour).UnixNano(), testTimestamp.UnixNano()),
119+
},
120+
}
121+
122+
for _, test := range testCases {
123+
s.T().Run(test.msg, func(t *testing.T) {
124+
builder := NewQueryBuilder()
125+
builder.StartTime(test.minStartTime, test.maxStartTime)
126+
s.Equal(test.expectedQuery, builder.Build())
127+
})
128+
}
129+
}
130+
131+
func (s *queryBuilderSuite) TestMultipleFilters() {
132+
maxStartTime := time.Now()
133+
minStartTime := maxStartTime.Add(-time.Hour)
134+
135+
builder := NewQueryBuilder().
136+
WorkflowTypes([]string{"testWorkflowType1", "testWorkflowType2"}).
137+
WorkflowStatus([]WorkflowStatus{WorkflowStatusOpen}).
138+
StartTime(minStartTime, maxStartTime)
139+
140+
expectedQuery := fmt.Sprintf(`(WorkflowType = "testWorkflowType1" or WorkflowType = "testWorkflowType2") and (CloseTime = missing) and (StartTime >= %v and StartTime <= %v)`,
141+
minStartTime.UnixNano(),
142+
maxStartTime.UnixNano(),
143+
)
144+
s.Equal(expectedQuery, builder.Build())
145+
}
146+
147+
func (s *queryBuilderSuite) TestToWorkflowStatus() {
148+
testCases := []struct {
149+
msg string
150+
statusString string
151+
expectErr bool
152+
expectedStatus WorkflowStatus
153+
}{
154+
{
155+
msg: "unknown status",
156+
statusString: "unknown",
157+
expectErr: true,
158+
},
159+
{
160+
msg: "lower case status string",
161+
statusString: "open",
162+
expectErr: false,
163+
expectedStatus: WorkflowStatusOpen,
164+
},
165+
{
166+
msg: "mixed case status string",
167+
statusString: "Timed_Out",
168+
expectErr: false,
169+
expectedStatus: WorkflowStatusTimedOut,
170+
},
171+
{
172+
173+
msg: "upper case status string",
174+
statusString: "TERMINATED",
175+
expectErr: false,
176+
expectedStatus: WorkflowStatusTerminated,
177+
},
178+
}
179+
180+
for _, test := range testCases {
181+
s.T().Run(test.msg, func(t *testing.T) {
182+
actualStatus, err := ToWorkflowStatus(test.statusString)
183+
if test.expectErr {
184+
s.Error(err)
185+
return
186+
}
187+
188+
s.Equal(test.expectedStatus, actualStatus)
189+
})
190+
}
191+
}

0 commit comments

Comments
 (0)