diff --git a/common/persistence/nosql/nosqlplugin/cassandra/workflow.go b/common/persistence/nosql/nosqlplugin/cassandra/workflow.go index 1cd69b16a4e..36f98c2af16 100644 --- a/common/persistence/nosql/nosqlplugin/cassandra/workflow.go +++ b/common/persistence/nosql/nosqlplugin/cassandra/workflow.go @@ -97,7 +97,10 @@ func (db *CDB) SelectCurrentWorkflow( } currentRunID := result["current_run_id"].(gocql.UUID).String() - executionInfo := parseWorkflowExecutionInfo(result["execution"].(map[string]interface{})) + executionInfo, err := parseWorkflowExecutionInfo(result) + if err != nil { + return nil, err + } lastWriteVersion := constants.EmptyVersion if result["workflow_last_write_version"] != nil { lastWriteVersion = result["workflow_last_write_version"].(int64) @@ -205,7 +208,10 @@ func (db *CDB) SelectWorkflowExecution(ctx context.Context, shardID int, domainI } state := &nosqlplugin.WorkflowExecution{} - info := parseWorkflowExecutionInfo(result["execution"].(map[string]interface{})) + info, err := parseWorkflowExecutionInfo(result) + if err != nil { + return nil, err + } state.ExecutionInfo = info state.VersionHistories = persistence.NewDataBlob(result["version_histories"].([]byte), constants.EncodingType(result["version_histories_encoding"].(string))) // TODO: remove this after all 2DC workflows complete @@ -357,8 +363,12 @@ func (db *CDB) SelectAllWorkflowExecutions(ctx context.Context, shardID int, pag result = make(map[string]interface{}) continue } + executionInfo, err := parseWorkflowExecutionInfo(result) + if err != nil { + return nil, nil, err + } executions = append(executions, &persistence.InternalListConcreteExecutionsEntity{ - ExecutionInfo: parseWorkflowExecutionInfo(result["execution"].(map[string]interface{})), + ExecutionInfo: executionInfo, VersionHistories: persistence.NewDataBlob(result["version_histories"].([]byte), constants.EncodingType(result["version_histories_encoding"].(string))), }) result = make(map[string]interface{}) diff --git a/common/persistence/nosql/nosqlplugin/cassandra/workflow_cql.go b/common/persistence/nosql/nosqlplugin/cassandra/workflow_cql.go index 64a07ca3f3d..5250bee9d89 100644 --- a/common/persistence/nosql/nosqlplugin/cassandra/workflow_cql.go +++ b/common/persistence/nosql/nosqlplugin/cassandra/workflow_cql.go @@ -310,7 +310,8 @@ const ( // TODO: remove replication_state after all 2DC workflows complete templateGetWorkflowExecutionQuery = `SELECT execution, replication_state, activity_map, timer_map, ` + `child_executions_map, request_cancel_map, signal_map, signal_requested, buffered_events_list, ` + - `buffered_replication_tasks_map, version_histories, version_histories_encoding, checksum ` + + `buffered_replication_tasks_map, version_histories, version_histories_encoding, checksum, ` + + `next_event_id ` + `FROM executions ` + `WHERE shard_id = ? ` + `and type = ? ` + diff --git a/common/persistence/nosql/nosqlplugin/cassandra/workflow_parsing_utils.go b/common/persistence/nosql/nosqlplugin/cassandra/workflow_parsing_utils.go index f8094afe3f8..0468e4531cf 100644 --- a/common/persistence/nosql/nosqlplugin/cassandra/workflow_parsing_utils.go +++ b/common/persistence/nosql/nosqlplugin/cassandra/workflow_parsing_utils.go @@ -22,6 +22,7 @@ package cassandra import ( + "fmt" "time" cql "github.com/gocql/gocql" @@ -37,7 +38,11 @@ import ( var _emptyUUID = cql.UUID{} -func parseWorkflowExecutionInfo(result map[string]interface{}) *persistence.InternalWorkflowExecutionInfo { +func parseWorkflowExecutionInfo(result map[string]interface{}) (*persistence.InternalWorkflowExecutionInfo, error) { + executionBlob, ok := result["execution"].(map[string]interface{}) + if !ok { + return nil, fmt.Errorf("invalid execution blob format: missing or invalid 'execution' field") + } info := &persistence.InternalWorkflowExecutionInfo{} var completionEventData []byte var completionEventEncoding constants.EncodingType @@ -46,7 +51,7 @@ func parseWorkflowExecutionInfo(result map[string]interface{}) *persistence.Inte var activeClusterSelectionPolicy []byte var activeClusterSelectionPolicyEncoding constants.EncodingType - for k, v := range result { + for k, v := range executionBlob { switch k { case "domain_id": info.DomainID = v.(gocql.UUID).String() @@ -106,8 +111,6 @@ func parseWorkflowExecutionInfo(result map[string]interface{}) *persistence.Inte info.LastFirstEventID = v.(int64) case "last_event_task_id": info.LastEventTaskID = v.(int64) - case "next_event_id": - info.NextEventID = v.(int64) case "last_processed_event": info.LastProcessedEvent = v.(int64) case "start_time": @@ -191,7 +194,12 @@ func parseWorkflowExecutionInfo(result map[string]interface{}) *persistence.Inte info.CompletionEvent = persistence.NewDataBlob(completionEventData, completionEventEncoding) info.AutoResetPoints = persistence.NewDataBlob(autoResetPoints, autoResetPointsEncoding) info.ActiveClusterSelectionPolicy = persistence.NewDataBlob(activeClusterSelectionPolicy, activeClusterSelectionPolicyEncoding) - return info + + if nextEventID, ok := result["next_event_id"].(int64); ok { + info.NextEventID = nextEventID + } + + return info, nil } // TODO: remove this after all 2DC workflows complete diff --git a/common/persistence/nosql/nosqlplugin/cassandra/workflow_parsing_utils_test.go b/common/persistence/nosql/nosqlplugin/cassandra/workflow_parsing_utils_test.go index 4fd4cc8e0c6..6422fc9c59c 100644 --- a/common/persistence/nosql/nosqlplugin/cassandra/workflow_parsing_utils_test.go +++ b/common/persistence/nosql/nosqlplugin/cassandra/workflow_parsing_utils_test.go @@ -59,71 +59,76 @@ func Test_parseWorkflowExecutionInfo(t *testing.T) { timeNow := time.Now() tests := []struct { + name string args map[string]interface{} want *persistence.InternalWorkflowExecutionInfo }{ { + name: "full execution blob", args: map[string]interface{}{ - "domain_id": newMockUUID("domain_id"), - "workflow_id": "workflow_id", - "run_id": newMockUUID("run_id"), - "parent_workflow_id": "parent_workflow_id", - "initiated_id": int64(1), - "completion_event_batch_id": int64(2), - "task_list": "task_list", - "task_list_kind": 2, - "workflow_type_name": "workflow_type_name", - "workflow_timeout": 10, - "decision_task_timeout": 5, - "execution_context": []byte("execution context"), - "state": 1, - "close_status": 2, - "last_first_event_id": int64(3), - "last_event_task_id": int64(4), - "next_event_id": int64(5), - "last_processed_event": int64(6), - "start_time": timeNow, - "last_updated_time": timeNow, - "create_request_id": newMockUUID("create_request_id"), - "signal_count": 7, - "history_size": int64(8), - "decision_version": int64(9), - "decision_schedule_id": int64(10), - "decision_started_id": int64(11), - "decision_request_id": "decision_request_id", - "decision_timeout": 8, - "decision_timestamp": int64(200), - "decision_scheduled_timestamp": int64(201), - "decision_original_scheduled_timestamp": int64(202), - "decision_attempt": int64(203), - "cancel_requested": true, - "cancel_request_id": "cancel_request_id", - "sticky_task_list": "sticky_task_list", - "sticky_schedule_to_start_timeout": 9, - "client_library_version": "client_lib_version", - "client_feature_version": "client_feature_version", - "client_impl": "client_impl", - "attempt": 12, - "has_retry_policy": true, - "init_interval": 10, - "backoff_coefficient": 1.5, - "max_interval": 20, - "max_attempts": 13, - "expiration_time": timeNow, - "non_retriable_errors": []string{"error1", "error2"}, - "branch_token": []byte("branch token"), - "cron_schedule": "cron_schedule", - "expiration_seconds": 14, - "search_attributes": searchAttributes, - "memo": memo, - "partition_config": partitionConfig, - "completion_event": completionEventData, - "completion_event_data_encoding": "Proto3", - "auto_reset_points": autoResetPointsData, - "auto_reset_points_encoding": "Proto3", - "active_cluster_selection_policy": activeClusterSelectionPolicyData, - "active_cluster_selection_policy_encoding": "Proto3", + "execution": map[string]interface{}{ + "domain_id": newMockUUID("domain_id"), + "workflow_id": "workflow_id", + "run_id": newMockUUID("run_id"), + "parent_workflow_id": "parent_workflow_id", + "initiated_id": int64(1), + "completion_event_batch_id": int64(2), + "task_list": "task_list", + "task_list_kind": 2, + "workflow_type_name": "workflow_type_name", + "workflow_timeout": 10, + "decision_task_timeout": 5, + "execution_context": []byte("execution context"), + "state": 1, + "close_status": 2, + "last_first_event_id": int64(3), + "last_event_task_id": int64(4), + "last_processed_event": int64(6), + "start_time": timeNow, + "last_updated_time": timeNow, + "create_request_id": newMockUUID("create_request_id"), + "signal_count": 7, + "history_size": int64(8), + "decision_version": int64(9), + "decision_schedule_id": int64(10), + "decision_started_id": int64(11), + "decision_request_id": "decision_request_id", + "decision_timeout": 8, + "decision_timestamp": int64(200), + "decision_scheduled_timestamp": int64(201), + "decision_original_scheduled_timestamp": int64(202), + "decision_attempt": int64(203), + "cancel_requested": true, + "cancel_request_id": "cancel_request_id", + "sticky_task_list": "sticky_task_list", + "sticky_schedule_to_start_timeout": 9, + "client_library_version": "client_lib_version", + "client_feature_version": "client_feature_version", + "client_impl": "client_impl", + "attempt": 12, + "has_retry_policy": true, + "init_interval": 10, + "backoff_coefficient": 1.5, + "max_interval": 20, + "max_attempts": 13, + "expiration_time": timeNow, + "non_retriable_errors": []string{"error1", "error2"}, + "branch_token": []byte("branch token"), + "cron_schedule": "cron_schedule", + "expiration_seconds": 14, + "search_attributes": searchAttributes, + "memo": memo, + "partition_config": partitionConfig, + "completion_event": completionEventData, + "completion_event_data_encoding": "Proto3", + "auto_reset_points": autoResetPointsData, + "auto_reset_points_encoding": "Proto3", + "active_cluster_selection_policy": activeClusterSelectionPolicyData, + "active_cluster_selection_policy_encoding": "Proto3", + }, + "next_event_id": int64(5), }, + want: &persistence.InternalWorkflowExecutionInfo{ DomainID: "domain_id", WorkflowID: "workflow_id", @@ -178,10 +183,13 @@ func Test_parseWorkflowExecutionInfo(t *testing.T) { }, }, { + name: "uuid fields", args: map[string]interface{}{ - "first_run_id": newMockUUID("first_run_id"), - "parent_domain_id": newMockUUID("parent_domain_id"), - "parent_run_id": newMockUUID("parent_run_id"), + "execution": map[string]interface{}{ + "first_run_id": newMockUUID("first_run_id"), + "parent_domain_id": newMockUUID("parent_domain_id"), + "parent_run_id": newMockUUID("parent_run_id"), + }, }, want: &persistence.InternalWorkflowExecutionInfo{ FirstExecutionRunID: "first_run_id", @@ -190,59 +198,140 @@ func Test_parseWorkflowExecutionInfo(t *testing.T) { }, }, { + name: "empty uuid fields", args: map[string]interface{}{ - "first_run_id": newMockUUID(emptyRunID), - "parent_domain_id": newMockUUID(emptyDomainID), - "parent_run_id": newMockUUID(emptyRunID), + "execution": map[string]interface{}{ + "first_run_id": newMockUUID(emptyRunID), + "parent_domain_id": newMockUUID(emptyDomainID), + "parent_run_id": newMockUUID(emptyRunID), + }, }, want: &persistence.InternalWorkflowExecutionInfo{}, }, { + name: "nil uuid field", args: map[string]interface{}{ - "first_run_id": newMockUUID(cql.UUID{}.String()), + "execution": map[string]interface{}{ + "first_run_id": newMockUUID(cql.UUID{}.String()), + }, }, want: &persistence.InternalWorkflowExecutionInfo{ FirstExecutionRunID: "", }, }, + { + name: "denormalized columns override blob - next_event_id", + args: map[string]interface{}{ + "execution": map[string]interface{}{ + "next_event_id": int64(10), + }, + "next_event_id": int64(5), + }, + want: &persistence.InternalWorkflowExecutionInfo{ + NextEventID: int64(5), + }, + }, + { + name: "no denormalized columns - execution blob values ignored for next_event_id", + args: map[string]interface{}{ + "execution": map[string]interface{}{ + "next_event_id": int64(10), // This is ignored, only denormalized column is used + "state": 2, + }, + }, + want: &persistence.InternalWorkflowExecutionInfo{ + NextEventID: int64(0), // Zero value since denormalized column not present + State: 2, + }, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + result, err := parseWorkflowExecutionInfo(tt.args) + assert.NoError(t, err) + assert.Equal(t, result.FirstExecutionRunID, tt.want.FirstExecutionRunID) + assert.Equal(t, result.DomainID, tt.want.DomainID) + assert.Equal(t, result.WorkflowID, tt.want.WorkflowID) + assert.Equal(t, result.RunID, tt.want.RunID) + assert.Equal(t, result.ParentWorkflowID, tt.want.ParentWorkflowID) + assert.Equal(t, result.InitiatedID, tt.want.InitiatedID) + assert.Equal(t, result.CompletionEventBatchID, tt.want.CompletionEventBatchID) + assert.Equal(t, result.TaskList, tt.want.TaskList) + assert.Equal(t, result.WorkflowTypeName, tt.want.WorkflowTypeName) + assert.Equal(t, result.WorkflowTimeout, tt.want.WorkflowTimeout) + assert.Equal(t, result.DecisionStartToCloseTimeout, tt.want.DecisionStartToCloseTimeout) + assert.Equal(t, result.ExecutionContext, tt.want.ExecutionContext) + assert.Equal(t, result.State, tt.want.State) + assert.Equal(t, result.CloseStatus, tt.want.CloseStatus) + assert.Equal(t, result.LastFirstEventID, tt.want.LastFirstEventID) + assert.Equal(t, result.LastEventTaskID, tt.want.LastEventTaskID) + assert.Equal(t, result.NextEventID, tt.want.NextEventID) + assert.Equal(t, result.LastProcessedEvent, tt.want.LastProcessedEvent) + assert.Equal(t, result.StartTimestamp, tt.want.StartTimestamp) + assert.Equal(t, result.LastUpdatedTimestamp, tt.want.LastUpdatedTimestamp) + assert.Equal(t, result.CreateRequestID, tt.want.CreateRequestID) + assert.Equal(t, result.SignalCount, tt.want.SignalCount) + assert.Equal(t, result.HistorySize, tt.want.HistorySize) + assert.Equal(t, result.DecisionVersion, tt.want.DecisionVersion) + assert.Equal(t, result.DecisionScheduleID, tt.want.DecisionScheduleID) + assert.Equal(t, result.DecisionStartedID, tt.want.DecisionStartedID) + assert.Equal(t, result.DecisionRequestID, tt.want.DecisionRequestID) + assert.Equal(t, result.DecisionTimeout, tt.want.DecisionTimeout) + assert.Equal(t, result.CancelRequested, tt.want.CancelRequested) + assert.Equal(t, result.DecisionStartedTimestamp, tt.want.DecisionStartedTimestamp) + assert.Equal(t, result.DecisionScheduledTimestamp, tt.want.DecisionScheduledTimestamp) + assert.Equal(t, result.DecisionOriginalScheduledTimestamp, tt.want.DecisionOriginalScheduledTimestamp) + assert.Equal(t, result.DecisionAttempt, tt.want.DecisionAttempt) + assert.Equal(t, result.ParentDomainID, tt.want.ParentDomainID) + assert.Equal(t, result.ActiveClusterSelectionPolicy, tt.want.ActiveClusterSelectionPolicy) + }) + } +} + +func Test_parseWorkflowExecutionInfo_ErrorCases(t *testing.T) { + tests := []struct { + name string + args map[string]interface{} + wantErr bool + errContains string + }{ + { + name: "missing execution field", + args: map[string]interface{}{ + "next_event_id": int64(5), + }, + wantErr: true, + errContains: "missing or invalid 'execution' field", + }, + { + name: "invalid execution field type", + args: map[string]interface{}{ + "execution": "not a map", + }, + wantErr: true, + errContains: "missing or invalid 'execution' field", + }, + { + name: "execution field is nil", + args: map[string]interface{}{ + "execution": nil, + }, + wantErr: true, + errContains: "missing or invalid 'execution' field", + }, } for _, tt := range tests { - result := parseWorkflowExecutionInfo(tt.args) - assert.Equal(t, result.FirstExecutionRunID, tt.want.FirstExecutionRunID) - assert.Equal(t, result.DomainID, tt.want.DomainID) - assert.Equal(t, result.WorkflowID, tt.want.WorkflowID) - assert.Equal(t, result.RunID, tt.want.RunID) - assert.Equal(t, result.ParentWorkflowID, tt.want.ParentWorkflowID) - assert.Equal(t, result.InitiatedID, tt.want.InitiatedID) - assert.Equal(t, result.CompletionEventBatchID, tt.want.CompletionEventBatchID) - assert.Equal(t, result.TaskList, tt.want.TaskList) - assert.Equal(t, result.WorkflowTypeName, tt.want.WorkflowTypeName) - assert.Equal(t, result.WorkflowTimeout, tt.want.WorkflowTimeout) - assert.Equal(t, result.DecisionStartToCloseTimeout, tt.want.DecisionStartToCloseTimeout) - assert.Equal(t, result.ExecutionContext, tt.want.ExecutionContext) - assert.Equal(t, result.State, tt.want.State) - assert.Equal(t, result.CloseStatus, tt.want.CloseStatus) - assert.Equal(t, result.LastFirstEventID, tt.want.LastFirstEventID) - assert.Equal(t, result.LastEventTaskID, tt.want.LastEventTaskID) - assert.Equal(t, result.NextEventID, tt.want.NextEventID) - assert.Equal(t, result.LastProcessedEvent, tt.want.LastProcessedEvent) - assert.Equal(t, result.StartTimestamp, tt.want.StartTimestamp) - assert.Equal(t, result.LastUpdatedTimestamp, tt.want.LastUpdatedTimestamp) - assert.Equal(t, result.CreateRequestID, tt.want.CreateRequestID) - assert.Equal(t, result.SignalCount, tt.want.SignalCount) - assert.Equal(t, result.HistorySize, tt.want.HistorySize) - assert.Equal(t, result.DecisionVersion, tt.want.DecisionVersion) - assert.Equal(t, result.DecisionScheduleID, tt.want.DecisionScheduleID) - assert.Equal(t, result.DecisionStartedID, tt.want.DecisionStartedID) - assert.Equal(t, result.DecisionRequestID, tt.want.DecisionRequestID) - assert.Equal(t, result.DecisionTimeout, tt.want.DecisionTimeout) - assert.Equal(t, result.CancelRequested, tt.want.CancelRequested) - assert.Equal(t, result.DecisionStartedTimestamp, tt.want.DecisionStartedTimestamp) - assert.Equal(t, result.DecisionScheduledTimestamp, tt.want.DecisionScheduledTimestamp) - assert.Equal(t, result.DecisionOriginalScheduledTimestamp, tt.want.DecisionOriginalScheduledTimestamp) - assert.Equal(t, result.DecisionAttempt, tt.want.DecisionAttempt) - assert.Equal(t, result.ParentDomainID, tt.want.ParentDomainID) - assert.Equal(t, result.ActiveClusterSelectionPolicy, tt.want.ActiveClusterSelectionPolicy) + t.Run(tt.name, func(t *testing.T) { + result, err := parseWorkflowExecutionInfo(tt.args) + if tt.wantErr { + assert.Error(t, err) + assert.Nil(t, result) + assert.Contains(t, err.Error(), tt.errContains) + } else { + assert.NoError(t, err) + assert.NotNil(t, result) + } + }) } } diff --git a/common/persistence/nosql/nosqlplugin/cassandra/workflow_utils.go b/common/persistence/nosql/nosqlplugin/cassandra/workflow_utils.go index 5691d97f9a9..d1e633c29db 100644 --- a/common/persistence/nosql/nosqlplugin/cassandra/workflow_utils.go +++ b/common/persistence/nosql/nosqlplugin/cassandra/workflow_utils.go @@ -71,6 +71,7 @@ func executeCreateWorkflowBatchTransaction( actualRangeID := int64(0) currentExecutionAlreadyExists := false var actualExecution map[string]interface{} + var actualExecutionFullRecord map[string]interface{} runIDMismatch := false actualCurrRunID := "" lastWriteVersionMismatch := false @@ -98,6 +99,7 @@ func executeCreateWorkflowBatchTransaction( } else if rowType == rowTypeExecution && runID == permanentRunID { if currentWorkflowRequest.WriteMode == nosqlplugin.CurrentWorkflowWriteModeInsert { currentExecutionAlreadyExists = true + actualExecutionFullRecord = previous actualExecution, _ = previous["execution"].(map[string]interface{}) if actualExecution != nil { if previous["workflow_last_write_version"] != nil { @@ -177,7 +179,10 @@ func executeCreateWorkflowBatchTransaction( // CreateWorkflowExecution failed because there is already a current execution record for this workflow if currentExecutionAlreadyExists { if actualExecution != nil { - executionInfo := parseWorkflowExecutionInfo(actualExecution) + executionInfo, err := parseWorkflowExecutionInfo(actualExecutionFullRecord) + if err != nil { + return err + } msg := fmt.Sprintf("Workflow execution already running. WorkflowId: %v, RunId: %v", currentWorkflowRequest.Row.WorkflowID, executionInfo.RunID) return &nosqlplugin.WorkflowOperationConditionFailure{ WorkflowExecutionAlreadyExists: &nosqlplugin.WorkflowExecutionAlreadyExists{