diff --git a/Makefile b/Makefile index 51bd35f16a6..3d41543de5e 100644 --- a/Makefile +++ b/Makefile @@ -64,7 +64,7 @@ $(BUILD)/goversion-lint: $(BUILD)/fmt: $(BUILD)/codegen # formatting must occur only after all other go-file-modifications are done # $(BUILD)/copyright # $(BUILD)/copyright: $(BUILD)/codegen # must add copyright to generated code, sometimes needs re-formatting -$(BUILD)/codegen: $(BUILD)/thrift $(BUILD)/protoc +$(BUILD)/codegen: $(BUILD)/thrift $(BUILD)/protoc $(BUILD)/metrics $(BUILD)/thrift: $(BUILD)/go_mod_check $(BUILD)/protoc: $(BUILD)/go_mod_check $(BUILD)/go_mod_check: @@ -211,6 +211,12 @@ $(BIN)/protoc-gen-gogofast: go.mod go.work | $(BIN) $(BIN)/protoc-gen-yarpc-go: go.mod go.work | $(BIN) $(call go_mod_build_tool,go.uber.org/yarpc/encoding/protobuf/protoc-gen-yarpc-go) +$(BIN)/metricsgen: internal/tools/go.mod go.work $(wildcard internal/tools/metricsgen/*) | $(BIN) + $(call go_build_tool,./metricsgen) + +$(BIN)/metricslint: internal/tools/go.mod go.work $(wildcard internal/tools/metricslint/* internal/tools/metricslint/cmd/*) | $(BIN) + $(call go_build_tool,./metricslint/cmd,metricslint) + $(BUILD)/go_mod_check: go.mod internal/tools/go.mod go.work $Q # generated == used is occasionally important for gomock / mock libs in general. this is not a definite problem if violated though. $Q ./scripts/check-gomod-version.sh github.com/golang/mock/gomock $(if $(verbose),-v) @@ -352,6 +358,10 @@ $(BUILD)/protoc: $(PROTO_FILES) $(STABLE_BIN)/$(PROTOC_VERSION_BIN) $(BIN)/proto fi $Q touch $@ +$(BUILD)/metrics: $(ALL_SRC) $(BIN)/metricsgen + $Q $(BIN_PATH) go generate -run=metricsgen ./... + $Q touch $@ + # ==================================== # Rule-breaking targets intended ONLY for special cases with no good alternatives. # ==================================== @@ -404,6 +414,11 @@ $(BUILD)/code-lint: $(LINT_SRC) $(BIN)/revive | $(BUILD) fi $Q touch $@ +$(BUILD)/metrics-lint: $(ALL_SRC) $(BIN)/metricslint | $(BUILD) + $Q echo "linting metrics definitions..." + $Q $(BIN_PATH) $(BIN)/metricslint -skip cadence_requests_per_tl,2 -skip cache_hit,2 -skip cache_full,2 -skip cache_miss,2 -skip cross_cluster_fetch_errors,2 ./... + $Q touch $@ + $(BUILD)/goversion-lint: go.work Dockerfile docker/github_actions/Dockerfile${DOCKERFILE_SUFFIX} $Q echo "checking go version..." $Q # intentionally using go.work toolchain, as GOTOOLCHAIN is user-overridable @@ -458,7 +473,7 @@ endef # useful to actually re-run to get output again. # reuse the intermediates for simplicity and consistency. lint: ## (Re)run the linter - $(call remake,proto-lint gomod-lint code-lint goversion-lint) + $(call remake,proto-lint gomod-lint code-lint goversion-lint metrics-lint) # intentionally not re-making, it's a bit slow and it's clear when it's unnecessary fmt: $(BUILD)/fmt ## Run `gofmt` / organize imports / etc @@ -544,14 +559,20 @@ bins: $(BINS) ## Build all binaries, and any fast codegen needed (does not refre tools: $(TOOLS) -go-generate: $(BIN)/mockgen $(BIN)/enumer $(BIN)/mockery $(BIN)/gowrap ## Run `go generate` to regen mocks, enums, etc +go-generate: $(BIN)/mockgen $(BIN)/enumer $(BIN)/mockery $(BIN)/gowrap $(BIN)/metricsgen ## Run `go generate` to regen mocks, enums, etc $Q echo "running go generate ./..., this takes a minute or more..." $Q # add our bins to PATH so `go generate` can find them $Q $(BIN_PATH) go generate $(if $(verbose),-v) ./... 
+ $Q touch $(BUILD)/metrics # whole-service go-generate also regenerates metrics $Q $(MAKE) --no-print-directory fmt # $Q echo "updating copyright headers" # $Q $(MAKE) --no-print-directory copyright +metrics: $(BIN)/metricsgen ## metrics-only code regen, much faster than go-generate + $Q echo "re-generating metrics structs..." + $Q $(MAKE) $(BUILD)/metrics + $Q $(MAKE) fmt # clean up imports + release: ## Re-generate generated code and run tests $(MAKE) --no-print-directory go-generate $(MAKE) --no-print-directory test @@ -577,7 +598,7 @@ tidy: ## `go mod tidy` all packages clean: ## Clean build products and SQLite database rm -f $(BINS) rm -Rf $(BUILD) - rm *.db + rm -f *.db $(if \ $(wildcard $(STABLE_BIN)/*), \ $(warning usually-stable build tools still exist, delete the $(STABLE_BIN) folder to rebuild them),) diff --git a/cmd/server/cadence/fx.go b/cmd/server/cadence/fx.go index 147dba12e01..719a2d48afa 100644 --- a/cmd/server/cadence/fx.go +++ b/cmd/server/cadence/fx.go @@ -40,6 +40,7 @@ import ( "github.com/uber/cadence/common/log/tag" "github.com/uber/cadence/common/metrics" "github.com/uber/cadence/common/metrics/metricsfx" + "github.com/uber/cadence/common/metrics/structured" "github.com/uber/cadence/common/persistence/nosql/nosqlplugin/cassandra/gocql" "github.com/uber/cadence/common/rpc/rpcfx" "github.com/uber/cadence/common/service" @@ -101,6 +102,7 @@ type AppParams struct { DynamicConfig dynamicconfig.Client Scope tally.Scope MetricsClient metrics.Client + Emitter structured.Emitter } // NewApp created a new Application from pre initalized config and logger. @@ -112,6 +114,7 @@ func NewApp(params AppParams) *App { dynamicConfig: params.DynamicConfig, scope: params.Scope, metricsClient: params.MetricsClient, + emitter: params.Emitter, } params.LifeCycle.Append(fx.StartHook(app.verifySchema)) @@ -128,13 +131,14 @@ type App struct { dynamicConfig dynamicconfig.Client scope tally.Scope metricsClient metrics.Client + emitter structured.Emitter daemon common.Daemon service string } func (a *App) Start(_ context.Context) error { - a.daemon = newServer(a.service, a.cfg, a.logger, a.dynamicConfig, a.scope, a.metricsClient) + a.daemon = newServer(a.service, a.cfg, a.logger, a.dynamicConfig, a.scope, a.metricsClient, a.emitter) a.daemon.Start() return nil } diff --git a/cmd/server/cadence/server.go b/cmd/server/cadence/server.go index 873ce35a4aa..01b8f5715c5 100644 --- a/cmd/server/cadence/server.go +++ b/cmd/server/cadence/server.go @@ -54,6 +54,7 @@ import ( "github.com/uber/cadence/common/membership" "github.com/uber/cadence/common/messaging/kafka" "github.com/uber/cadence/common/metrics" + "github.com/uber/cadence/common/metrics/structured" "github.com/uber/cadence/common/peerprovider/ringpopprovider" pnt "github.com/uber/cadence/common/pinot" "github.com/uber/cadence/common/resource" @@ -79,12 +80,13 @@ type ( dynamicCfgClient dynamicconfig.Client scope tally.Scope metricsClient metrics.Client + emitter structured.Emitter } ) // newServer returns a new instance of a daemon // that represents a cadence service -func newServer(service string, cfg config.Config, logger log.Logger, dynamicCfgClient dynamicconfig.Client, scope tally.Scope, metricsClient metrics.Client) common.Daemon { +func newServer(service string, cfg config.Config, logger log.Logger, dynamicCfgClient dynamicconfig.Client, scope tally.Scope, metricsClient metrics.Client, emitter structured.Emitter) common.Daemon { return &server{ cfg: cfg, name: service, @@ -93,6 +95,7 @@ func newServer(service string, cfg 
config.Config, logger log.Logger, dynamicCfgC dynamicCfgClient: dynamicCfgClient, scope: scope, metricsClient: metricsClient, + emitter: emitter, } } @@ -142,6 +145,7 @@ func (s *server) startService() common.Daemon { params.MetricScope = s.scope params.MetricsClient = s.metricsClient + params.Emitter = s.emitter rpcParams, err := rpc.NewParams(params.Name, &s.cfg, dc, params.Logger, params.MetricsClient) if err != nil { diff --git a/cmd/server/cadence/server_test.go b/cmd/server/cadence/server_test.go index 902f5fd5eef..ea6bb16be66 100644 --- a/cmd/server/cadence/server_test.go +++ b/cmd/server/cadence/server_test.go @@ -44,6 +44,7 @@ import ( "github.com/uber/cadence/common/log/tag" "github.com/uber/cadence/common/log/testlogger" "github.com/uber/cadence/common/metrics" + "github.com/uber/cadence/common/metrics/structured" pt "github.com/uber/cadence/common/persistence/persistence-tests" "github.com/uber/cadence/common/persistence/sql/sqlplugin/sqlite" "github.com/uber/cadence/common/resource" @@ -110,7 +111,15 @@ func (s *ServerSuite) TestServerStartup() { }) for _, svc := range services { - server := newServer(svc, cfg, logger, dynamicconfig.NewNopClient(), tally.NoopScope, metrics.NewNoopMetricsClient()) + server := newServer( + svc, + cfg, + logger, + dynamicconfig.NewNopClient(), + tally.NoopScope, + metrics.NewNoopMetricsClient(), + structured.NewTestEmitter(s.T(), nil), + ) daemons = append(daemons, server) server.Start() } diff --git a/common/metrics/defs.go b/common/metrics/defs.go index 00214a52649..a1d546ede05 100644 --- a/common/metrics/defs.go +++ b/common/metrics/defs.go @@ -53,6 +53,10 @@ type ( ServiceIdx int ) +func (s scopeDefinition) GetOperationString() string { + return s.operation +} + // MetricTypes which are supported const ( Counter MetricType = iota @@ -1068,7 +1072,7 @@ const ( // -- Operation scopes for History service -- const ( // HistoryStartWorkflowExecutionScope tracks StartWorkflowExecution API calls received by service - HistoryStartWorkflowExecutionScope = iota + NumCommonScopes + HistoryStartWorkflowExecutionScope = iota + NumFrontendScopes // HistoryRecordActivityTaskHeartbeatScope tracks RecordActivityTaskHeartbeat API calls received by service HistoryRecordActivityTaskHeartbeatScope // HistoryRespondDecisionTaskCompletedScope tracks RespondDecisionTaskCompleted API calls received by service @@ -1356,7 +1360,7 @@ const ( // -- Operation scopes for Matching service -- const ( // PollForDecisionTaskScope tracks PollForDecisionTask API calls received by service - MatchingPollForDecisionTaskScope = iota + NumCommonScopes + MatchingPollForDecisionTaskScope = iota + NumHistoryScopes // PollForActivityTaskScope tracks PollForActivityTask API calls received by service MatchingPollForActivityTaskScope // MatchingAddActivityTaskScope tracks AddActivityTask API calls received by service @@ -1392,7 +1396,7 @@ const ( // -- Operation scopes for Worker service -- const ( // ReplicationScope is the scope used by all metric emitted by replicator - ReplicatorScope = iota + NumCommonScopes + ReplicatorScope = iota + NumMatchingScopes // DomainReplicationTaskScope is the scope used by domain task replication processing DomainReplicationTaskScope // ESProcessorScope is scope used by all metric emitted by esProcessor @@ -1440,7 +1444,7 @@ const ( // -- Operation scopes for ShardDistributor service -- const ( // ShardDistributorGetShardOwnerScope tracks GetShardOwner API calls received by service - ShardDistributorGetShardOwnerScope = iota + NumCommonScopes + 
ShardDistributorGetShardOwnerScope = iota + NumWorkerScopes
 	ShardDistributorHeartbeatScope
 	ShardDistributorAssignLoopScope
@@ -2700,12 +2704,13 @@ const (
 	VirtualQueueCountGauge
 	VirtualQueuePausedGauge
 	VirtualQueueRunningGauge
+	NumHistoryMetrics
 )
 
 // Matching metrics enum
 const (
-	PollSuccessPerTaskListCounter = iota + NumCommonMetrics
+	PollSuccessPerTaskListCounter = iota + NumHistoryMetrics
 	PollTimeoutPerTaskListCounter
 	PollSuccessWithSyncPerTaskListCounter
 	LeaseRequestPerTaskListCounter
@@ -2783,12 +2788,13 @@ const (
 	IsolationGroupUpscale
 	IsolationGroupDownscale
 	PartitionDrained
+	NumMatchingMetrics
 )
 
 // Worker metrics enum
 const (
-	ReplicatorMessages = iota + NumCommonMetrics
+	ReplicatorMessages = iota + NumMatchingMetrics
 	ReplicatorFailures
 	ReplicatorMessagesDropped
 	ReplicatorLatency
@@ -2872,12 +2878,13 @@ const (
 	DiagnosticsWorkflowStartedCount
 	DiagnosticsWorkflowSuccess
 	DiagnosticsWorkflowExecutionLatency
+	NumWorkerMetrics
 )
 
 // ShardDistributor metrics enum
 const (
-	ShardDistributorRequests = iota + NumCommonMetrics
+	ShardDistributorRequests = iota + NumWorkerMetrics
 	ShardDistributorFailures
 	ShardDistributorLatency
 	ShardDistributorErrContextTimeoutCounter
diff --git a/common/metrics/defs_test.go b/common/metrics/defs_test.go
index 122aa359645..db6c15a96a3 100644
--- a/common/metrics/defs_test.go
+++ b/common/metrics/defs_test.go
@@ -129,6 +129,39 @@ func TestMetricDefs(t *testing.T) {
 	}
 }
 
+// "index -> operation" must be unique for structured.DynamicOperationTags' int lookup to work consistently.
+// Duplicate indexes with the same operation name are technically fine, but there doesn't seem to be any benefit
+// in allowing it, and disallowing it trivially ensures that every index maps to exactly one operation name.
+func TestOperationIndexesAreUnique(t *testing.T) {
+	seen := make(map[int]bool)
+	for serviceIdx, serviceOps := range ScopeDefs {
+		for idx := range serviceOps {
+			if seen[idx] {
+				t.Error("duplicate operation index:", idx, "with name:", serviceOps[idx].operation, "in service:", serviceIdx)
+			}
+			seen[idx] = true
+		}
+	}
+}
+
+func TestMetricsAreUnique(t *testing.T) {
+	// Duplicate indexes are arguably fine, but there doesn't seem to be any benefit in allowing them.
+	//
+	// Duplicate names are also checked, but that is done via an analyzer (metricslint) instead, to
+	// allow checking across multiple formats.
+	t.Run("indexes", func(t *testing.T) {
+		seen := make(map[int]bool)
+		for _, serviceMetrics := range MetricDefs {
+			for idx := range serviceMetrics {
+				if seen[idx] {
+					t.Error("duplicate metric index:", idx, "with name:", serviceMetrics[idx].metricName)
+				}
+				seen[idx] = true
+			}
+		}
+	})
+}
+
 func TestExponentialDurationBuckets(t *testing.T) {
 	factor := math.Pow(2, 0.25)
 	assert.Equal(t, 80, len(ExponentialDurationBuckets))
diff --git a/common/metrics/metricsfx/metricsfx.go b/common/metrics/metricsfx/metricsfx.go
index 56702a26107..790880aee67 100644
--- a/common/metrics/metricsfx/metricsfx.go
+++ b/common/metrics/metricsfx/metricsfx.go
@@ -29,12 +29,15 @@ import (
 	"github.com/uber/cadence/common/config"
 	"github.com/uber/cadence/common/log"
 	"github.com/uber/cadence/common/metrics"
+	"github.com/uber/cadence/common/metrics/structured"
 	"github.com/uber/cadence/common/service"
 )
 
 // Module provides metrics client for fx application.
 var Module = fx.Module("metricsfx",
-	fx.Provide(buildClient))
+	fx.Provide(buildClient),
+	structured.Module,
+)
 
 // ModuleForExternalScope provides metrics client for fx application when tally.Scope is created outside.
var ModuleForExternalScope = fx.Module("metricsfx", diff --git a/common/metrics/structured/base.go b/common/metrics/structured/base.go new file mode 100644 index 00000000000..542e8616d9b --- /dev/null +++ b/common/metrics/structured/base.go @@ -0,0 +1,149 @@ +package structured + +import ( + "maps" + "strconv" + "strings" + "testing" + "time" + + "github.com/uber-go/tally" + "go.uber.org/fx" +) + +var Module = fx.Options( + fx.Provide(func(s tally.Scope) Emitter { + return Emitter{scope: s} + }), +) + +// Metadata is a shared interface for all "...Tags" structs. +// +// You are generally NOT expected to implement any of this yourself. +// Just define your struct, and let the code generator take care of it (`make metrics`). +// +// For the intended usage and implementation, see generated code. +type Metadata interface { + NumTags() int // for efficient pre-allocation + PutTags(into map[string]string) // populates the map + GetTags() map[string]string // returns a pre-allocated and pre-populated map +} + +// DynamicTags is a very simple helper for treating an arbitrary map as a Metadata. +// +// This can be used externally (for completely manual metrics) or in metrics-emitting +// methods to simplify adding custom tags (e.g. it is returned from GetTags). +type DynamicTags map[string]string + +var _ Metadata = DynamicTags{} + +func (o DynamicTags) NumTags() int { return len(o) } +func (o DynamicTags) PutTags(into map[string]string) { maps.Copy(into, o) } +func (o DynamicTags) GetTags() map[string]string { return maps.Clone(o) } + +// Emitter is the base helper for emitting metrics, and it contains only low-level +// metrics-emitting funcs to keep it as simple as possible. +// +// It is intended to be used with the `make metrics` code generator and structs-of-tags, +// but it's intentionally possible to (ab)use it by hand because ad-hoc metrics +// should be easy and encouraged. +// +// Metadata can be constructed from any map via DynamicTags, but this API intentionally hides +// [tally.Scope.Tagged] because it's (somewhat) memory-wasteful, self-referential interfaces are +// difficult to mock, and it's very hard to figure out what tags may be present at runtime. +// +// TODO: this can / likely should be turned into an interface to allow disconnecting from tally, +// to allow providing a specific version or to drop it entirely if desired. +type Emitter struct { + // intentionally NOT no-op by default. + // + // use a test emitter in tests, it should be quite easy to construct, + // and this way it will panic if forgotten for some reason, rather than + // causing a misleading lack-of-metrics. + // + // currently, because this is constructed by common/config/metrics.go, + // this scope already contains the `cadence_service:cadence-{whatever}` tag, + // but essentially no others (aside from platform-level stuff). + // you can get the instance from go.uber.org/fx, as just `tally.Scope`. + scope tally.Scope +} + +// Histogram records a duration-based histogram with the provided data. +// It adds a "histogram_scale" tag, so histograms can be accurately subset in queries or via middleware. +func (b Emitter) Histogram(name string, buckets SubsettableHistogram, dur time.Duration, meta Metadata) { + tags := make(DynamicTags, meta.NumTags()+1) + meta.PutTags(tags) + + // all subsettable histograms need to emit scale values so scale changes + // can be correctly merged at query time. 
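+	// e.g. scale-2 data (buckets 2^(1/4) apart) can be rolled up into scale-1
+	// data at query time by summing adjacent pairs of buckets, but only if
+	// every series records which scale produced it.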
+	if _, ok := tags["histogram_scale"]; ok {
+		// rewrite the existing tag so it can be noticed
+		tags["error_rename_this_tag_histogram_scale"] = tags["histogram_scale"]
+	}
+	tags["histogram_scale"] = strconv.Itoa(buckets.scale)
+
+	if !strings.HasSuffix(name, "_ns") {
+		// duration-based histograms are always in nanoseconds,
+		// and the name MUST be different from timers while we migrate,
+		// so this ensures we always have a unique _ns suffix.
+		//
+		// hopefully this is never used, but it'll at least make it clear if it is.
+		name = name + "_error_missing_suffix_ns"
+	}
+	b.scope.Tagged(tags).Histogram(name, buckets).RecordDuration(dur)
+}
+
+// IntHistogram records a count-based histogram with the provided data.
+// It adds a "histogram_scale" tag, so histograms can be accurately subset in queries or via middleware.
+func (b Emitter) IntHistogram(name string, buckets IntSubsettableHistogram, num int, meta Metadata) {
+	tags := make(DynamicTags, meta.NumTags()+1)
+	meta.PutTags(tags)
+
+	// all subsettable histograms need to emit scale values so scale changes
+	// can be correctly merged at query time.
+	if _, ok := tags["histogram_scale"]; ok {
+		// rewrite the existing tag so it can be noticed
+		tags["error_rename_this_tag_histogram_scale"] = tags["histogram_scale"]
+	}
+	tags["histogram_scale"] = strconv.Itoa(buckets.scale)
+
+	if !strings.HasSuffix(name, "_counts") {
+		// int-based histograms are always in "_counts" (currently anyway),
+		// and the name MUST be different from timers while we migrate,
+		// so this ensures we always have a unique _counts suffix.
+		//
+		// hopefully this is never used, but it'll at least make it clear if it is.
+		name = name + "_error_missing_suffix_counts"
+	}
+	b.scope.Tagged(tags).Histogram(name, buckets).RecordDuration(time.Duration(num))
+}
+
+// TODO: make a MinMaxHistogram helper which maintains a precise, rolling
+// min/max gauge, over a window larger than the metrics granularity (e.g. ~20s)
+// to work around gauges' last-data-only behavior.
+//
+// This will likely require some additional state though, and might benefit from
+// keeping that state further up the Tags-stack to keep contention and
+// series-deduplication-costs low.
+//
+// Maybe OTEL / Prometheus will natively support this one day. It'd be simple.
+
+// Count records a counter with the provided data.
+func (b Emitter) Count(name string, num int, meta Metadata) {
+	b.scope.Tagged(meta.GetTags()).Counter(name).Inc(int64(num))
+}
+
+// Gauge emits a gauge with the provided data.
+func (b Emitter) Gauge(name string, val float64, meta Metadata) {
+	b.scope.Tagged(meta.GetTags()).Gauge(name).Update(val)
+}
+
+// NewTestEmitter creates an emitter for tests, optionally using the provided scope.
+// If scope is nil, a no-op scope will be used.
+func NewTestEmitter(t *testing.T, scope tally.Scope) Emitter {
+	t.Name() // touch t, so a nil *testing.T panics here rather than later
+	if scope == nil {
+		scope = tally.NoopScope
+	}
+	return Emitter{scope}
+}
diff --git a/common/metrics/structured/doc.go b/common/metrics/structured/doc.go
new file mode 100644
index 00000000000..d8dc6c0d567
--- /dev/null
+++ b/common/metrics/structured/doc.go
@@ -0,0 +1,103 @@
+/*
+Package structured contains the base objects for a struct-based metrics system.
+
+This is intended to be used with internal/tools/metricsgen, but the Emitter is
+embedded publicly in many ...Tags structs to ensure ad-hoc metrics are still
+simple to emit (and to make codegen reasonably easy).
+ +For concrete details, check the generated code of any ...Tags structs, or the +generator in [github.com/uber/cadence/internal/tools/metricsgen]. + +# To make a new metrics-tag-containing struct + + - Define a `type ...Tags struct` anywhere. These can be public or private. + - Embed any parent ...Tags structs desired, and add any fields to store tag values + (or declare that they will be emitted, if they are not static) + - Add a `//go:generate metricsgen` comment to the file (if not already present) + - Run `make metrics` to generate the supporting code + +In many cases, that's likely enough. Construct your new thing and use it: + + thing := NewYourTags(parents, and, tags) // get it from somewhere + thing.Count("name", 1) // "name" must be unique within Cadence + // or inside a method on YourTags: + func (y YourTags) ItHappened() { + y.Count("it_happened", 1) + } + +to emit a metric with all the associated tags. + +# To add new tags to existing metrics / structs + +Add the field and run `make metrics`. + +This will re-generate the constructor(s), which will lead to a broken build. +Just chase build failures until you've ensured that every code path has access +to the new data you wanted to add. + +# To see what tags an existing metric has + +Find the name string (e.g. grep for it), open it in an IDE, and just ask the +IDE to auto-complete a field access: + + yourTagsInstance. + +In Goland, VSCode, and likely elsewhere, this will give you a drop-down of all +fields inherited from all parents, for easy reading. + +# Best practices + +Use constant, in-line strings for metric names. Prometheus requires that each +"name" must have a stable set of tags, so there is no safety benefit to using a +const - generally speaking it must NOT be shared. + +Ad-hoc metrics are encouraged to use the convenience methods for simplicity. +When curious about something, just emit a metric and find out later (but watch +out for cardinality). + +Avoid pointers, both for the ...Tags struct and its values, to prevent mutation. +This also implies you should generally use "simple" and minimal field types, as +they will be copied repeatedly - avoid e.g. complex thrift objects. Hopefully +this will end up being nicer to the garbage collector than pointers everywhere. + +For any metrics (or "events" which have multiple metrics) you consider "stable" +or have alerts or dashboards based on, strongly consider declaring a method on +your ...Tags struct and emitting in there. This helps inform reviewers that +changing the metrics might cause problems elsewhere, and documents intent for +Cadence operators if they get an alert or see strange numbers. + +# Code generation customization + +Fields have two major options available: they can declare a custom to-string +conversion, and they can "reserve" a tag without defining a value: + + type YourTags struct { + Fancy protobuf.Thing `tag:"fancy" convert:"{{.}}.String()"` + Reserved struct{} `tag:"reserved"` + } + +Custom conversion is just a text/template string, where `.` will be filled in +with the field access (i.e. `y.Fancy`). Strings work automatically, and +integers (int, int32, and int64) will be automatically `strconv.Itoa`-converted, +but all other types will require custom conversion. As you cannot declare new +imports in this string, make sure you've imported any packages you need to +stringify a value in the same file as the ...Tags is declared. 
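+
+For example, a hypothetical field that emits a duration tag in whole seconds
+(strconv must already be imported in the file declaring the struct):
+
+	type ExampleTags struct {
+		Elapsed time.Duration `tag:"elapsed_s" convert:"strconv.FormatInt(int64({{.}}.Seconds()), 10)"`
+	}
+
+which generates roughly:
+
+	into["elapsed_s"] = strconv.FormatInt(int64(e.Elapsed.Seconds()), 10)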
+ +Reserved tags serve two purposes: + - They document that a tag will be emitted, so it can be discovered + - They reserve space in the map returned by `GetTags()`, so you can + efficiently add it at runtime + +Because reserved tags will not be filled in by convenience methods like `Count`, +they are almost exclusively useful for methods that emit specific metrics. + +For the simplest use cases, use a method on the ...Tags struct and add the tags +by hand: + + func (s SomethingTags) ItHappened(times int) { + tags := s.GetTags() // get all static tags + tags["reserved"] = fmt.Sprint(rand.Intn(10)) // add the reserved one(s) + s.Emitter.Count("it_happened", times, tags) // use the lower-level Emitter + } +*/ +package structured diff --git a/common/metrics/structured/histograms.go b/common/metrics/structured/histograms.go new file mode 100644 index 00000000000..6502be4cac1 --- /dev/null +++ b/common/metrics/structured/histograms.go @@ -0,0 +1,187 @@ +package structured + +import ( + "fmt" + "math" + "slices" + "time" + + "github.com/uber-go/tally" +) + +// Nearly all histograms should use pre-defined buckets here, to encourage consistency. +// Name them more by their intent than their exact ranges - if you need something outside the intent, +// make a new bucket. +// +// Extreme, rare cases should still create a new bucket here, not declare one in-line elsewhere. +// +// Partly this helps us document our ranges, and partly it ensures that any customized emitter +// can easily detect buckets with an `==` comparison, and can choose to use a different one +// if needed (e.g. to reduce scale if it is too costly to support). +var ( + // Default1ms10m is our "default" set of buckets, targeting 1ms through 100s, + // and is "rounded up" slightly to reach 80 buckets == 16 minutes (100s needs 68 buckets), + // plus multi-minute exceptions are common enough to support for the small additional cost. + // + // If you need sub-millisecond precision, or substantially longer durations than about 1 minute, + // consider using a different histogram. + Default1ms10m = makeSubsettableHistogram(time.Millisecond, 2, func(last time.Duration, length int) bool { + return last >= 10*time.Minute && length == 80 + }) + + // High1ms24h covers things like activity latency, where "longer than 1 day" is not + // particularly worth being precise about. + // + // This uses a lot of buckets, so it must be used with relatively-low cardinality elsewhere, + // and/or consider dual emitting (Mid1ms24h or lower) so overviews can be queried efficiently. + High1ms24h = makeSubsettableHistogram(time.Millisecond, 2, func(last time.Duration, length int) bool { + // 24h leads to 108 buckets, raise to 112 for cleaner subsetting + return last >= 24*time.Hour && length == 112 + }) + + // Mid1ms24h is a one-scale-lower version of High1ms24h, + // for use when we know it's too detailed to be worth emitting. + // + // This uses 57 buckets, half of High1ms24h's 113 + Mid1ms24h = High1ms24h.subsetTo(1) + + // Low1ms10s is for things that are usually very fast, like most individual database calls, + // and is MUCH lower cardinality than Default1ms10m so it's more suitable for things like per-shard metrics. + Low1ms10s = makeSubsettableHistogram(time.Millisecond, 1, func(last time.Duration, length int) bool { + // 10s needs 26 buckets, raise to 32 for cleaner subsetting + return last >= 10*time.Second && length == 32 + }) + + // Mid1To32k is a histogram for small counters, like "how many replication tasks did we receive". 
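+	// (i.e. a distribution of small integers, rather than durations).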
+ // This targets 1 to 32k + Mid1To32k = IntSubsettableHistogram(makeSubsettableHistogram(1, 2, func(last time.Duration, length int) bool { + // 10k needs 56 buckets, raise to 64 for cleaner subsetting + return last >= 10000 && length == 64 + })) +) + +// SubsettableHistogram is a duration-based histogram that can be subset to a lower scale +// in a standardized, predictable way. It is intentionally compatible with Prometheus and OTEL's +// "exponential histograms": https://opentelemetry.io/docs/specs/otel/metrics/data-model/#exponentialhistogram +// +// These histograms MUST always have a "_ns" suffix in their name to avoid confusion with timers. +type SubsettableHistogram struct { + tally.DurationBuckets + + scale int +} + +// IntSubsettableHistogram is a non-duration-based integer-distribution histogram, otherwise identical +// to SubsettableHistogram. +// +// These histograms MUST always have a "_counts" suffix in their name to avoid confusion with timers, +// or modify the Emitter to allow different suffixes if something else reads better. +type IntSubsettableHistogram SubsettableHistogram + +// currently we have no apparent need for float-histograms, +// as all our value ranges go from >=1 to many thousands, where +// decimal precision is pointless and mostly just looks bad. +// +// if we ever need 0..1 precision in histograms, we can add them then. +// type FloatSubsettableHistogram struct { +// tally.ValueBuckets +// scale int +// } + +func (s SubsettableHistogram) subsetTo(newScale int) SubsettableHistogram { + if newScale >= s.scale { + panic(fmt.Sprintf("scale %v is not less than the current scale %v", newScale, s.scale)) + } + if newScale < 0 { + panic(fmt.Sprintf("negative scales (%v == greater than *2 per step) are possible, but do not have tests yet", newScale)) + } + dup := SubsettableHistogram{ + DurationBuckets: slices.Clone(s.DurationBuckets), + scale: s.scale, + } + // remove every other bucket per -1 scale + for dup.scale > newScale { + if (len(dup.DurationBuckets)-1)%2 != 0 { + panic(fmt.Sprintf("cannot subset from scale %v to %v, %v-buckets is not divisible by 2", dup.scale, dup.scale-1, len(dup.DurationBuckets)-1)) + } + half := make(tally.DurationBuckets, 0, ((len(dup.DurationBuckets)-1)/2)+1) + half = append(half, dup.DurationBuckets[0]) // keep the zero value + for i := 1; i < len(dup.DurationBuckets); i += 2 { + half = append(half, dup.DurationBuckets[i]) // add first, third, etc + } + dup.DurationBuckets = half + dup.scale-- + } + return dup +} + +func (i IntSubsettableHistogram) subsetTo(newScale int) IntSubsettableHistogram { + return IntSubsettableHistogram(SubsettableHistogram(i).subsetTo(newScale)) +} + +// makeSubsettableHistogram is a replacement for tally.MustMakeExponentialDurationBuckets, +// tailored to make "construct a range" or "ensure N buckets" simpler for OTEL-compatible exponential histograms +// (i.e. https://opentelemetry.io/docs/specs/otel/metrics/data-model/#exponentialhistogram), +// ensures values are as precise as possible (preventing display-space-wasteful numbers like 3.99996ms), +// and that all histograms start with a 0 value (else erroneous negative values are impossible to notice). +// +// Start time must be positive, scale must be 0..3 (inclusive), and the loop MUST stop within 320 or fewer steps, +// to prevent extremely-costly histograms. +// Even 320 is far too many tbh, target 160 for very-high-value data, and generally around 80 or less (ideally a value +// that is divisible by 2 many times). 
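+//
+// As a rough guide: scale S yields 2^S buckets per doubling, so N non-zero buckets
+// span about N / 2^S doublings, e.g. scale 2 with 80 buckets covers 2^20, which is
+// roughly 1ms .. 17.5m when starting from 1ms.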
+// +// The stop callback will be given the current largest value and the number of values generated so far. +// This excludes the zero value (you should ignore it anyway), so you can stop at your target value, +// or == a highly-divisible-by-2 number (do not go over). +// The buckets produced may be padded further to reach a "full" power-of-2 row, as this simplifies math elsewhere +// and costs very little compared to the rest of the histogram. +// +// For all values produced, please add a test to print the concrete values, and record the length and maximum value +// so they can be quickly checked when reading. +func makeSubsettableHistogram(start time.Duration, scale int, stop func(last time.Duration, length int) bool) SubsettableHistogram { + if start <= 0 { + panic(fmt.Sprintf("start must be greater than 0 or it will not grow exponentially, got %v", start)) + } + if scale < 0 || scale > 3 { + // anything outside this range is currently not expected and probably a mistake + panic(fmt.Sprintf("scale must be between 0 (grows by *2) and 3 (grows by *2^1/8), got scale: %v", scale)) + } + buckets := tally.DurationBuckets{ + time.Duration(0), // else "too low" and "negative" are impossible to tell apart. + // ^ note this must be excluded from calculations below, hence -1 everywhere. + } + for { + if len(buckets) > 320 { + panic(fmt.Sprintf("over 320 buckets is too many, choose a smaller range or smaller scale. "+ + "started at: %v, scale: %v, last value: %v", + start, scale, buckets[len(buckets)-1])) + } + buckets = append(buckets, nextBucket(start, len(buckets)-1, scale)) + + // stop when requested. + if stop(buckets[len(buckets)-1], len(buckets)-1) { + break + } + } + + // fill in as many buckets as are necessary to make a full "row", i.e. just + // before the next power of 2 from the original value. + // this ensures subsetting keeps "round" numbers as long as possible. + powerOfTwoWidth := int(math.Pow(2, float64(scale))) // num of buckets needed to double a value + for (len(buckets)-1)%powerOfTwoWidth != 0 { + buckets = append(buckets, nextBucket(start, len(buckets)-1, scale)) + } + return SubsettableHistogram{ + DurationBuckets: buckets, + scale: scale, + } +} + +func nextBucket(start time.Duration, num int, scale int) time.Duration { + // calculating it from `start` each time reduces floating point error, ensuring "clean" multiples + // at every power of 2 (and possibly others), instead of e.g. "1ms ... 1.9999994ms" which occurs + // if you try to build from the previous value each time. 
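+	// e.g. with start=1ms and scale=2, num=4 gives 1ms * 2^(4/4) = exactly 2ms,
+	// and num=8 gives exactly 4ms, with intermediate buckets at 2^(1/4) steps.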
+ return time.Duration( + float64(start) * + math.Pow(2, float64(num)/math.Pow(2, float64(scale)))) +} diff --git a/common/metrics/structured/histograms_test.go b/common/metrics/structured/histograms_test.go new file mode 100644 index 00000000000..7d47aa73e16 --- /dev/null +++ b/common/metrics/structured/histograms_test.go @@ -0,0 +1,132 @@ +package structured + +import ( + "fmt" + "math" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/uber-go/tally" +) + +func TestHistogramValues(t *testing.T) { + t.Run("default_1ms_to_10m", func(t *testing.T) { + checkHistogram(t, Default1ms10m) + assert.Equal(t, 81, Default1ms10m.len(), "wrong number of buckets") + assertBetween(t, 10*time.Minute, Default1ms10m.max(), 15*time.Minute) // roughly 14m 42s + }) + t.Run("high_1ms_to_24h", func(t *testing.T) { + checkHistogram(t, High1ms24h) + assert.Equal(t, 113, High1ms24h.len(), "wrong number of buckets") + assertBetween(t, 24*time.Hour, High1ms24h.max(), 64*time.Hour) // roughly 63h + }) + t.Run("mid_1ms_24h", func(t *testing.T) { + checkHistogram(t, Mid1ms24h) + assert.Equal(t, 57, Mid1ms24h.len(), "wrong number of buckets") + assertBetween(t, 12*time.Hour, Mid1ms24h.max(), 64*time.Hour) // roughly 53h + }) + t.Run("mid_to_32k_ints", func(t *testing.T) { + // note: this histogram has some duplicates: + // [0] + // [1 1 1 1] + // [2 2 2 3] + // [4 4 5 6] + // [8 9 11 13] + // ... + // + // this wastes a bit of memory, but Tally will choose the same index each + // time for a specific value, so it does not cause extra non-zero series + // to be stored or emitted. + // + // if this turns out to be expensive in Prometheus / etc, it's easy + // enough to start the histogram at 8, and dual-emit a non-exponential + // histogram for lower values. + checkHistogram(t, Mid1To32k) + assert.Equal(t, 65, Mid1To32k.len(), "wrong number of buckets") + assertBetween(t, 32_000, int(Mid1To32k.max()), 64_000) // 55108 + }) + t.Run("low_cardinality_1ms_10s", func(t *testing.T) { + checkHistogram(t, Low1ms10s) + assert.Equal(t, 33, Low1ms10s.len(), "wrong number of buckets") + assertBetween(t, 10*time.Second, Low1ms10s.max(), time.Minute) // roughly 46s + }) +} + +// test helpers, but could be moved elsewhere if they prove useful +func (s SubsettableHistogram) width() int { return int(math.Pow(2, float64(s.scale))) } +func (s SubsettableHistogram) len() int { return len(s.DurationBuckets) } +func (s SubsettableHistogram) max() time.Duration { return s.DurationBuckets[len(s.DurationBuckets)-1] } +func (s SubsettableHistogram) buckets() tally.DurationBuckets { return s.DurationBuckets } + +func (s IntSubsettableHistogram) width() int { return int(math.Pow(2, float64(s.scale))) } +func (s IntSubsettableHistogram) len() int { return len(s.DurationBuckets) } +func (s IntSubsettableHistogram) max() time.Duration { + return s.DurationBuckets[len(s.DurationBuckets)-1] +} +func (s IntSubsettableHistogram) buckets() tally.DurationBuckets { return s.DurationBuckets } + +type histogrammy interface { + SubsettableHistogram | IntSubsettableHistogram + + width() int + len() int + max() time.Duration + buckets() tally.DurationBuckets +} + +type numeric interface { + ~int | ~int64 +} + +func assertBetween[T numeric](t *testing.T, min, actual, max T, msgAndArgs ...interface{}) { + if actual < min || actual > max { + assert.Fail(t, fmt.Sprintf("value %v not between %v and %v", actual, min, max), msgAndArgs...) 
+ } +} + +// most histograms should pass this check, but fuzzy comparison is fine if needed for extreme cases. +func checkHistogram[T histogrammy](t *testing.T, h T) { + printHistogram(t, h) + buckets := h.buckets() + assert.EqualValues(t, 0, buckets[0], "first bucket should always be zero") + for i := 1; i < len(buckets); i += h.width() { + if i > 1 { + // ensure good float math. + // + // this is *intentionally* doing exact comparisons, as floating point math with + // human-friendly integers have precise power-of-2 multiples for a very long time. + // + // note that the equivalent tally buckets, e.g.: + // tally.MustMakeExponentialDurationBuckets(time.Millisecond, math.Pow(2, 1.0/4.0)) + // fails this test, and the logs produced show ugly e.g. 31.999942ms values. + // it also produces incorrect results if you start at e.g. 1, as it never exceeds 1. + assert.Equalf(t, buckets[i-h.width()]*2, buckets[i], + "current row's value (%v) is not a power of 2 greater than previous (%v), skewed / bad math?", + buckets[i-h.width()], buckets[i]) + } + } +} + +func printHistogram[T histogrammy](t *testing.T, histogram T) { + switch h := (any)(histogram).(type) { + case SubsettableHistogram: + t.Log(h.buckets()[:1]) + for i := 1; i < len(h.buckets()); i += h.width() { + t.Log(h.buckets()[i : i+h.width()]) // display row by row for easier reading. + // ^ this will panic if the histograms are not an even multiple of `width`, + // that's also a sign that it's constructed incorrectly. + } + case IntSubsettableHistogram: + hi := make([]int, len(h.buckets())) // convert to int + for i, v := range h.buckets() { + hi[i] = int(v) + } + t.Log(hi[:1]) + for i := 1; i < len(hi); i += h.width() { + t.Log(hi[i : i+h.width()]) + } + default: + panic("unreachable") + } +} diff --git a/common/metrics/structured/operation.go b/common/metrics/structured/operation.go new file mode 100644 index 00000000000..03c3e168801 --- /dev/null +++ b/common/metrics/structured/operation.go @@ -0,0 +1,55 @@ +package structured + +import "github.com/uber/cadence/common/metrics" + +//go:generate metricsgen + +// OperationTags is an embeddable struct that provides a single "operation" tag. +// It cooperates in tag emission, but does not contain an emitter. +type OperationTags struct { + Operation string `tag:"operation"` // our historical "the thing doing something" name, unchanged for consistency +} + +// DynamicOperationTags is like OperationTags, but it's intended for places +// where we're dynamically choosing the operation per metric. +// +// This is intentionally an "incomplete" metric struct, and using it requires +// some manual work to populate the tags when emitting metrics. +// +// When this type is embedded, the generated code will require passing an +// operation int to GetTags and PutTags, so it cannot be forgotten. +// You are free to ignore this and use PutOperation instead, if you have the +// string already. +// +// skip:New skip:Convenience skip:PutTags skip:GetTags +type DynamicOperationTags struct { + DynamicOperation struct{} `tag:"operation"` // same intent as OperationTags +} + +// PutOperation sets the "operation" tag to the given string. +func (d DynamicOperationTags) PutOperation(op string, into DynamicTags) { + into["operation"] = op +} + +// PutTags is a custom implementation that requires an operation-metric int to +// be passed in, for safety. 
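+//
+// A hypothetical caller that only has the scope index at emit time:
+//
+//	tags := make(DynamicTags, d.NumTags())
+//	d.PutTags(metrics.HistoryStartWorkflowExecutionScope, tags)
+//	// tags["operation"] now holds that scope's operation name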
+func (d DynamicOperationTags) PutTags(operation int, into DynamicTags) {
+	into["operation"] = getOperationString(operation)
+}
+
+// GetTags is a custom implementation that requires an operation-metric int to
+// be passed in, for safety.
+func (d DynamicOperationTags) GetTags(operation int) DynamicTags {
+	tags := make(DynamicTags, d.NumTags())
+	d.PutTags(operation, tags)
+	return tags
+}
+
+func getOperationString(op int) string {
+	for _, serviceOps := range metrics.ScopeDefs {
+		if serviceOp, ok := serviceOps[op]; ok {
+			return serviceOp.GetOperationString() // unit test ensures this is unique
+		}
+	}
+	return "unknown_operation_int" // should never happen
+}
diff --git a/common/metrics/structured/operation_metrics_gen.go b/common/metrics/structured/operation_metrics_gen.go
new file mode 100644
index 00000000000..d2974f83026
--- /dev/null
+++ b/common/metrics/structured/operation_metrics_gen.go
@@ -0,0 +1,46 @@
+package structured
+
+// Code generated ./internal/tools/metricsgen; DO NOT EDIT.
+
+// NewOperationTags constructs a new metric-tag-holding OperationTags,
+// and it must be used instead of custom initialization, to ensure newly added
+// tags are detected at compile time rather than missed at run time.
+func NewOperationTags(
+	Operation string,
+) OperationTags {
+	o := OperationTags{
+		Operation: Operation,
+	}
+	return o
+}
+
+// NumTags returns the number of tags that are intended to be written in all
+// cases. This will include all embedded parent tags and all reserved tags,
+// and is intended to be used to pre-allocate maps of tags.
+func (o OperationTags) NumTags() int {
+	num := 1  // num of self fields
+	num += 0 // num of reserved fields
+	return num
+}
+
+// PutTags writes this set of tags (and its embedded parents) to the passed map.
+func (o OperationTags) PutTags(into DynamicTags) {
+	into["operation"] = o.Operation
+}
+
+// GetTags is a minor helper to get a pre-allocated-and-filled map with room
+// for reserved fields (i.e. 'struct{}' type fields, which only declare intent).
+func (o OperationTags) GetTags() DynamicTags {
+	tags := make(DynamicTags, o.NumTags())
+	o.PutTags(tags)
+	return tags
+}
+
+// NumTags returns the number of tags that are intended to be written in all
+// cases. This will include all embedded parent tags and all reserved tags,
+// and is intended to be used to pre-allocate maps of tags.
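+// For DynamicOperationTags this is just the one reserved "operation" tag.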
+func (d DynamicOperationTags) NumTags() int { + num := 0 // num of self fields + num += 1 // num of reserved fields + return num +} diff --git a/common/resource/params.go b/common/resource/params.go index 7e48a56d220..aaed4ff26c5 100644 --- a/common/resource/params.go +++ b/common/resource/params.go @@ -42,6 +42,7 @@ import ( "github.com/uber/cadence/common/membership" "github.com/uber/cadence/common/messaging" "github.com/uber/cadence/common/metrics" + "github.com/uber/cadence/common/metrics/structured" persistenceClient "github.com/uber/cadence/common/persistence/client" "github.com/uber/cadence/common/pinot" "github.com/uber/cadence/common/rpc" @@ -68,6 +69,7 @@ type ( ClusterMetadata cluster.Metadata ReplicatorConfig config.Replicator MetricsClient metrics.Client + Emitter structured.Emitter MessagingClient messaging.Client BlobstoreClient blobstore.Client ESClient es.GenericClient diff --git a/common/resource/resource_impl.go b/common/resource/resource_impl.go index 48a9be7bd66..bd50919328d 100644 --- a/common/resource/resource_impl.go +++ b/common/resource/resource_impl.go @@ -57,6 +57,7 @@ import ( "github.com/uber/cadence/common/membership" "github.com/uber/cadence/common/messaging" "github.com/uber/cadence/common/metrics" + "github.com/uber/cadence/common/metrics/structured" "github.com/uber/cadence/common/persistence" persistenceClient "github.com/uber/cadence/common/persistence/client" qrpc "github.com/uber/cadence/common/quotas/global/rpc" @@ -98,6 +99,7 @@ type Impl struct { timeSource clock.TimeSource payloadSerializer persistence.PayloadSerializer metricsClient metrics.Client + emitter structured.Emitter messagingClient messaging.Client blobstoreClient blobstore.Client archivalMetadata archiver.ArchivalMetadata @@ -360,6 +362,7 @@ func New( timeSource: clock.NewRealTimeSource(), payloadSerializer: persistence.NewPayloadSerializer(), metricsClient: params.MetricsClient, + emitter: params.Emitter, messagingClient: params.MessagingClient, blobstoreClient: params.BlobstoreClient, archivalMetadata: params.ArchivalMetadata, @@ -534,6 +537,11 @@ func (h *Impl) GetMetricsClient() metrics.Client { return h.metricsClient } +// GetMetricsEmitter returns a base structured metrics emitter +func (h *Impl) GetMetricsEmitter() structured.Emitter { + return h.emitter +} + // GetMessagingClient return messaging client func (h *Impl) GetMessagingClient() messaging.Client { return h.messagingClient diff --git a/common/resource/resource_mock.go b/common/resource/resource_mock.go index 58127259301..1fbf82491cd 100644 --- a/common/resource/resource_mock.go +++ b/common/resource/resource_mock.go @@ -36,6 +36,7 @@ import ( membership "github.com/uber/cadence/common/membership" messaging "github.com/uber/cadence/common/messaging" metrics "github.com/uber/cadence/common/metrics" + structured "github.com/uber/cadence/common/metrics/structured" persistence "github.com/uber/cadence/common/persistence" client0 "github.com/uber/cadence/common/persistence/client" rpc "github.com/uber/cadence/common/quotas/global/rpc" @@ -498,6 +499,20 @@ func (mr *MockResourceMockRecorder) GetMetricsClient() *gomock.Call { return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetMetricsClient", reflect.TypeOf((*MockResource)(nil).GetMetricsClient)) } +// GetMetricsEmitter mocks base method. 
+func (m *MockResource) GetMetricsEmitter() structured.Emitter { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "GetMetricsEmitter") + ret0, _ := ret[0].(structured.Emitter) + return ret0 +} + +// GetMetricsEmitter indicates an expected call of GetMetricsEmitter. +func (mr *MockResourceMockRecorder) GetMetricsEmitter() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetMetricsEmitter", reflect.TypeOf((*MockResource)(nil).GetMetricsEmitter)) +} + // GetPayloadSerializer mocks base method. func (m *MockResource) GetPayloadSerializer() persistence.PayloadSerializer { m.ctrl.T.Helper() diff --git a/common/resource/resource_test_utils.go b/common/resource/resource_test_utils.go index 18249940c32..b40e89640fe 100644 --- a/common/resource/resource_test_utils.go +++ b/common/resource/resource_test_utils.go @@ -52,6 +52,7 @@ import ( "github.com/uber/cadence/common/membership" "github.com/uber/cadence/common/messaging" "github.com/uber/cadence/common/metrics" + "github.com/uber/cadence/common/metrics/structured" "github.com/uber/cadence/common/mocks" "github.com/uber/cadence/common/persistence" persistenceClient "github.com/uber/cadence/common/persistence/client" @@ -74,6 +75,7 @@ type ( TimeSource clock.TimeSource PayloadSerializer persistence.PayloadSerializer MetricsClient metrics.Client + Emitter structured.Emitter ArchivalMetadata *archiver.MockArchivalMetadata ArchiverProvider *provider.MockArchiverProvider BlobstoreClient *blobstore.MockClient @@ -188,6 +190,7 @@ func NewTest( TimeSource: clock.NewRealTimeSource(), PayloadSerializer: persistence.NewPayloadSerializer(), MetricsClient: metrics.NewClient(scope, serviceMetricsIndex), + Emitter: structured.NewTestEmitter(t, scope), ArchivalMetadata: &archiver.MockArchivalMetadata{}, ArchiverProvider: &provider.MockArchiverProvider{}, BlobstoreClient: &blobstore.MockClient{}, @@ -295,6 +298,11 @@ func (s *Test) GetMetricsClient() metrics.Client { return s.MetricsClient } +// GetMetricsEmitter for testing +func (s *Test) GetMetricsEmitter() structured.Emitter { + return s.Emitter +} + // GetMessagingClient for testing func (s *Test) GetMessagingClient() messaging.Client { panic("user should implement this method for test") diff --git a/common/resource/types.go b/common/resource/types.go index fdc3701ece7..11920e32339 100644 --- a/common/resource/types.go +++ b/common/resource/types.go @@ -47,6 +47,7 @@ import ( "github.com/uber/cadence/common/membership" "github.com/uber/cadence/common/messaging" "github.com/uber/cadence/common/metrics" + "github.com/uber/cadence/common/metrics/structured" "github.com/uber/cadence/common/persistence" persistenceClient "github.com/uber/cadence/common/persistence/client" qrpc "github.com/uber/cadence/common/quotas/global/rpc" @@ -79,6 +80,7 @@ type Resource interface { GetTimeSource() clock.TimeSource GetPayloadSerializer() persistence.PayloadSerializer GetMetricsClient() metrics.Client + GetMetricsEmitter() structured.Emitter GetArchiverProvider() provider.ArchiverProvider GetMessagingClient() messaging.Client GetBlobstoreClient() blobstore.Client diff --git a/internal/tools/metricsgen/main.go b/internal/tools/metricsgen/main.go new file mode 100644 index 00000000000..73f5d6eeaf7 --- /dev/null +++ b/internal/tools/metricsgen/main.go @@ -0,0 +1,620 @@ +package main + +import ( + "context" + "flag" + "fmt" + "go/ast" + "go/parser" + "go/token" + "io" + "log" + "maps" + "os" + "os/exec" + "strings" + "text/template" + "time" + + "github.com/fatih/structtag" +) + +// some 
ugly globals for easier error reporting
+var (
+	FSET    *token.FileSet
+	SRC     []byte
+	VERBOSE bool
+)
+
+// list of all skippable things, so mis-spellings and whatnot can be caught.
+var skipNames = map[string]bool{
+	"New":          true,
+	"NumTags":      true,
+	"PutTags":      true,
+	"GetTags":      true,
+	"Convenience":  true,
+	"Inc":          true,
+	"Count":        true,
+	"Gauge":        true,
+	"Histogram":    true,
+	"IntHistogram": true,
+}
+
+// intermediate product, structs that should be inspected further
+type namedStruct struct {
+	Name string
+	Node *ast.StructType
+	Skip map[string]bool
+}
+
+type genField struct {
+	Name      string
+	MetricTag string
+	Convert   string
+	Imported  string
+	Type      string
+}
+type constructorField struct {
+	Name string
+	Type string
+}
+type genEmbed struct {
+	Name     string
+	Imported string
+	Type     string
+}
+
+// struct to generate
+type gen struct {
+	Name                  string
+	Self                  string
+	Fields                []genField
+	Embeds                []genEmbed
+	ConstructorOnlyFields []constructorField // private non-tag fields that need to go in the constructor
+	DynamicOperationTags  bool               // requires special handling
+	Emitter               bool               // requires special handling
+	NumReserved           int
+	Skip                  map[string]bool
+	NewFunc               string // if the tags struct is private, this will be "newThing", to make the constructor private. else it is "NewThing".
+	StructuredPkg         string // "structured." anywhere outside the structured package
+}
+
+// Metricsgen is a code generator to simplify creating structured metrics based on
+// the patterns in [github.com/uber/cadence/common/metrics/structured].
+//
+// To use, just add a `//go:generate metricsgen` in the source file with any new
+// `...Tags` structs, and run `make metrics`.
+// `make go-generate` will also run this implicitly, but it is far slower.
+func main() {
+	// I would highly suggest exploring https://caixw.github.io/goast-viewer/index.html a bit
+	// (forked from https://yuroyoro.github.io/goast-viewer/ but with support for generics)
+	// to get a feel for how the AST is structured, and to see if a newly-desired
+	// syntax/feature is recognizable at the AST level.
+	//
+	// if it is not, this may need to be changed to use `packages.Load` to get type info,
+	// but that is DRAMATICALLY slower and I would prefer to avoid it until necessary.
+	// the only tolerable way to do that is to process the whole repository in a single
+	// pass, and generate everything at once.
+
+	// Currently this finds all "...Tags" things and assumes they are StructTags for metrics.
+	// If that becomes a problem, just optionally pass a list of args == names of types to generate,
+	// and ignore the rest. Should be very easy.
+
+	flag.BoolVar(&VERBOSE, "v", false, "verbose output, e.g. print all types found")
+	flag.Parse() // without this, VERBOSE would never actually be set by -v
+
+	log.SetFlags(log.Lshortfile) // TODO: I really can't stand this log package, replace?
+	filename := os.Getenv("GOFILE")
+	FSET = token.NewFileSet()
+	var err error
+	SRC, err = os.ReadFile(filename) // hold onto the source code so errors can show the line of text that's wrong
+	if err != nil {
+		log.Fatal(err)
+	}
+	f, err := parser.ParseFile(FSET, filename, SRC, parser.SkipObjectResolution|parser.ParseComments)
+	if err != nil {
+		log.Fatal(err)
+	}
+
+	interesting := findStructs(f)
+	if len(interesting) == 0 {
+		log.Fatalf("no interesting structs (named '...Tags') found in %v, bad go generate or bad definition?", filename)
+	}
+
+	// Metric-related fields must be singular (no comma-separated lists), public, and either named or embedded.
+	// These fields require struct tags:
+	// - `tag:"..."` to define the tag key name (so it's easily greppable)
+	// - optionally `convert:"..."` to change how they're stringified (else primitive ints are `Itoa` and any others are untouched)
+	structuredPkg := "structured."
+	if os.Getenv("GOPACKAGE") == "structured" {
+		structuredPkg = ""
+	}
+	toGenerate := getGenStructs(interesting, structuredPkg)
+
+	basename := strings.TrimSuffix(filename, ".go")
+	var out io.Writer // smaller interface so it can be replaced with os.Stdout for manual testing
+	out, err = os.OpenFile(basename+"_metrics_gen.go", os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0644)
+	if err != nil {
+		log.Fatal(err)
+	}
+
+	// package declaration
+	write(out, "package %v", os.Getenv("GOPACKAGE"))
+	write(out, "") // and a blank line
+
+	// magic generated-source comment
+	write(out, "// Code generated ./internal/tools/metricsgen; DO NOT EDIT.")
+	write(out, "")
+
+	write(out, "import (")
+
+	// copy all imports, possibly duplicating, and add fmt / time if needed.
+	// goimports will deduplicate and remove any as needed.
+	imports := append(f.Imports, []*ast.ImportSpec{
+		{Path: &ast.BasicLit{Value: `"fmt"`}},  // for the default fmt.Sprintf int conversion
+		{Path: &ast.BasicLit{Value: `"time"`}}, // for Histogram's time.Duration
+		// {Path: &ast.BasicLit{Value: `"github.com/uber/cadence/common/metrics"`}}, // for PutOperationTags
+	}...)
+	if structuredPkg != "" {
+		// add the import for emitter and histogram types
+		imports = append(imports, &ast.ImportSpec{Path: &ast.BasicLit{Value: `"github.com/uber/cadence/common/metrics/structured"`}})
+	}
+	for _, i := range imports {
+		// i.Path.Value already has quotes
+		if i.Name != nil {
+			write(out, "\t%s %s", i.Name, i.Path.Value)
+		} else {
+			write(out, "\t%s", i.Path.Value)
+		}
+	}
+	write(out, ")")
+
+	// run the template for each struct
+	for i, t := range toGenerate {
+		if i > 0 {
+			write(out, "") // blank line between things
+		}
+		err := tmpl.Execute(out, t)
+		if err != nil {
+			log.Fatalf("error generating code for %v: %v", t.Name, err)
+		}
+	}
+}
+
+func findStructs(f *ast.File) []namedStruct {
+	// ast's Doc and Comment fields are somewhat confusing and inconsistent, at
+	// least as far as how humans think of it, and there are many empty and/or
+	// verbose-to-gather ways to get comments.
+	//
+	// so this uses the CommentMap, which acts like humans think of comments:
+	// all text above or on the same line is considered a "comment" on a thing.
+	cm := ast.NewCommentMap(FSET, f, f.Comments)
+
+	var results []namedStruct
+	// range over the top-level declarations in the file, find *Tags to generate.
+	// this conveniently excludes any in-func types.
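+	//
+	// i.e. this only matches declarations shaped like:
+	//
+	//	type FooTags struct { ... }
+	//
+	// an *ast.GenDecl with Tok==token.TYPE, holding an *ast.TypeSpec whose Type is an *ast.StructType.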
+	for _, d := range f.Decls {
+		gd, ok := d.(*ast.GenDecl)
+		if !ok || gd.Tok != token.TYPE {
+			continue // only care about type declarations
+		}
+		for _, spec := range gd.Specs {
+			ts, ok := spec.(*ast.TypeSpec)
+			if !ok || ts.Name == nil || ts.Name.Name == "" {
+				continue // only care about named types
+			}
+			if !strings.HasSuffix(ts.Name.Name, "Tags") {
+				continue // only care about type names ending in "Tags"
+			}
+			st, ok := ts.Type.(*ast.StructType)
+			if !ok || st.Fields == nil || len(st.Fields.List) == 0 {
+				// could be allowed, but seems more likely to be a mistake
+				log.Fatal(invalidCodeMsg(ts, "struct %v looks like a structured ...Tags type, but it has no fields", ts.Name.Name))
+				panic("unreachable")
+			}
+
+			verbosef("found struct %v", ts.Name.Name)
+			found := namedStruct{
+				Name: ts.Name.Name,
+				Node: st,
+				Skip: map[string]bool{},
+			}
+
+			// check struct comments for skip tags.
+			// use `gd`, not `ts/st`, to gather all comments regardless of how
+			// they are technically placed in the AST.
+			for _, cg := range cm.Filter(gd).Comments() {
+				for _, word := range strings.Fields(cg.Text()) { // splits on whitespace
+					if toskip, ok := strings.CutPrefix(word, "skip:"); ok {
+						// validate the word
+						if !skipNames[toskip] {
+							// it's possible to point to the exact chars in the comment that are wrong, but calculating
+							// that is a bit fiddly because there isn't a convenient way to strip off comment decorator
+							// chars line by line. Text() does that for us, and the struct decl is close enough to the comment.
+							//
+							// maps.Keys returns an iterator, which does not format usefully with %v,
+							// so collect the valid names into a slice for the error message.
+							valid := make([]string, 0, len(skipNames))
+							for name := range maps.Keys(skipNames) {
+								valid = append(valid, name)
+							}
+							log.Fatal(invalidCodeMsg(gd, "unknown skip marker %q, must be one of: %v", toskip, valid))
+						}
+						verbosef("found skip %q (comment %q) for %v", toskip, word, found.Name)
+						found.Skip[toskip] = true
+					}
+				}
+			}
+			results = append(results, found)
+		}
+	}
+	return results
+}
+
+func getGenStructs(interesting []namedStruct, structuredPkg string) []gen {
+	var results []gen
+	for _, s := range interesting {
+		var newFunc string
+		if s.Name[0:1] == strings.ToLower(s.Name[0:1]) {
+			// private constructor, uppercase the name so it reads nicely
+			newFunc = "new" + strings.ToUpper(s.Name[0:1]) + s.Name[1:]
+		} else {
+			// public struct, so the constructor should be public too
+			newFunc = "New" + s.Name
+		}
+		g := gen{
+			Name:          s.Name,
+			Skip:          s.Skip,
+			Self:          strings.ToLower(s.Name[0:1]),
+			NewFunc:       newFunc,
+			StructuredPkg: structuredPkg,
+		}
+		for _, f := range s.Node.Fields.List {
+			switch len(f.Names) {
+			case 0: // embed
+				imported, id, ascode := sourceType(f.Type)
+				if id.Name == "Emitter" {
+					g.Emitter = true
+					continue // emitter always has special handling
+				}
+				if id.Name == "DynamicOperationTags" {
+					if ascode == "structured.DynamicOperationTags" {
+						verbosef("found embedded %v, customizing codegen", ascode)
+						g.DynamicOperationTags = true
+						continue
+					} else {
+						// could be allowed, but more likely a mistake until it proves needed.
+						log.Fatal(invalidCodeMsg(f, "DynamicOperationTags must be embedded as structured.DynamicOperationTags"))
+					}
+				}
+				if !strings.HasSuffix(id.Name, "Tags") {
+					log.Fatal(invalidCodeMsg(f, `embedded types must end with "Tags", as they must conform to the metrics-interface`))
+				}
+
+				g.Embeds = append(g.Embeds, genEmbed{
+					Name:     id.Name,
+					Imported: imported,
+					Type:     ascode,
+				})
+			case 1: // named field
+				fname := f.Names[0]
+				if !fname.IsExported() {
+					// private field, see if it needs to be added to the constructor but don't otherwise handle it
+					_, _, ascode := sourceType(f.Type) // type can be private, e.g.
`string` + g.ConstructorOnlyFields = append(g.ConstructorOnlyFields, constructorField{ + Name: fname.Name, + Type: ascode, + }) + continue + } + + // ...Tags types cannot be pointers, but others are allowed + if strings.HasSuffix(fname.Name, "Tags") { + if _, ok := f.Type.(*ast.StarExpr); ok { + log.Fatal(invalidCodeMsg(f, "embedded Tags structs cannot be pointers")) + } + } + imported, _, ascode := sourceType(f.Type) // type can be private, e.g. `string` + metricTag := getNameTag(f) // the tag's name is always required + if ascode == "struct{}" { + g.NumReserved++ + // special placeholder field, ignore it. + // these serve to give type-hints that a field *will* exist, + // but the needs for it are too dynamic to pre-populate. + continue + } + + convertTag := getConvertTag(f) + if convertTag == "" { + if isInt(f.Type) { + convertTag = `fmt.Sprintf("%d", {{.}})` + } else { + convertTag = "{{.}}" // defaults to assuming the field is a string, included verbatim + } + } + // run the template to get the source code to stringify this field + convertTmpl := template.Must(template.New("").Parse(convertTag)) + var out strings.Builder + err := convertTmpl.Execute(&out, g.Self+"."+fname.Name) + if err != nil { + // use the original tag, not the empty-replaced one + log.Fatal(invalidCodeMsg(f, "bad convert tag, must be a valid text template or empty: convert:%q\nerror: %v", getConvertTag(f), err)) + } + g.Fields = append(g.Fields, genField{ + Name: fname.Name, + MetricTag: metricTag, + Convert: out.String(), + Imported: imported, + Type: ascode, + }) + default: + // comma-separated, never allowed + log.Fatal(invalidCodeMsg(f, "cannot have comma-separated fields as struct tags must be unique")) + } + } + results = append(results, g) + } + return results +} + +func verbosef(format string, args ...any) { + if VERBOSE { + fmt.Printf(format+"\n", args...) + } +} + +// --- tag helpers --- + +func getNameTag(f *ast.Field) string { + value := getTag(f, "tag") + if value == "" { + // must have tag + log.Fatal(invalidCodeMsg(f, "metric tags must have an explicit `tag:\"name\"`")) + } + return value +} + +func getConvertTag(f *ast.Field) string { + value := getTag(f, "convert") // currently a single string, could be complexified + if value == "" { + return "" + } + if !strings.Contains(value, "{{") { + // malformed convert. + // technically possible to use e.g. a static value or computed from other fields, but that's less likely + // and would probably be better done with a reserved field / custom PutTags instead. + log.Fatal(invalidCodeMsg(f, "convert tags must contain a `{{ . }}` template where the field will be interpolated")) + } + // valid format + return value +} + +func getTag(f *ast.Field, name string) string { + if f.Tag == nil { + return "" + } + // trim off the outer ` characters, as the AST's value keeps them + st, err := structtag.Parse(strings.Trim(f.Tag.Value, "`")) + if err != nil { + log.Fatal(invalidCodeMsg(f, "bad tag format: %q, err: %v", f.Tag.Value, err)) + } + t, err := st.Get(name) + _ = err // no defined type for "tag does not exist", just use t's presence + if t == nil { + return "" + } + return t.Value() +} + +// --- type checking helpers --- + +func isInt(fieldType ast.Expr) bool { + // sourceType already restricted to these types, + // no need to check others exhaustively. 
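+	//
+	// e.g. `int32` -> true, `*int` -> false (pointers force a custom convert tag),
+	// `pkg.Type` -> false (imported types cannot be builtin ints), `struct{}` -> false.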
+ switch t := fieldType.(type) { + case *ast.StarExpr: + return false // no reliable fallback, force custom handling + case *ast.Ident: + return t.Name == "int" || t.Name == "int32" || t.Name == "int64" + case *ast.SelectorExpr: + return false // imported types cannot be builtin ints + case *ast.StructType: + if t.Fields == nil || len(t.Fields.List) == 0 { + return false // `struct{}` is allowed and not an int + } + } + log.Fatal(invalidCodeMsg(fieldType, "unknown field type %T: %#v", fieldType, fieldType)) + panic("unreachable") +} + +// given a field type node, returns: +// - pkg.Type: "pkg", Type, "pkg.Type" +// - Type: "", Type, "Type" +func sourceType(fieldType ast.Expr) (imported string, name *ast.Ident, ascode string) { + if t, ok := fieldType.(*ast.StarExpr); ok { + imported, name, ascode = sourceType(t.X) // recurse to get the inner type + return imported, name, "*" + ascode // restore the pointer + } + + switch t := fieldType.(type) { + case *ast.Ident: // bare identifier, no package. local type or builtin. + return "", t, t.Name + case *ast.SelectorExpr: // thing.Thing == X.Sel + if x, ok := t.X.(*ast.Ident); ok { + imported = x.Name + return imported, t.Sel, imported + "." + t.Sel.Name + } + case *ast.StructType: + if t.Fields == nil || len(t.Fields.List) == 0 { + return "", nil, "struct{}" + } + } + log.Fatal(invalidCodeMsg(fieldType, "unknown field type %T, must be `Type` or `pkg.Type`: %#v", fieldType, fieldType)) + panic("unreachable") +} + +// --- error reporting funcs, to show file:line and the contents of the line for easier troubleshooting --- + +func invalidCodeMsg(node ast.Node, msg string, args ...interface{}) string { + path, source := getSourceCodeLine(node) + msg = fmt.Sprintf(msg, args...) + return fmt.Sprintf("%s\n%s: %s", msg, path, source) +} + +func getSourceCodeLine(node ast.Node) (path string, source string) { + // seems silly that getting the source code from the AST requires holding on to the source code. + // but eh, I suppose it's more memory-efficient? + pos := FSET.Position(node.Pos()) + l := pos.Line + lines := strings.Split(string(SRC), "\n") + if l > len(lines)-1 { + // should not be possible, at least without `//line` directives + log.Fatalf("source line too large (%d > %d)", l, len(lines)-1) + } + // pos only contains filename, not full path + p, err := os.Getwd() + if err != nil { + // bad shell location, basically + log.Fatal("could not os.Getwd(), does your current folder still exist?", err) + } + // figure out the shared part of the absolute path, if possible. + // 1s timeout is far more than is necessary locally, unless it needs to download, + // and we don't want to wait for that because it won't show any output. + ctx, cancel := context.WithTimeout(context.Background(), time.Second) + defer cancel() + cmd := exec.CommandContext(ctx, "go", "list", "-m", "-f", "{{.Dir}}", "github.com/uber/cadence") + cmd.Stderr = os.Stderr + st, err := cmd.Output() + if err == nil { + // remove the path to the main module from the absolute path, it's large and annoying + moduleRoot := strings.TrimSpace(string(st)) // remove the newline + p = strings.TrimPrefix(p, moduleRoot+"/") + } // else just give up and use the absolute path, it's long but not wrong. + return p + "/" + pos.String(), lines[l-1] +} + +// --- template helpers --- + +func write(out io.Writer, format string, args ...any) { + _, err := fmt.Fprintf(out, format+"\n", args...) 
+ if err != nil { + log.Fatal("failed to write generated code to output:", err) + } +} + +// $self is used instead of .Self because it gets lost when ranging (changes `.` scope) +var tmpl = template.Must(template.New("").Parse(` +{{- $self := .Self }} +{{- if not .Skip.New }} + // {{.NewFunc}} constructs a new metric-tag-holding {{.Name}}, + // and it must be used, instead of custom initialization to ensure newly added + // tags can be detected at compile time instead of missing them at run time. + func {{.NewFunc}}( + {{- if .Emitter }} + emitter {{.StructuredPkg}}Emitter, + {{- end }} + {{- range .Embeds }} + {{ .Name }} {{ .Type }}, + {{- end }} + {{- range .ConstructorOnlyFields }} + {{ .Name }} {{ .Type }}, + {{- end }} + {{- range .Fields }} + {{ .Name }} {{ .Type }}, + {{- end }} + ) {{ .Name }} { + {{$self}} := {{ .Name }}{ + {{- if .Emitter }} + Emitter: emitter, + {{- end }} + {{- range .Embeds }} + {{ .Name }}: {{ .Name }}, + {{- end }} + {{- range .ConstructorOnlyFields }} + {{ .Name }}: {{ .Name }}, + {{- end }} + {{- range .Fields }} + {{ .Name }}: {{ .Name }}, + {{- end }} + } + return {{$self}} + } +{{- end }} + +{{- if not .Skip.NumTags }} + // NumTags returns the number of tags that are intended to be written in all + // cases. This will include all embedded parent tags and all reserved tags, + // and is intended to be used to pre-allocate maps of tags. + func ({{$self}} {{.Name}}) NumTags() int { + num := {{ .Fields | len }} // num of self fields + num += {{ .NumReserved }} // num of reserved fields + {{- range .Embeds }} + num += {{$self}}.{{ .Name }}.NumTags() + {{- end }} + {{- if .DynamicOperationTags }} + num += {{$self}}.DynamicOperationTags.NumTags() + {{- end }} + return num + } +{{- end }} + +{{- if not .Skip.PutTags }} + // PutTags writes this set of tags (and its embedded parents) to the passed map. + func ({{$self}} {{.Name}}) PutTags({{ if .DynamicOperationTags }}operation int, {{ end }}into {{.StructuredPkg}}DynamicTags) { + {{- range .Embeds }} + {{$self}}.{{ .Name }}.PutTags(into) + {{- end }} + {{- if .DynamicOperationTags }} + {{$self}}.DynamicOperationTags.PutTags(operation, into) + {{- end }} + {{- range .Fields }} + into["{{.MetricTag}}"] = {{ .Convert }} + {{- end }} + } +{{- end }} + +{{- if not .Skip.GetTags }} + // GetTags is a minor helper to get a pre-allocated-and-filled map with room + // for reserved fields (i.e. 'struct{}' type fields, which only declare intent). + func ({{$self}} {{.Name}}) GetTags({{ if .DynamicOperationTags }}operation int, {{ end }}) {{.StructuredPkg}}DynamicTags { + tags := make({{.StructuredPkg}}DynamicTags, {{$self}}.NumTags()) + {{$self}}.PutTags({{ if .DynamicOperationTags }}operation, {{ end }}tags) + return tags + } +{{- end }} + +{{- if and (or .Emitter .Embeds) (not .Skip.Convenience) }} + // convenience methods + {{/* ---- forcing a blank line */}} + {{- if not .Skip.Inc }} + // Inc increments a named counter with the tags on this struct. + func ({{$self}} {{.Name}}) Inc(name string) { + {{$self}}.Count(name, 1) + } + {{- end }} + + {{- if not .Skip.Count }} + // Count adds to a named counter with the tags on this struct. + func ({{$self}} {{.Name}}) Count(name string, num int) { + {{$self}}.Emitter.Count(name, num, {{$self}}) + } + {{- end }} + + {{- if not .Skip.Gauge }} + // Gauge adds to a named gauge with the tags on this struct. 
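+	// (e.g. a hypothetical call: {{$self}}.Gauge("buffer_size", float64(queueLen)).)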
+	func ({{$self}} {{.Name}}) Gauge(name string, num float64) {
+		{{$self}}.Emitter.Gauge(name, num, {{$self}})
+	}
+	{{- end }}
+
+	{{- if not .Skip.Histogram }}
+	// Histogram records a histogram with the specified buckets.
+	//
+	// Buckets should essentially ALWAYS be one of the pre-defined values in [github.com/uber/cadence/common/metrics/structured].
+	func ({{$self}} {{.Name}}) Histogram(name string, buckets {{.StructuredPkg}}SubsettableBuckets, dur time.Duration) {
+		{{$self}}.Emitter.Histogram(name, buckets, dur, {{$self}})
+	}
+	{{- end }}
+
+	{{- if not .Skip.IntHistogram }}
+	// IntHistogram records an integer-based histogram with the specified buckets.
+	//
+	// Buckets should essentially ALWAYS be one of the pre-defined values in [github.com/uber/cadence/common/metrics/structured].
+	func ({{$self}} {{.Name}}) IntHistogram(name string, buckets {{.StructuredPkg}}IntSubsettableBuckets, value int) {
+		{{$self}}.Emitter.IntHistogram(name, buckets, value, {{$self}})
+	}
+	{{- end }}
+{{- end }}
+`))
diff --git a/internal/tools/metricslint/analyzer.go b/internal/tools/metricslint/analyzer.go
new file mode 100644
index 00000000000..9704caf1ce4
--- /dev/null
+++ b/internal/tools/metricslint/analyzer.go
@@ -0,0 +1,283 @@
+package metricslint
+
+import (
+	"fmt"
+	"go/ast"
+	"go/token"
+	"go/types"
+	"maps"
+	"slices"
+	"strings"
+
+	"golang.org/x/tools/go/analysis"
+	"golang.org/x/tools/go/analysis/passes/inspect"
+	"golang.org/x/tools/go/ast/inspector"
+)
+
+const (
+	emitterPkg      = "github.com/uber/cadence/common/metrics/structured"
+	emitterTypeName = emitterPkg + ".Emitter"
+)
+
+// Analyzer reports all (inline string) metric names passed to Emitter methods or defined in metrics/defs.go.
+//
+// This is NOT intended to be used directly, it's just using the analysis framework to simplify type checking.
+// The output needs to be checked for duplicates or error messages by a separate process (see cmd/main.go).
+var Analyzer = &analysis.Analyzer{
+	Name:     "metricnames",
+	Doc:      "finds calls to metrics-emitting methods on Emitter, and metricDefinition names, and reports them for further analysis",
+	Requires: []*analysis.Analyzer{inspect.Analyzer},
+	Run:      run,
+}
+
+// method names on the emitter that we care about, and the suffixes that are required (if any).
+// method names are checked against the actual methods on the emitter type,
+// but we need the list ahead of time or passes might be racing with initializing it.
+var emitterMethods = map[string]map[string]bool{
+	"Histogram":    {"_ns": true},     // all our durations are in nanoseconds
+	"IntHistogram": {"_counts": true}, // differentiates from durations, will likely have multiple suffixes
+	"Count":        nil,
+	"Gauge":        nil,
+}
+
+func run(pass *analysis.Pass) (interface{}, error) {
+	i := pass.ResultOf[inspect.Analyzer].(*inspector.Inspector)
+
+	if pass.Pkg.Path() == "github.com/uber/cadence/common/metrics" {
+		// report all hard-coded metricDefinition names, so we can make sure
+		// there are no duplicates between the integers and the const strings
+		// passed to the emitter.
+		//
+		// if you want a duplicate: no, you really don't.
+		// use a different name, and once verified just swap to the old name and
+		// delete the integer def. otherwise you risk double-counting events if
+		// both are emitted, and can't tell if your tags are wrong.
+		//
+		// if you're swapping calls in one step, also delete the old definition.
+		//
+		// if you really do have an exception, modify this linter to allow it.
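+		//
+		// e.g. a (hypothetical) definition like `{metricName: "some_metric"}` below
+		// is reported as "<pos>: success: some_metric", which the cmd/main.go
+		// wrapper then dedupes against the names used in emitter calls.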
+		reportMetricDefinitionNames(pass, i)
+		// and DO NOT return, in case there are emitter calls in this package too,
+		// though currently that would imply an import loop.
+	} else if pass.Pkg.Path() == emitterPkg {
+		// read the methods on the emitter and the target methods, and make sure they're in sync.
+		checkTargetNamesMap(pass)
+	}
+
+	// report calls to metrics-emitting methods on Emitter, regardless of location
+	reportMetricEmitterCalls(pass, i)
+
+	return nil, nil
+}
+
+func reportMetricDefinitionNames(pass *analysis.Pass, i *inspector.Inspector) {
+	nodeFilter := []ast.Node{
+		(*ast.CompositeLit)(nil),
+	}
+	i.Preorder(nodeFilter, func(n ast.Node) {
+		decl := n.(*ast.CompositeLit)
+		var metricNameField *ast.KeyValueExpr
+		for _, el := range decl.Elts {
+			kv, ok := el.(*ast.KeyValueExpr)
+			if !ok {
+				return // not a struct initializer
+			}
+			ident, ok := kv.Key.(*ast.Ident)
+			if !ok {
+				return // dynamic key in a dict or something, ignore
+			}
+			if ident.Name == "metricName" {
+				metricNameField = kv // currently this is exclusively used by metricDefinition
+				break                // only stop once found, the field is not necessarily first
+			}
+		}
+		if metricNameField == nil {
+			return
+		}
+		str, ok := getConstantString(metricNameField.Value)
+		if !ok {
+			pass.Reportf(metricNameField.Pos(), "metricDefinition{metricName: ...} value must be an inline string literal")
+			return
+		}
+		pass.Reportf(metricNameField.Pos(), "success: %v", str)
+	})
+}
+
+func checkTargetNamesMap(pass *analysis.Pass) {
+	obj := pass.Pkg.Scope().Lookup("Emitter")
+	if obj == nil {
+		pass.Reportf(token.NoPos, "could not find Emitter in %v", emitterPkg)
+		return
+	}
+	named, ok := obj.Type().(*types.Named)
+	if !ok {
+		pass.Reportf(token.NoPos, "could not find Emitter type in %v", emitterPkg)
+		return
+	}
+	ms := types.NewMethodSet(named)
+	foundTargets := make(map[string]map[string]bool, len(emitterMethods))
+	maps.Copy(foundTargets, emitterMethods)
+	for i := 0; i < ms.Len(); i++ {
+		m := ms.At(i)
+		_, ok := emitterMethods[m.Obj().Name()]
+		if !ok {
+			foundTargets[m.Obj().Name()] = nil // add any missing keys to the map
+		} else {
+			delete(foundTargets, m.Obj().Name()) // remove any found ones from the map
+		}
+	}
+	if len(foundTargets) != 0 {
+		pass.Reportf(named.Obj().Pos(), "target methods do not match Emitter methods, diff: %v", foundTargets)
+	}
+}
+
+func reportMetricEmitterCalls(pass *analysis.Pass, i *inspector.Inspector) {
+	nodeFilter := []ast.Node{
+		(*ast.File)(nil),     // to skip test files
+		(*ast.FuncDecl)(nil), // to skip test helper funcs
+		(*ast.CallExpr)(nil),
+	}
+	i.Nodes(nodeFilter, func(n ast.Node, push bool) (proceed bool) {
+		// always descend by default, in case the call contains a closure that emits metrics.
+		// this covers the sync.Once case in the testdata, for example.
+		proceed = true
+		if !push {
+			return // do nothing when ascending the ast tree
+		}
+
+		// check for test files, ignore their content if found
+		file, ok := n.(*ast.File)
+		if ok {
+			filename := pass.Fset.Position(file.Pos()).Filename
+			if strings.HasSuffix(filename, "_test.go") {
+				return false // don't inspect test files
+			}
+			return
+		}
+
+		// check for test helper funcs anywhere, ignore their content if found.
+		// these are identified by a *testing.T as their first arg.
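+		// e.g. `func emitForTest(t *testing.T, e structured.Emitter) { ... }`
+		// (hypothetical) is pruned entirely, including any closures inside it.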
+		if fn, ok := n.(*ast.FuncDecl); ok {
+			if len(fn.Type.Params.List) > 0 {
+				firstArgType := pass.TypesInfo.TypeOf(fn.Type.Params.List[0].Type)
+				asStr := types.TypeString(firstArgType, nil) // "full/path/to.Type"
+				if asStr == "*testing.T" {
+					return false // don't inspect test helpers
+				}
+			}
+			return
+		}
+
+		call := n.(*ast.CallExpr) // only other possibility due to nodeFilter
+
+		// check if this is a method call (receiver.Method() == X.Sel)
+		// for one of the names we care about (as an early / efficient check)
+		sel, ok := call.Fun.(*ast.SelectorExpr)
+		if !ok {
+			return
+		}
+		methodName := sel.Sel.Name
+		requiredSuffixes, ok := emitterMethods[methodName]
+		if !ok {
+			return
+		}
+
+		// check if the receiver type is one we care about
+		receiver := pass.TypesInfo.TypeOf(sel.X)
+		// pointer receivers are not allowed, complain if found.
+		// if we just ignore them here, we might miss a metric call with a duplicate name.
+		isPointerReceiver := false
+		ptr, ok := receiver.(*types.Pointer)
+		for ; ok; ptr, ok = receiver.(*types.Pointer) {
+			isPointerReceiver = true
+			receiver = ptr.Elem()
+		}
+		named, ok := receiver.(*types.Named)
+		if !ok {
+			// allowed by the type system, but should be impossible in a CallExpr
+			pass.Reportf(sel.Pos(), "anonymous receiver of a method call should be impossible?")
+			return
+		}
+		fullName := getPkgAndName(named)
+		if fullName != emitterTypeName {
+			return // apparently not the emitter, it just has similarly-named methods (e.g. Tally). ignore it.
+		}
+
+		// at this point we know that it's "a metrics method", make sure it's valid.
+
+		if isPointerReceiver {
+			// could be allowed with a bit more .Elem() dereferencing in this analyzer,
+			// but currently blocked to intentionally force value types.
+			pass.Reportf(sel.Pos(), "pointer receivers are not allowed on metrics emission calls: %v", types.ExprString(sel))
+			return
+		}
+		if len(call.Args) < 2 {
+			// 0 or 1 arg == not currently in use
+			pass.Reportf(call.Pos(), "method call %v looks like a metrics method, but has too few args", types.ExprString(call))
+			return
+		}
+
+		// pull out the first arg, and make sure it's a constant inline string.
+		nameArg := call.Args[0]
+		metricName, isConstant := getConstantString(nameArg)
+		if !isConstant {
+			pass.Reportf(nameArg.Pos(), "metric names must be in-line strings, not consts or vars: %v", types.ExprString(nameArg))
+			return
+		}
+		if len(requiredSuffixes) > 0 {
+			matched := false
+			for suffix := range requiredSuffixes {
+				if strings.HasSuffix(metricName, suffix) {
+					matched = true
+					break
+				}
+			}
+			if !matched {
+				suffixes := make([]string, 0, len(requiredSuffixes))
+				for s := range requiredSuffixes {
+					suffixes = append(suffixes, fmt.Sprintf("%q", s))
+				}
+				slices.Sort(suffixes)
+				pass.Reportf(
+					nameArg.Pos(),
+					"metric name %q is not valid for method %v, it must have one of the following suffixes: %v",
+					metricName, methodName, strings.Join(suffixes, ", "),
+				)
+				return
+			}
+		}
+
+		// valid call!
+		//
+		// "report" the data we've found so it can be checked with a separate process (see cmd/main.go).
+		// that will error if any non-"success" lines were found, else check that the list is unique.
+		//
+		// tests may lead to the same package being processed more than once, but
+		// the reported lines for "path/to/file:line success: {name}" are still unique
+		// and that won't count as a duplicate.
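+		//
+		// e.g. a valid (hypothetical) `emitter.Count("some_metric", 1, tags)` call
+		// is reported as "path/to/file.go:42:2: success: some_metric".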
+ pass.Reportf(call.Pos(), "success: %s", metricName) + return + }) +} + +func getPkgAndName(named *types.Named) (fullName string) { + obj := named.Obj() + if obj == nil { + // afaik not possible, this would be "a named type without a type" + return "" + } + pkg := obj.Pkg() + if pkg == nil { + // given where this is used, I believe this would currently imply + // "a builtin type with a method", which does not exist. + // but it's fine to return partial values, in case it's used in more places. + return obj.Name() + } + return pkg.Path() + "." + obj.Name() +} + +func getConstantString(expr ast.Expr) (str string, ok bool) { + lit, ok := expr.(*ast.BasicLit) + if !ok { + return "", false + } + if lit.Kind != token.STRING { + return "", false + } + return strings.Trim(lit.Value, `"`), true +} diff --git a/internal/tools/metricslint/analyzer_test.go b/internal/tools/metricslint/analyzer_test.go new file mode 100644 index 00000000000..55e262e7186 --- /dev/null +++ b/internal/tools/metricslint/analyzer_test.go @@ -0,0 +1,12 @@ +package metricslint + +import ( + "testing" + + "golang.org/x/tools/go/analysis/analysistest" +) + +func TestAnalyzer(t *testing.T) { + testdata := analysistest.TestData() + analysistest.Run(t, testdata, Analyzer, "github.com/uber/cadence/common/metrics/structured") +} diff --git a/internal/tools/metricslint/cmd/main.go b/internal/tools/metricslint/cmd/main.go new file mode 100644 index 00000000000..6fff6b1306e --- /dev/null +++ b/internal/tools/metricslint/cmd/main.go @@ -0,0 +1,111 @@ +package main + +import ( + "flag" + "fmt" + "os" + "os/exec" + "slices" + "strconv" + "strings" + + "golang.org/x/tools/go/analysis/singlechecker" + + "github.com/uber/cadence/internal/tools/metricslint" +) + +func main() { + var analyze bool + flag.BoolVar(&analyze, "analyze", false, "if true, run the analyzer normally (e.g. for debugging). if false, run the analyzer internally and interpret the results as a linter") + skip := map[string]int{} + flag.Func("skip", "metric name to ignore, comma, followed by the number of times it is duplicated. repeat to skip more things.", func(s string) error { + parts := strings.SplitN(s, ",", 2) + if len(parts) != 2 { + return fmt.Errorf("skip argument %q is not in the form `name,count`", s) + } + count, err := strconv.Atoi(parts[1]) + if err != nil || count < 2 { + return fmt.Errorf("skip argument %q has invalid count %q: %v", s, parts[1], err) + } + skip[parts[0]] = count + return nil + }) + flag.Parse() + + if analyze { + singlechecker.Main(metricslint.Analyzer) + } else { + os.Exit(checkAnalyzerOutput(skip)) + } +} + +func checkAnalyzerOutput(skip map[string]int) (exitCode int) { + cmd := exec.Command(os.Args[0], append([]string{"-analyze"}, os.Args[1:]...)...) + out, _ := cmd.CombinedOutput() + + defer func() { + // in case of crashes, print the output we got so far as it likely narrows down the cause. 
+		if r := recover(); r != nil {
+			_, _ = fmt.Fprint(os.Stderr, "\n\n"+string(out))
+			panic(r)
+		}
+	}()
+
+	lines := strings.Split(string(out), "\n")
+	// map of metric name to set of lines using it (to deduplicate)
+	names := make(map[string]map[string]struct{}, len(lines))
+	var failures []string
+	for _, line := range lines {
+		if line == "" {
+			continue // empty lines are fine
+		}
+		words := strings.Fields(line)
+		if len(words) == 3 && words[1] == "success:" {
+			prev, ok := names[words[2]]
+			if !ok {
+				prev = make(map[string]struct{})
+			}
+			prev[line] = struct{}{}
+			names[words[2]] = prev
+		} else {
+			failures = append(failures, line)
+		}
+	}
+
+	if len(failures) > 0 {
+		_, _ = fmt.Fprintln(os.Stderr, strings.Join(failures, "\n")+"\n")
+		return 1
+	}
+
+	exitCode = 0
+	for name, usages := range names {
+		var complain bool
+		expected, ok := skip[name]
+		if ok {
+			complain = len(usages) != expected // expectation mismatch
+		} else {
+			complain = len(usages) > 1 // must not have duplicates
+		}
+		if complain {
+			if expected > 0 {
+				_, _ = fmt.Fprintf(os.Stderr, "Metric name %q used in an unexpected number of places:\n", name)
+			} else {
+				_, _ = fmt.Fprintf(os.Stderr, "Metric name %q used in multiple places:\n", name)
+			}
+			// sort for deterministic output.
+			// (on go1.23+ `slices.Sorted(maps.Keys(usages))` would also work, since
+			// maps.Keys returns an iterator; collecting by hand keeps imports minimal.)
+			sorted := make([]string, 0, len(usages))
+			for usage := range usages {
+				sorted = append(sorted, usage)
+			}
+			slices.Sort(sorted)
+			for _, usage := range sorted {
+				_, _ = fmt.Fprintf(os.Stderr, "\t%s\n", usage)
+			}
+			exitCode = 1
+		}
+	}
+
+	return exitCode
+}
diff --git a/internal/tools/metricslint/testdata/go.mod b/internal/tools/metricslint/testdata/go.mod
new file mode 100644
index 00000000000..42d6cbc8c14
--- /dev/null
+++ b/internal/tools/metricslint/testdata/go.mod
@@ -0,0 +1,4 @@
+module github.com/uber/cadence/common/metrics/structured
+// ^ must be the same package as the emitter's definition for the hardcoded path to work right
+
+go 1.24
\ No newline at end of file
diff --git a/internal/tools/metricslint/testdata/metrics.go b/internal/tools/metricslint/testdata/metrics.go
new file mode 100644
index 00000000000..b6078034f00
--- /dev/null
+++ b/internal/tools/metricslint/testdata/metrics.go
@@ -0,0 +1,67 @@
+package structured
+
+import (
+	"sync"
+	"testing"
+	"time"
+)
+
+const disallowedConst = "dynamic-name"
+
+type Emitter struct{}
+type Tags map[string]string
+
+func (e Emitter) Count(name string, value int, meta Tags) {}
+func (e Emitter) Histogram(name string, buckets []time.Duration, dur time.Duration, meta Tags) {}
+func (e Emitter) Gauge(name string, value float64, meta Tags) {}
+func (e Emitter) IntHistogram(name string, buckets []time.Duration, num int, meta Tags) {}
+
+type UnrelatedThing struct{ metricName string }
+
+var _ = UnrelatedThing{metricName: disallowedConst}
+
+func (n UnrelatedThing) Gauge(name string) {}
+func (n UnrelatedThing) Count(name string, value int, meta Tags) {}
+
+// test helper funcs can violate the rules, ignore them.
+// they must have *testing.T as the first arg +func TestHelper(t *testing.T, name string) { + var emitter Emitter + emitter.Count(name, 5, nil) +} + +func someMetricsCalls() { + var emitter Emitter + tags := Tags{"key": "value"} + + // valid calls on the emitter + emitter.Count("test-count", 5, tags) // want `success: test-count` + emitter.Histogram("test-histogram_ns", nil, time.Second, tags) // want `success: test-histogram_ns` + + // duplicates are not blocked at this stage, it's run separately because of how the analysis framework works + emitter.Count("duplicate-metric", 1, tags) // want `success: duplicate-metric` + emitter.Count("duplicate-metric", 2, tags) // want `success: duplicate-metric` + + // wrong suffix + emitter.Histogram("test-histogram", nil, time.Second, tags) // want `metric name "test-histogram" is not valid for method Histogram, it must have one of the following suffixes: "_ns"` + + // string vars are not good + dynamicName := "dynamic-name" + emitter.Count(dynamicName, 1, tags) // want `metric names must be in-line strings, not consts or vars: dynamicName` + + // named consts are also not good + // (this is fairly easy to allow though, if strongly desired) + emitter.Count(disallowedConst, 1, tags) // want `metric names must be in-line strings, not consts or vars: disallowedConst` + + var unrelated UnrelatedThing + name := "asdf" + unrelated.Gauge(name) // ignored, not on Emitter + unrelated.Count(name, 42, tags) // ignored, not on Emitter + + // contents of closures must be checked too (ensures we recurse into calls) + var once sync.Once + once.Do(func() { + emitter.Gauge(disallowedConst, 9001, tags) // want `metric names must be in-line strings, not consts or vars: disallowedConst` + emitter.IntHistogram("hist_counts", nil, 42, tags) // want `success: hist_counts` + }) +} diff --git a/internal/tools/metricslint/testdata/metrics_gen.go b/internal/tools/metricslint/testdata/metrics_gen.go new file mode 100644 index 00000000000..02669e19d57 --- /dev/null +++ b/internal/tools/metricslint/testdata/metrics_gen.go @@ -0,0 +1,20 @@ +package structured + +// Code generated by test_shenanigans; DO NOT EDIT + +import "time" + +func GeneratedFunction() { + var emitter Emitter + tags := Tags{} + + // allowed, generated code needs to follow the same rules + emitter.Count("base_count", 1, tags) // want `success: base_count` + emitter.Gauge("gauge", 12, tags) // want `success: gauge` + emitter.Histogram("hist_ns", nil, time.Second, tags) // want `success: hist_ns` + + // disallowed, generated code needs to follow the same rules + const sneakyConst = "bad_metric" + emitter.IntHistogram(sneakyConst, nil, 3, tags) // want `metric names must be in-line strings, not consts or vars: sneakyConst` + emitter.Count(sneakyConst, 1, tags) // want `metric names must be in-line strings, not consts or vars: sneakyConst` +} diff --git a/internal/tools/metricslint/testdata/metrics_test.go b/internal/tools/metricslint/testdata/metrics_test.go new file mode 100644 index 00000000000..0067efd8e67 --- /dev/null +++ b/internal/tools/metricslint/testdata/metrics_test.go @@ -0,0 +1,20 @@ +package structured + +import ( + "time" +) + +func ignoredInTestFiles() { + var emitter Emitter + tags := Tags{} + + // test code is allowed to break the rules because it does not run in production + + emitter.Count("base_count", 1, tags) + emitter.Gauge("gauge", 12, tags) + emitter.Histogram("hist_ns", nil, time.Second, tags) + + const sneakyConst = "bad_metric" + emitter.IntHistogram(sneakyConst, nil, 3, tags) + 
emitter.Count(sneakyConst, 1, tags) +} diff --git a/revive.toml b/revive.toml index 2238d4dd4c9..d1b939df7af 100644 --- a/revive.toml +++ b/revive.toml @@ -20,7 +20,7 @@ warningCode = 0 [rule.errorf] # [rule.exported] # disabled due to lack of value / encouraging bad habits [rule.if-return] -[rule.increment-decrement] +# [rule.increment-decrement] # noisy on generated code, otherwise seems fine - if there's a way to fix that, probably re-enable [rule.indent-error-flow] # Disabled because we have 158 packages that need package comments; we could instead add ignore # directives for existing packages and require it for new packages. @@ -62,7 +62,7 @@ arguments=[["loop","method-call","recover","return", "immediate-recover"]] [rule.range-val-address] # beneficial [rule.range-val-in-closure] # beneficial [rule.unconditional-recursion] # probably a good idea -[rule.unreachable-code] # code simplifier +# [rule.unreachable-code] # reasonable code simplifier, but does not match compiler behavior (e.g. log.Fatal still needs panic or return) [rule.waitgroup-by-value] # correct use of sync code, important #### unused utilities diff --git a/service/history/engine/engineimpl/history_engine.go b/service/history/engine/engineimpl/history_engine.go index 576b2998707..573ac483846 100644 --- a/service/history/engine/engineimpl/history_engine.go +++ b/service/history/engine/engineimpl/history_engine.go @@ -279,6 +279,7 @@ func NewEngineWithShardContext( historyEngImpl, config, shard.GetMetricsClient(), + shard.GetMetricsEmitter(), replicationTaskFetcher, replicationTaskExecutor, shard.GetTimeSource(), diff --git a/service/history/replication/task_processor.go b/service/history/replication/task_processor.go index 2e0c9416950..66fa0ce32c2 100644 --- a/service/history/replication/task_processor.go +++ b/service/history/replication/task_processor.go @@ -41,6 +41,7 @@ import ( "github.com/uber/cadence/common/log" "github.com/uber/cadence/common/log/tag" "github.com/uber/cadence/common/metrics" + "github.com/uber/cadence/common/metrics/structured" "github.com/uber/cadence/common/persistence" "github.com/uber/cadence/common/quotas" "github.com/uber/cadence/common/reconciliation" @@ -73,18 +74,19 @@ type ( // taskProcessorImpl is responsible for processing replication tasks for a shard. taskProcessorImpl struct { - currentCluster string - sourceCluster string - status int32 - shard shard.Context - historyEngine engine.Engine - historySerializer persistence.PayloadSerializer - config *config.Config - metricsClient metrics.Client - logger log.Logger - taskExecutor TaskExecutor - hostRateLimiter quotas.Limiter - shardRateLimiter quotas.Limiter + currentCluster string + sourceCluster string + status int32 + shard shard.Context + historyEngine engine.Engine + historySerializer persistence.PayloadSerializer + config *config.Config + metricsClient metrics.Client + perDomainTaskMetrics perDomainTaskMetricTags + logger log.Logger + taskExecutor TaskExecutor + hostRateLimiter quotas.Limiter + shardRateLimiter quotas.Limiter taskRetryPolicy backoff.RetryPolicy dlqRetryPolicy backoff.RetryPolicy @@ -103,8 +105,51 @@ type ( token *types.ReplicationToken respChan chan<- *types.ReplicationMessages } + + // all metrics tags are dynamic per task and cannot be filled in up-front. 
+ // + // skip:Convenience unable to use ad-hoc due to dynamic values + perDomainTaskMetricTags struct { + structured.Emitter + structured.DynamicOperationTags + + TargetCluster string `tag:"target_cluster"` + Domain struct{} `tag:"domain"` + + ts clock.TimeSource + } ) +//go:generate metricsgen + +func (p perDomainTaskMetricTags) taskProcessed(operation int, domain string, processingStart time.Time, task *types.ReplicationTask, legacyScope metrics.Scope) { + tags := p.GetTags(operation) + tags["domain"] = domain + + now := p.ts.Now() + processingLatency := now.Sub(processingStart) + replicationLatency := now.Sub(time.Unix(0, task.GetCreationTime())) + + // single-task processing latency + // caution: prometheus will not allow timers and histograms to use the same name, + // so the "_ns" suffix here is necessary. + p.Histogram("task_processing_latency_ns", structured.Low1ms10s, processingLatency, tags) + // latency from task generated to task received + p.Histogram("task_replication_latency_ns", structured.Mid1ms24h, replicationLatency, tags) + // number of replication tasks + // this is an exact match for the legacy scope, so it would cause duplicates if emitted. + // because this is the first use of this new system, we'll verify the tags with the histograms first. + // p.Count("replication_tasks_applied_per_domain", 1, tags) + + // emit single task processing latency + legacyScope.RecordTimer(metrics.TaskProcessingLatency, processingLatency) + // emit latency from task generated to task received + legacyScope.RecordTimer(metrics.ReplicationTaskLatency, replicationLatency) + // emit the number of replication tasks. + // when removing, be sure to un-comment the p.Count above + legacyScope.IncCounter(metrics.ReplicationTasksAppliedPerDomain) +} + var _ TaskProcessor = (*taskProcessorImpl)(nil) // NewTaskProcessor creates a new replication task processor. 
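+// For example (hypothetical values): the processor builds its tag struct once via
+//
+//	newPerDomainTaskMetricTags(emitter, clock, "clusterB")
+//
+// and each processed task then calls taskProcessed with the per-task dynamic
+// values (operation scope, domain name, timings), which fill the "domain" and
+// operation tags at emission time.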
@@ -113,6 +158,7 @@ func NewTaskProcessor(
 	historyEngine engine.Engine,
 	config *config.Config,
 	metricsClient metrics.Client,
+	emitter structured.Emitter,
 	taskFetcher TaskFetcher,
 	taskExecutor TaskExecutor,
 	clock clock.TimeSource,
@@ -134,14 +180,19 @@ func NewTaskProcessor(
 	noTaskBackoffPolicy.SetExpirationInterval(backoff.NoInterval)
 	noTaskRetrier := backoff.NewRetrier(noTaskBackoffPolicy, clock)
 	return &taskProcessorImpl{
-		currentCluster:    shard.GetClusterMetadata().GetCurrentClusterName(),
-		sourceCluster:     sourceCluster,
-		status:            common.DaemonStatusInitialized,
-		shard:             shard,
-		historyEngine:     historyEngine,
-		historySerializer: persistence.NewPayloadSerializer(),
-		config:            config,
-		metricsClient:     metricsClient,
+		currentCluster:       shard.GetClusterMetadata().GetCurrentClusterName(),
+		sourceCluster:        sourceCluster,
+		status:               common.DaemonStatusInitialized,
+		shard:                shard,
+		historyEngine:        historyEngine,
+		historySerializer:    persistence.NewPayloadSerializer(),
+		config:               config,
+		metricsClient:        metricsClient,
+		perDomainTaskMetrics: newPerDomainTaskMetricTags(
+			emitter,
+			clock,
+			sourceCluster, // emitted as "target_cluster" and logged inconsistently, unsure why
+		),
 		logger:            shard.GetLogger().WithTags(tag.SourceCluster(sourceCluster), tag.ShardID(shardID)),
 		taskExecutor:      taskExecutor,
 		hostRateLimiter:   taskFetcher.GetRateLimiter(),
@@ -467,25 +518,18 @@ func (p *taskProcessorImpl) processTaskOnce(replicationTask *types.ReplicationTa
 	if err != nil {
 		p.updateFailureMetric(scope, err, p.shard.GetShardID())
 	} else {
-		now := ts.Now()
 		mScope := p.metricsClient.Scope(scope, metrics.TargetClusterTag(p.sourceCluster))
 		domainID := replicationTask.HistoryTaskV2Attributes.GetDomainID()
+		var domainName string
 		if domainID != "" {
-			domainName, errorDomainName := p.shard.GetDomainCache().GetDomainName(domainID)
+			name, errorDomainName := p.shard.GetDomainCache().GetDomainName(domainID)
 			if errorDomainName != nil {
 				return errorDomainName
 			}
+			domainName = name
 			mScope = mScope.Tagged(metrics.DomainTag(domainName))
 		}
-		// emit single task processing latency
-		mScope.RecordTimer(metrics.TaskProcessingLatency, now.Sub(startTime))
-		// emit latency from task generated to task received
-		mScope.RecordTimer(
-			metrics.ReplicationTaskLatency,
-			now.Sub(time.Unix(0, replicationTask.GetCreationTime())),
-		)
-		// emit the number of replication tasks
-		mScope.IncCounter(metrics.ReplicationTasksAppliedPerDomain)
+		p.perDomainTaskMetrics.taskProcessed(scope, domainName, startTime, replicationTask, mScope)
 
 		shardScope := p.metricsClient.Scope(scope, metrics.TargetClusterTag(p.sourceCluster), metrics.InstanceTag(strconv.Itoa(p.shard.GetShardID())))
 		shardScope.IncCounter(metrics.ReplicationTasksApplied)
diff --git a/service/history/replication/task_processor_metrics_gen.go b/service/history/replication/task_processor_metrics_gen.go
new file mode 100644
index 00000000000..8f80fb2b3ca
--- /dev/null
+++ b/service/history/replication/task_processor_metrics_gen.go
@@ -0,0 +1,48 @@
+// Code generated by ./internal/tools/metricsgen; DO NOT EDIT.
+
+package replication
+
+import (
+	"github.com/uber/cadence/common/clock"
+	"github.com/uber/cadence/common/metrics/structured"
+)
+
+// newPerDomainTaskMetricTags constructs a new metric-tag-holding perDomainTaskMetricTags,
+// and it must be used, instead of custom initialization to ensure newly added
+// tags can be detected at compile time instead of missing them at run time.
+func newPerDomainTaskMetricTags( + emitter structured.Emitter, + ts clock.TimeSource, + TargetCluster string, +) perDomainTaskMetricTags { + p := perDomainTaskMetricTags{ + Emitter: emitter, + ts: ts, + TargetCluster: TargetCluster, + } + return p +} + +// NumTags returns the number of tags that are intended to be written in all +// cases. This will include all embedded parent tags and all reserved tags, +// and is intended to be used to pre-allocate maps of tags. +func (p perDomainTaskMetricTags) NumTags() int { + num := 1 // num of self fields + num += 1 // num of reserved fields + num += p.DynamicOperationTags.NumTags() + return num +} + +// PutTags writes this set of tags (and its embedded parents) to the passed map. +func (p perDomainTaskMetricTags) PutTags(operation int, into structured.DynamicTags) { + p.DynamicOperationTags.PutTags(operation, into) + into["target_cluster"] = p.TargetCluster +} + +// GetTags is a minor helper to get a pre-allocated-and-filled map with room +// for reserved fields (i.e. 'struct{}' type fields, which only declare intent). +func (p perDomainTaskMetricTags) GetTags(operation int) structured.DynamicTags { + tags := make(structured.DynamicTags, p.NumTags()) + p.PutTags(operation, tags) + return tags +} diff --git a/service/history/replication/task_processor_test.go b/service/history/replication/task_processor_test.go index d7b54163ff6..0c320a14aa3 100644 --- a/service/history/replication/task_processor_test.go +++ b/service/history/replication/task_processor_test.go @@ -49,6 +49,7 @@ import ( "github.com/uber/cadence/common/constants" "github.com/uber/cadence/common/dynamicconfig/dynamicproperties" "github.com/uber/cadence/common/metrics" + "github.com/uber/cadence/common/metrics/structured" "github.com/uber/cadence/common/mocks" "github.com/uber/cadence/common/persistence" "github.com/uber/cadence/common/quotas" @@ -154,6 +155,7 @@ func (s *taskProcessorSuite) SetupTest() { s.mockEngine, s.config, metricsClient, + structured.NewTestEmitter(s.T(), nil), s.taskFetcher, s.taskExecutor, s.clock, @@ -646,6 +648,7 @@ func TestProcessorLoop_TaskExecuteFailed_ShardChangeErr(t *testing.T) { mockEngine, config, metricsClient, + structured.NewTestEmitter(t, nil), taskFetcher, taskExecutor, clock.NewMockedTimeSource(), diff --git a/service/history/resource/resource_mock.go b/service/history/resource/resource_mock.go index 184fa60e971..a97aefb5ade 100644 --- a/service/history/resource/resource_mock.go +++ b/service/history/resource/resource_mock.go @@ -36,6 +36,7 @@ import ( membership "github.com/uber/cadence/common/membership" messaging "github.com/uber/cadence/common/messaging" metrics "github.com/uber/cadence/common/metrics" + structured "github.com/uber/cadence/common/metrics/structured" persistence "github.com/uber/cadence/common/persistence" client0 "github.com/uber/cadence/common/persistence/client" algorithm "github.com/uber/cadence/common/quotas/global/algorithm" @@ -489,6 +490,20 @@ func (mr *MockResourceMockRecorder) GetMetricsClient() *gomock.Call { return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetMetricsClient", reflect.TypeOf((*MockResource)(nil).GetMetricsClient)) } +// GetMetricsEmitter mocks base method. +func (m *MockResource) GetMetricsEmitter() structured.Emitter { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "GetMetricsEmitter") + ret0, _ := ret[0].(structured.Emitter) + return ret0 +} + +// GetMetricsEmitter indicates an expected call of GetMetricsEmitter. 
+func (mr *MockResourceMockRecorder) GetMetricsEmitter() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetMetricsEmitter", reflect.TypeOf((*MockResource)(nil).GetMetricsEmitter)) +} + // GetPayloadSerializer mocks base method. func (m *MockResource) GetPayloadSerializer() persistence.PayloadSerializer { m.ctrl.T.Helper() diff --git a/service/history/shard/context.go b/service/history/shard/context.go index 9d0d2355d59..cecd64a40a0 100644 --- a/service/history/shard/context.go +++ b/service/history/shard/context.go @@ -41,6 +41,7 @@ import ( "github.com/uber/cadence/common/log" "github.com/uber/cadence/common/log/tag" "github.com/uber/cadence/common/metrics" + "github.com/uber/cadence/common/metrics/structured" "github.com/uber/cadence/common/persistence" "github.com/uber/cadence/common/types" "github.com/uber/cadence/service/history/config" @@ -65,6 +66,7 @@ type ( GetLogger() log.Logger GetThrottledLogger() log.Logger GetMetricsClient() metrics.Client + GetMetricsEmitter() structured.Emitter GetTimeSource() clock.TimeSource PreviousShardOwnerWasDifferent() bool diff --git a/service/history/shard/context_mock.go b/service/history/shard/context_mock.go index dc774b2a714..027da55949b 100644 --- a/service/history/shard/context_mock.go +++ b/service/history/shard/context_mock.go @@ -22,6 +22,7 @@ import ( cluster "github.com/uber/cadence/common/cluster" log "github.com/uber/cadence/common/log" metrics "github.com/uber/cadence/common/metrics" + structured "github.com/uber/cadence/common/metrics/structured" persistence "github.com/uber/cadence/common/persistence" types "github.com/uber/cadence/common/types" config "github.com/uber/cadence/service/history/config" @@ -353,6 +354,20 @@ func (mr *MockContextMockRecorder) GetMetricsClient() *gomock.Call { return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetMetricsClient", reflect.TypeOf((*MockContext)(nil).GetMetricsClient)) } +// GetMetricsEmitter mocks base method. +func (m *MockContext) GetMetricsEmitter() structured.Emitter { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "GetMetricsEmitter") + ret0, _ := ret[0].(structured.Emitter) + return ret0 +} + +// GetMetricsEmitter indicates an expected call of GetMetricsEmitter. +func (mr *MockContextMockRecorder) GetMetricsEmitter() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetMetricsEmitter", reflect.TypeOf((*MockContext)(nil).GetMetricsEmitter)) +} + // GetQueueAckLevel mocks base method. func (m *MockContext) GetQueueAckLevel(category persistence.HistoryTaskCategory) persistence.HistoryTaskKey { m.ctrl.T.Helper()