Skip to content

Commit c8e3897

Browse files
dashpoleMrAlias
andauthored
Use sync.Map and atomics to improve sum performance (#7427)
Alternative to #7380 This uses a sync.Map and atomics for the sum's counter value. This intentionally introduces a new race condition that didn't previously exist: * It is possible for the exemplar to be recorded in the batch of metrics after the add() for cumulative sum aggregations. For cumulative, this isn't a huge issue since exemplars are expected to persist across collection cycles. This is difficult to fix because we can't manage the internal storage of an exemplar.Reservoir (to atomically swap between hot and cold storage). If we are able to make assumptions about how exemplar reservoirs are managed (i.e. that the number of and order of exemplars returned is always the same), then we could possibly fix this by merging at export time. ### Alternatives Considered #### RWLock for the map instead of sync.Map This is significantly less performant. #### Single sync.Map without hotColdWaitGroup Deleting keys from the sync.Map concurrently with measurements (during Clear() of the sync.Map) can cause measurements to be made to a counter that has already been read, exported and deleted. This can produce incorrect sums when delta is used. Instead, atomically switching writes to a completely empty sync.Map and waiting for writes to the previous sync.Map complete eliminates this issue. #### Use two sync.Map for cumulative sums One idea I explored was doing a hot-cold swap for cumulative sums just like we do for delta sums. We would swap the hot and cold sync.Maps, wait for writes to the cold sync.Map to complete while new writes go to the hot map. Then, once we are done reading the cold map, we could merge the contents of the cold map back into the new hot map. This approach has two issues: * It isn't possible to "merge" one exemplar reservoir into another. This is an issue for persistent exemplars that aren't overwritten in a collection interval. * We can't keep a consistent set of keys in overflow scenarios. Measurements that are made to the hot map before the merge of the cold into hot that should have been overflows will be added as new attribute sets. That, in turn, means we will need to change previously-exported attribute sets to the overflow set, which will cause issues for users. ### Benchmarks Parallel: ``` goos: linux goarch: amd64 pkg: go.opentelemetry.io/otel/sdk/metric cpu: AMD EPYC 7B12 │ main24.txt │ new24_new.txt │ │ sec/op │ sec/op vs base │ SyncMeasure/NoView/ExemplarsDisabled/Int64Counter/Attributes/0-24 255.65n ± 13% 68.06n ± 3% -73.38% (p=0.002 n=6) SyncMeasure/NoView/ExemplarsDisabled/Int64Counter/Attributes/1-24 286.70n ± 8% 67.66n ± 4% -76.40% (p=0.002 n=6) SyncMeasure/NoView/ExemplarsDisabled/Int64Counter/Attributes/10-24 287.15n ± 14% 69.90n ± 3% -75.66% (p=0.002 n=6) SyncMeasure/NoView/ExemplarsDisabled/Float64Counter/Attributes/0-24 244.75n ± 9% 68.83n ± 4% -71.88% (p=0.002 n=6) SyncMeasure/NoView/ExemplarsDisabled/Float64Counter/Attributes/1-24 267.20n ± 14% 65.86n ± 3% -75.35% (p=0.002 n=6) SyncMeasure/NoView/ExemplarsDisabled/Float64Counter/Attributes/10-24 291.50n ± 13% 66.59n ± 11% -77.15% (p=0.002 n=6) SyncMeasure/NoView/ExemplarsDisabled/Int64UpDownCounter/Attributes/0-24 247.85n ± 7% 66.06n ± 3% -73.34% (p=0.002 n=6) SyncMeasure/NoView/ExemplarsDisabled/Int64UpDownCounter/Attributes/1-24 286.75n ± 10% 68.52n ± 2% -76.10% (p=0.002 n=6) SyncMeasure/NoView/ExemplarsDisabled/Int64UpDownCounter/Attributes/10-24 289.50n ± 20% 67.45n ± 4% -76.70% (p=0.002 n=6) SyncMeasure/NoView/ExemplarsDisabled/Float64UpDownCounter/Attributes/0-24 246.25n ± 14% 66.69n ± 2% -72.92% (p=0.002 n=6) SyncMeasure/NoView/ExemplarsDisabled/Float64UpDownCounter/Attributes/1-24 289.55n ± 9% 65.54n ± 5% -77.36% (p=0.002 n=6) SyncMeasure/NoView/ExemplarsDisabled/Float64UpDownCounter/Attributes/10-24 286.05n ± 14% 67.55n ± 2% -76.39% (p=0.002 n=6) SyncMeasure/NoView/ExemplarsDisabled/Int64Gauge/Attributes/0-24 254.8n ± 23% 225.9n ± 17% -11.32% (p=0.026 n=6) SyncMeasure/NoView/ExemplarsDisabled/Int64Gauge/Attributes/1-24 304.4n ± 13% 234.4n ± 19% -23.01% (p=0.004 n=6) SyncMeasure/NoView/ExemplarsDisabled/Int64Gauge/Attributes/10-24 308.9n ± 20% 217.6n ± 10% -29.56% (p=0.002 n=6) SyncMeasure/NoView/ExemplarsDisabled/Float64Gauge/Attributes/0-24 267.8n ± 14% 220.1n ± 19% -17.80% (p=0.004 n=6) SyncMeasure/NoView/ExemplarsDisabled/Float64Gauge/Attributes/1-24 274.1n ± 21% 226.5n ± 5% -17.38% (p=0.024 n=6) SyncMeasure/NoView/ExemplarsDisabled/Float64Gauge/Attributes/10-24 239.0n ± 14% 236.1n ± 18% ~ (p=0.589 n=6) SyncMeasure/NoView/ExemplarsDisabled/Int64Histogram/Attributes/0-24 223.7n ± 11% 234.8n ± 7% ~ (p=0.240 n=6) SyncMeasure/NoView/ExemplarsDisabled/Int64Histogram/Attributes/1-24 253.9n ± 10% 244.8n ± 11% ~ (p=0.240 n=6) SyncMeasure/NoView/ExemplarsDisabled/Int64Histogram/Attributes/10-24 272.6n ± 7% 250.0n ± 12% -8.33% (p=0.002 n=6) SyncMeasure/NoView/ExemplarsDisabled/Float64Histogram/Attributes/0-24 232.6n ± 4% 232.2n ± 8% ~ (p=0.937 n=6) SyncMeasure/NoView/ExemplarsDisabled/Float64Histogram/Attributes/1-24 276.7n ± 20% 249.2n ± 11% ~ (p=0.485 n=6) SyncMeasure/NoView/ExemplarsDisabled/Float64Histogram/Attributes/10-24 265.9n ± 18% 246.4n ± 9% ~ (p=0.240 n=6) SyncMeasure/NoView/ExemplarsDisabled/ExponentialInt64Histogram/Attributes/0-24 294.0n ± 11% 269.0n ± 5% -8.47% (p=0.015 n=6) SyncMeasure/NoView/ExemplarsDisabled/ExponentialInt64Histogram/Attributes/1-24 314.6n ± 10% 268.8n ± 6% -14.54% (p=0.002 n=6) SyncMeasure/NoView/ExemplarsDisabled/ExponentialInt64Histogram/Attributes/10-24 303.9n ± 11% 285.4n ± 4% ~ (p=0.180 n=6) SyncMeasure/NoView/ExemplarsDisabled/ExponentialFloat64Histogram/Attributes/0-24 274.7n ± 13% 262.9n ± 7% ~ (p=0.145 n=6) SyncMeasure/NoView/ExemplarsDisabled/ExponentialFloat64Histogram/Attributes/1-24 296.1n ± 6% 288.9n ± 9% ~ (p=0.180 n=6) SyncMeasure/NoView/ExemplarsDisabled/ExponentialFloat64Histogram/Attributes/10-24 276.0n ± 14% 299.4n ± 12% ~ (p=0.240 n=6) SyncMeasure/NoView/ExemplarsEnabled/Int64Counter/Attributes/0-24 191.4n ± 4% 176.0n ± 3% -8.05% (p=0.002 n=6) SyncMeasure/NoView/ExemplarsEnabled/Int64Counter/Attributes/1-24 223.2n ± 8% 172.8n ± 3% -22.54% (p=0.002 n=6) SyncMeasure/NoView/ExemplarsEnabled/Int64Counter/Attributes/10-24 265.7n ± 19% 172.2n ± 2% -35.21% (p=0.002 n=6) SyncMeasure/NoView/ExemplarsEnabled/Float64Counter/Attributes/0-24 179.4n ± 18% 171.0n ± 3% -4.74% (p=0.009 n=6) SyncMeasure/NoView/ExemplarsEnabled/Float64Counter/Attributes/1-24 209.1n ± 16% 175.4n ± 5% -16.07% (p=0.002 n=6) SyncMeasure/NoView/ExemplarsEnabled/Float64Counter/Attributes/10-24 222.5n ± 17% 175.6n ± 4% -21.08% (p=0.002 n=6) SyncMeasure/NoView/ExemplarsEnabled/Int64UpDownCounter/Attributes/0-24 194.4n ± 11% 176.9n ± 5% -9.03% (p=0.004 n=6) SyncMeasure/NoView/ExemplarsEnabled/Int64UpDownCounter/Attributes/1-24 207.5n ± 13% 175.1n ± 2% -15.66% (p=0.002 n=6) SyncMeasure/NoView/ExemplarsEnabled/Int64UpDownCounter/Attributes/10-24 243.7n ± 13% 172.6n ± 3% -29.15% (p=0.002 n=6) SyncMeasure/NoView/ExemplarsEnabled/Float64UpDownCounter/Attributes/0-24 218.3n ± 10% 177.6n ± 2% -18.67% (p=0.002 n=6) SyncMeasure/NoView/ExemplarsEnabled/Float64UpDownCounter/Attributes/1-24 193.5n ± 10% 176.1n ± 2% -8.99% (p=0.004 n=6) SyncMeasure/NoView/ExemplarsEnabled/Float64UpDownCounter/Attributes/10-24 192.8n ± 11% 173.7n ± 2% -9.91% (p=0.002 n=6) SyncMeasure/NoView/ExemplarsEnabled/Int64Gauge/Attributes/0-24 185.1n ± 9% 204.8n ± 9% +10.61% (p=0.004 n=6) SyncMeasure/NoView/ExemplarsEnabled/Int64Gauge/Attributes/1-24 218.8n ± 14% 229.7n ± 16% ~ (p=0.310 n=6) SyncMeasure/NoView/ExemplarsEnabled/Int64Gauge/Attributes/10-24 242.7n ± 8% 209.1n ± 18% -13.84% (p=0.041 n=6) SyncMeasure/NoView/ExemplarsEnabled/Float64Gauge/Attributes/0-24 182.8n ± 42% 255.2n ± 8% +39.67% (p=0.015 n=6) SyncMeasure/NoView/ExemplarsEnabled/Float64Gauge/Attributes/1-24 198.0n ± 7% 280.6n ± 22% +41.72% (p=0.002 n=6) SyncMeasure/NoView/ExemplarsEnabled/Float64Gauge/Attributes/10-24 236.3n ± 18% 261.7n ± 8% ~ (p=0.065 n=6) SyncMeasure/NoView/ExemplarsEnabled/Int64Histogram/Attributes/0-24 223.2n ± 9% 226.9n ± 4% ~ (p=0.965 n=6) SyncMeasure/NoView/ExemplarsEnabled/Int64Histogram/Attributes/1-24 270.1n ± 10% 280.2n ± 6% ~ (p=0.143 n=6) SyncMeasure/NoView/ExemplarsEnabled/Int64Histogram/Attributes/10-24 257.2n ± 7% 252.0n ± 7% ~ (p=0.485 n=6) SyncMeasure/NoView/ExemplarsEnabled/Float64Histogram/Attributes/0-24 277.0n ± 5% 310.4n ± 12% ~ (p=0.065 n=6) SyncMeasure/NoView/ExemplarsEnabled/Float64Histogram/Attributes/1-24 287.3n ± 9% 271.2n ± 12% ~ (p=0.699 n=6) SyncMeasure/NoView/ExemplarsEnabled/Float64Histogram/Attributes/10-24 281.8n ± 9% 316.5n ± 22% +12.29% (p=0.041 n=6) SyncMeasure/NoView/ExemplarsEnabled/ExponentialInt64Histogram/Attributes/0-24 289.1n ± 9% 297.1n ± 12% ~ (p=0.310 n=6) SyncMeasure/NoView/ExemplarsEnabled/ExponentialInt64Histogram/Attributes/1-24 277.8n ± 6% 353.1n ± 11% +27.11% (p=0.002 n=6) SyncMeasure/NoView/ExemplarsEnabled/ExponentialInt64Histogram/Attributes/10-24 281.8n ± 11% 352.2n ± 16% +24.94% (p=0.009 n=6) SyncMeasure/NoView/ExemplarsEnabled/ExponentialFloat64Histogram/Attributes/0-24 294.1n ± 7% 317.5n ± 9% ~ (p=0.065 n=6) SyncMeasure/NoView/ExemplarsEnabled/ExponentialFloat64Histogram/Attributes/1-24 281.7n ± 10% 332.1n ± 8% +17.89% (p=0.002 n=6) SyncMeasure/NoView/ExemplarsEnabled/ExponentialFloat64Histogram/Attributes/10-24 238.9n ± 12% 318.1n ± 9% +33.13% (p=0.002 n=6) geomean 251.9n 184.4n -26.77% ``` Single-threaded: ``` goos: linux goarch: amd64 pkg: go.opentelemetry.io/otel/sdk/metric cpu: Intel(R) Xeon(R) CPU @ 2.20GHz │ main1.txt │ sync1.txt │ │ sec/op │ sec/op vs base │ SyncMeasure/NoView/ExemplarsDisabled/Int64Counter/Attributes/0 109.8n ± 7% 113.4n ± 23% ~ (p=1.000 n=6) SyncMeasure/NoView/ExemplarsDisabled/Int64Counter/Attributes/1 115.0n ± 4% 113.3n ± 20% ~ (p=0.729 n=6) SyncMeasure/NoView/ExemplarsDisabled/Int64Counter/Attributes/10 177.1n ± 34% 110.2n ± 16% -37.78% (p=0.009 n=6) SyncMeasure/NoView/ExemplarsDisabled/Float64Counter/Attributes/0 110.5n ± 42% 109.2n ± 19% ~ (p=0.457 n=6) SyncMeasure/NoView/ExemplarsDisabled/Float64Counter/Attributes/1 118.8n ± 2% 118.4n ± 5% ~ (p=0.619 n=6) SyncMeasure/NoView/ExemplarsDisabled/Float64Counter/Attributes/10 119.0n ± 2% 116.8n ± 42% ~ (p=0.699 n=6) SyncMeasure/NoView/ExemplarsDisabled/Int64UpDownCounter/Attributes/0 106.9n ± 1% 102.5n ± 5% -4.16% (p=0.030 n=6) SyncMeasure/NoView/ExemplarsDisabled/Int64UpDownCounter/Attributes/1 117.2n ± 2% 116.9n ± 7% ~ (p=1.000 n=6) SyncMeasure/NoView/ExemplarsDisabled/Int64UpDownCounter/Attributes/10 115.4n ± 1% 115.1n ± 5% ~ (p=0.937 n=6) SyncMeasure/NoView/ExemplarsDisabled/Float64UpDownCounter/Attributes/0 109.5n ± 5% 104.2n ± 8% -4.84% (p=0.041 n=6) SyncMeasure/NoView/ExemplarsDisabled/Float64UpDownCounter/Attributes/1 118.7n ± 14% 113.8n ± 35% ~ (p=0.240 n=6) SyncMeasure/NoView/ExemplarsDisabled/Float64UpDownCounter/Attributes/10 116.6n ± 1% 116.8n ± 8% ~ (p=0.968 n=6) SyncMeasure/NoView/ExemplarsDisabled/Int64Gauge/Attributes/0 106.6n ± 4% 109.4n ± 5% ~ (p=0.093 n=6) SyncMeasure/NoView/ExemplarsDisabled/Int64Gauge/Attributes/1 114.7n ± 4% 117.9n ± 4% ~ (p=0.240 n=6) SyncMeasure/NoView/ExemplarsDisabled/Int64Gauge/Attributes/10 115.2n ± 4% 114.5n ± 1% ~ (p=0.162 n=6) SyncMeasure/NoView/ExemplarsDisabled/Float64Gauge/Attributes/0 109.4n ± 5% 107.5n ± 3% ~ (p=0.132 n=6) SyncMeasure/NoView/ExemplarsDisabled/Float64Gauge/Attributes/1 118.3n ± 2% 117.9n ± 3% ~ (p=0.589 n=6) SyncMeasure/NoView/ExemplarsDisabled/Float64Gauge/Attributes/10 117.7n ± 2% 120.8n ± 14% ~ (p=0.093 n=6) SyncMeasure/NoView/ExemplarsDisabled/Int64Histogram/Attributes/0 96.78n ± 1% 99.37n ± 3% ~ (p=0.065 n=6) SyncMeasure/NoView/ExemplarsDisabled/Int64Histogram/Attributes/1 103.0n ± 3% 116.5n ± 26% +13.16% (p=0.002 n=6) SyncMeasure/NoView/ExemplarsDisabled/Int64Histogram/Attributes/10 102.8n ± 1% 107.6n ± 22% +4.67% (p=0.002 n=6) SyncMeasure/NoView/ExemplarsDisabled/Float64Histogram/Attributes/0 93.95n ± 22% 99.88n ± 18% +6.32% (p=0.041 n=6) SyncMeasure/NoView/ExemplarsDisabled/Float64Histogram/Attributes/1 102.7n ± 5% 106.2n ± 6% ~ (p=0.089 n=6) SyncMeasure/NoView/ExemplarsDisabled/Float64Histogram/Attributes/10 104.1n ± 4% 108.3n ± 27% +4.03% (p=0.026 n=6) SyncMeasure/NoView/ExemplarsDisabled/ExponentialInt64Histogram/Attributes/0 146.3n ± 1% 154.0n ± 24% +5.23% (p=0.002 n=6) SyncMeasure/NoView/ExemplarsDisabled/ExponentialInt64Histogram/Attributes/1 154.8n ± 3% 161.2n ± 2% +4.20% (p=0.004 n=6) SyncMeasure/NoView/ExemplarsDisabled/ExponentialInt64Histogram/Attributes/10 155.5n ± 1% 164.0n ± 4% +5.43% (p=0.002 n=6) SyncMeasure/NoView/ExemplarsDisabled/ExponentialFloat64Histogram/Attributes/0 145.9n ± 2% 159.7n ± 12% +9.42% (p=0.002 n=6) SyncMeasure/NoView/ExemplarsDisabled/ExponentialFloat64Histogram/Attributes/1 155.2n ± 0% 164.0n ± 6% +5.70% (p=0.002 n=6) SyncMeasure/NoView/ExemplarsDisabled/ExponentialFloat64Histogram/Attributes/10 219.3n ± 29% 159.5n ± 3% ~ (p=0.065 n=6) SyncMeasure/NoView/ExemplarsEnabled/Int64Counter/Attributes/0 263.6n ± 36% 177.2n ± 1% ~ (p=0.065 n=6) SyncMeasure/NoView/ExemplarsEnabled/Int64Counter/Attributes/1 189.1n ± 8% 190.4n ± 12% ~ (p=0.589 n=6) SyncMeasure/NoView/ExemplarsEnabled/Int64Counter/Attributes/10 184.3n ± 3% 189.4n ± 6% ~ (p=0.065 n=6) SyncMeasure/NoView/ExemplarsEnabled/Float64Counter/Attributes/0 180.7n ± 1% 182.7n ± 2% ~ (p=0.457 n=6) SyncMeasure/NoView/ExemplarsEnabled/Float64Counter/Attributes/1 192.8n ± 9% 192.0n ± 1% ~ (p=1.000 n=6) SyncMeasure/NoView/ExemplarsEnabled/Float64Counter/Attributes/10 192.3n ± 4% 190.2n ± 4% ~ (p=0.093 n=6) SyncMeasure/NoView/ExemplarsEnabled/Int64UpDownCounter/Attributes/0 176.5n ± 2% 181.7n ± 4% +2.95% (p=0.002 n=6) SyncMeasure/NoView/ExemplarsEnabled/Int64UpDownCounter/Attributes/1 184.0n ± 4% 192.0n ± 1% +4.32% (p=0.015 n=6) SyncMeasure/NoView/ExemplarsEnabled/Int64UpDownCounter/Attributes/10 184.4n ± 1% 195.2n ± 3% +5.83% (p=0.002 n=6) SyncMeasure/NoView/ExemplarsEnabled/Float64UpDownCounter/Attributes/0 183.0n ± 3% 177.4n ± 5% -3.06% (p=0.048 n=6) SyncMeasure/NoView/ExemplarsEnabled/Float64UpDownCounter/Attributes/1 194.4n ± 4% 188.1n ± 5% ~ (p=0.084 n=6) SyncMeasure/NoView/ExemplarsEnabled/Float64UpDownCounter/Attributes/10 193.0n ± 5% 194.1n ± 5% ~ (p=0.699 n=6) SyncMeasure/NoView/ExemplarsEnabled/Int64Gauge/Attributes/0 178.4n ± 14% 185.6n ± 29% ~ (p=0.240 n=6) SyncMeasure/NoView/ExemplarsEnabled/Int64Gauge/Attributes/1 189.0n ± 8% 193.2n ± 2% ~ (p=0.132 n=6) SyncMeasure/NoView/ExemplarsEnabled/Int64Gauge/Attributes/10 197.7n ± 5% 198.8n ± 2% ~ (p=0.619 n=6) SyncMeasure/NoView/ExemplarsEnabled/Float64Gauge/Attributes/0 185.5n ± 3% 188.8n ± 4% ~ (p=0.310 n=6) SyncMeasure/NoView/ExemplarsEnabled/Float64Gauge/Attributes/1 191.2n ± 3% 190.2n ± 7% ~ (p=0.732 n=6) SyncMeasure/NoView/ExemplarsEnabled/Float64Gauge/Attributes/10 186.8n ± 2% 197.1n ± 6% +5.54% (p=0.004 n=6) SyncMeasure/NoView/ExemplarsEnabled/Int64Histogram/Attributes/0 224.2n ± 4% 227.3n ± 2% ~ (p=0.394 n=6) SyncMeasure/NoView/ExemplarsEnabled/Int64Histogram/Attributes/1 232.5n ± 3% 242.5n ± 5% ~ (p=0.132 n=6) SyncMeasure/NoView/ExemplarsEnabled/Int64Histogram/Attributes/10 232.5n ± 3% 237.1n ± 5% +2.00% (p=0.045 n=6) SyncMeasure/NoView/ExemplarsEnabled/Float64Histogram/Attributes/0 227.5n ± 2% 238.5n ± 5% +4.81% (p=0.017 n=6) SyncMeasure/NoView/ExemplarsEnabled/Float64Histogram/Attributes/1 239.4n ± 8% 250.1n ± 6% ~ (p=0.240 n=6) SyncMeasure/NoView/ExemplarsEnabled/Float64Histogram/Attributes/10 241.5n ± 4% 254.0n ± 2% +5.18% (p=0.004 n=6) SyncMeasure/NoView/ExemplarsEnabled/ExponentialInt64Histogram/Attributes/0 231.1n ± 5% 239.2n ± 3% ~ (p=0.084 n=6) SyncMeasure/NoView/ExemplarsEnabled/ExponentialInt64Histogram/Attributes/1 260.2n ± 16% 253.8n ± 4% ~ (p=0.190 n=6) SyncMeasure/NoView/ExemplarsEnabled/ExponentialInt64Histogram/Attributes/10 234.3n ± 1% 246.8n ± 2% +5.29% (p=0.002 n=6) SyncMeasure/NoView/ExemplarsEnabled/ExponentialFloat64Histogram/Attributes/0 221.8n ± 6% 232.0n ± 4% +4.58% (p=0.037 n=6) SyncMeasure/NoView/ExemplarsEnabled/ExponentialFloat64Histogram/Attributes/1 228.2n ± 7% 240.6n ± 1% +5.41% (p=0.041 n=6) SyncMeasure/NoView/ExemplarsEnabled/ExponentialFloat64Histogram/Attributes/10 228.6n ± 7% 244.7n ± 1% +7.04% (p=0.015 n=6) geomean 158.1n 158.1n +0.00% ``` --------- Co-authored-by: Tyler Yahn <[email protected]>
1 parent cfd8570 commit c8e3897

File tree

14 files changed

+437
-121
lines changed

14 files changed

+437
-121
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm
4444
- The default `TranslationStrategy` in `go.opentelemetry.io/exporters/prometheus` is changed from `otlptranslator.NoUTF8EscapingWithSuffixes` to `otlptranslator.UnderscoreEscapingWithSuffixes`. (#7421)
4545
- The `ErrorType` function in `go.opentelemetry.io/otel/semconv/v1.37.0` now handles custom error types.
4646
If an error implements an `ErrorType() string` method, the return value of that method will be used as the error type. (#7442)
47+
- Improve performance of concurrent measurements in `go.opentelemetry.io/otel/sdk/metric`. (#7427)
4748

4849
<!-- Released section -->
4950
<!-- Don't change this section unless doing release -->

sdk/metric/exemplar/fixed_size_reservoir.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import (
77
"context"
88
"math"
99
"math/rand/v2"
10+
"sync"
1011
"time"
1112

1213
"go.opentelemetry.io/otel/attribute"
@@ -37,6 +38,7 @@ var _ Reservoir = &FixedSizeReservoir{}
3738
type FixedSizeReservoir struct {
3839
reservoir.ConcurrentSafe
3940
*storage
41+
mu sync.Mutex
4042

4143
// count is the number of measurement seen.
4244
count int64
@@ -192,6 +194,8 @@ func (r *FixedSizeReservoir) advance() {
192194
//
193195
// The Reservoir state is preserved after this call.
194196
func (r *FixedSizeReservoir) Collect(dest *[]Exemplar) {
197+
r.mu.Lock()
198+
defer r.mu.Unlock()
195199
r.storage.Collect(dest)
196200
// Call reset here even though it will reset r.count and restart the random
197201
// number series. This will persist any old exemplars as long as no new

sdk/metric/exemplar/histogram_reservoir.go

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import (
77
"context"
88
"slices"
99
"sort"
10+
"sync"
1011
"time"
1112

1213
"go.opentelemetry.io/otel/attribute"
@@ -42,6 +43,7 @@ var _ Reservoir = &HistogramReservoir{}
4243
type HistogramReservoir struct {
4344
reservoir.ConcurrentSafe
4445
*storage
46+
mu sync.Mutex
4547

4648
// bounds are bucket bounds in ascending order.
4749
bounds []float64
@@ -76,3 +78,12 @@ func (r *HistogramReservoir) Offer(ctx context.Context, t time.Time, v Value, a
7678
defer r.mu.Unlock()
7779
r.store(idx, m)
7880
}
81+
82+
// Collect returns all the held exemplars.
83+
//
84+
// The Reservoir state is preserved after this call.
85+
func (r *HistogramReservoir) Collect(dest *[]Exemplar) {
86+
r.mu.Lock()
87+
defer r.mu.Unlock()
88+
r.storage.Collect(dest)
89+
}

sdk/metric/exemplar/storage.go

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,6 @@ package exemplar // import "go.opentelemetry.io/otel/sdk/metric/exemplar"
55

66
import (
77
"context"
8-
"sync"
98
"time"
109

1110
"go.opentelemetry.io/otel/attribute"
@@ -14,7 +13,6 @@ import (
1413

1514
// storage is an exemplar storage for [Reservoir] implementations.
1615
type storage struct {
17-
mu sync.Mutex
1816
// measurements are the measurements sampled.
1917
//
2018
// This does not use []metricdata.Exemplar because it potentially would
@@ -34,8 +32,6 @@ func (r *storage) store(idx int, m measurement) {
3432
//
3533
// The Reservoir state is preserved after this call.
3634
func (r *storage) Collect(dest *[]Exemplar) {
37-
r.mu.Lock()
38-
defer r.mu.Unlock()
3935
*dest = reset(*dest, len(r.measurements), len(r.measurements))
4036
var n int
4137
for _, m := range r.measurements {

sdk/metric/internal/aggregate/aggregate.go

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -110,12 +110,13 @@ func (b Builder[N]) PrecomputedSum(monotonic bool) (Measure[N], ComputeAggregati
110110

111111
// Sum returns a sum aggregate function input and output.
112112
func (b Builder[N]) Sum(monotonic bool) (Measure[N], ComputeAggregation) {
113-
s := newSum[N](monotonic, b.AggregationLimit, b.resFunc())
114113
switch b.Temporality {
115114
case metricdata.DeltaTemporality:
116-
return b.filter(s.measure), s.delta
115+
s := newDeltaSum[N](monotonic, b.AggregationLimit, b.resFunc())
116+
return b.filter(s.measure), s.collect
117117
default:
118-
return b.filter(s.measure), s.cumulative
118+
s := newCumulativeSum[N](monotonic, b.AggregationLimit, b.resFunc())
119+
return b.filter(s.measure), s.collect
119120
}
120121
}
121122

Lines changed: 184 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,184 @@
1+
// Copyright The OpenTelemetry Authors
2+
// SPDX-License-Identifier: Apache-2.0
3+
4+
package aggregate // import "go.opentelemetry.io/otel/sdk/metric/internal/aggregate"
5+
6+
import (
7+
"math"
8+
"runtime"
9+
"sync"
10+
"sync/atomic"
11+
12+
"go.opentelemetry.io/otel/attribute"
13+
)
14+
15+
// atomicCounter is an efficient way of adding to a number which is either an
16+
// int64 or float64. It is designed to be efficient when adding whole
17+
// numbers, regardless of whether N is an int64 or float64.
18+
//
19+
// Inspired by the Prometheus counter implementation:
20+
// https://github.com/prometheus/client_golang/blob/14ccb93091c00f86b85af7753100aa372d63602b/prometheus/counter.go#L108
21+
type atomicCounter[N int64 | float64] struct {
22+
// nFloatBits contains only the non-integer portion of the counter.
23+
nFloatBits atomic.Uint64
24+
// nInt contains only the integer portion of the counter.
25+
nInt atomic.Int64
26+
}
27+
28+
// load returns the current value. The caller must ensure all calls to add have
29+
// returned prior to calling load.
30+
func (n *atomicCounter[N]) load() N {
31+
fval := math.Float64frombits(n.nFloatBits.Load())
32+
ival := n.nInt.Load()
33+
return N(fval + float64(ival))
34+
}
35+
36+
func (n *atomicCounter[N]) add(value N) {
37+
ival := int64(value)
38+
// This case is where the value is an int, or if it is a whole-numbered float.
39+
if float64(ival) == float64(value) {
40+
n.nInt.Add(ival)
41+
return
42+
}
43+
44+
// Value must be a float below.
45+
for {
46+
oldBits := n.nFloatBits.Load()
47+
newBits := math.Float64bits(math.Float64frombits(oldBits) + float64(value))
48+
if n.nFloatBits.CompareAndSwap(oldBits, newBits) {
49+
return
50+
}
51+
}
52+
}
53+
54+
// hotColdWaitGroup is a synchronization primitive which enables lockless
55+
// writes for concurrent writers and enables a reader to acquire exclusive
56+
// access to a snapshot of state including only completed operations.
57+
// Conceptually, it can be thought of as a "hot" wait group,
58+
// and a "cold" wait group, with the ability for the reader to atomically swap
59+
// the hot and cold wait groups, and wait for the now-cold wait group to
60+
// complete.
61+
//
62+
// Inspired by the prometheus/client_golang histogram implementation:
63+
// https://github.com/prometheus/client_golang/blob/a974e0d45e0aa54c65492559114894314d8a2447/prometheus/histogram.go#L725
64+
//
65+
// Usage:
66+
//
67+
// var hcwg hotColdWaitGroup
68+
// var data [2]any
69+
//
70+
// func write() {
71+
// hotIdx := hcwg.start()
72+
// defer hcwg.done(hotIdx)
73+
// // modify data without locking
74+
// data[hotIdx].update()
75+
// }
76+
//
77+
// func read() {
78+
// coldIdx := hcwg.swapHotAndWait()
79+
// // read data now that all writes to the cold data have completed.
80+
// data[coldIdx].read()
81+
// }
82+
type hotColdWaitGroup struct {
83+
// startedCountAndHotIdx contains a 63-bit counter in the lower bits,
84+
// and a 1 bit hot index to denote which of the two data-points new
85+
// measurements to write to. These are contained together so that read()
86+
// can atomically swap the hot bit, reset the started writes to zero, and
87+
// read the number writes that were started prior to the hot bit being
88+
// swapped.
89+
startedCountAndHotIdx atomic.Uint64
90+
// endedCounts is the number of writes that have completed to each
91+
// dataPoint.
92+
endedCounts [2]atomic.Uint64
93+
}
94+
95+
// start returns the hot index that the writer should write to. The returned
96+
// hot index is 0 or 1. The caller must call done(hot index) after it finishes
97+
// its operation. start() is safe to call concurrently with other methods.
98+
func (l *hotColdWaitGroup) start() uint64 {
99+
// We increment h.startedCountAndHotIdx so that the counter in the lower
100+
// 63 bits gets incremented. At the same time, we get the new value
101+
// back, which we can use to return the currently-hot index.
102+
return l.startedCountAndHotIdx.Add(1) >> 63
103+
}
104+
105+
// done signals to the reader that an operation has fully completed.
106+
// done is safe to call concurrently.
107+
func (l *hotColdWaitGroup) done(hotIdx uint64) {
108+
l.endedCounts[hotIdx].Add(1)
109+
}
110+
111+
// swapHotAndWait swaps the hot bit, waits for all start() calls to be done(),
112+
// and then returns the now-cold index for the reader to read from. The
113+
// returned index is 0 or 1. swapHotAndWait must not be called concurrently.
114+
func (l *hotColdWaitGroup) swapHotAndWait() uint64 {
115+
n := l.startedCountAndHotIdx.Load()
116+
coldIdx := (^n) >> 63
117+
// Swap the hot and cold index while resetting the started measurements
118+
// count to zero.
119+
n = l.startedCountAndHotIdx.Swap((coldIdx << 63))
120+
hotIdx := n >> 63
121+
startedCount := n & ((1 << 63) - 1)
122+
// Wait for all measurements to the previously-hot map to finish.
123+
for startedCount != l.endedCounts[hotIdx].Load() {
124+
runtime.Gosched() // Let measurements complete.
125+
}
126+
// reset the number of ended operations
127+
l.endedCounts[hotIdx].Store(0)
128+
return hotIdx
129+
}
130+
131+
// limitedSyncMap is a sync.Map which enforces the aggregation limit on
132+
// attribute sets and provides a Len() function.
133+
type limitedSyncMap struct {
134+
sync.Map
135+
aggLimit int
136+
len int
137+
lenMux sync.Mutex
138+
}
139+
140+
func (m *limitedSyncMap) LoadOrStoreAttr(fltrAttr attribute.Set, newValue func(attribute.Set) any) any {
141+
actual, loaded := m.Load(fltrAttr.Equivalent())
142+
if loaded {
143+
return actual
144+
}
145+
// If the overflow set exists, assume we have already overflowed and don't
146+
// bother with the slow path below.
147+
actual, loaded = m.Load(overflowSet.Equivalent())
148+
if loaded {
149+
return actual
150+
}
151+
// Slow path: add a new attribute set.
152+
m.lenMux.Lock()
153+
defer m.lenMux.Unlock()
154+
155+
// re-fetch now that we hold the lock to ensure we don't use the overflow
156+
// set unless we are sure the attribute set isn't being written
157+
// concurrently.
158+
actual, loaded = m.Load(fltrAttr.Equivalent())
159+
if loaded {
160+
return actual
161+
}
162+
163+
if m.aggLimit > 0 && m.len >= m.aggLimit-1 {
164+
fltrAttr = overflowSet
165+
}
166+
actual, loaded = m.LoadOrStore(fltrAttr.Equivalent(), newValue(fltrAttr))
167+
if !loaded {
168+
m.len++
169+
}
170+
return actual
171+
}
172+
173+
func (m *limitedSyncMap) Clear() {
174+
m.lenMux.Lock()
175+
defer m.lenMux.Unlock()
176+
m.len = 0
177+
m.Map.Clear()
178+
}
179+
180+
func (m *limitedSyncMap) Len() int {
181+
m.lenMux.Lock()
182+
defer m.lenMux.Unlock()
183+
return m.len
184+
}
Lines changed: 78 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,78 @@
1+
// Copyright The OpenTelemetry Authors
2+
// SPDX-License-Identifier: Apache-2.0
3+
4+
package aggregate // import "go.opentelemetry.io/otel/sdk/metric/internal/aggregate"
5+
6+
import (
7+
"math"
8+
"sync"
9+
"sync/atomic"
10+
"testing"
11+
12+
"github.com/stretchr/testify/assert"
13+
)
14+
15+
func TestAtomicSumAddFloatConcurrentSafe(t *testing.T) {
16+
var wg sync.WaitGroup
17+
var aSum atomicCounter[float64]
18+
for _, in := range []float64{
19+
0.2,
20+
0.25,
21+
1.6,
22+
10.55,
23+
42.4,
24+
} {
25+
wg.Add(1)
26+
go func() {
27+
defer wg.Done()
28+
aSum.add(in)
29+
}()
30+
}
31+
wg.Wait()
32+
assert.Equal(t, float64(55), math.Round(aSum.load()))
33+
}
34+
35+
func TestAtomicSumAddIntConcurrentSafe(t *testing.T) {
36+
var wg sync.WaitGroup
37+
var aSum atomicCounter[int64]
38+
for _, in := range []int64{
39+
1,
40+
2,
41+
3,
42+
4,
43+
5,
44+
} {
45+
wg.Add(1)
46+
go func() {
47+
defer wg.Done()
48+
aSum.add(in)
49+
}()
50+
}
51+
wg.Wait()
52+
assert.Equal(t, int64(15), aSum.load())
53+
}
54+
55+
func TestHotColdWaitGroupConcurrentSafe(t *testing.T) {
56+
var wg sync.WaitGroup
57+
hcwg := &hotColdWaitGroup{}
58+
var data [2]uint64
59+
for range 5 {
60+
wg.Add(1)
61+
go func() {
62+
defer wg.Done()
63+
hotIdx := hcwg.start()
64+
defer hcwg.done(hotIdx)
65+
atomic.AddUint64(&data[hotIdx], 1)
66+
}()
67+
}
68+
for range 2 {
69+
readIdx := hcwg.swapHotAndWait()
70+
assert.NotPanics(t, func() {
71+
// reading without using atomics should not panic since we are
72+
// reading from the cold element, and have waited for all writes to
73+
// finish.
74+
t.Logf("read value %+v", data[readIdx])
75+
})
76+
}
77+
wg.Wait()
78+
}

sdk/metric/internal/aggregate/exponential_histogram.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -301,7 +301,7 @@ func newExponentialHistogram[N int64 | float64](
301301
maxScale: maxScale,
302302

303303
newRes: r,
304-
limit: newLimiter[*expoHistogramDataPoint[N]](limit),
304+
limit: newLimiter[expoHistogramDataPoint[N]](limit),
305305
values: make(map[attribute.Distinct]*expoHistogramDataPoint[N]),
306306

307307
start: now(),
@@ -317,7 +317,7 @@ type expoHistogram[N int64 | float64] struct {
317317
maxScale int32
318318

319319
newRes func(attribute.Set) FilteredExemplarReservoir[N]
320-
limit limiter[*expoHistogramDataPoint[N]]
320+
limit limiter[expoHistogramDataPoint[N]]
321321
values map[attribute.Distinct]*expoHistogramDataPoint[N]
322322
valuesMu sync.Mutex
323323

sdk/metric/internal/aggregate/filtered_reservoir.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -54,12 +54,12 @@ func NewFilteredExemplarReservoir[N int64 | float64](
5454

5555
func (f *filteredExemplarReservoir[N]) Offer(ctx context.Context, val N, attr []attribute.KeyValue) {
5656
if f.filter(ctx) {
57+
// only record the current time if we are sampling this measurement.
5758
ts := time.Now()
5859
if !f.concurrentSafe {
5960
f.reservoirMux.Lock()
6061
defer f.reservoirMux.Unlock()
6162
}
62-
// only record the current time if we are sampling this measurement.
6363
f.reservoir.Offer(ctx, ts, exemplar.NewValue(val), attr)
6464
}
6565
}

0 commit comments

Comments
 (0)