diff --git a/Makefile b/Makefile index 51bd35f16a6..6df7e49829d 100644 --- a/Makefile +++ b/Makefile @@ -64,7 +64,7 @@ $(BUILD)/goversion-lint: $(BUILD)/fmt: $(BUILD)/codegen # formatting must occur only after all other go-file-modifications are done # $(BUILD)/copyright # $(BUILD)/copyright: $(BUILD)/codegen # must add copyright to generated code, sometimes needs re-formatting -$(BUILD)/codegen: $(BUILD)/thrift $(BUILD)/protoc +$(BUILD)/codegen: $(BUILD)/thrift $(BUILD)/protoc $(BUILD)/metrics $(BUILD)/thrift: $(BUILD)/go_mod_check $(BUILD)/protoc: $(BUILD)/go_mod_check $(BUILD)/go_mod_check: @@ -211,6 +211,9 @@ $(BIN)/protoc-gen-gogofast: go.mod go.work | $(BIN) $(BIN)/protoc-gen-yarpc-go: go.mod go.work | $(BIN) $(call go_mod_build_tool,go.uber.org/yarpc/encoding/protobuf/protoc-gen-yarpc-go) +$(BIN)/metricslint: internal/tools/go.mod go.work $(wildcard internal/tools/metricslint/* internal/tools/metricslint/cmd/*) | $(BIN) + $(call go_build_tool,./metricslint/cmd,metricslint) + $(BUILD)/go_mod_check: go.mod internal/tools/go.mod go.work $Q # generated == used is occasionally important for gomock / mock libs in general. this is not a definite problem if violated though. $Q ./scripts/check-gomod-version.sh github.com/golang/mock/gomock $(if $(verbose),-v) @@ -404,6 +407,11 @@ $(BUILD)/code-lint: $(LINT_SRC) $(BIN)/revive | $(BUILD) fi $Q touch $@ +$(BUILD)/metrics-lint: $(ALL_SRC) $(BIN)/metricslint | $(BUILD) + $Q echo "linting metrics definitions..." + $Q $(BIN_PATH) $(BIN)/metricslint -skip cadence_requests_per_tl,2 -skip cache_hit,2 -skip cache_full,2 -skip cache_miss,2 -skip cross_cluster_fetch_errors,2 ./... + $Q touch $@ + $(BUILD)/goversion-lint: go.work Dockerfile docker/github_actions/Dockerfile${DOCKERFILE_SUFFIX} $Q echo "checking go version..." $Q # intentionally using go.work toolchain, as GOTOOLCHAIN is user-overridable @@ -458,7 +466,7 @@ endef # useful to actually re-run to get output again. # reuse the intermediates for simplicity and consistency. lint: ## (Re)run the linter - $(call remake,proto-lint gomod-lint code-lint goversion-lint) + $(call remake,proto-lint gomod-lint code-lint goversion-lint metrics-lint) # intentionally not re-making, it's a bit slow and it's clear when it's unnecessary fmt: $(BUILD)/fmt ## Run `gofmt` / organize imports / etc @@ -548,6 +556,7 @@ go-generate: $(BIN)/mockgen $(BIN)/enumer $(BIN)/mockery $(BIN)/gowrap ## Run ` $Q echo "running go generate ./..., this takes a minute or more..." $Q # add our bins to PATH so `go generate` can find them $Q $(BIN_PATH) go generate $(if $(verbose),-v) ./... 
+	$Q touch $(BUILD)/metrics # whole-service go-generate also regenerates metrics
 	$Q $(MAKE) --no-print-directory fmt
 	# $Q echo "updating copyright headers"
 	# $Q $(MAKE) --no-print-directory copyright
@@ -577,7 +586,7 @@ tidy: ## `go mod tidy` all packages
 clean: ## Clean build products and SQLite database
 	rm -f $(BINS)
 	rm -Rf $(BUILD)
-	rm *.db
+	rm -f *.db
 	$(if \
 		$(wildcard $(STABLE_BIN)/*), \
 		$(warning usually-stable build tools still exist, delete the $(STABLE_BIN) folder to rebuild them),)
diff --git a/cmd/server/cadence/fx.go b/cmd/server/cadence/fx.go
index 147dba12e01..719a2d48afa 100644
--- a/cmd/server/cadence/fx.go
+++ b/cmd/server/cadence/fx.go
@@ -40,6 +40,7 @@ import (
 	"github.com/uber/cadence/common/log/tag"
 	"github.com/uber/cadence/common/metrics"
 	"github.com/uber/cadence/common/metrics/metricsfx"
+	"github.com/uber/cadence/common/metrics/structured"
 	"github.com/uber/cadence/common/persistence/nosql/nosqlplugin/cassandra/gocql"
 	"github.com/uber/cadence/common/rpc/rpcfx"
 	"github.com/uber/cadence/common/service"
@@ -101,6 +102,7 @@ type AppParams struct {
 	DynamicConfig dynamicconfig.Client
 	Scope         tally.Scope
 	MetricsClient metrics.Client
+	Emitter       structured.Emitter
 }

 // NewApp creates a new Application from pre-initialized config and logger.
@@ -112,6 +114,7 @@ func NewApp(params AppParams) *App {
 		dynamicConfig: params.DynamicConfig,
 		scope:         params.Scope,
 		metricsClient: params.MetricsClient,
+		emitter:       params.Emitter,
 	}

 	params.LifeCycle.Append(fx.StartHook(app.verifySchema))
@@ -128,13 +131,14 @@ type App struct {
 	dynamicConfig dynamicconfig.Client
 	scope         tally.Scope
 	metricsClient metrics.Client
+	emitter       structured.Emitter
 	daemon        common.Daemon
 	service       string
 }

 func (a *App) Start(_ context.Context) error {
-	a.daemon = newServer(a.service, a.cfg, a.logger, a.dynamicConfig, a.scope, a.metricsClient)
+	a.daemon = newServer(a.service, a.cfg, a.logger, a.dynamicConfig, a.scope, a.metricsClient, a.emitter)
 	a.daemon.Start()
 	return nil
 }
diff --git a/cmd/server/cadence/server.go b/cmd/server/cadence/server.go
index 873ce35a4aa..01b8f5715c5 100644
--- a/cmd/server/cadence/server.go
+++ b/cmd/server/cadence/server.go
@@ -54,6 +54,7 @@ import (
 	"github.com/uber/cadence/common/membership"
 	"github.com/uber/cadence/common/messaging/kafka"
 	"github.com/uber/cadence/common/metrics"
+	"github.com/uber/cadence/common/metrics/structured"
 	"github.com/uber/cadence/common/peerprovider/ringpopprovider"
 	pnt "github.com/uber/cadence/common/pinot"
 	"github.com/uber/cadence/common/resource"
@@ -79,12 +80,13 @@ type (
 		dynamicCfgClient dynamicconfig.Client
 		scope            tally.Scope
 		metricsClient    metrics.Client
+		emitter          structured.Emitter
 	}
 )

 // newServer returns a new instance of a daemon
 // that represents a cadence service
-func newServer(service string, cfg config.Config, logger log.Logger, dynamicCfgClient dynamicconfig.Client, scope tally.Scope, metricsClient metrics.Client) common.Daemon {
+func newServer(service string, cfg config.Config, logger log.Logger, dynamicCfgClient dynamicconfig.Client, scope tally.Scope, metricsClient metrics.Client, emitter structured.Emitter) common.Daemon {
 	return &server{
 		cfg:  cfg,
 		name: service,
@@ -93,6 +95,7 @@ func newServer(service string, cfg config.Config, logger log.Logger, dynamicCfgC
 		dynamicCfgClient: dynamicCfgClient,
 		scope:            scope,
 		metricsClient:    metricsClient,
+		emitter:          emitter,
 	}
 }

@@ -142,6 +145,7 @@ func (s *server) startService() common.Daemon {
 	params.MetricScope = s.scope
 	params.MetricsClient = s.metricsClient
+	params.Emitter = s.emitter

 	rpcParams, err :=
rpc.NewParams(params.Name, &s.cfg, dc, params.Logger, params.MetricsClient) if err != nil { diff --git a/cmd/server/cadence/server_test.go b/cmd/server/cadence/server_test.go index 902f5fd5eef..ea6bb16be66 100644 --- a/cmd/server/cadence/server_test.go +++ b/cmd/server/cadence/server_test.go @@ -44,6 +44,7 @@ import ( "github.com/uber/cadence/common/log/tag" "github.com/uber/cadence/common/log/testlogger" "github.com/uber/cadence/common/metrics" + "github.com/uber/cadence/common/metrics/structured" pt "github.com/uber/cadence/common/persistence/persistence-tests" "github.com/uber/cadence/common/persistence/sql/sqlplugin/sqlite" "github.com/uber/cadence/common/resource" @@ -110,7 +111,15 @@ func (s *ServerSuite) TestServerStartup() { }) for _, svc := range services { - server := newServer(svc, cfg, logger, dynamicconfig.NewNopClient(), tally.NoopScope, metrics.NewNoopMetricsClient()) + server := newServer( + svc, + cfg, + logger, + dynamicconfig.NewNopClient(), + tally.NoopScope, + metrics.NewNoopMetricsClient(), + structured.NewTestEmitter(s.T(), nil), + ) daemons = append(daemons, server) server.Start() } diff --git a/common/metrics/defs.go b/common/metrics/defs.go index 00214a52649..a1d546ede05 100644 --- a/common/metrics/defs.go +++ b/common/metrics/defs.go @@ -53,6 +53,10 @@ type ( ServiceIdx int ) +func (s scopeDefinition) GetOperationString() string { + return s.operation +} + // MetricTypes which are supported const ( Counter MetricType = iota @@ -1068,7 +1072,7 @@ const ( // -- Operation scopes for History service -- const ( // HistoryStartWorkflowExecutionScope tracks StartWorkflowExecution API calls received by service - HistoryStartWorkflowExecutionScope = iota + NumCommonScopes + HistoryStartWorkflowExecutionScope = iota + NumFrontendScopes // HistoryRecordActivityTaskHeartbeatScope tracks RecordActivityTaskHeartbeat API calls received by service HistoryRecordActivityTaskHeartbeatScope // HistoryRespondDecisionTaskCompletedScope tracks RespondDecisionTaskCompleted API calls received by service @@ -1356,7 +1360,7 @@ const ( // -- Operation scopes for Matching service -- const ( // PollForDecisionTaskScope tracks PollForDecisionTask API calls received by service - MatchingPollForDecisionTaskScope = iota + NumCommonScopes + MatchingPollForDecisionTaskScope = iota + NumHistoryScopes // PollForActivityTaskScope tracks PollForActivityTask API calls received by service MatchingPollForActivityTaskScope // MatchingAddActivityTaskScope tracks AddActivityTask API calls received by service @@ -1392,7 +1396,7 @@ const ( // -- Operation scopes for Worker service -- const ( // ReplicationScope is the scope used by all metric emitted by replicator - ReplicatorScope = iota + NumCommonScopes + ReplicatorScope = iota + NumMatchingScopes // DomainReplicationTaskScope is the scope used by domain task replication processing DomainReplicationTaskScope // ESProcessorScope is scope used by all metric emitted by esProcessor @@ -1440,7 +1444,7 @@ const ( // -- Operation scopes for ShardDistributor service -- const ( // ShardDistributorGetShardOwnerScope tracks GetShardOwner API calls received by service - ShardDistributorGetShardOwnerScope = iota + NumCommonScopes + ShardDistributorGetShardOwnerScope = iota + NumWorkerScopes ShardDistributorHeartbeatScope ShardDistributorAssignLoopScope @@ -2700,12 +2704,13 @@ const ( VirtualQueueCountGauge VirtualQueuePausedGauge VirtualQueueRunningGauge + NumHistoryMetrics ) // Matching metrics enum const ( - PollSuccessPerTaskListCounter = iota + NumCommonMetrics + 
PollSuccessPerTaskListCounter = iota + NumHistoryMetrics
 	PollTimeoutPerTaskListCounter
 	PollSuccessWithSyncPerTaskListCounter
 	LeaseRequestPerTaskListCounter
@@ -2783,12 +2788,13 @@
 	IsolationGroupUpscale
 	IsolationGroupDownscale
 	PartitionDrained
+	NumMatchingMetrics
 )

 // Worker metrics enum
 const (
-	ReplicatorMessages = iota + NumCommonMetrics
+	ReplicatorMessages = iota + NumMatchingMetrics
 	ReplicatorFailures
 	ReplicatorMessagesDropped
 	ReplicatorLatency
@@ -2872,12 +2878,13 @@
 	DiagnosticsWorkflowStartedCount
 	DiagnosticsWorkflowSuccess
 	DiagnosticsWorkflowExecutionLatency
+	NumWorkerMetrics
 )

 // ShardDistributor metrics enum
 const (
-	ShardDistributorRequests = iota + NumCommonMetrics
+	ShardDistributorRequests = iota + NumWorkerMetrics
 	ShardDistributorFailures
 	ShardDistributorLatency
 	ShardDistributorErrContextTimeoutCounter
diff --git a/common/metrics/defs_test.go b/common/metrics/defs_test.go
index 122aa359645..db6c15a96a3 100644
--- a/common/metrics/defs_test.go
+++ b/common/metrics/defs_test.go
@@ -129,6 +129,39 @@ func TestMetricDefs(t *testing.T) {
 	}
 }

+// "index -> operation" must be unique for structured.DynamicOperationTags' int lookup to work consistently.
+// Duplicate indexes with the same operation name are technically fine, but there doesn't seem to be any benefit in allowing them,
+// and disallowing them trivially ensures that all indexes have only one operation name.
+func TestOperationIndexesAreUnique(t *testing.T) {
+	seen := make(map[int]bool)
+	for serviceIdx, serviceOps := range ScopeDefs {
+		for idx := range serviceOps {
+			if seen[idx] {
+				t.Error("duplicate operation index:", idx, "with name:", serviceOps[idx].operation, "in service:", serviceIdx)
+			}
+			seen[idx] = true
+		}
+	}
+}
+
+func TestMetricsAreUnique(t *testing.T) {
+	// Duplicate indexes are arguably fine, but there doesn't seem to be any benefit in allowing them.
+	//
+	// Duplicate names are also linted, but that check is done via an analyzer (metricslint) instead, to
+	// allow checking across multiple formats.
+	t.Run("indexes", func(t *testing.T) {
+		seen := make(map[int]bool)
+		for _, serviceMetrics := range MetricDefs {
+			for idx := range serviceMetrics {
+				if seen[idx] {
+					t.Error("duplicate metric index:", idx, "with name:", serviceMetrics[idx].metricName)
+				}
+				seen[idx] = true
+			}
+		}
+	})
+}
+
 func TestExponentialDurationBuckets(t *testing.T) {
 	factor := math.Pow(2, 0.25)
 	assert.Equal(t, 80, len(ExponentialDurationBuckets))
diff --git a/common/metrics/metricsfx/metricsfx.go b/common/metrics/metricsfx/metricsfx.go
index 56702a26107..790880aee67 100644
--- a/common/metrics/metricsfx/metricsfx.go
+++ b/common/metrics/metricsfx/metricsfx.go
@@ -29,12 +29,15 @@ import (
 	"github.com/uber/cadence/common/config"
 	"github.com/uber/cadence/common/log"
 	"github.com/uber/cadence/common/metrics"
+	"github.com/uber/cadence/common/metrics/structured"
 	"github.com/uber/cadence/common/service"
 )

 // Module provides metrics client for fx application.
 var Module = fx.Module("metricsfx",
-	fx.Provide(buildClient))
+	fx.Provide(buildClient),
+	structured.Module,
+)

 // ModuleForExternalScope provides metrics client for fx application when tally.Scope is created outside.
var ModuleForExternalScope = fx.Module("metricsfx", diff --git a/common/metrics/structured/base.go b/common/metrics/structured/base.go new file mode 100644 index 00000000000..6d447d9981f --- /dev/null +++ b/common/metrics/structured/base.go @@ -0,0 +1,291 @@ +package structured + +import ( + "fmt" + "maps" + "reflect" + "strconv" + "strings" + "sync" + "testing" + "time" + + "github.com/uber-go/tally" + "go.uber.org/fx" +) + +var Module = fx.Options( + fx.Provide(func(s tally.Scope) Emitter { + return Emitter{ + scope: s, + types: &sync.Map{}, + } + }), +) + +// Emitter is the base helper for emitting metrics, and it contains only low-level +// metrics-emitting funcs to keep it as simple as possible. +// +// It is intended to be used with the `make metrics` code generator and structs-of-tags, +// but it's intentionally possible to (ab)use it by hand because ad-hoc metrics +// should be easy and encouraged. +// +// Metadata can be constructed from any map via DynamicTags, but this API intentionally hides +// [tally.Scope.Tagged] because it's (somewhat) memory-wasteful, self-referential interfaces are +// difficult to mock, and it's very hard to figure out what tags may be present at runtime. +// +// TODO: this can / likely should be turned into an interface to allow disconnecting from tally, +// to allow providing a specific version or to drop it entirely if desired. +type Emitter struct { + // intentionally NOT no-op by default. + // + // use a test emitter in tests, it should be quite easy to construct, + // and this way it will panic if forgotten for some reason, rather than + // causing a misleading lack-of-metrics. + // + // currently, because this is constructed by common/config/metrics.go, + // this scope already contains the `cadence_service:cadence-{whatever}` tag, + // but essentially no others (aside from platform-level stuff). + // you can get the instance from go.uber.org/fx, as just `tally.Scope`. + scope tally.Scope + + // cached type information for metadata containers + types *sync.Map +} + +// Histogram records a duration-based histogram with the provided data. +// It adds a "histogram_scale" tag, so histograms can be accurately subset in queries or via middleware. +// +// `meta` must be either a DynamicTags or a struct with appropriate fields, checked by a linter. +func (b Emitter) Histogram(name string, buckets SubsettableHistogram, dur time.Duration, meta any) { + tags := b.getTags(meta) + // all subsettable histograms need to emit scale values so scale changes + // can be correctly merged at query time. + tags = tags.With("histogram_scale", strconv.Itoa(buckets.scale)) + + if !strings.HasSuffix(name, "_ns") { + // duration-based histograms are always in nanoseconds, + // and the name MUST be different from timers while we migrate, + // so this ensures we always have a unique _ns suffix. + // + // hopefully this is never used, but it'll at least make it clear if it is. + name = name + "_error_missing_suffix_ns" + } + b.scope.Tagged(tags.m).Histogram(name, buckets).RecordDuration(dur) +} + +// IntHistogram records a count-based histogram with the provided data. +// It adds a "histogram_scale" tag, so histograms can be accurately subset in queries or via middleware. +// +// `meta` must be either a DynamicTags or a struct with appropriate fields, checked by a linter. 
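+//
+// A minimal usage sketch (the metric name, operation name, and `tasks` slice
+// here are illustrative only, not real definitions):
+//
+//	e.IntHistogram("tasks_fetched_counts", Mid1To32k, len(tasks), OperationTags{Operation: "TaskFetch"})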
+func (b Emitter) IntHistogram(name string, buckets IntSubsettableHistogram, num int, meta any) { + tags := b.getTags(meta) + + // all subsettable histograms need to emit scale values so scale changes + // can be correctly merged at query time. + tags = tags.With("histogram_scale", strconv.Itoa(buckets.scale)) + + if !strings.HasSuffix(name, "_counts") { + // int-based histograms are always in "_counts" (currently anyway), + // and the name MUST be different from timers while we migrate. + // so this ensures we always have a unique _counts suffix. + // + // hopefully this is never used, but it'll at least make it clear if it is. + name = name + "_error_missing_suffix_counts" + } + b.scope.Tagged(tags.m).Histogram(name, buckets).RecordDuration(time.Duration(num)) +} + +// TODO: make a MinMaxHistogram helper which maintains a precise, rolling +// min/max gauge, over a window larger than the metrics granularity (e.g. ~20s) +// to work around gauges' last-data-only behavior. +// +// This will likely require some additional state though, and might benefit from +// keeping that state further up the Tags-stack to keep contention and +// series-deduplication-costs low. +// +// Maybe OTEL / Prometheus will natively support this one day. It'd be simple. + +// Count records a counter with the provided data. +// +// `meta` must be either a DynamicTags or a struct with appropriate fields, checked by a linter. +func (b Emitter) Count(name string, num int, meta any) { + b.scope.Tagged(b.getTags(meta).m).Counter(name).Inc(int64(num)) +} + +// Gauge emits a gauge with the provided data. +// +// `meta` must be either a DynamicTags or a struct with appropriate fields, checked by a linter. +func (b Emitter) Gauge(name string, val float64, meta any) { + b.scope.Tagged(b.getTags(meta).m).Gauge(name).Update(val) +} + +// Tags is a simple read-only map wrapper, to prevent accidental mutation. +// +// The zero value is ready to use, but generally you should be asking the Emitter to create one for you. +type Tags struct{ m map[string]string } + +func (t Tags) With(key, value string, pairs ...string) Tags { + dup := make(map[string]string, len(t.m)+1+(len(pairs)/2)) + maps.Copy(dup, t.m) + dup[key] = value + for i := 0; i < len(pairs); i += 2 { + dup[pairs[i]] = pairs[i+1] + } + return Tags{dup} +} + +func (b Emitter) TagsFrom(meta any) Tags { + return b.getTags(meta) +} + +type fieldAccess struct { + index []int // path from outer tags-type to the usable field + name string // tag name to use, linter enforces uniqueness per nested structure + getter reflect.Value // optional getter func, must be func(self)string +} + +func (b Emitter) getTags(tags any) Tags { + if t, ok := tags.(map[string]string); ok { + return Tags{maps.Clone(t)} + } + result := map[string]string{} + + // find the type info + v := reflect.ValueOf(tags) + vt := v.Type() + fields := b.fields(vt) + + // read the field access paths to get the values + for _, field := range fields { + ff := v.FieldByIndex(field.index) + + // look for a custom getter + if field.getter.IsValid() { + got := field.getter.Call([]reflect.Value{v}) + result[field.name] = got[0].Interface().(string) + continue + } + + // must be a primitive type, enforced by the linter and panics if wrong. 
+ + // check for nils, dereference if ptr + if ff.Kind() == reflect.Ptr { + if ff.IsNil() { + result[field.name] = "" + continue + } + ff = ff.Elem() + } + + // must be a trivial type, check it and convert if necessary + ffi := ff.Interface() + switch ffiv := ffi.(type) { + case int, int32, int64, bool: + result[field.name] = fmt.Sprintf("%v", ffiv) + case string: + result[field.name] = ffiv + default: + // prevented by the linter + panic(fmt.Sprintf("field %q (index %v) on type %T has an unexpected type: %T", field.name, field.index, tags, ffi)) + } + } + + return Tags{result} +} + +// recursively gather types. higher levels override lower. +func (b Emitter) fields(t reflect.Type) []fieldAccess { + if val, ok := b.types.Load(t); ok { + return val.([]fieldAccess) + } + + var result []fieldAccess + tagNames := make(map[string]bool, t.NumField()) + for i := 0; i < t.NumField(); i++ { + f := t.Field(i) // `t.Field(i)` == `f.Index` always has just one value == `i` + + // embedded field (yes the name is bad) + if f.Anonymous { + contains := b.fields(f.Type) + for _, field := range contains { + if tagNames[field.name] { + // duplicates are checked to catch linter bugs, this should + // be prevented by the linter normally. + panic(fmt.Sprintf("duplicate tag name on type %v.%v field %v", t.PkgPath(), t.Name(), f.Name)) + } + tagNames[field.name] = true + result = append(result, fieldAccess{ + index: append(f.Index, field.index...), + name: field.name, + getter: field.getter, + }) + } + continue + } + + // check for `struct{}` embedding + if f.Type.Name() == "" { + // this is true of all anonymous types, but it seems fine to call this good enough, and let the linter prevent other types. + // reserved field, ignore + continue + } + + // get the tag name + tagName := f.Tag.Get("tag") + if tagName == "" { + // prevented by the linter + panic(fmt.Sprintf("missing tag on type %v.%v field %v", t.PkgPath(), t.Name(), f.Name)) + } + tagNames[tagName] = true + + // get the custom getter func on the parent struct, if requested + var getter reflect.Value + getterName := f.Tag.Get("getter") + if getterName != "" { + // must be a method on the parent type + getterm, ok := t.MethodByName(getterName) + if !ok || !getterm.IsExported() { + // prevented by the linter + panic(fmt.Sprintf("missing exported getter func %q on type %v.%v field %v", getterName, t.PkgPath(), t.Name(), f.Name)) + } + // must be the correct type signature + if getterm.Type.Kind() != reflect.Func || + getterm.Type.NumIn() != 1 || getterm.Type.In(0) != t || // instance methods have an implicit self-arg of the receiver type + getterm.Type.NumOut() != 1 || getterm.Type.Out(0).Kind() != reflect.String { + // prevented by the linter + panic(fmt.Sprintf("getter %q on type %v.%v for field %v is invalid, must be a `func() string` method on the type", getterName, t.PkgPath(), t.Name(), f.Name)) + } + getter = getterm.Func + } + + // must be a public field or have a public getter + if !getter.IsValid() && !f.IsExported() { + panic(fmt.Sprintf("field %v on type %v.%v must be public or have a getter func on the parent struct", f.Name, t.PkgPath(), t.Name())) + } + + result = append(result, fieldAccess{ + index: f.Index, + name: tagName, + getter: getter, + }) + } + prev, loaded := b.types.LoadOrStore(t, result) + if loaded { + return prev.([]fieldAccess) + } + return result +} + +// NewTestEmitter creates an emitter for tests, optionally using the provided scope. +// If scope is nil, a no-op scope will be used. 
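+//
+// For example, to assert on emitted metrics with tally's in-memory test scope
+// (a sketch; the metric and operation names are illustrative):
+//
+//	scope := tally.NewTestScope("", nil)
+//	e := NewTestEmitter(t, scope)
+//	e.Count("some_counter", 1, OperationTags{Operation: "Test"})
+//	// scope.Snapshot().Counters() now includes the counter and its tags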
+func NewTestEmitter(t *testing.T, scope tally.Scope) Emitter { + t.Name() // require non-nil + if scope == nil { + scope = tally.NoopScope + } + return Emitter{ + scope: scope, + types: &sync.Map{}, + } +} diff --git a/common/metrics/structured/doc.go b/common/metrics/structured/doc.go new file mode 100644 index 00000000000..d454f735166 --- /dev/null +++ b/common/metrics/structured/doc.go @@ -0,0 +1 @@ +package structured diff --git a/common/metrics/structured/histograms.go b/common/metrics/structured/histograms.go new file mode 100644 index 00000000000..6502be4cac1 --- /dev/null +++ b/common/metrics/structured/histograms.go @@ -0,0 +1,187 @@ +package structured + +import ( + "fmt" + "math" + "slices" + "time" + + "github.com/uber-go/tally" +) + +// Nearly all histograms should use pre-defined buckets here, to encourage consistency. +// Name them more by their intent than their exact ranges - if you need something outside the intent, +// make a new bucket. +// +// Extreme, rare cases should still create a new bucket here, not declare one in-line elsewhere. +// +// Partly this helps us document our ranges, and partly it ensures that any customized emitter +// can easily detect buckets with an `==` comparison, and can choose to use a different one +// if needed (e.g. to reduce scale if it is too costly to support). +var ( + // Default1ms10m is our "default" set of buckets, targeting 1ms through 100s, + // and is "rounded up" slightly to reach 80 buckets == 16 minutes (100s needs 68 buckets), + // plus multi-minute exceptions are common enough to support for the small additional cost. + // + // If you need sub-millisecond precision, or substantially longer durations than about 1 minute, + // consider using a different histogram. + Default1ms10m = makeSubsettableHistogram(time.Millisecond, 2, func(last time.Duration, length int) bool { + return last >= 10*time.Minute && length == 80 + }) + + // High1ms24h covers things like activity latency, where "longer than 1 day" is not + // particularly worth being precise about. + // + // This uses a lot of buckets, so it must be used with relatively-low cardinality elsewhere, + // and/or consider dual emitting (Mid1ms24h or lower) so overviews can be queried efficiently. + High1ms24h = makeSubsettableHistogram(time.Millisecond, 2, func(last time.Duration, length int) bool { + // 24h leads to 108 buckets, raise to 112 for cleaner subsetting + return last >= 24*time.Hour && length == 112 + }) + + // Mid1ms24h is a one-scale-lower version of High1ms24h, + // for use when we know it's too detailed to be worth emitting. + // + // This uses 57 buckets, half of High1ms24h's 113 + Mid1ms24h = High1ms24h.subsetTo(1) + + // Low1ms10s is for things that are usually very fast, like most individual database calls, + // and is MUCH lower cardinality than Default1ms10m so it's more suitable for things like per-shard metrics. + Low1ms10s = makeSubsettableHistogram(time.Millisecond, 1, func(last time.Duration, length int) bool { + // 10s needs 26 buckets, raise to 32 for cleaner subsetting + return last >= 10*time.Second && length == 32 + }) + + // Mid1To32k is a histogram for small counters, like "how many replication tasks did we receive". 
+ // This targets 1 to 32k + Mid1To32k = IntSubsettableHistogram(makeSubsettableHistogram(1, 2, func(last time.Duration, length int) bool { + // 10k needs 56 buckets, raise to 64 for cleaner subsetting + return last >= 10000 && length == 64 + })) +) + +// SubsettableHistogram is a duration-based histogram that can be subset to a lower scale +// in a standardized, predictable way. It is intentionally compatible with Prometheus and OTEL's +// "exponential histograms": https://opentelemetry.io/docs/specs/otel/metrics/data-model/#exponentialhistogram +// +// These histograms MUST always have a "_ns" suffix in their name to avoid confusion with timers. +type SubsettableHistogram struct { + tally.DurationBuckets + + scale int +} + +// IntSubsettableHistogram is a non-duration-based integer-distribution histogram, otherwise identical +// to SubsettableHistogram. +// +// These histograms MUST always have a "_counts" suffix in their name to avoid confusion with timers, +// or modify the Emitter to allow different suffixes if something else reads better. +type IntSubsettableHistogram SubsettableHistogram + +// currently we have no apparent need for float-histograms, +// as all our value ranges go from >=1 to many thousands, where +// decimal precision is pointless and mostly just looks bad. +// +// if we ever need 0..1 precision in histograms, we can add them then. +// type FloatSubsettableHistogram struct { +// tally.ValueBuckets +// scale int +// } + +func (s SubsettableHistogram) subsetTo(newScale int) SubsettableHistogram { + if newScale >= s.scale { + panic(fmt.Sprintf("scale %v is not less than the current scale %v", newScale, s.scale)) + } + if newScale < 0 { + panic(fmt.Sprintf("negative scales (%v == greater than *2 per step) are possible, but do not have tests yet", newScale)) + } + dup := SubsettableHistogram{ + DurationBuckets: slices.Clone(s.DurationBuckets), + scale: s.scale, + } + // remove every other bucket per -1 scale + for dup.scale > newScale { + if (len(dup.DurationBuckets)-1)%2 != 0 { + panic(fmt.Sprintf("cannot subset from scale %v to %v, %v-buckets is not divisible by 2", dup.scale, dup.scale-1, len(dup.DurationBuckets)-1)) + } + half := make(tally.DurationBuckets, 0, ((len(dup.DurationBuckets)-1)/2)+1) + half = append(half, dup.DurationBuckets[0]) // keep the zero value + for i := 1; i < len(dup.DurationBuckets); i += 2 { + half = append(half, dup.DurationBuckets[i]) // add first, third, etc + } + dup.DurationBuckets = half + dup.scale-- + } + return dup +} + +func (i IntSubsettableHistogram) subsetTo(newScale int) IntSubsettableHistogram { + return IntSubsettableHistogram(SubsettableHistogram(i).subsetTo(newScale)) +} + +// makeSubsettableHistogram is a replacement for tally.MustMakeExponentialDurationBuckets, +// tailored to make "construct a range" or "ensure N buckets" simpler for OTEL-compatible exponential histograms +// (i.e. https://opentelemetry.io/docs/specs/otel/metrics/data-model/#exponentialhistogram), +// ensures values are as precise as possible (preventing display-space-wasteful numbers like 3.99996ms), +// and that all histograms start with a 0 value (else erroneous negative values are impossible to notice). +// +// Start time must be positive, scale must be 0..3 (inclusive), and the loop MUST stop within 320 or fewer steps, +// to prevent extremely-costly histograms. +// Even 320 is far too many tbh, target 160 for very-high-value data, and generally around 80 or less (ideally a value +// that is divisible by 2 many times). 
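+//
+// As a worked example: start=1ms with scale=2 grows by 2^(1/4) per bucket,
+// so every 4th bucket is an exact doubling: 1ms, ~1.19ms, ~1.41ms, ~1.68ms, 2ms, and so on.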
+// +// The stop callback will be given the current largest value and the number of values generated so far. +// This excludes the zero value (you should ignore it anyway), so you can stop at your target value, +// or == a highly-divisible-by-2 number (do not go over). +// The buckets produced may be padded further to reach a "full" power-of-2 row, as this simplifies math elsewhere +// and costs very little compared to the rest of the histogram. +// +// For all values produced, please add a test to print the concrete values, and record the length and maximum value +// so they can be quickly checked when reading. +func makeSubsettableHistogram(start time.Duration, scale int, stop func(last time.Duration, length int) bool) SubsettableHistogram { + if start <= 0 { + panic(fmt.Sprintf("start must be greater than 0 or it will not grow exponentially, got %v", start)) + } + if scale < 0 || scale > 3 { + // anything outside this range is currently not expected and probably a mistake + panic(fmt.Sprintf("scale must be between 0 (grows by *2) and 3 (grows by *2^1/8), got scale: %v", scale)) + } + buckets := tally.DurationBuckets{ + time.Duration(0), // else "too low" and "negative" are impossible to tell apart. + // ^ note this must be excluded from calculations below, hence -1 everywhere. + } + for { + if len(buckets) > 320 { + panic(fmt.Sprintf("over 320 buckets is too many, choose a smaller range or smaller scale. "+ + "started at: %v, scale: %v, last value: %v", + start, scale, buckets[len(buckets)-1])) + } + buckets = append(buckets, nextBucket(start, len(buckets)-1, scale)) + + // stop when requested. + if stop(buckets[len(buckets)-1], len(buckets)-1) { + break + } + } + + // fill in as many buckets as are necessary to make a full "row", i.e. just + // before the next power of 2 from the original value. + // this ensures subsetting keeps "round" numbers as long as possible. + powerOfTwoWidth := int(math.Pow(2, float64(scale))) // num of buckets needed to double a value + for (len(buckets)-1)%powerOfTwoWidth != 0 { + buckets = append(buckets, nextBucket(start, len(buckets)-1, scale)) + } + return SubsettableHistogram{ + DurationBuckets: buckets, + scale: scale, + } +} + +func nextBucket(start time.Duration, num int, scale int) time.Duration { + // calculating it from `start` each time reduces floating point error, ensuring "clean" multiples + // at every power of 2 (and possibly others), instead of e.g. "1ms ... 1.9999994ms" which occurs + // if you try to build from the previous value each time. 
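+	// e.g. nextBucket(time.Millisecond, 4, 2) == 1ms * 2^(4/4) == exactly 2ms.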
+ return time.Duration( + float64(start) * + math.Pow(2, float64(num)/math.Pow(2, float64(scale)))) +} diff --git a/common/metrics/structured/histograms_test.go b/common/metrics/structured/histograms_test.go new file mode 100644 index 00000000000..7d47aa73e16 --- /dev/null +++ b/common/metrics/structured/histograms_test.go @@ -0,0 +1,132 @@ +package structured + +import ( + "fmt" + "math" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/uber-go/tally" +) + +func TestHistogramValues(t *testing.T) { + t.Run("default_1ms_to_10m", func(t *testing.T) { + checkHistogram(t, Default1ms10m) + assert.Equal(t, 81, Default1ms10m.len(), "wrong number of buckets") + assertBetween(t, 10*time.Minute, Default1ms10m.max(), 15*time.Minute) // roughly 14m 42s + }) + t.Run("high_1ms_to_24h", func(t *testing.T) { + checkHistogram(t, High1ms24h) + assert.Equal(t, 113, High1ms24h.len(), "wrong number of buckets") + assertBetween(t, 24*time.Hour, High1ms24h.max(), 64*time.Hour) // roughly 63h + }) + t.Run("mid_1ms_24h", func(t *testing.T) { + checkHistogram(t, Mid1ms24h) + assert.Equal(t, 57, Mid1ms24h.len(), "wrong number of buckets") + assertBetween(t, 12*time.Hour, Mid1ms24h.max(), 64*time.Hour) // roughly 53h + }) + t.Run("mid_to_32k_ints", func(t *testing.T) { + // note: this histogram has some duplicates: + // [0] + // [1 1 1 1] + // [2 2 2 3] + // [4 4 5 6] + // [8 9 11 13] + // ... + // + // this wastes a bit of memory, but Tally will choose the same index each + // time for a specific value, so it does not cause extra non-zero series + // to be stored or emitted. + // + // if this turns out to be expensive in Prometheus / etc, it's easy + // enough to start the histogram at 8, and dual-emit a non-exponential + // histogram for lower values. + checkHistogram(t, Mid1To32k) + assert.Equal(t, 65, Mid1To32k.len(), "wrong number of buckets") + assertBetween(t, 32_000, int(Mid1To32k.max()), 64_000) // 55108 + }) + t.Run("low_cardinality_1ms_10s", func(t *testing.T) { + checkHistogram(t, Low1ms10s) + assert.Equal(t, 33, Low1ms10s.len(), "wrong number of buckets") + assertBetween(t, 10*time.Second, Low1ms10s.max(), time.Minute) // roughly 46s + }) +} + +// test helpers, but could be moved elsewhere if they prove useful +func (s SubsettableHistogram) width() int { return int(math.Pow(2, float64(s.scale))) } +func (s SubsettableHistogram) len() int { return len(s.DurationBuckets) } +func (s SubsettableHistogram) max() time.Duration { return s.DurationBuckets[len(s.DurationBuckets)-1] } +func (s SubsettableHistogram) buckets() tally.DurationBuckets { return s.DurationBuckets } + +func (s IntSubsettableHistogram) width() int { return int(math.Pow(2, float64(s.scale))) } +func (s IntSubsettableHistogram) len() int { return len(s.DurationBuckets) } +func (s IntSubsettableHistogram) max() time.Duration { + return s.DurationBuckets[len(s.DurationBuckets)-1] +} +func (s IntSubsettableHistogram) buckets() tally.DurationBuckets { return s.DurationBuckets } + +type histogrammy interface { + SubsettableHistogram | IntSubsettableHistogram + + width() int + len() int + max() time.Duration + buckets() tally.DurationBuckets +} + +type numeric interface { + ~int | ~int64 +} + +func assertBetween[T numeric](t *testing.T, min, actual, max T, msgAndArgs ...interface{}) { + if actual < min || actual > max { + assert.Fail(t, fmt.Sprintf("value %v not between %v and %v", actual, min, max), msgAndArgs...) 
+	}
+}
+
+// most histograms should pass this check, but fuzzy comparison is fine if needed for extreme cases.
+func checkHistogram[T histogrammy](t *testing.T, h T) {
+	printHistogram(t, h)
+	buckets := h.buckets()
+	assert.EqualValues(t, 0, buckets[0], "first bucket should always be zero")
+	for i := 1; i < len(buckets); i += h.width() {
+		if i > 1 {
+			// ensure good float math.
+			//
+			// this is *intentionally* doing exact comparisons, as floating point math with
+			// human-friendly integers has precise power-of-2 multiples for a very long time.
+			//
+			// note that the equivalent tally buckets, e.g.:
+			// tally.MustMakeExponentialDurationBuckets(time.Millisecond, math.Pow(2, 1.0/4.0))
+			// fails this test, and the logs produced show ugly e.g. 31.999942ms values.
+			// it also produces incorrect results if you start at e.g. 1, as it never exceeds 1.
+			assert.Equalf(t, buckets[i-h.width()]*2, buckets[i],
+				"current row's value (%v) is not a power of 2 greater than previous (%v), skewed / bad math?",
+				buckets[i], buckets[i-h.width()])
+		}
+	}
+}
+
+func printHistogram[T histogrammy](t *testing.T, histogram T) {
+	switch h := (any)(histogram).(type) {
+	case SubsettableHistogram:
+		t.Log(h.buckets()[:1])
+		for i := 1; i < len(h.buckets()); i += h.width() {
+			t.Log(h.buckets()[i : i+h.width()]) // display row by row for easier reading.
+			// ^ this will panic if the histograms are not an even multiple of `width`,
+			// that's also a sign that it's constructed incorrectly.
+		}
+	case IntSubsettableHistogram:
+		hi := make([]int, len(h.buckets())) // convert to int
+		for i, v := range h.buckets() {
+			hi[i] = int(v)
+		}
+		t.Log(hi[:1])
+		for i := 1; i < len(hi); i += h.width() {
+			t.Log(hi[i : i+h.width()])
+		}
+	default:
+		panic("unreachable")
+	}
+}
diff --git a/common/metrics/structured/operation.go b/common/metrics/structured/operation.go
new file mode 100644
index 00000000000..f3ae2df8fd4
--- /dev/null
+++ b/common/metrics/structured/operation.go
@@ -0,0 +1,40 @@
+package structured
+
+import "github.com/uber/cadence/common/metrics"
+
+// OperationTags is an embeddable struct that provides a single "operation" tag,
+// as we need it to be included almost everywhere.
+type OperationTags struct {
+	// Operation is our historical "the thing doing something" name, unchanged for consistency
+	Operation string `tag:"operation"`
+}
+
+// DynamicOperationTags is like OperationTags, but it's intended for places
+// where we're dynamically choosing the operation per metric.
+//
+// This is intentionally an "incomplete" metric struct, and using it requires
+// some manual work to populate the tags when emitting metrics.
+type DynamicOperationTags struct {
+	// DynamicOperation has the same intent as OperationTags.Operation,
+	// but is provided at runtime via WithOperation or WithOperationInt.
+	DynamicOperation struct{} `tag:"operation"`
+}
+
+// WithOperation adds the "operation" tag
+func (d DynamicOperationTags) WithOperation(operation string, addTo Tags) Tags {
+	return addTo.With("operation", operation)
+}
+
+// WithOperationInt adds the "operation" tag via the given scopeDefinition's operation string.
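+//
+// A sketch of intended use (names are illustrative; any defs.go scope index works):
+//
+//	tags := emitter.TagsFrom(meta)
+//	tags = meta.WithOperationInt(metrics.HistoryStartWorkflowExecutionScope, tags)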
+func (d DynamicOperationTags) WithOperationInt(operation int, addTo Tags) Tags { + return d.WithOperation(getOperationString(operation), addTo) +} + +func getOperationString(op int) string { + for _, serviceOps := range metrics.ScopeDefs { + if serviceOp, ok := serviceOps[op]; ok { + return serviceOp.GetOperationString() // unit test ensures this is unique + } + } + return "unknown_operation_int" // should never happen +} diff --git a/common/metrics/structured/operation_test.go b/common/metrics/structured/operation_test.go new file mode 100644 index 00000000000..2a4dd3f5c0c --- /dev/null +++ b/common/metrics/structured/operation_test.go @@ -0,0 +1,104 @@ +package structured + +import ( + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestGetTags(t *testing.T) { + // intentionally shared to increase concurrency + e := NewTestEmitter(t, nil) + t.Run("operation", func(t *testing.T) { + t.Parallel() + tags := e.getTags(OperationTags{Operation: "test"}) + assert.Equal(t, Tags{map[string]string{ + "operation": "test", + }}, tags) + }) + t.Run("dynamic operation", func(t *testing.T) { + t.Parallel() + tags := e.getTags(DynamicOperationTags{DynamicOperation: struct{}{}}) + assert.Empty(t, tags) + }) + t.Run("embedded operations", func(t *testing.T) { + t.Parallel() + type x struct { + OperationTags + + Field string `tag:"field"` + } + tags := e.getTags(x{ + OperationTags: OperationTags{Operation: "test"}, + Field: "value", + }) + assert.Equal(t, Tags{map[string]string{ + "field": "value", + "operation": "test", + }}, tags) + }) + t.Run("deeply embedded operations", func(t *testing.T) { + t.Parallel() + type X struct { // must be public so it can be accessed through Y + OperationTags + Field string `tag:"field"` + } + type Y struct { // must be public so it can be accessed through Z + TopField string `tag:"top_field"` + X + } + type z struct { + Reserved struct{} `tag:"reserved"` + Y + Another int `tag:"another"` + } + tags := e.getTags(z{ + Y: Y{ + TopField: "top", + X: X{ + OperationTags: OperationTags{ + Operation: "test", + }, + Field: "value", + }, + }, + Another: 42, + }) + assert.Equal(t, Tags{map[string]string{ + "top_field": "top", + "another": "42", + "operation": "test", + "field": "value", + }}, tags) + }) + t.Run("private with getter", func(t *testing.T) { + t.Parallel() + tags := e.getTags(privategetter{"anything"}) + assert.Equal(t, Tags{map[string]string{ + "private": "anything_via_getter", + }}, tags) + }) + t.Run("invalid", func(t *testing.T) { + t.Parallel() + type q struct { + private string `tag:"private"` + } + assert.PanicsWithValue(t, + "field private on type github.com/uber/cadence/common/metrics/structured.q "+ + "must be public or have a getter func on the parent struct", + func() { + tags := e.getTags(q{"anything"}) + t.Errorf("%#v", tags) // unreachable if working correctly + }, + ) + }) + +} + +type privategetter struct { + private string `tag:"private" getter:"GetIt"` +} + +func (p privategetter) GetIt() string { + return p.private + "_via_getter" +} diff --git a/common/resource/params.go b/common/resource/params.go index 7e48a56d220..aaed4ff26c5 100644 --- a/common/resource/params.go +++ b/common/resource/params.go @@ -42,6 +42,7 @@ import ( "github.com/uber/cadence/common/membership" "github.com/uber/cadence/common/messaging" "github.com/uber/cadence/common/metrics" + "github.com/uber/cadence/common/metrics/structured" persistenceClient "github.com/uber/cadence/common/persistence/client" "github.com/uber/cadence/common/pinot" 
"github.com/uber/cadence/common/rpc" @@ -68,6 +69,7 @@ type ( ClusterMetadata cluster.Metadata ReplicatorConfig config.Replicator MetricsClient metrics.Client + Emitter structured.Emitter MessagingClient messaging.Client BlobstoreClient blobstore.Client ESClient es.GenericClient diff --git a/common/resource/resource_impl.go b/common/resource/resource_impl.go index 48a9be7bd66..bd50919328d 100644 --- a/common/resource/resource_impl.go +++ b/common/resource/resource_impl.go @@ -57,6 +57,7 @@ import ( "github.com/uber/cadence/common/membership" "github.com/uber/cadence/common/messaging" "github.com/uber/cadence/common/metrics" + "github.com/uber/cadence/common/metrics/structured" "github.com/uber/cadence/common/persistence" persistenceClient "github.com/uber/cadence/common/persistence/client" qrpc "github.com/uber/cadence/common/quotas/global/rpc" @@ -98,6 +99,7 @@ type Impl struct { timeSource clock.TimeSource payloadSerializer persistence.PayloadSerializer metricsClient metrics.Client + emitter structured.Emitter messagingClient messaging.Client blobstoreClient blobstore.Client archivalMetadata archiver.ArchivalMetadata @@ -360,6 +362,7 @@ func New( timeSource: clock.NewRealTimeSource(), payloadSerializer: persistence.NewPayloadSerializer(), metricsClient: params.MetricsClient, + emitter: params.Emitter, messagingClient: params.MessagingClient, blobstoreClient: params.BlobstoreClient, archivalMetadata: params.ArchivalMetadata, @@ -534,6 +537,11 @@ func (h *Impl) GetMetricsClient() metrics.Client { return h.metricsClient } +// GetMetricsEmitter returns a base structured metrics emitter +func (h *Impl) GetMetricsEmitter() structured.Emitter { + return h.emitter +} + // GetMessagingClient return messaging client func (h *Impl) GetMessagingClient() messaging.Client { return h.messagingClient diff --git a/common/resource/resource_mock.go b/common/resource/resource_mock.go index 58127259301..1fbf82491cd 100644 --- a/common/resource/resource_mock.go +++ b/common/resource/resource_mock.go @@ -36,6 +36,7 @@ import ( membership "github.com/uber/cadence/common/membership" messaging "github.com/uber/cadence/common/messaging" metrics "github.com/uber/cadence/common/metrics" + structured "github.com/uber/cadence/common/metrics/structured" persistence "github.com/uber/cadence/common/persistence" client0 "github.com/uber/cadence/common/persistence/client" rpc "github.com/uber/cadence/common/quotas/global/rpc" @@ -498,6 +499,20 @@ func (mr *MockResourceMockRecorder) GetMetricsClient() *gomock.Call { return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetMetricsClient", reflect.TypeOf((*MockResource)(nil).GetMetricsClient)) } +// GetMetricsEmitter mocks base method. +func (m *MockResource) GetMetricsEmitter() structured.Emitter { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "GetMetricsEmitter") + ret0, _ := ret[0].(structured.Emitter) + return ret0 +} + +// GetMetricsEmitter indicates an expected call of GetMetricsEmitter. +func (mr *MockResourceMockRecorder) GetMetricsEmitter() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetMetricsEmitter", reflect.TypeOf((*MockResource)(nil).GetMetricsEmitter)) +} + // GetPayloadSerializer mocks base method. 
func (m *MockResource) GetPayloadSerializer() persistence.PayloadSerializer { m.ctrl.T.Helper() diff --git a/common/resource/resource_test_utils.go b/common/resource/resource_test_utils.go index 18249940c32..b40e89640fe 100644 --- a/common/resource/resource_test_utils.go +++ b/common/resource/resource_test_utils.go @@ -52,6 +52,7 @@ import ( "github.com/uber/cadence/common/membership" "github.com/uber/cadence/common/messaging" "github.com/uber/cadence/common/metrics" + "github.com/uber/cadence/common/metrics/structured" "github.com/uber/cadence/common/mocks" "github.com/uber/cadence/common/persistence" persistenceClient "github.com/uber/cadence/common/persistence/client" @@ -74,6 +75,7 @@ type ( TimeSource clock.TimeSource PayloadSerializer persistence.PayloadSerializer MetricsClient metrics.Client + Emitter structured.Emitter ArchivalMetadata *archiver.MockArchivalMetadata ArchiverProvider *provider.MockArchiverProvider BlobstoreClient *blobstore.MockClient @@ -188,6 +190,7 @@ func NewTest( TimeSource: clock.NewRealTimeSource(), PayloadSerializer: persistence.NewPayloadSerializer(), MetricsClient: metrics.NewClient(scope, serviceMetricsIndex), + Emitter: structured.NewTestEmitter(t, scope), ArchivalMetadata: &archiver.MockArchivalMetadata{}, ArchiverProvider: &provider.MockArchiverProvider{}, BlobstoreClient: &blobstore.MockClient{}, @@ -295,6 +298,11 @@ func (s *Test) GetMetricsClient() metrics.Client { return s.MetricsClient } +// GetMetricsEmitter for testing +func (s *Test) GetMetricsEmitter() structured.Emitter { + return s.Emitter +} + // GetMessagingClient for testing func (s *Test) GetMessagingClient() messaging.Client { panic("user should implement this method for test") diff --git a/common/resource/types.go b/common/resource/types.go index fdc3701ece7..11920e32339 100644 --- a/common/resource/types.go +++ b/common/resource/types.go @@ -47,6 +47,7 @@ import ( "github.com/uber/cadence/common/membership" "github.com/uber/cadence/common/messaging" "github.com/uber/cadence/common/metrics" + "github.com/uber/cadence/common/metrics/structured" "github.com/uber/cadence/common/persistence" persistenceClient "github.com/uber/cadence/common/persistence/client" qrpc "github.com/uber/cadence/common/quotas/global/rpc" @@ -79,6 +80,7 @@ type Resource interface { GetTimeSource() clock.TimeSource GetPayloadSerializer() persistence.PayloadSerializer GetMetricsClient() metrics.Client + GetMetricsEmitter() structured.Emitter GetArchiverProvider() provider.ArchiverProvider GetMessagingClient() messaging.Client GetBlobstoreClient() blobstore.Client diff --git a/internal/tools/metricslint/analyzer.go b/internal/tools/metricslint/analyzer.go new file mode 100644 index 00000000000..87ac3e40f2a --- /dev/null +++ b/internal/tools/metricslint/analyzer.go @@ -0,0 +1,288 @@ +package metricslint + +import ( + "fmt" + "go/ast" + "go/token" + "go/types" + "maps" + "slices" + "strings" + + "golang.org/x/tools/go/analysis" + "golang.org/x/tools/go/analysis/passes/inspect" + "golang.org/x/tools/go/ast/inspector" +) + +const ( + emitterPkg = "github.com/uber/cadence/common/metrics/structured" + emitterTypeName = emitterPkg + ".Emitter" +) + +// Analyzer reports all (inline string) metric names passed to Emitter methods or defined in metrics/defs.go. +// +// This is NOT intended to be used directly, it's just using the analysis framework to simplify type checking. +// The output needs to be checked for duplicates or error messages by a separate process (see cmd/main.go). 
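+//
+// A typical invocation mirrors the Makefile target above, where -skip
+// allowlists known pre-existing duplicates a fixed number of times:
+//
+//	metricslint -skip cache_hit,2 ./...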
+var Analyzer = &analysis.Analyzer{ + Name: "metricnames", + Doc: "finds calls to metrics-emitting methods on Emitter, and metricDefinition names, and reports them for further analysis", + Requires: []*analysis.Analyzer{inspect.Analyzer}, + Run: run, +} + +// method names on the emitter that we care about, and the suffixes that are required (if any). +// method names are checked against the actual methods on the emitter type, +// but we need the list ahead of time or passes might be racing with initializing it. +var emitterMethods = map[string]map[string]bool{ + "Histogram": {"_ns": true}, // all our durations are in nanoseconds + "IntHistogram": {"_counts": true}, // differentiates from durations, will likely have multiple suffixes + "Count": nil, + "Gauge": nil, + "TagsFrom": nil, +} + +// TODO: does not yet ensure deep structure is valid, required by this reflection-branch to be safe at runtime +func run(pass *analysis.Pass) (interface{}, error) { + i := pass.ResultOf[inspect.Analyzer].(*inspector.Inspector) + + if pass.Pkg.Path() == "github.com/uber/cadence/common/metrics" { + // report all hard-coded metricDefinition names, so we can make sure + // there are no duplicates between the integers and the const strings + // passed to the emitter. + // + // if you want a duplicate: no, you really don't. + // use a different name, and once verified just swap to the old name and + // delete the integer def. otherwise you risk double-counting events if + // both are emitted, and can't tell if your tags are wrong. + // + // if you're swapping calls in one step, also delete the old definition. + // + // if you really do have an exception, modify this linter to allow it. + reportMetricDefinitionNames(pass, i) + // and DO NOT return, in case there are emitter calls in this package too, + // though currently that would imply an import loop. + } else if pass.Pkg.Path() == emitterPkg { + // read the methods on the emitter and the target methods, and make sure they're in sync. 
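+		// e.g. adding a new exported emitting method to Emitter without also
+		// listing it in emitterMethods fails here, instead of being silently un-linted.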
+		checkTargetNamesMap(pass)
+	}
+
+	// print calls to metrics-emitting methods on Emitter, regardless of location
+	reportMetricEmitterCalls(pass, i)
+
+	return nil, nil
+}
+
+func reportMetricDefinitionNames(pass *analysis.Pass, i *inspector.Inspector) {
+	nodeFilter := []ast.Node{
+		(*ast.CompositeLit)(nil),
+	}
+	i.Preorder(nodeFilter, func(n ast.Node) {
+		decl := n.(*ast.CompositeLit)
+		var metricNameField *ast.KeyValueExpr
+		for _, el := range decl.Elts {
+			kv, ok := el.(*ast.KeyValueExpr)
+			if !ok {
+				return // not a struct initializer
+			}
+			ident, ok := kv.Key.(*ast.Ident)
+			if !ok {
+				return // dynamic key in a dict or something, ignore
+			}
+			if ident.Name == "metricName" {
+				metricNameField = kv // currently this is exclusively used by metricDefinition
+				break // stop once found, the field may appear at any position
+			}
+		}
+		if metricNameField == nil {
+			return
+		}
+		str, ok := getConstantString(metricNameField.Value)
+		if !ok {
+			pass.Reportf(metricNameField.Pos(), "metricDefinition{metricName: ...} value must be an inline string literal")
+			return
+		}
+		pass.Reportf(metricNameField.Pos(), "success: %v", str)
+	})
+}
+
+func checkTargetNamesMap(pass *analysis.Pass) {
+	named, ok := pass.Pkg.Scope().Lookup("Emitter").Type().(*types.Named)
+	if !ok {
+		pass.Reportf(token.NoPos, "could not find Emitter type in %v", emitterPkg)
+		return
+	}
+	ms := types.NewMethodSet(named)
+	foundTargets := make(map[string]map[string]bool, len(emitterMethods))
+	maps.Copy(foundTargets, emitterMethods)
+	for i := 0; i < ms.Len(); i++ {
+		m := ms.At(i)
+		if !m.Obj().Exported() {
+			continue // private methods are irrelevant
+		}
+		_, ok := emitterMethods[m.Obj().Name()]
+		if !ok {
+			foundTargets[m.Obj().Name()] = nil // add any missing keys to the map
+		} else {
+			delete(foundTargets, m.Obj().Name()) // remove any found ones from the map
+		}
+	}
+	if len(foundTargets) != 0 {
+		pass.Reportf(named.Obj().Pos(), "target methods do not match Emitter methods, diff: %v", foundTargets)
+	}
+}
+
+func reportMetricEmitterCalls(pass *analysis.Pass, i *inspector.Inspector) {
+	nodeFilter := []ast.Node{
+		(*ast.File)(nil),     // to skip test files
+		(*ast.FuncDecl)(nil), // to skip test helper funcs
+		(*ast.CallExpr)(nil),
+	}
+	i.Nodes(nodeFilter, func(n ast.Node, push bool) (proceed bool) {
+		// always descend by default, in case the call contains a closure that emits metrics.
+		// this covers the sync.Once case in the testdata, for example.
+		proceed = true
+		if !push {
+			return // do nothing when ascending the ast tree
+		}
+
+		// check for test files, ignore their content if found
+		file, ok := n.(*ast.File)
+		if ok {
+			filename := pass.Fset.Position(file.Pos()).Filename
+			if strings.HasSuffix(filename, "_test.go") {
+				return false // don't inspect test files
+			}
+			return
+		}
+
+		// check for test helper funcs anywhere, ignore their content if found.
+		// these are identified by a *testing.T as their first arg.
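+		// e.g. `func emitTestMetrics(t *testing.T, e structured.Emitter)` is skipped entirely.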
+ if fn, ok := n.(*ast.FuncDecl); ok { + if len(fn.Type.Params.List) > 0 { + firstArgType := pass.TypesInfo.TypeOf(fn.Type.Params.List[0].Type) + asStr := types.TypeString(firstArgType, nil) // "full/path/to.Type" + if asStr == "*testing.T" { + return false // don't inspect test helpers + } + } + return + } + + call := n.(*ast.CallExpr) // only other possibility due to nodeFilter + + // check if this is a method call (receiver.Method() == X.Sel) + // for one of the names we care about (as an early / efficient check) + sel, ok := call.Fun.(*ast.SelectorExpr) + if !ok { + return + } + methodName := sel.Sel.Name + requiredSuffixes, ok := emitterMethods[methodName] + if !ok { + return + } + + // check if the receiver type is one we care about + receiver := pass.TypesInfo.TypeOf(sel.X) + // pointer receivers are not allowed, complain if found. + // if we just ignore them here, we might miss a metric call with a duplicate name. + isPointerReceiver := false + ptr, ok := receiver.(*types.Pointer) + for ; ok; ptr, ok = receiver.(*types.Pointer) { + isPointerReceiver = true + receiver = ptr.Elem() + } + named, ok := receiver.(*types.Named) + if !ok { + // allowed by the type system, but should be impossible in a CallExpr + pass.Reportf(sel.Pos(), "anonymous receiver of a method call should be impossible?") + return + } + fullName := getPkgAndName(named) + if fullName != emitterTypeName { + return // apparently not the emitter, it just has similarly-named methods (e.g. Tally). ignore it. + } + + // at this point we know that it's "a metrics method", make sure it's valid. + + if isPointerReceiver { + // could be allowed with a bit more .Elem() dereferencing in this analyzer, + // but currently blocked to intentionally force value types. + pass.Reportf(sel.Pos(), "pointer receivers are not allowed on metrics emission calls: %v", types.ExprString(sel)) + return + } + if len(call.Args) < 2 { + // 0 or 1 arg == not currently in use + pass.Reportf(call.Pos(), "method call %v looks like a metrics method, but has too few args", call) + return + } + + // pull out the first arg, and make sure it's a constant inline string. + nameArg := call.Args[0] + metricName, isConstant := getConstantString(nameArg) + if !isConstant { + pass.Reportf(nameArg.Pos(), "metric names must be in-line strings, not consts or vars: %v", nameArg) + return + } + if len(requiredSuffixes) > 0 { + matched := false + for suffix := range requiredSuffixes { + if strings.HasSuffix(metricName, suffix) { + matched = true + break + } + } + if !matched { + suffixes := make([]string, 0, len(requiredSuffixes)) + for s := range requiredSuffixes { + suffixes = append(suffixes, fmt.Sprintf("%q", s)) + } + slices.Sort(suffixes) + pass.Reportf( + nameArg.Pos(), + "metric name %q is not valid for method %v, it must have one of the following suffixes: %v", + metricName, methodName, strings.Join(suffixes, ", "), + ) + return + } + } + + // valid call! + // + // "report" the data we've found so it can be checked with a separate process (see cmd/main.go). + // that will error if any non-"success" lines were found, else check that the list is unique. + // + // tests may lead to the same package being processed more than once, but + // the reported lines for "path/to/file:line success: {name}" are still unique + // and that won't count as a duplicate. 
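+		// the printed diagnostic looks roughly like:
+		//   path/to/file.go:42:2: success: my_metric_ns
+		// which cmd/main.go parses as exactly three whitespace-separated fields.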
+		pass.Reportf(call.Pos(), "success: %s", metricName)
+		return
+	})
+}
+
+func getPkgAndName(named *types.Named) (fullName string) {
+	obj := named.Obj()
+	if obj == nil {
+		// afaik not possible, this would be "a named type without a type"
+		return ""
+	}
+	pkg := obj.Pkg()
+	if pkg == nil {
+		// given where this is used, I believe this would currently imply
+		// "a builtin type with a method", which does not exist.
+		// but it's fine to return partial values, in case it's used in more places.
+		return obj.Name()
+	}
+	return pkg.Path() + "." + obj.Name()
+}
+
+func getConstantString(expr ast.Expr) (str string, ok bool) {
+	lit, ok := expr.(*ast.BasicLit)
+	if !ok {
+		return "", false
+	}
+	if lit.Kind != token.STRING {
+		return "", false
+	}
+	// note: escape sequences are not un-escaped here. metric names are expected
+	// to be simple literals, so stripping the surrounding quotes or backticks is enough.
+	return strings.Trim(lit.Value, "`\""), true
+}
diff --git a/internal/tools/metricslint/analyzer_test.go b/internal/tools/metricslint/analyzer_test.go
new file mode 100644
index 00000000000..55e262e7186
--- /dev/null
+++ b/internal/tools/metricslint/analyzer_test.go
@@ -0,0 +1,12 @@
+package metricslint
+
+import (
+	"testing"
+
+	"golang.org/x/tools/go/analysis/analysistest"
+)
+
+func TestAnalyzer(t *testing.T) {
+	testdata := analysistest.TestData()
+	analysistest.Run(t, testdata, Analyzer, "github.com/uber/cadence/common/metrics/structured")
+}
diff --git a/internal/tools/metricslint/cmd/main.go b/internal/tools/metricslint/cmd/main.go
new file mode 100644
index 00000000000..6fff6b1306e
--- /dev/null
+++ b/internal/tools/metricslint/cmd/main.go
@@ -0,0 +1,111 @@
+package main
+
+import (
+	"flag"
+	"fmt"
+	"os"
+	"os/exec"
+	"slices"
+	"strconv"
+	"strings"
+
+	"golang.org/x/tools/go/analysis/singlechecker"
+
+	"github.com/uber/cadence/internal/tools/metricslint"
+)
+
+func main() {
+	var analyze bool
+	flag.BoolVar(&analyze, "analyze", false, "if true, run the analyzer normally (e.g. for debugging). if false, run the analyzer internally and interpret the results as a linter")
+	skip := map[string]int{}
+	flag.Func("skip", "metric name to skip and the number of times it is expected to appear, as `name,count`. repeat to skip more names.", func(s string) error {
+		parts := strings.SplitN(s, ",", 2)
+		if len(parts) != 2 {
+			return fmt.Errorf("skip argument %q is not in the form `name,count`", s)
+		}
+		count, err := strconv.Atoi(parts[1])
+		if err != nil {
+			return fmt.Errorf("skip argument %q has invalid count %q: %v", s, parts[1], err)
+		}
+		if count < 2 {
+			return fmt.Errorf("skip argument %q has count %d, but only duplicates (2+) can be skipped", s, count)
+		}
+		skip[parts[0]] = count
+		return nil
+	})
+	flag.Parse()
+
+	if analyze {
+		singlechecker.Main(metricslint.Analyzer)
+	} else {
+		os.Exit(checkAnalyzerOutput(skip))
+	}
+}
+
+func checkAnalyzerOutput(skip map[string]int) (exitCode int) {
+	cmd := exec.Command(os.Args[0], append([]string{"-analyze"}, os.Args[1:]...)...)
+	out, err := cmd.CombinedOutput()
+	// the analyzer intentionally exits non-zero when it reports anything,
+	// so an error by itself is expected - but no output at all means the
+	// subprocess could not run, which must not look like a clean pass.
+	if err != nil && len(out) == 0 {
+		_, _ = fmt.Fprintf(os.Stderr, "failed to run analyzer: %v\n", err)
+		return 1
+	}
+
+	defer func() {
+		// in case of crashes, print the output we got so far as it likely narrows down the cause.
+		if r := recover(); r != nil {
+			_, _ = fmt.Fprint(os.Stderr, "\n\n"+string(out))
+			panic(r)
+		}
+	}()
+
+	lines := strings.Split(string(out), "\n")
+	// map of metric name to set of lines using it (to deduplicate)
+	names := make(map[string]map[string]struct{}, len(lines))
+	var failures []string
+	for _, line := range lines {
+		if line == "" {
+			continue // empty lines are fine
+		}
+		words := strings.Fields(line)
+		if len(words) == 3 && words[1] == "success:" {
+			prev, ok := names[words[2]]
+			if !ok {
+				prev = make(map[string]struct{})
+			}
+			prev[line] = struct{}{}
+			names[words[2]] = prev
+		} else {
+			failures = append(failures, line)
+		}
+	}
+
+	if len(failures) > 0 {
+		_, _ = fmt.Fprintln(os.Stderr, strings.Join(failures, "\n")+"\n")
+		return 1
+	}
+
+	exitCode = 0
+	for name, usages := range names {
+		var complain bool
+		expected, ok := skip[name]
+		if ok {
+			complain = len(usages) != expected // expectation mismatch
+		} else {
+			complain = len(usages) > 1 // must not have duplicates
+		}
+		if complain {
+			if expected > 0 {
+				_, _ = fmt.Fprintf(os.Stderr, "Metric name %q used %d times, but the skip list expects exactly %d:\n", name, len(usages), expected)
+			} else {
+				_, _ = fmt.Fprintf(os.Stderr, "Metric name %q used in multiple places:\n", name)
+			}
+			// sort the usages for deterministic output.
+			// (on Go 1.23+ `slices.Sorted(maps.Keys(usages))` would do this in one
+			// line, at the cost of another import.)
+			sorted := make([]string, 0, len(usages))
+			for usage := range usages {
+				sorted = append(sorted, usage)
+			}
+			slices.Sort(sorted)
+			for _, usage := range sorted {
+				_, _ = fmt.Fprintf(os.Stderr, "\t%s\n", usage)
+			}
+			exitCode = 1
+		}
+	}
+
+	return exitCode
+}
diff --git a/internal/tools/metricslint/testdata/go.mod b/internal/tools/metricslint/testdata/go.mod
new file mode 100644
index 00000000000..42d6cbc8c14
--- /dev/null
+++ b/internal/tools/metricslint/testdata/go.mod
@@ -0,0 +1,4 @@
+module github.com/uber/cadence/common/metrics/structured
+// ^ must be the same package as the emitter's definition for the hardcoded path to work right
+
+go 1.24
\ No newline at end of file
diff --git a/internal/tools/metricslint/testdata/metrics.go b/internal/tools/metricslint/testdata/metrics.go
new file mode 100644
index 00000000000..b6078034f00
--- /dev/null
+++ b/internal/tools/metricslint/testdata/metrics.go
@@ -0,0 +1,67 @@
+package structured
+
+import (
+	"sync"
+	"testing"
+	"time"
+)
+
+const disallowedConst = "dynamic-name"
+
+type Emitter struct{}
+type Tags map[string]string
+
+func (e Emitter) Count(name string, value int, meta Tags) {}
+func (e Emitter) Histogram(name string, buckets []time.Duration, dur time.Duration, meta Tags) {}
+func (e Emitter) Gauge(name string, value float64, meta Tags) {}
+func (e Emitter) IntHistogram(name string, buckets []time.Duration, num int, meta Tags) {}
+
+type UnrelatedThing struct{ metricName string }
+
+var _ = UnrelatedThing{metricName: disallowedConst}
+
+func (n UnrelatedThing) Gauge(name string) {}
+func (n UnrelatedThing) Count(name string, value int, meta Tags) {}
+
+// test helper funcs can violate the rules, ignore them.
+// they must have *testing.T as the first arg +func TestHelper(t *testing.T, name string) { + var emitter Emitter + emitter.Count(name, 5, nil) +} + +func someMetricsCalls() { + var emitter Emitter + tags := Tags{"key": "value"} + + // valid calls on the emitter + emitter.Count("test-count", 5, tags) // want `success: test-count` + emitter.Histogram("test-histogram_ns", nil, time.Second, tags) // want `success: test-histogram_ns` + + // duplicates are not blocked at this stage, it's run separately because of how the analysis framework works + emitter.Count("duplicate-metric", 1, tags) // want `success: duplicate-metric` + emitter.Count("duplicate-metric", 2, tags) // want `success: duplicate-metric` + + // wrong suffix + emitter.Histogram("test-histogram", nil, time.Second, tags) // want `metric name "test-histogram" is not valid for method Histogram, it must have one of the following suffixes: "_ns"` + + // string vars are not good + dynamicName := "dynamic-name" + emitter.Count(dynamicName, 1, tags) // want `metric names must be in-line strings, not consts or vars: dynamicName` + + // named consts are also not good + // (this is fairly easy to allow though, if strongly desired) + emitter.Count(disallowedConst, 1, tags) // want `metric names must be in-line strings, not consts or vars: disallowedConst` + + var unrelated UnrelatedThing + name := "asdf" + unrelated.Gauge(name) // ignored, not on Emitter + unrelated.Count(name, 42, tags) // ignored, not on Emitter + + // contents of closures must be checked too (ensures we recurse into calls) + var once sync.Once + once.Do(func() { + emitter.Gauge(disallowedConst, 9001, tags) // want `metric names must be in-line strings, not consts or vars: disallowedConst` + emitter.IntHistogram("hist_counts", nil, 42, tags) // want `success: hist_counts` + }) +} diff --git a/internal/tools/metricslint/testdata/metrics_gen.go b/internal/tools/metricslint/testdata/metrics_gen.go new file mode 100644 index 00000000000..02669e19d57 --- /dev/null +++ b/internal/tools/metricslint/testdata/metrics_gen.go @@ -0,0 +1,20 @@ +package structured + +// Code generated by test_shenanigans; DO NOT EDIT + +import "time" + +func GeneratedFunction() { + var emitter Emitter + tags := Tags{} + + // allowed, generated code needs to follow the same rules + emitter.Count("base_count", 1, tags) // want `success: base_count` + emitter.Gauge("gauge", 12, tags) // want `success: gauge` + emitter.Histogram("hist_ns", nil, time.Second, tags) // want `success: hist_ns` + + // disallowed, generated code needs to follow the same rules + const sneakyConst = "bad_metric" + emitter.IntHistogram(sneakyConst, nil, 3, tags) // want `metric names must be in-line strings, not consts or vars: sneakyConst` + emitter.Count(sneakyConst, 1, tags) // want `metric names must be in-line strings, not consts or vars: sneakyConst` +} diff --git a/internal/tools/metricslint/testdata/metrics_test.go b/internal/tools/metricslint/testdata/metrics_test.go new file mode 100644 index 00000000000..0067efd8e67 --- /dev/null +++ b/internal/tools/metricslint/testdata/metrics_test.go @@ -0,0 +1,20 @@ +package structured + +import ( + "time" +) + +func ignoredInTestFiles() { + var emitter Emitter + tags := Tags{} + + // test code is allowed to break the rules because it does not run in production + + emitter.Count("base_count", 1, tags) + emitter.Gauge("gauge", 12, tags) + emitter.Histogram("hist_ns", nil, time.Second, tags) + + const sneakyConst = "bad_metric" + emitter.IntHistogram(sneakyConst, nil, 3, tags) + 
emitter.Count(sneakyConst, 1, tags) +} diff --git a/revive.toml b/revive.toml index 2238d4dd4c9..d1b939df7af 100644 --- a/revive.toml +++ b/revive.toml @@ -20,7 +20,7 @@ warningCode = 0 [rule.errorf] # [rule.exported] # disabled due to lack of value / encouraging bad habits [rule.if-return] -[rule.increment-decrement] +# [rule.increment-decrement] # noisy on generated code, otherwise seems fine - if there's a way to fix that, probably re-enable [rule.indent-error-flow] # Disabled because we have 158 packages that need package comments; we could instead add ignore # directives for existing packages and require it for new packages. @@ -62,7 +62,7 @@ arguments=[["loop","method-call","recover","return", "immediate-recover"]] [rule.range-val-address] # beneficial [rule.range-val-in-closure] # beneficial [rule.unconditional-recursion] # probably a good idea -[rule.unreachable-code] # code simplifier +# [rule.unreachable-code] # reasonable code simplifier, but does not match compiler behavior (e.g. log.Fatal still needs panic or return) [rule.waitgroup-by-value] # correct use of sync code, important #### unused utilities diff --git a/service/history/engine/engineimpl/history_engine.go b/service/history/engine/engineimpl/history_engine.go index 576b2998707..573ac483846 100644 --- a/service/history/engine/engineimpl/history_engine.go +++ b/service/history/engine/engineimpl/history_engine.go @@ -279,6 +279,7 @@ func NewEngineWithShardContext( historyEngImpl, config, shard.GetMetricsClient(), + shard.GetMetricsEmitter(), replicationTaskFetcher, replicationTaskExecutor, shard.GetTimeSource(), diff --git a/service/history/replication/task_processor.go b/service/history/replication/task_processor.go index 2e0c9416950..abc33861f94 100644 --- a/service/history/replication/task_processor.go +++ b/service/history/replication/task_processor.go @@ -41,6 +41,7 @@ import ( "github.com/uber/cadence/common/log" "github.com/uber/cadence/common/log/tag" "github.com/uber/cadence/common/metrics" + "github.com/uber/cadence/common/metrics/structured" "github.com/uber/cadence/common/persistence" "github.com/uber/cadence/common/quotas" "github.com/uber/cadence/common/reconciliation" @@ -73,18 +74,19 @@ type ( // taskProcessorImpl is responsible for processing replication tasks for a shard. taskProcessorImpl struct { - currentCluster string - sourceCluster string - status int32 - shard shard.Context - historyEngine engine.Engine - historySerializer persistence.PayloadSerializer - config *config.Config - metricsClient metrics.Client - logger log.Logger - taskExecutor TaskExecutor - hostRateLimiter quotas.Limiter - shardRateLimiter quotas.Limiter + currentCluster string + sourceCluster string + status int32 + shard shard.Context + historyEngine engine.Engine + historySerializer persistence.PayloadSerializer + config *config.Config + metricsClient metrics.Client + perDomainTaskMetrics perDomainTaskMetricTags + logger log.Logger + taskExecutor TaskExecutor + hostRateLimiter quotas.Limiter + shardRateLimiter quotas.Limiter taskRetryPolicy backoff.RetryPolicy dlqRetryPolicy backoff.RetryPolicy @@ -103,8 +105,49 @@ type ( token *types.ReplicationToken respChan chan<- *types.ReplicationMessages } + + // all metrics tags are dynamic per task and cannot be filled in up-front. 
+ // + // skip:Convenience unable to use ad-hoc due to dynamic values + perDomainTaskMetricTags struct { + structured.Emitter + structured.DynamicOperationTags + + TargetCluster string `tag:"target_cluster"` + Domain struct{} `tag:"domain"` + + ts clock.TimeSource + } ) +func (p perDomainTaskMetricTags) taskProcessed(operation int, domain string, processingStart time.Time, task *types.ReplicationTask, legacyScope metrics.Scope) { + tags := p.TagsFrom(p).With("domain", domain) + tags = p.WithOperationInt(operation, tags) + + now := p.ts.Now() + processingLatency := now.Sub(processingStart) + replicationLatency := now.Sub(time.Unix(0, task.GetCreationTime())) + + // single-task processing latency + // caution: prometheus will not allow timers and histograms to use the same name, + // so the "_ns" suffix here is necessary. + p.Histogram("task_processing_latency_ns", structured.Low1ms10s, processingLatency, tags) + // latency from task generated to task received + p.Histogram("task_replication_latency_ns", structured.Mid1ms24h, replicationLatency, tags) + // number of replication tasks + // this is an exact match for the legacy scope, so it would cause duplicates if emitted. + // because this is the first use of this new system, we'll verify the tags with the histograms first. + // p.Count("replication_tasks_applied_per_domain", 1, tags) + + // emit single task processing latency + legacyScope.RecordTimer(metrics.TaskProcessingLatency, processingLatency) + // emit latency from task generated to task received + legacyScope.RecordTimer(metrics.ReplicationTaskLatency, replicationLatency) + // emit the number of replication tasks. + // when removing, be sure to un-comment the p.Count above + legacyScope.IncCounter(metrics.ReplicationTasksAppliedPerDomain) +} + var _ TaskProcessor = (*taskProcessorImpl)(nil) // NewTaskProcessor creates a new replication task processor. 
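Editor's note: the `perDomainTaskMetricTags` struct above relies on the structured package's tag-struct convention, where `TagsFrom` presumably reflects over `tag:"..."` annotations, treating string fields as statically-known values and `struct{}` fields as keys that are filled in per call (via `With("domain", ...)` here). As a rough illustration of that convention only, not the actual `structured` implementation, a minimal reflection-based version could look like:

package main

import (
	"fmt"
	"reflect"
)

type Tags map[string]string

// tagsFrom sketches how `tag:"..."` annotations might become metric tags:
// string fields contribute static values, struct{} fields only reserve a
// key for callers to fill in dynamically later.
func tagsFrom(v any) Tags {
	out := Tags{}
	rv := reflect.ValueOf(v)
	rt := rv.Type()
	for i := 0; i < rt.NumField(); i++ {
		f := rt.Field(i)
		key, ok := f.Tag.Lookup("tag")
		if !ok {
			continue // untagged fields (embedded emitter, clocks, etc.) are skipped
		}
		if f.Type.Kind() == reflect.String {
			out[key] = rv.Field(i).String()
		}
	}
	return out
}

func main() {
	s := struct {
		TargetCluster string   `tag:"target_cluster"`
		Domain        struct{} `tag:"domain"`
	}{TargetCluster: "cluster-b"}
	fmt.Println(tagsFrom(s)) // map[target_cluster:cluster-b]
}

This keeps the per-task emission sites free of hand-built tag maps, at the cost of a reflection pass that the real package may well cache or avoid entirely.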
@@ -113,6 +156,7 @@ func NewTaskProcessor( historyEngine engine.Engine, config *config.Config, metricsClient metrics.Client, + emitter structured.Emitter, taskFetcher TaskFetcher, taskExecutor TaskExecutor, clock clock.TimeSource, @@ -134,14 +178,19 @@ func NewTaskProcessor( noTaskBackoffPolicy.SetExpirationInterval(backoff.NoInterval) noTaskRetrier := backoff.NewRetrier(noTaskBackoffPolicy, clock) return &taskProcessorImpl{ - currentCluster: shard.GetClusterMetadata().GetCurrentClusterName(), - sourceCluster: sourceCluster, - status: common.DaemonStatusInitialized, - shard: shard, - historyEngine: historyEngine, - historySerializer: persistence.NewPayloadSerializer(), - config: config, - metricsClient: metricsClient, + currentCluster: shard.GetClusterMetadata().GetCurrentClusterName(), + sourceCluster: sourceCluster, + status: common.DaemonStatusInitialized, + shard: shard, + historyEngine: historyEngine, + historySerializer: persistence.NewPayloadSerializer(), + config: config, + metricsClient: metricsClient, + perDomainTaskMetrics: perDomainTaskMetricTags{ + Emitter: emitter, + TargetCluster: sourceCluster, // emitted as "target_cluster" and logged inconsistently, unsure why + ts: clock, + }, logger: shard.GetLogger().WithTags(tag.SourceCluster(sourceCluster), tag.ShardID(shardID)), taskExecutor: taskExecutor, hostRateLimiter: taskFetcher.GetRateLimiter(), @@ -467,25 +516,18 @@ func (p *taskProcessorImpl) processTaskOnce(replicationTask *types.ReplicationTa if err != nil { p.updateFailureMetric(scope, err, p.shard.GetShardID()) } else { - now := ts.Now() mScope := p.metricsClient.Scope(scope, metrics.TargetClusterTag(p.sourceCluster)) domainID := replicationTask.HistoryTaskV2Attributes.GetDomainID() + var domainName string if domainID != "" { - domainName, errorDomainName := p.shard.GetDomainCache().GetDomainName(domainID) + name, errorDomainName := p.shard.GetDomainCache().GetDomainName(domainID) if errorDomainName != nil { return errorDomainName } + domainName = name mScope = mScope.Tagged(metrics.DomainTag(domainName)) } - // emit single task processing latency - mScope.RecordTimer(metrics.TaskProcessingLatency, now.Sub(startTime)) - // emit latency from task generated to task received - mScope.RecordTimer( - metrics.ReplicationTaskLatency, - now.Sub(time.Unix(0, replicationTask.GetCreationTime())), - ) - // emit the number of replication tasks - mScope.IncCounter(metrics.ReplicationTasksAppliedPerDomain) + p.perDomainTaskMetrics.taskProcessed(scope, domainName, startTime, replicationTask, mScope) shardScope := p.metricsClient.Scope(scope, metrics.TargetClusterTag(p.sourceCluster), metrics.InstanceTag(strconv.Itoa(p.shard.GetShardID()))) shardScope.IncCounter(metrics.ReplicationTasksApplied) diff --git a/service/history/replication/task_processor_test.go b/service/history/replication/task_processor_test.go index d7b54163ff6..0c320a14aa3 100644 --- a/service/history/replication/task_processor_test.go +++ b/service/history/replication/task_processor_test.go @@ -49,6 +49,7 @@ import ( "github.com/uber/cadence/common/constants" "github.com/uber/cadence/common/dynamicconfig/dynamicproperties" "github.com/uber/cadence/common/metrics" + "github.com/uber/cadence/common/metrics/structured" "github.com/uber/cadence/common/mocks" "github.com/uber/cadence/common/persistence" "github.com/uber/cadence/common/quotas" @@ -154,6 +155,7 @@ func (s *taskProcessorSuite) SetupTest() { s.mockEngine, s.config, metricsClient, + structured.NewTestEmitter(s.T(), nil), s.taskFetcher, s.taskExecutor, s.clock, 
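Editor's note: the `-skip name,count` flag handled in cmd/main.go above boils down to a small predicate inside `checkAnalyzerOutput`. Restated in isolation (the function name and example values here are illustrative, the real logic lives in cmd/main.go):

package main

import "fmt"

// shouldComplain restates the duplicate policy: a skipped name must be used
// exactly `count` times (so stale skip entries get flagged too), and every
// other metric name must be unique across the codebase.
func shouldComplain(name string, usages int, skip map[string]int) bool {
	if expected, ok := skip[name]; ok {
		return usages != expected
	}
	return usages > 1
}

func main() {
	skip := map[string]int{"cache_hit": 2} // e.g. from `-skip cache_hit,2`
	fmt.Println(shouldComplain("cache_hit", 2, skip))  // false: matches expectation
	fmt.Println(shouldComplain("cache_hit", 3, skip))  // true: skip count is stale
	fmt.Println(shouldComplain("new_metric", 1, skip)) // false: unique
	fmt.Println(shouldComplain("new_metric", 2, skip)) // true: duplicate
}

Requiring an exact count for skipped names means the skip list self-corrects: when a duplicate is removed, the lint fails until the Makefile entry is updated or deleted.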
@@ -646,6 +648,7 @@ func TestProcessorLoop_TaskExecuteFailed_ShardChangeErr(t *testing.T) { mockEngine, config, metricsClient, + structured.NewTestEmitter(t, nil), taskFetcher, taskExecutor, clock.NewMockedTimeSource(), diff --git a/service/history/resource/resource_mock.go b/service/history/resource/resource_mock.go index 184fa60e971..a97aefb5ade 100644 --- a/service/history/resource/resource_mock.go +++ b/service/history/resource/resource_mock.go @@ -36,6 +36,7 @@ import ( membership "github.com/uber/cadence/common/membership" messaging "github.com/uber/cadence/common/messaging" metrics "github.com/uber/cadence/common/metrics" + structured "github.com/uber/cadence/common/metrics/structured" persistence "github.com/uber/cadence/common/persistence" client0 "github.com/uber/cadence/common/persistence/client" algorithm "github.com/uber/cadence/common/quotas/global/algorithm" @@ -489,6 +490,20 @@ func (mr *MockResourceMockRecorder) GetMetricsClient() *gomock.Call { return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetMetricsClient", reflect.TypeOf((*MockResource)(nil).GetMetricsClient)) } +// GetMetricsEmitter mocks base method. +func (m *MockResource) GetMetricsEmitter() structured.Emitter { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "GetMetricsEmitter") + ret0, _ := ret[0].(structured.Emitter) + return ret0 +} + +// GetMetricsEmitter indicates an expected call of GetMetricsEmitter. +func (mr *MockResourceMockRecorder) GetMetricsEmitter() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetMetricsEmitter", reflect.TypeOf((*MockResource)(nil).GetMetricsEmitter)) +} + // GetPayloadSerializer mocks base method. func (m *MockResource) GetPayloadSerializer() persistence.PayloadSerializer { m.ctrl.T.Helper() diff --git a/service/history/shard/context.go b/service/history/shard/context.go index 9d0d2355d59..cecd64a40a0 100644 --- a/service/history/shard/context.go +++ b/service/history/shard/context.go @@ -41,6 +41,7 @@ import ( "github.com/uber/cadence/common/log" "github.com/uber/cadence/common/log/tag" "github.com/uber/cadence/common/metrics" + "github.com/uber/cadence/common/metrics/structured" "github.com/uber/cadence/common/persistence" "github.com/uber/cadence/common/types" "github.com/uber/cadence/service/history/config" @@ -65,6 +66,7 @@ type ( GetLogger() log.Logger GetThrottledLogger() log.Logger GetMetricsClient() metrics.Client + GetMetricsEmitter() structured.Emitter GetTimeSource() clock.TimeSource PreviousShardOwnerWasDifferent() bool diff --git a/service/history/shard/context_mock.go b/service/history/shard/context_mock.go index dc774b2a714..027da55949b 100644 --- a/service/history/shard/context_mock.go +++ b/service/history/shard/context_mock.go @@ -22,6 +22,7 @@ import ( cluster "github.com/uber/cadence/common/cluster" log "github.com/uber/cadence/common/log" metrics "github.com/uber/cadence/common/metrics" + structured "github.com/uber/cadence/common/metrics/structured" persistence "github.com/uber/cadence/common/persistence" types "github.com/uber/cadence/common/types" config "github.com/uber/cadence/service/history/config" @@ -353,6 +354,20 @@ func (mr *MockContextMockRecorder) GetMetricsClient() *gomock.Call { return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetMetricsClient", reflect.TypeOf((*MockContext)(nil).GetMetricsClient)) } +// GetMetricsEmitter mocks base method. 
+func (m *MockContext) GetMetricsEmitter() structured.Emitter { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "GetMetricsEmitter") + ret0, _ := ret[0].(structured.Emitter) + return ret0 +} + +// GetMetricsEmitter indicates an expected call of GetMetricsEmitter. +func (mr *MockContextMockRecorder) GetMetricsEmitter() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetMetricsEmitter", reflect.TypeOf((*MockContext)(nil).GetMetricsEmitter)) +} + // GetQueueAckLevel mocks base method. func (m *MockContext) GetQueueAckLevel(category persistence.HistoryTaskCategory) persistence.HistoryTaskKey { m.ctrl.T.Helper()
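Editor's note: a hypothetical test sketch showing how the new mock method and the test emitter plug together, assuming the golang/mock gomock package used by these generated mocks; the test name and scaffolding are illustrative, not part of this change:

package shard_test

import (
	"testing"

	"github.com/golang/mock/gomock"

	"github.com/uber/cadence/common/metrics/structured"
	"github.com/uber/cadence/service/history/shard"
)

func TestGetMetricsEmitterWiring(t *testing.T) {
	ctrl := gomock.NewController(t)
	shardCtx := shard.NewMockContext(ctrl)

	// hand the code under test a test emitter instead of a real one,
	// mirroring how task_processor_test.go constructs its processor
	shardCtx.EXPECT().
		GetMetricsEmitter().
		Return(structured.NewTestEmitter(t, nil)).
		AnyTimes()

	emitter := shardCtx.GetMetricsEmitter()
	_ = emitter // the code under test would emit metrics through this
}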