Skip to content

Commit 83fdd73

Browse files
committed
Fix bringLocalEventsUpToSourceCurrentBranch
1 parent 8ca8304 commit 83fdd73

File tree

2 files changed

+123
-0
lines changed

2 files changed

+123
-0
lines changed

service/history/ndc/workflow_state_replicator.go

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1096,6 +1096,12 @@ func (r *WorkflowStateReplicatorImpl) bringLocalEventsUpToSourceCurrentBranch(
10961096
isNewBranch = false
10971097

10981098
localMutableState.GetExecutionInfo().ExecutionStats.HistorySize += int64(len(historyBlob.rawHistory.Data))
1099+
externalPayloadSize, externalPayloadCount, err := workflow.CalculateExternalPayloadSize(events)
1100+
if err != nil {
1101+
return err
1102+
}
1103+
localMutableState.GetExecutionInfo().ExecutionStats.ExternalPayloadSize += externalPayloadSize
1104+
localMutableState.GetExecutionInfo().ExecutionStats.ExternalPayloadCount += externalPayloadCount
10991105
}
11001106
return nil
11011107
}
@@ -1151,6 +1157,13 @@ func (r *WorkflowStateReplicatorImpl) bringLocalEventsUpToSourceCurrentBranch(
11511157
startEventID = events[len(events)-1].EventId
11521158
startEventVersion = events[len(events)-1].Version
11531159
localMutableState.GetExecutionInfo().ExecutionStats.HistorySize += int64(len(eventBlobs[i].Data))
1160+
externalPayloadSize, externalPayloadCount, err := workflow.CalculateExternalPayloadSize(events)
1161+
if err != nil {
1162+
return newBranchToken, err
1163+
}
1164+
localMutableState.GetExecutionInfo().ExecutionStats.ExternalPayloadSize += externalPayloadSize
1165+
localMutableState.GetExecutionInfo().ExecutionStats.ExternalPayloadCount += externalPayloadCount
1166+
11541167
}
11551168
// add more events if there is any
11561169
if startEventID < endEventID {

service/history/ndc/workflow_state_replicator_test.go

Lines changed: 110 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1608,3 +1608,113 @@ func (s *workflowReplicatorSuite) Test_bringLocalEventsUpToSourceCurrentBranch_C
16081608
s.Equal(int32(2), localVersionHistoryies.CurrentVersionHistoryIndex)
16091609
s.NotNil(newRunBranch)
16101610
}
1611+
1612+
func (s *workflowReplicatorSuite) Test_bringLocalEventsUpToSourceCurrentBranch_ExternalPayloadStats() {
1613+
// Test that the external payload stats are correctly updated when bringLocalEventsUpToSourceCurrentBranch is invoked
1614+
namespaceID := uuid.NewString()
1615+
versionHistories := &historyspb.VersionHistories{
1616+
CurrentVersionHistoryIndex: 0,
1617+
Histories: []*historyspb.VersionHistory{
1618+
{
1619+
BranchToken: []byte("branchToken"),
1620+
Items: []*historyspb.VersionHistoryItem{
1621+
{
1622+
EventId: int64(2),
1623+
Version: int64(1),
1624+
},
1625+
},
1626+
},
1627+
},
1628+
}
1629+
localVersionHistories := &historyspb.VersionHistories{
1630+
CurrentVersionHistoryIndex: 0,
1631+
Histories: []*historyspb.VersionHistory{
1632+
{
1633+
BranchToken: []byte("local-branchToken"),
1634+
},
1635+
},
1636+
}
1637+
1638+
historyEvents := []*historypb.HistoryEvent{
1639+
{
1640+
EventId: 1,
1641+
Version: 1,
1642+
EventType: enumspb.EVENT_TYPE_WORKFLOW_EXECUTION_STARTED,
1643+
Attributes: &historypb.HistoryEvent_WorkflowExecutionStartedEventAttributes{
1644+
WorkflowExecutionStartedEventAttributes: &historypb.WorkflowExecutionStartedEventAttributes{
1645+
Input: &commonpb.Payloads{
1646+
Payloads: []*commonpb.Payload{
1647+
{
1648+
Data: []byte("test"),
1649+
ExternalPayloads: []*commonpb.Payload_ExternalPayloadDetails{
1650+
{SizeBytes: 1024},
1651+
{SizeBytes: 2048},
1652+
},
1653+
},
1654+
},
1655+
},
1656+
},
1657+
},
1658+
},
1659+
{
1660+
EventId: 2,
1661+
Version: 1,
1662+
EventType: enumspb.EVENT_TYPE_WORKFLOW_TASK_SCHEDULED,
1663+
Attributes: &historypb.HistoryEvent_WorkflowTaskScheduledEventAttributes{
1664+
WorkflowTaskScheduledEventAttributes: &historypb.WorkflowTaskScheduledEventAttributes{},
1665+
},
1666+
},
1667+
}
1668+
1669+
serializer := serialization.NewSerializer()
1670+
eventBlobs, err := serializer.SerializeEvents(historyEvents)
1671+
s.NoError(err)
1672+
1673+
executionStats := &persistencespb.ExecutionStats{
1674+
HistorySize: 0,
1675+
ExternalPayloadSize: 0,
1676+
ExternalPayloadCount: 0,
1677+
}
1678+
mockMutableState := historyi.NewMockMutableState(s.controller)
1679+
mockMutableState.EXPECT().GetExecutionInfo().Return(&persistencespb.WorkflowExecutionInfo{
1680+
VersionHistories: localVersionHistories,
1681+
ExecutionStats: executionStats,
1682+
}).AnyTimes()
1683+
mockMutableState.EXPECT().GetExecutionState().Return(&persistencespb.WorkflowExecutionState{
1684+
RunId: s.runID,
1685+
}).AnyTimes()
1686+
mockMutableState.EXPECT().GetWorkflowKey().Return(definition.NewWorkflowKey(namespaceID, s.workflowID, s.runID)).AnyTimes()
1687+
mockMutableState.EXPECT().SetHistoryBuilder(gomock.Any()).Times(1)
1688+
mockMutableState.EXPECT().AddReapplyCandidateEvent(gomock.Any()).AnyTimes()
1689+
1690+
mockWeCtx := historyi.NewMockWorkflowContext(s.controller)
1691+
sourceClusterName := "test-cluster"
1692+
1693+
mockShard := historyi.NewMockShardContext(s.controller)
1694+
taskId := int64(100)
1695+
mockShard.EXPECT().GenerateTaskID().Return(taskId, nil).Times(1)
1696+
mockShard.EXPECT().GetRemoteAdminClient(sourceClusterName).Return(s.mockRemoteAdminClient, nil).AnyTimes()
1697+
mockShard.EXPECT().GetShardID().Return(int32(0)).AnyTimes()
1698+
mockEventsCache := events.NewMockCache(s.controller)
1699+
mockEventsCache.EXPECT().PutEvent(gomock.Any(), gomock.Any()).AnyTimes()
1700+
mockShard.EXPECT().GetEventsCache().Return(mockEventsCache).AnyTimes()
1701+
s.workflowStateReplicator.shardContext = mockShard
1702+
1703+
s.mockExecutionManager.EXPECT().AppendRawHistoryNodes(gomock.Any(), gomock.Any()).Return(nil, nil).Times(1)
1704+
1705+
_, err = s.workflowStateReplicator.bringLocalEventsUpToSourceCurrentBranch(
1706+
context.Background(),
1707+
namespace.ID(namespaceID),
1708+
s.workflowID,
1709+
s.runID,
1710+
sourceClusterName,
1711+
mockWeCtx,
1712+
mockMutableState,
1713+
versionHistories,
1714+
[]*commonpb.DataBlob{eventBlobs},
1715+
true)
1716+
s.NoError(err)
1717+
1718+
s.Equal(int64(1024+2048), executionStats.ExternalPayloadSize)
1719+
s.Equal(int64(2), executionStats.ExternalPayloadCount)
1720+
}

0 commit comments

Comments
 (0)