Skip to content

Commit 624721d

Browse files
authored
Extracting the replayer specific utilities into a separate file for readability. (#1244)
* Fixed the spelling of replay_test file. * Added the Activity Registration required failure scenario to replayer test suite * Documentation change * Removed extra file * Extracting the replayer specific utilities into a separate file for readability. * Pulling more helper functions * Separated all the matching cases
1 parent 1fed700 commit 624721d

File tree

2 files changed

+302
-294
lines changed

2 files changed

+302
-294
lines changed

internal/internal_task_handlers.go

Lines changed: 0 additions & 294 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,6 @@ package internal
2424
// All code in this file is private to the package.
2525

2626
import (
27-
"bytes"
2827
"context"
2928
"errors"
3029
"fmt"
@@ -44,7 +43,6 @@ import (
4443
"go.uber.org/cadence/internal/common/backoff"
4544
"go.uber.org/cadence/internal/common/cache"
4645
"go.uber.org/cadence/internal/common/metrics"
47-
"go.uber.org/cadence/internal/common/util"
4846
)
4947

5048
const (
@@ -1188,298 +1186,6 @@ func (w *workflowExecutionContextImpl) GetDecisionTimeout() time.Duration {
11881186
return time.Second * time.Duration(w.workflowInfo.TaskStartToCloseTimeoutSeconds)
11891187
}
11901188

1191-
func skipDeterministicCheckForDecision(d *s.Decision) bool {
1192-
if d.GetDecisionType() == s.DecisionTypeRecordMarker {
1193-
markerName := d.RecordMarkerDecisionAttributes.GetMarkerName()
1194-
if markerName == versionMarkerName || markerName == mutableSideEffectMarkerName {
1195-
return true
1196-
}
1197-
}
1198-
return false
1199-
}
1200-
1201-
func skipDeterministicCheckForEvent(e *s.HistoryEvent) bool {
1202-
if e.GetEventType() == s.EventTypeMarkerRecorded {
1203-
markerName := e.MarkerRecordedEventAttributes.GetMarkerName()
1204-
if markerName == versionMarkerName || markerName == mutableSideEffectMarkerName {
1205-
return true
1206-
}
1207-
}
1208-
return false
1209-
}
1210-
1211-
// special check for upsert change version event
1212-
func skipDeterministicCheckForUpsertChangeVersion(events []*s.HistoryEvent, idx int) bool {
1213-
e := events[idx]
1214-
if e.GetEventType() == s.EventTypeMarkerRecorded &&
1215-
e.MarkerRecordedEventAttributes.GetMarkerName() == versionMarkerName &&
1216-
idx < len(events)-1 &&
1217-
events[idx+1].GetEventType() == s.EventTypeUpsertWorkflowSearchAttributes {
1218-
if _, ok := events[idx+1].UpsertWorkflowSearchAttributesEventAttributes.SearchAttributes.IndexedFields[CadenceChangeVersion]; ok {
1219-
return true
1220-
}
1221-
}
1222-
return false
1223-
}
1224-
1225-
func matchReplayWithHistory(replayDecisions []*s.Decision, historyEvents []*s.HistoryEvent) error {
1226-
di := 0
1227-
hi := 0
1228-
hSize := len(historyEvents)
1229-
dSize := len(replayDecisions)
1230-
matchLoop:
1231-
for hi < hSize || di < dSize {
1232-
var e *s.HistoryEvent
1233-
if hi < hSize {
1234-
e = historyEvents[hi]
1235-
if skipDeterministicCheckForUpsertChangeVersion(historyEvents, hi) {
1236-
hi += 2
1237-
continue matchLoop
1238-
}
1239-
if skipDeterministicCheckForEvent(e) {
1240-
hi++
1241-
continue matchLoop
1242-
}
1243-
}
1244-
1245-
var d *s.Decision
1246-
if di < dSize {
1247-
d = replayDecisions[di]
1248-
if skipDeterministicCheckForDecision(d) {
1249-
di++
1250-
continue matchLoop
1251-
}
1252-
}
1253-
1254-
if d == nil {
1255-
return fmt.Errorf("nondeterministic workflow: missing replay decision for %s", util.HistoryEventToString(e))
1256-
}
1257-
1258-
if e == nil {
1259-
return fmt.Errorf("nondeterministic workflow: extra replay decision for %s", util.DecisionToString(d))
1260-
}
1261-
1262-
if !isDecisionMatchEvent(d, e, false) {
1263-
return fmt.Errorf("nondeterministic workflow: history event is %s, replay decision is %s",
1264-
util.HistoryEventToString(e), util.DecisionToString(d))
1265-
}
1266-
1267-
di++
1268-
hi++
1269-
}
1270-
return nil
1271-
}
1272-
1273-
func lastPartOfName(name string) string {
1274-
name = strings.TrimSuffix(name, "-fm")
1275-
lastDotIdx := strings.LastIndex(name, ".")
1276-
if lastDotIdx < 0 || lastDotIdx == len(name)-1 {
1277-
return name
1278-
}
1279-
return name[lastDotIdx+1:]
1280-
}
1281-
1282-
func isDecisionMatchEvent(d *s.Decision, e *s.HistoryEvent, strictMode bool) bool {
1283-
switch d.GetDecisionType() {
1284-
case s.DecisionTypeScheduleActivityTask:
1285-
if e.GetEventType() != s.EventTypeActivityTaskScheduled {
1286-
return false
1287-
}
1288-
eventAttributes := e.ActivityTaskScheduledEventAttributes
1289-
decisionAttributes := d.ScheduleActivityTaskDecisionAttributes
1290-
1291-
if eventAttributes.GetActivityId() != decisionAttributes.GetActivityId() ||
1292-
lastPartOfName(eventAttributes.ActivityType.GetName()) != lastPartOfName(decisionAttributes.ActivityType.GetName()) ||
1293-
(strictMode && eventAttributes.TaskList.GetName() != decisionAttributes.TaskList.GetName()) ||
1294-
(strictMode && bytes.Compare(eventAttributes.Input, decisionAttributes.Input) != 0) {
1295-
return false
1296-
}
1297-
1298-
return true
1299-
1300-
case s.DecisionTypeRequestCancelActivityTask:
1301-
if e.GetEventType() != s.EventTypeActivityTaskCancelRequested {
1302-
return false
1303-
}
1304-
decisionAttributes := d.RequestCancelActivityTaskDecisionAttributes
1305-
eventAttributes := e.ActivityTaskCancelRequestedEventAttributes
1306-
if eventAttributes.GetActivityId() != decisionAttributes.GetActivityId() {
1307-
return false
1308-
}
1309-
1310-
return true
1311-
1312-
case s.DecisionTypeStartTimer:
1313-
if e.GetEventType() != s.EventTypeTimerStarted {
1314-
return false
1315-
}
1316-
eventAttributes := e.TimerStartedEventAttributes
1317-
decisionAttributes := d.StartTimerDecisionAttributes
1318-
1319-
if eventAttributes.GetTimerId() != decisionAttributes.GetTimerId() ||
1320-
(strictMode && eventAttributes.GetStartToFireTimeoutSeconds() != decisionAttributes.GetStartToFireTimeoutSeconds()) {
1321-
return false
1322-
}
1323-
1324-
return true
1325-
1326-
case s.DecisionTypeCancelTimer:
1327-
if e.GetEventType() != s.EventTypeTimerCanceled && e.GetEventType() != s.EventTypeCancelTimerFailed {
1328-
return false
1329-
}
1330-
decisionAttributes := d.CancelTimerDecisionAttributes
1331-
if e.GetEventType() == s.EventTypeTimerCanceled {
1332-
eventAttributes := e.TimerCanceledEventAttributes
1333-
if eventAttributes.GetTimerId() != decisionAttributes.GetTimerId() {
1334-
return false
1335-
}
1336-
} else if e.GetEventType() == s.EventTypeCancelTimerFailed {
1337-
eventAttributes := e.CancelTimerFailedEventAttributes
1338-
if eventAttributes.GetTimerId() != decisionAttributes.GetTimerId() {
1339-
return false
1340-
}
1341-
}
1342-
1343-
return true
1344-
1345-
case s.DecisionTypeCompleteWorkflowExecution:
1346-
if e.GetEventType() != s.EventTypeWorkflowExecutionCompleted {
1347-
return false
1348-
}
1349-
if strictMode {
1350-
eventAttributes := e.WorkflowExecutionCompletedEventAttributes
1351-
decisionAttributes := d.CompleteWorkflowExecutionDecisionAttributes
1352-
1353-
if bytes.Compare(eventAttributes.Result, decisionAttributes.Result) != 0 {
1354-
return false
1355-
}
1356-
}
1357-
1358-
return true
1359-
1360-
case s.DecisionTypeFailWorkflowExecution:
1361-
if e.GetEventType() != s.EventTypeWorkflowExecutionFailed {
1362-
return false
1363-
}
1364-
if strictMode {
1365-
eventAttributes := e.WorkflowExecutionFailedEventAttributes
1366-
decisionAttributes := d.FailWorkflowExecutionDecisionAttributes
1367-
1368-
if eventAttributes.GetReason() != decisionAttributes.GetReason() ||
1369-
bytes.Compare(eventAttributes.Details, decisionAttributes.Details) != 0 {
1370-
return false
1371-
}
1372-
}
1373-
1374-
return true
1375-
1376-
case s.DecisionTypeRecordMarker:
1377-
if e.GetEventType() != s.EventTypeMarkerRecorded {
1378-
return false
1379-
}
1380-
eventAttributes := e.MarkerRecordedEventAttributes
1381-
decisionAttributes := d.RecordMarkerDecisionAttributes
1382-
if eventAttributes.GetMarkerName() != decisionAttributes.GetMarkerName() {
1383-
return false
1384-
}
1385-
1386-
return true
1387-
1388-
case s.DecisionTypeRequestCancelExternalWorkflowExecution:
1389-
if e.GetEventType() != s.EventTypeRequestCancelExternalWorkflowExecutionInitiated {
1390-
return false
1391-
}
1392-
eventAttributes := e.RequestCancelExternalWorkflowExecutionInitiatedEventAttributes
1393-
decisionAttributes := d.RequestCancelExternalWorkflowExecutionDecisionAttributes
1394-
if checkDomainsInDecisionAndEvent(eventAttributes.GetDomain(), decisionAttributes.GetDomain()) ||
1395-
eventAttributes.WorkflowExecution.GetWorkflowId() != decisionAttributes.GetWorkflowId() {
1396-
return false
1397-
}
1398-
1399-
return true
1400-
1401-
case s.DecisionTypeSignalExternalWorkflowExecution:
1402-
if e.GetEventType() != s.EventTypeSignalExternalWorkflowExecutionInitiated {
1403-
return false
1404-
}
1405-
eventAttributes := e.SignalExternalWorkflowExecutionInitiatedEventAttributes
1406-
decisionAttributes := d.SignalExternalWorkflowExecutionDecisionAttributes
1407-
if checkDomainsInDecisionAndEvent(eventAttributes.GetDomain(), decisionAttributes.GetDomain()) ||
1408-
eventAttributes.GetSignalName() != decisionAttributes.GetSignalName() ||
1409-
eventAttributes.WorkflowExecution.GetWorkflowId() != decisionAttributes.Execution.GetWorkflowId() {
1410-
return false
1411-
}
1412-
1413-
return true
1414-
1415-
case s.DecisionTypeCancelWorkflowExecution:
1416-
if e.GetEventType() != s.EventTypeWorkflowExecutionCanceled {
1417-
return false
1418-
}
1419-
if strictMode {
1420-
eventAttributes := e.WorkflowExecutionCanceledEventAttributes
1421-
decisionAttributes := d.CancelWorkflowExecutionDecisionAttributes
1422-
if bytes.Compare(eventAttributes.Details, decisionAttributes.Details) != 0 {
1423-
return false
1424-
}
1425-
}
1426-
return true
1427-
1428-
case s.DecisionTypeContinueAsNewWorkflowExecution:
1429-
if e.GetEventType() != s.EventTypeWorkflowExecutionContinuedAsNew {
1430-
return false
1431-
}
1432-
1433-
return true
1434-
1435-
case s.DecisionTypeStartChildWorkflowExecution:
1436-
if e.GetEventType() != s.EventTypeStartChildWorkflowExecutionInitiated {
1437-
return false
1438-
}
1439-
eventAttributes := e.StartChildWorkflowExecutionInitiatedEventAttributes
1440-
decisionAttributes := d.StartChildWorkflowExecutionDecisionAttributes
1441-
if lastPartOfName(eventAttributes.WorkflowType.GetName()) != lastPartOfName(decisionAttributes.WorkflowType.GetName()) ||
1442-
(strictMode && checkDomainsInDecisionAndEvent(eventAttributes.GetDomain(), decisionAttributes.GetDomain())) ||
1443-
(strictMode && eventAttributes.TaskList.GetName() != decisionAttributes.TaskList.GetName()) {
1444-
return false
1445-
}
1446-
1447-
return true
1448-
1449-
case s.DecisionTypeUpsertWorkflowSearchAttributes:
1450-
if e.GetEventType() != s.EventTypeUpsertWorkflowSearchAttributes {
1451-
return false
1452-
}
1453-
eventAttributes := e.UpsertWorkflowSearchAttributesEventAttributes
1454-
decisionAttributes := d.UpsertWorkflowSearchAttributesDecisionAttributes
1455-
if strictMode && !isSearchAttributesMatched(eventAttributes.SearchAttributes, decisionAttributes.SearchAttributes) {
1456-
return false
1457-
}
1458-
return true
1459-
}
1460-
1461-
return false
1462-
}
1463-
1464-
func isSearchAttributesMatched(attrFromEvent, attrFromDecision *s.SearchAttributes) bool {
1465-
if attrFromEvent != nil && attrFromDecision != nil {
1466-
return reflect.DeepEqual(attrFromEvent.IndexedFields, attrFromDecision.IndexedFields)
1467-
}
1468-
return attrFromEvent == nil && attrFromDecision == nil
1469-
}
1470-
1471-
// return true if the check fails:
1472-
//
1473-
// domain is not empty in decision
1474-
// and domain is not replayDomain
1475-
// and domains unmatch in decision and events
1476-
func checkDomainsInDecisionAndEvent(eventDomainName, decisionDomainName string) bool {
1477-
if decisionDomainName == "" || IsReplayDomain(decisionDomainName) {
1478-
return false
1479-
}
1480-
return eventDomainName != decisionDomainName
1481-
}
1482-
14831189
func (wth *workflowTaskHandlerImpl) completeWorkflow(
14841190
eventHandler *workflowExecutionEventHandlerImpl,
14851191
task *s.PollForDecisionTaskResponse,

0 commit comments

Comments
 (0)