Skip to content

Commit b47a630

Browse files
authored
Byte size replication cache per shard (#7243)
<!-- Describe what has changed in this PR --> **What changed?** Enforced per-shard replication cache limits by item count and total byte size. Added ByteSize() implementations for ReplicationTask and all referenced sub-types to enable accurate memory accounting. Added test utility methods to verify the ByteSize implementations and to ensure that fields later added to structs using ByteSize also have ByteSize implemented, and implemented correctly. <!-- Tell your future self why you have made these changes --> **Why?** Replication task sizes vary widely—from very small to hundreds of KiB. A count-only limit lets a few large tasks blow past the intended memory budget, leading to potential OOM in the history service. Adding a byte-size bound (with accurate ByteSize() accounting) keeps each shard within a predictable maximum memory usage. <!-- How have you verified this change? Tested locally? Added a unit test? Checked in staging env? --> **How did you test it?** Unit tests and deployment tests. <!-- Assuming the worst case, what can be broken when deploying this change to production? --> **Potential risks** Replication cache continues to be disabled by default. Enabling it without a max size value, or with a value that is too large, may cause memory issues. <!-- Is it notable for release? e.g. schema updates, configuration or data migration required? If so, please mention it, and also update CHANGELOG.md --> **Release notes** Introduce per-shard replication cache limits by count and total bytes (opt-in). Implement ByteSize() across ReplicationTask and nested types for accurate memory accounting, reducing risk of memory spikes in the history service. <!-- Are there any documentation updates that should be made for config, https://cadenceworkflow.io/docs/operation-guide/setup/ ? If so, please open a PR in https://github.com/cadence-workflow/cadence-docs --> **Documentation Changes** --------- Signed-off-by: fimanishi <[email protected]>
1 parent 72d3984 commit b47a630

File tree

14 files changed

+730
-62
lines changed

14 files changed

+730
-62
lines changed

cmd/server/go.mod

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -79,6 +79,7 @@ require (
7979
github.com/coreos/go-semver v0.3.0 // indirect
8080
github.com/coreos/go-systemd/v22 v22.3.2 // indirect
8181
github.com/fatih/color v1.13.0 // indirect
82+
github.com/google/gofuzz v1.0.0 // indirect
8283
github.com/mattn/go-colorable v0.1.9 // indirect
8384
github.com/mattn/go-isatty v0.0.14 // indirect
8485
github.com/ncruces/go-sqlite3 v0.22.0 // indirect

common/archiver/gcloud/go.mod

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,7 @@ require (
7373
github.com/golang-jwt/jwt/v5 v5.2.0 // indirect
7474
github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect
7575
github.com/golang/protobuf v1.5.3 // indirect
76+
github.com/google/gofuzz v1.0.0 // indirect
7677
github.com/google/s2a-go v0.1.4 // indirect
7778
github.com/googleapis/enterprise-certificate-proxy v0.2.4 // indirect
7879
github.com/googleapis/gax-go/v2 v2.12.0 // indirect

common/dynamicconfig/dynamicproperties/constants.go

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1091,9 +1091,15 @@ const (
10911091
// ReplicatorCacheCapacity is the capacity of replication cache in number of tasks
10921092
// KeyName: history.replicatorCacheCapacity
10931093
// Value type: Int
1094-
// Default value: 10000
1094+
// Default value: 0
10951095
// Allowed filters: N/A
10961096
ReplicatorCacheCapacity
1097+
// ReplicatorCacheMaxSize is the max size of the replication cache in bytes
1098+
// KeyName: history.replicatorCacheSize
1099+
// Value type: Int
1100+
// Default value: 0
1101+
// Allowed filters: N/A
1102+
ReplicatorCacheMaxSize
10971103

10981104
// ExecutionMgrNumConns is persistence connections number for ExecutionManager
10991105
// KeyName: history.executionMgrNumConns
@@ -3693,6 +3699,11 @@ var IntKeys = map[IntKey]DynamicInt{
36933699
Description: "ReplicatorCacheCapacity is the capacity of replication cache in number of tasks",
36943700
DefaultValue: 0,
36953701
},
3702+
ReplicatorCacheMaxSize: {
3703+
KeyName: "history.replicatorCacheSize",
3704+
Description: "ReplicatorCacheMaxSize is the max size of the replication cache in bytes",
3705+
DefaultValue: 0,
3706+
},
36963707
ExecutionMgrNumConns: {
36973708
KeyName: "history.executionMgrNumConns",
36983709
Description: "Deprecated: not used. ExecutionMgrNumConns is persistence connections number for ExecutionManager",

common/types/admin.go

Lines changed: 37 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,10 @@
2020

2121
package types
2222

23-
import "sort"
23+
import (
24+
"sort"
25+
"unsafe"
26+
)
2427

2528
// AddSearchAttributeRequest is an internal type (TBD...)
2629
type AddSearchAttributeRequest struct {
@@ -317,6 +320,14 @@ type IsolationGroupPartition struct {
317320
State IsolationGroupState
318321
}
319322

323+
// ByteSize returns an approximate size of the object in bytes
324+
func (i IsolationGroupPartition) ByteSize() uint64 {
325+
var size uint64
326+
size += uint64(unsafe.Sizeof(i))
327+
size += uint64(len(i.Name))
328+
return size
329+
}
330+
320331
// IsolationGroupConfiguration is an internal representation of a set of
321332
// isolation-groups as a mapping and may refer to either globally or per-domain (or both) configurations.
322333
// and their statuses. It's redundantly indexed by IsolationGroup name to simplify lookups.
@@ -356,6 +367,19 @@ func (i IsolationGroupConfiguration) DeepCopy() IsolationGroupConfiguration {
356367
return out
357368
}
358369

370+
// ByteSize returns an approximate size of the object in bytes
371+
func (i *IsolationGroupConfiguration) ByteSize() uint64 {
372+
if i == nil {
373+
return 0
374+
}
375+
376+
size := uint64(unsafe.Sizeof(*i))
377+
for k, v := range *i {
378+
size += uint64(len(k)) + uint64(len(v.Name))
379+
}
380+
return size
381+
}
382+
359383
// FromIsolationGroupPartitionList maps a list of isolation to the internal IsolationGroup configuration type
360384
// whose map keys tend to be used more for set operations
361385
func FromIsolationGroupPartitionList(in []IsolationGroupPartition) IsolationGroupConfiguration {
@@ -422,6 +446,18 @@ func (c AsyncWorkflowConfiguration) DeepCopy() AsyncWorkflowConfiguration {
422446
return res
423447
}
424448

449+
// ByteSize returns an approximate size of the object in bytes
450+
func (c *AsyncWorkflowConfiguration) ByteSize() uint64 {
451+
if c == nil {
452+
return 0
453+
}
454+
size := uint64(unsafe.Sizeof(*c))
455+
size += uint64(len(c.PredefinedQueueName))
456+
size += uint64(len(c.QueueType))
457+
size += c.QueueConfig.ByteSize()
458+
return size
459+
}
460+
425461
type UpdateDomainAsyncWorkflowConfiguratonRequest struct {
426462
Domain string
427463
Configuration *AsyncWorkflowConfiguration

common/types/replicator.go

Lines changed: 140 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ import (
2424
"fmt"
2525
"strconv"
2626
"strings"
27+
"unsafe"
2728
)
2829

2930
// DLQType is an internal type (TBD...)
@@ -99,6 +100,14 @@ func (e DomainOperation) String() string {
99100
return fmt.Sprintf("DomainOperation(%d)", w)
100101
}
101102

103+
// ByteSize returns the approximate memory used in bytes
104+
func (e *DomainOperation) ByteSize() uint64 {
105+
if e == nil {
106+
return 0
107+
}
108+
return uint64(unsafe.Sizeof(*e))
109+
}
110+
102111
// UnmarshalText parses enum value from string representation
103112
func (e *DomainOperation) UnmarshalText(value []byte) error {
104113
switch s := strings.ToUpper(string(value)); s {
@@ -195,6 +204,22 @@ func (v *DomainTaskAttributes) GetPreviousFailoverVersion() (o int64) {
195204
return
196205
}
197206

207+
// ByteSize returns the approximate memory used in bytes
208+
func (v *DomainTaskAttributes) ByteSize() uint64 {
209+
if v == nil {
210+
return 0
211+
}
212+
213+
size := uint64(unsafe.Sizeof(*v))
214+
size += v.DomainOperation.ByteSize()
215+
size += uint64(len(v.ID))
216+
size += v.Info.ByteSize()
217+
size += v.Config.ByteSize()
218+
size += v.ReplicationConfig.ByteSize()
219+
220+
return size
221+
}
222+
198223
// FailoverMarkerAttributes is an internal type (TBD...)
199224
type FailoverMarkerAttributes struct {
200225
DomainID string `json:"domainID,omitempty"`
@@ -226,6 +251,21 @@ func (v *FailoverMarkerAttributes) GetCreationTime() (o int64) {
226251
return
227252
}
228253

254+
// ByteSize returns the approximate memory used in bytes
255+
func (v *FailoverMarkerAttributes) ByteSize() uint64 {
256+
if v == nil {
257+
return 0
258+
}
259+
260+
size := uint64(unsafe.Sizeof(*v))
261+
size += uint64(len(v.DomainID))
262+
if v.CreationTime != nil {
263+
size += uint64(unsafe.Sizeof(*v.CreationTime))
264+
}
265+
266+
return size
267+
}
268+
229269
// FailoverMarkers is an internal type (TBD...)
230270
type FailoverMarkers struct {
231271
FailoverMarkers []*FailoverMarkerAttributes `json:"failoverMarkers,omitempty"`
@@ -399,6 +439,28 @@ func (v *HistoryTaskV2Attributes) GetNewRunEvents() (o *DataBlob) {
399439
return
400440
}
401441

442+
// ByteSize returns the approximate memory used in bytes
443+
func (v *HistoryTaskV2Attributes) ByteSize() uint64 {
444+
if v == nil {
445+
return 0
446+
}
447+
448+
size := uint64(unsafe.Sizeof(*v))
449+
size += uint64(len(v.DomainID))
450+
size += uint64(len(v.WorkflowID))
451+
size += uint64(len(v.RunID))
452+
453+
size += uint64(len(v.VersionHistoryItems)) * uint64(unsafe.Sizeof((*VersionHistoryItem)(nil)))
454+
for _, item := range v.VersionHistoryItems {
455+
size += item.ByteSize()
456+
}
457+
458+
size += v.Events.ByteSize()
459+
size += v.NewRunEvents.ByteSize()
460+
461+
return size
462+
}
463+
402464
type CountDLQMessagesRequest struct {
403465
// ForceFetch will force fetching current values from DB
404466
// instead of using cached values used for emitting metrics.
@@ -656,9 +718,12 @@ func (v *ReplicationMessages) GetEarliestCreationTime() *int64 {
656718
return &result
657719
}
658720

659-
// ReplicationMessagesSizeFn is a function type to calculate size of ReplicationMessages
721+
// ReplicationMessagesSizeFn is a function type to calculate the size of ReplicationMessages
660722
type ReplicationMessagesSizeFn func(v *ReplicationMessages) int
661723

724+
// ReplicationTaskSizeFn is a function type to calculate the size of a single ReplicationTask
725+
type ReplicationTaskSizeFn func(v *ReplicationTask) int
726+
662727
// ReplicationTask is an internal type (TBD...)
663728
type ReplicationTask struct {
664729
TaskType *ReplicationTaskType `json:"taskType,omitempty"`
@@ -727,6 +792,26 @@ func (v *ReplicationTask) GetCreationTime() (o int64) {
727792
return
728793
}
729794

795+
// ByteSize returns the approximate memory used in bytes
796+
func (v *ReplicationTask) ByteSize() uint64 {
797+
if v == nil {
798+
return 0
799+
}
800+
801+
size := uint64(unsafe.Sizeof(*v))
802+
size += v.TaskType.ByteSize()
803+
size += v.DomainTaskAttributes.ByteSize()
804+
size += v.SyncShardStatusTaskAttributes.ByteSize()
805+
size += v.SyncActivityTaskAttributes.ByteSize()
806+
size += v.HistoryTaskV2Attributes.ByteSize()
807+
size += v.FailoverMarkerAttributes.ByteSize()
808+
if v.CreationTime != nil {
809+
size += uint64(unsafe.Sizeof(*v.CreationTime))
810+
}
811+
812+
return size
813+
}
814+
730815
// ReplicationTaskInfo is an internal type (TBD...)
731816
type ReplicationTaskInfo struct {
732817
DomainID string `json:"domainID,omitempty"`
@@ -898,6 +983,14 @@ const (
898983
ReplicationTaskTypeFailoverMarker
899984
)
900985

986+
// ByteSize returns the approximate memory used in bytes
987+
func (e *ReplicationTaskType) ByteSize() uint64 {
988+
if e == nil {
989+
return 0
990+
}
991+
return uint64(unsafe.Sizeof(*e))
992+
}
993+
901994
// ReplicationToken is an internal type (TBD...)
902995
type ReplicationToken struct {
903996
ShardID int32 `json:"shardID,omitempty"`
@@ -1028,6 +1121,37 @@ func (v *SyncActivityTaskAttributes) GetVersionHistory() (o *VersionHistory) {
10281121
return
10291122
}
10301123

1124+
// ByteSize returns the approximate memory used in bytes
1125+
func (v *SyncActivityTaskAttributes) ByteSize() uint64 {
1126+
if v == nil {
1127+
return 0
1128+
}
1129+
1130+
size := uint64(unsafe.Sizeof(*v))
1131+
size += uint64(len(v.DomainID))
1132+
size += uint64(len(v.WorkflowID))
1133+
size += uint64(len(v.RunID))
1134+
if v.ScheduledTime != nil {
1135+
size += uint64(unsafe.Sizeof(*v.ScheduledTime))
1136+
}
1137+
if v.StartedTime != nil {
1138+
size += uint64(unsafe.Sizeof(*v.StartedTime))
1139+
}
1140+
if v.LastHeartbeatTime != nil {
1141+
size += uint64(unsafe.Sizeof(*v.LastHeartbeatTime))
1142+
}
1143+
size += uint64(len(v.Details))
1144+
if v.LastFailureReason != nil {
1145+
size += uint64(unsafe.Sizeof(*v.LastFailureReason))
1146+
size += uint64(len(*v.LastFailureReason))
1147+
}
1148+
size += uint64(len(v.LastWorkerIdentity))
1149+
size += uint64(len(v.LastFailureDetails))
1150+
size += v.VersionHistory.ByteSize()
1151+
1152+
return size
1153+
}
1154+
10311155
// SyncShardStatus is an internal type (TBD...)
10321156
type SyncShardStatus struct {
10331157
Timestamp *int64 `json:"timestamp,omitempty"`
@@ -1047,3 +1171,18 @@ type SyncShardStatusTaskAttributes struct {
10471171
ShardID int64 `json:"shardId,omitempty"`
10481172
Timestamp *int64 `json:"timestamp,omitempty"`
10491173
}
1174+
1175+
// ByteSize returns the approximate memory used in bytes
1176+
func (v *SyncShardStatusTaskAttributes) ByteSize() uint64 {
1177+
if v == nil {
1178+
return 0
1179+
}
1180+
1181+
size := uint64(unsafe.Sizeof(*v))
1182+
size += uint64(len(v.SourceCluster))
1183+
if v.Timestamp != nil {
1184+
size += uint64(unsafe.Sizeof(*v.Timestamp))
1185+
}
1186+
1187+
return size
1188+
}

common/types/replicator_test.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1227,3 +1227,8 @@ func TestSyncShardStatus_GetTimestamp(t *testing.T) {
12271227
res = nilStruct.GetTimestamp()
12281228
assert.Equal(t, int64(0), res)
12291229
}
1230+
1231+
func TestReplicationTask_ByteSize(t *testing.T) {
1232+
AssertReachablesImplementByteSize(t, (*ReplicationTask)(nil))
1233+
AssertByteSizeMatchesReflect(t, &ReplicationTask{})
1234+
}

0 commit comments

Comments
 (0)