Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 13 additions & 3 deletions common/persistence/nosql/nosqlplugin/cassandra/workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,10 @@ func (db *CDB) SelectCurrentWorkflow(
}

currentRunID := result["current_run_id"].(gocql.UUID).String()
executionInfo := parseWorkflowExecutionInfo(result["execution"].(map[string]interface{}))
executionInfo, err := parseWorkflowExecutionInfo(result)
if err != nil {
return nil, err
}
lastWriteVersion := constants.EmptyVersion
if result["workflow_last_write_version"] != nil {
lastWriteVersion = result["workflow_last_write_version"].(int64)
Expand Down Expand Up @@ -205,7 +208,10 @@ func (db *CDB) SelectWorkflowExecution(ctx context.Context, shardID int, domainI
}

state := &nosqlplugin.WorkflowExecution{}
info := parseWorkflowExecutionInfo(result["execution"].(map[string]interface{}))
info, err := parseWorkflowExecutionInfo(result)
if err != nil {
return nil, err
}
state.ExecutionInfo = info
state.VersionHistories = persistence.NewDataBlob(result["version_histories"].([]byte), constants.EncodingType(result["version_histories_encoding"].(string)))
// TODO: remove this after all 2DC workflows complete
Expand Down Expand Up @@ -357,8 +363,12 @@ func (db *CDB) SelectAllWorkflowExecutions(ctx context.Context, shardID int, pag
result = make(map[string]interface{})
continue
}
executionInfo, err := parseWorkflowExecutionInfo(result)
if err != nil {
return nil, nil, err
}
executions = append(executions, &persistence.InternalListConcreteExecutionsEntity{
ExecutionInfo: parseWorkflowExecutionInfo(result["execution"].(map[string]interface{})),
ExecutionInfo: executionInfo,
VersionHistories: persistence.NewDataBlob(result["version_histories"].([]byte), constants.EncodingType(result["version_histories_encoding"].(string))),
})
result = make(map[string]interface{})
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -310,7 +310,8 @@ const (
// TODO: remove replication_state after all 2DC workflows complete
templateGetWorkflowExecutionQuery = `SELECT execution, replication_state, activity_map, timer_map, ` +
`child_executions_map, request_cancel_map, signal_map, signal_requested, buffered_events_list, ` +
`buffered_replication_tasks_map, version_histories, version_histories_encoding, checksum ` +
`buffered_replication_tasks_map, version_histories, version_histories_encoding, checksum, ` +
`next_event_id ` +
`FROM executions ` +
`WHERE shard_id = ? ` +
`and type = ? ` +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
package cassandra

import (
"fmt"
"time"

cql "github.com/gocql/gocql"
Expand All @@ -37,7 +38,11 @@ import (

var _emptyUUID = cql.UUID{}

func parseWorkflowExecutionInfo(result map[string]interface{}) *persistence.InternalWorkflowExecutionInfo {
func parseWorkflowExecutionInfo(result map[string]interface{}) (*persistence.InternalWorkflowExecutionInfo, error) {
executionBlob, ok := result["execution"].(map[string]interface{})
if !ok {
return nil, fmt.Errorf("invalid execution blob format: missing or invalid 'execution' field")
}
info := &persistence.InternalWorkflowExecutionInfo{}
var completionEventData []byte
var completionEventEncoding constants.EncodingType
Expand All @@ -46,7 +51,7 @@ func parseWorkflowExecutionInfo(result map[string]interface{}) *persistence.Inte
var activeClusterSelectionPolicy []byte
var activeClusterSelectionPolicyEncoding constants.EncodingType

for k, v := range result {
for k, v := range executionBlob {
switch k {
case "domain_id":
info.DomainID = v.(gocql.UUID).String()
Expand Down Expand Up @@ -106,8 +111,6 @@ func parseWorkflowExecutionInfo(result map[string]interface{}) *persistence.Inte
info.LastFirstEventID = v.(int64)
case "last_event_task_id":
info.LastEventTaskID = v.(int64)
case "next_event_id":
info.NextEventID = v.(int64)
case "last_processed_event":
info.LastProcessedEvent = v.(int64)
case "start_time":
Expand Down Expand Up @@ -191,7 +194,12 @@ func parseWorkflowExecutionInfo(result map[string]interface{}) *persistence.Inte
info.CompletionEvent = persistence.NewDataBlob(completionEventData, completionEventEncoding)
info.AutoResetPoints = persistence.NewDataBlob(autoResetPoints, autoResetPointsEncoding)
info.ActiveClusterSelectionPolicy = persistence.NewDataBlob(activeClusterSelectionPolicy, activeClusterSelectionPolicyEncoding)
return info

if nextEventID, ok := result["next_event_id"].(int64); ok {
info.NextEventID = nextEventID
}
Comment on lines +198 to +200
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This looks like a duplicate way to do what the key/value map iteration above is doing - why do we need this too?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is from the result not the execution. The data is duplicated in the db, but we use the next_event_id column for conditional writes. The key/value map iteration is only exploring the execution field data, not the whole execution record

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

currently, this serves as a fallback, we should remove the logic setting this value when iterating the map

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, that makes sense. The only thing here is that we use the same path for current executions (although we don't populate the execution blob in there). Are you ok with removing it or is it ok to leave the fallback?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we should remove it


return info, nil
}

// TODO: remove this after all 2DC workflows complete
Expand Down
Loading
Loading