Skip to content

Commit 9e6216f

Browse files
committed
update history size calculation
1 parent b908498 commit 9e6216f

File tree

3 files changed

+134
-12
lines changed

3 files changed

+134
-12
lines changed

internal/internal_task_handlers.go

Lines changed: 131 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,8 @@ const (
5151
defaultStickyCacheSize = 10000
5252

5353
noRetryBackoff = time.Duration(-1)
54+
55+
historySizeEstimationOffset = 100 * 1024
5456
)
5557

5658
type (
@@ -256,20 +258,21 @@ func isDecisionEvent(eventType s.EventType) bool {
256258

257259
// NextDecisionEvents returns events that there processed as new by the next decision.
258260
// TODO(maxim): Refactor to return a struct instead of multiple parameters
259-
func (eh *history) NextDecisionEvents() (result []*s.HistoryEvent, markers []*s.HistoryEvent, binaryChecksum *string, err error) {
261+
func (eh *history) NextDecisionEvents() (result []*s.HistoryEvent, markers []*s.HistoryEvent, binaryChecksum *string, historySize int, err error) {
260262
if eh.next == nil {
261-
eh.next, _, err = eh.nextDecisionEvents()
263+
eh.next, _, historySize, err = eh.nextDecisionEvents()
262264
if err != nil {
263-
return result, markers, eh.binaryChecksum, err
265+
return result, markers, eh.binaryChecksum, historySize, err
264266
}
265267
}
266268

267269
result = eh.next
268270
checksum := eh.binaryChecksum
269271
if len(result) > 0 {
270-
eh.next, markers, err = eh.nextDecisionEvents()
272+
eh.next, markers, historySize, err = eh.nextDecisionEvents()
271273
}
272-
return result, markers, checksum, err
274+
275+
return result, markers, checksum, historySize, err
273276
}
274277

275278
func (eh *history) HasNextDecisionEvents() bool {
@@ -301,12 +304,12 @@ func (eh *history) verifyAllEventsProcessed() error {
301304
return nil
302305
}
303306

304-
func (eh *history) nextDecisionEvents() (nextEvents []*s.HistoryEvent, markers []*s.HistoryEvent, err error) {
307+
func (eh *history) nextDecisionEvents() (nextEvents []*s.HistoryEvent, markers []*s.HistoryEvent, historySizeEstimation int, err error) {
305308
if eh.currentIndex == len(eh.loadedEvents) && !eh.hasMoreEvents() {
306309
if err := eh.verifyAllEventsProcessed(); err != nil {
307-
return nil, nil, err
310+
return nil, nil, 0, err
308311
}
309-
return []*s.HistoryEvent{}, []*s.HistoryEvent{}, nil
312+
return []*s.HistoryEvent{}, []*s.HistoryEvent{}, 0, nil
310313
}
311314

312315
// Process events
@@ -367,7 +370,114 @@ OrderEvents:
367370
eh.loadedEvents = eh.loadedEvents[eh.currentIndex:]
368371
eh.currentIndex = 0
369372

370-
return nextEvents, markers, nil
373+
// estimate history size for nextEvents and markers
374+
historySizeEstimation = estimateHistorySize(nextEvents) + estimateHistorySize(markers)
375+
376+
return nextEvents, markers, historySizeEstimation, nil
377+
}
378+
379+
func estimateHistorySize(events []*s.HistoryEvent) (sum int) {
380+
for _, e := range events {
381+
switch e.GetEventType() {
382+
case s.EventTypeWorkflowExecutionStarted:
383+
if e.WorkflowExecutionStartedEventAttributes != nil {
384+
sum += len(e.WorkflowExecutionStartedEventAttributes.Input)
385+
sum += len(e.WorkflowExecutionStartedEventAttributes.ContinuedFailureDetails)
386+
sum += len(e.WorkflowExecutionStartedEventAttributes.LastCompletionResult)
387+
sum += len(e.WorkflowExecutionStartedEventAttributes.Memo.GetFields())
388+
}
389+
case s.EventTypeWorkflowExecutionSignaled:
390+
if e.WorkflowExecutionSignaledEventAttributes != nil {
391+
sum += len(e.WorkflowExecutionSignaledEventAttributes.Input)
392+
}
393+
case s.EventTypeWorkflowExecutionFailed:
394+
if e.WorkflowExecutionFailedEventAttributes != nil {
395+
sum += len(e.WorkflowExecutionFailedEventAttributes.Details)
396+
}
397+
case s.EventTypeDecisionTaskCompleted:
398+
if e.DecisionTaskCompletedEventAttributes != nil {
399+
sum += len(e.DecisionTaskCompletedEventAttributes.ExecutionContext)
400+
}
401+
case s.EventTypeDecisionTaskFailed:
402+
if e.DecisionTaskFailedEventAttributes != nil {
403+
sum += len(e.DecisionTaskFailedEventAttributes.Details)
404+
}
405+
case s.EventTypeActivityTaskScheduled:
406+
if e.ActivityTaskScheduledEventAttributes != nil {
407+
sum += len(e.ActivityTaskScheduledEventAttributes.Input)
408+
sum += len(e.ActivityTaskScheduledEventAttributes.Header.GetFields())
409+
}
410+
case s.EventTypeActivityTaskStarted:
411+
if e.ActivityTaskStartedEventAttributes != nil {
412+
sum += len(e.ActivityTaskStartedEventAttributes.LastFailureDetails)
413+
}
414+
case s.EventTypeActivityTaskCompleted:
415+
if e.ActivityTaskCompletedEventAttributes != nil {
416+
sum += len(e.ActivityTaskCompletedEventAttributes.Result)
417+
}
418+
case s.EventTypeActivityTaskFailed:
419+
if e.ActivityTaskFailedEventAttributes != nil {
420+
sum += len(e.ActivityTaskFailedEventAttributes.Details)
421+
}
422+
case s.EventTypeActivityTaskTimedOut:
423+
if e.ActivityTaskTimedOutEventAttributes != nil {
424+
sum += len(e.ActivityTaskTimedOutEventAttributes.Details)
425+
sum += len(e.ActivityTaskTimedOutEventAttributes.LastFailureDetails)
426+
}
427+
case s.EventTypeActivityTaskCanceled:
428+
if e.ActivityTaskCanceledEventAttributes != nil {
429+
sum += len(e.ActivityTaskCanceledEventAttributes.Details)
430+
}
431+
case s.EventTypeMarkerRecorded:
432+
if e.MarkerRecordedEventAttributes != nil {
433+
sum += len(e.MarkerRecordedEventAttributes.Details)
434+
}
435+
case s.EventTypeWorkflowExecutionTerminated:
436+
if e.WorkflowExecutionTerminatedEventAttributes != nil {
437+
sum += len(e.WorkflowExecutionTerminatedEventAttributes.Details)
438+
}
439+
case s.EventTypeWorkflowExecutionCanceled:
440+
if e.WorkflowExecutionCanceledEventAttributes != nil {
441+
sum += len(e.WorkflowExecutionCanceledEventAttributes.Details)
442+
}
443+
case s.EventTypeWorkflowExecutionContinuedAsNew:
444+
if e.WorkflowExecutionContinuedAsNewEventAttributes != nil {
445+
sum += len(e.WorkflowExecutionContinuedAsNewEventAttributes.Input)
446+
sum += len(e.WorkflowExecutionContinuedAsNewEventAttributes.FailureDetails)
447+
sum += len(e.WorkflowExecutionContinuedAsNewEventAttributes.LastCompletionResult)
448+
sum += len(e.WorkflowExecutionContinuedAsNewEventAttributes.Memo.GetFields())
449+
sum += len(e.WorkflowExecutionContinuedAsNewEventAttributes.Header.GetFields())
450+
sum += len(e.WorkflowExecutionContinuedAsNewEventAttributes.SearchAttributes.GetIndexedFields())
451+
}
452+
case s.EventTypeStartChildWorkflowExecutionInitiated:
453+
if e.StartChildWorkflowExecutionInitiatedEventAttributes != nil {
454+
sum += len(e.StartChildWorkflowExecutionInitiatedEventAttributes.Input)
455+
sum += len(e.StartChildWorkflowExecutionInitiatedEventAttributes.Control)
456+
sum += len(e.StartChildWorkflowExecutionInitiatedEventAttributes.Memo.GetFields())
457+
sum += len(e.StartChildWorkflowExecutionInitiatedEventAttributes.Header.GetFields())
458+
sum += len(e.StartChildWorkflowExecutionInitiatedEventAttributes.SearchAttributes.GetIndexedFields())
459+
}
460+
case s.EventTypeChildWorkflowExecutionCompleted:
461+
if e.ChildWorkflowExecutionCompletedEventAttributes != nil {
462+
sum += len(e.ChildWorkflowExecutionCompletedEventAttributes.Result)
463+
}
464+
case s.EventTypeChildWorkflowExecutionFailed:
465+
if e.ChildWorkflowExecutionFailedEventAttributes != nil {
466+
sum += len(e.ChildWorkflowExecutionFailedEventAttributes.Details)
467+
}
468+
case s.EventTypeChildWorkflowExecutionCanceled:
469+
if e.ChildWorkflowExecutionCanceledEventAttributes != nil {
470+
sum += len(e.ChildWorkflowExecutionCanceledEventAttributes.Details)
471+
}
472+
case s.EventTypeSignalExternalWorkflowExecutionInitiated:
473+
if e.SignalExternalWorkflowExecutionInitiatedEventAttributes != nil {
474+
sum += len(e.SignalExternalWorkflowExecutionInitiatedEventAttributes.Control)
475+
sum += len(e.SignalExternalWorkflowExecutionInitiatedEventAttributes.Input)
476+
}
477+
}
478+
}
479+
sum += historySizeEstimationOffset
480+
return
371481
}
372482

373483
func isPreloadMarkerEvent(event *s.HistoryEvent) bool {
@@ -824,7 +934,7 @@ process_Workflow_Loop:
824934
func (w *workflowExecutionContextImpl) ProcessWorkflowTask(workflowTask *workflowTask) (interface{}, error) {
825935
task := workflowTask.task
826936
historyIterator := workflowTask.historyIterator
827-
w.workflowInfo.TotalHistoryBytes = task.GetTotalHistoryBytes()
937+
w.workflowInfo.HistoryBytesServer = task.GetTotalHistoryBytes()
828938
w.workflowInfo.HistoryCount = task.GetNextEventId() - 1
829939
if err := w.ResetIfStale(task, historyIterator); err != nil {
830940
return nil, err
@@ -849,7 +959,8 @@ func (w *workflowExecutionContextImpl) ProcessWorkflowTask(workflowTask *workflo
849959
// Process events
850960
ProcessEvents:
851961
for {
852-
reorderedEvents, markers, binaryChecksum, err := reorderedHistory.NextDecisionEvents()
962+
var historySizeEstimation int
963+
reorderedEvents, markers, binaryChecksum, historySizeEstimation, err := reorderedHistory.NextDecisionEvents()
853964
if err != nil {
854965
return nil, err
855966
}
@@ -870,6 +981,7 @@ ProcessEvents:
870981
if err != nil {
871982
return nil, err
872983
}
984+
873985
if w.isWorkflowCompleted {
874986
break ProcessEvents
875987
}
@@ -894,6 +1006,14 @@ ProcessEvents:
8941006
return nil, err
8951007
}
8961008

1009+
if isLast {
1010+
w.workflowInfo.TotalHistoryBytes += int64(historySizeEstimation)
1011+
w.wth.logger.Info("DIfferences between history size estimation and actual size",
1012+
zap.Int64("HistorySizeEstimation", w.workflowInfo.TotalHistoryBytes),
1013+
zap.Int64("ActualHistorySize", w.workflowInfo.HistoryBytesServer),
1014+
zap.Int64("HistorySizeDiff", w.workflowInfo.TotalHistoryBytes-w.workflowInfo.HistoryBytesServer))
1015+
}
1016+
8971017
err = eventHandler.ProcessEvent(event, isInReplay, isLast)
8981018
if err != nil {
8991019
return nil, err

internal/internal_task_handlers_interfaces_test.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -176,11 +176,12 @@ func (s *PollLayerInterfacesTestSuite) TestGetNextDecisions() {
176176

177177
eh := newHistory(workflowTask, nil)
178178

179-
events, _, _, err := eh.NextDecisionEvents()
179+
events, _, _, historySize, err := eh.NextDecisionEvents()
180180

181181
s.NoError(err)
182182
s.Equal(3, len(events))
183183
s.Equal(m.EventTypeWorkflowExecutionSignaled, events[1].GetEventType())
184184
s.Equal(m.EventTypeDecisionTaskStarted, events[2].GetEventType())
185185
s.Equal(int64(7), events[2].GetEventId())
186+
s.Equal(0, historySize)
186187
}

internal/workflow.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1112,6 +1112,7 @@ type WorkflowInfo struct {
11121112
DecisionStartedEventID int64 // the eventID of DecisionStarted that is making the current decision(can be used for reset API)
11131113
RetryPolicy *s.RetryPolicy
11141114
TotalHistoryBytes int64
1115+
HistoryBytesServer int64
11151116
HistoryCount int64
11161117
}
11171118

0 commit comments

Comments
 (0)