Skip to content

Commit c7d356a

Browse files
committed
Fix bringLocalEventsUpToSourceCurrentBranch
1 parent 2bcb12b commit c7d356a

File tree

2 files changed

+123
-0
lines changed

2 files changed

+123
-0
lines changed

service/history/ndc/workflow_state_replicator.go

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1117,6 +1117,12 @@ func (r *WorkflowStateReplicatorImpl) bringLocalEventsUpToSourceCurrentBranch(
11171117
isNewBranch = false
11181118

11191119
localMutableState.GetExecutionInfo().ExecutionStats.HistorySize += int64(len(historyBlob.rawHistory.Data))
1120+
externalPayloadSize, externalPayloadCount, err := workflow.CalculateExternalPayloadSize(events)
1121+
if err != nil {
1122+
return err
1123+
}
1124+
localMutableState.GetExecutionInfo().ExecutionStats.ExternalPayloadSize += externalPayloadSize
1125+
localMutableState.GetExecutionInfo().ExecutionStats.ExternalPayloadCount += externalPayloadCount
11201126
}
11211127
return nil
11221128
}
@@ -1175,6 +1181,13 @@ func (r *WorkflowStateReplicatorImpl) bringLocalEventsUpToSourceCurrentBranch(
11751181
startEventID = events[len(events)-1].EventId
11761182
startEventVersion = events[len(events)-1].Version
11771183
localMutableState.GetExecutionInfo().ExecutionStats.HistorySize += int64(len(eventBlobs[i].Data))
1184+
externalPayloadSize, externalPayloadCount, err := workflow.CalculateExternalPayloadSize(events)
1185+
if err != nil {
1186+
return newBranchToken, err
1187+
}
1188+
localMutableState.GetExecutionInfo().ExecutionStats.ExternalPayloadSize += externalPayloadSize
1189+
localMutableState.GetExecutionInfo().ExecutionStats.ExternalPayloadCount += externalPayloadCount
1190+
11781191
}
11791192
// add more events if there is any
11801193
if startEventID < endEventID {

service/history/ndc/workflow_state_replicator_test.go

Lines changed: 110 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1621,3 +1621,113 @@ func (s *workflowReplicatorSuite) Test_bringLocalEventsUpToSourceCurrentBranch_C
16211621
s.Equal(int32(2), localVersionHistoryies.CurrentVersionHistoryIndex)
16221622
s.NotNil(newRunBranch)
16231623
}
1624+
1625+
func (s *workflowReplicatorSuite) Test_bringLocalEventsUpToSourceCurrentBranch_ExternalPayloadStats() {
1626+
// Test that the external payload stats are correctly updated when bringLocalEventsUpToSourceCurrentBranch is invoked
1627+
namespaceID := uuid.NewString()
1628+
versionHistories := &historyspb.VersionHistories{
1629+
CurrentVersionHistoryIndex: 0,
1630+
Histories: []*historyspb.VersionHistory{
1631+
{
1632+
BranchToken: []byte("branchToken"),
1633+
Items: []*historyspb.VersionHistoryItem{
1634+
{
1635+
EventId: int64(2),
1636+
Version: int64(1),
1637+
},
1638+
},
1639+
},
1640+
},
1641+
}
1642+
localVersionHistories := &historyspb.VersionHistories{
1643+
CurrentVersionHistoryIndex: 0,
1644+
Histories: []*historyspb.VersionHistory{
1645+
{
1646+
BranchToken: []byte("local-branchToken"),
1647+
},
1648+
},
1649+
}
1650+
1651+
historyEvents := []*historypb.HistoryEvent{
1652+
{
1653+
EventId: 1,
1654+
Version: 1,
1655+
EventType: enumspb.EVENT_TYPE_WORKFLOW_EXECUTION_STARTED,
1656+
Attributes: &historypb.HistoryEvent_WorkflowExecutionStartedEventAttributes{
1657+
WorkflowExecutionStartedEventAttributes: &historypb.WorkflowExecutionStartedEventAttributes{
1658+
Input: &commonpb.Payloads{
1659+
Payloads: []*commonpb.Payload{
1660+
{
1661+
Data: []byte("test"),
1662+
ExternalPayloads: []*commonpb.Payload_ExternalPayloadDetails{
1663+
{SizeBytes: 1024},
1664+
{SizeBytes: 2048},
1665+
},
1666+
},
1667+
},
1668+
},
1669+
},
1670+
},
1671+
},
1672+
{
1673+
EventId: 2,
1674+
Version: 1,
1675+
EventType: enumspb.EVENT_TYPE_WORKFLOW_TASK_SCHEDULED,
1676+
Attributes: &historypb.HistoryEvent_WorkflowTaskScheduledEventAttributes{
1677+
WorkflowTaskScheduledEventAttributes: &historypb.WorkflowTaskScheduledEventAttributes{},
1678+
},
1679+
},
1680+
}
1681+
1682+
serializer := serialization.NewSerializer()
1683+
eventBlobs, err := serializer.SerializeEvents(historyEvents)
1684+
s.NoError(err)
1685+
1686+
executionStats := &persistencespb.ExecutionStats{
1687+
HistorySize: 0,
1688+
ExternalPayloadSize: 0,
1689+
ExternalPayloadCount: 0,
1690+
}
1691+
mockMutableState := historyi.NewMockMutableState(s.controller)
1692+
mockMutableState.EXPECT().GetExecutionInfo().Return(&persistencespb.WorkflowExecutionInfo{
1693+
VersionHistories: localVersionHistories,
1694+
ExecutionStats: executionStats,
1695+
}).AnyTimes()
1696+
mockMutableState.EXPECT().GetExecutionState().Return(&persistencespb.WorkflowExecutionState{
1697+
RunId: s.runID,
1698+
}).AnyTimes()
1699+
mockMutableState.EXPECT().GetWorkflowKey().Return(definition.NewWorkflowKey(namespaceID, s.workflowID, s.runID)).AnyTimes()
1700+
mockMutableState.EXPECT().SetHistoryBuilder(gomock.Any()).Times(1)
1701+
mockMutableState.EXPECT().AddReapplyCandidateEvent(gomock.Any()).AnyTimes()
1702+
1703+
mockWeCtx := historyi.NewMockWorkflowContext(s.controller)
1704+
sourceClusterName := "test-cluster"
1705+
1706+
mockShard := historyi.NewMockShardContext(s.controller)
1707+
taskId := int64(100)
1708+
mockShard.EXPECT().GenerateTaskID().Return(taskId, nil).Times(1)
1709+
mockShard.EXPECT().GetRemoteAdminClient(sourceClusterName).Return(s.mockRemoteAdminClient, nil).AnyTimes()
1710+
mockShard.EXPECT().GetShardID().Return(int32(0)).AnyTimes()
1711+
mockEventsCache := events.NewMockCache(s.controller)
1712+
mockEventsCache.EXPECT().PutEvent(gomock.Any(), gomock.Any()).AnyTimes()
1713+
mockShard.EXPECT().GetEventsCache().Return(mockEventsCache).AnyTimes()
1714+
s.workflowStateReplicator.shardContext = mockShard
1715+
1716+
s.mockExecutionManager.EXPECT().AppendRawHistoryNodes(gomock.Any(), gomock.Any()).Return(nil, nil).Times(1)
1717+
1718+
_, err = s.workflowStateReplicator.bringLocalEventsUpToSourceCurrentBranch(
1719+
context.Background(),
1720+
namespace.ID(namespaceID),
1721+
s.workflowID,
1722+
s.runID,
1723+
sourceClusterName,
1724+
mockWeCtx,
1725+
mockMutableState,
1726+
versionHistories,
1727+
[]*commonpb.DataBlob{eventBlobs},
1728+
true)
1729+
s.NoError(err)
1730+
1731+
s.Equal(int64(1024+2048), executionStats.ExternalPayloadSize)
1732+
s.Equal(int64(2), executionStats.ExternalPayloadCount)
1733+
}

0 commit comments

Comments
 (0)