Skip to content

Commit c3f8aea

Browse files
committed
remove estimate history size in task handlers, now we estimate history size after processing every history event
1 parent 8dc580d commit c3f8aea

File tree

4 files changed

+90
-162
lines changed

4 files changed

+90
-162
lines changed

internal/internal_event_handlers.go

Lines changed: 8 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@ import (
4444

4545
const (
4646
queryResultSizeLimit = 2000000 // 2MB
47-
historySizeEstimationBuffer = 3 * 128 // 128B
47+
historySizeEstimationBuffer = 512 // 512B
4848
)
4949

5050
// Make sure that interfaces are implemented
@@ -941,12 +941,9 @@ func (weh *workflowExecutionEventHandlerImpl) ProcessEvent(
941941
return err
942942
}
943943

944-
if !isReplay {
945-
historySizeErr := weh.estimateHistorySize(event)
946-
if historySizeErr != nil {
947-
weh.logger.Error("Failed to estimate history size", zap.Error(historySizeErr))
948-
}
949-
}
944+
historySum := weh.estimateHistorySize(event)
945+
weh.workflowInfo.TotalHistoryBytes += int64(historySum)
946+
weh.logger.Info("EstimateHistorySize", zap.Int("historyBytes", historySum), zap.Int64("totalHistoryBytes", weh.workflowInfo.TotalHistoryBytes))
950947

951948
// When replaying histories to get stack trace or current state the last event might be not
952949
// decision started. So always call OnDecisionTaskStarted on the last event.
@@ -1388,7 +1385,7 @@ func (weh *workflowExecutionEventHandlerImpl) handleSignalExternalWorkflowExecut
13881385
return nil
13891386
}
13901387

1391-
func (weh *workflowExecutionEventHandlerImpl) estimateHistorySize(event *m.HistoryEvent) error {
1388+
func (weh *workflowExecutionEventHandlerImpl) estimateHistorySize(event *m.HistoryEvent) int {
13921389
sum := historySizeEstimationBuffer
13931390
switch event.GetEventType() {
13941391
case m.EventTypeWorkflowExecutionStarted:
@@ -1487,9 +1484,8 @@ func (weh *workflowExecutionEventHandlerImpl) estimateHistorySize(event *m.Histo
14871484
sum += len(event.SignalExternalWorkflowExecutionInitiatedEventAttributes.Input)
14881485
}
14891486
default:
1490-
return fmt.Errorf("unknown event type")
1487+
weh.logger.Warn("unknown event type", zap.String("Event Type", event.GetEventType().String()))
14911488
}
1492-
weh.workflowInfo.TotalHistoryBytes += int64(sum)
1493-
weh.logger.Info("EstimateHistorySize", zap.Int("historyBytes", sum), zap.Int64("totalHistoryBytes", weh.workflowInfo.TotalHistoryBytes))
1494-
return nil
1489+
1490+
return sum
14951491
}

internal/internal_event_handlers_test.go

Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ package internal
2222

2323
import (
2424
"encoding/json"
25+
"go.uber.org/cadence/internal/common"
2526
"testing"
2627

2728
"github.com/stretchr/testify/assert"
@@ -312,3 +313,72 @@ func Test_CreateSearchAttributesForChangeVersion(t *testing.T) {
312313
require.True(t, ok, "Remember to update related key on server side")
313314
require.Equal(t, []string{"cid-1"}, val)
314315
}
316+
317+
func TestHistoryEstimationforSmallEvents(t *testing.T) {
318+
taskList := "tasklist"
319+
testEvents := []*s.HistoryEvent{
320+
createTestEventWorkflowExecutionStarted(1, &s.WorkflowExecutionStartedEventAttributes{TaskList: &s.TaskList{Name: &taskList}}),
321+
createTestEventDecisionTaskScheduled(2, &s.DecisionTaskScheduledEventAttributes{TaskList: &s.TaskList{Name: &taskList}}),
322+
createTestEventDecisionTaskStarted(3),
323+
{
324+
EventId: common.Int64Ptr(4),
325+
EventType: common.EventTypePtr(s.EventTypeDecisionTaskFailed),
326+
},
327+
{
328+
EventId: common.Int64Ptr(5),
329+
EventType: common.EventTypePtr(s.EventTypeWorkflowExecutionSignaled),
330+
},
331+
createTestEventDecisionTaskScheduled(6, &s.DecisionTaskScheduledEventAttributes{TaskList: &s.TaskList{Name: &taskList}}),
332+
createTestEventDecisionTaskStarted(7),
333+
}
334+
core, _ := observer.New(zapcore.InfoLevel)
335+
logger := zap.New(core, zap.Development())
336+
w := workflowExecutionEventHandlerImpl{
337+
workflowEnvironmentImpl: &workflowEnvironmentImpl{logger: logger},
338+
}
339+
340+
w.logger = logger
341+
historySizeSum := 0
342+
for _, event := range testEvents {
343+
sum := w.estimateHistorySize(event)
344+
historySizeSum += sum
345+
}
346+
trueSize := len(testEvents) * historySizeEstimationBuffer
347+
348+
assert.Equal(t, trueSize, historySizeSum)
349+
}
350+
351+
func TestHistoryEstimationforPackedEvents(t *testing.T) {
352+
// create an array of bytes for testing
353+
var byteArray []byte
354+
byteArray = append(byteArray, 100)
355+
taskList := "tasklist"
356+
testEvents := []*s.HistoryEvent{
357+
createTestEventWorkflowExecutionStarted(1, &s.WorkflowExecutionStartedEventAttributes{
358+
TaskList: &s.TaskList{Name: &taskList},
359+
Input: byteArray,
360+
ContinuedFailureDetails: byteArray}),
361+
createTestEventWorkflowExecutionStarted(2, &s.WorkflowExecutionStartedEventAttributes{
362+
TaskList: &s.TaskList{Name: &taskList},
363+
Input: byteArray,
364+
ContinuedFailureDetails: byteArray}),
365+
createTestEventWorkflowExecutionStarted(3, &s.WorkflowExecutionStartedEventAttributes{
366+
TaskList: &s.TaskList{Name: &taskList},
367+
Input: byteArray,
368+
ContinuedFailureDetails: byteArray}),
369+
}
370+
core, _ := observer.New(zapcore.InfoLevel)
371+
logger := zap.New(core, zap.Development())
372+
w := workflowExecutionEventHandlerImpl{
373+
workflowEnvironmentImpl: &workflowEnvironmentImpl{logger: logger},
374+
}
375+
376+
w.logger = logger
377+
historySizeSum := 0
378+
for _, event := range testEvents {
379+
sum := w.estimateHistorySize(event)
380+
historySizeSum += sum
381+
}
382+
trueSize := len(testEvents)*historySizeEstimationBuffer + len(byteArray)*2*len(testEvents)
383+
assert.Equal(t, trueSize, historySizeSum)
384+
}

internal/internal_task_handlers.go

Lines changed: 11 additions & 148 deletions
Original file line numberDiff line numberDiff line change
@@ -258,21 +258,21 @@ func isDecisionEvent(eventType s.EventType) bool {
258258

259259
// NextDecisionEvents returns events that there processed as new by the next decision.
260260
// TODO(maxim): Refactor to return a struct instead of multiple parameters
261-
func (eh *history) NextDecisionEvents() (result []*s.HistoryEvent, markers []*s.HistoryEvent, binaryChecksum *string, historySize int, err error) {
261+
func (eh *history) NextDecisionEvents() (result []*s.HistoryEvent, markers []*s.HistoryEvent, binaryChecksum *string, err error) {
262262
if eh.next == nil {
263-
eh.next, _, historySize, err = eh.nextDecisionEvents()
263+
eh.next, _, err = eh.nextDecisionEvents()
264264
if err != nil {
265-
return result, markers, eh.binaryChecksum, historySize, err
265+
return result, markers, eh.binaryChecksum, err
266266
}
267267
}
268268

269269
result = eh.next
270270
checksum := eh.binaryChecksum
271271
if len(result) > 0 {
272-
eh.next, markers, historySize, err = eh.nextDecisionEvents()
272+
eh.next, markers, err = eh.nextDecisionEvents()
273273
}
274274

275-
return result, markers, checksum, historySize, err
275+
return result, markers, checksum, err
276276
}
277277

278278
func (eh *history) HasNextDecisionEvents() bool {
@@ -304,13 +304,12 @@ func (eh *history) verifyAllEventsProcessed() error {
304304
return nil
305305
}
306306

307-
func (eh *history) nextDecisionEvents() (nextEvents []*s.HistoryEvent, markers []*s.HistoryEvent, historySizeEstimation int, err error) {
307+
func (eh *history) nextDecisionEvents() (nextEvents []*s.HistoryEvent, markers []*s.HistoryEvent, err error) {
308308
if eh.currentIndex == len(eh.loadedEvents) && !eh.hasMoreEvents() {
309-
eh.eventsHandler.logger.Info("Returning 0 for historySizeEstimation since none will be processed")
310309
if err := eh.verifyAllEventsProcessed(); err != nil {
311-
return nil, nil, 0, err
310+
return nil, nil, err
312311
}
313-
return []*s.HistoryEvent{}, []*s.HistoryEvent{}, 0, nil
312+
return []*s.HistoryEvent{}, []*s.HistoryEvent{}, nil
314313
}
315314

316315
// Process events
@@ -371,141 +370,7 @@ OrderEvents:
371370
eh.loadedEvents = eh.loadedEvents[eh.currentIndex:]
372371
eh.currentIndex = 0
373372

374-
// estimate history size for nextEvents and markers
375-
//historySizeEstimation += eh.estimateHistorySize(nextEvents)
376-
//historySizeEstimation += eh.estimateHistorySize(markers)
377-
//
378-
//eh.eventsHandler.logger.Info(fmt.Sprintf("Returning historySizeEstimation not zero of %d", historySizeEstimation))
379-
380-
return nextEvents, markers, historySizeEstimation, nil
381-
}
382-
383-
func (eh *history) estimateHistorySize(events []*s.HistoryEvent) int {
384-
sum := 0
385-
for _, e := range events {
386-
eventSum := historySizeEstimationOffset
387-
switch e.GetEventType() {
388-
case s.EventTypeWorkflowExecutionStarted:
389-
if e.WorkflowExecutionStartedEventAttributes != nil {
390-
eventSum += len(e.WorkflowExecutionStartedEventAttributes.Input)
391-
eventSum += len(e.WorkflowExecutionStartedEventAttributes.ContinuedFailureDetails)
392-
eventSum += len(e.WorkflowExecutionStartedEventAttributes.LastCompletionResult)
393-
eventSum += len(e.WorkflowExecutionStartedEventAttributes.Memo.GetFields())
394-
}
395-
eh.eventsHandler.logger.Info(fmt.Sprintf("WorkflowExecutionStartedEventAttributes size is %d", eventSum))
396-
case s.EventTypeWorkflowExecutionSignaled:
397-
if e.WorkflowExecutionSignaledEventAttributes != nil {
398-
eventSum += len(e.WorkflowExecutionSignaledEventAttributes.Input)
399-
}
400-
eh.eventsHandler.logger.Info(fmt.Sprintf("WorkflowExecutionSignaledEventAttributes size is %d", eventSum))
401-
case s.EventTypeWorkflowExecutionFailed:
402-
if e.WorkflowExecutionFailedEventAttributes != nil {
403-
eventSum += len(e.WorkflowExecutionFailedEventAttributes.Details)
404-
}
405-
eh.eventsHandler.logger.Info(fmt.Sprintf("WorkflowExecutionFailedEventAttributes size is %d", eventSum))
406-
case s.EventTypeDecisionTaskCompleted:
407-
if e.DecisionTaskCompletedEventAttributes != nil {
408-
eventSum += len(e.DecisionTaskCompletedEventAttributes.ExecutionContext)
409-
}
410-
eh.eventsHandler.logger.Info(fmt.Sprintf("DecisionTaskCompletedEventAttributes size is %d", eventSum))
411-
case s.EventTypeDecisionTaskFailed:
412-
if e.DecisionTaskFailedEventAttributes != nil {
413-
eventSum += len(e.DecisionTaskFailedEventAttributes.Details)
414-
}
415-
eh.eventsHandler.logger.Info(fmt.Sprintf("DecisionTaskFailedEventAttributes size is %d", eventSum))
416-
case s.EventTypeActivityTaskScheduled:
417-
if e.ActivityTaskScheduledEventAttributes != nil {
418-
eventSum += len(e.ActivityTaskScheduledEventAttributes.Input)
419-
eventSum += len(e.ActivityTaskScheduledEventAttributes.Header.GetFields())
420-
}
421-
eh.eventsHandler.logger.Info(fmt.Sprintf("ActivityTaskScheduledEventAttributes size is %d", eventSum))
422-
case s.EventTypeActivityTaskStarted:
423-
if e.ActivityTaskStartedEventAttributes != nil {
424-
eventSum += len(e.ActivityTaskStartedEventAttributes.LastFailureDetails)
425-
}
426-
eh.eventsHandler.logger.Info(fmt.Sprintf("ActivityTaskStartedEventAttributes size is %d", eventSum))
427-
case s.EventTypeActivityTaskCompleted:
428-
if e.ActivityTaskCompletedEventAttributes != nil {
429-
eventSum += len(e.ActivityTaskCompletedEventAttributes.Result)
430-
}
431-
eh.eventsHandler.logger.Info(fmt.Sprintf("ActivityTaskCompletedEventAttributes size is %d", eventSum))
432-
case s.EventTypeActivityTaskFailed:
433-
if e.ActivityTaskFailedEventAttributes != nil {
434-
eventSum += len(e.ActivityTaskFailedEventAttributes.Details)
435-
}
436-
eh.eventsHandler.logger.Info(fmt.Sprintf("ActivityTaskFailedEventAttributes size is %d", eventSum))
437-
case s.EventTypeActivityTaskTimedOut:
438-
if e.ActivityTaskTimedOutEventAttributes != nil {
439-
eventSum += len(e.ActivityTaskTimedOutEventAttributes.Details)
440-
eventSum += len(e.ActivityTaskTimedOutEventAttributes.LastFailureDetails)
441-
}
442-
eh.eventsHandler.logger.Info(fmt.Sprintf("ActivityTaskTimedOutEventAttributes size is %d", eventSum))
443-
case s.EventTypeActivityTaskCanceled:
444-
if e.ActivityTaskCanceledEventAttributes != nil {
445-
eventSum += len(e.ActivityTaskCanceledEventAttributes.Details)
446-
}
447-
eh.eventsHandler.logger.Info(fmt.Sprintf("ActivityTaskCanceledEventAttributes size is %d", eventSum))
448-
case s.EventTypeMarkerRecorded:
449-
if e.MarkerRecordedEventAttributes != nil {
450-
eventSum += len(e.MarkerRecordedEventAttributes.Details)
451-
}
452-
eh.eventsHandler.logger.Info(fmt.Sprintf("MarkerRecordedEventAttributes size is %d", eventSum))
453-
case s.EventTypeWorkflowExecutionTerminated:
454-
if e.WorkflowExecutionTerminatedEventAttributes != nil {
455-
eventSum += len(e.WorkflowExecutionTerminatedEventAttributes.Details)
456-
}
457-
eh.eventsHandler.logger.Info(fmt.Sprintf("WorkflowExecutionTerminatedEventAttributes size is %d", eventSum))
458-
case s.EventTypeWorkflowExecutionCanceled:
459-
if e.WorkflowExecutionCanceledEventAttributes != nil {
460-
eventSum += len(e.WorkflowExecutionCanceledEventAttributes.Details)
461-
}
462-
eh.eventsHandler.logger.Info(fmt.Sprintf("WorkflowExecutionCanceledEventAttributes size is %d", eventSum))
463-
case s.EventTypeWorkflowExecutionContinuedAsNew:
464-
if e.WorkflowExecutionContinuedAsNewEventAttributes != nil {
465-
eventSum += len(e.WorkflowExecutionContinuedAsNewEventAttributes.Input)
466-
eventSum += len(e.WorkflowExecutionContinuedAsNewEventAttributes.FailureDetails)
467-
eventSum += len(e.WorkflowExecutionContinuedAsNewEventAttributes.LastCompletionResult)
468-
eventSum += len(e.WorkflowExecutionContinuedAsNewEventAttributes.Memo.GetFields())
469-
eventSum += len(e.WorkflowExecutionContinuedAsNewEventAttributes.Header.GetFields())
470-
eventSum += len(e.WorkflowExecutionContinuedAsNewEventAttributes.SearchAttributes.GetIndexedFields())
471-
}
472-
eh.eventsHandler.logger.Info(fmt.Sprintf("WorkflowExecutionContinuedAsNewEventAttributes size is %d", eventSum))
473-
case s.EventTypeStartChildWorkflowExecutionInitiated:
474-
if e.StartChildWorkflowExecutionInitiatedEventAttributes != nil {
475-
eventSum += len(e.StartChildWorkflowExecutionInitiatedEventAttributes.Input)
476-
eventSum += len(e.StartChildWorkflowExecutionInitiatedEventAttributes.Control)
477-
eventSum += len(e.StartChildWorkflowExecutionInitiatedEventAttributes.Memo.GetFields())
478-
eventSum += len(e.StartChildWorkflowExecutionInitiatedEventAttributes.Header.GetFields())
479-
eventSum += len(e.StartChildWorkflowExecutionInitiatedEventAttributes.SearchAttributes.GetIndexedFields())
480-
}
481-
eh.eventsHandler.logger.Info(fmt.Sprintf("StartChildWorkflowExecutionInitiatedEventAttributes size is %d", eventSum))
482-
case s.EventTypeChildWorkflowExecutionCompleted:
483-
if e.ChildWorkflowExecutionCompletedEventAttributes != nil {
484-
eventSum += len(e.ChildWorkflowExecutionCompletedEventAttributes.Result)
485-
}
486-
eh.eventsHandler.logger.Info(fmt.Sprintf("ChildWorkflowExecutionCompletedEventAttributes size is %d", eventSum))
487-
case s.EventTypeChildWorkflowExecutionFailed:
488-
if e.ChildWorkflowExecutionFailedEventAttributes != nil {
489-
eventSum += len(e.ChildWorkflowExecutionFailedEventAttributes.Details)
490-
}
491-
eh.eventsHandler.logger.Info(fmt.Sprintf("ChildWorkflowExecutionFailedEventAttributes size is %d", eventSum))
492-
case s.EventTypeChildWorkflowExecutionCanceled:
493-
if e.ChildWorkflowExecutionCanceledEventAttributes != nil {
494-
eventSum += len(e.ChildWorkflowExecutionCanceledEventAttributes.Details)
495-
}
496-
eh.eventsHandler.logger.Info(fmt.Sprintf("ChildWorkflowExecutionCanceledEventAttributes size is %d", eventSum))
497-
case s.EventTypeSignalExternalWorkflowExecutionInitiated:
498-
if e.SignalExternalWorkflowExecutionInitiatedEventAttributes != nil {
499-
eventSum += len(e.SignalExternalWorkflowExecutionInitiatedEventAttributes.Control)
500-
eventSum += len(e.SignalExternalWorkflowExecutionInitiatedEventAttributes.Input)
501-
}
502-
eh.eventsHandler.logger.Info(fmt.Sprintf("SignalExternalWorkflowExecutionInitiatedEventAttributes size is %d", eventSum))
503-
default:
504-
// ignore other events
505-
}
506-
sum += eventSum
507-
}
508-
return sum
373+
return nextEvents, markers, nil
509374
}
510375

511376
func isPreloadMarkerEvent(event *s.HistoryEvent) bool {
@@ -987,17 +852,15 @@ func (w *workflowExecutionContextImpl) ProcessWorkflowTask(workflowTask *workflo
987852
// Process events
988853
ProcessEvents:
989854
for {
990-
reorderedEvents, markers, binaryChecksum, historySizeEstimation, err := reorderedHistory.NextDecisionEvents()
855+
reorderedEvents, markers, binaryChecksum, err := reorderedHistory.NextDecisionEvents()
991856
//w.workflowInfo.TotalHistoryBytes += int64(historySizeEstimation)
992857
w.wth.logger.Info("Differences between history size estimation and actual size",
993-
zap.Int("HistoryEstimation", historySizeEstimation),
994858
zap.Int64("HistorySizeEstimation", w.workflowInfo.TotalHistoryBytes),
995859
zap.Int64("ActualHistorySize", w.workflowInfo.HistoryBytesServer),
996860
zap.Int64("HistorySizeDiff", w.workflowInfo.TotalHistoryBytes-w.workflowInfo.HistoryBytesServer),
997861
zap.Float64("DiffRatio", float64(w.workflowInfo.TotalHistoryBytes)/float64(w.workflowInfo.HistoryBytesServer)),
998862
zap.String("workflowType", w.workflowInfo.WorkflowType.Name))
999-
w.wth.metricsScope.GetTaggedScope("workflowtype", w.workflowInfo.WorkflowType.Name).Gauge("cadence-server-historysize").Update(float64(w.workflowInfo.HistoryBytesServer))
1000-
w.wth.metricsScope.GetTaggedScope("workflowtype", w.workflowInfo.WorkflowType.Name).Gauge("cadence-client-historysize").Update(float64(historySizeEstimation))
863+
w.wth.metricsScope.GetTaggedScope("workflowtype", w.workflowInfo.WorkflowType.Name).Gauge("cadence-historysize-ratio").Update(float64(w.workflowInfo.TotalHistoryBytes) / float64(w.workflowInfo.HistoryBytesServer))
1001864
if err != nil {
1002865
return nil, err
1003866
}

internal/internal_task_handlers_interfaces_test.go

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -176,12 +176,11 @@ func (s *PollLayerInterfacesTestSuite) TestGetNextDecisions() {
176176

177177
eh := newHistory(workflowTask, nil)
178178

179-
events, _, _, historySize, err := eh.NextDecisionEvents()
179+
events, _, _, err := eh.NextDecisionEvents()
180180

181181
s.NoError(err)
182182
s.Equal(3, len(events))
183183
s.Equal(m.EventTypeWorkflowExecutionSignaled, events[1].GetEventType())
184184
s.Equal(m.EventTypeDecisionTaskStarted, events[2].GetEventType())
185185
s.Equal(int64(7), events[2].GetEventId())
186-
s.Equal(0, historySize)
187186
}

0 commit comments

Comments
 (0)