Skip to content

Commit b8c93ad

Browse files
craig[bot]stevendanna
andcommitted
Merge #147827
147827: rpc: optimize VerifyClockOffset a bit r=RaduBerinde a=stevendanna This replaces some of our stats calculations with new versions that assume sorted input, avoiding a good deal of the copying and sorting contained in the library we were using. Before: BenchmarkVerifyClockOffset 5313 203408 ns/op 82118 B/op 5 allocs/op After: BenchmarkVerifyClockOffset 10000 102450 ns/op 16489 B/op 1 allocs/op Fixes #147825 Release note: None Co-authored-by: Steven Danna <[email protected]>
2 parents b343b8a + ca9e5a9 commit b8c93ad

File tree

3 files changed

+211
-19
lines changed

3 files changed

+211
-19
lines changed

pkg/rpc/BUILD.bazel

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -157,6 +157,7 @@ go_test(
157157
"//pkg/util/leaktest",
158158
"//pkg/util/log",
159159
"//pkg/util/netutil",
160+
"//pkg/util/randutil",
160161
"//pkg/util/stop",
161162
"//pkg/util/syncutil",
162163
"//pkg/util/timeutil",
@@ -168,6 +169,9 @@ go_test(
168169
"@com_github_gogo_protobuf//types",
169170
"@com_github_gogo_status//:status",
170171
"@com_github_golang_mock//gomock",
172+
"@com_github_google_go_cmp//cmp",
173+
"@com_github_google_go_cmp//cmp/cmpopts",
174+
"@com_github_montanaflynn_stats//:stats",
171175
"@com_github_prometheus_client_model//go",
172176
"@com_github_stretchr_testify//assert",
173177
"@com_github_stretchr_testify//require",

pkg/rpc/clock_offset.go

Lines changed: 110 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -8,10 +8,12 @@ package rpc
88
import (
99
"context"
1010
"math"
11+
"sort"
1112
"time"
1213

1314
"github.com/VividCortex/ewma"
1415
"github.com/cockroachdb/cockroach/pkg/roachpb"
16+
"github.com/cockroachdb/cockroach/pkg/util/buildutil"
1517
"github.com/cockroachdb/cockroach/pkg/util/hlc"
1618
"github.com/cockroachdb/cockroach/pkg/util/log"
1719
"github.com/cockroachdb/cockroach/pkg/util/metric"
@@ -340,7 +342,7 @@ func (r *RemoteClockMonitor) VerifyClockOffset(ctx context.Context) error {
340342

341343
now := r.clock.Now()
342344
healthyOffsetCount := 0
343-
345+
sum := float64(0)
344346
offsets, numClocks := func() (stats.Float64Data, int) {
345347
r.mu.Lock()
346348
defer r.mu.Unlock()
@@ -351,31 +353,24 @@ func (r *RemoteClockMonitor) VerifyClockOffset(ctx context.Context) error {
351353
delete(r.mu.offsets, id)
352354
continue
353355
}
354-
offs = append(offs, float64(offset.Offset+offset.Uncertainty))
355-
offs = append(offs, float64(offset.Offset-offset.Uncertainty))
356+
off1 := float64(offset.Offset + offset.Uncertainty)
357+
off2 := float64(offset.Offset - offset.Uncertainty)
358+
sum += off1 + off2
359+
offs = append(offs, off1, off2)
356360
if offset.isHealthy(ctx, r.toleratedOffset) {
357361
healthyOffsetCount++
358362
}
359363
}
360364
return offs, len(r.mu.offsets)
361365
}()
362366

363-
mean, err := offsets.Mean()
364-
if err != nil && !errors.Is(err, stats.EmptyInput) {
365-
return err
366-
}
367-
stdDev, err := offsets.StandardDeviation()
368-
if err != nil && !errors.Is(err, stats.EmptyInput) {
369-
return err
370-
}
371-
median, err := offsets.Median()
372-
if err != nil && !errors.Is(err, stats.EmptyInput) {
373-
return err
374-
}
375-
medianAbsoluteDeviation, err := offsets.MedianAbsoluteDeviation()
376-
if err != nil && !errors.Is(err, stats.EmptyInput) {
377-
return err
378-
}
367+
sort.Float64s(offsets)
368+
369+
mean := sum / float64(len(offsets))
370+
stdDev := StandardDeviationPopulationKnownMean(offsets, mean)
371+
median := MedianSortedInput(offsets)
372+
medianAbsoluteDeviation := MedianAbsoluteDeviationPopulationSortedInput(offsets)
373+
379374
r.metrics.ClockOffsetMeanNanos.Update(int64(mean))
380375
r.metrics.ClockOffsetStdDevNanos.Update(int64(stdDev))
381376
r.metrics.ClockOffsetMedianNanos.Update(int64(median))
@@ -458,3 +453,99 @@ func updateClockOffsetTracking(
458453
remoteClocks.UpdateOffset(ctx, nodeID, offset, pingDuration)
459454
return pingDuration, offset, remoteClocks.VerifyClockOffset(ctx)
460455
}
456+
457+
// The following statistics functions are re-implementations of similar
458+
// functions provided by github.com/montanaflynn/stats. Those functions were
459+
// originally offered under:
460+
//
461+
// The MIT License (MIT)
462+
//
463+
// Copyright (c) 2014-2023 Montana Flynn (https://montanaflynn.com)
464+
//
465+
// Permission is hereby granted, free of charge, to any person obtaining a copy
466+
// of this software and associated documentation files (the "Software"), to deal
467+
// in the Software without restriction, including without limitation the rights
468+
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
469+
// copies of the Software, and to permit persons to whom the Software is
470+
// furnished to do so, subject to the following conditions:
471+
//
472+
// The above copyright notice and this permission notice shall be included in all
473+
// copies or substantial portions of the Software.
474+
//
475+
476+
// StandardDeviationPopulationKnownMean calculates the standard deviation
477+
// assuming the input is the population and that the given mean is the mean of
478+
// the input.
479+
func StandardDeviationPopulationKnownMean(input stats.Float64Data, mean float64) float64 {
480+
if input.Len() == 0 {
481+
return math.NaN()
482+
}
483+
return math.Sqrt(PopulationVarianceKnownMean(input, mean))
484+
}
485+
486+
// PopulationVarianceKnownMean calculates the variance assuming the input is the
487+
// population and that the given mean is the mean of the input.
488+
func PopulationVarianceKnownMean(input stats.Float64Data, mean float64) float64 {
489+
if input.Len() == 0 {
490+
return math.NaN()
491+
}
492+
variance := float64(0)
493+
for _, n := range input {
494+
diff := n - mean
495+
variance += diff * diff
496+
}
497+
return variance / float64(input.Len())
498+
}
499+
500+
// MedianSortedInput calculates the median of the input, assuming it is already
501+
// sorted.
502+
func MedianSortedInput(sortedInput stats.Float64Data) float64 {
503+
if buildutil.CrdbTestBuild {
504+
if !sort.IsSorted(sortedInput) {
505+
panic("MedianSortedInput expects sorted input")
506+
}
507+
}
508+
509+
l := len(sortedInput)
510+
if l == 0 {
511+
return math.NaN()
512+
} else if l%2 == 0 {
513+
return (sortedInput[(l/2)-1] + sortedInput[(l/2)]) / 2.0
514+
} else {
515+
return sortedInput[l/2]
516+
}
517+
}
518+
519+
// MedianAbsoluteDeviationPopulationSortedInput calculates the median absolute
520+
// deviation from a pre-sorted population.
521+
func MedianAbsoluteDeviationPopulationSortedInput(sortedInput stats.Float64Data) float64 {
522+
switch sortedInput.Len() {
523+
case 0:
524+
return math.NaN()
525+
case 1:
526+
return 0
527+
}
528+
529+
m := MedianSortedInput(sortedInput)
530+
a := sortedInput
531+
532+
// Peal off the largest difference on either end until we reach the midpoint(s).
533+
last := 0.0
534+
for len(a) > (len(sortedInput) / 2) {
535+
leftDiff := m - a[0]
536+
rightDiff := a[len(a)-1] - m
537+
if leftDiff >= rightDiff {
538+
last = leftDiff
539+
a = a[1:]
540+
} else {
541+
last = rightDiff
542+
a = a[:len(a)-1]
543+
}
544+
}
545+
546+
if len(sortedInput)%2 == 1 {
547+
return last
548+
} else {
549+
return (max(m-a[0], a[len(a)-1]-m) + last) * 0.5
550+
}
551+
}

pkg/rpc/clock_offset_test.go

Lines changed: 97 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,14 +8,20 @@ package rpc
88
import (
99
"context"
1010
"math"
11+
"sort"
1112
"testing"
1213
"time"
1314

1415
"github.com/cockroachdb/cockroach/pkg/roachpb"
1516
"github.com/cockroachdb/cockroach/pkg/testutils"
1617
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
18+
"github.com/cockroachdb/cockroach/pkg/util/randutil"
1719
"github.com/cockroachdb/cockroach/pkg/util/stop"
1820
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
21+
"github.com/google/go-cmp/cmp"
22+
"github.com/google/go-cmp/cmp/cmpopts"
23+
"github.com/montanaflynn/stats"
24+
"github.com/stretchr/testify/require"
1925
)
2026

2127
const errOffsetGreaterThanMaxOffset = "clock synchronization error: this node is more than .+ away from at least half of the known nodes"
@@ -256,3 +262,94 @@ func TestResettingMaxTrigger(t *testing.T) {
256262
}
257263
}
258264
}
265+
266+
// TestStatsFuncs tests our descriptive stats functions against the stats
267+
// package.
268+
func TestStatsFuncs(t *testing.T) {
269+
defer leaktest.AfterTest(t)()
270+
rng, _ := randutil.NewTestRand()
271+
size := rng.Intn(1000) + 1
272+
data := make(stats.Float64Data, size)
273+
for i := range size {
274+
neg := 1
275+
if rng.Float64() > 0.5 {
276+
neg = -1
277+
}
278+
data[i] = float64(neg) * float64(rng.Int63())
279+
}
280+
281+
// TODO(ssd): You'll note differences between whether the test compares
282+
// operations on the unsorted data or the sorted data. This is to avoid
283+
// failures caused by floating point error. I had hoped to always compare the
284+
// unsorted data passed to the reference implementation with the sorted data
285+
// passed to our implementation. But even the floatWithinReasonableTolerance
286+
// function below, with enough operations the non-associativity of floating
287+
// point arithmetic really seems to accumulate.
288+
sortedData := make(stats.Float64Data, size)
289+
copy(sortedData, data)
290+
sort.Float64s(sortedData)
291+
292+
mean, err := sortedData.Mean()
293+
require.NoError(t, err)
294+
295+
floatWithinReasonableTolerance := func(t *testing.T, expected, actual float64) {
296+
const tolerance = 0.0001
297+
withinTolerance := cmp.Equal(expected, actual, cmpopts.EquateApprox(tolerance, 0))
298+
if !withinTolerance {
299+
t.Errorf("values outside tolerance\n %f (expected)\n %f (actual)\n %f (tolerance)", expected, actual, tolerance)
300+
}
301+
}
302+
303+
t.Run("StandardDeviationPopulationKnownMean", func(t *testing.T) {
304+
ourStdDev := StandardDeviationPopulationKnownMean(data, mean)
305+
theirStdDev, err := stats.StandardDeviation(data)
306+
require.NoError(t, err)
307+
floatWithinReasonableTolerance(t, theirStdDev, ourStdDev)
308+
})
309+
310+
t.Run("MedianSortedInput", func(t *testing.T) {
311+
ourMedian := MedianSortedInput(sortedData)
312+
theirMedian, err := stats.Median(data)
313+
require.NoError(t, err)
314+
floatWithinReasonableTolerance(t, theirMedian, ourMedian)
315+
})
316+
317+
t.Run("PopulationVarianceKnownMean", func(t *testing.T) {
318+
ourVar := PopulationVarianceKnownMean(sortedData, mean)
319+
theirVar, err := stats.PopulationVariance(sortedData)
320+
require.NoError(t, err)
321+
floatWithinReasonableTolerance(t, theirVar, ourVar)
322+
})
323+
324+
t.Run("MedianAbsoluteDeviationPopulationSortedInput", func(t *testing.T) {
325+
ourMedAbsDev := MedianAbsoluteDeviationPopulationSortedInput(sortedData)
326+
theirMedianAbsDev, err := stats.MedianAbsoluteDeviationPopulation(data)
327+
require.NoError(t, err)
328+
floatWithinReasonableTolerance(t, theirMedianAbsDev, ourMedAbsDev)
329+
})
330+
}
331+
332+
func BenchmarkVerifyClockOffset(b *testing.B) {
333+
defer leaktest.AfterTest(b)()
334+
335+
clock := timeutil.NewManualTime(timeutil.Unix(0, 123))
336+
maxOffset := 50 * time.Nanosecond
337+
monitor := newRemoteClockMonitor(clock, maxOffset, time.Hour, 0)
338+
rng, _ := randutil.NewTestRand()
339+
340+
offsetCount := 1000
341+
monitor.mu.offsets = make(map[roachpb.NodeID]RemoteOffset)
342+
for i := range offsetCount {
343+
neg := int64(1)
344+
if rng.Float64() > 0.5 {
345+
neg = -1
346+
}
347+
offset := neg * int64(rng.Float64()*float64(maxOffset))
348+
monitor.mu.offsets[roachpb.NodeID(i)] = RemoteOffset{Offset: offset}
349+
}
350+
351+
b.ResetTimer()
352+
for i := 0; i < b.N; i++ {
353+
require.NoError(b, monitor.VerifyClockOffset(context.Background()))
354+
}
355+
}

0 commit comments

Comments
 (0)