@@ -19,6 +19,8 @@ import (
1919 "os"
2020 "strconv"
2121 "strings"
22+ "sync"
23+ "sync/atomic"
2224 "time"
2325
2426 "github.com/cockroachdb/cockroach/pkg/base"
@@ -959,24 +961,46 @@ func registerKVRangeLookups(r registry.Registry) {
959961}
960962
961963// measureQPS will measure the approx QPS at the time this command is run. The
962- // duration is how long of an interval to wait while measuring. Setting too
963- // short of an interval can mean inaccuracy in results. Setting too long of an
964- // interval may mean the impact is blurred out.
965- func measureQPS (ctx context.Context , t test.Test , db * gosql.DB , duration time.Duration ) float64 {
966- numInserts := func () float64 {
967- var v float64
968- if err := db .QueryRowContext (
969- ctx , `SELECT value FROM crdb_internal.node_metrics WHERE name = 'sql.insert.count'` ,
970- ).Scan (& v ); err != nil {
971- t .Fatal (err )
964+ // duration is the interval to measure over. Setting too short of an interval
965+ // can mean inaccuracy in results. Setting too long of an interval may mean the
966+ // impact is blurred out.
967+ func measureQPS (
968+ ctx context.Context , t test.Test , duration time.Duration , dbs ... * gosql.DB ,
969+ ) float64 {
970+
971+ currentQPS := func () uint64 {
972+ var value uint64
973+ var wg sync.WaitGroup
974+ wg .Add (len (dbs ))
975+
976+ // Count the inserts before sleeping.
977+ for _ , db := range dbs {
978+ db := db
979+ go func () {
980+ defer wg .Done ()
981+ var v uint64
982+ if err := db .QueryRowContext (
983+ ctx , `SELECT value FROM crdb_internal.node_metrics WHERE name = 'sql.insert.count'` ,
984+ ).Scan (& v ); err != nil {
985+ t .Fatal (err )
986+ }
987+ atomic .AddUint64 (& value , v )
988+ }()
972989 }
973- return v
990+ wg .Wait ()
991+ return value
974992 }
975993
976- before := numInserts ()
977- time .Sleep (duration )
978- after := numInserts ()
979- return (after - before ) / duration .Seconds ()
994+ // Measure the current time and the QPS now.
995+ startTime := timeutil .Now ()
996+ beforeQPS := currentQPS ()
997+ // Wait for the duration minus the first query time.
998+ select {
999+ case <- ctx .Done ():
1000+ return 0
1001+ case <- time .After (duration - timeutil .Since (startTime )):
1002+ return float64 (currentQPS ()- beforeQPS ) / duration .Seconds ()
1003+ }
9801004}
9811005
9821006// registerKVRestartImpact measures the impact of stopping and then restarting a
@@ -1035,7 +1059,7 @@ func registerKVRestartImpact(r registry.Registry) {
10351059 // Let some data be written to all nodes in the cluster.
10361060 t .Status (fmt .Sprintf ("waiting %s to establish a base QPS" , duration ))
10371061 time .Sleep (duration )
1038- qpsInitial := measureQPS (ctx , t , db , 5 * time .Second )
1062+ qpsInitial := measureQPS (ctx , t , 5 * time .Second , db )
10391063 t .Status (fmt .Sprintf ("initial (single node) qps: %.0f" , qpsInitial ))
10401064
10411065 // Disable replicate queue on all nodes. This allows the test to reproduce
@@ -1075,7 +1099,7 @@ func registerKVRestartImpact(r registry.Registry) {
10751099 if ! c .IsLocal () {
10761100 time .Sleep (3 * time .Minute )
10771101 }
1078- qpsFinal := measureQPS (ctx , t , db , 5 * time .Second )
1102+ qpsFinal := measureQPS (ctx , t , 5 * time .Second , db )
10791103 t .Status (fmt .Sprintf ("post outage qps: %.0f" , qpsFinal ))
10801104
10811105 // Pass the test if the QPS is within a factor of 2. Often the qpsFinal is
0 commit comments