Skip to content

Commit dad83ff

Browse files
authored
Add Domain Redirection Metrics for Active-Active (#7202)
<!-- Describe what has changed in this PR --> **What changed?** Add Domain Redirection Metrics for Active-Active workflows. Add additional debug logs for cluster redirection decisions. <!-- Tell your future self why you have made these changes --> **Why?** This should help diagnose any issues with request forwarding for active-active workflows, as well as add additional support for debugging requests with QueryConsistencyLevel specified. <!-- How have you verified this change? Tested locally? Added a unit test? Checked in staging env? --> **How did you test it?** Validated in dev env. <!-- Assuming the worst case, what can be broken when deploying this change to production? --> **Potential risks** N/A <!-- Is it notable for release? e.g. schema updates, configuration or data migration required? If so, please mention it, and also update CHANGELOG.md --> **Release notes** N/A <!-- Are there any documentation updates that should be made for config, https://cadenceworkflow.io/docs/operation-guide/setup/ ? If so, please open a PR in https://github.com/cadence-workflow/cadence-docs --> **Documentation Changes** N/A
1 parent a07ce67 commit dad83ff

File tree

8 files changed

+611
-13
lines changed

8 files changed

+611
-13
lines changed

common/metrics/defs.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -703,6 +703,8 @@ const (
703703
DCRedirectionRefreshWorkflowTasksScope
704704
// DCRedirectionRestartWorkflowExecutionScope tracks RPC calls for dc redirection
705705
DCRedirectionRestartWorkflowExecutionScope
706+
// DCRedirectionForwardingPolicyScope tracks cluster redirection decisions
707+
DCRedirectionForwardingPolicyScope
706708

707709
// MessagingClientPublishScope tracks Publish calls made by service to messaging layer
708710
MessagingClientPublishScope
@@ -1758,6 +1760,7 @@ var ScopeDefs = map[ServiceIdx]map[int]scopeDefinition{
17581760
DCRedirectionGetTaskListsByDomainScope: {operation: "DCRedirectionGetTaskListsByDomain", tags: map[string]string{CadenceRoleTagName: DCRedirectionRoleTagValue}},
17591761
DCRedirectionRefreshWorkflowTasksScope: {operation: "DCRedirectionRefreshWorkflowTasks", tags: map[string]string{CadenceRoleTagName: DCRedirectionRoleTagValue}},
17601762
DCRedirectionRestartWorkflowExecutionScope: {operation: "DCRedirectionRestartWorkflowExecution", tags: map[string]string{CadenceRoleTagName: DCRedirectionRoleTagValue}},
1763+
DCRedirectionForwardingPolicyScope: {operation: "DCRedirectionForwardingPolicy", tags: map[string]string{CadenceRoleTagName: DCRedirectionRoleTagValue}},
17611764

17621765
MessagingClientPublishScope: {operation: "MessagingClientPublish"},
17631766
MessagingClientPublishBatchScope: {operation: "MessagingClientPublishBatch"},
@@ -2398,6 +2401,9 @@ const (
23982401
ActiveClusterManagerLookupFailureCount
23992402
ActiveClusterManagerLookupLatency
24002403

2404+
// cluster forwarding policy metrics
2405+
ClusterForwardingPolicyRequests
2406+
24012407
NumCommonMetrics // Needs to be last on this list for iota numbering
24022408
)
24032409

@@ -3180,6 +3186,8 @@ var MetricDefs = map[ServiceIdx]map[int]metricDefinition{
31803186
ActiveClusterManagerLookupSuccessCount: {metricName: "active_cluster_manager_lookup_success_count", metricType: Counter},
31813187
ActiveClusterManagerLookupFailureCount: {metricName: "active_cluster_manager_lookup_failure_count", metricType: Counter},
31823188
ActiveClusterManagerLookupLatency: {metricName: "active_cluster_manager_lookup_latency", metricType: Histogram, buckets: ExponentialDurationBuckets},
3189+
3190+
ClusterForwardingPolicyRequests: {metricName: "cluster_forwarding_policy_requests", metricType: Counter},
31833191
},
31843192
History: {
31853193
TaskRequests: {metricName: "task_requests", metricType: Counter},

common/metrics/tags.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,7 @@ const (
7070
topic = "topic"
7171
mode = "mode"
7272
isRetry = "is_retry"
73+
queryConsistencyLevel = "query_consistency_level"
7374

7475
// limiter-side tags
7576
globalRatelimitKey = "global_ratelimit_key"
@@ -369,3 +370,8 @@ func ActiveClusterLookupFnTag(fn string) Tag {
369370
func ActiveClusterSelectionStrategyTag(strategy string) Tag {
370371
return metricWithUnknown("strategy", strategy)
371372
}
373+
374+
// QueryConsistencyLevelTag returns a new query consistency level tag.
375+
func QueryConsistencyLevelTag(level string) Tag {
376+
return metricWithUnknown(queryConsistencyLevel, level)
377+
}

service/frontend/templates/clusterredirection.tmpl

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@ func NewAPIHandler(
4646
policy,
4747
resource.GetLogger(),
4848
resource.GetActiveClusterManager(),
49+
resource.GetMetricsClient(),
4950
)
5051

5152
return &clusterRedirectionHandler{

service/frontend/wrappers/clusterredirection/api_generated.go

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

service/frontend/wrappers/clusterredirection/api_test.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1086,6 +1086,7 @@ func (s *clusterRedirectionHandlerSuite) TestGetTaskListsByDomainError() {
10861086
"",
10871087
s.mockResource.GetLogger(),
10881088
s.mockResource.GetActiveClusterManager(),
1089+
s.mockResource.GetMetricsClient(),
10891090
)
10901091
ctx := context.Background()
10911092
req := &types.GetTaskListsByDomainRequest{

service/frontend/wrappers/clusterredirection/policy.go

Lines changed: 100 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ import (
3131
"github.com/uber/cadence/common/config"
3232
"github.com/uber/cadence/common/log"
3333
"github.com/uber/cadence/common/log/tag"
34+
"github.com/uber/cadence/common/metrics"
3435
"github.com/uber/cadence/common/types"
3536
frontendcfg "github.com/uber/cadence/service/frontend/config"
3637
)
@@ -115,6 +116,7 @@ type (
115116
targetCluster string
116117
logger log.Logger
117118
activeClusterManager activecluster.Manager
119+
metricsClient metrics.Client
118120
}
119121
)
120122

@@ -165,6 +167,7 @@ func RedirectionPolicyGenerator(
165167
policy config.ClusterRedirectionPolicy,
166168
logger log.Logger,
167169
activeClusterManager activecluster.Manager,
170+
metricsClient metrics.Client,
168171
) ClusterRedirectionPolicy {
169172
switch policy.Policy {
170173
case DCRedirectionPolicyDefault:
@@ -174,16 +177,16 @@ func RedirectionPolicyGenerator(
174177
return newNoopRedirectionPolicy(clusterMetadata.GetCurrentClusterName())
175178
case DCRedirectionPolicySelectedAPIsForwarding:
176179
currentClusterName := clusterMetadata.GetCurrentClusterName()
177-
return newSelectedOrAllAPIsForwardingPolicy(currentClusterName, config, false, selectedAPIsForwardingRedirectionPolicyAPIAllowlist, "", logger, activeClusterManager)
180+
return newSelectedOrAllAPIsForwardingPolicy(currentClusterName, config, false, selectedAPIsForwardingRedirectionPolicyAPIAllowlist, "", logger, activeClusterManager, metricsClient)
178181
case DCRedirectionPolicySelectedAPIsForwardingV2:
179182
currentClusterName := clusterMetadata.GetCurrentClusterName()
180-
return newSelectedOrAllAPIsForwardingPolicy(currentClusterName, config, false, selectedAPIsForwardingRedirectionPolicyAPIAllowlistV2, "", logger, activeClusterManager)
183+
return newSelectedOrAllAPIsForwardingPolicy(currentClusterName, config, false, selectedAPIsForwardingRedirectionPolicyAPIAllowlistV2, "", logger, activeClusterManager, metricsClient)
181184
case DCRedirectionPolicyAllDomainAPIsForwarding:
182185
currentClusterName := clusterMetadata.GetCurrentClusterName()
183-
return newSelectedOrAllAPIsForwardingPolicy(currentClusterName, config, true, selectedAPIsForwardingRedirectionPolicyAPIAllowlist, policy.AllDomainApisForwardingTargetCluster, logger, activeClusterManager)
186+
return newSelectedOrAllAPIsForwardingPolicy(currentClusterName, config, true, selectedAPIsForwardingRedirectionPolicyAPIAllowlist, policy.AllDomainApisForwardingTargetCluster, logger, activeClusterManager, metricsClient)
184187
case DCRedirectionPolicyAllDomainAPIsForwardingV2:
185188
currentClusterName := clusterMetadata.GetCurrentClusterName()
186-
return newSelectedOrAllAPIsForwardingPolicy(currentClusterName, config, true, selectedAPIsForwardingRedirectionPolicyAPIAllowlistV2, policy.AllDomainApisForwardingTargetCluster, logger, activeClusterManager)
189+
return newSelectedOrAllAPIsForwardingPolicy(currentClusterName, config, true, selectedAPIsForwardingRedirectionPolicyAPIAllowlistV2, policy.AllDomainApisForwardingTargetCluster, logger, activeClusterManager, metricsClient)
187190

188191
default:
189192
panic(fmt.Sprintf("Unknown DC redirection policy %v", policy.Policy))
@@ -219,6 +222,7 @@ func newSelectedOrAllAPIsForwardingPolicy(
219222
targetCluster string,
220223
logger log.Logger,
221224
activeClusterManager activecluster.Manager,
225+
metricsClient metrics.Client,
222226
) *selectedOrAllAPIsForwardingRedirectionPolicy {
223227
return &selectedOrAllAPIsForwardingRedirectionPolicy{
224228
currentClusterName: currentClusterName,
@@ -228,6 +232,7 @@ func newSelectedOrAllAPIsForwardingPolicy(
228232
targetCluster: targetCluster,
229233
logger: logger,
230234
activeClusterManager: activeClusterManager,
235+
metricsClient: metricsClient,
231236
}
232237
}
233238

@@ -264,30 +269,65 @@ func (policy *selectedOrAllAPIsForwardingRedirectionPolicy) withRedirect(
264269
call func(string) error,
265270
) error {
266271
targetDC, enableDomainNotActiveForwarding := policy.getTargetClusterAndIsDomainNotActiveAutoForwarding(ctx, domainEntry, workflowExecution, actClSelPolicyForNewWF, apiName, requestedConsistencyLevel)
267-
268-
policy.logger.Debugf("Calling API %q on target cluster:%q for domain:%q", apiName, targetDC, domainEntry.GetInfo().Name)
272+
domainName := domainEntry.GetInfo().Name
273+
274+
policy.logger.Info(
275+
"Calling API on target cluster for domain",
276+
tag.OperationName(apiName),
277+
tag.ClusterName(policy.currentClusterName),
278+
tag.ActiveClusterName(targetDC),
279+
tag.WorkflowDomainName(domainEntry.GetInfo().Name),
280+
)
269281
err := call(targetDC)
282+
scope := policy.metricsClient.Scope(metrics.DCRedirectionForwardingPolicyScope).Tagged(
283+
append(
284+
metrics.GetContextTags(ctx),
285+
metrics.DomainTag(domainName),
286+
metrics.SourceClusterTag(policy.currentClusterName),
287+
metrics.TargetClusterTag(targetDC),
288+
metrics.IsActiveActiveDomainTag(actClSelPolicyForNewWF != nil),
289+
metrics.QueryConsistencyLevelTag(requestedConsistencyLevel.String()),
290+
)...,
291+
)
270292

271293
var domainNotActiveErr *types.DomainNotActiveError
272294
ok := errors.As(err, &domainNotActiveErr)
273295
if !ok || !enableDomainNotActiveForwarding {
296+
policy.logger.Debugf("Redirection not enabled for request domain:%q, api: %q", domainName, apiName)
297+
scope.IncCounter(metrics.ClusterForwardingPolicyRequests)
274298
return err
275299
}
276300

277-
// TODO(active-active): emit a metric here including apiName, targetDC and newTargetDC tags
278-
// This can only happen if there was a failover during the API call.
279-
// Forward the request the the active cluster specified in the error
301+
scope = scope.Tagged(metrics.ActiveClusterTag(domainNotActiveErr.ActiveCluster))
302+
scope.IncCounter(metrics.ClusterForwardingPolicyRequests)
303+
280304
if domainNotActiveErr.ActiveCluster == "" {
281-
policy.logger.Debugf("No active cluster specified in the error returned from cluster:%q for domain:%q, api: %q so skipping redirect", targetDC, domainEntry.GetInfo().Name, apiName)
305+
policy.logger.Debug(
306+
"No active cluster specified in the error returned from cluster, skipping redirect",
307+
tag.ClusterName(targetDC),
308+
tag.WorkflowDomainName(domainEntry.GetInfo().Name),
309+
tag.OperationName(apiName),
310+
)
282311
return err
283312
}
284313

285314
if domainNotActiveErr.ActiveCluster == targetDC {
286-
policy.logger.Debugf("No need to redirect to new target cluster:%q for domain:%q, api: %q", targetDC, domainEntry.GetInfo().Name, apiName)
315+
policy.logger.Debug(
316+
"No need to redirect to new target cluster",
317+
tag.ClusterName(targetDC),
318+
tag.WorkflowDomainName(domainEntry.GetInfo().Name),
319+
tag.OperationName(apiName),
320+
)
287321
return err
288322
}
289323

290-
policy.logger.Debugf("Calling API %q on new target cluster:%q for domain:%q as indicated by response from cluster:%q", apiName, domainNotActiveErr.ActiveCluster, domainEntry.GetInfo().Name, targetDC)
324+
policy.logger.Debug(
325+
"Calling API on new target cluster for domain as indicated by response from cluster",
326+
tag.OperationName(apiName),
327+
tag.ClusterName(domainNotActiveErr.ActiveCluster),
328+
tag.WorkflowDomainName(domainEntry.GetInfo().Name),
329+
tag.ClusterName(targetDC),
330+
)
291331
return call(domainNotActiveErr.ActiveCluster)
292332
}
293333

@@ -303,39 +343,86 @@ func (policy *selectedOrAllAPIsForwardingRedirectionPolicy) getTargetClusterAndI
303343
if !domainEntry.IsGlobalDomain() {
304344
// Do not do dc redirection if domain is local domain,
305345
// for global domains with 1 dc, it's still useful to do auto-forwarding during cluster migration
346+
policy.logger.Debug(
347+
"Local domain, routing to current cluster",
348+
tag.WorkflowDomainName(domainEntry.GetInfo().Name),
349+
tag.ClusterName(policy.currentClusterName),
350+
)
306351
return policy.currentClusterName, false
307352
}
308353

309354
if !policy.config.EnableDomainNotActiveAutoForwarding(domainEntry.GetInfo().Name) {
310355
// Do not do dc redirection if auto-forwarding dynamicconfig is not enabled
356+
policy.logger.Debug(
357+
"Auto-forwarding dynamicconfig is not enabled, routing to current cluster",
358+
tag.WorkflowDomainName(domainEntry.GetInfo().Name),
359+
tag.ClusterName(policy.currentClusterName),
360+
)
311361
return policy.currentClusterName, false
312362
}
313363

314364
currentActiveCluster := domainEntry.GetReplicationConfig().ActiveClusterName
315365
if domainEntry.GetReplicationConfig().IsActiveActive() {
316-
currentActiveCluster = policy.activeClusterForActiveActiveDomainRequest(ctx, domainEntry, workflowExecution, actClSelPolicyForNewWF, apiName)
366+
workflowActiveCluster := policy.activeClusterForActiveActiveDomainRequest(ctx, domainEntry, workflowExecution, actClSelPolicyForNewWF, apiName)
367+
policy.logger.Debug(
368+
"Active-active domain, routing to active cluster",
369+
tag.WorkflowDomainName(domainEntry.GetInfo().Name),
370+
tag.ClusterName(currentActiveCluster),
371+
tag.ActiveClusterName(workflowActiveCluster),
372+
)
373+
currentActiveCluster = workflowActiveCluster
317374
}
318375

319376
if policy.allDomainAPIs {
320377
if policy.targetCluster == "" {
378+
policy.logger.Debug(
379+
"All domain APIs forwarding, routing to active cluster",
380+
tag.WorkflowDomainName(domainEntry.GetInfo().Name),
381+
tag.ClusterName(policy.currentClusterName),
382+
tag.ActiveClusterName(currentActiveCluster),
383+
)
321384
return currentActiveCluster, true
322385
}
323386
if policy.targetCluster == currentActiveCluster {
387+
policy.logger.Debug(
388+
"All domain APIs forwarding, routing to active cluster",
389+
tag.WorkflowDomainName(domainEntry.GetInfo().Name),
390+
tag.ClusterName(policy.currentClusterName),
391+
tag.ActiveClusterName(currentActiveCluster),
392+
)
324393
return currentActiveCluster, true
325394
}
326395
// fallback to selected APIs if targetCluster is not empty and not the same as currentActiveCluster
327396
}
328397

329398
if requestedConsistencyLevel == types.QueryConsistencyLevelStrong {
399+
policy.logger.Debug(
400+
"Query requested strong consistency, routing to active cluster",
401+
tag.WorkflowDomainName(domainEntry.GetInfo().Name),
402+
tag.ClusterName(policy.currentClusterName),
403+
tag.ActiveClusterName(currentActiveCluster),
404+
)
330405
return currentActiveCluster, true
331406
}
332407

333408
_, ok := policy.selectedAPIs[apiName]
334409
if !ok {
335410
// do not do dc redirection if API is not whitelisted
411+
policy.logger.Debug(
412+
"API is not whitelisted, routing to current cluster",
413+
tag.WorkflowDomainName(domainEntry.GetInfo().Name),
414+
tag.ClusterName(policy.currentClusterName),
415+
tag.OperationName(apiName),
416+
)
336417
return policy.currentClusterName, false
337418
}
338419

420+
policy.logger.Debug(
421+
"API is whitelisted, routing to active cluster",
422+
tag.WorkflowDomainName(domainEntry.GetInfo().Name),
423+
tag.ClusterName(policy.currentClusterName),
424+
tag.ActiveClusterName(currentActiveCluster),
425+
)
339426
return currentActiveCluster, true
340427
}
341428

service/frontend/wrappers/clusterredirection/policy_test.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ import (
3636
"github.com/uber/cadence/common/dynamicconfig"
3737
"github.com/uber/cadence/common/dynamicconfig/dynamicproperties"
3838
"github.com/uber/cadence/common/log/testlogger"
39+
"github.com/uber/cadence/common/metrics"
3940
"github.com/uber/cadence/common/persistence"
4041
"github.com/uber/cadence/common/types"
4142
frontendcfg "github.com/uber/cadence/service/frontend/config"
@@ -62,6 +63,7 @@ type (
6263
currentClusterName string
6364
alternativeClusterName string
6465
mockConfig *frontendcfg.Config
66+
mockMetricsClient metrics.Client
6567

6668
policy *selectedOrAllAPIsForwardingRedirectionPolicy
6769
}
@@ -178,6 +180,8 @@ func (s *selectedAPIsForwardingRedirectionPolicySuite) SetupTest() {
178180
logger,
179181
)
180182

183+
s.mockMetricsClient = metrics.NewNoopMetricsClient()
184+
181185
s.activeClusterManager = activecluster.NewMockManager(s.controller)
182186

183187
s.policy = newSelectedOrAllAPIsForwardingPolicy(
@@ -188,6 +192,7 @@ func (s *selectedAPIsForwardingRedirectionPolicySuite) SetupTest() {
188192
"",
189193
logger,
190194
s.activeClusterManager,
195+
s.mockMetricsClient,
191196
)
192197
}
193198

0 commit comments

Comments
 (0)