Skip to content

Commit d37290d

Browse files
authored
Added CloseTime filter to shadower (#1309)
1 parent 2e73362 commit d37290d

File tree

2 files changed

+34
-4
lines changed

2 files changed

+34
-4
lines changed

internal/query_builder.go

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -70,11 +70,14 @@ var (
7070
)
7171

7272
type (
73-
// QueryBuilder builds visibility query
73+
// QueryBuilder builds visibility query. It's shadower's own Query builders that processes the shadow filter
74+
// options into a query to pull t he required workflows.
75+
7476
QueryBuilder interface {
7577
WorkflowTypes([]string) QueryBuilder
7678
WorkflowStatus([]WorkflowStatus) QueryBuilder
7779
StartTime(time.Time, time.Time) QueryBuilder
80+
CloseTime(time.Time, time.Time) QueryBuilder
7881
Build() string
7982
}
8083

@@ -131,6 +134,19 @@ func (q *queryBuilderImpl) StartTime(minStartTime, maxStartTime time.Time) Query
131134
return q
132135
}
133136

137+
func (q *queryBuilderImpl) CloseTime(minCloseTime, maxCloseTime time.Time) QueryBuilder {
138+
CloseTimeQueries := make([]string, 0, 2)
139+
if !minCloseTime.IsZero() {
140+
CloseTimeQueries = append(CloseTimeQueries, fmt.Sprintf(keyCloseTime+` >= %v`, minCloseTime.UnixNano()))
141+
}
142+
if !maxCloseTime.Equal(maxTimestamp) {
143+
CloseTimeQueries = append(CloseTimeQueries, fmt.Sprintf(keyCloseTime+` <= %v`, maxCloseTime.UnixNano()))
144+
}
145+
146+
q.appendPartialQuery(strings.Join(CloseTimeQueries, " and "))
147+
return q
148+
}
149+
134150
func (q *queryBuilderImpl) Build() string {
135151
return q.builder.String()
136152
}

internal/workflow_shadower.go

Lines changed: 17 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,12 @@ type (
7373
// default: no time filter, which matches all workflow start timestamp
7474
WorkflowStartTimeFilter TimeFilter
7575

76+
// Optional: Min and Max workflow close timestamp.
77+
// Timestamps will be used to construct WorkflowQuery. Only workflows closed within the time range will be replayed. If this filter is being used along with the thee StartTime filter then make sure the Min Close time stamp
78+
// is within the range of Start timestamp.
79+
// default: no time filter, which matches all workflow closed timestamp
80+
WorkflowCloseTimeFilter TimeFilter
81+
7682
// Optional: sampling rate for the workflows matches WorkflowQuery
7783
// only sampled workflows will be replayed
7884
// default: 1.0
@@ -83,7 +89,7 @@ type (
8389
// default: ShadowModeNormal, which means shadowing will complete after all workflows have been replayed
8490
Mode ShadowMode
8591

86-
// Reqired if Mode is set to ShadowModeContinuous: controls when shadowing should complete
92+
// Required if Mode is set to ShadowModeContinuous: controls when shadowing should complete
8793
ExitCondition ShadowExitCondition
8894

8995
// Optional: workflow shadowing concurrency (# of concurrent workflow replay activities)
@@ -204,7 +210,7 @@ func (s *WorkflowShadower) Run() error {
204210
return s.shadowWorker()
205211
}
206212

207-
// Stop stops WorkflowShadower and wait up to one miniute for all goroutines to finish before returning
213+
// Stop stops WorkflowShadower and wait up to one minute for all goroutines to finish before returning
208214
func (s *WorkflowShadower) Stop() {
209215
if !atomic.CompareAndSwapInt32(&s.status, statusStarted, statusStopped) {
210216
return
@@ -300,7 +306,7 @@ func (o *ShadowOptions) validateAndPopulateFields() error {
300306
}
301307

302308
if len(o.WorkflowQuery) != 0 && (len(o.WorkflowTypes) != 0 || len(o.WorkflowStatus) != 0 || !o.WorkflowStartTimeFilter.isEmpty()) {
303-
return errors.New("workflow types, status and start time filter can't be specified when workflow query is specified")
309+
return errors.New("workflow types, status, start time and close time filter can't be specified when workflow query is specified")
304310
}
305311

306312
if len(o.WorkflowQuery) == 0 {
@@ -314,6 +320,8 @@ func (o *ShadowOptions) validateAndPopulateFields() error {
314320
}
315321
statuses = append(statuses, status)
316322
}
323+
//All the open statuses are taken by default. This list seems to not work as expected.
324+
//TODO: verify that the status list works as expected. currently all wfs of all types get picked up.
317325
if len(statuses) == 0 {
318326
statuses = []WorkflowStatus{WorkflowStatusOpen}
319327
}
@@ -326,6 +334,12 @@ func (o *ShadowOptions) validateAndPopulateFields() error {
326334
queryBuilder.StartTime(o.WorkflowStartTimeFilter.MinTimestamp, o.WorkflowStartTimeFilter.MaxTimestamp)
327335
}
328336

337+
if !o.WorkflowCloseTimeFilter.isEmpty() {
338+
if err := o.WorkflowCloseTimeFilter.validateAndPopulateFields(); err != nil {
339+
return fmt.Errorf("invalid close time filter, error: %v", err)
340+
}
341+
queryBuilder.CloseTime(o.WorkflowCloseTimeFilter.MinTimestamp, o.WorkflowCloseTimeFilter.MaxTimestamp)
342+
}
329343
o.WorkflowQuery = queryBuilder.Build()
330344
}
331345

0 commit comments

Comments
 (0)