99 "context"
1010 "fmt"
1111 "net"
12+ "slices"
1213 "sync"
1314 "sync/atomic"
1415 "testing"
@@ -25,11 +26,13 @@ import (
2526 "github.com/cockroachdb/cockroach/pkg/settings/cluster"
2627 "github.com/cockroachdb/cockroach/pkg/testutils"
2728 "github.com/cockroachdb/cockroach/pkg/testutils/skip"
29+ "github.com/cockroachdb/cockroach/pkg/util/ctxgroup"
2830 "github.com/cockroachdb/cockroach/pkg/util/hlc"
2931 "github.com/cockroachdb/cockroach/pkg/util/leaktest"
3032 "github.com/cockroachdb/cockroach/pkg/util/log"
3133 "github.com/cockroachdb/cockroach/pkg/util/stop"
3234 "github.com/cockroachdb/cockroach/pkg/util/syncutil"
35+ "github.com/cockroachdb/crlib/crtime"
3336 "github.com/cockroachdb/errors"
3437 "github.com/stretchr/testify/require"
3538 "google.golang.org/grpc"
@@ -957,26 +960,25 @@ func TestPaceUpdateSignalling(t *testing.T) {
957960 // each other.
958961 //
959962 // seqNum is the sequence number to wait for.
960- testPacing := func (seqNum ctpb.SeqNum , assertionFunc func (timeSpread time.Duration )) {
963+ testPacing := func (t * testing. T , seqNum ctpb.SeqNum , assertionFunc func (timeSpread time.Duration )) {
961964 // Track the times when goroutines receive items from the buffer.
962- var receiveTimes []time. Time
965+ var receiveTimes []crtime. Mono
963966 var mu syncutil.Mutex
964967
965968 // Create numWaiters goroutines that wait on s.buf.GetBySeq and record
966969 // receive times.
967- done := make ( chan struct {}, numWaiters )
968- for i := 0 ; i < numWaiters ; i ++ {
969- go func () {
970+ g := ctxgroup . WithContext ( ctx )
971+ for range numWaiters {
972+ g . Go ( func () error {
970973 // Wait for the specified sequence number.
971974 _ , ok := s .buf .GetBySeq (ctx , seqNum )
972- require .Equal (t , true , ok )
973- if ok {
974- mu .Lock ()
975- receiveTimes = append (receiveTimes , time .Now ())
976- mu .Unlock ()
977- }
978- done <- struct {}{}
979- }()
975+ require .True (t , ok )
976+
977+ mu .Lock ()
978+ defer mu .Unlock ()
979+ receiveTimes = append (receiveTimes , crtime .NowMono ())
980+ return nil
981+ })
980982 }
981983
982984 // Wait until all goroutines are waiting on the buffer.
@@ -991,47 +993,43 @@ func TestPaceUpdateSignalling(t *testing.T) {
991993 s .publish (ctx )
992994
993995 // Wait for all goroutines to finish.
994- for i := 0 ; i < numWaiters ; i ++ {
995- <- done
996- }
996+ require .NoError (t , g .Wait ())
997997
998998 // Verify that all goroutines received the message.
999999 require .Len (t , receiveTimes , numWaiters )
10001000
1001- // Find min and max receive times.
1002- minTime := receiveTimes [0 ]
1003- maxTime := receiveTimes [0 ]
1004- for _ , t := range receiveTimes [1 :] {
1005- if t .Before (minTime ) {
1006- minTime = t
1007- }
1008- if t .After (maxTime ) {
1009- maxTime = t
1010- }
1011- }
1012-
10131001 // Verify that the time spread matches expectations.
1002+ minTime := slices .Min (receiveTimes )
1003+ maxTime := slices .Max (receiveTimes )
10141004 timeSpread := maxTime .Sub (minTime )
10151005 assertionFunc (timeSpread )
10161006 }
10171007
10181008 // Test with 250ms pacing interval - expect at least 125ms spread just to be
10191009 // conservative. In practice, it should be closer to 250ms.
1020- closedts .SideTransportPacingRefreshInterval .Override (ctx , & st .SV , 250 * time .Millisecond )
1021- testPacing (1 /* seqNum */ , func (timeSpread time.Duration ) {
1022- require .GreaterOrEqual (t , timeSpread , 125 * time .Millisecond )
1010+ t .Run ("pacing_interval=250ms" , func (t * testing.T ) {
1011+ closedts .SideTransportPacingRefreshInterval .Override (ctx , & st .SV , 250 * time .Millisecond )
1012+ testPacing (t , 1 /* seqNum */ , func (timeSpread time.Duration ) {
1013+ require .GreaterOrEqual (t , timeSpread , 125 * time .Millisecond )
1014+ })
10231015 })
10241016
10251017 // Change to 100ms pacing interval - expect at least 50ms spread.
1026- closedts .SideTransportPacingRefreshInterval .Override (ctx , & st .SV , 100 * time .Millisecond )
1027- testPacing (2 /* seqNum */ , func (timeSpread time.Duration ) {
1028- require .GreaterOrEqual (t , timeSpread , 50 * time .Millisecond )
1018+ t .Run ("pacing_interval=100ms" , func (t * testing.T ) {
1019+ closedts .SideTransportPacingRefreshInterval .Override (ctx , & st .SV , 100 * time .Millisecond )
1020+ testPacing (t , 2 /* seqNum */ , func (timeSpread time.Duration ) {
1021+ require .GreaterOrEqual (t , timeSpread , 50 * time .Millisecond )
1022+ })
10291023 })
10301024
10311025 // Change to 0ms (disabled) pacing interval - expect all goroutines to be
10321026 // woken within a few milliseconds of each other.
1033- closedts .SideTransportPacingRefreshInterval .Override (ctx , & st .SV , 0 )
1034- testPacing (3 /* seqNum */ , func (timeSpread time.Duration ) {
1035- require .LessOrEqual (t , timeSpread , 25 * time .Millisecond )
1027+ t .Run ("pacing_interval=0ms" , func (t * testing.T ) {
1028+ closedts .SideTransportPacingRefreshInterval .Override (ctx , & st .SV , 0 )
1029+ testPacing (t , 3 /* seqNum */ , func (timeSpread time.Duration ) {
1030+ // The expectation here is many 30x what is typically seen. But, we want
1031+ // to avoid flakes.
1032+ require .LessOrEqual (t , timeSpread , 30 * time .Millisecond )
1033+ })
10361034 })
10371035}
0 commit comments