diff --git a/Makefile b/Makefile index 51bd35f16a6..3d41543de5e 100644 --- a/Makefile +++ b/Makefile @@ -64,7 +64,7 @@ $(BUILD)/goversion-lint: $(BUILD)/fmt: $(BUILD)/codegen # formatting must occur only after all other go-file-modifications are done # $(BUILD)/copyright # $(BUILD)/copyright: $(BUILD)/codegen # must add copyright to generated code, sometimes needs re-formatting -$(BUILD)/codegen: $(BUILD)/thrift $(BUILD)/protoc +$(BUILD)/codegen: $(BUILD)/thrift $(BUILD)/protoc $(BUILD)/metrics $(BUILD)/thrift: $(BUILD)/go_mod_check $(BUILD)/protoc: $(BUILD)/go_mod_check $(BUILD)/go_mod_check: @@ -211,6 +211,12 @@ $(BIN)/protoc-gen-gogofast: go.mod go.work | $(BIN) $(BIN)/protoc-gen-yarpc-go: go.mod go.work | $(BIN) $(call go_mod_build_tool,go.uber.org/yarpc/encoding/protobuf/protoc-gen-yarpc-go) +$(BIN)/metricsgen: internal/tools/go.mod go.work $(wildcard internal/tools/metricsgen/*) | $(BIN) + $(call go_build_tool,./metricsgen) + +$(BIN)/metricslint: internal/tools/go.mod go.work $(wildcard internal/tools/metricslint/* internal/tools/metricslint/cmd/*) | $(BIN) + $(call go_build_tool,./metricslint/cmd,metricslint) + $(BUILD)/go_mod_check: go.mod internal/tools/go.mod go.work $Q # generated == used is occasionally important for gomock / mock libs in general. this is not a definite problem if violated though. $Q ./scripts/check-gomod-version.sh github.com/golang/mock/gomock $(if $(verbose),-v) @@ -352,6 +358,10 @@ $(BUILD)/protoc: $(PROTO_FILES) $(STABLE_BIN)/$(PROTOC_VERSION_BIN) $(BIN)/proto fi $Q touch $@ +$(BUILD)/metrics: $(ALL_SRC) $(BIN)/metricsgen + $Q $(BIN_PATH) go generate -run=metricsgen ./... + $Q touch $@ + # ==================================== # Rule-breaking targets intended ONLY for special cases with no good alternatives. # ==================================== @@ -404,6 +414,11 @@ $(BUILD)/code-lint: $(LINT_SRC) $(BIN)/revive | $(BUILD) fi $Q touch $@ +$(BUILD)/metrics-lint: $(ALL_SRC) $(BIN)/metricslint | $(BUILD) + $Q echo "linting metrics definitions..." + $Q $(BIN_PATH) $(BIN)/metricslint -skip cadence_requests_per_tl,2 -skip cache_hit,2 -skip cache_full,2 -skip cache_miss,2 -skip cross_cluster_fetch_errors,2 ./... + $Q touch $@ + $(BUILD)/goversion-lint: go.work Dockerfile docker/github_actions/Dockerfile${DOCKERFILE_SUFFIX} $Q echo "checking go version..." $Q # intentionally using go.work toolchain, as GOTOOLCHAIN is user-overridable @@ -458,7 +473,7 @@ endef # useful to actually re-run to get output again. # reuse the intermediates for simplicity and consistency. lint: ## (Re)run the linter - $(call remake,proto-lint gomod-lint code-lint goversion-lint) + $(call remake,proto-lint gomod-lint code-lint goversion-lint metrics-lint) # intentionally not re-making, it's a bit slow and it's clear when it's unnecessary fmt: $(BUILD)/fmt ## Run `gofmt` / organize imports / etc @@ -544,14 +559,20 @@ bins: $(BINS) ## Build all binaries, and any fast codegen needed (does not refre tools: $(TOOLS) -go-generate: $(BIN)/mockgen $(BIN)/enumer $(BIN)/mockery $(BIN)/gowrap ## Run `go generate` to regen mocks, enums, etc +go-generate: $(BIN)/mockgen $(BIN)/enumer $(BIN)/mockery $(BIN)/gowrap $(BIN)/metricsgen ## Run `go generate` to regen mocks, enums, etc $Q echo "running go generate ./..., this takes a minute or more..." $Q # add our bins to PATH so `go generate` can find them $Q $(BIN_PATH) go generate $(if $(verbose),-v) ./... 
+ $Q touch $(BUILD)/metrics # whole-service go-generate also regenerates metrics $Q $(MAKE) --no-print-directory fmt # $Q echo "updating copyright headers" # $Q $(MAKE) --no-print-directory copyright +metrics: $(BIN)/metricsgen ## metrics-only code regen, much faster than go-generate + $Q echo "re-generating metrics structs..." + $Q $(MAKE) $(BUILD)/metrics + $Q $(MAKE) fmt # clean up imports + release: ## Re-generate generated code and run tests $(MAKE) --no-print-directory go-generate $(MAKE) --no-print-directory test @@ -577,7 +598,7 @@ tidy: ## `go mod tidy` all packages clean: ## Clean build products and SQLite database rm -f $(BINS) rm -Rf $(BUILD) - rm *.db + rm -f *.db $(if \ $(wildcard $(STABLE_BIN)/*), \ $(warning usually-stable build tools still exist, delete the $(STABLE_BIN) folder to rebuild them),) diff --git a/cmd/server/cadence/fx.go b/cmd/server/cadence/fx.go index 147dba12e01..719a2d48afa 100644 --- a/cmd/server/cadence/fx.go +++ b/cmd/server/cadence/fx.go @@ -40,6 +40,7 @@ import ( "github.com/uber/cadence/common/log/tag" "github.com/uber/cadence/common/metrics" "github.com/uber/cadence/common/metrics/metricsfx" + "github.com/uber/cadence/common/metrics/structured" "github.com/uber/cadence/common/persistence/nosql/nosqlplugin/cassandra/gocql" "github.com/uber/cadence/common/rpc/rpcfx" "github.com/uber/cadence/common/service" @@ -101,6 +102,7 @@ type AppParams struct { DynamicConfig dynamicconfig.Client Scope tally.Scope MetricsClient metrics.Client + Emitter structured.Emitter } // NewApp created a new Application from pre initalized config and logger. @@ -112,6 +114,7 @@ func NewApp(params AppParams) *App { dynamicConfig: params.DynamicConfig, scope: params.Scope, metricsClient: params.MetricsClient, + emitter: params.Emitter, } params.LifeCycle.Append(fx.StartHook(app.verifySchema)) @@ -128,13 +131,14 @@ type App struct { dynamicConfig dynamicconfig.Client scope tally.Scope metricsClient metrics.Client + emitter structured.Emitter daemon common.Daemon service string } func (a *App) Start(_ context.Context) error { - a.daemon = newServer(a.service, a.cfg, a.logger, a.dynamicConfig, a.scope, a.metricsClient) + a.daemon = newServer(a.service, a.cfg, a.logger, a.dynamicConfig, a.scope, a.metricsClient, a.emitter) a.daemon.Start() return nil } diff --git a/cmd/server/cadence/server.go b/cmd/server/cadence/server.go index 873ce35a4aa..01b8f5715c5 100644 --- a/cmd/server/cadence/server.go +++ b/cmd/server/cadence/server.go @@ -54,6 +54,7 @@ import ( "github.com/uber/cadence/common/membership" "github.com/uber/cadence/common/messaging/kafka" "github.com/uber/cadence/common/metrics" + "github.com/uber/cadence/common/metrics/structured" "github.com/uber/cadence/common/peerprovider/ringpopprovider" pnt "github.com/uber/cadence/common/pinot" "github.com/uber/cadence/common/resource" @@ -79,12 +80,13 @@ type ( dynamicCfgClient dynamicconfig.Client scope tally.Scope metricsClient metrics.Client + emitter structured.Emitter } ) // newServer returns a new instance of a daemon // that represents a cadence service -func newServer(service string, cfg config.Config, logger log.Logger, dynamicCfgClient dynamicconfig.Client, scope tally.Scope, metricsClient metrics.Client) common.Daemon { +func newServer(service string, cfg config.Config, logger log.Logger, dynamicCfgClient dynamicconfig.Client, scope tally.Scope, metricsClient metrics.Client, emitter structured.Emitter) common.Daemon { return &server{ cfg: cfg, name: service, @@ -93,6 +95,7 @@ func newServer(service string, cfg 
config.Config, logger log.Logger, dynamicCfgC dynamicCfgClient: dynamicCfgClient, scope: scope, metricsClient: metricsClient, + emitter: emitter, } } @@ -142,6 +145,7 @@ func (s *server) startService() common.Daemon { params.MetricScope = s.scope params.MetricsClient = s.metricsClient + params.Emitter = s.emitter rpcParams, err := rpc.NewParams(params.Name, &s.cfg, dc, params.Logger, params.MetricsClient) if err != nil { diff --git a/cmd/server/cadence/server_test.go b/cmd/server/cadence/server_test.go index 902f5fd5eef..ea6bb16be66 100644 --- a/cmd/server/cadence/server_test.go +++ b/cmd/server/cadence/server_test.go @@ -44,6 +44,7 @@ import ( "github.com/uber/cadence/common/log/tag" "github.com/uber/cadence/common/log/testlogger" "github.com/uber/cadence/common/metrics" + "github.com/uber/cadence/common/metrics/structured" pt "github.com/uber/cadence/common/persistence/persistence-tests" "github.com/uber/cadence/common/persistence/sql/sqlplugin/sqlite" "github.com/uber/cadence/common/resource" @@ -110,7 +111,15 @@ func (s *ServerSuite) TestServerStartup() { }) for _, svc := range services { - server := newServer(svc, cfg, logger, dynamicconfig.NewNopClient(), tally.NoopScope, metrics.NewNoopMetricsClient()) + server := newServer( + svc, + cfg, + logger, + dynamicconfig.NewNopClient(), + tally.NoopScope, + metrics.NewNoopMetricsClient(), + structured.NewTestEmitter(s.T(), nil), + ) daemons = append(daemons, server) server.Start() } diff --git a/common/metrics/defs.go b/common/metrics/defs.go index 00214a52649..a1d546ede05 100644 --- a/common/metrics/defs.go +++ b/common/metrics/defs.go @@ -53,6 +53,10 @@ type ( ServiceIdx int ) +func (s scopeDefinition) GetOperationString() string { + return s.operation +} + // MetricTypes which are supported const ( Counter MetricType = iota @@ -1068,7 +1072,7 @@ const ( // -- Operation scopes for History service -- const ( // HistoryStartWorkflowExecutionScope tracks StartWorkflowExecution API calls received by service - HistoryStartWorkflowExecutionScope = iota + NumCommonScopes + HistoryStartWorkflowExecutionScope = iota + NumFrontendScopes // HistoryRecordActivityTaskHeartbeatScope tracks RecordActivityTaskHeartbeat API calls received by service HistoryRecordActivityTaskHeartbeatScope // HistoryRespondDecisionTaskCompletedScope tracks RespondDecisionTaskCompleted API calls received by service @@ -1356,7 +1360,7 @@ const ( // -- Operation scopes for Matching service -- const ( // PollForDecisionTaskScope tracks PollForDecisionTask API calls received by service - MatchingPollForDecisionTaskScope = iota + NumCommonScopes + MatchingPollForDecisionTaskScope = iota + NumHistoryScopes // PollForActivityTaskScope tracks PollForActivityTask API calls received by service MatchingPollForActivityTaskScope // MatchingAddActivityTaskScope tracks AddActivityTask API calls received by service @@ -1392,7 +1396,7 @@ const ( // -- Operation scopes for Worker service -- const ( // ReplicationScope is the scope used by all metric emitted by replicator - ReplicatorScope = iota + NumCommonScopes + ReplicatorScope = iota + NumMatchingScopes // DomainReplicationTaskScope is the scope used by domain task replication processing DomainReplicationTaskScope // ESProcessorScope is scope used by all metric emitted by esProcessor @@ -1440,7 +1444,7 @@ const ( // -- Operation scopes for ShardDistributor service -- const ( // ShardDistributorGetShardOwnerScope tracks GetShardOwner API calls received by service - ShardDistributorGetShardOwnerScope = iota + NumCommonScopes + 
ShardDistributorGetShardOwnerScope = iota + NumWorkerScopes ShardDistributorHeartbeatScope ShardDistributorAssignLoopScope @@ -2700,12 +2704,13 @@ const ( VirtualQueueCountGauge VirtualQueuePausedGauge VirtualQueueRunningGauge + NumHistoryMetrics ) // Matching metrics enum const ( - PollSuccessPerTaskListCounter = iota + NumCommonMetrics + PollSuccessPerTaskListCounter = iota + NumHistoryMetrics PollTimeoutPerTaskListCounter PollSuccessWithSyncPerTaskListCounter LeaseRequestPerTaskListCounter @@ -2783,12 +2788,13 @@ const ( IsolationGroupUpscale IsolationGroupDownscale PartitionDrained + NumMatchingMetrics ) // Worker metrics enum const ( - ReplicatorMessages = iota + NumCommonMetrics + ReplicatorMessages = iota + NumMatchingMetrics ReplicatorFailures ReplicatorMessagesDropped ReplicatorLatency @@ -2872,12 +2878,13 @@ const ( DiagnosticsWorkflowStartedCount DiagnosticsWorkflowSuccess DiagnosticsWorkflowExecutionLatency + NumWorkerMetrics ) // ShardDistributor metrics enum const ( - ShardDistributorRequests = iota + NumCommonMetrics + ShardDistributorRequests = iota + NumWorkerMetrics ShardDistributorFailures ShardDistributorLatency ShardDistributorErrContextTimeoutCounter diff --git a/common/metrics/defs_test.go b/common/metrics/defs_test.go index 122aa359645..db6c15a96a3 100644 --- a/common/metrics/defs_test.go +++ b/common/metrics/defs_test.go @@ -129,6 +129,39 @@ func TestMetricDefs(t *testing.T) { } } +// "index -> operation" must be unique for structured.DynamicOperationTags' int lookup to work consistently. +// Duplicate indexes with the same operation name are technically fine, but there doesn't seem to be any benefit in allowing it, +// and it trivially ensures that all indexes have only one operation name. +func TestOperationIndexesAreUnique(t *testing.T) { + seen := make(map[int]bool) + for serviceIdx, serviceOps := range ScopeDefs { + for idx := range serviceOps { + if seen[idx] { + t.Error("duplicate operation index:", idx, "with name:", serviceOps[idx].operation, "in service:", serviceIdx) + } + seen[idx] = true + } + } +} + +func TestMetricsAreUnique(t *testing.T) { + // Duplicate indexes is arguably fine, but there doesn't seem to be any benefit in allowing it. + // + // Duplicate names are also linted, but they're done via an analyzer (metricslint) instead, to + // allow checking across multiple formats. + t.Run("indexes", func(t *testing.T) { + seen := make(map[int]bool) + for _, serviceMetrics := range MetricDefs { + for idx := range serviceMetrics { + if seen[idx] { + t.Error("duplicate metric index:", idx, "with name:", serviceMetrics[idx].metricName) + } + seen[idx] = true + } + } + }) +} + func TestExponentialDurationBuckets(t *testing.T) { factor := math.Pow(2, 0.25) assert.Equal(t, 80, len(ExponentialDurationBuckets)) diff --git a/common/metrics/metricsfx/metricsfx.go b/common/metrics/metricsfx/metricsfx.go index 56702a26107..790880aee67 100644 --- a/common/metrics/metricsfx/metricsfx.go +++ b/common/metrics/metricsfx/metricsfx.go @@ -29,12 +29,15 @@ import ( "github.com/uber/cadence/common/config" "github.com/uber/cadence/common/log" "github.com/uber/cadence/common/metrics" + "github.com/uber/cadence/common/metrics/structured" "github.com/uber/cadence/common/service" ) // Module provides metrics client for fx application. var Module = fx.Module("metricsfx", - fx.Provide(buildClient)) + fx.Provide(buildClient), + structured.Module, +) // ModuleForExternalScope provides metrics client for fx application when tally.Scope is created outside. 
var ModuleForExternalScope = fx.Module("metricsfx", diff --git a/common/metrics/structured/base.go b/common/metrics/structured/base.go new file mode 100644 index 00000000000..11cb05d535f --- /dev/null +++ b/common/metrics/structured/base.go @@ -0,0 +1,128 @@ +package structured + +import ( + "maps" + "strings" + "testing" + "time" + + "github.com/uber-go/tally" + "go.uber.org/fx" +) + +var Module = fx.Options( + fx.Provide(func(s tally.Scope) Emitter { + return Emitter{scope: s} + }), +) + +// Tags is an immutable map of strings, to prevent accidentally mutating shared vars. +type Tags struct{ m map[string]string } + +func TagsFromMap(m map[string]string) Tags { + return Tags{m} +} + +// With makes a copy with the added key/value pair(s), overriding any conflicting keys. +func (o Tags) With(key, value string, more ...string) Tags { + if len(more)%2 != 0 { + // pretty easy to catch at dev time, seems reasonably unlikely to ever happen in prod. + panic("Tags.With requires an even number of 'more' arguments") + } + + dup := Tags{make(map[string]string, len(o.m)+1+len(more)/2)} + maps.Copy(dup.m, o.m) + dup.m[key] = value + for i := 0; i < len(more)-1; i += 2 { + dup.m[more[i]] = more[i+1] + } + return dup +} + +// Emitter is essentially our new metrics.Client, but with a much smaller API surface +// to make it easier to reason about, lint, and fully disconnect from Tally details later. +// +// Calls to metrics methods on this type are checked by internal/tools/metricslint +// to ensure that all metrics are uniquely named, to reduce our risk of collisions +// that break queries / Prometheus / etc. +// +// Tags must be passed in each time to help make it clear what tags are used, +// and you MUST NOT vary the tags per metric name - this breaks Prometheus. +type Emitter struct { + // intentionally NOT no-op by default. use a test emitter in tests. + // + // currently, because this is constructed by common/config/metrics.go, + // this scope already contains the `cadence_service:cadence-{whatever}` tag, + // but essentially no others (aside from platform-level stuff). + // you can get the instance from go.uber.org/fx, as just `tally.Scope`. + scope tally.Scope +} + +// Histogram records a duration-based histogram with the provided data. +// +// Metric names MUST have an "_ns" suffix to avoid confusion with timers, +// and to make it clear they are duration-based histograms. +func (b Emitter) Histogram(name string, buckets SubsettableHistogram, dur time.Duration, tags Tags) { + histogramTags := make(map[string]string, len(tags.m)+3) + maps.Copy(histogramTags, tags.m) + buckets.writeTags(name, histogramTags, b) + + if !strings.HasSuffix(name, "_ns") { + // duration-based histograms are always in nanoseconds, + // and the name MUST be different from timers while we migrate, + // so this ensures we always have a unique _ns suffix. + // + // this suffix is also checked in the linter, to change the allowed + // suffix(es) just make sure you update both. + b.scope.Tagged(map[string]string{"bad_metric_name": name}).Counter("incorrect_histogram_metric_name").Inc(1) + name = name + "_ns" + } + b.scope.Tagged(histogramTags).Histogram(name, buckets.tallyBuckets).RecordDuration(dur) +} + +// IntHistogram records a count-based histogram with the provided data. +// It adds a "histogram_scale" tag, so histograms can be accurately subset in queries or via middleware. 
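+//
+// A minimal illustrative call, assuming a caller already holds an Emitter and some Tags
+// (the metric name, count source, and tags below are hypothetical; only the method signature
+// and the Mid1To32k buckets come from this package):
+//
+//	emitter.IntHistogram("replication_tasks_fetched_counts", Mid1To32k, len(tasks), tags)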
+func (b Emitter) IntHistogram(name string, buckets IntSubsettableHistogram, num int, tags Tags) { + histogramTags := make(map[string]string, len(tags.m)+3) + maps.Copy(histogramTags, tags.m) + buckets.writeTags(name, histogramTags, b) + + if !strings.HasSuffix(name, "_counts") { + // same as duration suffix. + // this suffix is also checked in the linter, to change the allowed + // suffix(es) just make sure you update both. + b.scope.Tagged(map[string]string{"bad_metric_name": name}).Counter("incorrect_int_histogram_metric_name").Inc(1) + name = name + "_counts" + } + b.scope.Tagged(histogramTags).Histogram(name, buckets.tallyBuckets).RecordDuration(time.Duration(num)) +} + +// TODO: make a MinMaxGauge helper which maintains a precise, rolling +// min/max gauge, over a window larger than the metrics granularity (e.g. ~20s) +// to work around gauges' last-data-only behavior. +// +// This will likely require some additional state though, and might benefit from +// keeping that state further up the Tags-stack to keep contention and +// series-deduplication-costs low. +// +// Maybe OTEL / Prometheus will natively support this one day. It'd be simple. + +// Count records a counter with the provided data. +func (b Emitter) Count(name string, num int, tags Tags) { + b.scope.Tagged(tags.m).Counter(name).Inc(int64(num)) +} + +// Gauge emits a gauge with the provided data. +func (b Emitter) Gauge(name string, val float64, tags Tags) { + b.scope.Tagged(tags.m).Gauge(name).Update(val) +} + +// NewTestEmitter creates an emitter for tests, optionally using the provided scope. +// If scope is nil, a no-op scope will be used. +func NewTestEmitter(t *testing.T, scope tally.Scope) Emitter { + t.Name() // require non-nil + if scope == nil { + scope = tally.NoopScope + } + return Emitter{scope} +} diff --git a/common/metrics/structured/histograms.go b/common/metrics/structured/histograms.go new file mode 100644 index 00000000000..a059869e306 --- /dev/null +++ b/common/metrics/structured/histograms.go @@ -0,0 +1,274 @@ +package structured + +import ( + "fmt" + "math" + "slices" + "strconv" + "time" + + "github.com/uber-go/tally" +) + +// Nearly all histograms should use pre-defined buckets here, to encourage consistency. +// Name them more by their intent than their exact ranges - if you need something outside the intent, +// make a new bucket. +// +// Extreme, rare cases should still create a new bucket here, not declare one in-line elsewhere. +// +// Partly this helps us document our ranges, and partly it ensures that any customized emitter +// can easily detect buckets with an `==` comparison, and can choose to use a different one +// if needed (e.g. to reduce scale if it is too costly to support). +var ( + // Default1ms10m is our "default" set of buckets, targeting 1ms through 100s, + // and is "rounded up" slightly to reach 80 buckets == 16 minutes (100s needs 68 buckets), + // plus multi-minute exceptions are common enough to support for the small additional cost. + // + // If you need sub-millisecond precision, or substantially longer durations than about 1 minute, + // consider using a different histogram. + Default1ms10m = makeSubsettableHistogram(time.Millisecond, 2, func(last time.Duration, length int) bool { + return last >= 10*time.Minute && length == 80 + }) + + // High1ms24h covers things like activity latency, where "longer than 1 day" is not + // particularly worth being precise about. 
+ // + // This uses a lot of buckets, so it must be used with relatively-low cardinality elsewhere, + // and/or consider dual emitting (Mid1ms24h or lower) so overviews can be queried efficiently. + High1ms24h = makeSubsettableHistogram(time.Millisecond, 2, func(last time.Duration, length int) bool { + // 24h leads to 108 buckets, raise to 112 for cleaner subsetting + return last >= 24*time.Hour && length == 112 + }) + + // Mid1ms24h is a one-scale-lower version of High1ms24h, + // for use when we know it's too detailed to be worth emitting. + // + // This uses 57 buckets, half of High1ms24h's 113 + Mid1ms24h = High1ms24h.subsetTo(1) + + // Low1ms10s is for things that are usually very fast, like most individual database calls, + // and is MUCH lower cardinality than Default1ms10m so it's more suitable for things like per-shard metrics. + Low1ms10s = makeSubsettableHistogram(time.Millisecond, 1, func(last time.Duration, length int) bool { + // 10s needs 26 buckets, raise to 32 for cleaner subsetting + return last >= 10*time.Second && length == 32 + }) + + // Mid1To32k is a histogram for small counters, like "how many replication tasks did we receive". + // This targets 1 to 32k + Mid1To32k = IntSubsettableHistogram(makeSubsettableHistogram(1, 2, func(last time.Duration, length int) bool { + // 10k needs 56 buckets, raise to 64 for cleaner subsetting + return last >= 10000 && length == 64 + })) +) + +// SubsettableHistogram is a duration-based histogram that can be subset to a lower scale +// in a standardized, predictable way. It is intentionally compatible with Prometheus and OTEL's +// "exponential histograms": https://opentelemetry.io/docs/specs/otel/metrics/data-model/#exponentialhistogram +// +// These histograms MUST always have a "_ns" suffix in their name to avoid confusion with timers. +type SubsettableHistogram struct { + tallyBuckets tally.DurationBuckets + + scale int +} + +// IntSubsettableHistogram is a non-duration-based integer-distribution histogram, otherwise identical +// to SubsettableHistogram but built as a separate type so you cannot pass the wrong one. +// +// These histograms MUST always have a "_counts" suffix in their name to avoid confusion with timers, +// or modify the Emitter to allow different suffixes if something else reads better. +type IntSubsettableHistogram SubsettableHistogram + +// currently we have no apparent need for float-histograms, +// as all our value ranges go from >=1 to many thousands, where +// decimal precision is pointless and mostly just looks bad. +// +// if we ever need 0..1 precision in histograms, we can add them then. 
+// type FloatSubsettableHistogram struct { +// tally.ValueBuckets +// scale int +// } + +func (s SubsettableHistogram) subsetTo(newScale int) SubsettableHistogram { + if newScale >= s.scale { + panic(fmt.Sprintf("scale %v is not less than the current scale %v", newScale, s.scale)) + } + if newScale < 0 { + panic(fmt.Sprintf("negative scales (%v == greater than *2 per step) are possible, but do not have tests yet", newScale)) + } + dup := SubsettableHistogram{ + tallyBuckets: slices.Clone(s.tallyBuckets), + scale: s.scale, + } + // remove every other bucket per -1 scale + for dup.scale > newScale { + if (len(dup.tallyBuckets)-1)%2 != 0 { + panic(fmt.Sprintf("cannot subset from scale %v to %v, %v-buckets is not divisible by 2", dup.scale, dup.scale-1, len(dup.tallyBuckets)-1)) + } + half := make(tally.DurationBuckets, 0, ((len(dup.tallyBuckets)-1)/2)+1) + half = append(half, dup.tallyBuckets[0]) // keep the zero value + for i := 1; i < len(dup.tallyBuckets); i += 2 { + half = append(half, dup.tallyBuckets[i]) // add first, third, etc + } + dup.tallyBuckets = half + dup.scale-- + } + return dup +} + +func (i IntSubsettableHistogram) subsetTo(newScale int) IntSubsettableHistogram { + return IntSubsettableHistogram(SubsettableHistogram(i).subsetTo(newScale)) +} + +// makeSubsettableHistogram is a replacement for tally.MustMakeExponentialDurationBuckets, +// tailored to make "construct a range" or "ensure N buckets" simpler for OTEL-compatible exponential histograms +// (i.e. https://opentelemetry.io/docs/specs/otel/metrics/data-model/#exponentialhistogram), +// ensures values are as precise as possible (preventing display-space-wasteful numbers like 3.99996ms), +// and that all histograms start with a 0 value (else erroneous negative values are impossible to notice). +// +// Start time must be positive, scale must be 0..3 (inclusive), and the loop MUST stop within 320 or fewer steps, +// to prevent extremely-costly histograms. +// Even 320 is far too many tbh, target 160 for very-high-value data, and generally around 80 or less (ideally a value +// that is divisible by 2 many times). +// +// The stop callback will be given the current largest value and the number of values generated so far. +// This excludes the zero value (you should ignore it anyway), so you can stop at your target value, +// or == a highly-divisible-by-2 number (do not go over). +// The buckets produced may be padded further to reach a "full" power-of-2 row, as this simplifies math elsewhere +// and costs very little compared to the rest of the histogram. +// +// For all values produced, please add a test to print the concrete values, and record the length and maximum value +// so they can be quickly checked when reading. +func makeSubsettableHistogram(start time.Duration, scale int, stop func(last time.Duration, length int) bool) SubsettableHistogram { + if start <= 0 { + panic(fmt.Sprintf("start must be greater than 0 or it will not grow exponentially, got %v", start)) + } + if scale < 0 || scale > 3 { + // anything outside this range is currently not expected and probably a mistake + panic(fmt.Sprintf("scale must be between 0 (grows by *2) and 3 (grows by *2^1/8), got scale: %v", scale)) + } + buckets := tally.DurationBuckets{ + time.Duration(0), // else "too low" and "negative" are impossible to tell apart. + // ^ note this must be excluded from calculations below, hence -1 everywhere. + } + for { + if len(buckets) > 320 { + panic(fmt.Sprintf("over 320 buckets is too many, choose a smaller range or smaller scale. 
"+ + "started at: %v, scale: %v, last value: %v", + start, scale, buckets[len(buckets)-1])) + } + buckets = append(buckets, nextBucket(start, len(buckets)-1, scale)) + + // stop when requested. + if stop(buckets[len(buckets)-1], len(buckets)-1) { + break + } + } + + // fill in as many buckets as are necessary to make a full "row", i.e. just + // before the next power of 2 from the original value. + // this ensures subsetting keeps "round" numbers as long as possible. + powerOfTwoWidth := int(math.Pow(2, float64(scale))) // num of buckets needed to double a value + for (len(buckets)-1)%powerOfTwoWidth != 0 { + buckets = append(buckets, nextBucket(start, len(buckets)-1, scale)) + } + return SubsettableHistogram{ + tallyBuckets: buckets, + scale: scale, + } +} + +func nextBucket(start time.Duration, num int, scale int) time.Duration { + // calculating it from `start` each time reduces floating point error, ensuring "clean" multiples + // at every power of 2 (and possibly others), instead of e.g. "1ms ... 1.9999994ms" which occurs + // if you try to build from the previous value each time. + return time.Duration( + float64(start) * + math.Pow(2, float64(num)/math.Pow(2, float64(scale)))) +} + +func (s SubsettableHistogram) histScale() int { return s.scale } +func (s SubsettableHistogram) width() int { return int(math.Pow(2, float64(s.scale))) } +func (s SubsettableHistogram) len() int { return len(s.tallyBuckets) } +func (s SubsettableHistogram) start() time.Duration { return s.tallyBuckets[1] } +func (s SubsettableHistogram) end() time.Duration { return s.tallyBuckets[len(s.tallyBuckets)-1] } +func (s SubsettableHistogram) buckets() tally.DurationBuckets { return s.tallyBuckets } +func (s SubsettableHistogram) writeTags(metricName string, into map[string]string, tagCollisionComplaints Emitter) { + writeHistogramTags(s, into, func(key string) { + tagCollisionComplaints.Count( + "error_histogram_tag_collision", + 1, + TagsFromMap(into).With( // carry along existing tags in case it narrows down the cause + "bad_key", key, + "metric_name", metricName, + )) + }) +} +func (s SubsettableHistogram) print(to func(string, ...any)) { + to("%v\n", s.tallyBuckets[0:1]) // zero value on its own row + for rowStart := 1; rowStart < s.len(); rowStart += s.width() { + to("%v\n", s.tallyBuckets[rowStart:rowStart+s.width()]) + } +} + +func (i IntSubsettableHistogram) histScale() int { return i.scale } +func (i IntSubsettableHistogram) width() int { return int(math.Pow(2, float64(i.scale))) } +func (i IntSubsettableHistogram) len() int { return len(i.tallyBuckets) } +func (i IntSubsettableHistogram) start() time.Duration { return i.tallyBuckets[1] } +func (i IntSubsettableHistogram) end() time.Duration { + return i.tallyBuckets[len(i.tallyBuckets)-1] +} +func (i IntSubsettableHistogram) buckets() tally.DurationBuckets { return i.tallyBuckets } +func (i IntSubsettableHistogram) writeTags(metricName string, into map[string]string, tagCollisionComplaints Emitter) { + writeHistogramTags(i, into, func(key string) { + tagCollisionComplaints.Count( + "error_int_histogram_tag_collision", + 1, + TagsFromMap(into).With( // carry along existing tags in case it narrows down the cause + "bad_key", key, + "metric_name", metricName, + )) + }) +} +func (i IntSubsettableHistogram) print(to func(string, ...any)) { + // fairly unreadable as duration-strings, so convert to int by hand + to("[%d]\n", int(i.tallyBuckets[0])) // zero value on its own row + for rowStart := 1; rowStart < i.len(); rowStart += i.width() { + ints := 
make([]int, 0, i.width()) + for _, d := range i.tallyBuckets[rowStart : rowStart+i.width()] { + ints = append(ints, int(d)) + } + to("%v\n", ints) + } +} + +func writeHistogramTags[T any](h histogrammy[T], into map[string]string, tagCollision func(key string)) { + if _, ok := into["histogram_start"]; ok { + tagCollision("histogram_start") + } + if _, ok := into["histogram_end"]; ok { + tagCollision("histogram_end") + } + if _, ok := into["histogram_scale"]; ok { + tagCollision("histogram_scale") + } + // record the full range and scale of the histogram so it can be recreated from any individual metric. + into["histogram_start"] = strconv.Itoa(int(h.start())) + into["histogram_end"] = strconv.Itoa(int(h.end())) + // include the scale, so we know how far away from the requested scale it is, when re-subsetting. + into["histogram_scale"] = strconv.Itoa(h.histScale()) +} + +// internal utility/test methods, but could be exposed if there's a use for it +type histogrammy[T any] interface { + histScale() int // exponential scale value. 0..3 inclusive. + width() int // number of values per power of 2 == how wide to print each row. 1, 2, 4, or 8. + len() int // number of buckets + start() time.Duration // first non-zero bucket + end() time.Duration // last bucket + buckets() tally.DurationBuckets // access to all buckets + subsetTo(newScale int) T // generic so specific types can be returned + writeTags(metricName string, into map[string]string, tagCollisionComplaints Emitter) + + print(to func(string, ...any)) // test-oriented printer +} diff --git a/common/metrics/structured/histograms_test.go b/common/metrics/structured/histograms_test.go new file mode 100644 index 00000000000..ba2a18a9cfd --- /dev/null +++ b/common/metrics/structured/histograms_test.go @@ -0,0 +1,191 @@ +package structured + +import ( + "fmt" + "strings" + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestHistogramValues(t *testing.T) { + t.Run("default_1ms_to_10m", func(t *testing.T) { + checkHistogram(t, Default1ms10m, 81, + ` +[0s] +[1ms 1.189207ms 1.414213ms 1.681792ms] +[2ms 2.378414ms 2.828427ms 3.363585ms] +[4ms 4.756828ms 5.656854ms 6.727171ms] +[8ms 9.513656ms 11.313708ms 13.454342ms] +[16ms 19.027313ms 22.627416ms 26.908685ms] +[32ms 38.054627ms 45.254833ms 53.81737ms] +[64ms 76.109255ms 90.509667ms 107.634741ms] +[128ms 152.21851ms 181.019335ms 215.269482ms] +[256ms 304.437021ms 362.038671ms 430.538964ms] +[512ms 608.874042ms 724.077343ms 861.077929ms] +[1.024s 1.217748085s 1.448154687s 1.722155858s] +[2.048s 2.435496171s 2.896309375s 3.444311716s] +[4.096s 4.870992343s 5.792618751s 6.888623433s] +[8.192s 9.741984686s 11.585237502s 13.777246867s] +[16.384s 19.483969372s 23.170475005s 27.554493735s] +[32.768s 38.967938744s 46.340950011s 55.10898747s] +[1m5.536s 1m17.935877488s 1m32.681900023s 1m50.21797494s] +[2m11.072s 2m35.871754977s 3m5.363800047s 3m40.43594988s] +[4m22.144s 5m11.743509955s 6m10.727600094s 7m20.87189976s] +[8m44.288s 10m23.48701991s 12m21.455200189s 14m41.743799521s] +`) + }) + t.Run("high_1ms_to_24h", func(t *testing.T) { + checkHistogram(t, High1ms24h, 113, + ` +[0s] +[1ms 1.189207ms 1.414213ms 1.681792ms] +[2ms 2.378414ms 2.828427ms 3.363585ms] +[4ms 4.756828ms 5.656854ms 6.727171ms] +[8ms 9.513656ms 11.313708ms 13.454342ms] +[16ms 19.027313ms 22.627416ms 26.908685ms] +[32ms 38.054627ms 45.254833ms 53.81737ms] +[64ms 76.109255ms 90.509667ms 107.634741ms] +[128ms 152.21851ms 181.019335ms 215.269482ms] +[256ms 304.437021ms 362.038671ms 430.538964ms] +[512ms 608.874042ms 724.077343ms 
861.077929ms] +[1.024s 1.217748085s 1.448154687s 1.722155858s] +[2.048s 2.435496171s 2.896309375s 3.444311716s] +[4.096s 4.870992343s 5.792618751s 6.888623433s] +[8.192s 9.741984686s 11.585237502s 13.777246867s] +[16.384s 19.483969372s 23.170475005s 27.554493735s] +[32.768s 38.967938744s 46.340950011s 55.10898747s] +[1m5.536s 1m17.935877488s 1m32.681900023s 1m50.21797494s] +[2m11.072s 2m35.871754977s 3m5.363800047s 3m40.43594988s] +[4m22.144s 5m11.743509955s 6m10.727600094s 7m20.87189976s] +[8m44.288s 10m23.48701991s 12m21.455200189s 14m41.743799521s] +[17m28.576s 20m46.974039821s 24m42.910400378s 29m23.487599042s] +[34m57.152s 41m33.948079642s 49m25.820800757s 58m46.975198084s] +[1h9m54.304s 1h23m7.896159284s 1h38m51.641601515s 1h57m33.950396168s] +[2h19m48.608s 2h46m15.792318568s 3h17m43.283203031s 3h55m7.900792337s] +[4h39m37.216s 5h32m31.584637137s 6h35m26.566406062s 7h50m15.801584674s] +[9h19m14.432s 11h5m3.169274274s 13h10m53.132812125s 15h40m31.603169349s] +[18h38m28.864s 22h10m6.338548549s 26h21m46.265624251s 31h21m3.206338698s] +[37h16m57.728s 44h20m12.677097099s 52h43m32.531248503s 62h42m6.412677396s] +`) + }) + t.Run("mid_1ms_24h", func(t *testing.T) { + checkHistogram(t, Mid1ms24h, 57, ` +[0s] +[1ms 1.414213ms] +[2ms 2.828427ms] +[4ms 5.656854ms] +[8ms 11.313708ms] +[16ms 22.627416ms] +[32ms 45.254833ms] +[64ms 90.509667ms] +[128ms 181.019335ms] +[256ms 362.038671ms] +[512ms 724.077343ms] +[1.024s 1.448154687s] +[2.048s 2.896309375s] +[4.096s 5.792618751s] +[8.192s 11.585237502s] +[16.384s 23.170475005s] +[32.768s 46.340950011s] +[1m5.536s 1m32.681900023s] +[2m11.072s 3m5.363800047s] +[4m22.144s 6m10.727600094s] +[8m44.288s 12m21.455200189s] +[17m28.576s 24m42.910400378s] +[34m57.152s 49m25.820800757s] +[1h9m54.304s 1h38m51.641601515s] +[2h19m48.608s 3h17m43.283203031s] +[4h39m37.216s 6h35m26.566406062s] +[9h19m14.432s 13h10m53.132812125s] +[18h38m28.864s 26h21m46.265624251s] +[37h16m57.728s 52h43m32.531248503s] +`) + }) + t.Run("mid_to_32k_ints", func(t *testing.T) { + // note: this histogram has some duplicates. + // + // this wastes a bit of memory, but Tally will choose the same index each + // time for a specific value, so it does not cause extra non-zero series + // to be stored or emitted. + // + // if this turns out to be expensive in Prometheus / etc, it's easy + // enough to start the histogram at 8, and dual-emit a non-exponential + // histogram for lower values. + checkHistogram(t, Mid1To32k, 65, ` +[0] +[1 1 1 1] +[2 2 2 3] +[4 4 5 6] +[8 9 11 13] +[16 19 22 26] +[32 38 45 53] +[64 76 90 107] +[128 152 181 215] +[256 304 362 430] +[512 608 724 861] +[1024 1217 1448 1722] +[2048 2435 2896 3444] +[4096 4870 5792 6888] +[8192 9741 11585 13777] +[16384 19483 23170 27554] +[32768 38967 46340 55108] +`) + }) + t.Run("low_cardinality_1ms_10s", func(t *testing.T) { + checkHistogram(t, Low1ms10s, 33, ` +[0s] +[1ms 1.414213ms] +[2ms 2.828427ms] +[4ms 5.656854ms] +[8ms 11.313708ms] +[16ms 22.627416ms] +[32ms 45.254833ms] +[64ms 90.509667ms] +[128ms 181.019335ms] +[256ms 362.038671ms] +[512ms 724.077343ms] +[1.024s 1.448154687s] +[2.048s 2.896309375s] +[4.096s 5.792618751s] +[8.192s 11.585237502s] +[16.384s 23.170475005s] +[32.768s 46.340950011s] +`) + }) +} + +// most histograms should pass this check, but fuzzy comparison is fine if needed for extreme cases. +func checkHistogram[T any](t *testing.T, h histogrammy[T], length int, expected string) { + var buf strings.Builder + h.print(func(s string, a ...any) { + str := fmt.Sprintf(s, a...) 
+ t.Logf(str) + buf.WriteString(str) + }) + if strings.TrimSpace(expected) != strings.TrimSpace(buf.String()) { + t.Error("histogram definition changed, update the test if this is intended") + } + + assert.Equal(t, length, h.len(), "wrong number of buckets") + + buckets := h.buckets() + assert.EqualValues(t, 0, buckets[0], "first bucket should always be zero") + for i := 1; i < len(buckets); i += h.width() { + if i > 1 { + // ensure good float math. + // + // this is *intentionally* doing exact comparisons, as floating point math with + // human-friendly integers have precise power-of-2 multiples for a very long time. + // + // note that the equivalent tally buckets, e.g.: + // tally.MustMakeExponentialDurationBuckets(time.Millisecond, math.Pow(2, 1.0/4.0)) + // fails this test, and the logs produced show ugly e.g. 31.999942ms values. + // tally also produces incorrect results if you start at e.g. 1, + // as it just produces endless `1` values. + assert.Equalf(t, buckets[i-h.width()]*2, buckets[i], + "current row's value (%v) is not a power of 2 greater than previous (%v), skewed / bad math?", + buckets[i-h.width()], buckets[i]) + } + } +} diff --git a/common/metrics/structured/operation.go b/common/metrics/structured/operation.go new file mode 100644 index 00000000000..7a9b63761b3 --- /dev/null +++ b/common/metrics/structured/operation.go @@ -0,0 +1,12 @@ +package structured + +import "github.com/uber/cadence/common/metrics" + +func GetOperationString(op int) string { + for _, serviceOps := range metrics.ScopeDefs { + if serviceOp, ok := serviceOps[op]; ok { + return serviceOp.GetOperationString() // unit test ensures this is unique + } + } + return "unknown_operation_int" // should never happen +} diff --git a/common/resource/params.go b/common/resource/params.go index 7e48a56d220..aaed4ff26c5 100644 --- a/common/resource/params.go +++ b/common/resource/params.go @@ -42,6 +42,7 @@ import ( "github.com/uber/cadence/common/membership" "github.com/uber/cadence/common/messaging" "github.com/uber/cadence/common/metrics" + "github.com/uber/cadence/common/metrics/structured" persistenceClient "github.com/uber/cadence/common/persistence/client" "github.com/uber/cadence/common/pinot" "github.com/uber/cadence/common/rpc" @@ -68,6 +69,7 @@ type ( ClusterMetadata cluster.Metadata ReplicatorConfig config.Replicator MetricsClient metrics.Client + Emitter structured.Emitter MessagingClient messaging.Client BlobstoreClient blobstore.Client ESClient es.GenericClient diff --git a/common/resource/resource_impl.go b/common/resource/resource_impl.go index 48a9be7bd66..bd50919328d 100644 --- a/common/resource/resource_impl.go +++ b/common/resource/resource_impl.go @@ -57,6 +57,7 @@ import ( "github.com/uber/cadence/common/membership" "github.com/uber/cadence/common/messaging" "github.com/uber/cadence/common/metrics" + "github.com/uber/cadence/common/metrics/structured" "github.com/uber/cadence/common/persistence" persistenceClient "github.com/uber/cadence/common/persistence/client" qrpc "github.com/uber/cadence/common/quotas/global/rpc" @@ -98,6 +99,7 @@ type Impl struct { timeSource clock.TimeSource payloadSerializer persistence.PayloadSerializer metricsClient metrics.Client + emitter structured.Emitter messagingClient messaging.Client blobstoreClient blobstore.Client archivalMetadata archiver.ArchivalMetadata @@ -360,6 +362,7 @@ func New( timeSource: clock.NewRealTimeSource(), payloadSerializer: persistence.NewPayloadSerializer(), metricsClient: params.MetricsClient, + emitter: params.Emitter, 
messagingClient: params.MessagingClient, blobstoreClient: params.BlobstoreClient, archivalMetadata: params.ArchivalMetadata, @@ -534,6 +537,11 @@ func (h *Impl) GetMetricsClient() metrics.Client { return h.metricsClient } +// GetMetricsEmitter returns a base structured metrics emitter +func (h *Impl) GetMetricsEmitter() structured.Emitter { + return h.emitter +} + // GetMessagingClient return messaging client func (h *Impl) GetMessagingClient() messaging.Client { return h.messagingClient diff --git a/common/resource/resource_mock.go b/common/resource/resource_mock.go index 58127259301..1fbf82491cd 100644 --- a/common/resource/resource_mock.go +++ b/common/resource/resource_mock.go @@ -36,6 +36,7 @@ import ( membership "github.com/uber/cadence/common/membership" messaging "github.com/uber/cadence/common/messaging" metrics "github.com/uber/cadence/common/metrics" + structured "github.com/uber/cadence/common/metrics/structured" persistence "github.com/uber/cadence/common/persistence" client0 "github.com/uber/cadence/common/persistence/client" rpc "github.com/uber/cadence/common/quotas/global/rpc" @@ -498,6 +499,20 @@ func (mr *MockResourceMockRecorder) GetMetricsClient() *gomock.Call { return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetMetricsClient", reflect.TypeOf((*MockResource)(nil).GetMetricsClient)) } +// GetMetricsEmitter mocks base method. +func (m *MockResource) GetMetricsEmitter() structured.Emitter { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "GetMetricsEmitter") + ret0, _ := ret[0].(structured.Emitter) + return ret0 +} + +// GetMetricsEmitter indicates an expected call of GetMetricsEmitter. +func (mr *MockResourceMockRecorder) GetMetricsEmitter() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetMetricsEmitter", reflect.TypeOf((*MockResource)(nil).GetMetricsEmitter)) +} + // GetPayloadSerializer mocks base method. 
func (m *MockResource) GetPayloadSerializer() persistence.PayloadSerializer { m.ctrl.T.Helper() diff --git a/common/resource/resource_test_utils.go b/common/resource/resource_test_utils.go index 18249940c32..b40e89640fe 100644 --- a/common/resource/resource_test_utils.go +++ b/common/resource/resource_test_utils.go @@ -52,6 +52,7 @@ import ( "github.com/uber/cadence/common/membership" "github.com/uber/cadence/common/messaging" "github.com/uber/cadence/common/metrics" + "github.com/uber/cadence/common/metrics/structured" "github.com/uber/cadence/common/mocks" "github.com/uber/cadence/common/persistence" persistenceClient "github.com/uber/cadence/common/persistence/client" @@ -74,6 +75,7 @@ type ( TimeSource clock.TimeSource PayloadSerializer persistence.PayloadSerializer MetricsClient metrics.Client + Emitter structured.Emitter ArchivalMetadata *archiver.MockArchivalMetadata ArchiverProvider *provider.MockArchiverProvider BlobstoreClient *blobstore.MockClient @@ -188,6 +190,7 @@ func NewTest( TimeSource: clock.NewRealTimeSource(), PayloadSerializer: persistence.NewPayloadSerializer(), MetricsClient: metrics.NewClient(scope, serviceMetricsIndex), + Emitter: structured.NewTestEmitter(t, scope), ArchivalMetadata: &archiver.MockArchivalMetadata{}, ArchiverProvider: &provider.MockArchiverProvider{}, BlobstoreClient: &blobstore.MockClient{}, @@ -295,6 +298,11 @@ func (s *Test) GetMetricsClient() metrics.Client { return s.MetricsClient } +// GetMetricsEmitter for testing +func (s *Test) GetMetricsEmitter() structured.Emitter { + return s.Emitter +} + // GetMessagingClient for testing func (s *Test) GetMessagingClient() messaging.Client { panic("user should implement this method for test") diff --git a/common/resource/types.go b/common/resource/types.go index fdc3701ece7..11920e32339 100644 --- a/common/resource/types.go +++ b/common/resource/types.go @@ -47,6 +47,7 @@ import ( "github.com/uber/cadence/common/membership" "github.com/uber/cadence/common/messaging" "github.com/uber/cadence/common/metrics" + "github.com/uber/cadence/common/metrics/structured" "github.com/uber/cadence/common/persistence" persistenceClient "github.com/uber/cadence/common/persistence/client" qrpc "github.com/uber/cadence/common/quotas/global/rpc" @@ -79,6 +80,7 @@ type Resource interface { GetTimeSource() clock.TimeSource GetPayloadSerializer() persistence.PayloadSerializer GetMetricsClient() metrics.Client + GetMetricsEmitter() structured.Emitter GetArchiverProvider() provider.ArchiverProvider GetMessagingClient() messaging.Client GetBlobstoreClient() blobstore.Client diff --git a/internal/tools/metricslint/analyzer.go b/internal/tools/metricslint/analyzer.go new file mode 100644 index 00000000000..9704caf1ce4 --- /dev/null +++ b/internal/tools/metricslint/analyzer.go @@ -0,0 +1,283 @@ +package metricslint + +import ( + "fmt" + "go/ast" + "go/token" + "go/types" + "maps" + "slices" + "strings" + + "golang.org/x/tools/go/analysis" + "golang.org/x/tools/go/analysis/passes/inspect" + "golang.org/x/tools/go/ast/inspector" +) + +const ( + emitterPkg = "github.com/uber/cadence/common/metrics/structured" + emitterTypeName = emitterPkg + ".Emitter" +) + +// Analyzer reports all (inline string) metric names passed to Emitter methods or defined in metrics/defs.go. +// +// This is NOT intended to be used directly, it's just using the analysis framework to simplify type checking. +// The output needs to be checked for duplicates or error messages by a separate process (see cmd/main.go). 
+var Analyzer = &analysis.Analyzer{ + Name: "metricnames", + Doc: "finds calls to metrics-emitting methods on Emitter, and metricDefinition names, and reports them for further analysis", + Requires: []*analysis.Analyzer{inspect.Analyzer}, + Run: run, +} + +// method names on the emitter that we care about, and the suffixes that are required (if any). +// method names are checked against the actual methods on the emitter type, +// but we need the list ahead of time or passes might be racing with initializing it. +var emitterMethods = map[string]map[string]bool{ + "Histogram": {"_ns": true}, // all our durations are in nanoseconds + "IntHistogram": {"_counts": true}, // differentiates from durations, will likely have multiple suffixes + "Count": nil, + "Gauge": nil, +} + +func run(pass *analysis.Pass) (interface{}, error) { + i := pass.ResultOf[inspect.Analyzer].(*inspector.Inspector) + + if pass.Pkg.Path() == "github.com/uber/cadence/common/metrics" { + // report all hard-coded metricDefinition names, so we can make sure + // there are no duplicates between the integers and the const strings + // passed to the emitter. + // + // if you want a duplicate: no, you really don't. + // use a different name, and once verified just swap to the old name and + // delete the integer def. otherwise you risk double-counting events if + // both are emitted, and can't tell if your tags are wrong. + // + // if you're swapping calls in one step, also delete the old definition. + // + // if you really do have an exception, modify this linter to allow it. + reportMetricDefinitionNames(pass, i) + // and DO NOT return, in case there are emitter calls in this package too, + // though currently that would imply an import loop. + } else if pass.Pkg.Path() == emitterPkg { + // read the methods on the emitter and the target methods, and make sure they're in sync. 
+ checkTargetNamesMap(pass) + } + + // print calls to metrics-emitting methods on Emitter, regardless of location + reportMetricEmitterCalls(pass, i) + + return nil, nil +} + +func reportMetricDefinitionNames(pass *analysis.Pass, i *inspector.Inspector) { + nodeFilter := []ast.Node{ + (*ast.CompositeLit)(nil), + } + i.Preorder(nodeFilter, func(n ast.Node) { + decl := n.(*ast.CompositeLit) + var metricNameField *ast.KeyValueExpr + for _, el := range decl.Elts { + kv, ok := el.(*ast.KeyValueExpr) + if !ok { + return // not a struct initializer + } + ident, ok := kv.Key.(*ast.Ident) + if !ok { + return // dynamic key in a dict or something, ignore + } + if ident.Name == "metricName" { + metricNameField = kv // currently this is exclusively used by metricDefinition + } + break + } + if metricNameField == nil { + return + } + str, ok := getConstantString(metricNameField.Value) + if !ok { + pass.Reportf(metricNameField.Pos(), "metricDefinition{metricName: ...} value must be an inline string literal") + return + } + pass.Reportf(metricNameField.Pos(), "success: %v", str) + }) +} + +func checkTargetNamesMap(pass *analysis.Pass) { + named, ok := pass.Pkg.Scope().Lookup("Emitter").Type().(*types.Named) + if !ok { + pass.Reportf(token.NoPos, "could not find Emitter type in %v", emitterPkg) + return + } + ms := types.NewMethodSet(named) + foundTargets := make(map[string]map[string]bool, len(emitterMethods)) + maps.Copy(foundTargets, emitterMethods) + for i := 0; i < ms.Len(); i++ { + m := ms.At(i) + _, ok := emitterMethods[m.Obj().Name()] + if !ok { + foundTargets[m.Obj().Name()] = nil // add any missing keys to the map + } else { + delete(foundTargets, m.Obj().Name()) // remove any found ones from the map + } + } + if len(foundTargets) != 0 { + pass.Reportf(named.Obj().Pos(), "target methods do not match Emitter methods, diff: %v", foundTargets) + } +} + +func reportMetricEmitterCalls(pass *analysis.Pass, i *inspector.Inspector) { + nodeFilter := []ast.Node{ + (*ast.File)(nil), // to skip test files + (*ast.FuncDecl)(nil), // to skip test helper funcs + (*ast.CallExpr)(nil), + } + i.Nodes(nodeFilter, func(n ast.Node, push bool) (proceed bool) { + // always descend by default, in case the call contains a closure that emits metrics. + // this covers the sync.Once case in the testdata, for example. + proceed = true + if !push { + return // do nothing when ascending the ast tree + } + + // check for test files, ignore their content if found + file, ok := n.(*ast.File) + if ok { + filename := pass.Fset.Position(file.Pos()).Filename + if strings.HasSuffix(filename, "_test.go") { + return false // don't inspect test files + } + return + } + + // check for test helper funcs anywhere, ignore their content if found. + // these are identified by a *testing.T as their first arg. 
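+ // for example, a helper like structured.NewTestEmitter(t *testing.T, scope tally.Scope) Emitter
+ // (added in base.go in this change) has *testing.T as its first arg, so its body is ignored here.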
+ if fn, ok := n.(*ast.FuncDecl); ok { + if len(fn.Type.Params.List) > 0 { + firstArgType := pass.TypesInfo.TypeOf(fn.Type.Params.List[0].Type) + asStr := types.TypeString(firstArgType, nil) // "full/path/to.Type" + if asStr == "*testing.T" { + return false // don't inspect test helpers + } + } + return + } + + call := n.(*ast.CallExpr) // only other possibility due to nodeFilter + + // check if this is a method call (receiver.Method() == X.Sel) + // for one of the names we care about (as an early / efficient check) + sel, ok := call.Fun.(*ast.SelectorExpr) + if !ok { + return + } + methodName := sel.Sel.Name + requiredSuffixes, ok := emitterMethods[methodName] + if !ok { + return + } + + // check if the receiver type is one we care about + receiver := pass.TypesInfo.TypeOf(sel.X) + // pointer receivers are not allowed, complain if found. + // if we just ignore them here, we might miss a metric call with a duplicate name. + isPointerReceiver := false + ptr, ok := receiver.(*types.Pointer) + for ; ok; ptr, ok = receiver.(*types.Pointer) { + isPointerReceiver = true + receiver = ptr.Elem() + } + named, ok := receiver.(*types.Named) + if !ok { + // allowed by the type system, but should be impossible in a CallExpr + pass.Reportf(sel.Pos(), "anonymous receiver of a method call should be impossible?") + return + } + fullName := getPkgAndName(named) + if fullName != emitterTypeName { + return // apparently not the emitter, it just has similarly-named methods (e.g. Tally). ignore it. + } + + // at this point we know that it's "a metrics method", make sure it's valid. + + if isPointerReceiver { + // could be allowed with a bit more .Elem() dereferencing in this analyzer, + // but currently blocked to intentionally force value types. + pass.Reportf(sel.Pos(), "pointer receivers are not allowed on metrics emission calls: %v", types.ExprString(sel)) + return + } + if len(call.Args) < 2 { + // 0 or 1 arg == not currently in use + pass.Reportf(call.Pos(), "method call %v looks like a metrics method, but has too few args", call) + return + } + + // pull out the first arg, and make sure it's a constant inline string. + nameArg := call.Args[0] + metricName, isConstant := getConstantString(nameArg) + if !isConstant { + pass.Reportf(nameArg.Pos(), "metric names must be in-line strings, not consts or vars: %v", nameArg) + return + } + if len(requiredSuffixes) > 0 { + matched := false + for suffix := range requiredSuffixes { + if strings.HasSuffix(metricName, suffix) { + matched = true + break + } + } + if !matched { + suffixes := make([]string, 0, len(requiredSuffixes)) + for s := range requiredSuffixes { + suffixes = append(suffixes, fmt.Sprintf("%q", s)) + } + slices.Sort(suffixes) + pass.Reportf( + nameArg.Pos(), + "metric name %q is not valid for method %v, it must have one of the following suffixes: %v", + metricName, methodName, strings.Join(suffixes, ", "), + ) + return + } + } + + // valid call! + // + // "report" the data we've found so it can be checked with a separate process (see cmd/main.go). + // that will error if any non-"success" lines were found, else check that the list is unique. + // + // tests may lead to the same package being processed more than once, but + // the reported lines for "path/to/file:line success: {name}" are still unique + // and that won't count as a duplicate. 
+ pass.Reportf(call.Pos(), "success: %s", metricName) + return + }) +} + +func getPkgAndName(named *types.Named) (fullName string) { + obj := named.Obj() + if obj == nil { + // afaik not possible, this would be "a named type without a type" + return "" + } + pkg := obj.Pkg() + if pkg == nil { + // given where this is used, I believe this would currently imply + // "a builtin type with a method", which does not exist. + // but it's fine to return partial values, in case it's used in more places. + return obj.Name() + } + return pkg.Path() + "." + obj.Name() +} + +func getConstantString(expr ast.Expr) (str string, ok bool) { + lit, ok := expr.(*ast.BasicLit) + if !ok { + return "", false + } + if lit.Kind != token.STRING { + return "", false + } + return strings.Trim(lit.Value, `"`), true +} diff --git a/internal/tools/metricslint/analyzer_test.go b/internal/tools/metricslint/analyzer_test.go new file mode 100644 index 00000000000..55e262e7186 --- /dev/null +++ b/internal/tools/metricslint/analyzer_test.go @@ -0,0 +1,12 @@ +package metricslint + +import ( + "testing" + + "golang.org/x/tools/go/analysis/analysistest" +) + +func TestAnalyzer(t *testing.T) { + testdata := analysistest.TestData() + analysistest.Run(t, testdata, Analyzer, "github.com/uber/cadence/common/metrics/structured") +} diff --git a/internal/tools/metricslint/cmd/main.go b/internal/tools/metricslint/cmd/main.go new file mode 100644 index 00000000000..6fff6b1306e --- /dev/null +++ b/internal/tools/metricslint/cmd/main.go @@ -0,0 +1,111 @@ +package main + +import ( + "flag" + "fmt" + "os" + "os/exec" + "slices" + "strconv" + "strings" + + "golang.org/x/tools/go/analysis/singlechecker" + + "github.com/uber/cadence/internal/tools/metricslint" +) + +func main() { + var analyze bool + flag.BoolVar(&analyze, "analyze", false, "if true, run the analyzer normally (e.g. for debugging). if false, run the analyzer internally and interpret the results as a linter") + skip := map[string]int{} + flag.Func("skip", "metric name to ignore, comma, followed by the number of times it is duplicated. repeat to skip more things.", func(s string) error { + parts := strings.SplitN(s, ",", 2) + if len(parts) != 2 { + return fmt.Errorf("skip argument %q is not in the form `name,count`", s) + } + count, err := strconv.Atoi(parts[1]) + if err != nil || count < 2 { + return fmt.Errorf("skip argument %q has invalid count %q: %v", s, parts[1], err) + } + skip[parts[0]] = count + return nil + }) + flag.Parse() + + if analyze { + singlechecker.Main(metricslint.Analyzer) + } else { + os.Exit(checkAnalyzerOutput(skip)) + } +} + +func checkAnalyzerOutput(skip map[string]int) (exitCode int) { + cmd := exec.Command(os.Args[0], append([]string{"-analyze"}, os.Args[1:]...)...) + out, _ := cmd.CombinedOutput() + + defer func() { + // in case of crashes, print the output we got so far as it likely narrows down the cause. 
+		if r := recover(); r != nil {
+			_, _ = fmt.Fprint(os.Stderr, "\n\n"+string(out))
+			panic(r)
+		}
+	}()
+
+	lines := strings.Split(string(out), "\n")
+	// map of metric name to set of lines using it (to deduplicate)
+	names := make(map[string]map[string]struct{}, len(lines))
+	var failures []string
+	for _, line := range lines {
+		if line == "" {
+			continue // empty lines are fine
+		}
+		words := strings.Fields(line)
+		if len(words) == 3 && words[1] == "success:" {
+			prev, ok := names[words[2]]
+			if !ok {
+				prev = make(map[string]struct{})
+			}
+			prev[line] = struct{}{}
+			names[words[2]] = prev
+		} else {
+			failures = append(failures, line)
+		}
+	}
+
+	if len(failures) > 0 {
+		_, _ = fmt.Fprintln(os.Stderr, strings.Join(failures, "\n")+"\n")
+		return 1
+	}
+
+	exitCode = 0
+	for name, usages := range names {
+		var complain bool
+		expected, ok := skip[name]
+		if ok {
+			complain = len(usages) != expected // expectation mismatch
+		} else {
+			complain = len(usages) > 1 // must not have duplicates
+		}
+		if complain {
+			if expected > 0 {
+				_, _ = fmt.Fprintf(os.Stderr, "Metric name %q used in an unexpected number of places (expected %d, found %d):\n", name, expected, len(usages))
+			} else {
+				_, _ = fmt.Fprintf(os.Stderr, "Metric name %q used in multiple places:\n", name)
+			}
+			// frustratingly: this can't use `maps.Keys` because it
+			// returns an iterator, not a slice, so it can't be sorted or `...`-spread.
+			// bleh.
+			sorted := make([]string, 0, len(usages))
+			for usage := range usages {
+				sorted = append(sorted, usage)
+			}
+			slices.Sort(sorted)
+			for _, usage := range sorted {
+				_, _ = fmt.Fprintf(os.Stderr, "\t%s\n", usage)
+			}
+			exitCode = 1
+		}
+	}
+
+	return exitCode
+}
diff --git a/internal/tools/metricslint/testdata/go.mod b/internal/tools/metricslint/testdata/go.mod
new file mode 100644
index 00000000000..42d6cbc8c14
--- /dev/null
+++ b/internal/tools/metricslint/testdata/go.mod
@@ -0,0 +1,4 @@
+module github.com/uber/cadence/common/metrics/structured
+// ^ must be the same package as the emitter's definition for the hardcoded path to work right
+
+go 1.24
\ No newline at end of file
diff --git a/internal/tools/metricslint/testdata/metrics.go b/internal/tools/metricslint/testdata/metrics.go
new file mode 100644
index 00000000000..b6078034f00
--- /dev/null
+++ b/internal/tools/metricslint/testdata/metrics.go
@@ -0,0 +1,67 @@
+package structured
+
+import (
+	"sync"
+	"testing"
+	"time"
+)
+
+const disallowedConst = "dynamic-name"
+
+type Emitter struct{}
+type Tags map[string]string
+
+func (e Emitter) Count(name string, value int, meta Tags) {}
+func (e Emitter) Histogram(name string, buckets []time.Duration, dur time.Duration, meta Tags) {}
+func (e Emitter) Gauge(name string, value float64, meta Tags) {}
+func (e Emitter) IntHistogram(name string, buckets []time.Duration, num int, meta Tags) {}
+
+type UnrelatedThing struct{ metricName string }
+
+var _ = UnrelatedThing{metricName: disallowedConst}
+
+func (n UnrelatedThing) Gauge(name string) {}
+func (n UnrelatedThing) Count(name string, value int, meta Tags) {}
+
+// test helper funcs can violate the rules, ignore them.
+// they must have *testing.T as the first arg +func TestHelper(t *testing.T, name string) { + var emitter Emitter + emitter.Count(name, 5, nil) +} + +func someMetricsCalls() { + var emitter Emitter + tags := Tags{"key": "value"} + + // valid calls on the emitter + emitter.Count("test-count", 5, tags) // want `success: test-count` + emitter.Histogram("test-histogram_ns", nil, time.Second, tags) // want `success: test-histogram_ns` + + // duplicates are not blocked at this stage, it's run separately because of how the analysis framework works + emitter.Count("duplicate-metric", 1, tags) // want `success: duplicate-metric` + emitter.Count("duplicate-metric", 2, tags) // want `success: duplicate-metric` + + // wrong suffix + emitter.Histogram("test-histogram", nil, time.Second, tags) // want `metric name "test-histogram" is not valid for method Histogram, it must have one of the following suffixes: "_ns"` + + // string vars are not good + dynamicName := "dynamic-name" + emitter.Count(dynamicName, 1, tags) // want `metric names must be in-line strings, not consts or vars: dynamicName` + + // named consts are also not good + // (this is fairly easy to allow though, if strongly desired) + emitter.Count(disallowedConst, 1, tags) // want `metric names must be in-line strings, not consts or vars: disallowedConst` + + var unrelated UnrelatedThing + name := "asdf" + unrelated.Gauge(name) // ignored, not on Emitter + unrelated.Count(name, 42, tags) // ignored, not on Emitter + + // contents of closures must be checked too (ensures we recurse into calls) + var once sync.Once + once.Do(func() { + emitter.Gauge(disallowedConst, 9001, tags) // want `metric names must be in-line strings, not consts or vars: disallowedConst` + emitter.IntHistogram("hist_counts", nil, 42, tags) // want `success: hist_counts` + }) +} diff --git a/internal/tools/metricslint/testdata/metrics_gen.go b/internal/tools/metricslint/testdata/metrics_gen.go new file mode 100644 index 00000000000..02669e19d57 --- /dev/null +++ b/internal/tools/metricslint/testdata/metrics_gen.go @@ -0,0 +1,20 @@ +package structured + +// Code generated by test_shenanigans; DO NOT EDIT + +import "time" + +func GeneratedFunction() { + var emitter Emitter + tags := Tags{} + + // allowed, generated code needs to follow the same rules + emitter.Count("base_count", 1, tags) // want `success: base_count` + emitter.Gauge("gauge", 12, tags) // want `success: gauge` + emitter.Histogram("hist_ns", nil, time.Second, tags) // want `success: hist_ns` + + // disallowed, generated code needs to follow the same rules + const sneakyConst = "bad_metric" + emitter.IntHistogram(sneakyConst, nil, 3, tags) // want `metric names must be in-line strings, not consts or vars: sneakyConst` + emitter.Count(sneakyConst, 1, tags) // want `metric names must be in-line strings, not consts or vars: sneakyConst` +} diff --git a/internal/tools/metricslint/testdata/metrics_test.go b/internal/tools/metricslint/testdata/metrics_test.go new file mode 100644 index 00000000000..0067efd8e67 --- /dev/null +++ b/internal/tools/metricslint/testdata/metrics_test.go @@ -0,0 +1,20 @@ +package structured + +import ( + "time" +) + +func ignoredInTestFiles() { + var emitter Emitter + tags := Tags{} + + // test code is allowed to break the rules because it does not run in production + + emitter.Count("base_count", 1, tags) + emitter.Gauge("gauge", 12, tags) + emitter.Histogram("hist_ns", nil, time.Second, tags) + + const sneakyConst = "bad_metric" + emitter.IntHistogram(sneakyConst, nil, 3, tags) + 
emitter.Count(sneakyConst, 1, tags) +} diff --git a/revive.toml b/revive.toml index 2238d4dd4c9..d1b939df7af 100644 --- a/revive.toml +++ b/revive.toml @@ -20,7 +20,7 @@ warningCode = 0 [rule.errorf] # [rule.exported] # disabled due to lack of value / encouraging bad habits [rule.if-return] -[rule.increment-decrement] +# [rule.increment-decrement] # noisy on generated code, otherwise seems fine - if there's a way to fix that, probably re-enable [rule.indent-error-flow] # Disabled because we have 158 packages that need package comments; we could instead add ignore # directives for existing packages and require it for new packages. @@ -62,7 +62,7 @@ arguments=[["loop","method-call","recover","return", "immediate-recover"]] [rule.range-val-address] # beneficial [rule.range-val-in-closure] # beneficial [rule.unconditional-recursion] # probably a good idea -[rule.unreachable-code] # code simplifier +# [rule.unreachable-code] # reasonable code simplifier, but does not match compiler behavior (e.g. log.Fatal still needs panic or return) [rule.waitgroup-by-value] # correct use of sync code, important #### unused utilities diff --git a/service/history/engine/engineimpl/history_engine.go b/service/history/engine/engineimpl/history_engine.go index 576b2998707..573ac483846 100644 --- a/service/history/engine/engineimpl/history_engine.go +++ b/service/history/engine/engineimpl/history_engine.go @@ -279,6 +279,7 @@ func NewEngineWithShardContext( historyEngImpl, config, shard.GetMetricsClient(), + shard.GetMetricsEmitter(), replicationTaskFetcher, replicationTaskExecutor, shard.GetTimeSource(), diff --git a/service/history/replication/task_processor.go b/service/history/replication/task_processor.go index 2e0c9416950..e9cd3191fdc 100644 --- a/service/history/replication/task_processor.go +++ b/service/history/replication/task_processor.go @@ -41,6 +41,7 @@ import ( "github.com/uber/cadence/common/log" "github.com/uber/cadence/common/log/tag" "github.com/uber/cadence/common/metrics" + "github.com/uber/cadence/common/metrics/structured" "github.com/uber/cadence/common/persistence" "github.com/uber/cadence/common/quotas" "github.com/uber/cadence/common/reconciliation" @@ -81,6 +82,8 @@ type ( historySerializer persistence.PayloadSerializer config *config.Config metricsClient metrics.Client + metrics structured.Emitter + metricTags structured.Tags logger log.Logger taskExecutor TaskExecutor hostRateLimiter quotas.Limiter @@ -113,6 +116,7 @@ func NewTaskProcessor( historyEngine engine.Engine, config *config.Config, metricsClient metrics.Client, + emitter structured.Emitter, taskFetcher TaskFetcher, taskExecutor TaskExecutor, clock clock.TimeSource, @@ -134,14 +138,18 @@ func NewTaskProcessor( noTaskBackoffPolicy.SetExpirationInterval(backoff.NoInterval) noTaskRetrier := backoff.NewRetrier(noTaskBackoffPolicy, clock) return &taskProcessorImpl{ - currentCluster: shard.GetClusterMetadata().GetCurrentClusterName(), - sourceCluster: sourceCluster, - status: common.DaemonStatusInitialized, - shard: shard, - historyEngine: historyEngine, - historySerializer: persistence.NewPayloadSerializer(), - config: config, - metricsClient: metricsClient, + currentCluster: shard.GetClusterMetadata().GetCurrentClusterName(), + sourceCluster: sourceCluster, + status: common.DaemonStatusInitialized, + shard: shard, + historyEngine: historyEngine, + historySerializer: persistence.NewPayloadSerializer(), + config: config, + metricsClient: metricsClient, + metrics: emitter, + metricTags: structured.TagsFromMap(map[string]string{ + 
"target_cluster": sourceCluster, // looks backwards, but matches historical behavior + }), logger: shard.GetLogger().WithTags(tag.SourceCluster(sourceCluster), tag.ShardID(shardID)), taskExecutor: taskExecutor, hostRateLimiter: taskFetcher.GetRateLimiter(), @@ -467,28 +475,44 @@ func (p *taskProcessorImpl) processTaskOnce(replicationTask *types.ReplicationTa if err != nil { p.updateFailureMetric(scope, err, p.shard.GetShardID()) } else { - now := ts.Now() mScope := p.metricsClient.Scope(scope, metrics.TargetClusterTag(p.sourceCluster)) domainID := replicationTask.HistoryTaskV2Attributes.GetDomainID() + var domainName string if domainID != "" { - domainName, errorDomainName := p.shard.GetDomainCache().GetDomainName(domainID) + name, errorDomainName := p.shard.GetDomainCache().GetDomainName(domainID) if errorDomainName != nil { return errorDomainName } + domainName = name mScope = mScope.Tagged(metrics.DomainTag(domainName)) } + tags := p.metricTags.With( + "operation", structured.GetOperationString(scope), + "domain", domainName, + ) + + now := ts.Now() + processingLatency := now.Sub(startTime) + replicationLatency := now.Sub(time.Unix(0, replicationTask.GetCreationTime())) + + p.metrics.Histogram("task_processing_latency_ns", structured.Low1ms10s, processingLatency, tags) + // latency from task generated to task received + p.metrics.Histogram("task_replication_latency_ns", structured.Mid1ms24h, replicationLatency, tags) + // number of replication tasks + // this is an exact match for the legacy scope, so it would cause duplicates if emitted. + // because this is the first use of this new system, we'll verify the tags with the histograms first. + // p.metrics.Count("replication_tasks_applied_per_domain", 1, tags) + // emit single task processing latency - mScope.RecordTimer(metrics.TaskProcessingLatency, now.Sub(startTime)) + mScope.RecordTimer(metrics.TaskProcessingLatency, processingLatency) // emit latency from task generated to task received - mScope.RecordTimer( - metrics.ReplicationTaskLatency, - now.Sub(time.Unix(0, replicationTask.GetCreationTime())), - ) - // emit the number of replication tasks + mScope.RecordTimer(metrics.ReplicationTaskLatency, replicationLatency) + // emit the number of replication tasks. 
+ // when removing, be sure to un-comment the p.Count above mScope.IncCounter(metrics.ReplicationTasksAppliedPerDomain) + shardScope := p.metricsClient.Scope(scope, metrics.TargetClusterTag(p.sourceCluster), metrics.InstanceTag(strconv.Itoa(p.shard.GetShardID()))) shardScope.IncCounter(metrics.ReplicationTasksApplied) - } return err diff --git a/service/history/replication/task_processor_test.go b/service/history/replication/task_processor_test.go index d7b54163ff6..0c320a14aa3 100644 --- a/service/history/replication/task_processor_test.go +++ b/service/history/replication/task_processor_test.go @@ -49,6 +49,7 @@ import ( "github.com/uber/cadence/common/constants" "github.com/uber/cadence/common/dynamicconfig/dynamicproperties" "github.com/uber/cadence/common/metrics" + "github.com/uber/cadence/common/metrics/structured" "github.com/uber/cadence/common/mocks" "github.com/uber/cadence/common/persistence" "github.com/uber/cadence/common/quotas" @@ -154,6 +155,7 @@ func (s *taskProcessorSuite) SetupTest() { s.mockEngine, s.config, metricsClient, + structured.NewTestEmitter(s.T(), nil), s.taskFetcher, s.taskExecutor, s.clock, @@ -646,6 +648,7 @@ func TestProcessorLoop_TaskExecuteFailed_ShardChangeErr(t *testing.T) { mockEngine, config, metricsClient, + structured.NewTestEmitter(t, nil), taskFetcher, taskExecutor, clock.NewMockedTimeSource(), diff --git a/service/history/resource/resource_mock.go b/service/history/resource/resource_mock.go index 184fa60e971..a97aefb5ade 100644 --- a/service/history/resource/resource_mock.go +++ b/service/history/resource/resource_mock.go @@ -36,6 +36,7 @@ import ( membership "github.com/uber/cadence/common/membership" messaging "github.com/uber/cadence/common/messaging" metrics "github.com/uber/cadence/common/metrics" + structured "github.com/uber/cadence/common/metrics/structured" persistence "github.com/uber/cadence/common/persistence" client0 "github.com/uber/cadence/common/persistence/client" algorithm "github.com/uber/cadence/common/quotas/global/algorithm" @@ -489,6 +490,20 @@ func (mr *MockResourceMockRecorder) GetMetricsClient() *gomock.Call { return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetMetricsClient", reflect.TypeOf((*MockResource)(nil).GetMetricsClient)) } +// GetMetricsEmitter mocks base method. +func (m *MockResource) GetMetricsEmitter() structured.Emitter { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "GetMetricsEmitter") + ret0, _ := ret[0].(structured.Emitter) + return ret0 +} + +// GetMetricsEmitter indicates an expected call of GetMetricsEmitter. +func (mr *MockResourceMockRecorder) GetMetricsEmitter() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetMetricsEmitter", reflect.TypeOf((*MockResource)(nil).GetMetricsEmitter)) +} + // GetPayloadSerializer mocks base method. 
func (m *MockResource) GetPayloadSerializer() persistence.PayloadSerializer { m.ctrl.T.Helper() diff --git a/service/history/shard/context.go b/service/history/shard/context.go index 9d0d2355d59..cecd64a40a0 100644 --- a/service/history/shard/context.go +++ b/service/history/shard/context.go @@ -41,6 +41,7 @@ import ( "github.com/uber/cadence/common/log" "github.com/uber/cadence/common/log/tag" "github.com/uber/cadence/common/metrics" + "github.com/uber/cadence/common/metrics/structured" "github.com/uber/cadence/common/persistence" "github.com/uber/cadence/common/types" "github.com/uber/cadence/service/history/config" @@ -65,6 +66,7 @@ type ( GetLogger() log.Logger GetThrottledLogger() log.Logger GetMetricsClient() metrics.Client + GetMetricsEmitter() structured.Emitter GetTimeSource() clock.TimeSource PreviousShardOwnerWasDifferent() bool diff --git a/service/history/shard/context_mock.go b/service/history/shard/context_mock.go index dc774b2a714..027da55949b 100644 --- a/service/history/shard/context_mock.go +++ b/service/history/shard/context_mock.go @@ -22,6 +22,7 @@ import ( cluster "github.com/uber/cadence/common/cluster" log "github.com/uber/cadence/common/log" metrics "github.com/uber/cadence/common/metrics" + structured "github.com/uber/cadence/common/metrics/structured" persistence "github.com/uber/cadence/common/persistence" types "github.com/uber/cadence/common/types" config "github.com/uber/cadence/service/history/config" @@ -353,6 +354,20 @@ func (mr *MockContextMockRecorder) GetMetricsClient() *gomock.Call { return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetMetricsClient", reflect.TypeOf((*MockContext)(nil).GetMetricsClient)) } +// GetMetricsEmitter mocks base method. +func (m *MockContext) GetMetricsEmitter() structured.Emitter { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "GetMetricsEmitter") + ret0, _ := ret[0].(structured.Emitter) + return ret0 +} + +// GetMetricsEmitter indicates an expected call of GetMetricsEmitter. +func (mr *MockContextMockRecorder) GetMetricsEmitter() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetMetricsEmitter", reflect.TypeOf((*MockContext)(nil).GetMetricsEmitter)) +} + // GetQueueAckLevel mocks base method. func (m *MockContext) GetQueueAckLevel(category persistence.HistoryTaskCategory) persistence.HistoryTaskKey { m.ctrl.T.Helper()