Skip to content

Commit 5a70ce8

Browse files
committed
Fix deterministic check for searchable change version (#889)
This PR includes: Fix non-deterministic error for searchable change version. Fix first upsertSearchAttributes contains redundant attrs from following upsert calls.
1 parent 6c23f50 commit 5a70ce8

File tree

11 files changed

+1225
-9
lines changed

11 files changed

+1225
-9
lines changed

internal/internal_event_handlers.go

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -317,7 +317,12 @@ func (wc *workflowEnvironmentImpl) updateWorkflowInfoWithSearchAttributes(attrib
317317

318318
func mergeSearchAttributes(current, upsert *shared.SearchAttributes) *shared.SearchAttributes {
319319
if current == nil || len(current.IndexedFields) == 0 {
320-
return upsert
320+
if upsert == nil || len(upsert.IndexedFields) == 0 {
321+
return nil
322+
}
323+
current = &shared.SearchAttributes{
324+
IndexedFields: make(map[string][]byte),
325+
}
321326
}
322327

323328
fields := current.IndexedFields
@@ -571,9 +576,6 @@ func validateVersion(changeID string, version, minSupported, maxSupported Versio
571576
func (wc *workflowEnvironmentImpl) GetVersion(changeID string, minSupported, maxSupported Version) Version {
572577
if version, ok := wc.changeVersions[changeID]; ok {
573578
validateVersion(changeID, version, minSupported, maxSupported)
574-
if wc.isReplay {
575-
wc.UpsertSearchAttributes(createSearchAttributesForChangeVersion(changeID, version, wc.changeVersions))
576-
}
577579
return version
578580
}
579581

internal/internal_event_handlers_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -194,13 +194,13 @@ func Test_MergeSearchAttributes(t *testing.T) {
194194
name: "currentIsNil",
195195
current: nil,
196196
upsert: &s.SearchAttributes{},
197-
expected: &s.SearchAttributes{},
197+
expected: nil,
198198
},
199199
{
200200
name: "currentIsEmpty",
201201
current: &s.SearchAttributes{IndexedFields: make(map[string][]byte)},
202202
upsert: &s.SearchAttributes{},
203-
expected: &s.SearchAttributes{},
203+
expected: nil,
204204
},
205205
{
206206
name: "normalMerge",

internal/internal_task_handlers.go

Lines changed: 22 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1064,6 +1064,20 @@ func skipDeterministicCheckForEvent(e *s.HistoryEvent) bool {
10641064
return false
10651065
}
10661066

1067+
// special check for upsert change version event
1068+
func skipDeterministicCheckForUpsertChangeVersion(events []*s.HistoryEvent, idx int) bool {
1069+
e := events[idx]
1070+
if e.GetEventType() == s.EventTypeMarkerRecorded &&
1071+
e.MarkerRecordedEventAttributes.GetMarkerName() == versionMarkerName &&
1072+
idx < len(events)-1 &&
1073+
events[idx+1].GetEventType() == s.EventTypeUpsertWorkflowSearchAttributes {
1074+
if _, ok := events[idx+1].UpsertWorkflowSearchAttributesEventAttributes.SearchAttributes.IndexedFields[CadenceChangeVersion]; ok {
1075+
return true
1076+
}
1077+
}
1078+
return false
1079+
}
1080+
10671081
func matchReplayWithHistory(replayDecisions []*s.Decision, historyEvents []*s.HistoryEvent) error {
10681082
di := 0
10691083
hi := 0
@@ -1074,6 +1088,10 @@ matchLoop:
10741088
var e *s.HistoryEvent
10751089
if hi < hSize {
10761090
e = historyEvents[hi]
1091+
if skipDeterministicCheckForUpsertChangeVersion(historyEvents, hi) {
1092+
hi += 2
1093+
continue matchLoop
1094+
}
10771095
if skipDeterministicCheckForEvent(e) {
10781096
hi++
10791097
continue matchLoop
@@ -1289,7 +1307,10 @@ func isDecisionMatchEvent(d *s.Decision, e *s.HistoryEvent, strictMode bool) boo
12891307
}
12901308
eventAttributes := e.UpsertWorkflowSearchAttributesEventAttributes
12911309
decisionAttributes := d.UpsertWorkflowSearchAttributesDecisionAttributes
1292-
return isSearchAttributesMatched(eventAttributes.SearchAttributes, decisionAttributes.SearchAttributes)
1310+
if strictMode && !isSearchAttributesMatched(eventAttributes.SearchAttributes, decisionAttributes.SearchAttributes) {
1311+
return false
1312+
}
1313+
return true
12931314
}
12941315

12951316
return false

internal/internal_task_handlers_test.go

Lines changed: 33 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1067,6 +1067,7 @@ func Test_NonDeterministicCheck(t *testing.T) {
10671067
func Test_IsDecisionMatchEvent_UpsertWorkflowSearchAttributes(t *testing.T) {
10681068
diType := s.DecisionTypeUpsertWorkflowSearchAttributes
10691069
eType := s.EventTypeUpsertWorkflowSearchAttributes
1070+
strictMode := false
10701071

10711072
testCases := []struct {
10721073
name string
@@ -1097,7 +1098,7 @@ func Test_IsDecisionMatchEvent_UpsertWorkflowSearchAttributes(t *testing.T) {
10971098
EventType: &eType,
10981099
UpsertWorkflowSearchAttributesEventAttributes: &s.UpsertWorkflowSearchAttributesEventAttributes{},
10991100
},
1100-
expected: false,
1101+
expected: true,
11011102
},
11021103
{
11031104
name: "attributes match",
@@ -1119,7 +1120,37 @@ func Test_IsDecisionMatchEvent_UpsertWorkflowSearchAttributes(t *testing.T) {
11191120

11201121
for _, testCase := range testCases {
11211122
t.Run(testCase.name, func(t *testing.T) {
1122-
require.Equal(t, testCase.expected, isDecisionMatchEvent(testCase.decision, testCase.event, false))
1123+
require.Equal(t, testCase.expected, isDecisionMatchEvent(testCase.decision, testCase.event, strictMode))
1124+
})
1125+
}
1126+
1127+
strictMode = true
1128+
1129+
testCases = []struct {
1130+
name string
1131+
decision *s.Decision
1132+
event *s.HistoryEvent
1133+
expected bool
1134+
}{
1135+
{
1136+
name: "attributes not match",
1137+
decision: &s.Decision{
1138+
DecisionType: &diType,
1139+
UpsertWorkflowSearchAttributesDecisionAttributes: &s.UpsertWorkflowSearchAttributesDecisionAttributes{
1140+
SearchAttributes: &s.SearchAttributes{},
1141+
},
1142+
},
1143+
event: &s.HistoryEvent{
1144+
EventType: &eType,
1145+
UpsertWorkflowSearchAttributesEventAttributes: &s.UpsertWorkflowSearchAttributesEventAttributes{},
1146+
},
1147+
expected: false,
1148+
},
1149+
}
1150+
1151+
for _, testCase := range testCases {
1152+
t.Run(testCase.name, func(t *testing.T) {
1153+
require.Equal(t, testCase.expected, isDecisionMatchEvent(testCase.decision, testCase.event, strictMode))
11231154
})
11241155
}
11251156
}

internal/internal_worker.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1036,6 +1036,7 @@ func (aw *aggregatedWorker) Start() error {
10361036
var binaryChecksum string
10371037
var binaryChecksumLock sync.Mutex
10381038

1039+
// SetBinaryChecksum set binary checksum
10391040
func SetBinaryChecksum(checksum string) {
10401041
binaryChecksumLock.Lock()
10411042
defer binaryChecksumLock.Unlock()

0 commit comments

Comments
 (0)