Skip to content

Commit 3e27b5d

Browse files
committed
Add feature control knob
1 parent ea0c285 commit 3e27b5d

File tree

7 files changed

+90
-36
lines changed

7 files changed

+90
-36
lines changed

common/dynamicconfig/constants.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2667,6 +2667,12 @@ to the CHASM (V2) implementation on active scheduler workflows.`,
26672667
instead of the previous HSM backed implementation.`,
26682668
)
26692669

2670+
ExternalPayloadsEnabled = NewNamespaceBoolSetting(
2671+
"history.externalPayloadsEnabled",
2672+
false,
2673+
`ExternalPayloadsEnabled controls whether external payload features are enabled for a namespace.`,
2674+
)
2675+
26702676
// keys for worker
26712677

26722678
WorkerPersistenceMaxQPS = NewGlobalIntSetting(

service/history/configs/config.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,7 @@ type Config struct {
7171
ChasmMaxInMemoryPureTasks dynamicconfig.IntPropertyFn
7272
EnableCHASMSchedulerCreation dynamicconfig.BoolPropertyFnWithNamespaceFilter
7373
EnableCHASMSchedulerMigration dynamicconfig.BoolPropertyFnWithNamespaceFilter
74+
ExternalPayloadsEnabled dynamicconfig.BoolPropertyFnWithNamespaceFilter
7475

7576
// EventsCache settings
7677
// Change of these configs require shard restart
@@ -463,7 +464,8 @@ func NewConfig(
463464
EnableCHASMSchedulerCreation: dynamicconfig.EnableCHASMSchedulerCreation.Get(dc),
464465
EnableCHASMSchedulerMigration: dynamicconfig.EnableCHASMSchedulerMigration.Get(dc),
465466

466-
EnableCHASMCallbacks: dynamicconfig.EnableCHASMCallbacks.Get(dc),
467+
EnableCHASMCallbacks: dynamicconfig.EnableCHASMCallbacks.Get(dc),
468+
ExternalPayloadsEnabled: dynamicconfig.ExternalPayloadsEnabled.Get(dc),
467469

468470
EventsShardLevelCacheMaxSizeBytes: dynamicconfig.EventsCacheMaxSizeBytes.Get(dc), // 512KB
469471
EventsHostLevelCacheMaxSizeBytes: dynamicconfig.EventsHostLevelCacheMaxSizeBytes.Get(dc), // 256MB

service/history/ndc/state_rebuilder.go

Lines changed: 14 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -231,18 +231,19 @@ func (r *StateRebuilderImpl) buildMutableStateFromEvent(
231231
targetBranchToken []byte,
232232
requestID string,
233233
) (historyi.MutableState, int64, error) {
234+
namespaceEntry, err := r.namespaceRegistry.GetNamespaceByID(namespace.ID(targetWorkflowIdentifier.NamespaceID))
235+
if err != nil {
236+
return nil, 0, err
237+
}
238+
234239
iter := collection.NewPagingIterator(r.getPaginationFn(
235240
ctx,
236241
common.FirstEventID,
237242
baseLastEventID+1,
238243
baseBranchToken,
244+
namespaceEntry.Name().String(),
239245
))
240246

241-
namespaceEntry, err := r.namespaceRegistry.GetNamespaceByID(namespace.ID(targetWorkflowIdentifier.NamespaceID))
242-
if err != nil {
243-
return nil, 0, err
244-
}
245-
246247
rebuiltMutableState, stateBuilder := r.initializeBuilders(
247248
namespaceEntry,
248249
targetWorkflowIdentifier,
@@ -356,6 +357,7 @@ func (r *StateRebuilderImpl) getPaginationFn(
356357
firstEventID int64,
357358
nextEventID int64,
358359
branchToken []byte,
360+
namespaceName string,
359361
) collection.PaginationFn[HistoryBlobsPaginationItem] {
360362
return func(paginationToken []byte) ([]HistoryBlobsPaginationItem, []byte, error) {
361363
resp, err := r.executionMgr.ReadHistoryBranchByBatch(ctx, &persistence.ReadHistoryBranchRequest{
@@ -380,12 +382,14 @@ func (r *StateRebuilderImpl) getPaginationFn(
380382
paginateItems = append(paginateItems, nextBatch)
381383

382384
// Calculate and accumulate external payload size and count for this batch of history events
383-
externalPayloadSize, externalPayloadCount, err := workflow.CalculateExternalPayloadSize(history.Events)
384-
if err != nil {
385-
return nil, nil, err
385+
if r.shard.GetConfig().ExternalPayloadsEnabled(namespaceName) {
386+
externalPayloadSize, externalPayloadCount, err := workflow.CalculateExternalPayloadSize(history.Events)
387+
if err != nil {
388+
return nil, nil, err
389+
}
390+
r.rebuiltExternalPayloadSize += externalPayloadSize
391+
r.rebuiltExternalPayloadCount += externalPayloadCount
386392
}
387-
r.rebuiltExternalPayloadSize += externalPayloadSize
388-
r.rebuiltExternalPayloadCount += externalPayloadCount
389393
}
390394
return paginateItems, resp.NextPageToken, nil
391395
}

service/history/ndc/state_rebuilder_test.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,7 @@ func (s *stateRebuilderSuite) SetupTest() {
7575
s.mockTaskRefresher = workflow.NewMockTaskRefresher(s.controller)
7676
config := tests.NewDynamicConfig()
7777
config.EnableTransitionHistory = dynamicconfig.GetBoolPropertyFn(true)
78+
config.ExternalPayloadsEnabled = dynamicconfig.GetBoolPropertyFnFilteredByNamespace(true)
7879
s.mockShard = shard.NewTestContext(
7980
s.controller,
8081
&persistencespb.ShardInfo{
@@ -222,7 +223,7 @@ func (s *stateRebuilderSuite) TestPagination() {
222223
Size: 67890,
223224
}, nil)
224225

225-
paginationFn := s.nDCStateRebuilder.getPaginationFn(context.Background(), firstEventID, nextEventID, branchToken)
226+
paginationFn := s.nDCStateRebuilder.getPaginationFn(context.Background(), firstEventID, nextEventID, branchToken, tests.Namespace.String())
226227
iter := collection.NewPagingIterator(paginationFn)
227228

228229
var result []HistoryBlobsPaginationItem

service/history/ndc/workflow_state_replicator.go

Lines changed: 14 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1117,12 +1117,14 @@ func (r *WorkflowStateReplicatorImpl) bringLocalEventsUpToSourceCurrentBranch(
11171117
isNewBranch = false
11181118

11191119
localMutableState.GetExecutionInfo().ExecutionStats.HistorySize += int64(len(historyBlob.rawHistory.Data))
1120-
externalPayloadSize, externalPayloadCount, err := workflow.CalculateExternalPayloadSize(events)
1121-
if err != nil {
1122-
return err
1120+
if r.shardContext.GetConfig().ExternalPayloadsEnabled(nsName.String()) {
1121+
externalPayloadSize, externalPayloadCount, err := workflow.CalculateExternalPayloadSize(events)
1122+
if err != nil {
1123+
return err
1124+
}
1125+
localMutableState.AddExternalPayloadSize(externalPayloadSize)
1126+
localMutableState.AddExternalPayloadCount(externalPayloadCount)
11231127
}
1124-
localMutableState.GetExecutionInfo().ExecutionStats.ExternalPayloadSize += externalPayloadSize
1125-
localMutableState.GetExecutionInfo().ExecutionStats.ExternalPayloadCount += externalPayloadCount
11261128
}
11271129
return nil
11281130
}
@@ -1181,13 +1183,14 @@ func (r *WorkflowStateReplicatorImpl) bringLocalEventsUpToSourceCurrentBranch(
11811183
startEventID = events[len(events)-1].EventId
11821184
startEventVersion = events[len(events)-1].Version
11831185
localMutableState.GetExecutionInfo().ExecutionStats.HistorySize += int64(len(eventBlobs[i].Data))
1184-
externalPayloadSize, externalPayloadCount, err := workflow.CalculateExternalPayloadSize(events)
1185-
if err != nil {
1186-
return newBranchToken, err
1186+
if r.shardContext.GetConfig().ExternalPayloadsEnabled(localMutableState.GetNamespaceEntry().Name().String()) {
1187+
externalPayloadSize, externalPayloadCount, err := workflow.CalculateExternalPayloadSize(events)
1188+
if err != nil {
1189+
return newBranchToken, err
1190+
}
1191+
localMutableState.AddExternalPayloadSize(externalPayloadSize)
1192+
localMutableState.AddExternalPayloadCount(externalPayloadCount)
11871193
}
1188-
localMutableState.GetExecutionInfo().ExecutionStats.ExternalPayloadSize += externalPayloadSize
1189-
localMutableState.GetExecutionInfo().ExecutionStats.ExternalPayloadCount += externalPayloadCount
1190-
11911194
}
11921195
// add more events if there is any
11931196
if startEventID < endEventID {

service/history/ndc/workflow_state_replicator_test.go

Lines changed: 44 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ import (
2323
"go.temporal.io/server/chasm"
2424
"go.temporal.io/server/common"
2525
"go.temporal.io/server/common/definition"
26+
"go.temporal.io/server/common/dynamicconfig"
2627
"go.temporal.io/server/common/locks"
2728
"go.temporal.io/server/common/log"
2829
"go.temporal.io/server/common/namespace"
@@ -75,13 +76,15 @@ func (s *workflowReplicatorSuite) SetupTest() {
7576
s.controller = gomock.NewController(s.T())
7677
// s.mockTaskRefresher = workflow.NewMockTaskRefresher(s.controller)
7778

79+
config := tests.NewDynamicConfig()
80+
config.ExternalPayloadsEnabled = dynamicconfig.GetBoolPropertyFnFilteredByNamespace(true)
7881
s.mockShard = shard.NewTestContext(
7982
s.controller,
8083
&persistencespb.ShardInfo{
8184
ShardId: 10,
8285
RangeId: 1,
8386
},
84-
tests.NewDynamicConfig(),
87+
config,
8588
)
8689

8790
reg := hsm.NewRegistry()
@@ -1147,8 +1150,17 @@ func (s *workflowReplicatorSuite) Test_bringLocalEventsUpToSourceCurrentBranch_W
11471150
mockShard.EXPECT().GenerateTaskID().Return(taskId3, nil).Times(1)
11481151
mockShard.EXPECT().GetRemoteAdminClient(sourceClusterName).Return(s.mockRemoteAdminClient, nil).AnyTimes()
11491152
mockShard.EXPECT().GetShardID().Return(int32(0)).AnyTimes()
1153+
mockShard.EXPECT().GetConfig().Return(s.mockShard.GetConfig()).AnyTimes()
11501154
s.workflowStateReplicator.shardContext = mockShard
1151-
s.mockNamespaceCache.EXPECT().GetNamespaceName(namespace.ID(namespaceID)).Return(namespace.Name("test-namespace"), nil).AnyTimes()
1155+
nsName := namespace.Name("test-namespace")
1156+
s.mockNamespaceCache.EXPECT().GetNamespaceName(namespace.ID(namespaceID)).Return(nsName, nil).AnyTimes()
1157+
mockMutableState.EXPECT().GetNamespaceEntry().Return(namespace.NewLocalNamespaceForTest(
1158+
&persistencespb.NamespaceInfo{Id: namespaceID, Name: nsName.String()},
1159+
&persistencespb.NamespaceConfig{},
1160+
"test-cluster",
1161+
)).AnyTimes()
1162+
mockMutableState.EXPECT().AddExternalPayloadSize(gomock.Any()).AnyTimes()
1163+
mockMutableState.EXPECT().AddExternalPayloadCount(gomock.Any()).AnyTimes()
11521164
s.mockRemoteAdminClient.EXPECT().GetWorkflowExecutionRawHistoryV2(gomock.Any(), &adminservice.GetWorkflowExecutionRawHistoryV2Request{
11531165
NamespaceId: namespaceID,
11541166
Execution: &commonpb.WorkflowExecution{WorkflowId: s.workflowID, RunId: s.runID},
@@ -1295,8 +1307,17 @@ func (s *workflowReplicatorSuite) Test_bringLocalEventsUpToSourceCurrentBranch_W
12951307
mockShard.EXPECT().GenerateTaskID().Return(taskId3, nil).Times(1)
12961308
mockShard.EXPECT().GetRemoteAdminClient(sourceClusterName).Return(s.mockRemoteAdminClient, nil).AnyTimes()
12971309
mockShard.EXPECT().GetShardID().Return(int32(0)).AnyTimes()
1310+
mockShard.EXPECT().GetConfig().Return(s.mockShard.GetConfig()).AnyTimes()
12981311
s.workflowStateReplicator.shardContext = mockShard
1299-
s.mockNamespaceCache.EXPECT().GetNamespaceName(namespace.ID(namespaceID)).Return(namespace.Name("test-namespace"), nil).AnyTimes()
1312+
nsName := namespace.Name("test-namespace")
1313+
s.mockNamespaceCache.EXPECT().GetNamespaceName(namespace.ID(namespaceID)).Return(nsName, nil).AnyTimes()
1314+
mockMutableState.EXPECT().GetNamespaceEntry().Return(namespace.NewLocalNamespaceForTest(
1315+
&persistencespb.NamespaceInfo{Id: namespaceID, Name: nsName.String()},
1316+
&persistencespb.NamespaceConfig{},
1317+
"test-cluster",
1318+
)).AnyTimes()
1319+
mockMutableState.EXPECT().AddExternalPayloadSize(gomock.Any()).AnyTimes()
1320+
mockMutableState.EXPECT().AddExternalPayloadCount(gomock.Any()).AnyTimes()
13001321
s.mockRemoteAdminClient.EXPECT().GetWorkflowExecutionRawHistoryV2(gomock.Any(), &adminservice.GetWorkflowExecutionRawHistoryV2Request{
13011322
NamespaceId: namespaceID,
13021323
Execution: &commonpb.WorkflowExecution{WorkflowId: s.workflowID, RunId: s.runID},
@@ -1468,8 +1489,17 @@ func (s *workflowReplicatorSuite) Test_bringLocalEventsUpToSourceCurrentBranch_W
14681489
mockShard.EXPECT().GenerateTaskID().Return(taskId3, nil).Times(1)
14691490
mockShard.EXPECT().GetRemoteAdminClient(sourceClusterName).Return(s.mockRemoteAdminClient, nil).AnyTimes()
14701491
mockShard.EXPECT().GetShardID().Return(int32(0)).AnyTimes()
1492+
mockShard.EXPECT().GetConfig().Return(s.mockShard.GetConfig()).AnyTimes()
14711493
s.workflowStateReplicator.shardContext = mockShard
1472-
s.mockNamespaceCache.EXPECT().GetNamespaceName(namespace.ID(namespaceID)).Return(namespace.Name("test-namespace"), nil).AnyTimes()
1494+
nsName := namespace.Name("test-namespace")
1495+
s.mockNamespaceCache.EXPECT().GetNamespaceName(namespace.ID(namespaceID)).Return(nsName, nil).AnyTimes()
1496+
mockMutableState.EXPECT().GetNamespaceEntry().Return(namespace.NewLocalNamespaceForTest(
1497+
&persistencespb.NamespaceInfo{Id: namespaceID, Name: nsName.String()},
1498+
&persistencespb.NamespaceConfig{},
1499+
"test-cluster",
1500+
)).AnyTimes()
1501+
mockMutableState.EXPECT().AddExternalPayloadSize(gomock.Any()).AnyTimes()
1502+
mockMutableState.EXPECT().AddExternalPayloadCount(gomock.Any()).AnyTimes()
14731503
s.mockRemoteAdminClient.EXPECT().GetWorkflowExecutionRawHistoryV2(gomock.Any(), &adminservice.GetWorkflowExecutionRawHistoryV2Request{
14741504
NamespaceId: namespaceID,
14751505
Execution: &commonpb.WorkflowExecution{WorkflowId: s.workflowID, RunId: s.runID},
@@ -1703,13 +1733,22 @@ func (s *workflowReplicatorSuite) Test_bringLocalEventsUpToSourceCurrentBranch_E
17031733
mockWeCtx := historyi.NewMockWorkflowContext(s.controller)
17041734
sourceClusterName := "test-cluster"
17051735

1706-
s.mockNamespaceCache.EXPECT().GetNamespaceName(namespace.ID(namespaceID)).Return(namespace.Name("test-namespace"), nil).AnyTimes()
1736+
nsName := namespace.Name("test-namespace")
1737+
s.mockNamespaceCache.EXPECT().GetNamespaceName(namespace.ID(namespaceID)).Return(nsName, nil).AnyTimes()
1738+
mockMutableState.EXPECT().GetNamespaceEntry().Return(namespace.NewLocalNamespaceForTest(
1739+
&persistencespb.NamespaceInfo{Id: namespaceID, Name: nsName.String()},
1740+
&persistencespb.NamespaceConfig{},
1741+
"test-cluster",
1742+
)).AnyTimes()
1743+
mockMutableState.EXPECT().AddExternalPayloadSize(int64(1024 + 2048)).Times(1)
1744+
mockMutableState.EXPECT().AddExternalPayloadCount(int64(2)).Times(1)
17071745

17081746
mockShard := historyi.NewMockShardContext(s.controller)
17091747
taskId := int64(100)
17101748
mockShard.EXPECT().GenerateTaskID().Return(taskId, nil).Times(1)
17111749
mockShard.EXPECT().GetRemoteAdminClient(sourceClusterName).Return(s.mockRemoteAdminClient, nil).AnyTimes()
17121750
mockShard.EXPECT().GetShardID().Return(int32(0)).AnyTimes()
1751+
mockShard.EXPECT().GetConfig().Return(s.mockShard.GetConfig()).AnyTimes()
17131752
mockEventsCache := events.NewMockCache(s.controller)
17141753
mockEventsCache.EXPECT().PutEvent(gomock.Any(), gomock.Any()).AnyTimes()
17151754
mockShard.EXPECT().GetEventsCache().Return(mockEventsCache).AnyTimes()
@@ -1729,7 +1768,4 @@ func (s *workflowReplicatorSuite) Test_bringLocalEventsUpToSourceCurrentBranch_E
17291768
[]*commonpb.DataBlob{eventBlobs},
17301769
true)
17311770
s.NoError(err)
1732-
1733-
s.Equal(int64(1024+2048), executionStats.ExternalPayloadSize)
1734-
s.Equal(int64(2), executionStats.ExternalPayloadCount)
17351771
}

service/history/workflow/mutable_state_impl.go

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -7536,12 +7536,14 @@ func (ms *MutableStateImpl) closeTransactionPrepareEvents(
75367536
ms.executionInfo.LastFirstEventTxnId = historyNodeTxnIDs[index]
75377537

75387538
// Calculate and add the external payload size and count for this batch
7539-
externalPayloadSize, externalPayloadCount, err := CalculateExternalPayloadSize(eventBatch)
7540-
if err != nil {
7541-
return nil, nil, nil, false, err
7539+
if ms.config.ExternalPayloadsEnabled(ms.GetNamespaceEntry().Name().String()) {
7540+
externalPayloadSize, externalPayloadCount, err := CalculateExternalPayloadSize(eventBatch)
7541+
if err != nil {
7542+
return nil, nil, nil, false, err
7543+
}
7544+
ms.AddExternalPayloadSize(externalPayloadSize)
7545+
ms.AddExternalPayloadCount(externalPayloadCount)
75427546
}
7543-
ms.AddExternalPayloadSize(externalPayloadSize)
7544-
ms.AddExternalPayloadCount(externalPayloadCount)
75457547
}
75467548

75477549
if err := ms.validateNoEventsAfterWorkflowFinish(

0 commit comments

Comments
 (0)