diff --git a/common/metrics/defs.go b/common/metrics/defs.go index bcd12447f5c..a8a03c0fdca 100644 --- a/common/metrics/defs.go +++ b/common/metrics/defs.go @@ -37,10 +37,12 @@ type ( // metricDefinition contains the definition for a metric metricDefinition struct { - metricType MetricType // metric type - metricName MetricName // metric name - metricRollupName MetricName // optional. if non-empty, this name must be used for rolled-up version of this metric - buckets tally.Buckets // buckets if we are emitting histograms + metricType MetricType // metric type + metricName MetricName // metric name + metricRollupName MetricName // optional. if non-empty, this name must be used for rolled-up version of this metric + buckets tally.Buckets // buckets if we are emitting histograms + exponentialBuckets histogrammy[SubsettableHistogram] + intExponentialBuckets histogrammy[IntSubsettableHistogram] } // scopeDefinition holds the tag definitions for a scope @@ -2433,6 +2435,7 @@ const ( TaskBatchCompleteCounter TaskBatchCompleteFailure TaskProcessingLatency + ExponentialTaskProcessingLatency TaskQueueLatency ScheduleToStartHistoryQueueLatencyPerTaskList TaskRequestsOldScheduler @@ -2697,6 +2700,7 @@ const ( ReplicationTaskCleanupCount ReplicationTaskCleanupFailure ReplicationTaskLatency + ExponentialReplicationTaskLatency MutableStateChecksumMismatch MutableStateChecksumInvalidated FailoverMarkerCount @@ -3208,16 +3212,17 @@ var MetricDefs = map[ServiceIdx]map[MetricIdx]metricDefinition{ RingResolverError: {metricName: "ring_resolver_error", metricType: Counter}, }, History: { - TaskRequests: {metricName: "task_requests", metricType: Counter}, - TaskLatency: {metricName: "task_latency", metricType: Timer}, - TaskAttemptTimer: {metricName: "task_attempt", metricType: Timer}, - TaskFailures: {metricName: "task_errors", metricType: Counter}, - TaskDiscarded: {metricName: "task_errors_discarded", metricType: Counter}, - TaskStandbyRetryCounter: {metricName: 
"task_errors_standby_retry_counter", metricType: Counter}, - TaskNotActiveCounter: {metricName: "task_errors_not_active_counter", metricType: Counter}, - TaskLimitExceededCounter: {metricName: "task_errors_limit_exceeded_counter", metricType: Counter}, - TaskProcessingLatency: {metricName: "task_latency_processing", metricType: Timer}, - TaskQueueLatency: {metricName: "task_latency_queue", metricType: Timer}, + TaskRequests: {metricName: "task_requests", metricType: Counter}, + TaskLatency: {metricName: "task_latency", metricType: Timer}, + TaskAttemptTimer: {metricName: "task_attempt", metricType: Timer}, + TaskFailures: {metricName: "task_errors", metricType: Counter}, + TaskDiscarded: {metricName: "task_errors_discarded", metricType: Counter}, + TaskStandbyRetryCounter: {metricName: "task_errors_standby_retry_counter", metricType: Counter}, + TaskNotActiveCounter: {metricName: "task_errors_not_active_counter", metricType: Counter}, + TaskLimitExceededCounter: {metricName: "task_errors_limit_exceeded_counter", metricType: Counter}, + TaskProcessingLatency: {metricName: "task_latency_processing", metricType: Timer}, + ExponentialTaskProcessingLatency: {metricName: "task_latency_processing_ns", metricType: Histogram, exponentialBuckets: Low1ms100s}, + TaskQueueLatency: {metricName: "task_latency_queue", metricType: Timer}, ScheduleToStartHistoryQueueLatencyPerTaskList: {metricName: "schedule_to_start_history_queue_latency_per_tl", metricType: Timer}, TaskRequestsOldScheduler: {metricName: "task_requests_old_scheduler", metricType: Counter}, TaskRequestsNewScheduler: {metricName: "task_requests_new_scheduler", metricType: Counter}, @@ -3474,6 +3479,7 @@ var MetricDefs = map[ServiceIdx]map[MetricIdx]metricDefinition{ ReplicationTaskCleanupCount: {metricName: "replication_task_cleanup_count", metricType: Counter}, ReplicationTaskCleanupFailure: {metricName: "replication_task_cleanup_failed", metricType: Counter}, ReplicationTaskLatency: {metricName: 
"replication_task_latency", metricType: Timer}, + ExponentialReplicationTaskLatency: {metricName: "replication_task_latency_ns", metricType: Histogram, exponentialBuckets: Mid1ms24h}, MutableStateChecksumMismatch: {metricName: "mutable_state_checksum_mismatch", metricType: Counter}, MutableStateChecksumInvalidated: {metricName: "mutable_state_checksum_invalidated", metricType: Counter}, FailoverMarkerCount: {metricName: "failover_marker_count", metricType: Counter}, diff --git a/common/metrics/defs_test.go b/common/metrics/defs_test.go index 0bdf79e70e3..4081d777e52 100644 --- a/common/metrics/defs_test.go +++ b/common/metrics/defs_test.go @@ -24,6 +24,7 @@ import ( "fmt" "math" "regexp" + "strings" "testing" "time" @@ -187,6 +188,23 @@ func TestMetricsAreUnique(t *testing.T) { }) } +func TestHistogramSuffixes(t *testing.T) { + for _, serviceMetrics := range MetricDefs { + for _, def := range serviceMetrics { + if def.intExponentialBuckets != nil { + if !strings.HasSuffix(def.metricName.String(), "_counts") { + t.Errorf("int-exponential-histogram metric %q should have suffix \"_counts\"", def.metricName.String()) + } + } + if def.exponentialBuckets != nil { + if !strings.HasSuffix(def.metricName.String(), "_ns") { + t.Errorf("exponential-histogram metric %q should have suffix \"_ns\"", def.metricName.String()) + } + } + } + } +} + func TestExponentialDurationBuckets(t *testing.T) { factor := math.Pow(2, 0.25) assert.Equal(t, 80, len(ExponentialDurationBuckets)) diff --git a/common/metrics/histograms.go b/common/metrics/histograms.go new file mode 100644 index 00000000000..d1e914d9dde --- /dev/null +++ b/common/metrics/histograms.go @@ -0,0 +1,293 @@ +package metrics + +import ( + "fmt" + "math" + "slices" + "strconv" + "time" + + "github.com/uber-go/tally" +) + +// Nearly all histograms should use pre-defined buckets here, to encourage consistency. 
+// +// When creating a new one, make sure to read makeSubsettableHistogram for context, and choose +// values that communicate your *intent*, not the actual final value. Add a test to show the +// resulting buckets, so reviewers can see just how much / how little you chose. +// +// Similarly, name them more by their intent than their exact ranges, because "it has just barely +// enough buckets" is almost always not *actually* enough when things misbehave. +// Choosing by intent also helps make adjusting these values safer if needed (e.g. subsetting or +// reducing the range), because we can be sure all intents match. +var ( + // Default1ms100s is our "default" set of buckets, targeting at least 1ms through 100s, + // and is "rounded up" slightly to reach 80 buckets (100s needs 68 buckets), + // plus multi-minute exceptions are common enough to support for the small additional cost + // (this goes up to ~15m). + // + // That makes this *relatively* costly, but hopefully good enough for most metrics without + // needing any careful thought. If this proves incorrect, it will be down-scaled without + // checking existing uses, so make sure to use something else if you require a certain level + // of precision. + // + // If you need sub-millisecond precision, or substantially longer durations than about 1 minute, + // consider using a different histogram. + Default1ms100s = makeSubsettableHistogram(2, time.Millisecond, 100*time.Second, 80) + + // Low1ms100s is a half-resolution version of Default1ms100s, intended for use any time the default + // is expected to be more costly than we feel comfortable with. Or for automatic down-scaling + // at runtime via dynamic config, if that's ever built. + // + // This uses only 40 buckets, so it's likely good enough for most purposes, e.g. database operations + // that might have high cardinality but should not exceed about a minute even in exceptional cases. 
+ // Reducing this to "1ms to 10s" seems reasonable for some cases, but that still needs ~32 buckets, + // which isn't a big savings and reduces our ability to notice abnormally slow events. + Low1ms100s = Default1ms100s.subsetTo(1) + + // High1ms24h covers things like activity latency, where ranges are very large but + // "longer than 1 day" is not particularly worth being precise about. + // + // This uses a lot of buckets, so it must be used with relatively-low cardinality elsewhere, + // and/or consider dual emitting (Mid1ms24h or lower) so overviews can be queried efficiently. + High1ms24h = makeSubsettableHistogram(2, time.Millisecond, 24*time.Hour, 112) + + // Mid1ms24h is a one-scale-lower version of High1ms24h, + // for use when we know it's too detailed to be worth emitting. + // + // This uses 56 buckets, half of High1ms24h's 112 + Mid1ms24h = High1ms24h.subsetTo(1) + + // Mid1To16k is a histogram for small counters, like "how many replication tasks did we receive". + // + // This targets 1 to ~10k with some buffer, and ends just below 64k. + // Do not rely on this for values which can ever reach 64k, make a new histogram. + Mid1To16k = IntSubsettableHistogram(makeSubsettableHistogram(2, 1, 16384, 64)) +) + +// SubsettableHistogram is a duration-based histogram that can be subset to a lower scale +// in a standardized, predictable way. It is intentionally compatible with Prometheus and OTEL's +// "exponential histograms": https://opentelemetry.io/docs/specs/otel/metrics/data-model/#exponentialhistogram +// +// These histograms MUST always have a "_ns" suffix in their name to avoid confusion with timers. +type SubsettableHistogram struct { + tallyBuckets tally.DurationBuckets + + scale int +} + +// IntSubsettableHistogram is a non-duration-based integer-distribution histogram, otherwise identical +// to SubsettableHistogram but built as a separate type so you cannot pass the wrong one. 
+// +// These histograms MUST always have a "_counts" suffix in their name to avoid confusion with timers. +// (more suffixes will likely be allowed as needed) +type IntSubsettableHistogram SubsettableHistogram + +// currently we have no apparent need for float-histograms, +// as all our value ranges go from >=1 to many thousands, where +// decimal precision is pointless and mostly just looks bad. +// +// if we ever need 0..1 precision in histograms, we can add them then. +// type FloatSubsettableHistogram struct { +// tally.ValueBuckets +// scale int +// } + +// subsetTo takes an existing histogram and reduces its detail level. +// +// this can be replaced by simply creating a new histogram with the same args +// as the original but half the length, but it's added because: 1) it works too, +// and 2) to document how the process works, because we'll likely be doing this +// at query time to reduce the level of detail in small / over-crowded graphs. +func (s SubsettableHistogram) subsetTo(newScale int) SubsettableHistogram { + if newScale >= s.scale { + panic(fmt.Sprintf("scale %v is not less than the current scale %v", newScale, s.scale)) + } + if newScale < 0 { + panic(fmt.Sprintf("negative scales (%v == greater than *2 per step) are possible, but do not have tests yet", newScale)) + } + dup := SubsettableHistogram{ + tallyBuckets: slices.Clone(s.tallyBuckets), + scale: s.scale, + } + // compress every other bucket per -1 scale + for dup.scale > newScale { + if (len(dup.tallyBuckets)-1)%2 != 0 { + panic(fmt.Sprintf("cannot subset from scale %v to %v, %v-buckets is not divisible by 2", dup.scale, dup.scale-1, len(dup.tallyBuckets)-1)) + } + half := make(tally.DurationBuckets, 0, ((len(dup.tallyBuckets)-1)/2)+1) + half = append(half, dup.tallyBuckets[0]) // keep the zero value intact + for i := 1; i < len(dup.tallyBuckets); i += 2 { + half = append(half, dup.tallyBuckets[i]) // compress the first, third, etc + } + dup.tallyBuckets = half + dup.scale-- + } + return 
dup +} + +func (i IntSubsettableHistogram) subsetTo(newScale int) IntSubsettableHistogram { + return IntSubsettableHistogram(SubsettableHistogram(i).subsetTo(newScale)) +} + +func (s SubsettableHistogram) tags() map[string]string { + return map[string]string{ + // use the duration string func, as "1ms" duration suffix is clearer than "1000000" for nanoseconds. + // this also helps differentiate "tracks time" from "may be a general value distribution", + // both visually when querying by hand and for any future automation (if needed). + "histogram_start": s.start().String(), + "histogram_end": s.end().String(), + "histogram_scale": strconv.Itoa(s.scale), + } +} + +func (i IntSubsettableHistogram) tags() map[string]string { + return map[string]string{ + "histogram_start": strconv.Itoa(int(i.start())), + "histogram_end": strconv.Itoa(int(i.end())), + "histogram_scale": strconv.Itoa(i.scale), + } +} + +// makeSubsettableHistogram is a replacement for tally.MustMakeExponentialDurationBuckets, +// tailored to make "construct a range" or "ensure N buckets" simpler for OTEL-compatible exponential histograms +// (i.e. https://opentelemetry.io/docs/specs/otel/metrics/data-model/#exponentialhistogram). +// +// It ensures values are as precise as possible (preventing display-space-wasteful numbers like 3.99996ms), +// that all histograms start with a 0 value (else erroneous negative values are impossible to notice), +// and avoids some bugs with low values (tally.MustMakeExponentialDurationBuckets misbehaves for small numbers, +// causing all buckets to == the minimum value). +// +// # To use +// +// Choose scale/start/end values that match your *intent*, i.e. the range you want to capture, and then choose +// a length that EXCEEDS that by at least 2x, ideally 4x-10x, and ends at a highly-divisible-by-2 value. 
+// +// The exact length / exceeding / etc does not matter (just write a test so it's documented and can be reviewed), +// it just serves to document your intent and to make sure that we have some head-room so we can understand how +// wrongly we guessed at our needs if it's exceeded during an incident of some kind. We've failed at this +// multiple times in the past, it's worth paying a bit extra to be able to diagnose problems. +// +// For all histograms produced, add a test to print the concrete values, so they can be quickly checked when reading. +func makeSubsettableHistogram(scale int, start, end time.Duration, length int) SubsettableHistogram { + if scale < 0 || scale > 3 { + // anything outside this range is currently not expected and probably a mistake, + // but any value is technically sound. + panic(fmt.Sprintf("scale must be between 0 (grows by *2) and 3 (grows by *2^1/8), got scale: %v", scale)) + } + if start <= 0 { + panic(fmt.Sprintf("start must be greater than 0 or it will not grow exponentially, got %v", start)) + } + if start >= end { + panic(fmt.Sprintf("start must be less than end (%v < %v)", start, end)) + } + if length < 12 || length > 160 { + // 160 is probably higher than we should consider, but going further is currently risking much too high costs. + // if this changes, e.g. due to metrics optimizations, just adjust this validation. + // + // 12 is pretty arbitrary, I just don't expect us to ever make a histogram that small, so it's probably + // from accidentally passing scale or something to the wrong argument. + panic(fmt.Sprintf("length must be between 12 < %d <=160", length)) + } + // make sure the number of buckets completes a "full" row, + // i.e. the next bucket would be a power of 2 of the start value. + // + // for a more visual example of what this means, see the logged strings in tests: + // each "row" of values must be the same width. 
+	//
+	// adding a couple buckets to ensure this is met costs very little, and ensures
+	// subsetting combines values more consistently (not crossing rows) for longer.
+	powerOfTwoWidth := int(math.Pow(2, float64(scale))) // num of buckets needed to double a value
+	missing := length % powerOfTwoWidth
+	if missing != 0 {
+		// round UP to the next full row: `missing` is how far past the last full row we are,
+		// so the next valid length adds the remainder's complement, not the remainder itself
+		// (length+missing is only a multiple of the width by coincidence).
+		panic(fmt.Sprintf(`number of buckets must "fill" a power of 2 to end at a consistent row width. `+
+			`got %d, probably raise to %d`,
+			length, length+(powerOfTwoWidth-missing)))
+	}
+
+	var buckets tally.DurationBuckets
+	for i := 0; i < length; i++ {
+		buckets = append(buckets, nextBucket(start, len(buckets), scale))
+	}
+
+	if last(buckets) <= end*2 {
+		// report the configured target (end) and the actual coverage (last bucket) separately,
+		// instead of printing the last bucket where the message says "end target".
+		panic(fmt.Sprintf("not enough buckets (%d) to exceed the end target (%v) by at least 2x (only reached %v). "+
+			"you are STRONGLY encouraged to include ~2x-10x more range than your intended end value, "+
+			"because we almost always underestimate the ranges needed during incidents",
+			length, end, last(buckets)),
+		)
+	}
+
+	return SubsettableHistogram{
+		tallyBuckets: append(
+			// always include a zero value at the beginning, so negative values are noticeable ("-inf to 0" bucket)
+			tally.DurationBuckets{0},
+			buckets...,
+		),
+		scale: scale,
+	}
+}
+
+// last-item-in-slice helper to eliminate some magic `-1`s
+func last[T any, X ~[]T](s X) T {
+	return s[len(s)-1]
+}
+
+// nextBucket returns bucket number `num` of an exponential series that starts at
+// `start` and doubles every 2^scale buckets.
+func nextBucket(start time.Duration, num int, scale int) time.Duration {
+	// calculating it from `start` each time reduces floating point error, ensuring "clean" multiples
+	// at every power of 2 (and possibly others), instead of e.g. "1ms ... 1.9999994ms" which occurs
+	// if you try to build from the previous value each time.
+ return time.Duration( + float64(start) * + math.Pow(2, float64(num)/math.Pow(2, float64(scale)))) +} + +func (s SubsettableHistogram) histScale() int { return s.scale } +func (s SubsettableHistogram) width() int { return int(math.Pow(2, float64(s.scale))) } +func (s SubsettableHistogram) len() int { return len(s.tallyBuckets) } +func (s SubsettableHistogram) start() time.Duration { return s.tallyBuckets[1] } +func (s SubsettableHistogram) end() time.Duration { return s.tallyBuckets[len(s.tallyBuckets)-1] } +func (s SubsettableHistogram) buckets() tally.DurationBuckets { return s.tallyBuckets } +func (s SubsettableHistogram) print(to func(string, ...any)) { + to("%v\n", s.tallyBuckets[0:1]) // zero value on its own row + for rowStart := 1; rowStart < s.len(); rowStart += s.width() { + to("%v\n", s.tallyBuckets[rowStart:rowStart+s.width()]) + } +} + +func (i IntSubsettableHistogram) histScale() int { return i.scale } +func (i IntSubsettableHistogram) width() int { return int(math.Pow(2, float64(i.scale))) } +func (i IntSubsettableHistogram) len() int { return len(i.tallyBuckets) } +func (i IntSubsettableHistogram) start() time.Duration { return i.tallyBuckets[1] } +func (i IntSubsettableHistogram) end() time.Duration { + return i.tallyBuckets[len(i.tallyBuckets)-1] +} +func (i IntSubsettableHistogram) buckets() tally.DurationBuckets { return i.tallyBuckets } +func (i IntSubsettableHistogram) print(to func(string, ...any)) { + // fairly unreadable as duration-strings, so convert to int by hand + to("[%d]\n", int(i.tallyBuckets[0])) // zero value on its own row + for rowStart := 1; rowStart < i.len(); rowStart += i.width() { + ints := make([]int, 0, i.width()) + for _, d := range i.tallyBuckets[rowStart : rowStart+i.width()] { + ints = append(ints, int(d)) + } + to("%v\n", ints) + } +} + +var _ histogrammy[SubsettableHistogram] = SubsettableHistogram{} +var _ histogrammy[IntSubsettableHistogram] = IntSubsettableHistogram{} + +// internal utility/test methods, but 
could be exposed if there's a use for it +type histogrammy[T any] interface { + histScale() int // exponential scale value. 0..3 inclusive. + width() int // number of values per power of 2 == how wide to print each row. 1, 2, 4, or 8. + len() int // number of buckets + start() time.Duration // first non-zero bucket + end() time.Duration // last bucket + buckets() tally.DurationBuckets // access to all buckets + subsetTo(newScale int) T // generic so specific types can be returned + tags() map[string]string // start, end, and scale tags that need to be implicitly added + + print(to func(string, ...any)) // test-oriented printer +} diff --git a/common/metrics/histograms_test.go b/common/metrics/histograms_test.go new file mode 100644 index 00000000000..16465d4fc18 --- /dev/null +++ b/common/metrics/histograms_test.go @@ -0,0 +1,191 @@ +package metrics + +import ( + "fmt" + "strings" + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestHistogramValues(t *testing.T) { + t.Run("default_1ms_to_100s", func(t *testing.T) { + checkHistogram(t, Default1ms100s, ` +[0s] +[1ms 1.189207ms 1.414213ms 1.681792ms] +[2ms 2.378414ms 2.828427ms 3.363585ms] +[4ms 4.756828ms 5.656854ms 6.727171ms] +[8ms 9.513656ms 11.313708ms 13.454342ms] +[16ms 19.027313ms 22.627416ms 26.908685ms] +[32ms 38.054627ms 45.254833ms 53.81737ms] +[64ms 76.109255ms 90.509667ms 107.634741ms] +[128ms 152.21851ms 181.019335ms 215.269482ms] +[256ms 304.437021ms 362.038671ms 430.538964ms] +[512ms 608.874042ms 724.077343ms 861.077929ms] +[1.024s 1.217748085s 1.448154687s 1.722155858s] +[2.048s 2.435496171s 2.896309375s 3.444311716s] +[4.096s 4.870992343s 5.792618751s 6.888623433s] +[8.192s 9.741984686s 11.585237502s 13.777246867s] +[16.384s 19.483969372s 23.170475005s 27.554493735s] +[32.768s 38.967938744s 46.340950011s 55.10898747s] +[1m5.536s 1m17.935877488s 1m32.681900023s 1m50.21797494s] +[2m11.072s 2m35.871754977s 3m5.363800047s 3m40.43594988s] +[4m22.144s 5m11.743509955s 6m10.727600094s 
7m20.87189976s] +[8m44.288s 10m23.48701991s 12m21.455200189s 14m41.743799521s] +`) + }) + t.Run("low_1ms_to_100s", func(t *testing.T) { + checkHistogram(t, Low1ms100s, ` +[0s] +[1ms 1.414213ms] +[2ms 2.828427ms] +[4ms 5.656854ms] +[8ms 11.313708ms] +[16ms 22.627416ms] +[32ms 45.254833ms] +[64ms 90.509667ms] +[128ms 181.019335ms] +[256ms 362.038671ms] +[512ms 724.077343ms] +[1.024s 1.448154687s] +[2.048s 2.896309375s] +[4.096s 5.792618751s] +[8.192s 11.585237502s] +[16.384s 23.170475005s] +[32.768s 46.340950011s] +[1m5.536s 1m32.681900023s] +[2m11.072s 3m5.363800047s] +[4m22.144s 6m10.727600094s] +[8m44.288s 12m21.455200189s] +`) + }) + t.Run("high_1ms_to_24h", func(t *testing.T) { + checkHistogram(t, High1ms24h, ` +[0s] +[1ms 1.189207ms 1.414213ms 1.681792ms] +[2ms 2.378414ms 2.828427ms 3.363585ms] +[4ms 4.756828ms 5.656854ms 6.727171ms] +[8ms 9.513656ms 11.313708ms 13.454342ms] +[16ms 19.027313ms 22.627416ms 26.908685ms] +[32ms 38.054627ms 45.254833ms 53.81737ms] +[64ms 76.109255ms 90.509667ms 107.634741ms] +[128ms 152.21851ms 181.019335ms 215.269482ms] +[256ms 304.437021ms 362.038671ms 430.538964ms] +[512ms 608.874042ms 724.077343ms 861.077929ms] +[1.024s 1.217748085s 1.448154687s 1.722155858s] +[2.048s 2.435496171s 2.896309375s 3.444311716s] +[4.096s 4.870992343s 5.792618751s 6.888623433s] +[8.192s 9.741984686s 11.585237502s 13.777246867s] +[16.384s 19.483969372s 23.170475005s 27.554493735s] +[32.768s 38.967938744s 46.340950011s 55.10898747s] +[1m5.536s 1m17.935877488s 1m32.681900023s 1m50.21797494s] +[2m11.072s 2m35.871754977s 3m5.363800047s 3m40.43594988s] +[4m22.144s 5m11.743509955s 6m10.727600094s 7m20.87189976s] +[8m44.288s 10m23.48701991s 12m21.455200189s 14m41.743799521s] +[17m28.576s 20m46.974039821s 24m42.910400378s 29m23.487599042s] +[34m57.152s 41m33.948079642s 49m25.820800757s 58m46.975198084s] +[1h9m54.304s 1h23m7.896159284s 1h38m51.641601515s 1h57m33.950396168s] +[2h19m48.608s 2h46m15.792318568s 3h17m43.283203031s 3h55m7.900792337s] +[4h39m37.216s 
5h32m31.584637137s 6h35m26.566406062s 7h50m15.801584674s] +[9h19m14.432s 11h5m3.169274274s 13h10m53.132812125s 15h40m31.603169349s] +[18h38m28.864s 22h10m6.338548549s 26h21m46.265624251s 31h21m3.206338698s] +[37h16m57.728s 44h20m12.677097099s 52h43m32.531248503s 62h42m6.412677396s] +`) + }) + t.Run("mid_1ms_24h", func(t *testing.T) { + checkHistogram(t, Mid1ms24h, ` +[0s] +[1ms 1.414213ms] +[2ms 2.828427ms] +[4ms 5.656854ms] +[8ms 11.313708ms] +[16ms 22.627416ms] +[32ms 45.254833ms] +[64ms 90.509667ms] +[128ms 181.019335ms] +[256ms 362.038671ms] +[512ms 724.077343ms] +[1.024s 1.448154687s] +[2.048s 2.896309375s] +[4.096s 5.792618751s] +[8.192s 11.585237502s] +[16.384s 23.170475005s] +[32.768s 46.340950011s] +[1m5.536s 1m32.681900023s] +[2m11.072s 3m5.363800047s] +[4m22.144s 6m10.727600094s] +[8m44.288s 12m21.455200189s] +[17m28.576s 24m42.910400378s] +[34m57.152s 49m25.820800757s] +[1h9m54.304s 1h38m51.641601515s] +[2h19m48.608s 3h17m43.283203031s] +[4h39m37.216s 6h35m26.566406062s] +[9h19m14.432s 13h10m53.132812125s] +[18h38m28.864s 26h21m46.265624251s] +[37h16m57.728s 52h43m32.531248503s] +`) + }) + t.Run("mid_to_16k_ints", func(t *testing.T) { + // note: this histogram has some duplicates. + // + // this wastes a bit of memory, but Tally will choose the same index each + // time for a specific value, so it does not cause extra non-zero series + // to be stored or emitted. + // + // if this turns out to be expensive in Prometheus / etc, it's easy + // enough to start the histogram at 8, and dual-emit a non-exponential + // histogram for lower values. 
+		checkHistogram(t, Mid1To16k, `
+[0]
+[1 1 1 1]
+[2 2 2 3]
+[4 4 5 6]
+[8 9 11 13]
+[16 19 22 26]
+[32 38 45 53]
+[64 76 90 107]
+[128 152 181 215]
+[256 304 362 430]
+[512 608 724 861]
+[1024 1217 1448 1722]
+[2048 2435 2896 3444]
+[4096 4870 5792 6888]
+[8192 9741 11585 13777]
+[16384 19483 23170 27554]
+[32768 38967 46340 55108]
+`)
+	})
+}
+
+// most histograms should pass this check, but fuzzy comparison is fine if needed for extreme cases.
+func checkHistogram[T any](t *testing.T, h histogrammy[T], expected string) {
+	var buf strings.Builder
+	h.print(func(s string, a ...any) {
+		str := fmt.Sprintf(s, a...)
+		// str is data, not a format string: pass it through "%s" so go vet's printf check
+		// passes and any literal '%' in a bucket dump cannot garble the log output.
+		t.Logf("%s", str)
+		buf.WriteString(str)
+	})
+	if strings.TrimSpace(expected) != strings.TrimSpace(buf.String()) {
+		t.Error("histogram definition changed, update the test if this is intended")
+	}
+
+	buckets := h.buckets()
+	assert.EqualValues(t, 0, buckets[0], "first bucket should always be zero")
+	for i := 1; i < len(buckets); i += h.width() {
+		if i > 1 {
+			// ensure good float math.
+			//
+			// this is *intentionally* doing exact comparisons, as floating point math with
+			// human-friendly integers have precise power-of-2 multiples for a very long time.
+			//
+			// note that the equivalent tally buckets, e.g.:
+			// tally.MustMakeExponentialDurationBuckets(time.Millisecond, math.Pow(2, 1.0/4.0))
+			// fails this test, and the logs produced show ugly e.g. 31.999942ms values.
+			// tally also produces incorrect results if you start at e.g. 1,
+			// as it just produces endless `1` values.
+			// message/arg order fixed: the first %v is labeled "current row's value" and the
+			// second "previous", so pass buckets[i] first and buckets[i-h.width()] second.
+			assert.Equalf(t, buckets[i-h.width()]*2, buckets[i],
+				"current row's value (%v) is not a power of 2 greater than previous (%v), skewed / bad math?",
+				buckets[i], buckets[i-h.width()])
+		}
+	}
+}
diff --git a/common/metrics/interfaces.go b/common/metrics/interfaces.go
index 6efba4b596a..6c5f332453e 100644
--- a/common/metrics/interfaces.go
+++ b/common/metrics/interfaces.go
@@ -65,6 +65,10 @@ type (
 		RecordHistogramDuration(timer MetricIdx, d time.Duration)
 		// RecordHistogramValue records a histogram value for the given metric name
 		RecordHistogramValue(timer MetricIdx, value float64)
+		// ExponentialHistogram records a subsettable exponential histogram value for the given metric name
+		ExponentialHistogram(hist MetricIdx, d time.Duration)
+		// IntExponentialHistogram records an integer value into a subsettable exponential histogram for the given metric name
+		IntExponentialHistogram(hist MetricIdx, value int)
 		// UpdateGauge reports Gauge type absolute value metric
 		UpdateGauge(gauge MetricIdx, value float64)
 		// Tagged return an internal scope that can be used to add additional
diff --git a/common/metrics/mocks/Scope.go b/common/metrics/mocks/Scope.go
index d67661b35c9..d689645f6fe 100644
--- a/common/metrics/mocks/Scope.go
+++ b/common/metrics/mocks/Scope.go
@@ -54,6 +54,14 @@ func (_m *Scope) RecordHistogramValue(timer metrics.MetricIdx, value float64) {
 	_m.Called(timer, value)
 }
+func (_m *Scope) ExponentialHistogram(hist metrics.MetricIdx, d time.Duration) {
+	_m.Called(hist, d)
+}
+
+func (_m *Scope) IntExponentialHistogram(hist metrics.MetricIdx, value int) {
+	_m.Called(hist, value)
+}
+
 // RecordTimer provides a mock function with given fields: timer, d
 func (_m *Scope) RecordTimer(timer metrics.MetricIdx, d time.Duration) {
 	_m.Called(timer, d)
 }
diff --git a/common/metrics/nop.go b/common/metrics/nop.go
index 631f9f4261a..30bba3a2d13 100644
--- a/common/metrics/nop.go
+++ b/common/metrics/nop.go
@@ -44,24 +44,19 @@ func NopStopwatch() tally.Stopwatch {
 
 type noopClientImpl
struct{} -func (n noopClientImpl) IncCounter(scope ScopeIdx, counter MetricIdx) { -} +func (n noopClientImpl) IncCounter(scope ScopeIdx, counter MetricIdx) {} -func (n noopClientImpl) AddCounter(scope ScopeIdx, counter MetricIdx, delta int64) { -} +func (n noopClientImpl) AddCounter(scope ScopeIdx, counter MetricIdx, delta int64) {} func (n noopClientImpl) StartTimer(scope ScopeIdx, timer MetricIdx) tally.Stopwatch { return NoopStopwatch } -func (n noopClientImpl) RecordTimer(scope ScopeIdx, timer MetricIdx, d time.Duration) { -} +func (n noopClientImpl) RecordTimer(scope ScopeIdx, timer MetricIdx, d time.Duration) {} -func (n *noopClientImpl) RecordHistogramDuration(scope ScopeIdx, timer MetricIdx, d time.Duration) { -} +func (n *noopClientImpl) RecordHistogramDuration(scope ScopeIdx, timer MetricIdx, d time.Duration) {} -func (n noopClientImpl) UpdateGauge(scope ScopeIdx, gauge MetricIdx, value float64) { -} +func (n noopClientImpl) UpdateGauge(scope ScopeIdx, gauge MetricIdx, value float64) {} func (n noopClientImpl) Scope(scope ScopeIdx, tags ...Tag) Scope { return NoopScope @@ -74,27 +69,25 @@ func NewNoopMetricsClient() Client { type noopScopeImpl struct{} -func (n *noopScopeImpl) IncCounter(counter MetricIdx) { -} +func (n *noopScopeImpl) IncCounter(counter MetricIdx) {} -func (n *noopScopeImpl) AddCounter(counter MetricIdx, delta int64) { -} +func (n *noopScopeImpl) AddCounter(counter MetricIdx, delta int64) {} func (n *noopScopeImpl) StartTimer(timer MetricIdx) Stopwatch { return NewTestStopwatch() } -func (n *noopScopeImpl) RecordTimer(timer MetricIdx, d time.Duration) { -} +func (n *noopScopeImpl) RecordTimer(timer MetricIdx, d time.Duration) {} -func (n *noopScopeImpl) RecordHistogramDuration(timer MetricIdx, d time.Duration) { -} +func (n *noopScopeImpl) RecordHistogramDuration(timer MetricIdx, d time.Duration) {} -func (n *noopScopeImpl) RecordHistogramValue(timer MetricIdx, value float64) { -} +func (n *noopScopeImpl) RecordHistogramValue(timer 
MetricIdx, value float64) {} -func (n *noopScopeImpl) UpdateGauge(gauge MetricIdx, value float64) { -} +func (n *noopScopeImpl) ExponentialHistogram(hist MetricIdx, d time.Duration) {} + +func (n *noopScopeImpl) IntExponentialHistogram(hist MetricIdx, value int) {} + +func (n *noopScopeImpl) UpdateGauge(gauge MetricIdx, value float64) {} func (n *noopScopeImpl) Tagged(tags ...Tag) Scope { return n diff --git a/common/metrics/scope.go b/common/metrics/scope.go index 4b91381fab7..9f2746a60e5 100644 --- a/common/metrics/scope.go +++ b/common/metrics/scope.go @@ -110,6 +110,16 @@ func (m *metricsScope) RecordHistogramValue(id MetricIdx, value float64) { } } +func (m *metricsScope) ExponentialHistogram(id MetricIdx, value time.Duration) { + def := m.defs[id] + m.scope.Tagged(def.exponentialBuckets.tags()).Histogram(def.metricName.String(), def.exponentialBuckets.buckets()).RecordDuration(value) +} + +func (m *metricsScope) IntExponentialHistogram(id MetricIdx, value int) { + def := m.defs[id] + m.scope.Tagged(def.intExponentialBuckets.tags()).Histogram(def.metricName.String(), def.intExponentialBuckets.buckets()).RecordDuration(time.Duration(value)) +} + func (m *metricsScope) Tagged(tags ...Tag) Scope { domainTagged := m.isDomainTagged tagMap := make(map[string]string, len(tags)) diff --git a/service/history/replication/task_processor.go b/service/history/replication/task_processor.go index b6bbb0dd6d5..b2ad2fa5be0 100644 --- a/service/history/replication/task_processor.go +++ b/service/history/replication/task_processor.go @@ -470,13 +470,24 @@ func (p *taskProcessorImpl) processTaskOnce(replicationTask *types.ReplicationTa now := ts.Now() mScope := p.metricsClient.Scope(scope, metrics.TargetClusterTag(p.sourceCluster)) domainID := replicationTask.HistoryTaskV2Attributes.GetDomainID() + var domainName string if domainID != "" { - domainName, errorDomainName := p.shard.GetDomainCache().GetDomainName(domainID) + cachedName, errorDomainName := 
p.shard.GetDomainCache().GetDomainName(domainID) if errorDomainName != nil { return errorDomainName } - mScope = mScope.Tagged(metrics.DomainTag(domainName)) + domainName = cachedName } + mScope = mScope.Tagged(metrics.DomainTag(domainName)) // use consistent tags so Prometheus does not break + + // emit single task processing latency + mScope.ExponentialHistogram(metrics.ExponentialTaskProcessingLatency, now.Sub(startTime)) + // emit latency from task generated to task received + mScope.ExponentialHistogram( + metrics.ExponentialReplicationTaskLatency, + now.Sub(time.Unix(0, replicationTask.GetCreationTime())), + ) + // emit single task processing latency mScope.RecordTimer(metrics.TaskProcessingLatency, now.Sub(startTime)) // emit latency from task generated to task received diff --git a/service/worker/archiver/replay_metrics_client.go b/service/worker/archiver/replay_metrics_client.go index 3a9fa889287..a34e382f71c 100644 --- a/service/worker/archiver/replay_metrics_client.go +++ b/service/worker/archiver/replay_metrics_client.go @@ -158,6 +158,20 @@ func (r *replayMetricsScope) RecordHistogramValue(timer metrics.MetricIdx, value r.scope.RecordHistogramValue(timer, value) } +func (r *replayMetricsScope) ExponentialHistogram(hist metrics.MetricIdx, d time.Duration) { + if workflow.IsReplaying(r.ctx) { + return + } + r.scope.ExponentialHistogram(hist, d) +} + +func (r *replayMetricsScope) IntExponentialHistogram(hist metrics.MetricIdx, value int) { + if workflow.IsReplaying(r.ctx) { + return + } + r.scope.IntExponentialHistogram(hist, value) +} + // UpdateGauge reports Gauge type absolute value metric func (r *replayMetricsScope) UpdateGauge(gauge metrics.MetricIdx, value float64) { if workflow.IsReplaying(r.ctx) {