Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions common/definition/indexedKeys.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ const (
ClusterAttributeScope = "ClusterAttributeScope"
ClusterAttributeName = "ClusterAttributeName"
IsCron = "IsCron"
CronSchedule = "CronSchedule"
NumClusters = "NumClusters"
UpdateTime = "UpdateTime"
CustomDomain = "CustomDomain" // to support batch workflow
Expand Down Expand Up @@ -76,6 +77,7 @@ func createDefaultIndexedKeys() map[string]interface{} {
CustomDatetimeField: types.IndexedValueTypeDatetime,
CadenceChangeVersion: types.IndexedValueTypeKeyword,
BinaryChecksums: types.IndexedValueTypeKeyword,
CronSchedule: types.IndexedValueTypeKeyword,
CustomDomain: types.IndexedValueTypeString,
Operator: types.IndexedValueTypeString,
}
Expand Down
1 change: 1 addition & 0 deletions config/dynamicconfig/development.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ frontend.validSearchAttributes:
service: 1
user: 1
IsDeleted: 4
CronSchedule: 1
constraints: {}
system.writeVisibilityStoreName:
- value: "db"
Expand Down
1 change: 1 addition & 0 deletions config/dynamicconfig/development_es.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ frontend.validSearchAttributes:
BinaryChecksums: 1
Passed: 4
ShardID: 2
CronSchedule: 1
system.minRetentionDays:
- value: 0
history.EnableConsistentQueryByDomain:
Expand Down
1 change: 1 addition & 0 deletions config/dynamicconfig/development_pinot.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ frontend.validSearchAttributes:
Passed: 4
ShardID: 2
IsDeleted: 4
CronSchedule: 1
system.minRetentionDays:
- value: 0
history.EnableConsistentQueryByDomain:
Expand Down
20 changes: 14 additions & 6 deletions host/elastic_search_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -202,7 +202,7 @@ func (s *ElasticSearchIntegrationSuite) TestListWorkflowByClusterAttribute() {
s.Equal("us-east", descResp.WorkflowExecutionInfo.ActiveClusterSelectionPolicy.GetClusterAttribute().GetName())

query := `ClusterAttributeScope = "region" and ClusterAttributeName = "us-east"`
s.testHelperForReadOnceWithDomain(s.ActiveActiveDomainName, we.GetRunID(), query, false, false)
s.testHelperForReadOnceWithDomain(s.ActiveActiveDomainName, we.GetRunID(), query, false, false, false)
}

func (s *ElasticSearchIntegrationSuite) startWorkflow(
Expand Down Expand Up @@ -291,7 +291,7 @@ func (s *ElasticSearchIntegrationSuite) TestListWorkflow_SearchAttribute() {
we, err := s.Engine.StartWorkflowExecution(ctx, request)
s.Nil(err)
query := fmt.Sprintf(`WorkflowID = "%s" and %s = "%s"`, id, s.testSearchAttributeKey, s.testSearchAttributeVal)
s.testHelperForReadOnce(we.GetRunID(), query, false, false)
s.testHelperForReadOnceWithTestSearchAttributes(we.GetRunID(), query, false, false, true)

// test upsert
dtHandler := func(execution *types.WorkflowExecution, wt *types.WorkflowType,
Expand Down Expand Up @@ -767,10 +767,14 @@ func (s *ElasticSearchIntegrationSuite) testListWorkflowHelper(numOfWorkflows, p
}

func (s *ElasticSearchIntegrationSuite) testHelperForReadOnce(runID, query string, isScan bool, isAnyMatchOk bool) {
s.testHelperForReadOnceWithDomain(s.DomainName, runID, query, isScan, isAnyMatchOk)
s.testHelperForReadOnceWithTestSearchAttributes(runID, query, isScan, isAnyMatchOk, false)
}

func (s *ElasticSearchIntegrationSuite) testHelperForReadOnceWithTestSearchAttributes(runID, query string, isScan bool, isAnyMatchOk bool, checkTestSearchAttr bool) {
s.testHelperForReadOnceWithDomain(s.DomainName, runID, query, isScan, isAnyMatchOk, checkTestSearchAttr)
}

func (s *ElasticSearchIntegrationSuite) testHelperForReadOnceWithDomain(domainName string, runID, query string, isScan bool, isAnyMatchOk bool) {
func (s *ElasticSearchIntegrationSuite) testHelperForReadOnceWithDomain(domainName string, runID, query string, isScan bool, isAnyMatchOk bool, checkTestSearchAttr bool) {
var openExecution *types.WorkflowExecutionInfo
listRequest := &types.ListWorkflowExecutionsRequest{
Domain: domainName,
Expand Down Expand Up @@ -814,8 +818,12 @@ Retry:
s.NotNil(openExecution)
s.Equal(runID, openExecution.GetExecution().GetRunID())
s.True(openExecution.GetExecutionTime() >= openExecution.GetStartTime())
if openExecution.SearchAttributes != nil && len(openExecution.SearchAttributes.GetIndexedFields()) > 0 {
if checkTestSearchAttr {
// Test explicitly expects the test search attribute to be present
s.NotNil(openExecution.SearchAttributes)
s.True(len(openExecution.SearchAttributes.GetIndexedFields()) > 0)
searchValBytes := openExecution.SearchAttributes.GetIndexedFields()[s.testSearchAttributeKey]
s.NotNil(searchValBytes)
var searchVal string
json.Unmarshal(searchValBytes, &searchVal)
s.Equal(s.testSearchAttributeVal, searchVal)
Expand Down Expand Up @@ -873,7 +881,7 @@ func (s *ElasticSearchIntegrationSuite) TestScanWorkflow_SearchAttribute() {
we, err := s.Engine.StartWorkflowExecution(ctx, request)
s.Nil(err)
query := fmt.Sprintf(`WorkflowID = "%s" and %s = "%s"`, id, s.testSearchAttributeKey, s.testSearchAttributeVal)
s.testHelperForReadOnce(we.GetRunID(), query, true, false)
s.testHelperForReadOnceWithTestSearchAttributes(we.GetRunID(), query, true, false, true)
}

func (s *ElasticSearchIntegrationSuite) TestScanWorkflow_PageToken() {
Expand Down
3 changes: 3 additions & 0 deletions schema/elasticsearch/os2/visibility/index_template.json
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,9 @@
},
"user": {
"type": "keyword"
},
"CronSchedule": {
"type": "keyword"
}
}
},
Expand Down
3 changes: 2 additions & 1 deletion schema/elasticsearch/v6/visibility/index_template.json
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,8 @@
"Operator": { "type": "keyword"},
"RolloutID": { "type": "keyword"},
"BinaryChecksums": { "type": "keyword"},
"Passed": { "type": "boolean" }
"Passed": { "type": "boolean" },
"CronSchedule": { "type": "keyword" }
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What is the difference between adding it here and adding it outside Attr property?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Inside attr is the customized search attributes, outside is the system columns

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we make it a system column?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was thinking not all workflows need this field, but it should be ok since the field will not be present in visibility store if we don't send them. And we will no longer need to worry about if oss user has used them in search attribute

}
}
}
Expand Down
3 changes: 2 additions & 1 deletion schema/elasticsearch/v7/visibility/index_template.json
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,8 @@
"Operator": { "type": "keyword"},
"RolloutID": { "type": "keyword"},
"BinaryChecksums": { "type": "keyword"},
"Passed": { "type": "boolean" }
"Passed": { "type": "boolean" },
"CronSchedule": { "type": "keyword" }
}
}
}
Expand Down
25 changes: 25 additions & 0 deletions service/history/task/transfer_active_task_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ package task

import (
"context"
"encoding/json"
"errors"
"fmt"
"time"
Expand All @@ -35,6 +36,7 @@ import (
"github.com/uber/cadence/common/cache"
"github.com/uber/cadence/common/clock"
"github.com/uber/cadence/common/constants"
"github.com/uber/cadence/common/definition"
"github.com/uber/cadence/common/log"
"github.com/uber/cadence/common/log/tag"
"github.com/uber/cadence/common/metrics"
Expand Down Expand Up @@ -442,6 +444,18 @@ func (t *transferActiveTaskExecutor) processCloseExecutionTaskHelper(
domainName := mutableState.GetDomainEntry().GetInfo().Name
children := mutableState.GetPendingChildExecutionInfos()

// Add CronSchedule to search attributes if it's a cron workflow
if isCron {
searchAttr = copySearchAttributes(searchAttr)
if searchAttr == nil {
searchAttr = make(map[string][]byte)
}
cronScheduleBytes, err := json.Marshal(executionInfo.CronSchedule)
if err == nil {
searchAttr[definition.CronSchedule] = cronScheduleBytes
}
}

// we've gathered all necessary information from mutable state.
// release the context lock since we no longer need mutable state builder and
// the rest of logic is making RPC call, which takes time.
Expand Down Expand Up @@ -1007,6 +1021,17 @@ func (t *transferActiveTaskExecutor) processRecordWorkflowStartedOrUpsertHelper(
numClusters := (int16)(len(domainEntry.GetReplicationConfig().Clusters))
updateTimestamp := t.shard.GetTimeSource().Now()

// Add CronSchedule to search attributes if it's a cron workflow
if isCron {
if searchAttr == nil {
searchAttr = make(map[string][]byte)
}
cronScheduleBytes, err := json.Marshal(executionInfo.CronSchedule)
if err == nil {
searchAttr[definition.CronSchedule] = cronScheduleBytes
}
}

// release the context lock since we no longer need mutable state builder and
// the rest of logic is making RPC call, which takes time.
release(nil)
Expand Down
23 changes: 23 additions & 0 deletions service/history/task/transfer_active_task_executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ import (
"github.com/uber/cadence/common/archiver/provider"
"github.com/uber/cadence/common/cache"
"github.com/uber/cadence/common/clock"
"github.com/uber/cadence/common/definition"
"github.com/uber/cadence/common/dynamicconfig/dynamicproperties"
"github.com/uber/cadence/common/log"
"github.com/uber/cadence/common/mocks"
Expand Down Expand Up @@ -2242,6 +2243,17 @@ func createRecordWorkflowExecutionStartedRequest(
"Header_context_key": contextValueJSONString,
}
}
// Add CronSchedule to search attributes if it's a cron workflow
if len(executionInfo.CronSchedule) > 0 {
cronScheduleBytes, err := json.Marshal(executionInfo.CronSchedule)
if err != nil {
t.Fatal(err)
}
if searchAttributes == nil {
searchAttributes = make(map[string][]byte)
}
searchAttributes[definition.CronSchedule] = cronScheduleBytes
}
return &persistence.RecordWorkflowExecutionStartedRequest{
Domain: domainName,
DomainUUID: taskInfo.DomainID,
Expand Down Expand Up @@ -2293,6 +2305,17 @@ func createRecordWorkflowExecutionClosedRequest(
"Header_context_key": contextValueJSONString,
}
}
// Add CronSchedule to search attributes if it's a cron workflow
if len(executionInfo.CronSchedule) > 0 {
cronScheduleBytes, err := json.Marshal(executionInfo.CronSchedule)
if err != nil {
t.Fatal(err)
}
if searchAttributes == nil {
searchAttributes = make(map[string][]byte)
}
searchAttributes[definition.CronSchedule] = cronScheduleBytes
}
return &persistence.RecordWorkflowExecutionClosedRequest{
Domain: domainName,
DomainUUID: taskInfo.DomainID,
Expand Down
25 changes: 25 additions & 0 deletions service/history/task/transfer_standby_task_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,12 @@ package task

import (
"context"
"encoding/json"
"fmt"
"time"

"github.com/uber/cadence/common/constants"
"github.com/uber/cadence/common/definition"
"github.com/uber/cadence/common/log"
"github.com/uber/cadence/common/log/tag"
"github.com/uber/cadence/common/metrics"
Expand Down Expand Up @@ -286,6 +288,18 @@ func (t *transferStandbyTaskExecutor) processCloseExecution(
isCron := len(executionInfo.CronSchedule) > 0
updateTimestamp := t.shard.GetTimeSource().Now()

// Add CronSchedule to search attributes if it's a cron workflow
if isCron {
searchAttr = copySearchAttributes(searchAttr)
if searchAttr == nil {
searchAttr = make(map[string][]byte)
}
cronScheduleBytes, err := json.Marshal(executionInfo.CronSchedule)
if err == nil {
searchAttr[definition.CronSchedule] = cronScheduleBytes
}
}

lastWriteVersion, err := mutableState.GetLastWriteVersion()
if err != nil {
return nil, err
Expand Down Expand Up @@ -537,6 +551,17 @@ func (t *transferStandbyTaskExecutor) processRecordWorkflowStartedOrUpsertHelper
searchAttr := copySearchAttributes(executionInfo.SearchAttributes)
headers := getWorkflowHeaders(startEvent)

// Add CronSchedule to search attributes if it's a cron workflow
if isCron {
if searchAttr == nil {
searchAttr = make(map[string][]byte)
}
cronScheduleBytes, err := json.Marshal(executionInfo.CronSchedule)
if err == nil {
searchAttr[definition.CronSchedule] = cronScheduleBytes
}
}

if isRecordStart {
workflowStartedScope.IncCounter(metrics.WorkflowStartedCount)
return t.recordWorkflowStarted(
Expand Down