Commit 18b584f

fix: use next_event_id column as source of truth when reading workflow execution from Cassandra (#7738)
**What changed?**
When reading concrete execution data from Cassandra, read the denormalized `next_event_id` column (which duplicates the `next_event_id` stored in the `execution` field) and set it on `InternalWorkflowExecutionInfo`.

**Why?**
Cassandra stores values from the same row but different columns in different places on disk, rather than as a single, contiguous row block. It is possible for the denormalized column to get out of sync with the execution blob in the `execution` field. The denormalized column is used as the condition of the conditional write when updating the execution record of a concrete workflow execution. By reading it and setting it on `InternalWorkflowExecutionInfo`, we can rely on checksum verification to detect differences between the denormalized `next_event_id` column and the `next_event_id` in the execution blob (which is used to calculate the checksum), and identify corrupt workflows more quickly and more precisely.

**How did you test it?**
`go test ./common/persistence/nosql/nosqlplugin/cassandra -run Test_parseWorkflowExecutionInfo`

**Potential risks**
This changes how data is read from Cassandra. If the new read path is incorrect, workflows could become corrupt or stuck. The argument to the parsing function also changes: it now receives the whole result row instead of only the `execution` field. If this is wrong, it could cause parsing issues and affect workflows.

**Release notes**
Improve workflow corruption detection in Cassandra by reading the denormalized `next_event_id` column and checking it against the checksum during checksum verification.

Signed-off-by: fimanishi <fimanishi@gmail.com>
1 parent 87ecb5a commit 18b584f
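
The reasoning in the "Why?" section can be illustrated with a minimal sketch. Everything here is hypothetical and simplified: `row`, `checksumOf`, and `verify` are illustration-only names, and the toy CRC32 over a single field is an assumption, not Cadence's actual checksum implementation. The sketch only shows why populating the in-memory value from the column lets a checksum written from the blob value expose a divergence.

```go
package main

import (
	"fmt"
	"hash/crc32"
)

// row is a hypothetical, simplified stand-in for one row of the executions table.
type row struct {
	blobNextEventID   int64  // next_event_id stored inside the execution blob
	columnNextEventID int64  // denormalized next_event_id column
	storedChecksum    uint32 // checksum computed from the blob value at write time
}

// checksumOf is a toy checksum over the next event ID only (assumption:
// the real checksum covers more state and uses a different encoding).
func checksumOf(nextEventID int64) uint32 {
	return crc32.ChecksumIEEE([]byte(fmt.Sprintf("next_event_id=%d", nextEventID)))
}

// verify recomputes the checksum from the column value, mirroring a read
// path that treats the column as the source of truth.
func verify(r row) error {
	if got := checksumOf(r.columnNextEventID); got != r.storedChecksum {
		return fmt.Errorf("checksum mismatch: column next_event_id=%d, blob next_event_id=%d",
			r.columnNextEventID, r.blobNextEventID)
	}
	return nil
}

func main() {
	healthy := row{blobNextEventID: 10, columnNextEventID: 10, storedChecksum: checksumOf(10)}
	diverged := row{blobNextEventID: 10, columnNextEventID: 12, storedChecksum: checksumOf(10)}

	fmt.Println("healthy row error:", verify(healthy))
	fmt.Println("diverged row error:", verify(diverged))
}
```

If the in-memory value were taken from the blob instead, both rows in this sketch would pass verification and the divergence would go unnoticed until a conditional write failed.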

5 files changed: +225 -112 lines changed
common/persistence/nosql/nosqlplugin/cassandra/workflow.go

Lines changed: 13 additions & 3 deletions
@@ -97,7 +97,10 @@ func (db *CDB) SelectCurrentWorkflow(
 	}

 	currentRunID := result["current_run_id"].(gocql.UUID).String()
-	executionInfo := parseWorkflowExecutionInfo(result["execution"].(map[string]interface{}))
+	executionInfo, err := parseWorkflowExecutionInfo(result)
+	if err != nil {
+		return nil, err
+	}
 	lastWriteVersion := constants.EmptyVersion
 	if result["workflow_last_write_version"] != nil {
 		lastWriteVersion = result["workflow_last_write_version"].(int64)
@@ -205,7 +208,10 @@ func (db *CDB) SelectWorkflowExecution(ctx context.Context, shardID int, domainI
 	}

 	state := &nosqlplugin.WorkflowExecution{}
-	info := parseWorkflowExecutionInfo(result["execution"].(map[string]interface{}))
+	info, err := parseWorkflowExecutionInfo(result)
+	if err != nil {
+		return nil, err
+	}
 	state.ExecutionInfo = info
 	state.VersionHistories = persistence.NewDataBlob(result["version_histories"].([]byte), constants.EncodingType(result["version_histories_encoding"].(string)))
 	// TODO: remove this after all 2DC workflows complete
@@ -357,8 +363,12 @@ func (db *CDB) SelectAllWorkflowExecutions(ctx context.Context, shardID int, pag
 			result = make(map[string]interface{})
 			continue
 		}
+		executionInfo, err := parseWorkflowExecutionInfo(result)
+		if err != nil {
+			return nil, nil, err
+		}
 		executions = append(executions, &persistence.InternalListConcreteExecutionsEntity{
-			ExecutionInfo: parseWorkflowExecutionInfo(result["execution"].(map[string]interface{})),
+			ExecutionInfo: executionInfo,
 			VersionHistories: persistence.NewDataBlob(result["version_histories"].([]byte), constants.EncodingType(result["version_histories_encoding"].(string))),
 		})
 		result = make(map[string]interface{})

common/persistence/nosql/nosqlplugin/cassandra/workflow_cql.go

Lines changed: 2 additions & 1 deletion
@@ -312,7 +312,8 @@ const (
 	// TODO: remove replication_state after all 2DC workflows complete
 	templateGetWorkflowExecutionQuery = `SELECT execution, replication_state, activity_map, timer_map, ` +
 		`child_executions_map, request_cancel_map, signal_map, signal_requested, buffered_events_list, ` +
-		`buffered_replication_tasks_map, version_histories, version_histories_encoding, checksum ` +
+		`buffered_replication_tasks_map, version_histories, version_histories_encoding, checksum, ` +
+		`next_event_id ` +
 		`FROM executions ` +
 		`WHERE shard_id = ? ` +
 		`and type = ? ` +

common/persistence/nosql/nosqlplugin/cassandra/workflow_parsing_utils.go

Lines changed: 13 additions & 5 deletions
@@ -22,6 +22,7 @@
 package cassandra

 import (
+	"fmt"
 	"time"

 	cql "github.com/gocql/gocql"
@@ -37,7 +38,11 @@ import (

 var _emptyUUID = cql.UUID{}

-func parseWorkflowExecutionInfo(result map[string]interface{}) *persistence.InternalWorkflowExecutionInfo {
+func parseWorkflowExecutionInfo(result map[string]interface{}) (*persistence.InternalWorkflowExecutionInfo, error) {
+	executionBlob, ok := result["execution"].(map[string]interface{})
+	if !ok {
+		return nil, fmt.Errorf("invalid execution blob format: missing or invalid 'execution' field")
+	}
 	info := &persistence.InternalWorkflowExecutionInfo{}
 	var completionEventData []byte
 	var completionEventEncoding constants.EncodingType
@@ -46,7 +51,7 @@ func parseWorkflowExecutionInfo(result map[string]interface{}) *persistence.Inte
 	var activeClusterSelectionPolicy []byte
 	var activeClusterSelectionPolicyEncoding constants.EncodingType

-	for k, v := range result {
+	for k, v := range executionBlob {
 		switch k {
 		case "domain_id":
 			info.DomainID = v.(gocql.UUID).String()
@@ -106,8 +111,6 @@ func parseWorkflowExecutionInfo(result map[string]interface{}) *persistence.Inte
 			info.LastFirstEventID = v.(int64)
 		case "last_event_task_id":
 			info.LastEventTaskID = v.(int64)
-		case "next_event_id":
-			info.NextEventID = v.(int64)
 		case "last_processed_event":
 			info.LastProcessedEvent = v.(int64)
 		case "start_time":
@@ -191,7 +194,12 @@ func parseWorkflowExecutionInfo(result map[string]interface{}) *persistence.Inte
 	info.CompletionEvent = persistence.NewDataBlob(completionEventData, completionEventEncoding)
 	info.AutoResetPoints = persistence.NewDataBlob(autoResetPoints, autoResetPointsEncoding)
 	info.ActiveClusterSelectionPolicy = persistence.NewDataBlob(activeClusterSelectionPolicy, activeClusterSelectionPolicyEncoding)
-	return info
+
+	if nextEventID, ok := result["next_event_id"].(int64); ok {
+		info.NextEventID = nextEventID
+	}
+
+	return info, nil
 }

 // TODO: remove this after all 2DC workflows complete
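
The new contract of the parsing function can be summarized with a reduced, self-contained sketch. It is not the Cadence function: `executionInfo` and `parseInfo` are hypothetical names, and only two fields are modeled. It assumes the row arrives as a `map[string]interface{}`, as in the diff above, and shows three behaviors: an error when `execution` is missing or has the wrong type, the blob's own `next_event_id` being ignored, and the comma-ok assertion leaving `NextEventID` at zero when the column is absent.

```go
package main

import (
	"errors"
	"fmt"
)

// executionInfo is a hypothetical, two-field stand-in for
// persistence.InternalWorkflowExecutionInfo.
type executionInfo struct {
	WorkflowID  string
	NextEventID int64
}

// parseInfo mirrors the shape of the changed function: it takes the whole
// row, reads most fields from the nested "execution" map, and reads
// NextEventID only from the top-level column.
func parseInfo(result map[string]interface{}) (*executionInfo, error) {
	blob, ok := result["execution"].(map[string]interface{})
	if !ok {
		return nil, errors.New("missing or invalid 'execution' field")
	}

	info := &executionInfo{}
	for k, v := range blob {
		switch k {
		case "workflow_id":
			if s, ok := v.(string); ok {
				info.WorkflowID = s
			}
		}
		// "next_event_id" inside the blob is deliberately not handled here.
	}

	// Comma-ok assertion: a missing key yields a nil interface value, so the
	// assertion fails without panicking and NextEventID stays at zero.
	if id, ok := result["next_event_id"].(int64); ok {
		info.NextEventID = id
	}
	return info, nil
}

func main() {
	row := map[string]interface{}{
		"execution": map[string]interface{}{
			"workflow_id":   "wf-1",
			"next_event_id": int64(10), // stale value inside the blob
		},
		"next_event_id": int64(12), // denormalized column
	}

	info, err := parseInfo(row)
	if err != nil {
		fmt.Println("parse error:", err)
		return
	}
	fmt.Println(info.WorkflowID, info.NextEventID)
}
```

One consequence worth noting for reviewers: in this shape, a row that lacks the column is parsed without error and ends up with a zero `NextEventID`, so any caller that relies on that field depends on the query selecting `next_event_id`.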
