From f0514cc63365510f51e3b12488f82fe95b44c653 Mon Sep 17 00:00:00 2001 From: Neil Xie Date: Wed, 19 Nov 2025 13:34:50 -0800 Subject: [PATCH 1/4] Add cron schedule to customized search attribute Signed-off-by: Neil Xie --- common/definition/indexedKeys.go | 2 ++ config/dynamicconfig/development.yaml | 1 + config/dynamicconfig/development_es.yaml | 1 + config/dynamicconfig/development_pinot.yaml | 1 + .../os2/visibility/index_template.json | 3 +++ .../v6/visibility/index_template.json | 3 ++- .../v7/visibility/index_template.json | 3 ++- .../task/transfer_active_task_executor.go | 25 +++++++++++++++++++ .../task/transfer_standby_task_executor.go | 25 +++++++++++++++++++ 9 files changed, 62 insertions(+), 2 deletions(-) diff --git a/common/definition/indexedKeys.go b/common/definition/indexedKeys.go index 087298a8c84..ff3e8d074c4 100644 --- a/common/definition/indexedKeys.go +++ b/common/definition/indexedKeys.go @@ -40,6 +40,7 @@ const ( ClusterAttributeScope = "ClusterAttributeScope" ClusterAttributeName = "ClusterAttributeName" IsCron = "IsCron" + CronSchedule = "CronSchedule" NumClusters = "NumClusters" UpdateTime = "UpdateTime" CustomDomain = "CustomDomain" // to support batch workflow @@ -76,6 +77,7 @@ func createDefaultIndexedKeys() map[string]interface{} { CustomDatetimeField: types.IndexedValueTypeDatetime, CadenceChangeVersion: types.IndexedValueTypeKeyword, BinaryChecksums: types.IndexedValueTypeKeyword, + CronSchedule: types.IndexedValueTypeKeyword, CustomDomain: types.IndexedValueTypeString, Operator: types.IndexedValueTypeString, } diff --git a/config/dynamicconfig/development.yaml b/config/dynamicconfig/development.yaml index 125829da1de..b0cc3e02b9e 100644 --- a/config/dynamicconfig/development.yaml +++ b/config/dynamicconfig/development.yaml @@ -66,6 +66,7 @@ frontend.validSearchAttributes: service: 1 user: 1 IsDeleted: 4 + CronSchedule: 1 constraints: {} system.writeVisibilityStoreName: - value: "db" diff --git a/config/dynamicconfig/development_es.yaml b/config/dynamicconfig/development_es.yaml index c7fc0dd838d..cb30d360c50 100644 --- a/config/dynamicconfig/development_es.yaml +++ b/config/dynamicconfig/development_es.yaml @@ -40,6 +40,7 @@ frontend.validSearchAttributes: BinaryChecksums: 1 Passed: 4 ShardID: 2 + CronSchedule: 1 system.minRetentionDays: - value: 0 history.EnableConsistentQueryByDomain: diff --git a/config/dynamicconfig/development_pinot.yaml b/config/dynamicconfig/development_pinot.yaml index 29637314451..b757e7ae6ca 100644 --- a/config/dynamicconfig/development_pinot.yaml +++ b/config/dynamicconfig/development_pinot.yaml @@ -41,6 +41,7 @@ frontend.validSearchAttributes: Passed: 4 ShardID: 2 IsDeleted: 4 + CronSchedule: 1 system.minRetentionDays: - value: 0 history.EnableConsistentQueryByDomain: diff --git a/schema/elasticsearch/os2/visibility/index_template.json b/schema/elasticsearch/os2/visibility/index_template.json index 5984323c85e..245d4c3ecda 100644 --- a/schema/elasticsearch/os2/visibility/index_template.json +++ b/schema/elasticsearch/os2/visibility/index_template.json @@ -61,6 +61,9 @@ }, "user": { "type": "keyword" + }, + "CronSchedule": { + "type": "keyword" } } }, diff --git a/schema/elasticsearch/v6/visibility/index_template.json b/schema/elasticsearch/v6/visibility/index_template.json index d401954131f..20042dbc181 100644 --- a/schema/elasticsearch/v6/visibility/index_template.json +++ b/schema/elasticsearch/v6/visibility/index_template.json @@ -83,7 +83,8 @@ "Operator": { "type": "keyword"}, "RolloutID": { "type": "keyword"}, "BinaryChecksums": { "type": "keyword"}, - "Passed": { "type": "boolean" } + "Passed": { "type": "boolean" }, + "CronSchedule": { "type": "keyword" } } } } diff --git a/schema/elasticsearch/v7/visibility/index_template.json b/schema/elasticsearch/v7/visibility/index_template.json index f426b4f69f4..1da61b8ecca 100644 --- a/schema/elasticsearch/v7/visibility/index_template.json +++ b/schema/elasticsearch/v7/visibility/index_template.json @@ -82,7 +82,8 @@ "Operator": { "type": "keyword"}, "RolloutID": { "type": "keyword"}, "BinaryChecksums": { "type": "keyword"}, - "Passed": { "type": "boolean" } + "Passed": { "type": "boolean" }, + "CronSchedule": { "type": "keyword" } } } } diff --git a/service/history/task/transfer_active_task_executor.go b/service/history/task/transfer_active_task_executor.go index 6d6143938f1..3afd916ebe5 100644 --- a/service/history/task/transfer_active_task_executor.go +++ b/service/history/task/transfer_active_task_executor.go @@ -23,6 +23,7 @@ package task import ( "context" + "encoding/json" "errors" "fmt" "time" @@ -35,6 +36,7 @@ import ( "github.com/uber/cadence/common/cache" "github.com/uber/cadence/common/clock" "github.com/uber/cadence/common/constants" + "github.com/uber/cadence/common/definition" "github.com/uber/cadence/common/log" "github.com/uber/cadence/common/log/tag" "github.com/uber/cadence/common/metrics" @@ -442,6 +444,18 @@ func (t *transferActiveTaskExecutor) processCloseExecutionTaskHelper( domainName := mutableState.GetDomainEntry().GetInfo().Name children := mutableState.GetPendingChildExecutionInfos() + // Add CronSchedule to search attributes if it's a cron workflow + if isCron { + searchAttr = copySearchAttributes(searchAttr) + if searchAttr == nil { + searchAttr = make(map[string][]byte) + } + cronScheduleBytes, err := json.Marshal(executionInfo.CronSchedule) + if err == nil { + searchAttr[definition.CronSchedule] = cronScheduleBytes + } + } + // we've gathered all necessary information from mutable state. // release the context lock since we no longer need mutable state builder and // the rest of logic is making RPC call, which takes time. @@ -1007,6 +1021,17 @@ func (t *transferActiveTaskExecutor) processRecordWorkflowStartedOrUpsertHelper( numClusters := (int16)(len(domainEntry.GetReplicationConfig().Clusters)) updateTimestamp := t.shard.GetTimeSource().Now() + // Add CronSchedule to search attributes if it's a cron workflow + if isCron { + if searchAttr == nil { + searchAttr = make(map[string][]byte) + } + cronScheduleBytes, err := json.Marshal(executionInfo.CronSchedule) + if err == nil { + searchAttr[definition.CronSchedule] = cronScheduleBytes + } + } + // release the context lock since we no longer need mutable state builder and // the rest of logic is making RPC call, which takes time. release(nil) diff --git a/service/history/task/transfer_standby_task_executor.go b/service/history/task/transfer_standby_task_executor.go index 7edb45f5ef2..d64f2147198 100644 --- a/service/history/task/transfer_standby_task_executor.go +++ b/service/history/task/transfer_standby_task_executor.go @@ -22,10 +22,12 @@ package task import ( "context" + "encoding/json" "fmt" "time" "github.com/uber/cadence/common/constants" + "github.com/uber/cadence/common/definition" "github.com/uber/cadence/common/log" "github.com/uber/cadence/common/log/tag" "github.com/uber/cadence/common/metrics" @@ -286,6 +288,18 @@ func (t *transferStandbyTaskExecutor) processCloseExecution( isCron := len(executionInfo.CronSchedule) > 0 updateTimestamp := t.shard.GetTimeSource().Now() + // Add CronSchedule to search attributes if it's a cron workflow + if isCron { + searchAttr = copySearchAttributes(searchAttr) + if searchAttr == nil { + searchAttr = make(map[string][]byte) + } + cronScheduleBytes, err := json.Marshal(executionInfo.CronSchedule) + if err == nil { + searchAttr[definition.CronSchedule] = cronScheduleBytes + } + } + lastWriteVersion, err := mutableState.GetLastWriteVersion() if err != nil { return nil, err @@ -537,6 +551,17 @@ func (t *transferStandbyTaskExecutor) processRecordWorkflowStartedOrUpsertHelper searchAttr := copySearchAttributes(executionInfo.SearchAttributes) headers := getWorkflowHeaders(startEvent) + // Add CronSchedule to search attributes if it's a cron workflow + if isCron { + if searchAttr == nil { + searchAttr = make(map[string][]byte) + } + cronScheduleBytes, err := json.Marshal(executionInfo.CronSchedule) + if err == nil { + searchAttr[definition.CronSchedule] = cronScheduleBytes + } + } + if isRecordStart { workflowStartedScope.IncCounter(metrics.WorkflowStartedCount) return t.recordWorkflowStarted( From 653167cb859c72bd6386694e627874ca5b628c5b Mon Sep 17 00:00:00 2001 From: Neil Xie Date: Wed, 19 Nov 2025 14:15:52 -0800 Subject: [PATCH 2/4] Fix test Signed-off-by: Neil Xie --- .../transfer_active_task_executor_test.go | 22 +++++++++++++++++++ 1 file changed, 22 insertions(+) diff --git a/service/history/task/transfer_active_task_executor_test.go b/service/history/task/transfer_active_task_executor_test.go index 09778258dcb..03a446b4e4d 100644 --- a/service/history/task/transfer_active_task_executor_test.go +++ b/service/history/task/transfer_active_task_executor_test.go @@ -2242,6 +2242,17 @@ func createRecordWorkflowExecutionStartedRequest( "Header_context_key": contextValueJSONString, } } + // Add CronSchedule to search attributes if it's a cron workflow + if len(executionInfo.CronSchedule) > 0 { + cronScheduleBytes, err := json.Marshal(executionInfo.CronSchedule) + if err != nil { + t.Fatal(err) + } + if searchAttributes == nil { + searchAttributes = make(map[string][]byte) + } + searchAttributes[definition.CronSchedule] = cronScheduleBytes + } return &persistence.RecordWorkflowExecutionStartedRequest{ Domain: domainName, DomainUUID: taskInfo.DomainID, @@ -2293,6 +2304,17 @@ func createRecordWorkflowExecutionClosedRequest( "Header_context_key": contextValueJSONString, } } + // Add CronSchedule to search attributes if it's a cron workflow + if len(executionInfo.CronSchedule) > 0 { + cronScheduleBytes, err := json.Marshal(executionInfo.CronSchedule) + if err != nil { + t.Fatal(err) + } + if searchAttributes == nil { + searchAttributes = make(map[string][]byte) + } + searchAttributes[definition.CronSchedule] = cronScheduleBytes + } return &persistence.RecordWorkflowExecutionClosedRequest{ Domain: domainName, DomainUUID: taskInfo.DomainID, From d95cd2c58dcb8a045dbc58a157eba3f9513cbfc6 Mon Sep 17 00:00:00 2001 From: Neil Xie Date: Wed, 19 Nov 2025 15:07:55 -0800 Subject: [PATCH 3/4] Fix integration test Signed-off-by: Neil Xie --- host/elastic_search_test.go | 18 +++++++++++++----- .../task/transfer_active_task_executor_test.go | 1 + 2 files changed, 14 insertions(+), 5 deletions(-) diff --git a/host/elastic_search_test.go b/host/elastic_search_test.go index a1a5f1b042e..dea59c5a83a 100644 --- a/host/elastic_search_test.go +++ b/host/elastic_search_test.go @@ -291,7 +291,7 @@ func (s *ElasticSearchIntegrationSuite) TestListWorkflow_SearchAttribute() { we, err := s.Engine.StartWorkflowExecution(ctx, request) s.Nil(err) query := fmt.Sprintf(`WorkflowID = "%s" and %s = "%s"`, id, s.testSearchAttributeKey, s.testSearchAttributeVal) - s.testHelperForReadOnce(we.GetRunID(), query, false, false) + s.testHelperForReadOnceWithTestSearchAttributes(we.GetRunID(), query, false, false, true) // test upsert dtHandler := func(execution *types.WorkflowExecution, wt *types.WorkflowType, @@ -767,10 +767,14 @@ func (s *ElasticSearchIntegrationSuite) testListWorkflowHelper(numOfWorkflows, p } func (s *ElasticSearchIntegrationSuite) testHelperForReadOnce(runID, query string, isScan bool, isAnyMatchOk bool) { - s.testHelperForReadOnceWithDomain(s.DomainName, runID, query, isScan, isAnyMatchOk) + s.testHelperForReadOnceWithTestSearchAttributes(runID, query, isScan, isAnyMatchOk, false) +} + +func (s *ElasticSearchIntegrationSuite) testHelperForReadOnceWithTestSearchAttributes(runID, query string, isScan bool, isAnyMatchOk bool, checkTestSearchAttr bool) { + s.testHelperForReadOnceWithDomain(s.DomainName, runID, query, isScan, isAnyMatchOk, checkTestSearchAttr) } -func (s *ElasticSearchIntegrationSuite) testHelperForReadOnceWithDomain(domainName string, runID, query string, isScan bool, isAnyMatchOk bool) { +func (s *ElasticSearchIntegrationSuite) testHelperForReadOnceWithDomain(domainName string, runID, query string, isScan bool, isAnyMatchOk bool, checkTestSearchAttr bool) { var openExecution *types.WorkflowExecutionInfo listRequest := &types.ListWorkflowExecutionsRequest{ Domain: domainName, @@ -814,8 +818,12 @@ Retry: s.NotNil(openExecution) s.Equal(runID, openExecution.GetExecution().GetRunID()) s.True(openExecution.GetExecutionTime() >= openExecution.GetStartTime()) - if openExecution.SearchAttributes != nil && len(openExecution.SearchAttributes.GetIndexedFields()) > 0 { + if checkTestSearchAttr { + // Test explicitly expects the test search attribute to be present + s.NotNil(openExecution.SearchAttributes) + s.True(len(openExecution.SearchAttributes.GetIndexedFields()) > 0) searchValBytes := openExecution.SearchAttributes.GetIndexedFields()[s.testSearchAttributeKey] + s.NotNil(searchValBytes) var searchVal string json.Unmarshal(searchValBytes, &searchVal) s.Equal(s.testSearchAttributeVal, searchVal) @@ -873,7 +881,7 @@ func (s *ElasticSearchIntegrationSuite) TestScanWorkflow_SearchAttribute() { we, err := s.Engine.StartWorkflowExecution(ctx, request) s.Nil(err) query := fmt.Sprintf(`WorkflowID = "%s" and %s = "%s"`, id, s.testSearchAttributeKey, s.testSearchAttributeVal) - s.testHelperForReadOnce(we.GetRunID(), query, true, false) + s.testHelperForReadOnceWithTestSearchAttributes(we.GetRunID(), query, true, false, true) } func (s *ElasticSearchIntegrationSuite) TestScanWorkflow_PageToken() { diff --git a/service/history/task/transfer_active_task_executor_test.go b/service/history/task/transfer_active_task_executor_test.go index 03a446b4e4d..b230ff8f639 100644 --- a/service/history/task/transfer_active_task_executor_test.go +++ b/service/history/task/transfer_active_task_executor_test.go @@ -46,6 +46,7 @@ import ( "github.com/uber/cadence/common/archiver/provider" "github.com/uber/cadence/common/cache" "github.com/uber/cadence/common/clock" + "github.com/uber/cadence/common/definition" "github.com/uber/cadence/common/dynamicconfig/dynamicproperties" "github.com/uber/cadence/common/log" "github.com/uber/cadence/common/mocks" From 554b773da9538604c89b8727f5e470ff15039fe1 Mon Sep 17 00:00:00 2001 From: Neil Xie Date: Thu, 20 Nov 2025 09:15:47 -0800 Subject: [PATCH 4/4] One more fix for integration test Signed-off-by: Neil Xie --- host/elastic_search_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/host/elastic_search_test.go b/host/elastic_search_test.go index dea59c5a83a..7a68b1166b1 100644 --- a/host/elastic_search_test.go +++ b/host/elastic_search_test.go @@ -202,7 +202,7 @@ func (s *ElasticSearchIntegrationSuite) TestListWorkflowByClusterAttribute() { s.Equal("us-east", descResp.WorkflowExecutionInfo.ActiveClusterSelectionPolicy.GetClusterAttribute().GetName()) query := `ClusterAttributeScope = "region" and ClusterAttributeName = "us-east"` - s.testHelperForReadOnceWithDomain(s.ActiveActiveDomainName, we.GetRunID(), query, false, false) + s.testHelperForReadOnceWithDomain(s.ActiveActiveDomainName, we.GetRunID(), query, false, false, false) } func (s *ElasticSearchIntegrationSuite) startWorkflow(