Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 20 additions & 14 deletions common/metrics/defs.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,10 +37,12 @@ type (

// metricDefinition contains the definition for a metric
metricDefinition struct {
metricType MetricType // metric type
metricName MetricName // metric name
metricRollupName MetricName // optional. if non-empty, this name must be used for rolled-up version of this metric
buckets tally.Buckets // buckets if we are emitting histograms
metricType MetricType // metric type
metricName MetricName // metric name
metricRollupName MetricName // optional. if non-empty, this name must be used for rolled-up version of this metric
buckets tally.Buckets // buckets if we are emitting histograms
exponentialBuckets histogrammy[SubsettableHistogram]
intExponentialBuckets histogrammy[IntSubsettableHistogram]
}

// scopeDefinition holds the tag definitions for a scope
Expand Down Expand Up @@ -2433,6 +2435,7 @@ const (
TaskBatchCompleteCounter
TaskBatchCompleteFailure
TaskProcessingLatency
ExponentialTaskProcessingLatency
TaskQueueLatency
ScheduleToStartHistoryQueueLatencyPerTaskList
TaskRequestsOldScheduler
Expand Down Expand Up @@ -2697,6 +2700,7 @@ const (
ReplicationTaskCleanupCount
ReplicationTaskCleanupFailure
ReplicationTaskLatency
ExponentialReplicationTaskLatency
MutableStateChecksumMismatch
MutableStateChecksumInvalidated
FailoverMarkerCount
Expand Down Expand Up @@ -3208,16 +3212,17 @@ var MetricDefs = map[ServiceIdx]map[MetricIdx]metricDefinition{
RingResolverError: {metricName: "ring_resolver_error", metricType: Counter},
},
History: {
TaskRequests: {metricName: "task_requests", metricType: Counter},
TaskLatency: {metricName: "task_latency", metricType: Timer},
TaskAttemptTimer: {metricName: "task_attempt", metricType: Timer},
TaskFailures: {metricName: "task_errors", metricType: Counter},
TaskDiscarded: {metricName: "task_errors_discarded", metricType: Counter},
TaskStandbyRetryCounter: {metricName: "task_errors_standby_retry_counter", metricType: Counter},
TaskNotActiveCounter: {metricName: "task_errors_not_active_counter", metricType: Counter},
TaskLimitExceededCounter: {metricName: "task_errors_limit_exceeded_counter", metricType: Counter},
TaskProcessingLatency: {metricName: "task_latency_processing", metricType: Timer},
TaskQueueLatency: {metricName: "task_latency_queue", metricType: Timer},
TaskRequests: {metricName: "task_requests", metricType: Counter},
TaskLatency: {metricName: "task_latency", metricType: Timer},
TaskAttemptTimer: {metricName: "task_attempt", metricType: Timer},
TaskFailures: {metricName: "task_errors", metricType: Counter},
TaskDiscarded: {metricName: "task_errors_discarded", metricType: Counter},
TaskStandbyRetryCounter: {metricName: "task_errors_standby_retry_counter", metricType: Counter},
TaskNotActiveCounter: {metricName: "task_errors_not_active_counter", metricType: Counter},
TaskLimitExceededCounter: {metricName: "task_errors_limit_exceeded_counter", metricType: Counter},
TaskProcessingLatency: {metricName: "task_latency_processing", metricType: Timer},
ExponentialTaskProcessingLatency: {metricName: "task_latency_processing_ns", metricType: Histogram, exponentialBuckets: Low1ms10m},
TaskQueueLatency: {metricName: "task_latency_queue", metricType: Timer},
ScheduleToStartHistoryQueueLatencyPerTaskList: {metricName: "schedule_to_start_history_queue_latency_per_tl", metricType: Timer},
TaskRequestsOldScheduler: {metricName: "task_requests_old_scheduler", metricType: Counter},
TaskRequestsNewScheduler: {metricName: "task_requests_new_scheduler", metricType: Counter},
Expand Down Expand Up @@ -3474,6 +3479,7 @@ var MetricDefs = map[ServiceIdx]map[MetricIdx]metricDefinition{
ReplicationTaskCleanupCount: {metricName: "replication_task_cleanup_count", metricType: Counter},
ReplicationTaskCleanupFailure: {metricName: "replication_task_cleanup_failed", metricType: Counter},
ReplicationTaskLatency: {metricName: "replication_task_latency", metricType: Timer},
ExponentialReplicationTaskLatency: {metricName: "replication_task_latency_ns", metricType: Histogram, exponentialBuckets: Mid1ms24h},
MutableStateChecksumMismatch: {metricName: "mutable_state_checksum_mismatch", metricType: Counter},
MutableStateChecksumInvalidated: {metricName: "mutable_state_checksum_invalidated", metricType: Counter},
FailoverMarkerCount: {metricName: "failover_marker_count", metricType: Counter},
Expand Down
18 changes: 18 additions & 0 deletions common/metrics/defs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"fmt"
"math"
"regexp"
"strings"
"testing"
"time"

Expand Down Expand Up @@ -187,6 +188,23 @@ func TestMetricsAreUnique(t *testing.T) {
})
}

// TestHistogramSuffixes verifies the naming convention for exponential
// histograms: duration-valued histograms must end in "_ns" and integer-count
// histograms must end in "_counts", so neither can be confused with timers.
func TestHistogramSuffixes(t *testing.T) {
	for _, serviceMetrics := range MetricDefs {
		for _, def := range serviceMetrics {
			name := def.metricName.String()
			if def.intExponentialBuckets != nil && !strings.HasSuffix(name, "_counts") {
				t.Errorf("int-exponential-histogram metric %q should have suffix \"_counts\"", name)
			}
			if def.exponentialBuckets != nil && !strings.HasSuffix(name, "_ns") {
				t.Errorf("exponential-histogram metric %q should have suffix \"_ns\"", name)
			}
		}
	}
}

func TestExponentialDurationBuckets(t *testing.T) {
factor := math.Pow(2, 0.25)
assert.Equal(t, 80, len(ExponentialDurationBuckets))
Expand Down
276 changes: 276 additions & 0 deletions common/metrics/histograms.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,276 @@
package metrics

import (
"fmt"
"math"
"slices"
"strconv"
"time"

"github.com/uber-go/tally"
)

// Nearly all histograms should use pre-defined buckets here, to encourage consistency.
//
// Name them more by their intent than their exact ranges, because the ranges will be adjusted to
// add a small bit of safety buffer and round to a nicely-divisible count.
// For this reason, if you need something outside the intent but within the technical boundaries,
// you should probably make a new bucket instead so you have more of an unexpected-value safety buffer.
var (
	// Default1ms10m is our "default" set of buckets, targeting at least 1ms through 100s,
	// and is "rounded up" slightly to reach 80 buckets (100s needs 68 buckets),
	// plus multi-minute exceptions are common enough to support for the small additional cost.
	//
	// That makes this *relatively* costly, but hopefully good enough for most metrics without
	// needing any careful thought. If this proves incorrect, it will be down-scaled without
	// checking existing uses, so make sure to use something else if you require a certain level
	// of precision.
	//
	// If you need sub-millisecond precision, or substantially longer durations than about 1 minute,
	// consider using a different histogram.
	Default1ms10m = makeSubsettableHistogram(time.Millisecond, 2, func(last time.Duration, length int) bool {
		// scale 2 == 4 buckets per doubling; 80 buckets == 20 doublings from 1ms (~17.5m max)
		return last >= 10*time.Minute && length == 80
	})

	// Low1ms10m is a half-resolution version of Default1ms10m, intended for use any time the default
	// is expected to be more costly than we feel comfortable with. Or for automatic down-scaling
	// at runtime via dynamic config, if that's ever built.
	//
	// This uses only 41 buckets, so it's likely good enough for most <10m purposes, e.g. database operations.
	// E.g. reducing to "1ms to 10s" seems reasonable, but that still needs ~32 buckets, which isn't a big savings
	// and reduces our ability to notice abnormally slow events.
	Low1ms10m = Default1ms10m.subsetTo(1)

	// High1ms24h covers things like activity latency, where ranges are very large but
	// "longer than 1 day" is not particularly worth being precise about.
	//
	// This uses a lot of buckets, so it must be used with relatively-low cardinality elsewhere,
	// and/or consider dual emitting (Mid1ms24h or lower) so overviews can be queried efficiently.
	High1ms24h = makeSubsettableHistogram(time.Millisecond, 2, func(last time.Duration, length int) bool {
		// 24h leads to 108 buckets, raise to 112 for better subsetting
		return last >= 24*time.Hour && length == 112
	})

	// Mid1ms24h is a one-scale-lower version of High1ms24h,
	// for use when we know it's too detailed to be worth emitting.
	//
	// This uses 57 buckets, half of High1ms24h's 113 (both counts include the leading zero bucket)
	Mid1ms24h = High1ms24h.subsetTo(1)

	// Mid1To32k is a histogram for small counters, like "how many replication tasks did we receive".
	// This targets 1 to ~10k with some buffer, and ends just below 64k (so do not rely on it for 64k, make a new histogram)
	//
	// note the `1` start value is an untyped constant, so it satisfies the time.Duration parameter
	// despite this being an integer-valued histogram.
	Mid1To32k = IntSubsettableHistogram(makeSubsettableHistogram(1, 2, func(last time.Duration, length int) bool {
		// 10k needs 56 buckets and is uncomfortably close to the limit for e.g.
		// replication batch sizes, raise to 64 for better subsetting
		return last >= 10000 && length == 64
	}))
)

// SubsettableHistogram is a duration-based histogram that can be subset to a lower scale
// in a standardized, predictable way. It is intentionally compatible with Prometheus and OTEL's
// "exponential histograms": https://opentelemetry.io/docs/specs/otel/metrics/data-model/#exponentialhistogram
//
// These histograms MUST always have a "_ns" suffix in their name to avoid confusion with timers.
type SubsettableHistogram struct {
	// bucket boundaries, always beginning with a zero bucket
	// (prepended by makeSubsettableHistogram so negative values are noticeable)
	tallyBuckets tally.DurationBuckets

	// exponential scale: each power of 2 is divided into 2^scale buckets.
	// construction restricts this to 0..3 inclusive, and subsetTo only lowers it (not below 0).
	scale int
}

// IntSubsettableHistogram is a non-duration-based integer-distribution histogram, otherwise identical
// to SubsettableHistogram but built as a separate type so you cannot pass the wrong one.
//
// These histograms MUST always have a "_counts" suffix in their name to avoid confusion with timers.
type IntSubsettableHistogram SubsettableHistogram

// currently we have no apparent need for float-histograms,
// as all our value ranges go from >=1 to many thousands, where
// decimal precision is pointless and mostly just looks bad.
//
// if we ever need 0..1 precision in histograms, we can add them then.
// type FloatSubsettableHistogram struct {
// 	tally.ValueBuckets
// 	scale int
// }

// subsetTo returns a copy of this histogram reduced to newScale, which must be
// non-negative and strictly lower than the current scale.
//
// Each one-step reduction merges adjacent bucket pairs, halving resolution
// (and cost) while keeping the remaining boundaries aligned with the original
// start value's grid, so subset histograms stay comparable with their parents.
func (s SubsettableHistogram) subsetTo(newScale int) SubsettableHistogram {
	switch {
	case newScale >= s.scale:
		panic(fmt.Sprintf("scale %v is not less than the current scale %v", newScale, s.scale))
	case newScale < 0:
		panic(fmt.Sprintf("negative scales (%v == greater than *2 per step) are possible, but do not have tests yet", newScale))
	}
	result := SubsettableHistogram{
		tallyBuckets: slices.Clone(s.tallyBuckets),
		scale:        s.scale,
	}
	// each -1 scale merges every pair of buckets into one
	for result.scale > newScale {
		nonZero := len(result.tallyBuckets) - 1
		if nonZero%2 != 0 {
			panic(fmt.Sprintf("cannot subset from scale %v to %v, %v-buckets is not divisible by 2", result.scale, result.scale-1, nonZero))
		}
		merged := make(tally.DurationBuckets, 0, (nonZero/2)+1)
		merged = append(merged, result.tallyBuckets[0]) // keep the zero value intact
		// keep the odd indices (the first of each pair), which are the
		// boundaries that also exist on the coarser grid
		for idx := 1; idx < len(result.tallyBuckets); idx += 2 {
			merged = append(merged, result.tallyBuckets[idx])
		}
		result.tallyBuckets = merged
		result.scale--
	}
	return result
}

// subsetTo returns a copy reduced to newScale, delegating to
// SubsettableHistogram.subsetTo and converting back to the integer variant.
func (i IntSubsettableHistogram) subsetTo(newScale int) IntSubsettableHistogram {
	reduced := SubsettableHistogram(i).subsetTo(newScale)
	return IntSubsettableHistogram(reduced)
}

// tags reports the histogram's range and scale as metric tags (duration-formatted),
// so emitted data is self-describing about its bucket layout.
func (s SubsettableHistogram) tags() map[string]string {
	tags := make(map[string]string, 3)
	tags["histogram_start"] = s.start().String()
	tags["histogram_end"] = s.end().String()
	tags["histogram_scale"] = strconv.Itoa(s.scale)
	return tags
}

// tags reports the histogram's range and scale as metric tags.
// Values are formatted as plain integers because this histogram
// holds counts, not durations.
func (i IntSubsettableHistogram) tags() map[string]string {
	startTag := strconv.Itoa(int(i.start()))
	endTag := strconv.Itoa(int(i.end()))
	return map[string]string{
		"histogram_start": startTag,
		"histogram_end":   endTag,
		"histogram_scale": strconv.Itoa(i.scale),
	}
}

// makeSubsettableHistogram is a replacement for tally.MustMakeExponentialDurationBuckets,
// tailored to make "construct a range" or "ensure N buckets" simpler for OTEL-compatible exponential histograms
// (i.e. https://opentelemetry.io/docs/specs/otel/metrics/data-model/#exponentialhistogram).
//
// It ensures values are as precise as possible (preventing display-space-wasteful numbers like 3.99996ms),
// that all histograms start with a 0 value (else erroneous negative values are impossible to notice),
// and avoids some bugs with low values (tally.MustMakeExponentialDurationBuckets misbehaves for small numbers,
// causing all buckets to == the minimum value).
//
// Start time must be positive, scale must be 0..3 (inclusive), and the loop MUST stop within 320 or fewer steps,
// to prevent extremely-costly histograms.
// Even 320 is far too many tbh, target a max of 160 for very-high-value data, and generally around 80 or less
// (ideally a value that is divisible by 2 many times, to allow "clean" subsetting, but this is NOT necessary).
//
// The stop callback will be given the current largest value and the number of values generated so far.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I like the stop function documentation here, but I'd recommend moving it to a type:

// StopFunction has nice documentation
type StopFunction func(last time.Duration, length int) bool

I think this might increase readability, and I'd recommend adding some examples to the documentation (e.g the ones above) just for extra clarity.

For me - why wouldn't we always force a user to a power of 2, if we're going to fill it post-fact anyway?

NIT: Would maxDuration or maxDurationNanos make sense rather than last as a parameter? I guess we would only want to do that if we were planning to force a specific comparison, which the stop function currently doesn't do.

Copy link
Member Author

@Groxx Groxx Sep 9, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

force a clean power of 2: yea, I think I like that better too. will do 👍

maxDuration / maxDurationNanos: not sure what you mean here tbh. this isn't called once, it's called per value, and it's already a time.Duration

Copy link
Member Author

@Groxx Groxx Sep 9, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

re docs on a type: I don't believe any IDEs will show that to you as you fill in the argument, unless you make a var x StopFunction variable - function arg type docs are generally not exposed, just the names and the "parent" func's docs:

goland while entering the arg:
Image

vscode:
Image

and generally for small funcs like this I think having to look at two locations to figure out how to call the callback-needing func is worse...

... though I see IDEs are finally showing you the signature inline, rather than just the name, so that's not too bad (unless this is a local LLM, then it's awful because that's not reliable):

Image Image

in principle I kinda agree, the more types the merrier. ergonomics are just not all that good tho for this kind of case imo, unless there's a lot of reuse or func-as-var usage (where the type would prevent mixing up types) :\

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Understood re: type and am happy with the more ergonomic editor usage.

maxDuration / maxDurationNanos: not sure what you mean here tbh. this isn't called once, it's called per value, and it's already a time.Duration

I had a couple of mystery logical leaps in my thinking here. At the moment the stop function gives the user arbitrary power to define how they want to stop, given the duration of the most recently appended bucket + how many buckets exist.

All the examples are just comparisons with:

  • the length of the list
  • whether the 'last' duration is greater than or equal to some const

If that's all we want to support, I'd suggest creating the stop function for the user by giving them a method that allows them to specify a max value + a max length:

func CreateStopFunction(maxDuration time.Duration, maxLength int) StopFunction {
  return func(last time.Duration, l int) bool {
    return last >= maxDuration && l >= maxLength
  }
}

Or just allowing them to specify maxDuration/maxLength and doing that comparison where the existing stop function is called.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if I'm going to CreateStopFunction, I'll just have those two args passed directly and use it that way internally.

... which I think you're convincing me of, in side-chat. lemme see how it looks 👍

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

updated! I like this better, thanks for bringing it up and pushing on me :)

// This excludes the zero value (you should ignore it anyway), so you can stop at your target value,
// or == a highly-divisible-by-2 number (do not go over).
// The buckets produced may be padded further to reach a "full" power-of-2 row, as this simplifies math elsewhere
// and costs very little compared to the rest of the histogram.
//
// For all values produced, please add a test to print the concrete values, and record the length so they
// can be quickly checked when reading.
func makeSubsettableHistogram(start time.Duration, scale int, stop func(last time.Duration, count int) bool) SubsettableHistogram {
	if start <= 0 {
		panic(fmt.Sprintf("start must be greater than 0 or it will not grow exponentially, got %v", start))
	}
	if scale < 0 || scale > 3 {
		// anything outside this range is currently not expected and probably a mistake,
		// but any value is technically sound.
		panic(fmt.Sprintf("scale must be between 0 (grows by *2) and 3 (grows by *2^1/8), got scale: %v", scale))
	}
	var buckets tally.DurationBuckets
	for {
		if len(buckets) > 320 {
			panic(fmt.Sprintf("over 320 buckets is too many, choose a smaller range or smaller scale. "+
				"started at: %v, scale: %v, last value: %v",
				start, scale, last(buckets)))
		}
		buckets = append(buckets, nextBucket(start, len(buckets), scale))

		// stop when requested.
		if stop(last(buckets), len(buckets)) {
			break
		}
	}

	// make sure the number of buckets completes a "full" row,
	// i.e. the next bucket would be a power of 2 of the start value.
	//
	// for a more visual example of what this means, see the logged strings in tests:
	// each "row" of values must be the same width.
	//
	// adding a couple buckets to ensure this is met costs very little, and ensures
	// subsetting combines values more consistently (not crossing rows) for longer.
	powerOfTwoWidth := int(math.Pow(2, float64(scale))) // num of buckets needed to double a value
	extra := len(buckets) % powerOfTwoWidth             // buckets dangling in an incomplete final row
	if extra != 0 {
		// the final row is short by (powerOfTwoWidth - extra) buckets, so suggest the
		// next multiple of powerOfTwoWidth. (previously this suggested len+extra, which
		// is generally not a multiple of the row width and would panic again.)
		panic(fmt.Sprintf(`number of buckets must "fill" a power of 2 to end at a consistent row width. `+
			`got %d, probably raise to %d`,
			len(buckets), len(buckets)+(powerOfTwoWidth-extra)))
	}
	return SubsettableHistogram{
		tallyBuckets: append(
			// always include a zero value at the beginning, so negative values are noticeable ("-inf to 0" bucket)
			tally.DurationBuckets{0},
			buckets...,
		),
		scale: scale,
	}
}

// last returns the final element of a (non-empty) slice,
// removing the magic `-1` indexing from call sites.
func last[T any, X ~[]T](s X) T {
	end := len(s) - 1
	return s[end]
}

// nextBucket computes the num'th (0-indexed) exponential bucket boundary for
// the given start value and scale (2^scale buckets per doubling).
//
// Each boundary is derived from `start` directly rather than from the previous
// boundary, which keeps floating point error from accumulating and ensures
// "clean" multiples at every power of 2 (and possibly others), instead of
// e.g. "1ms ... 1.9999994ms" which occurs when building from the prior value.
func nextBucket(start time.Duration, num int, scale int) time.Duration {
	perDoubling := math.Pow(2, float64(scale))
	multiplier := math.Pow(2, float64(num)/perDoubling)
	return time.Duration(float64(start) * multiplier)
}

// histScale reports the exponential scale value (0..3 inclusive).
func (s SubsettableHistogram) histScale() int {
	return s.scale
}

// width reports how many buckets make up one power of 2 (one printed row).
func (s SubsettableHistogram) width() int {
	return int(math.Pow(2, float64(s.scale)))
}

// len reports the total bucket count, including the leading zero bucket.
func (s SubsettableHistogram) len() int {
	return len(s.tallyBuckets)
}

// start reports the first non-zero bucket boundary.
func (s SubsettableHistogram) start() time.Duration {
	return s.tallyBuckets[1]
}

// end reports the largest bucket boundary.
func (s SubsettableHistogram) end() time.Duration {
	return s.tallyBuckets[len(s.tallyBuckets)-1]
}

// buckets exposes the raw tally bucket boundaries.
func (s SubsettableHistogram) buckets() tally.DurationBuckets {
	return s.tallyBuckets
}

// print writes the buckets through the given printf-style func,
// one power-of-2 row per line so the layout is easy to eyeball in tests.
func (s SubsettableHistogram) print(to func(string, ...any)) {
	to("%v\n", s.tallyBuckets[:1]) // zero value on its own row
	for row := 1; row < s.len(); row += s.width() {
		to("%v\n", s.tallyBuckets[row:row+s.width()])
	}
}

// histScale reports the exponential scale value (0..3 inclusive).
func (i IntSubsettableHistogram) histScale() int {
	return i.scale
}

// width reports how many buckets make up one power of 2 (one printed row).
func (i IntSubsettableHistogram) width() int {
	return int(math.Pow(2, float64(i.scale)))
}

// len reports the total bucket count, including the leading zero bucket.
func (i IntSubsettableHistogram) len() int {
	return len(i.tallyBuckets)
}

// start reports the first non-zero bucket boundary.
func (i IntSubsettableHistogram) start() time.Duration {
	return i.tallyBuckets[1]
}

// end reports the largest bucket boundary.
func (i IntSubsettableHistogram) end() time.Duration {
	return i.tallyBuckets[len(i.tallyBuckets)-1]
}

// buckets exposes the raw tally bucket boundaries.
func (i IntSubsettableHistogram) buckets() tally.DurationBuckets {
	return i.tallyBuckets
}

// print writes the buckets one power-of-2 row per line, converting each value
// to a plain integer by hand since duration-strings are fairly unreadable here.
func (i IntSubsettableHistogram) print(to func(string, ...any)) {
	to("[%d]\n", int(i.tallyBuckets[0])) // zero value on its own row
	for row := 1; row < i.len(); row += i.width() {
		ints := make([]int, 0, i.width())
		for _, d := range i.tallyBuckets[row : row+i.width()] {
			ints = append(ints, int(d))
		}
		to("%v\n", ints)
	}
}

// compile-time assertions that both histogram types satisfy the shared interface.
var _ histogrammy[SubsettableHistogram] = SubsettableHistogram{}
var _ histogrammy[IntSubsettableHistogram] = IntSubsettableHistogram{}

// histogrammy describes the behavior shared by both subsettable histogram types.
//
// internal utility/test methods, but could be exposed if there's a use for it
type histogrammy[T any] interface {
	histScale() int                 // exponential scale value. 0..3 inclusive.
	width() int                     // number of values per power of 2 == how wide to print each row. 1, 2, 4, or 8.
	len() int                       // number of buckets
	start() time.Duration           // first non-zero bucket
	end() time.Duration             // last bucket
	buckets() tally.DurationBuckets // access to all buckets
	subsetTo(newScale int) T        // generic so specific types can be returned
	tags() map[string]string        // start, end, and scale tags that need to be implicitly added

	print(to func(string, ...any)) // test-oriented printer
}
Loading
Loading