Skip to content

Commit 76613c6

Browse files
committed
Add operation tag to the blob size error metric
1 parent 86628e6 commit 76613c6

File tree

7 files changed

+39
-40
lines changed

7 files changed

+39
-40
lines changed

common/metrics/metric_defs.go

Lines changed: 11 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -647,18 +647,18 @@ var (
647647
TlsCertsExpired = NewGaugeDef("certificates_expired")
648648
TlsCertsExpiring = NewGaugeDef("certificates_expiring")
649649
ServiceAuthorizationLatency = NewTimerDef("service_authorization_latency")
650-
EventBlobSize = NewBytesHistogramDef("event_blob_size")
651-
EventBlobSizeError = NewCounterDef(
650+
EventBlobSize = NewBytesHistogramDef("event_blob_size")
651+
EventBlobSizeError = NewCounterDef(
652652
"event_blob_size_error",
653-
WithDescription("The number of requests that failed due to blob size exceeding the limit."),
654-
)
655-
HeaderSize = NewBytesHistogramDef("header_size", WithDescription("The size of the header in bytes passed to the server by the client. This metric is experimental and can be removed in the future."))
656-
LockRequests = NewCounterDef("lock_requests")
657-
LockLatency = NewTimerDef("lock_latency")
658-
SemaphoreRequests = NewCounterDef("semaphore_requests")
659-
SemaphoreFailures = NewCounterDef("semaphore_failures")
660-
SemaphoreLatency = NewTimerDef("semaphore_latency")
661-
ClientRequests = NewCounterDef(
653+
WithDescription("The number of requests that failed due to blob size exceeding limits configured with BlobSizeLimitError and MemoSizeLimitError."),
654+
)
655+
HeaderSize = NewBytesHistogramDef("header_size", WithDescription("The size of the header in bytes passed to the server by the client. This metric is experimental and can be removed in the future."))
656+
LockRequests = NewCounterDef("lock_requests")
657+
LockLatency = NewTimerDef("lock_latency")
658+
SemaphoreRequests = NewCounterDef("semaphore_requests")
659+
SemaphoreFailures = NewCounterDef("semaphore_failures")
660+
SemaphoreLatency = NewTimerDef("semaphore_latency")
661+
ClientRequests = NewCounterDef(
662662
"client_requests",
663663
WithDescription("The number of requests sent by the client to an individual service, keyed by `service_role` and `operation`."),
664664
)

common/util.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -587,22 +587,22 @@ func CheckEventBlobSizeLimit(
587587
runID string,
588588
metricsHandler metrics.Handler,
589589
logger log.Logger,
590-
blobSizeViolationOperationTag tag.ZapTag,
590+
operation string,
591591
) error {
592592

593-
metrics.EventBlobSize.With(metricsHandler).Record(int64(actualSize))
593+
metrics.EventBlobSize.With(metricsHandler).Record(int64(actualSize), metrics.OperationTag(operation))
594594
if actualSize > warnLimit {
595595
if logger != nil {
596596
logger.Warn("Blob data size exceeds the warning limit.",
597597
tag.WorkflowNamespace(namespace), // TODO: Not necessarily a "workflow" namespace, fix the tag.
598598
tag.WorkflowID(workflowID), // TODO: this should be entity ID and we need an archetype too.
599599
tag.WorkflowRunID(runID), // TODO: not necessarily a workflow run ID, fix the tag.
600600
tag.WorkflowSize(int64(actualSize)),
601-
blobSizeViolationOperationTag)
601+
tag.BlobSizeViolationOperation(operation))
602602
}
603603

604604
if actualSize > errorLimit {
605-
metrics.EventBlobSizeError.With(metricsHandler).Record(1)
605+
metrics.EventBlobSizeError.With(metricsHandler).Record(1, metrics.OperationTag(operation))
606606
return ErrBlobSizeExceedsLimit
607607
}
608608
}

service/frontend/workflow_handler.go

Lines changed: 18 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -1066,7 +1066,7 @@ func (wh *WorkflowHandler) RespondWorkflowTaskFailed(
10661066
taskToken.GetRunId(),
10671067
wh.metricsScope(ctx).WithTags(metrics.CommandTypeTag(enumspb.COMMAND_TYPE_UNSPECIFIED.String())),
10681068
wh.throttledLogger,
1069-
tag.BlobSizeViolationOperation("RespondWorkflowTaskFailed"),
1069+
"RespondWorkflowTaskFailed",
10701070
); err != nil {
10711071
serverFailure := failure.NewServerFailure(common.FailureReasonFailureExceedsLimit, true)
10721072
serverFailure.Cause = failure.Truncate(request.Failure, sizeLimitWarn)
@@ -1240,7 +1240,7 @@ func (wh *WorkflowHandler) RecordActivityTaskHeartbeat(ctx context.Context, requ
12401240
taskToken.GetRunId(),
12411241
wh.metricsScope(ctx).WithTags(metrics.CommandTypeTag(enumspb.COMMAND_TYPE_UNSPECIFIED.String())),
12421242
wh.throttledLogger,
1243-
tag.BlobSizeViolationOperation("RecordActivityTaskHeartbeat"),
1243+
"RecordActivityTaskHeartbeat",
12441244
); err != nil {
12451245
// heartbeat details exceed size limit, we would fail the activity immediately with explicit error reason
12461246
failRequest := &workflowservice.RespondActivityTaskFailedRequest{
@@ -1353,7 +1353,7 @@ func (wh *WorkflowHandler) RecordActivityTaskHeartbeatById(ctx context.Context,
13531353
taskToken.GetRunId(),
13541354
wh.metricsScope(ctx).WithTags(metrics.CommandTypeTag(enumspb.COMMAND_TYPE_UNSPECIFIED.String())),
13551355
wh.throttledLogger,
1356-
tag.BlobSizeViolationOperation("RecordActivityTaskHeartbeatById"),
1356+
"RecordActivityTaskHeartbeatById",
13571357
); err != nil {
13581358
// heartbeat details exceed size limit, we would fail the activity immediately with explicit error reason
13591359
failRequest := &workflowservice.RespondActivityTaskFailedRequest{
@@ -1437,7 +1437,7 @@ func (wh *WorkflowHandler) RespondActivityTaskCompleted(
14371437
taskToken.GetRunId(),
14381438
wh.metricsScope(ctx).WithTags(metrics.CommandTypeTag(enumspb.COMMAND_TYPE_UNSPECIFIED.String())),
14391439
wh.throttledLogger,
1440-
tag.BlobSizeViolationOperation("RespondActivityTaskCompleted"),
1440+
"RespondActivityTaskCompleted",
14411441
); err != nil {
14421442
// result exceeds blob size limit, we would record it as failure
14431443
failRequest := &workflowservice.RespondActivityTaskFailedRequest{
@@ -1548,7 +1548,7 @@ func (wh *WorkflowHandler) RespondActivityTaskCompletedById(ctx context.Context,
15481548
runID,
15491549
wh.metricsScope(ctx).WithTags(metrics.CommandTypeTag(enumspb.COMMAND_TYPE_UNSPECIFIED.String())),
15501550
wh.throttledLogger,
1551-
tag.BlobSizeViolationOperation("RespondActivityTaskCompletedById"),
1551+
"RespondActivityTaskCompletedById",
15521552
); err != nil {
15531553
// result exceeds blob size limit, we would record it as failure
15541554
failRequest := &workflowservice.RespondActivityTaskFailedRequest{
@@ -1636,7 +1636,7 @@ func (wh *WorkflowHandler) RespondActivityTaskFailed(
16361636
taskToken.GetRunId(),
16371637
wh.metricsScope(ctx).WithTags(metrics.CommandTypeTag(enumspb.COMMAND_TYPE_UNSPECIFIED.String())),
16381638
wh.throttledLogger,
1639-
tag.BlobSizeViolationOperation("RespondActivityTaskFailed"),
1639+
"RespondActivityTaskFailed",
16401640
); err != nil {
16411641
// heartbeat details exceed size limit, we would fail the activity immediately with explicit error reason
16421642
response.Failures = append(response.Failures, failure.NewServerFailure(common.FailureReasonHeartbeatExceedsLimit, true))
@@ -1655,7 +1655,7 @@ func (wh *WorkflowHandler) RespondActivityTaskFailed(
16551655
taskToken.GetRunId(),
16561656
wh.metricsScope(ctx).WithTags(metrics.CommandTypeTag(enumspb.COMMAND_TYPE_UNSPECIFIED.String())),
16571657
wh.throttledLogger,
1658-
tag.BlobSizeViolationOperation("RespondActivityTaskFailed"),
1658+
"RespondActivityTaskFailed",
16591659
); err != nil {
16601660
serverFailure := failure.NewServerFailure(common.FailureReasonFailureExceedsLimit, true)
16611661
serverFailure.Cause = failure.Truncate(request.Failure, sizeLimitWarn)
@@ -1759,7 +1759,7 @@ func (wh *WorkflowHandler) RespondActivityTaskFailedById(ctx context.Context, re
17591759
runID,
17601760
wh.metricsScope(ctx).WithTags(metrics.CommandTypeTag(enumspb.COMMAND_TYPE_UNSPECIFIED.String())),
17611761
wh.throttledLogger,
1762-
tag.BlobSizeViolationOperation("RespondActivityTaskFailedById"),
1762+
"RespondActivityTaskFailedById",
17631763
); err != nil {
17641764
// heartbeat details exceed size limit, we would fail the activity immediately with explicit error reason
17651765
response.Failures = append(response.Failures, failure.NewServerFailure(common.FailureReasonHeartbeatExceedsLimit, true))
@@ -1778,7 +1778,7 @@ func (wh *WorkflowHandler) RespondActivityTaskFailedById(ctx context.Context, re
17781778
runID,
17791779
wh.metricsScope(ctx).WithTags(metrics.CommandTypeTag(enumspb.COMMAND_TYPE_UNSPECIFIED.String())),
17801780
wh.throttledLogger,
1781-
tag.BlobSizeViolationOperation("RespondActivityTaskFailedById"),
1781+
"RespondActivityTaskFailedById",
17821782
); err != nil {
17831783
serverFailure := failure.NewServerFailure(common.FailureReasonFailureExceedsLimit, true)
17841784
serverFailure.Cause = failure.Truncate(request.Failure, sizeLimitWarn)
@@ -1846,7 +1846,7 @@ func (wh *WorkflowHandler) RespondActivityTaskCanceled(ctx context.Context, requ
18461846
taskToken.GetRunId(),
18471847
wh.metricsScope(ctx).WithTags(metrics.CommandTypeTag(enumspb.COMMAND_TYPE_UNSPECIFIED.String())),
18481848
wh.throttledLogger,
1849-
tag.BlobSizeViolationOperation("RespondActivityTaskCanceled"),
1849+
"RespondActivityTaskCanceled",
18501850
); err != nil {
18511851
// details exceeds blob size limit, we would record it as failure
18521852
failRequest := &workflowservice.RespondActivityTaskFailedRequest{
@@ -1956,7 +1956,7 @@ func (wh *WorkflowHandler) RespondActivityTaskCanceledById(ctx context.Context,
19561956
runID,
19571957
wh.metricsScope(ctx).WithTags(metrics.CommandTypeTag(enumspb.COMMAND_TYPE_UNSPECIFIED.String())),
19581958
wh.throttledLogger,
1959-
tag.BlobSizeViolationOperation("RespondActivityTaskCanceledById"),
1959+
"RespondActivityTaskCanceledById",
19601960
); err != nil {
19611961
// details exceeds blob size limit, we would record it as failure
19621962
failRequest := &workflowservice.RespondActivityTaskFailedRequest{
@@ -2070,7 +2070,7 @@ func (wh *WorkflowHandler) SignalWorkflowExecution(ctx context.Context, request
20702070
request.GetWorkflowExecution().GetRunId(),
20712071
wh.metricsScope(ctx).WithTags(metrics.CommandTypeTag(enumspb.COMMAND_TYPE_UNSPECIFIED.String())),
20722072
wh.throttledLogger,
2073-
tag.BlobSizeViolationOperation("SignalWorkflowExecution"),
2073+
"SignalWorkflowExecution",
20742074
); err != nil {
20752075
return nil, err
20762076
}
@@ -2744,7 +2744,7 @@ func (wh *WorkflowHandler) RespondQueryTaskCompleted(
27442744
"",
27452745
wh.metricsScope(ctx).WithTags(metrics.CommandTypeTag(enumspb.COMMAND_TYPE_UNSPECIFIED.String())),
27462746
wh.throttledLogger,
2747-
tag.BlobSizeViolationOperation("RespondQueryTaskCompleted"),
2747+
"RespondQueryTaskCompleted",
27482748
); err != nil {
27492749
request = &workflowservice.RespondQueryTaskCompletedRequest{
27502750
TaskToken: request.TaskToken,
@@ -2892,7 +2892,7 @@ func (wh *WorkflowHandler) QueryWorkflow(ctx context.Context, request *workflows
28922892
request.GetExecution().GetRunId(),
28932893
wh.metricsScope(ctx).WithTags(metrics.CommandTypeTag(enumspb.COMMAND_TYPE_UNSPECIFIED.String())),
28942894
wh.throttledLogger,
2895-
tag.BlobSizeViolationOperation("QueryWorkflow")); err != nil {
2895+
"QueryWorkflow"); err != nil {
28962896
return nil, err
28972897
}
28982898

@@ -3186,7 +3186,7 @@ func (wh *WorkflowHandler) createScheduleCHASM(
31863186
"", // don't have runid yet
31873187
wh.metricsScope(ctx).WithTags(metrics.CommandTypeTag(enumspb.COMMAND_TYPE_UNSPECIFIED.String())),
31883188
wh.throttledLogger,
3189-
tag.BlobSizeViolationOperation("CreateSchedule"),
3189+
"CreateSchedule",
31903190
); err != nil {
31913191
return nil, err
31923192
}
@@ -4100,7 +4100,7 @@ func (wh *WorkflowHandler) updateScheduleWorkflow(
41004100
"", // don't have runid yet
41014101
wh.metricsScope(ctx).WithTags(metrics.CommandTypeTag(enumspb.COMMAND_TYPE_UNSPECIFIED.String())),
41024102
wh.throttledLogger,
4103-
tag.BlobSizeViolationOperation("UpdateSchedule"),
4103+
"UpdateSchedule",
41044104
); err != nil {
41054105
return nil, err
41064106
}
@@ -4177,7 +4177,7 @@ func (wh *WorkflowHandler) PatchSchedule(
41774177
"", // don't have runid yet
41784178
wh.metricsScope(ctx).WithTags(metrics.CommandTypeTag(enumspb.COMMAND_TYPE_UNSPECIFIED.String())),
41794179
wh.throttledLogger,
4180-
tag.BlobSizeViolationOperation("PatchSchedule"),
4180+
"PatchSchedule",
41814181
); err != nil {
41824182
return nil, err
41834183
}
@@ -4280,7 +4280,7 @@ func (wh *WorkflowHandler) ListScheduleMatchingTimes(ctx context.Context, reques
42804280
"",
42814281
wh.metricsScope(ctx).WithTags(metrics.CommandTypeTag(enumspb.COMMAND_TYPE_UNSPECIFIED.String())),
42824282
wh.throttledLogger,
4283-
tag.BlobSizeViolationOperation("ListScheduleMatchingTimes")); err != nil {
4283+
"ListScheduleMatchingTimes"); err != nil {
42844284
return nil, err
42854285
}
42864286

service/history/api/create_workflow_util.go

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,6 @@ import (
1313
"go.temporal.io/server/chasm"
1414
"go.temporal.io/server/common"
1515
"go.temporal.io/server/common/definition"
16-
"go.temporal.io/server/common/log/tag"
1716
"go.temporal.io/server/common/metrics"
1817
"go.temporal.io/server/common/namespace"
1918
"go.temporal.io/server/common/primitives/timestamp"
@@ -237,7 +236,7 @@ func ValidateStart(
237236
"",
238237
handlerWithCommandTag,
239238
throttledLogger,
240-
tag.BlobSizeViolationOperation(operation),
239+
operation,
241240
); err != nil {
242241
return err
243242
}
@@ -252,7 +251,7 @@ func ValidateStart(
252251
"",
253252
handlerWithCommandTag,
254253
throttledLogger,
255-
tag.BlobSizeViolationOperation(operation),
254+
operation,
256255
); err != nil {
257256
return common.ErrMemoSizeExceedsLimit
258257
}

service/history/api/respondworkflowtaskcompleted/api.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -956,7 +956,7 @@ func (handler *WorkflowTaskCompletedHandler) handleBufferedQueries(
956956
runID,
957957
scope,
958958
handler.throttledLogger,
959-
tag.BlobSizeViolationOperation("ConsistentQuery"),
959+
"ConsistentQuery",
960960
); err != nil {
961961
handler.logger.Info("failing query because query result size is too large",
962962
tag.WorkflowNamespace(namespaceName.String()),

service/history/api/respondworkflowtaskcompleted/workflow_size_checker.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,7 @@ func (c *workflowSizeChecker) checkIfPayloadSizeExceedsLimit(
6868
executionState.RunId,
6969
c.metricsHandler.WithTags(commandTypeTag),
7070
c.logger,
71-
tag.BlobSizeViolationOperation(commandTypeTag.Value),
71+
commandTypeTag.Value,
7272
)
7373
if err != nil {
7474
return fmt.Errorf("%s", message) // nolint:err113
@@ -96,7 +96,7 @@ func (c *workflowSizeChecker) checkIfMemoSizeExceedsLimit(
9696
executionState.RunId,
9797
c.metricsHandler.WithTags(commandTypeTag),
9898
c.logger,
99-
tag.BlobSizeViolationOperation(commandTypeTag.Value),
99+
commandTypeTag.Value,
100100
)
101101
if err != nil {
102102
return fmt.Errorf("%s", message) // nolint:err113

service/history/api/signal_workflow_util.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@ func ValidateSignal(
4545
metrics.CommandTypeTag(enumspb.COMMAND_TYPE_UNSPECIFIED.String()),
4646
),
4747
shard.GetThrottledLogger(),
48-
tag.BlobSizeViolationOperation(operation),
48+
operation,
4949
); err != nil {
5050
return err
5151
}

0 commit comments

Comments
 (0)