Skip to content

Commit 6dee617

Browse files
craig[bot]stevendanna
andcommitted
Merge #157825
157825: kvnemesis: add clock skew and drift r=stevendanna a=stevendanna This injects clock drift between the nodes in a multi-node KVNemesis cluster. To start, the clock uses a sinusoidal drift centered on a fixed offset from real time with an amplitude that ensures any two clocks should stay within the MaxOffset. This is perhaps not very realistic, but we can improve this over time. Epic: none Release note: None Co-authored-by: Steven Danna <[email protected]>
2 parents 55edba2 + 0d59296 commit 6dee617

File tree

4 files changed

+185
-3
lines changed

4 files changed

+185
-3
lines changed

pkg/kv/kvnemesis/BUILD.bazel

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ go_library(
66
name = "kvnemesis",
77
srcs = [
88
"applier.go",
9+
"clock.go",
910
"doc.go",
1011
"engine.go",
1112
"env.go",
@@ -65,6 +66,7 @@ go_test(
6566
size = "large",
6667
srcs = [
6768
"applier_test.go",
69+
"clock_test.go",
6870
"engine_test.go",
6971
"generator_test.go",
7072
"kvnemesis_test.go",
@@ -117,6 +119,7 @@ go_test(
117119
"//pkg/util/randutil",
118120
"//pkg/util/stop",
119121
"//pkg/util/syncutil",
122+
"//pkg/util/timeutil",
120123
"//pkg/util/uuid",
121124
"@com_github_cockroachdb_errors//:errors",
122125
"@com_github_stretchr_testify//assert",

pkg/kv/kvnemesis/clock.go

Lines changed: 90 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,90 @@
1+
// Copyright 2025 The Cockroach Authors.
2+
//
3+
// Use of this software is governed by the CockroachDB Software License
4+
// included in the /LICENSE file.
5+
6+
package kvnemesis
7+
8+
import (
9+
"fmt"
10+
"math"
11+
"math/rand"
12+
"time"
13+
14+
"github.com/cockroachdb/cockroach/pkg/util/hlc"
15+
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
16+
)
17+
18+
// testClock is an hlc.WallClock that simulates drift around some fixed offset
19+
// from the given system clock.
20+
type testClock struct {
21+
clock hlc.WallClock
22+
23+
sign int
24+
// freq is the angular frequency in radians/nanosecond.
25+
freq float64
26+
amplitude time.Duration
27+
fixedOffset time.Duration
28+
}
29+
30+
// NewTestClock returns a testClock.
31+
//
32+
// The maxOffset is the maximum offset of two clocks created with NewTestClock.
33+
// The clock will return times in the range
34+
//
35+
// [t-maxOffset/2, t+maxOffset/2]
36+
//
37+
// We currently use a sine wave to drift the clock over time. Note that this was
38+
// chosen without much detailed research. The goal was to have something that
39+
// drifted the clock over time in one direction to simulate a slow or fast clock
40+
// and then move it in the opposite direction to simulate ntpd or chronyd
41+
// correcting the clock.
42+
//
43+
// We may instead want something more like a saw-tooth pattern where the
44+
// correction is rather abrupt.
45+
func NewTestClock(maxOffset time.Duration, period time.Duration, rng *rand.Rand) *testClock {
46+
maxAmplitude := time.Duration(float64(maxOffset / 2))
47+
// We don't want all the clocks moving in lock step. We do two things:
48+
//
49+
// 1. Randomly swap the initial direction of drift, and
50+
//
51+
// 2. Add a small fixed offset from the center (reducing amplitude
52+
// appropriately), in the direction of the swap.
53+
sign := 1
54+
if rng.Intn(2) == 1 {
55+
sign = -1
56+
}
57+
jitter := 0.10 * rng.Float64()
58+
fixedOffset := time.Duration(float64(maxAmplitude) * jitter)
59+
amplitude := maxAmplitude - fixedOffset
60+
freq := float64(2.0*math.Pi) / float64(period)
61+
return &testClock{
62+
clock: timeutil.DefaultTimeSource{},
63+
sign: sign,
64+
freq: freq,
65+
amplitude: amplitude,
66+
fixedOffset: fixedOffset,
67+
}
68+
}
69+
70+
var _ hlc.WallClock = (*testClock)(nil)
71+
72+
func (c *testClock) Now() time.Time {
73+
//
74+
// f(t) = t + sign*(amplitude*sin(freq*t) + offset)
75+
//
76+
// TODO(ssd): math.Sin on every call to Now() might be expensive. One
77+
// alternative might be simple linear growth up to some max which then
78+
// reverses direction until the offset is back to zero. Or, if we like the Sin
79+
// wave perhaps we can store of the current offset and then only update it
80+
// every so many calls.
81+
t := c.clock.Now()
82+
p := math.Sin(c.freq * float64(t.UnixNano()))
83+
adjust := float64(c.amplitude)*p + float64(c.fixedOffset)
84+
adjust = float64(c.sign) * adjust
85+
return t.Add(time.Duration(adjust))
86+
}
87+
88+
func (c *testClock) String() string {
89+
return fmt.Sprintf("clock[offset=%s,amp=%s]", time.Duration(c.sign)*c.fixedOffset, c.amplitude)
90+
}

pkg/kv/kvnemesis/clock_test.go

Lines changed: 71 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,71 @@
1+
// Copyright 2025 The Cockroach Authors.
2+
//
3+
// Use of this software is governed by the CockroachDB Software License
4+
// included in the /LICENSE file.
5+
6+
package kvnemesis
7+
8+
import (
9+
"testing"
10+
"time"
11+
12+
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
13+
"github.com/cockroachdb/cockroach/pkg/util/randutil"
14+
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
15+
"github.com/stretchr/testify/require"
16+
)
17+
18+
func TestTestClock(t *testing.T) {
19+
defer leaktest.AfterTest(t)()
20+
21+
rng, _ := randutil.NewTestRand()
22+
maxOffset := 10 * time.Millisecond
23+
period := 10 * time.Second
24+
tolerance := 50 * time.Microsecond
25+
26+
t.Run("clock nears amplitude at period", func(t *testing.T) {
27+
c := NewTestClock(maxOffset, period, rng)
28+
clock := timeutil.NewManualTime(timeutil.Unix(0, 0))
29+
c.clock = clock
30+
clock.Advance(period)
31+
now := c.Now()
32+
require.Less(t, tolerance, abs(c.amplitude.Nanoseconds()-abs(period.Nanoseconds()-now.UnixNano())))
33+
})
34+
t.Run("5000 advances", func(t *testing.T) {
35+
clock := timeutil.NewManualTime(timeutil.Unix(0, 0))
36+
37+
c1 := NewTestClock(maxOffset, period, rng)
38+
c2 := NewTestClock(maxOffset, period, rng)
39+
c1.clock = clock
40+
c2.clock = clock
41+
42+
for range 5000 {
43+
adv := rng.Int31n(100)
44+
clock.Advance(time.Duration(adv) * time.Millisecond)
45+
now1 := c1.Now()
46+
now2 := c2.Now()
47+
actual := clock.Now()
48+
49+
// The two clocks should never be further apart than maxOffset
50+
require.LessOrEqual(t, time.Duration(abs(int64(now1.Sub(now2)))), maxOffset)
51+
52+
// Also just check each clock on its own.
53+
clockInsideBounds := func(t *testing.T, computed time.Time, actual time.Time) {
54+
maxErr := maxOffset/2 + tolerance
55+
max := actual.Add(maxErr)
56+
min := actual.Add(-maxErr)
57+
require.Less(t, computed, max)
58+
require.Greater(t, computed, min)
59+
}
60+
clockInsideBounds(t, now1, actual)
61+
clockInsideBounds(t, now2, actual)
62+
}
63+
})
64+
}
65+
66+
// abs returns the absolute value of i. For the minimum int64 the negation
// wraps around, so that value is returned unchanged.
func abs(i int64) int64 {
	if i >= 0 {
		return i
	}
	return -i
}

pkg/kv/kvnemesis/kvnemesis_test.go

Lines changed: 21 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,12 @@ import (
5858
var defaultNumSteps = envutil.EnvOrDefaultInt("COCKROACH_KVNEMESIS_STEPS", 100)
5959

6060
func (cfg kvnemesisTestCfg) testClusterArgs(
61-
ctx context.Context, tr *SeqTracker, partitioner *rpc.Partitioner, mode TestMode,
61+
ctx context.Context,
62+
t testing.TB,
63+
rng *rand.Rand,
64+
tr *SeqTracker,
65+
partitioner *rpc.Partitioner,
66+
mode TestMode,
6267
) (base.TestClusterArgs, stop.Closer) {
6368
storeKnobs := &kvserver.StoreTestingKnobs{
6469
DisableRaftLogQueue: true,
@@ -258,15 +263,28 @@ func (cfg kvnemesisTestCfg) testClusterArgs(
258263
for i := 0; i < cfg.numNodes; i++ {
259264
nodeId := i + 1
260265
ctk := rpc.ContextTestingKnobs{}
266+
// Test clock injection
267+
//
268+
// We need to stay within 80% of the max offset, so we set our
269+
// offset to 70% of the configured MaxOffset. Since MaxOffset is
270+
// relatively small, this doesn't give us much wiggle room.
271+
//
272+
// NOTE(ssd): If we see flakes because of untrustworthy remote clocks,
273+
// don't hesitate to increase this safety margin for now.
274+
maxOffset := time.Duration(float64(storeKnobs.MaxOffset) * 0.70)
275+
period := 1 * time.Second
276+
clock := NewTestClock(maxOffset, period, rng)
277+
t.Logf("n%d using %s", nodeId, clock)
261278
partitioner.RegisterTestingKnobs(roachpb.NodeID(nodeId), &ctk)
262279
perNodeServerArgs := commonServerArgs
263280
perNodeServerArgs.Knobs.Server = &server.TestingKnobs{
281+
StickyVFSRegistry: reg,
282+
WallClock: clock,
264283
ContextTestingKnobs: ctk,
265284
}
266285
perNodeServerArgs.Locality = roachpb.Locality{
267286
Tiers: []roachpb.Tier{{Key: "node", Value: fmt.Sprintf("n%d", nodeId)}},
268287
}
269-
perNodeServerArgs.Knobs.Server = &server.TestingKnobs{StickyVFSRegistry: reg}
270288
perNodeServerArgs.StoreSpecs = append(
271289
perNodeServerArgs.StoreSpecs,
272290
base.StoreSpec{InMemory: true, StickyVFSID: strconv.Itoa(nodeId)},
@@ -608,7 +626,7 @@ func testKVNemesisImpl(t testing.TB, cfg kvnemesisTestCfg) {
608626
ctx := context.Background()
609627
tr := &SeqTracker{}
610628
var partitioner rpc.Partitioner
611-
args, closer := cfg.testClusterArgs(ctx, tr, &partitioner, cfg.mode)
629+
args, closer := cfg.testClusterArgs(ctx, t, rng, tr, &partitioner, cfg.mode)
612630
tc := testcluster.StartTestCluster(t, cfg.numNodes, args)
613631
tc.Stopper().AddCloser(closer)
614632
defer tc.Stopper().Stop(ctx)

0 commit comments

Comments
 (0)