Skip to content

Commit ba6be7f

Browse files
committed
Change buffer size to 100 and add logger tag for workflowtype, also add more logs
1 parent 5ff41f1 commit ba6be7f

File tree

1 file changed

+63
-40
lines changed

1 file changed

+63
-40
lines changed

internal/internal_task_handlers.go

Lines changed: 63 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,7 @@ const (
5252

5353
noRetryBackoff = time.Duration(-1)
5454

55-
historySizeEstimationOffset = 1 * 1024
55+
historySizeEstimationOffset = 1 * 100
5656
)
5757

5858
type (
@@ -372,116 +372,139 @@ OrderEvents:
372372
eh.currentIndex = 0
373373

374374
// estimate history size for nextEvents and markers
375-
historySizeEstimation += estimateHistorySize(nextEvents)
376-
historySizeEstimation += estimateHistorySize(markers)
375+
historySizeEstimation += eh.estimateHistorySize(nextEvents)
376+
historySizeEstimation += eh.estimateHistorySize(markers)
377377

378378
eh.eventsHandler.logger.Info(fmt.Sprintf("Returning historySizeEstimation not zero of %d", historySizeEstimation))
379379

380380
return nextEvents, markers, historySizeEstimation, nil
381381
}
382382

383-
func estimateHistorySize(events []*s.HistoryEvent) int {
383+
func (eh *history) estimateHistorySize(events []*s.HistoryEvent) int {
384384
sum := 0
385385
for _, e := range events {
386+
eventSum := historySizeEstimationOffset
386387
switch e.GetEventType() {
387388
case s.EventTypeWorkflowExecutionStarted:
388389
if e.WorkflowExecutionStartedEventAttributes != nil {
389-
sum += len(e.WorkflowExecutionStartedEventAttributes.Input)
390-
sum += len(e.WorkflowExecutionStartedEventAttributes.ContinuedFailureDetails)
391-
sum += len(e.WorkflowExecutionStartedEventAttributes.LastCompletionResult)
392-
sum += len(e.WorkflowExecutionStartedEventAttributes.Memo.GetFields())
390+
eventSum += len(e.WorkflowExecutionStartedEventAttributes.Input)
391+
eventSum += len(e.WorkflowExecutionStartedEventAttributes.ContinuedFailureDetails)
392+
eventSum += len(e.WorkflowExecutionStartedEventAttributes.LastCompletionResult)
393+
eventSum += len(e.WorkflowExecutionStartedEventAttributes.Memo.GetFields())
393394
}
395+
eh.eventsHandler.logger.Info(fmt.Sprintf("WorkflowExecutionStartedEventAttributes size is %d", eventSum))
394396
case s.EventTypeWorkflowExecutionSignaled:
395397
if e.WorkflowExecutionSignaledEventAttributes != nil {
396-
sum += len(e.WorkflowExecutionSignaledEventAttributes.Input)
398+
eventSum += len(e.WorkflowExecutionSignaledEventAttributes.Input)
397399
}
400+
eh.eventsHandler.logger.Info(fmt.Sprintf("WorkflowExecutionSignaledEventAttributes size is %d", eventSum))
398401
case s.EventTypeWorkflowExecutionFailed:
399402
if e.WorkflowExecutionFailedEventAttributes != nil {
400-
sum += len(e.WorkflowExecutionFailedEventAttributes.Details)
403+
eventSum += len(e.WorkflowExecutionFailedEventAttributes.Details)
401404
}
405+
eh.eventsHandler.logger.Info(fmt.Sprintf("WorkflowExecutionFailedEventAttributes size is %d", eventSum))
402406
case s.EventTypeDecisionTaskCompleted:
403407
if e.DecisionTaskCompletedEventAttributes != nil {
404-
sum += len(e.DecisionTaskCompletedEventAttributes.ExecutionContext)
408+
eventSum += len(e.DecisionTaskCompletedEventAttributes.ExecutionContext)
405409
}
410+
eh.eventsHandler.logger.Info(fmt.Sprintf("DecisionTaskCompletedEventAttributes size is %d", eventSum))
406411
case s.EventTypeDecisionTaskFailed:
407412
if e.DecisionTaskFailedEventAttributes != nil {
408-
sum += len(e.DecisionTaskFailedEventAttributes.Details)
413+
eventSum += len(e.DecisionTaskFailedEventAttributes.Details)
409414
}
415+
eh.eventsHandler.logger.Info(fmt.Sprintf("DecisionTaskFailedEventAttributes size is %d", eventSum))
410416
case s.EventTypeActivityTaskScheduled:
411417
if e.ActivityTaskScheduledEventAttributes != nil {
412-
sum += len(e.ActivityTaskScheduledEventAttributes.Input)
413-
sum += len(e.ActivityTaskScheduledEventAttributes.Header.GetFields())
418+
eventSum += len(e.ActivityTaskScheduledEventAttributes.Input)
419+
eventSum += len(e.ActivityTaskScheduledEventAttributes.Header.GetFields())
414420
}
421+
eh.eventsHandler.logger.Info(fmt.Sprintf("ActivityTaskScheduledEventAttributes size is %d", eventSum))
415422
case s.EventTypeActivityTaskStarted:
416423
if e.ActivityTaskStartedEventAttributes != nil {
417-
sum += len(e.ActivityTaskStartedEventAttributes.LastFailureDetails)
424+
eventSum += len(e.ActivityTaskStartedEventAttributes.LastFailureDetails)
418425
}
426+
eh.eventsHandler.logger.Info(fmt.Sprintf("ActivityTaskStartedEventAttributes size is %d", eventSum))
419427
case s.EventTypeActivityTaskCompleted:
420428
if e.ActivityTaskCompletedEventAttributes != nil {
421-
sum += len(e.ActivityTaskCompletedEventAttributes.Result)
429+
eventSum += len(e.ActivityTaskCompletedEventAttributes.Result)
422430
}
431+
eh.eventsHandler.logger.Info(fmt.Sprintf("ActivityTaskCompletedEventAttributes size is %d", eventSum))
423432
case s.EventTypeActivityTaskFailed:
424433
if e.ActivityTaskFailedEventAttributes != nil {
425-
sum += len(e.ActivityTaskFailedEventAttributes.Details)
434+
eventSum += len(e.ActivityTaskFailedEventAttributes.Details)
426435
}
436+
eh.eventsHandler.logger.Info(fmt.Sprintf("ActivityTaskFailedEventAttributes size is %d", eventSum))
427437
case s.EventTypeActivityTaskTimedOut:
428438
if e.ActivityTaskTimedOutEventAttributes != nil {
429-
sum += len(e.ActivityTaskTimedOutEventAttributes.Details)
430-
sum += len(e.ActivityTaskTimedOutEventAttributes.LastFailureDetails)
439+
eventSum += len(e.ActivityTaskTimedOutEventAttributes.Details)
440+
eventSum += len(e.ActivityTaskTimedOutEventAttributes.LastFailureDetails)
431441
}
442+
eh.eventsHandler.logger.Info(fmt.Sprintf("ActivityTaskTimedOutEventAttributes size is %d", eventSum))
432443
case s.EventTypeActivityTaskCanceled:
433444
if e.ActivityTaskCanceledEventAttributes != nil {
434-
sum += len(e.ActivityTaskCanceledEventAttributes.Details)
445+
eventSum += len(e.ActivityTaskCanceledEventAttributes.Details)
435446
}
447+
eh.eventsHandler.logger.Info(fmt.Sprintf("ActivityTaskCanceledEventAttributes size is %d", eventSum))
436448
case s.EventTypeMarkerRecorded:
437449
if e.MarkerRecordedEventAttributes != nil {
438-
sum += len(e.MarkerRecordedEventAttributes.Details)
450+
eventSum += len(e.MarkerRecordedEventAttributes.Details)
439451
}
452+
eh.eventsHandler.logger.Info(fmt.Sprintf("MarkerRecordedEventAttributes size is %d", eventSum))
440453
case s.EventTypeWorkflowExecutionTerminated:
441454
if e.WorkflowExecutionTerminatedEventAttributes != nil {
442-
sum += len(e.WorkflowExecutionTerminatedEventAttributes.Details)
455+
eventSum += len(e.WorkflowExecutionTerminatedEventAttributes.Details)
443456
}
457+
eh.eventsHandler.logger.Info(fmt.Sprintf("WorkflowExecutionTerminatedEventAttributes size is %d", eventSum))
444458
case s.EventTypeWorkflowExecutionCanceled:
445459
if e.WorkflowExecutionCanceledEventAttributes != nil {
446-
sum += len(e.WorkflowExecutionCanceledEventAttributes.Details)
460+
eventSum += len(e.WorkflowExecutionCanceledEventAttributes.Details)
447461
}
462+
eh.eventsHandler.logger.Info(fmt.Sprintf("WorkflowExecutionCanceledEventAttributes size is %d", eventSum))
448463
case s.EventTypeWorkflowExecutionContinuedAsNew:
449464
if e.WorkflowExecutionContinuedAsNewEventAttributes != nil {
450-
sum += len(e.WorkflowExecutionContinuedAsNewEventAttributes.Input)
451-
sum += len(e.WorkflowExecutionContinuedAsNewEventAttributes.FailureDetails)
452-
sum += len(e.WorkflowExecutionContinuedAsNewEventAttributes.LastCompletionResult)
453-
sum += len(e.WorkflowExecutionContinuedAsNewEventAttributes.Memo.GetFields())
454-
sum += len(e.WorkflowExecutionContinuedAsNewEventAttributes.Header.GetFields())
455-
sum += len(e.WorkflowExecutionContinuedAsNewEventAttributes.SearchAttributes.GetIndexedFields())
465+
eventSum += len(e.WorkflowExecutionContinuedAsNewEventAttributes.Input)
466+
eventSum += len(e.WorkflowExecutionContinuedAsNewEventAttributes.FailureDetails)
467+
eventSum += len(e.WorkflowExecutionContinuedAsNewEventAttributes.LastCompletionResult)
468+
eventSum += len(e.WorkflowExecutionContinuedAsNewEventAttributes.Memo.GetFields())
469+
eventSum += len(e.WorkflowExecutionContinuedAsNewEventAttributes.Header.GetFields())
470+
eventSum += len(e.WorkflowExecutionContinuedAsNewEventAttributes.SearchAttributes.GetIndexedFields())
456471
}
472+
eh.eventsHandler.logger.Info(fmt.Sprintf("WorkflowExecutionContinuedAsNewEventAttributes size is %d", eventSum))
457473
case s.EventTypeStartChildWorkflowExecutionInitiated:
458474
if e.StartChildWorkflowExecutionInitiatedEventAttributes != nil {
459-
sum += len(e.StartChildWorkflowExecutionInitiatedEventAttributes.Input)
460-
sum += len(e.StartChildWorkflowExecutionInitiatedEventAttributes.Control)
461-
sum += len(e.StartChildWorkflowExecutionInitiatedEventAttributes.Memo.GetFields())
462-
sum += len(e.StartChildWorkflowExecutionInitiatedEventAttributes.Header.GetFields())
463-
sum += len(e.StartChildWorkflowExecutionInitiatedEventAttributes.SearchAttributes.GetIndexedFields())
475+
eventSum += len(e.StartChildWorkflowExecutionInitiatedEventAttributes.Input)
476+
eventSum += len(e.StartChildWorkflowExecutionInitiatedEventAttributes.Control)
477+
eventSum += len(e.StartChildWorkflowExecutionInitiatedEventAttributes.Memo.GetFields())
478+
eventSum += len(e.StartChildWorkflowExecutionInitiatedEventAttributes.Header.GetFields())
479+
eventSum += len(e.StartChildWorkflowExecutionInitiatedEventAttributes.SearchAttributes.GetIndexedFields())
464480
}
481+
eh.eventsHandler.logger.Info(fmt.Sprintf("StartChildWorkflowExecutionInitiatedEventAttributes size is %d", eventSum))
465482
case s.EventTypeChildWorkflowExecutionCompleted:
466483
if e.ChildWorkflowExecutionCompletedEventAttributes != nil {
467-
sum += len(e.ChildWorkflowExecutionCompletedEventAttributes.Result)
484+
eventSum += len(e.ChildWorkflowExecutionCompletedEventAttributes.Result)
468485
}
486+
eh.eventsHandler.logger.Info(fmt.Sprintf("ChildWorkflowExecutionCompletedEventAttributes size is %d", eventSum))
469487
case s.EventTypeChildWorkflowExecutionFailed:
470488
if e.ChildWorkflowExecutionFailedEventAttributes != nil {
471-
sum += len(e.ChildWorkflowExecutionFailedEventAttributes.Details)
489+
eventSum += len(e.ChildWorkflowExecutionFailedEventAttributes.Details)
472490
}
491+
eh.eventsHandler.logger.Info(fmt.Sprintf("ChildWorkflowExecutionFailedEventAttributes size is %d", eventSum))
473492
case s.EventTypeChildWorkflowExecutionCanceled:
474493
if e.ChildWorkflowExecutionCanceledEventAttributes != nil {
475-
sum += len(e.ChildWorkflowExecutionCanceledEventAttributes.Details)
494+
eventSum += len(e.ChildWorkflowExecutionCanceledEventAttributes.Details)
476495
}
496+
eh.eventsHandler.logger.Info(fmt.Sprintf("ChildWorkflowExecutionCanceledEventAttributes size is %d", eventSum))
477497
case s.EventTypeSignalExternalWorkflowExecutionInitiated:
478498
if e.SignalExternalWorkflowExecutionInitiatedEventAttributes != nil {
479-
sum += len(e.SignalExternalWorkflowExecutionInitiatedEventAttributes.Control)
480-
sum += len(e.SignalExternalWorkflowExecutionInitiatedEventAttributes.Input)
499+
eventSum += len(e.SignalExternalWorkflowExecutionInitiatedEventAttributes.Control)
500+
eventSum += len(e.SignalExternalWorkflowExecutionInitiatedEventAttributes.Input)
481501
}
502+
eh.eventsHandler.logger.Info(fmt.Sprintf("SignalExternalWorkflowExecutionInitiatedEventAttributes size is %d", eventSum))
503+
default:
504+
// ignore other events
482505
}
506+
sum += eventSum
483507
}
484-
sum += historySizeEstimationOffset
485508
return sum
486509
}
487510

0 commit comments

Comments
 (0)