Skip to content

Commit d6421fe

Browse files
committed
Add cron schedule to customized search attribute
1 parent ec6a632 commit d6421fe

File tree

9 files changed

+62
-2
lines changed

9 files changed

+62
-2
lines changed

common/definition/indexedKeys.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@ const (
4040
ClusterAttributeScope = "ClusterAttributeScope"
4141
ClusterAttributeName = "ClusterAttributeName"
4242
IsCron = "IsCron"
43+
CronSchedule = "CronSchedule"
4344
NumClusters = "NumClusters"
4445
UpdateTime = "UpdateTime"
4546
CustomDomain = "CustomDomain" // to support batch workflow
@@ -76,6 +77,7 @@ func createDefaultIndexedKeys() map[string]interface{} {
7677
CustomDatetimeField: types.IndexedValueTypeDatetime,
7778
CadenceChangeVersion: types.IndexedValueTypeKeyword,
7879
BinaryChecksums: types.IndexedValueTypeKeyword,
80+
CronSchedule: types.IndexedValueTypeKeyword,
7981
CustomDomain: types.IndexedValueTypeString,
8082
Operator: types.IndexedValueTypeString,
8183
}

config/dynamicconfig/development.yaml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,7 @@ frontend.validSearchAttributes:
6666
service: 1
6767
user: 1
6868
IsDeleted: 4
69+
CronSchedule: 1
6970
constraints: {}
7071
system.writeVisibilityStoreName:
7172
- value: "db"

config/dynamicconfig/development_es.yaml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@ frontend.validSearchAttributes:
4040
BinaryChecksums: 1
4141
Passed: 4
4242
ShardID: 2
43+
CronSchedule: 1
4344
system.minRetentionDays:
4445
- value: 0
4546
history.EnableConsistentQueryByDomain:

config/dynamicconfig/development_pinot.yaml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@ frontend.validSearchAttributes:
4141
Passed: 4
4242
ShardID: 2
4343
IsDeleted: 4
44+
CronSchedule: 1
4445
system.minRetentionDays:
4546
- value: 0
4647
history.EnableConsistentQueryByDomain:

schema/elasticsearch/os2/visibility/index_template.json

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,9 @@
6161
},
6262
"user": {
6363
"type": "keyword"
64+
},
65+
"CronSchedule": {
66+
"type": "keyword"
6467
}
6568
}
6669
},

schema/elasticsearch/v6/visibility/index_template.json

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -83,7 +83,8 @@
8383
"Operator": { "type": "keyword"},
8484
"RolloutID": { "type": "keyword"},
8585
"BinaryChecksums": { "type": "keyword"},
86-
"Passed": { "type": "boolean" }
86+
"Passed": { "type": "boolean" },
87+
"CronSchedule": { "type": "keyword" }
8788
}
8889
}
8990
}

schema/elasticsearch/v7/visibility/index_template.json

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -82,7 +82,8 @@
8282
"Operator": { "type": "keyword"},
8383
"RolloutID": { "type": "keyword"},
8484
"BinaryChecksums": { "type": "keyword"},
85-
"Passed": { "type": "boolean" }
85+
"Passed": { "type": "boolean" },
86+
"CronSchedule": { "type": "keyword" }
8687
}
8788
}
8889
}

service/history/task/transfer_active_task_executor.go

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ package task
2323

2424
import (
2525
"context"
26+
"encoding/json"
2627
"errors"
2728
"fmt"
2829
"time"
@@ -35,6 +36,7 @@ import (
3536
"github.com/uber/cadence/common/cache"
3637
"github.com/uber/cadence/common/clock"
3738
"github.com/uber/cadence/common/constants"
39+
"github.com/uber/cadence/common/definition"
3840
"github.com/uber/cadence/common/log"
3941
"github.com/uber/cadence/common/log/tag"
4042
"github.com/uber/cadence/common/metrics"
@@ -442,6 +444,18 @@ func (t *transferActiveTaskExecutor) processCloseExecutionTaskHelper(
442444
domainName := mutableState.GetDomainEntry().GetInfo().Name
443445
children := mutableState.GetPendingChildExecutionInfos()
444446

447+
// Add CronSchedule to search attributes if it's a cron workflow
448+
if isCron {
449+
searchAttr = copySearchAttributes(searchAttr)
450+
if searchAttr == nil {
451+
searchAttr = make(map[string][]byte)
452+
}
453+
cronScheduleBytes, err := json.Marshal(executionInfo.CronSchedule)
454+
if err == nil {
455+
searchAttr[definition.CronSchedule] = cronScheduleBytes
456+
}
457+
}
458+
445459
// we've gathered all necessary information from mutable state.
446460
// release the context lock since we no longer need mutable state builder and
447461
// the rest of logic is making RPC call, which takes time.
@@ -1007,6 +1021,17 @@ func (t *transferActiveTaskExecutor) processRecordWorkflowStartedOrUpsertHelper(
10071021
numClusters := (int16)(len(domainEntry.GetReplicationConfig().Clusters))
10081022
updateTimestamp := t.shard.GetTimeSource().Now()
10091023

1024+
// Add CronSchedule to search attributes if it's a cron workflow
1025+
if isCron {
1026+
if searchAttr == nil {
1027+
searchAttr = make(map[string][]byte)
1028+
}
1029+
cronScheduleBytes, err := json.Marshal(executionInfo.CronSchedule)
1030+
if err == nil {
1031+
searchAttr[definition.CronSchedule] = cronScheduleBytes
1032+
}
1033+
}
1034+
10101035
// release the context lock since we no longer need mutable state builder and
10111036
// the rest of logic is making RPC call, which takes time.
10121037
release(nil)

service/history/task/transfer_standby_task_executor.go

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,10 +22,12 @@ package task
2222

2323
import (
2424
"context"
25+
"encoding/json"
2526
"fmt"
2627
"time"
2728

2829
"github.com/uber/cadence/common/constants"
30+
"github.com/uber/cadence/common/definition"
2931
"github.com/uber/cadence/common/log"
3032
"github.com/uber/cadence/common/log/tag"
3133
"github.com/uber/cadence/common/metrics"
@@ -286,6 +288,18 @@ func (t *transferStandbyTaskExecutor) processCloseExecution(
286288
isCron := len(executionInfo.CronSchedule) > 0
287289
updateTimestamp := t.shard.GetTimeSource().Now()
288290

291+
// Add CronSchedule to search attributes if it's a cron workflow
292+
if isCron {
293+
searchAttr = copySearchAttributes(searchAttr)
294+
if searchAttr == nil {
295+
searchAttr = make(map[string][]byte)
296+
}
297+
cronScheduleBytes, err := json.Marshal(executionInfo.CronSchedule)
298+
if err == nil {
299+
searchAttr[definition.CronSchedule] = cronScheduleBytes
300+
}
301+
}
302+
289303
lastWriteVersion, err := mutableState.GetLastWriteVersion()
290304
if err != nil {
291305
return nil, err
@@ -537,6 +551,17 @@ func (t *transferStandbyTaskExecutor) processRecordWorkflowStartedOrUpsertHelper
537551
searchAttr := copySearchAttributes(executionInfo.SearchAttributes)
538552
headers := getWorkflowHeaders(startEvent)
539553

554+
// Add CronSchedule to search attributes if it's a cron workflow
555+
if isCron {
556+
if searchAttr == nil {
557+
searchAttr = make(map[string][]byte)
558+
}
559+
cronScheduleBytes, err := json.Marshal(executionInfo.CronSchedule)
560+
if err == nil {
561+
searchAttr[definition.CronSchedule] = cronScheduleBytes
562+
}
563+
}
564+
540565
if isRecordStart {
541566
workflowStartedScope.IncCounter(metrics.WorkflowStartedCount)
542567
return t.recordWorkflowStarted(

0 commit comments

Comments
 (0)