Skip to content

Commit dbf41d3

Browse files
committed
respond to comments
1 parent 98ced27 commit dbf41d3

File tree

2 files changed

+30
-13
lines changed

2 files changed

+30
-13
lines changed

internal/internal_event_handlers.go

Lines changed: 30 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ import (
2929
"fmt"
3030
"reflect"
3131
"sync"
32+
"sync/atomic"
3233
"time"
3334

3435
"github.com/opentracing/opentracing-go"
@@ -942,7 +943,7 @@ func (weh *workflowExecutionEventHandlerImpl) ProcessEvent(
942943
}
943944

944945
historySum := weh.estimateHistorySize(event)
945-
weh.workflowInfo.TotalHistoryBytes += int64(historySum)
946+
atomic.AddInt64(&weh.workflowInfo.TotalHistoryBytes, int64(historySum))
946947

947948
// When replaying histories to get stack trace or current state the last event might be not
948949
// decision started. So always call OnDecisionTaskStarted on the last event.
@@ -1392,9 +1393,9 @@ func (weh *workflowExecutionEventHandlerImpl) estimateHistorySize(event *m.Histo
13921393
sum += len(event.WorkflowExecutionStartedEventAttributes.Input)
13931394
sum += len(event.WorkflowExecutionStartedEventAttributes.ContinuedFailureDetails)
13941395
sum += len(event.WorkflowExecutionStartedEventAttributes.LastCompletionResult)
1395-
sum += len(event.WorkflowExecutionStartedEventAttributes.Memo.GetFields())
1396-
sum += len(event.WorkflowExecutionStartedEventAttributes.Header.GetFields())
1397-
sum += len(event.WorkflowExecutionStartedEventAttributes.SearchAttributes.GetIndexedFields())
1396+
sum += sizeOf(event.WorkflowExecutionStartedEventAttributes.Memo.GetFields())
1397+
sum += sizeOf(event.WorkflowExecutionStartedEventAttributes.Header.GetFields())
1398+
sum += sizeOf(event.WorkflowExecutionStartedEventAttributes.SearchAttributes.GetIndexedFields())
13981399
}
13991400
case m.EventTypeWorkflowExecutionCompleted:
14001401
if event.WorkflowExecutionCompletedEventAttributes != nil {
@@ -1408,9 +1409,15 @@ func (weh *workflowExecutionEventHandlerImpl) estimateHistorySize(event *m.Histo
14081409
if event.WorkflowExecutionFailedEventAttributes != nil {
14091410
sum += len(event.WorkflowExecutionFailedEventAttributes.Details)
14101411
}
1412+
case m.EventTypeDecisionTaskStarted:
1413+
if event.DecisionTaskStartedEventAttributes != nil {
1414+
sum += len(*event.DecisionTaskStartedEventAttributes.Identity)
1415+
}
14111416
case m.EventTypeDecisionTaskCompleted:
14121417
if event.DecisionTaskCompletedEventAttributes != nil {
14131418
sum += len(event.DecisionTaskCompletedEventAttributes.ExecutionContext)
1419+
sum += len(*event.DecisionTaskCompletedEventAttributes.Identity)
1420+
sum += len(*event.DecisionTaskCompletedEventAttributes.BinaryChecksum)
14141421
}
14151422
case m.EventTypeDecisionTaskFailed:
14161423
if event.DecisionTaskFailedEventAttributes != nil {
@@ -1419,7 +1426,7 @@ func (weh *workflowExecutionEventHandlerImpl) estimateHistorySize(event *m.Histo
14191426
case m.EventTypeActivityTaskScheduled:
14201427
if event.ActivityTaskScheduledEventAttributes != nil {
14211428
sum += len(event.ActivityTaskScheduledEventAttributes.Input)
1422-
sum += len(event.ActivityTaskScheduledEventAttributes.Header.GetFields())
1429+
sum += sizeOf(event.ActivityTaskScheduledEventAttributes.Header.GetFields())
14231430
}
14241431
case m.EventTypeActivityTaskStarted:
14251432
if event.ActivityTaskStartedEventAttributes != nil {
@@ -1428,6 +1435,7 @@ func (weh *workflowExecutionEventHandlerImpl) estimateHistorySize(event *m.Histo
14281435
case m.EventTypeActivityTaskCompleted:
14291436
if event.ActivityTaskCompletedEventAttributes != nil {
14301437
sum += len(event.ActivityTaskCompletedEventAttributes.Result)
1438+
sum += len(*event.ActivityTaskCompletedEventAttributes.Identity)
14311439
}
14321440
case m.EventTypeActivityTaskFailed:
14331441
if event.ActivityTaskFailedEventAttributes != nil {
@@ -1459,17 +1467,17 @@ func (weh *workflowExecutionEventHandlerImpl) estimateHistorySize(event *m.Histo
14591467
sum += len(event.WorkflowExecutionContinuedAsNewEventAttributes.Input)
14601468
sum += len(event.WorkflowExecutionContinuedAsNewEventAttributes.FailureDetails)
14611469
sum += len(event.WorkflowExecutionContinuedAsNewEventAttributes.LastCompletionResult)
1462-
sum += len(event.WorkflowExecutionContinuedAsNewEventAttributes.Memo.GetFields())
1463-
sum += len(event.WorkflowExecutionContinuedAsNewEventAttributes.Header.GetFields())
1464-
sum += len(event.WorkflowExecutionContinuedAsNewEventAttributes.SearchAttributes.GetIndexedFields())
1470+
sum += sizeOf(event.WorkflowExecutionContinuedAsNewEventAttributes.Memo.GetFields())
1471+
sum += sizeOf(event.WorkflowExecutionContinuedAsNewEventAttributes.Header.GetFields())
1472+
sum += sizeOf(event.WorkflowExecutionContinuedAsNewEventAttributes.SearchAttributes.GetIndexedFields())
14651473
}
14661474
case m.EventTypeStartChildWorkflowExecutionInitiated:
14671475
if event.StartChildWorkflowExecutionInitiatedEventAttributes != nil {
14681476
sum += len(event.StartChildWorkflowExecutionInitiatedEventAttributes.Input)
14691477
sum += len(event.StartChildWorkflowExecutionInitiatedEventAttributes.Control)
1470-
sum += len(event.StartChildWorkflowExecutionInitiatedEventAttributes.Memo.GetFields())
1471-
sum += len(event.StartChildWorkflowExecutionInitiatedEventAttributes.Header.GetFields())
1472-
sum += len(event.StartChildWorkflowExecutionInitiatedEventAttributes.SearchAttributes.GetIndexedFields())
1478+
sum += sizeOf(event.StartChildWorkflowExecutionInitiatedEventAttributes.Memo.GetFields())
1479+
sum += sizeOf(event.StartChildWorkflowExecutionInitiatedEventAttributes.Header.GetFields())
1480+
sum += sizeOf(event.StartChildWorkflowExecutionInitiatedEventAttributes.SearchAttributes.GetIndexedFields())
14731481
}
14741482
case m.EventTypeChildWorkflowExecutionCompleted:
14751483
if event.ChildWorkflowExecutionCompletedEventAttributes != nil {
@@ -1478,6 +1486,7 @@ func (weh *workflowExecutionEventHandlerImpl) estimateHistorySize(event *m.Histo
14781486
case m.EventTypeChildWorkflowExecutionFailed:
14791487
if event.ChildWorkflowExecutionFailedEventAttributes != nil {
14801488
sum += len(event.ChildWorkflowExecutionFailedEventAttributes.Details)
1489+
sum += len(*event.ChildWorkflowExecutionFailedEventAttributes.Reason)
14811490
}
14821491
case m.EventTypeChildWorkflowExecutionCanceled:
14831492
if event.ChildWorkflowExecutionCanceledEventAttributes != nil {
@@ -1488,9 +1497,19 @@ func (weh *workflowExecutionEventHandlerImpl) estimateHistorySize(event *m.Histo
14881497
sum += len(event.SignalExternalWorkflowExecutionInitiatedEventAttributes.Control)
14891498
sum += len(event.SignalExternalWorkflowExecutionInitiatedEventAttributes.Input)
14901499
}
1500+
14911501
default:
14921502
weh.logger.Warn("unknown event type", zap.String("Event Type", event.GetEventType().String()))
14931503
}
14941504

14951505
return sum
14961506
}
1507+
1508+
// simple function to estimate the size of a map[string][]byte
1509+
func sizeOf(o map[string][]byte) int {
1510+
sum := 0
1511+
for k, v := range o {
1512+
sum += len(k) + len(v)
1513+
}
1514+
return sum
1515+
}

internal/internal_task_pollers.go

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -809,8 +809,6 @@ func (wtp *workflowTaskPoller) poll(ctx context.Context) (interface{}, error) {
809809

810810
wtp.updateBacklog(request.TaskList.GetKind(), response.GetBacklogCountHint())
811811

812-
wtp.logger.Info(fmt.Sprintf("Tasklist backlog count is %d", response.GetBacklogCountHint()))
813-
814812
task := wtp.toWorkflowTask(response)
815813
traceLog(func() {
816814
var firstEventID int64 = -1

0 commit comments

Comments
 (0)