Skip to content

Commit dcb363c

Browse files
committed
sidetransport: bump TestPaceUpdateSignalling expectation
This bumps the expectation for "immediate wakeup" to 30ms. This is 30x what I've observed running this under stress, but only 5ms over the recent test failure. The real fix here is going to be to thread our clock source down into the pacer so that we can completely mock the clock and make the test deterministic. While here, I updated the test a bit. Informs #155199 Release note: None
1 parent 44eef4b commit dcb363c

File tree

2 files changed

+37
-37
lines changed

2 files changed

+37
-37
lines changed

pkg/kv/kvserver/closedts/sidetransport/BUILD.bazel

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,11 +53,13 @@ go_test(
5353
"//pkg/settings/cluster",
5454
"//pkg/testutils",
5555
"//pkg/testutils/skip",
56+
"//pkg/util/ctxgroup",
5657
"//pkg/util/hlc",
5758
"//pkg/util/leaktest",
5859
"//pkg/util/log",
5960
"//pkg/util/stop",
6061
"//pkg/util/syncutil",
62+
"@com_github_cockroachdb_crlib//crtime",
6163
"@com_github_cockroachdb_errors//:errors",
6264
"@com_github_stretchr_testify//require",
6365
"@io_storj_drpc//:drpc",

pkg/kv/kvserver/closedts/sidetransport/sender_test.go

Lines changed: 35 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ import (
99
"context"
1010
"fmt"
1111
"net"
12+
"slices"
1213
"sync"
1314
"sync/atomic"
1415
"testing"
@@ -25,11 +26,13 @@ import (
2526
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
2627
"github.com/cockroachdb/cockroach/pkg/testutils"
2728
"github.com/cockroachdb/cockroach/pkg/testutils/skip"
29+
"github.com/cockroachdb/cockroach/pkg/util/ctxgroup"
2830
"github.com/cockroachdb/cockroach/pkg/util/hlc"
2931
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
3032
"github.com/cockroachdb/cockroach/pkg/util/log"
3133
"github.com/cockroachdb/cockroach/pkg/util/stop"
3234
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
35+
"github.com/cockroachdb/crlib/crtime"
3336
"github.com/cockroachdb/errors"
3437
"github.com/stretchr/testify/require"
3538
"google.golang.org/grpc"
@@ -957,26 +960,25 @@ func TestPaceUpdateSignalling(t *testing.T) {
957960
// each other.
958961
//
959962
// seqNum is the sequence number to wait for.
960-
testPacing := func(seqNum ctpb.SeqNum, assertionFunc func(timeSpread time.Duration)) {
963+
testPacing := func(t *testing.T, seqNum ctpb.SeqNum, assertionFunc func(timeSpread time.Duration)) {
961964
// Track the times when goroutines receive items from the buffer.
962-
var receiveTimes []time.Time
965+
var receiveTimes []crtime.Mono
963966
var mu syncutil.Mutex
964967

965968
// Create numWaiters goroutines that wait on s.buf.GetBySeq and record
966969
// receive times.
967-
done := make(chan struct{}, numWaiters)
968-
for i := 0; i < numWaiters; i++ {
969-
go func() {
970+
g := ctxgroup.WithContext(ctx)
971+
for range numWaiters {
972+
g.Go(func() error {
970973
// Wait for the specified sequence number.
971974
_, ok := s.buf.GetBySeq(ctx, seqNum)
972-
require.Equal(t, true, ok)
973-
if ok {
974-
mu.Lock()
975-
receiveTimes = append(receiveTimes, time.Now())
976-
mu.Unlock()
977-
}
978-
done <- struct{}{}
979-
}()
975+
require.True(t, ok)
976+
977+
mu.Lock()
978+
defer mu.Unlock()
979+
receiveTimes = append(receiveTimes, crtime.NowMono())
980+
return nil
981+
})
980982
}
981983

982984
// Wait until all goroutines are waiting on the buffer.
@@ -991,47 +993,43 @@ func TestPaceUpdateSignalling(t *testing.T) {
991993
s.publish(ctx)
992994

993995
// Wait for all goroutines to finish.
994-
for i := 0; i < numWaiters; i++ {
995-
<-done
996-
}
996+
require.NoError(t, g.Wait())
997997

998998
// Verify that all goroutines received the message.
999999
require.Len(t, receiveTimes, numWaiters)
10001000

1001-
// Find min and max receive times.
1002-
minTime := receiveTimes[0]
1003-
maxTime := receiveTimes[0]
1004-
for _, t := range receiveTimes[1:] {
1005-
if t.Before(minTime) {
1006-
minTime = t
1007-
}
1008-
if t.After(maxTime) {
1009-
maxTime = t
1010-
}
1011-
}
1012-
10131001
// Verify that the time spread matches expectations.
1002+
minTime := slices.Min(receiveTimes)
1003+
maxTime := slices.Max(receiveTimes)
10141004
timeSpread := maxTime.Sub(minTime)
10151005
assertionFunc(timeSpread)
10161006
}
10171007

10181008
// Test with 250ms pacing interval - expect at least 125ms spread just to be
10191009
// conservative. In practice, it should be closer to 250ms.
1020-
closedts.SideTransportPacingRefreshInterval.Override(ctx, &st.SV, 250*time.Millisecond)
1021-
testPacing(1 /* seqNum */, func(timeSpread time.Duration) {
1022-
require.GreaterOrEqual(t, timeSpread, 125*time.Millisecond)
1010+
t.Run("pacing_interval=250ms", func(t *testing.T) {
1011+
closedts.SideTransportPacingRefreshInterval.Override(ctx, &st.SV, 250*time.Millisecond)
1012+
testPacing(t, 1 /* seqNum */, func(timeSpread time.Duration) {
1013+
require.GreaterOrEqual(t, timeSpread, 125*time.Millisecond)
1014+
})
10231015
})
10241016

10251017
// Change to 100ms pacing interval - expect at least 50ms spread.
1026-
closedts.SideTransportPacingRefreshInterval.Override(ctx, &st.SV, 100*time.Millisecond)
1027-
testPacing(2 /* seqNum */, func(timeSpread time.Duration) {
1028-
require.GreaterOrEqual(t, timeSpread, 50*time.Millisecond)
1018+
t.Run("pacing_interval=100ms", func(t *testing.T) {
1019+
closedts.SideTransportPacingRefreshInterval.Override(ctx, &st.SV, 100*time.Millisecond)
1020+
testPacing(t, 2 /* seqNum */, func(timeSpread time.Duration) {
1021+
require.GreaterOrEqual(t, timeSpread, 50*time.Millisecond)
1022+
})
10291023
})
10301024

10311025
// Change to 0ms (disabled) pacing interval - expect all goroutines to be
10321026
// woken within a few milliseconds of each other.
1033-
closedts.SideTransportPacingRefreshInterval.Override(ctx, &st.SV, 0)
1034-
testPacing(3 /* seqNum */, func(timeSpread time.Duration) {
1035-
require.LessOrEqual(t, timeSpread, 25*time.Millisecond)
1027+
t.Run("pacing_interval=0ms", func(t *testing.T) {
1028+
closedts.SideTransportPacingRefreshInterval.Override(ctx, &st.SV, 0)
1029+
testPacing(t, 3 /* seqNum */, func(timeSpread time.Duration) {
1030+
// The expectation here is many 30x what is typically seen. But, we want
1031+
// to avoid flakes.
1032+
require.LessOrEqual(t, timeSpread, 30*time.Millisecond)
1033+
})
10361034
})
10371035
}

0 commit comments

Comments
 (0)