Skip to content

Commit c20b993

Browse files
authored
Calculate workflow history size and count and expose that to client (#1270)
Enable client side estimated history size exposure via API
1 parent 4cf8503 commit c20b993

File tree

11 files changed

+256
-8
lines changed

11 files changed

+256
-8
lines changed

internal/common/metrics/constants.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -115,6 +115,8 @@ const (
115115
MemoryUsedStack = CadenceMetricsPrefix + "memory-used-stack"
116116
NumGoRoutines = CadenceMetricsPrefix + "num-go-routines"
117117

118+
EstimatedHistorySize = CadenceMetricsPrefix + "estimated-history-size"
119+
ServerSideHistorySize = CadenceMetricsPrefix + "server-side-history-size"
118120
ConcurrentTaskQuota = CadenceMetricsPrefix + "concurrent-task-quota"
119121
PollerRequestBufferUsage = CadenceMetricsPrefix + "poller-request-buffer-usage"
120122
)

internal/compatibility/proto/response.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -223,6 +223,7 @@ func PollForDecisionTaskResponse(t *shared.PollForDecisionTaskResponse) *apiv1.P
223223
StartedTime: unixNanoToTime(t.StartedTimestamp),
224224
Queries: WorkflowQueryMap(t.Queries),
225225
NextEventId: t.GetNextEventId(),
226+
TotalHistoryBytes: t.GetTotalHistoryBytes(),
226227
}
227228
}
228229

internal/compatibility/thrift/response.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -223,6 +223,7 @@ func PollForDecisionTaskResponse(t *apiv1.PollForDecisionTaskResponse) *shared.P
223223
StartedTimestamp: timeToUnixNano(t.StartedTime),
224224
Queries: WorkflowQueryMap(t.Queries),
225225
NextEventId: &t.NextEventId,
226+
TotalHistoryBytes: &t.TotalHistoryBytes,
226227
}
227228
}
228229

internal/internal_event_handlers.go

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ import (
2929
"fmt"
3030
"reflect"
3131
"sync"
32+
"sync/atomic"
3233
"time"
3334

3435
"github.com/opentracing/opentracing-go"
@@ -43,7 +44,8 @@ import (
4344
)
4445

4546
const (
46-
queryResultSizeLimit = 2000000 // 2MB
47+
queryResultSizeLimit = 2000000 // 2MB
48+
historySizeEstimationBuffer = 400 // 400B for common fields in history event
4749
)
4850

4951
// Make sure that interfaces are implemented
@@ -940,6 +942,9 @@ func (weh *workflowExecutionEventHandlerImpl) ProcessEvent(
940942
return err
941943
}
942944

945+
historySum := estimateHistorySize(weh.logger, event)
946+
atomic.AddInt64(&weh.workflowInfo.TotalHistoryBytes, int64(historySum))
947+
943948
// When replaying histories to get stack trace or current state the last event might be not
944949
// decision started. So always call OnDecisionTaskStarted on the last event.
945950
// Don't call for EventType_DecisionTaskStarted as it was already called when handling it.

internal/internal_event_handlers_test.go

Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ package internal
2222

2323
import (
2424
"encoding/json"
25+
"go.uber.org/cadence/internal/common"
2526
"testing"
2627

2728
"github.com/stretchr/testify/assert"
@@ -312,3 +313,72 @@ func Test_CreateSearchAttributesForChangeVersion(t *testing.T) {
312313
require.True(t, ok, "Remember to update related key on server side")
313314
require.Equal(t, []string{"cid-1"}, val)
314315
}
316+
317+
func TestHistoryEstimationforSmallEvents(t *testing.T) {
318+
taskList := "tasklist"
319+
testEvents := []*s.HistoryEvent{
320+
createTestEventWorkflowExecutionStarted(1, &s.WorkflowExecutionStartedEventAttributes{TaskList: &s.TaskList{Name: &taskList}}),
321+
createTestEventDecisionTaskScheduled(2, &s.DecisionTaskScheduledEventAttributes{TaskList: &s.TaskList{Name: &taskList}}),
322+
createTestEventDecisionTaskStarted(3),
323+
{
324+
EventId: common.Int64Ptr(4),
325+
EventType: common.EventTypePtr(s.EventTypeDecisionTaskFailed),
326+
},
327+
{
328+
EventId: common.Int64Ptr(5),
329+
EventType: common.EventTypePtr(s.EventTypeWorkflowExecutionSignaled),
330+
},
331+
createTestEventDecisionTaskScheduled(6, &s.DecisionTaskScheduledEventAttributes{TaskList: &s.TaskList{Name: &taskList}}),
332+
createTestEventDecisionTaskStarted(7),
333+
}
334+
core, _ := observer.New(zapcore.InfoLevel)
335+
logger := zap.New(core, zap.Development())
336+
w := workflowExecutionEventHandlerImpl{
337+
workflowEnvironmentImpl: &workflowEnvironmentImpl{logger: logger},
338+
}
339+
340+
w.logger = logger
341+
historySizeSum := 0
342+
for _, event := range testEvents {
343+
sum := estimateHistorySize(logger, event)
344+
historySizeSum += sum
345+
}
346+
trueSize := len(testEvents) * historySizeEstimationBuffer
347+
348+
assert.Equal(t, trueSize, historySizeSum)
349+
}
350+
351+
func TestHistoryEstimationforPackedEvents(t *testing.T) {
352+
// create an array of bytes for testing
353+
var byteArray []byte
354+
byteArray = append(byteArray, 100)
355+
taskList := "tasklist"
356+
testEvents := []*s.HistoryEvent{
357+
createTestEventWorkflowExecutionStarted(1, &s.WorkflowExecutionStartedEventAttributes{
358+
TaskList: &s.TaskList{Name: &taskList},
359+
Input: byteArray,
360+
ContinuedFailureDetails: byteArray}),
361+
createTestEventWorkflowExecutionStarted(2, &s.WorkflowExecutionStartedEventAttributes{
362+
TaskList: &s.TaskList{Name: &taskList},
363+
Input: byteArray,
364+
ContinuedFailureDetails: byteArray}),
365+
createTestEventWorkflowExecutionStarted(3, &s.WorkflowExecutionStartedEventAttributes{
366+
TaskList: &s.TaskList{Name: &taskList},
367+
Input: byteArray,
368+
ContinuedFailureDetails: byteArray}),
369+
}
370+
core, _ := observer.New(zapcore.InfoLevel)
371+
logger := zap.New(core, zap.Development())
372+
w := workflowExecutionEventHandlerImpl{
373+
workflowEnvironmentImpl: &workflowEnvironmentImpl{logger: logger},
374+
}
375+
376+
w.logger = logger
377+
historySizeSum := 0
378+
for _, event := range testEvents {
379+
sum := estimateHistorySize(logger, event)
380+
historySizeSum += sum
381+
}
382+
trueSize := len(testEvents)*historySizeEstimationBuffer + len(byteArray)*2*len(testEvents)
383+
assert.Equal(t, trueSize, historySizeSum)
384+
}

internal/internal_task_handlers.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -843,6 +843,8 @@ process_Workflow_Loop:
843843
func (w *workflowExecutionContextImpl) ProcessWorkflowTask(workflowTask *workflowTask) (interface{}, error) {
844844
task := workflowTask.task
845845
historyIterator := workflowTask.historyIterator
846+
w.workflowInfo.HistoryBytesServer = task.GetTotalHistoryBytes()
847+
w.workflowInfo.HistoryCount = task.GetNextEventId() - 1
846848
if err := w.ResetIfStale(task, historyIterator); err != nil {
847849
return nil, err
848850
}
@@ -866,6 +868,8 @@ func (w *workflowExecutionContextImpl) ProcessWorkflowTask(workflowTask *workflo
866868
ProcessEvents:
867869
for {
868870
reorderedEvents, markers, binaryChecksum, err := reorderedHistory.NextDecisionEvents()
871+
w.wth.metricsScope.GetTaggedScope("workflowtype", w.workflowInfo.WorkflowType.Name).Gauge(metrics.EstimatedHistorySize).Update(float64(w.workflowInfo.TotalHistoryBytes))
872+
w.wth.metricsScope.GetTaggedScope("workflowtype", w.workflowInfo.WorkflowType.Name).Gauge(metrics.ServerSideHistorySize).Update(float64(w.workflowInfo.HistoryBytesServer))
869873
if err != nil {
870874
return nil, err
871875
}

internal/internal_task_pollers.go

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -117,7 +117,7 @@ type (
117117
service workflowserviceclient.Interface
118118
metricsScope tally.Scope
119119
startedEventID int64
120-
maxEventID int64
120+
maxEventID int64 // Equivalent to History Count
121121
featureFlags FeatureFlags
122122
}
123123

@@ -330,7 +330,6 @@ func (wtp *workflowTaskPoller) processWorkflowTask(task *workflowTask) error {
330330
})
331331
return nil
332332
}
333-
334333
doneCh := make(chan struct{})
335334
laResultCh := make(chan *localActivityResult)
336335
// close doneCh so local activity worker won't get blocked forever when trying to send back result to laResultCh.
@@ -341,6 +340,7 @@ func (wtp *workflowTaskPoller) processWorkflowTask(task *workflowTask) error {
341340
startTime := time.Now()
342341
task.doneCh = doneCh
343342
task.laResultCh = laResultCh
343+
// Process the task.
344344
completedRequest, err := wtp.taskHandler.ProcessWorkflowTask(
345345
task,
346346
func(response interface{}, startTime time.Time) (*workflowTask, error) {
@@ -897,6 +897,11 @@ func (h *historyIteratorImpl) HasNextPage() bool {
897897
return h.nextPageToken != nil
898898
}
899899

900+
// GetHistoryCount returns History Event Count of current history (aka maxEventID)
901+
func (h *historyIteratorImpl) GetHistoryCount() int64 {
902+
return h.maxEventID
903+
}
904+
900905
func newGetHistoryPageFunc(
901906
ctx context.Context,
902907
service workflowserviceclient.Interface,

internal/internal_utils.go

Lines changed: 139 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ import (
2626
"context"
2727
"encoding/json"
2828
"fmt"
29+
"go.uber.org/zap"
2930
"os"
3031
"os/signal"
3132
"strings"
@@ -354,3 +355,141 @@ func getTimeoutTypeFromErrReason(reason string) (s.TimeoutType, error) {
354355
}
355356
return timeoutType, nil
356357
}
358+
359+
func estimateHistorySize(logger *zap.Logger, event *s.HistoryEvent) int {
360+
sum := historySizeEstimationBuffer
361+
switch event.GetEventType() {
362+
case s.EventTypeWorkflowExecutionStarted:
363+
if event.WorkflowExecutionStartedEventAttributes != nil {
364+
sum += len(event.WorkflowExecutionStartedEventAttributes.Input)
365+
sum += len(event.WorkflowExecutionStartedEventAttributes.ContinuedFailureDetails)
366+
sum += len(event.WorkflowExecutionStartedEventAttributes.LastCompletionResult)
367+
sum += sizeOf(event.WorkflowExecutionStartedEventAttributes.Memo.GetFields())
368+
sum += sizeOf(event.WorkflowExecutionStartedEventAttributes.Header.GetFields())
369+
sum += sizeOf(event.WorkflowExecutionStartedEventAttributes.SearchAttributes.GetIndexedFields())
370+
}
371+
case s.EventTypeWorkflowExecutionCompleted:
372+
if event.WorkflowExecutionCompletedEventAttributes != nil {
373+
sum += len(event.WorkflowExecutionCompletedEventAttributes.Result)
374+
}
375+
case s.EventTypeWorkflowExecutionSignaled:
376+
if event.WorkflowExecutionSignaledEventAttributes != nil {
377+
sum += len(event.WorkflowExecutionSignaledEventAttributes.Input)
378+
}
379+
case s.EventTypeWorkflowExecutionFailed:
380+
if event.WorkflowExecutionFailedEventAttributes != nil {
381+
sum += len(event.WorkflowExecutionFailedEventAttributes.Details)
382+
}
383+
case s.EventTypeDecisionTaskStarted:
384+
if event.DecisionTaskStartedEventAttributes != nil {
385+
sum += getLengthOfStringPointer(event.DecisionTaskStartedEventAttributes.Identity)
386+
}
387+
case s.EventTypeDecisionTaskCompleted:
388+
if event.DecisionTaskCompletedEventAttributes != nil {
389+
sum += len(event.DecisionTaskCompletedEventAttributes.ExecutionContext)
390+
sum += getLengthOfStringPointer(event.DecisionTaskCompletedEventAttributes.Identity)
391+
sum += getLengthOfStringPointer(event.DecisionTaskCompletedEventAttributes.BinaryChecksum)
392+
}
393+
case s.EventTypeDecisionTaskFailed:
394+
if event.DecisionTaskFailedEventAttributes != nil {
395+
sum += len(event.DecisionTaskFailedEventAttributes.Details)
396+
}
397+
case s.EventTypeActivityTaskScheduled:
398+
if event.ActivityTaskScheduledEventAttributes != nil {
399+
sum += len(event.ActivityTaskScheduledEventAttributes.Input)
400+
sum += sizeOf(event.ActivityTaskScheduledEventAttributes.Header.GetFields())
401+
}
402+
case s.EventTypeActivityTaskStarted:
403+
if event.ActivityTaskStartedEventAttributes != nil {
404+
sum += len(event.ActivityTaskStartedEventAttributes.LastFailureDetails)
405+
}
406+
case s.EventTypeActivityTaskCompleted:
407+
if event.ActivityTaskCompletedEventAttributes != nil {
408+
sum += len(event.ActivityTaskCompletedEventAttributes.Result)
409+
sum += getLengthOfStringPointer(event.ActivityTaskCompletedEventAttributes.Identity)
410+
}
411+
case s.EventTypeActivityTaskFailed:
412+
if event.ActivityTaskFailedEventAttributes != nil {
413+
sum += len(event.ActivityTaskFailedEventAttributes.Details)
414+
}
415+
case s.EventTypeActivityTaskTimedOut:
416+
if event.ActivityTaskTimedOutEventAttributes != nil {
417+
sum += len(event.ActivityTaskTimedOutEventAttributes.Details)
418+
sum += len(event.ActivityTaskTimedOutEventAttributes.LastFailureDetails)
419+
}
420+
case s.EventTypeActivityTaskCanceled:
421+
if event.ActivityTaskCanceledEventAttributes != nil {
422+
sum += len(event.ActivityTaskCanceledEventAttributes.Details)
423+
}
424+
case s.EventTypeMarkerRecorded:
425+
if event.MarkerRecordedEventAttributes != nil {
426+
sum += len(event.MarkerRecordedEventAttributes.Details)
427+
}
428+
case s.EventTypeWorkflowExecutionTerminated:
429+
if event.WorkflowExecutionTerminatedEventAttributes != nil {
430+
sum += len(event.WorkflowExecutionTerminatedEventAttributes.Details)
431+
}
432+
case s.EventTypeWorkflowExecutionCanceled:
433+
if event.WorkflowExecutionCanceledEventAttributes != nil {
434+
sum += len(event.WorkflowExecutionCanceledEventAttributes.Details)
435+
}
436+
case s.EventTypeWorkflowExecutionContinuedAsNew:
437+
if event.WorkflowExecutionContinuedAsNewEventAttributes != nil {
438+
sum += len(event.WorkflowExecutionContinuedAsNewEventAttributes.Input)
439+
sum += len(event.WorkflowExecutionContinuedAsNewEventAttributes.FailureDetails)
440+
sum += len(event.WorkflowExecutionContinuedAsNewEventAttributes.LastCompletionResult)
441+
sum += sizeOf(event.WorkflowExecutionContinuedAsNewEventAttributes.Memo.GetFields())
442+
sum += sizeOf(event.WorkflowExecutionContinuedAsNewEventAttributes.Header.GetFields())
443+
sum += sizeOf(event.WorkflowExecutionContinuedAsNewEventAttributes.SearchAttributes.GetIndexedFields())
444+
}
445+
case s.EventTypeStartChildWorkflowExecutionInitiated:
446+
if event.StartChildWorkflowExecutionInitiatedEventAttributes != nil {
447+
sum += len(event.StartChildWorkflowExecutionInitiatedEventAttributes.Input)
448+
sum += len(event.StartChildWorkflowExecutionInitiatedEventAttributes.Control)
449+
sum += sizeOf(event.StartChildWorkflowExecutionInitiatedEventAttributes.Memo.GetFields())
450+
sum += sizeOf(event.StartChildWorkflowExecutionInitiatedEventAttributes.Header.GetFields())
451+
sum += sizeOf(event.StartChildWorkflowExecutionInitiatedEventAttributes.SearchAttributes.GetIndexedFields())
452+
}
453+
case s.EventTypeChildWorkflowExecutionCompleted:
454+
if event.ChildWorkflowExecutionCompletedEventAttributes != nil {
455+
sum += len(event.ChildWorkflowExecutionCompletedEventAttributes.Result)
456+
}
457+
case s.EventTypeChildWorkflowExecutionFailed:
458+
if event.ChildWorkflowExecutionFailedEventAttributes != nil {
459+
sum += len(event.ChildWorkflowExecutionFailedEventAttributes.Details)
460+
sum += getLengthOfStringPointer(event.ChildWorkflowExecutionFailedEventAttributes.Reason)
461+
}
462+
case s.EventTypeChildWorkflowExecutionCanceled:
463+
if event.ChildWorkflowExecutionCanceledEventAttributes != nil {
464+
sum += len(event.ChildWorkflowExecutionCanceledEventAttributes.Details)
465+
}
466+
case s.EventTypeSignalExternalWorkflowExecutionInitiated:
467+
if event.SignalExternalWorkflowExecutionInitiatedEventAttributes != nil {
468+
sum += len(event.SignalExternalWorkflowExecutionInitiatedEventAttributes.Control)
469+
sum += len(event.SignalExternalWorkflowExecutionInitiatedEventAttributes.Input)
470+
}
471+
472+
default:
473+
logger.Warn("unknown event type", zap.String("Event Type", event.GetEventType().String()))
474+
// Do not fail to be forward compatible with new events
475+
}
476+
477+
return sum
478+
}
479+
480+
// simple function to estimate the size of a map[string][]byte
481+
func sizeOf(o map[string][]byte) int {
482+
sum := 0
483+
for k, v := range o {
484+
sum += len(k) + len(v)
485+
}
486+
return sum
487+
}
488+
489+
// simple function to estimate the size of a string pointer
490+
func getLengthOfStringPointer(s *string) int {
491+
if s == nil {
492+
return 0
493+
}
494+
return len(*s)
495+
}

internal/internal_worker_base.go

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -216,11 +216,7 @@ func (bw *baseWorker) Start() {
216216
go bw.runTaskDispatcher()
217217

218218
// We want the emit function to run once per host instead of once per worker
219-
//collectHardwareUsageOnce.Do(func() {
220-
// bw.shutdownWG.Add(1)
221-
// go bw.emitHardwareUsage()
222-
//})
223-
219+
// since the emit function reports a host-level metric.
224220
bw.shutdownWG.Add(1)
225221
go bw.emitHardwareUsage()
226222

internal/workflow.go

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1111,6 +1111,9 @@ type WorkflowInfo struct {
11111111
BinaryChecksum *string // The identifier(generated by md5sum by default) of worker code that is making the current decision(can be used for auto-reset feature)
11121112
DecisionStartedEventID int64 // the eventID of DecisionStarted that is making the current decision(can be used for reset API)
11131113
RetryPolicy *s.RetryPolicy
1114+
TotalHistoryBytes int64
1115+
HistoryBytesServer int64
1116+
HistoryCount int64
11141117
}
11151118

11161119
// GetBinaryChecksum returns the binary checksum(identifier) of this worker
@@ -1158,6 +1161,18 @@ func (wc *workflowEnvironmentInterceptor) GetMetricsScope(ctx Context) tally.Sco
11581161
return wc.env.GetMetricsScope()
11591162
}
11601163

1164+
// GetTotalEstimatedHistoryBytes returns the current history size of that workflow
1165+
func GetTotalEstimatedHistoryBytes(ctx Context) int64 {
1166+
i := getWorkflowInterceptor(ctx)
1167+
return i.GetWorkflowInfo(ctx).TotalHistoryBytes
1168+
}
1169+
1170+
// GetHistoryCount returns the current number of history events of that workflow
1171+
func GetHistoryCount(ctx Context) int64 {
1172+
i := getWorkflowInterceptor(ctx)
1173+
return i.GetWorkflowInfo(ctx).HistoryCount
1174+
}
1175+
11611176
// Now returns the current time in UTC. It corresponds to the time when the decision task is started or replayed.
11621177
// Workflow needs to use this method to get the wall clock time instead of the one from the golang library.
11631178
func Now(ctx Context) time.Time {

0 commit comments

Comments
 (0)