Skip to content

Commit a16cf09

Browse files
committed
Move estimateHistorySize function to internal_utils.go
1 parent bdd300d commit a16cf09

File tree

3 files changed

+142
-139
lines changed

3 files changed

+142
-139
lines changed

internal/internal_event_handlers.go

Lines changed: 1 addition & 137 deletions
Original file line numberDiff line numberDiff line change
@@ -942,7 +942,7 @@ func (weh *workflowExecutionEventHandlerImpl) ProcessEvent(
942942
return err
943943
}
944944

945-
historySum := weh.estimateHistorySize(event)
945+
historySum := estimateHistorySize(weh.logger, event)
946946
atomic.AddInt64(&weh.workflowInfo.TotalHistoryBytes, int64(historySum))
947947

948948
// When replaying histories to get stack trace or current state the last event might be not
@@ -1384,139 +1384,3 @@ func (weh *workflowExecutionEventHandlerImpl) handleSignalExternalWorkflowExecut
13841384

13851385
return nil
13861386
}
1387-
1388-
func (weh *workflowExecutionEventHandlerImpl) estimateHistorySize(event *m.HistoryEvent) int {
1389-
sum := historySizeEstimationBuffer
1390-
switch event.GetEventType() {
1391-
case m.EventTypeWorkflowExecutionStarted:
1392-
if event.WorkflowExecutionStartedEventAttributes != nil {
1393-
sum += len(event.WorkflowExecutionStartedEventAttributes.Input)
1394-
sum += len(event.WorkflowExecutionStartedEventAttributes.ContinuedFailureDetails)
1395-
sum += len(event.WorkflowExecutionStartedEventAttributes.LastCompletionResult)
1396-
sum += sizeOf(event.WorkflowExecutionStartedEventAttributes.Memo.GetFields())
1397-
sum += sizeOf(event.WorkflowExecutionStartedEventAttributes.Header.GetFields())
1398-
sum += sizeOf(event.WorkflowExecutionStartedEventAttributes.SearchAttributes.GetIndexedFields())
1399-
}
1400-
case m.EventTypeWorkflowExecutionCompleted:
1401-
if event.WorkflowExecutionCompletedEventAttributes != nil {
1402-
sum += len(event.WorkflowExecutionCompletedEventAttributes.Result)
1403-
}
1404-
case m.EventTypeWorkflowExecutionSignaled:
1405-
if event.WorkflowExecutionSignaledEventAttributes != nil {
1406-
sum += len(event.WorkflowExecutionSignaledEventAttributes.Input)
1407-
}
1408-
case m.EventTypeWorkflowExecutionFailed:
1409-
if event.WorkflowExecutionFailedEventAttributes != nil {
1410-
sum += len(event.WorkflowExecutionFailedEventAttributes.Details)
1411-
}
1412-
case m.EventTypeDecisionTaskStarted:
1413-
if event.DecisionTaskStartedEventAttributes != nil {
1414-
sum += getLengthOfStringPointer(event.DecisionTaskStartedEventAttributes.Identity)
1415-
}
1416-
case m.EventTypeDecisionTaskCompleted:
1417-
if event.DecisionTaskCompletedEventAttributes != nil {
1418-
sum += len(event.DecisionTaskCompletedEventAttributes.ExecutionContext)
1419-
sum += getLengthOfStringPointer(event.DecisionTaskCompletedEventAttributes.Identity)
1420-
sum += getLengthOfStringPointer(event.DecisionTaskCompletedEventAttributes.BinaryChecksum)
1421-
}
1422-
case m.EventTypeDecisionTaskFailed:
1423-
if event.DecisionTaskFailedEventAttributes != nil {
1424-
sum += len(event.DecisionTaskFailedEventAttributes.Details)
1425-
}
1426-
case m.EventTypeActivityTaskScheduled:
1427-
if event.ActivityTaskScheduledEventAttributes != nil {
1428-
sum += len(event.ActivityTaskScheduledEventAttributes.Input)
1429-
sum += sizeOf(event.ActivityTaskScheduledEventAttributes.Header.GetFields())
1430-
}
1431-
case m.EventTypeActivityTaskStarted:
1432-
if event.ActivityTaskStartedEventAttributes != nil {
1433-
sum += len(event.ActivityTaskStartedEventAttributes.LastFailureDetails)
1434-
}
1435-
case m.EventTypeActivityTaskCompleted:
1436-
if event.ActivityTaskCompletedEventAttributes != nil {
1437-
sum += len(event.ActivityTaskCompletedEventAttributes.Result)
1438-
sum += getLengthOfStringPointer(event.ActivityTaskCompletedEventAttributes.Identity)
1439-
}
1440-
case m.EventTypeActivityTaskFailed:
1441-
if event.ActivityTaskFailedEventAttributes != nil {
1442-
sum += len(event.ActivityTaskFailedEventAttributes.Details)
1443-
}
1444-
case m.EventTypeActivityTaskTimedOut:
1445-
if event.ActivityTaskTimedOutEventAttributes != nil {
1446-
sum += len(event.ActivityTaskTimedOutEventAttributes.Details)
1447-
sum += len(event.ActivityTaskTimedOutEventAttributes.LastFailureDetails)
1448-
}
1449-
case m.EventTypeActivityTaskCanceled:
1450-
if event.ActivityTaskCanceledEventAttributes != nil {
1451-
sum += len(event.ActivityTaskCanceledEventAttributes.Details)
1452-
}
1453-
case m.EventTypeMarkerRecorded:
1454-
if event.MarkerRecordedEventAttributes != nil {
1455-
sum += len(event.MarkerRecordedEventAttributes.Details)
1456-
}
1457-
case m.EventTypeWorkflowExecutionTerminated:
1458-
if event.WorkflowExecutionTerminatedEventAttributes != nil {
1459-
sum += len(event.WorkflowExecutionTerminatedEventAttributes.Details)
1460-
}
1461-
case m.EventTypeWorkflowExecutionCanceled:
1462-
if event.WorkflowExecutionCanceledEventAttributes != nil {
1463-
sum += len(event.WorkflowExecutionCanceledEventAttributes.Details)
1464-
}
1465-
case m.EventTypeWorkflowExecutionContinuedAsNew:
1466-
if event.WorkflowExecutionContinuedAsNewEventAttributes != nil {
1467-
sum += len(event.WorkflowExecutionContinuedAsNewEventAttributes.Input)
1468-
sum += len(event.WorkflowExecutionContinuedAsNewEventAttributes.FailureDetails)
1469-
sum += len(event.WorkflowExecutionContinuedAsNewEventAttributes.LastCompletionResult)
1470-
sum += sizeOf(event.WorkflowExecutionContinuedAsNewEventAttributes.Memo.GetFields())
1471-
sum += sizeOf(event.WorkflowExecutionContinuedAsNewEventAttributes.Header.GetFields())
1472-
sum += sizeOf(event.WorkflowExecutionContinuedAsNewEventAttributes.SearchAttributes.GetIndexedFields())
1473-
}
1474-
case m.EventTypeStartChildWorkflowExecutionInitiated:
1475-
if event.StartChildWorkflowExecutionInitiatedEventAttributes != nil {
1476-
sum += len(event.StartChildWorkflowExecutionInitiatedEventAttributes.Input)
1477-
sum += len(event.StartChildWorkflowExecutionInitiatedEventAttributes.Control)
1478-
sum += sizeOf(event.StartChildWorkflowExecutionInitiatedEventAttributes.Memo.GetFields())
1479-
sum += sizeOf(event.StartChildWorkflowExecutionInitiatedEventAttributes.Header.GetFields())
1480-
sum += sizeOf(event.StartChildWorkflowExecutionInitiatedEventAttributes.SearchAttributes.GetIndexedFields())
1481-
}
1482-
case m.EventTypeChildWorkflowExecutionCompleted:
1483-
if event.ChildWorkflowExecutionCompletedEventAttributes != nil {
1484-
sum += len(event.ChildWorkflowExecutionCompletedEventAttributes.Result)
1485-
}
1486-
case m.EventTypeChildWorkflowExecutionFailed:
1487-
if event.ChildWorkflowExecutionFailedEventAttributes != nil {
1488-
sum += len(event.ChildWorkflowExecutionFailedEventAttributes.Details)
1489-
sum += getLengthOfStringPointer(event.ChildWorkflowExecutionFailedEventAttributes.Reason)
1490-
}
1491-
case m.EventTypeChildWorkflowExecutionCanceled:
1492-
if event.ChildWorkflowExecutionCanceledEventAttributes != nil {
1493-
sum += len(event.ChildWorkflowExecutionCanceledEventAttributes.Details)
1494-
}
1495-
case m.EventTypeSignalExternalWorkflowExecutionInitiated:
1496-
if event.SignalExternalWorkflowExecutionInitiatedEventAttributes != nil {
1497-
sum += len(event.SignalExternalWorkflowExecutionInitiatedEventAttributes.Control)
1498-
sum += len(event.SignalExternalWorkflowExecutionInitiatedEventAttributes.Input)
1499-
}
1500-
1501-
default:
1502-
weh.logger.Warn("unknown event type", zap.String("Event Type", event.GetEventType().String()))
1503-
}
1504-
1505-
return sum
1506-
}
1507-
1508-
// simple function to estimate the size of a map[string][]byte
1509-
func sizeOf(o map[string][]byte) int {
1510-
sum := 0
1511-
for k, v := range o {
1512-
sum += len(k) + len(v)
1513-
}
1514-
return sum
1515-
}
1516-
1517-
func getLengthOfStringPointer(s *string) int {
1518-
if s == nil {
1519-
return 0
1520-
}
1521-
return len(*s)
1522-
}

internal/internal_event_handlers_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -340,7 +340,7 @@ func TestHistoryEstimationforSmallEvents(t *testing.T) {
340340
w.logger = logger
341341
historySizeSum := 0
342342
for _, event := range testEvents {
343-
sum := w.estimateHistorySize(event)
343+
sum := estimateHistorySize(logger, event)
344344
historySizeSum += sum
345345
}
346346
trueSize := len(testEvents) * historySizeEstimationBuffer
@@ -376,7 +376,7 @@ func TestHistoryEstimationforPackedEvents(t *testing.T) {
376376
w.logger = logger
377377
historySizeSum := 0
378378
for _, event := range testEvents {
379-
sum := w.estimateHistorySize(event)
379+
sum := estimateHistorySize(logger, event)
380380
historySizeSum += sum
381381
}
382382
trueSize := len(testEvents)*historySizeEstimationBuffer + len(byteArray)*2*len(testEvents)

internal/internal_utils.go

Lines changed: 139 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ import (
2626
"context"
2727
"encoding/json"
2828
"fmt"
29+
"go.uber.org/zap"
2930
"os"
3031
"os/signal"
3132
"strings"
@@ -354,3 +355,141 @@ func getTimeoutTypeFromErrReason(reason string) (s.TimeoutType, error) {
354355
}
355356
return timeoutType, nil
356357
}
358+
359+
func estimateHistorySize(logger *zap.Logger, event *s.HistoryEvent) int {
360+
sum := historySizeEstimationBuffer
361+
switch event.GetEventType() {
362+
case s.EventTypeWorkflowExecutionStarted:
363+
if event.WorkflowExecutionStartedEventAttributes != nil {
364+
sum += len(event.WorkflowExecutionStartedEventAttributes.Input)
365+
sum += len(event.WorkflowExecutionStartedEventAttributes.ContinuedFailureDetails)
366+
sum += len(event.WorkflowExecutionStartedEventAttributes.LastCompletionResult)
367+
sum += sizeOf(event.WorkflowExecutionStartedEventAttributes.Memo.GetFields())
368+
sum += sizeOf(event.WorkflowExecutionStartedEventAttributes.Header.GetFields())
369+
sum += sizeOf(event.WorkflowExecutionStartedEventAttributes.SearchAttributes.GetIndexedFields())
370+
}
371+
case s.EventTypeWorkflowExecutionCompleted:
372+
if event.WorkflowExecutionCompletedEventAttributes != nil {
373+
sum += len(event.WorkflowExecutionCompletedEventAttributes.Result)
374+
}
375+
case s.EventTypeWorkflowExecutionSignaled:
376+
if event.WorkflowExecutionSignaledEventAttributes != nil {
377+
sum += len(event.WorkflowExecutionSignaledEventAttributes.Input)
378+
}
379+
case s.EventTypeWorkflowExecutionFailed:
380+
if event.WorkflowExecutionFailedEventAttributes != nil {
381+
sum += len(event.WorkflowExecutionFailedEventAttributes.Details)
382+
}
383+
case s.EventTypeDecisionTaskStarted:
384+
if event.DecisionTaskStartedEventAttributes != nil {
385+
sum += getLengthOfStringPointer(event.DecisionTaskStartedEventAttributes.Identity)
386+
}
387+
case s.EventTypeDecisionTaskCompleted:
388+
if event.DecisionTaskCompletedEventAttributes != nil {
389+
sum += len(event.DecisionTaskCompletedEventAttributes.ExecutionContext)
390+
sum += getLengthOfStringPointer(event.DecisionTaskCompletedEventAttributes.Identity)
391+
sum += getLengthOfStringPointer(event.DecisionTaskCompletedEventAttributes.BinaryChecksum)
392+
}
393+
case s.EventTypeDecisionTaskFailed:
394+
if event.DecisionTaskFailedEventAttributes != nil {
395+
sum += len(event.DecisionTaskFailedEventAttributes.Details)
396+
}
397+
case s.EventTypeActivityTaskScheduled:
398+
if event.ActivityTaskScheduledEventAttributes != nil {
399+
sum += len(event.ActivityTaskScheduledEventAttributes.Input)
400+
sum += sizeOf(event.ActivityTaskScheduledEventAttributes.Header.GetFields())
401+
}
402+
case s.EventTypeActivityTaskStarted:
403+
if event.ActivityTaskStartedEventAttributes != nil {
404+
sum += len(event.ActivityTaskStartedEventAttributes.LastFailureDetails)
405+
}
406+
case s.EventTypeActivityTaskCompleted:
407+
if event.ActivityTaskCompletedEventAttributes != nil {
408+
sum += len(event.ActivityTaskCompletedEventAttributes.Result)
409+
sum += getLengthOfStringPointer(event.ActivityTaskCompletedEventAttributes.Identity)
410+
}
411+
case s.EventTypeActivityTaskFailed:
412+
if event.ActivityTaskFailedEventAttributes != nil {
413+
sum += len(event.ActivityTaskFailedEventAttributes.Details)
414+
}
415+
case s.EventTypeActivityTaskTimedOut:
416+
if event.ActivityTaskTimedOutEventAttributes != nil {
417+
sum += len(event.ActivityTaskTimedOutEventAttributes.Details)
418+
sum += len(event.ActivityTaskTimedOutEventAttributes.LastFailureDetails)
419+
}
420+
case s.EventTypeActivityTaskCanceled:
421+
if event.ActivityTaskCanceledEventAttributes != nil {
422+
sum += len(event.ActivityTaskCanceledEventAttributes.Details)
423+
}
424+
case s.EventTypeMarkerRecorded:
425+
if event.MarkerRecordedEventAttributes != nil {
426+
sum += len(event.MarkerRecordedEventAttributes.Details)
427+
}
428+
case s.EventTypeWorkflowExecutionTerminated:
429+
if event.WorkflowExecutionTerminatedEventAttributes != nil {
430+
sum += len(event.WorkflowExecutionTerminatedEventAttributes.Details)
431+
}
432+
case s.EventTypeWorkflowExecutionCanceled:
433+
if event.WorkflowExecutionCanceledEventAttributes != nil {
434+
sum += len(event.WorkflowExecutionCanceledEventAttributes.Details)
435+
}
436+
case s.EventTypeWorkflowExecutionContinuedAsNew:
437+
if event.WorkflowExecutionContinuedAsNewEventAttributes != nil {
438+
sum += len(event.WorkflowExecutionContinuedAsNewEventAttributes.Input)
439+
sum += len(event.WorkflowExecutionContinuedAsNewEventAttributes.FailureDetails)
440+
sum += len(event.WorkflowExecutionContinuedAsNewEventAttributes.LastCompletionResult)
441+
sum += sizeOf(event.WorkflowExecutionContinuedAsNewEventAttributes.Memo.GetFields())
442+
sum += sizeOf(event.WorkflowExecutionContinuedAsNewEventAttributes.Header.GetFields())
443+
sum += sizeOf(event.WorkflowExecutionContinuedAsNewEventAttributes.SearchAttributes.GetIndexedFields())
444+
}
445+
case s.EventTypeStartChildWorkflowExecutionInitiated:
446+
if event.StartChildWorkflowExecutionInitiatedEventAttributes != nil {
447+
sum += len(event.StartChildWorkflowExecutionInitiatedEventAttributes.Input)
448+
sum += len(event.StartChildWorkflowExecutionInitiatedEventAttributes.Control)
449+
sum += sizeOf(event.StartChildWorkflowExecutionInitiatedEventAttributes.Memo.GetFields())
450+
sum += sizeOf(event.StartChildWorkflowExecutionInitiatedEventAttributes.Header.GetFields())
451+
sum += sizeOf(event.StartChildWorkflowExecutionInitiatedEventAttributes.SearchAttributes.GetIndexedFields())
452+
}
453+
case s.EventTypeChildWorkflowExecutionCompleted:
454+
if event.ChildWorkflowExecutionCompletedEventAttributes != nil {
455+
sum += len(event.ChildWorkflowExecutionCompletedEventAttributes.Result)
456+
}
457+
case s.EventTypeChildWorkflowExecutionFailed:
458+
if event.ChildWorkflowExecutionFailedEventAttributes != nil {
459+
sum += len(event.ChildWorkflowExecutionFailedEventAttributes.Details)
460+
sum += getLengthOfStringPointer(event.ChildWorkflowExecutionFailedEventAttributes.Reason)
461+
}
462+
case s.EventTypeChildWorkflowExecutionCanceled:
463+
if event.ChildWorkflowExecutionCanceledEventAttributes != nil {
464+
sum += len(event.ChildWorkflowExecutionCanceledEventAttributes.Details)
465+
}
466+
case s.EventTypeSignalExternalWorkflowExecutionInitiated:
467+
if event.SignalExternalWorkflowExecutionInitiatedEventAttributes != nil {
468+
sum += len(event.SignalExternalWorkflowExecutionInitiatedEventAttributes.Control)
469+
sum += len(event.SignalExternalWorkflowExecutionInitiatedEventAttributes.Input)
470+
}
471+
472+
default:
473+
logger.Warn("unknown event type", zap.String("Event Type", event.GetEventType().String()))
474+
// Do not fail to be forward compatible with new events
475+
}
476+
477+
return sum
478+
}
479+
480+
// simple function to estimate the size of a map[string][]byte
481+
func sizeOf(o map[string][]byte) int {
482+
sum := 0
483+
for k, v := range o {
484+
sum += len(k) + len(v)
485+
}
486+
return sum
487+
}
488+
489+
// simple function to estimate the size of a string pointer
490+
func getLengthOfStringPointer(s *string) int {
491+
if s == nil {
492+
return 0
493+
}
494+
return len(*s)
495+
}

0 commit comments

Comments
 (0)