Skip to content

Commit 6a4ebd8

Browse files
committed
kvserver: record follower write bytes in replica load
Previously, write bytes for range load were only tracked on leaseholder replicas during proposal, leaving write bytes written to follower replicas unaccounted for. This commit updates the tracking to include write bytes on follower replicas as well. Epic: none Release note: none
1 parent f3a3492 commit 6a4ebd8

File tree

4 files changed

+74
-34
lines changed

4 files changed

+74
-34
lines changed

pkg/kv/kvserver/load/replica_load.go

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,11 @@ type ReplicaLoadStats struct {
6565
// follower and leaseholder reads.
6666
ReadKeysPerSecond float64
6767
// WriteBytesPerSecond is the replica's average bytes written per second. A
68-
// "Write" is as described in WritesPerSecond.
68+
// "Write" is as described in WritesPerSecond. If the replica is a leaseholder,
69+
// this is recorded as the bytes that will be written by the replica during the
70+
// application of the Raft command including write bytes and ingested bytes for
71+
// AddSSTable requests. If the replica is a follower, this is recorded right
72+
// before a command is applied to the state machine.
6973
WriteBytesPerSecond float64
7074
// ReadBytesPerSecond is the replica's average bytes read per second. A "Read" is as
7175
// described in ReadsPerSecond.

pkg/kv/kvserver/replica_app_batch.go

Lines changed: 13 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -253,16 +253,20 @@ func (b *replicaAppBatch) runPostAddTriggersReplicaOnly(
253253
// We don't track these stats in standalone log application since they depend
254254
// on whether the proposer is still waiting locally, and this concept does not
255255
// apply in a standalone context.
256-
//
257-
// TODO(irfansharif): This code block can be removed once below-raft
258-
// admission control is the only form of IO admission control. It pre-dates
259-
// it -- these stats were previously used to deduct IO tokens for follower
260-
// writes/ingests without waiting.
261-
if !cmd.IsLocal() && !cmd.ApplyAdmissionControl() {
256+
if !cmd.IsLocal() {
262257
writeBytes, ingestedBytes := cmd.getStoreWriteByteSizes()
263-
b.followerStoreWriteBytes.NumEntries++
264-
b.followerStoreWriteBytes.WriteBytes += writeBytes
265-
b.followerStoreWriteBytes.IngestedBytes += ingestedBytes
258+
if writeBytes > 0 || ingestedBytes > 0 {
259+
b.r.recordRequestWriteBytes(writeBytes, ingestedBytes)
260+
}
261+
// TODO(irfansharif): This code block can be removed once below-raft
262+
// admission control is the only form of IO admission control. It pre-dates
263+
// it -- these stats were previously used to deduct IO tokens for follower
264+
// writes/ingests without waiting.
265+
if !cmd.ApplyAdmissionControl() {
266+
b.followerStoreWriteBytes.NumEntries++
267+
b.followerStoreWriteBytes.WriteBytes += writeBytes
268+
b.followerStoreWriteBytes.IngestedBytes += ingestedBytes
269+
}
266270
}
267271

268272
// MVCC history mutations violate the closed timestamp, modifying data that

pkg/kv/kvserver/replica_raft.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -96,6 +96,8 @@ var ReplicaLeaderlessUnavailableThreshold = settings.RegisterDurationSettingWith
9696
// terminate execution, although it is given no guarantee that the proposal
9797
// won't still go on to commit and apply at some later time.
9898
// - the proposal's ID.
99+
// - the bytes that will be written by the replica during the application of
100+
// the Raft command.
99101
// - any error obtained during the creation or proposal of the command, in
100102
// which case the other returned values are zero.
101103
func (r *Replica) evalAndPropose(

pkg/kv/kvserver/replica_rankings_test.go

Lines changed: 54 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -218,11 +218,11 @@ func TestWriteLoadStatsAccounting(t *testing.T) {
218218
ReplicationMode: base.ReplicationManual,
219219
}
220220
args.ServerArgs.Knobs.Store = &StoreTestingKnobs{DisableCanAckBeforeApplication: true}
221-
tc := serverutils.StartCluster(t, 1, args)
221+
tc := serverutils.StartCluster(t, 3, args)
222+
defer tc.Stopper().Stop(ctx)
222223

223224
const epsilonAllowed = 5
224225

225-
defer tc.Stopper().Stop(ctx)
226226
ts := tc.Server(0)
227227
db := ts.DB()
228228
conn := tc.ServerConn(0)
@@ -246,52 +246,82 @@ func TestWriteLoadStatsAccounting(t *testing.T) {
246246
{1234, 1234, 1234, 0, 1234 * writeSize, 0},
247247
}
248248

249-
store, err := ts.GetStores().(*Stores).GetStore(ts.GetFirstStoreID())
249+
tc.AddVotersOrFatal(t, scratchKey, tc.Target(1), tc.Target(2))
250+
s1, err := tc.Server(0).GetStores().(*Stores).GetStore(tc.Server(0).GetFirstStoreID())
250251
require.NoError(t, err)
251-
252-
repl := store.LookupReplica(roachpb.RKey(scratchKey))
253-
require.NotNil(t, repl)
252+
lhRepl := s1.LookupReplica(roachpb.RKey(scratchKey))
253+
require.NotNil(t, lhRepl)
254+
s2, err := tc.Server(1).GetStores().(*Stores).GetStore(tc.Server(1).GetFirstStoreID())
255+
require.NoError(t, err)
256+
followerRepl1 := s2.LookupReplica(roachpb.RKey(scratchKey))
257+
require.NotNil(t, followerRepl1)
258+
s3, err := tc.Server(2).GetStores().(*Stores).GetStore(tc.Server(2).GetFirstStoreID())
259+
require.NoError(t, err)
260+
followerRepl2 := s3.LookupReplica(roachpb.RKey(scratchKey))
261+
require.NotNil(t, followerRepl2)
254262

255263
// Disable the consistency checker, to avoid interleaving requests
256-
// artificially inflating measurement due to consistency checking.
264+
// artificially inflating measurement due to consistency checking. Also,
265+
// disable the mvcc gc queue and raft log queue to avoid them issuing
266+
// interleaving requests as well.
257267
sqlDB.Exec(t, `SET CLUSTER SETTING server.consistency_check.interval = '0'`)
258268
sqlDB.Exec(t, `SET CLUSTER SETTING kv.range_split.by_load.enabled = false`)
269+
sqlDB.Exec(t, `SET CLUSTER SETTING kv.mvcc_gc_queue.enabled = false`)
270+
sqlDB.Exec(t, `SET CLUSTER SETTING kv.raft_log_queue.enabled = false`)
259271

260272
for _, testCase := range testCases {
261273
// Reset the request counts to 0 before sending to clear previous requests.
262-
repl.loadStats.Reset()
263-
264-
requestsBefore := repl.loadStats.TestingGetSum(load.Requests)
265-
writesBefore := repl.loadStats.TestingGetSum(load.WriteKeys)
266-
readsBefore := repl.loadStats.TestingGetSum(load.ReadKeys)
267-
readBytesBefore := repl.loadStats.TestingGetSum(load.ReadBytes)
268-
writeBytesBefore := repl.loadStats.TestingGetSum(load.WriteBytes)
274+
lhRepl.loadStats.Reset()
275+
followerRepl1.loadStats.Reset()
276+
followerRepl2.loadStats.Reset()
277+
278+
requestsBefore := lhRepl.loadStats.TestingGetSum(load.Requests)
279+
readsBefore := lhRepl.loadStats.TestingGetSum(load.ReadKeys)
280+
lhWritesBefore := lhRepl.loadStats.TestingGetSum(load.WriteKeys)
281+
readBytesBefore := lhRepl.loadStats.TestingGetSum(load.ReadBytes)
282+
lhWriteBytesBefore := lhRepl.loadStats.TestingGetSum(load.WriteBytes)
283+
follower1WriteBytesBefore := followerRepl1.loadStats.TestingGetSum(load.WriteBytes)
284+
follower2WriteBytesBefore := followerRepl2.loadStats.TestingGetSum(load.WriteBytes)
269285

270286
for i := 0; i < testCase.writes; i++ {
271287
_, pErr := db.Inc(ctx, scratchKey, 1)
272288
require.Nil(t, pErr)
273289
}
274290
require.Equal(t, 0.0, requestsBefore)
275-
require.Equal(t, 0.0, writesBefore)
291+
require.Equal(t, 0.0, lhWritesBefore)
276292
require.Equal(t, 0.0, readsBefore)
277-
require.Equal(t, 0.0, writeBytesBefore)
293+
require.Equal(t, 0.0, lhWriteBytesBefore)
278294
require.Equal(t, 0.0, readBytesBefore)
279-
280-
requestsAfter := repl.loadStats.TestingGetSum(load.Requests)
281-
writesAfter := repl.loadStats.TestingGetSum(load.WriteKeys)
282-
readsAfter := repl.loadStats.TestingGetSum(load.ReadKeys)
283-
readBytesAfter := repl.loadStats.TestingGetSum(load.ReadBytes)
284-
writeBytesAfter := repl.loadStats.TestingGetSum(load.WriteBytes)
295+
require.Equal(t, 0.0, follower1WriteBytesBefore)
296+
require.Equal(t, 0.0, follower2WriteBytesBefore)
297+
298+
require.NoError(t, waitForApplication(
299+
ctx,
300+
s1.cfg.NodeDialer,
301+
lhRepl.GetRangeID(),
302+
lhRepl.Desc().Replicas().Descriptors(),
303+
lhRepl.GetLeaseAppliedIndex(),
304+
))
305+
306+
requestsAfter := lhRepl.loadStats.TestingGetSum(load.Requests)
307+
lhWritesAfter := lhRepl.loadStats.TestingGetSum(load.WriteKeys)
308+
readsAfter := lhRepl.loadStats.TestingGetSum(load.ReadKeys)
309+
readBytesAfter := lhRepl.loadStats.TestingGetSum(load.ReadBytes)
310+
lhWriteBytesAfter := lhRepl.loadStats.TestingGetSum(load.WriteBytes)
311+
follower1WriteBytesAfter := followerRepl1.loadStats.TestingGetSum(load.WriteBytes)
312+
follower2WriteBytesAfter := followerRepl2.loadStats.TestingGetSum(load.WriteBytes)
285313

286314
assertGreaterThanInDelta(t, testCase.expectedRQPS, requestsAfter, epsilonAllowed)
287-
assertGreaterThanInDelta(t, testCase.expectedWPS, writesAfter, epsilonAllowed)
315+
assertGreaterThanInDelta(t, testCase.expectedWPS, lhWritesAfter, epsilonAllowed)
288316
assertGreaterThanInDelta(t, testCase.expectedRPS, readsAfter, epsilonAllowed)
289317
assertGreaterThanInDelta(t, testCase.expectedRBPS, readBytesAfter, epsilonAllowed)
290318
// NB: We assert that the written bytes is greater than the write
291319
// batch request size. However the size multiplication factor,
292320
// varies between 3 and 5 so we instead assert that it is greater
293321
// than the logical bytes.
294-
require.GreaterOrEqual(t, writeBytesAfter, testCase.expectedWBPS)
322+
require.GreaterOrEqual(t, lhWriteBytesAfter, testCase.expectedWBPS)
323+
require.GreaterOrEqual(t, follower1WriteBytesAfter, testCase.expectedWBPS)
324+
require.GreaterOrEqual(t, follower2WriteBytesAfter, testCase.expectedWBPS)
295325
}
296326
}
297327

0 commit comments

Comments
 (0)