Skip to content

Commit 651f996

Browse files
craig[bot]dhartunianstevendannaSachuman
committed
147665: e2e-tests: add cypress tests for scale cluster r=dhartunian a=dhartunian This commit adds files to enable the use of Cypress to smoke test DB Console on a cluster to see if it meets basic performance requirements. This can be used on a scale cluster to make sure we can render all the standard pages in a reasonable amount of time. The test will print out timing information for key elements in the logs. Additionally, screenshots are taken immediately after the timing snapshot to enable easy analysis of what's on the page at time of timer capture. You can then confirm that the page is "useful". Resolves: #51018 Release note: None 147888: kvcoord: avoid unnecessary flushing in txnWriteBuffer r=miraradeva a=stevendanna Previously, if the buffer was being flushed midway through a transaction, we were flushing the entire buffer to ensure that future savepoint rollbacks would result in the correct final value being committed. This was pessimistic, since many transactions don't use explicit savepoints. Here, we track whether an explicit savepoint has been created and only flush those values required to rollback the earliest active savepoint. Epic: none Release note: None 148410: authors: add Sachuman to authors r=Sachuman a=Sachuman Epic: None Release note: None Co-authored-by: David Hartunian <[email protected]> Co-authored-by: Steven Danna <[email protected]> Co-authored-by: Sachin Jain <[email protected]>
4 parents 56c99de + 63b5fee + e66f38b + c7a22f6 commit 651f996

18 files changed

+624
-10
lines changed

AUTHORS

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -456,6 +456,7 @@ Ryan Kuo <[email protected]> taroface <[email protected]
456456
457457
458458
Ryan Zhao <[email protected]> ryanzhao <[email protected]>
459+
Sachin Jain <[email protected]>
459460
Sadaf Qureshi <[email protected]>
460461
Sai Ravula <[email protected]>
461462
Sajjad Rizvi <[email protected]>

pkg/kv/kvclient/kvcoord/txn_coord_sender.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -217,6 +217,10 @@ type txnInterceptor interface {
217217
// createSavepointLocked().
218218
rollbackToSavepointLocked(context.Context, savepoint)
219219

220+
// releaseSavepointLocked is called when a savepoint is being
221+
// released.
222+
releaseSavepointLocked(context.Context, *savepoint)
223+
220224
// closeLocked closes the interceptor. It is called when the TxnCoordSender
221225
// shuts down due to either a txn commit or a txn abort. The method will
222226
// be called exactly once from cleanupTxnLocked.

pkg/kv/kvclient/kvcoord/txn_coord_sender_savepoints.go

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -187,7 +187,15 @@ func (tc *TxnCoordSender) ReleaseSavepoint(ctx context.Context, s kv.SavepointTo
187187
}
188188

189189
sp := s.(*savepoint)
190-
return tc.checkSavepointLocked(sp, "release")
190+
if err := tc.checkSavepointLocked(sp, "release"); err != nil {
191+
return err
192+
}
193+
194+
for _, reqInt := range tc.interceptorStack {
195+
reqInt.releaseSavepointLocked(ctx, sp)
196+
}
197+
198+
return nil
191199
}
192200

193201
// CanUseSavepoint is part of the kv.TxnSender interface.

pkg/kv/kvclient/kvcoord/txn_interceptor_committer.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -626,6 +626,9 @@ func (tc *txnCommitter) epochBumpedLocked() {}
626626
// createSavepointLocked is part of the txnInterceptor interface.
627627
func (*txnCommitter) createSavepointLocked(context.Context, *savepoint) {}
628628

629+
// releaseSavepointLocked is part of the txnInterceptor interface.
630+
func (*txnCommitter) releaseSavepointLocked(context.Context, *savepoint) {}
631+
629632
// rollbackToSavepointLocked is part of the txnInterceptor interface.
630633
func (*txnCommitter) rollbackToSavepointLocked(context.Context, savepoint) {}
631634

pkg/kv/kvclient/kvcoord/txn_interceptor_heartbeater.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -302,6 +302,9 @@ func (h *txnHeartbeater) epochBumpedLocked() {}
302302
// createSavepointLocked is part of the txnInterceptor interface.
303303
func (*txnHeartbeater) createSavepointLocked(context.Context, *savepoint) {}
304304

305+
// releaseSavepointLocked is part of the txnInterceptor interface.
306+
func (*txnHeartbeater) releaseSavepointLocked(context.Context, *savepoint) {}
307+
305308
// rollbackToSavepointLocked is part of the txnInterceptor interface.
306309
func (*txnHeartbeater) rollbackToSavepointLocked(context.Context, savepoint) {}
307310

pkg/kv/kvclient/kvcoord/txn_interceptor_metric_recorder.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,9 @@ func (*txnMetricRecorder) epochBumpedLocked() {}
7878
// createSavepointLocked is part of the txnInterceptor interface.
7979
func (*txnMetricRecorder) createSavepointLocked(context.Context, *savepoint) {}
8080

81+
// releaseSavepointLocked is part of the txnInterceptor interface.
82+
func (*txnMetricRecorder) releaseSavepointLocked(context.Context, *savepoint) {}
83+
8184
// rollbackToSavepointLocked is part of the txnInterceptor interface.
8285
func (*txnMetricRecorder) rollbackToSavepointLocked(context.Context, savepoint) {}
8386

pkg/kv/kvclient/kvcoord/txn_interceptor_pipeliner.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -984,6 +984,9 @@ func (tp *txnPipeliner) epochBumpedLocked() {
984984
// createSavepointLocked is part of the txnInterceptor interface.
985985
func (tp *txnPipeliner) createSavepointLocked(context.Context, *savepoint) {}
986986

987+
// releaseSavepointLocked is part of the txnInterceptor interface.
988+
func (tp *txnPipeliner) releaseSavepointLocked(context.Context, *savepoint) {}
989+
987990
// rollbackToSavepointLocked is part of the txnInterceptor interface.
988991
func (tp *txnPipeliner) rollbackToSavepointLocked(ctx context.Context, s savepoint) {
989992
// Move all the writes in txnPipeliner that are not in the savepoint to the

pkg/kv/kvclient/kvcoord/txn_interceptor_seq_num_allocator.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -219,6 +219,9 @@ func (s *txnSeqNumAllocator) createSavepointLocked(ctx context.Context, sp *save
219219
sp.seqNum = s.writeSeq
220220
}
221221

222+
// releaseSavepointLocked is part of the txnInterceptor interface.
223+
func (*txnSeqNumAllocator) releaseSavepointLocked(context.Context, *savepoint) {}
224+
222225
// rollbackToSavepointLocked is part of the txnInterceptor interface.
223226
func (s *txnSeqNumAllocator) rollbackToSavepointLocked(context.Context, savepoint) {
224227
// Nothing to restore. The seq nums keep increasing. The TxnCoordSender has

pkg/kv/kvclient/kvcoord/txn_interceptor_span_refresher.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -764,6 +764,9 @@ func (sr *txnSpanRefresher) createSavepointLocked(ctx context.Context, s *savepo
764764
s.refreshInvalid = sr.refreshInvalid
765765
}
766766

767+
// releaseSavepointLocked is part of the txnInterceptor interface.
768+
func (sr *txnSpanRefresher) releaseSavepointLocked(context.Context, *savepoint) {}
769+
767770
// rollbackToSavepointLocked is part of the txnInterceptor interface.
768771
func (sr *txnSpanRefresher) rollbackToSavepointLocked(ctx context.Context, s savepoint) {
769772
if !KeepRefreshSpansOnSavepointRollback.Get(&sr.st.SV) {

pkg/kv/kvclient/kvcoord/txn_interceptor_write_buffer.go

Lines changed: 54 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -177,6 +177,12 @@ type txnWriteBuffer struct {
177177
// sees the next BatchRequest.
178178
flushOnNextBatch bool
179179

180+
// firstExplicitSavepointSeq tracks the lowest explicit savepoint that hasn't
181+
// been released or rolled back. If this savepoint is non-zero, then a
182+
// mid-transaction flush must flush all revisions required to roll back this
183+
// (or a later) savepoint.
184+
firstExplicitSavepointSeq enginepb.TxnSeq
185+
180186
buffer btree
181187
bufferIDAlloc uint64
182188
bufferSize int64
@@ -605,6 +611,9 @@ func (twb *txnWriteBuffer) importLeafFinalState(context.Context, *roachpb.LeafTx
605611

606612
// epochBumpedLocked implements the txnInterceptor interface.
607613
func (twb *txnWriteBuffer) epochBumpedLocked() {
614+
// Sequence numbers are reset on epoch bumps so any retained savepoint is
615+
// wrong.
616+
twb.firstExplicitSavepointSeq = 0
608617
twb.resetBuffer()
609618
}
610619

@@ -613,8 +622,26 @@ func (twb *txnWriteBuffer) resetBuffer() {
613622
twb.bufferSize = 0
614623
}
615624

625+
func (twb *txnWriteBuffer) hasActiveSavepoint() bool {
626+
return twb.firstExplicitSavepointSeq != enginepb.TxnSeq(0)
627+
}
628+
616629
// createSavepointLocked is part of the txnInterceptor interface.
617-
func (twb *txnWriteBuffer) createSavepointLocked(context.Context, *savepoint) {}
630+
func (twb *txnWriteBuffer) createSavepointLocked(ctx context.Context, sp *savepoint) {
631+
assertTrue(twb.firstExplicitSavepointSeq <= sp.seqNum,
632+
"sequence number in created savepoint lower than retained savepoint")
633+
634+
if twb.firstExplicitSavepointSeq == enginepb.TxnSeq(0) {
635+
twb.firstExplicitSavepointSeq = sp.seqNum
636+
}
637+
}
638+
639+
// rollbackToSavepointLocked is part of the txnInterceptor interface.
640+
func (twb *txnWriteBuffer) releaseSavepointLocked(ctx context.Context, sp *savepoint) {
641+
if twb.firstExplicitSavepointSeq == sp.seqNum {
642+
twb.firstExplicitSavepointSeq = 0
643+
}
644+
}
618645

619646
// rollbackToSavepointLocked is part of the txnInterceptor interface.
620647
func (twb *txnWriteBuffer) rollbackToSavepointLocked(ctx context.Context, s savepoint) {
@@ -647,6 +674,9 @@ func (twb *txnWriteBuffer) rollbackToSavepointLocked(ctx context.Context, s save
647674
for _, bw := range toDelete {
648675
twb.removeFromBuffer(bw)
649676
}
677+
if twb.firstExplicitSavepointSeq == s.seqNum {
678+
twb.firstExplicitSavepointSeq = 0
679+
}
650680
}
651681

652682
// closeLocked implements the txnInterceptor interface.
@@ -1370,6 +1400,8 @@ func (twb *txnWriteBuffer) flushBufferAndSendBatch(
13701400
twb.txnMetrics.TxnWriteBufferDisabledAfterBuffering.Inc(1)
13711401
}
13721402

1403+
midTxnFlushWithExplicitSavepoint := !hasEndTxn && twb.hasActiveSavepoint()
1404+
13731405
// Flush all buffered writes by pre-pending them to the requests being sent
13741406
// in the batch.
13751407
//
@@ -1379,8 +1411,8 @@ func (twb *txnWriteBuffer) flushBufferAndSendBatch(
13791411
it := twb.buffer.MakeIter()
13801412
numRevisionsBuffered := 0
13811413
for it.First(); it.Valid(); it.Next() {
1382-
if !hasEndTxn {
1383-
revs := it.Cur().toAllRevisionRequests()
1414+
if midTxnFlushWithExplicitSavepoint {
1415+
revs := it.Cur().toAllRevisionRequests(twb.firstExplicitSavepointSeq)
13841416
numRevisionsBuffered += len(revs)
13851417
reqs = append(reqs, revs...)
13861418
} else {
@@ -1589,13 +1621,26 @@ func (bw *bufferedWrite) toRequest() kvpb.RequestUnion {
15891621
}
15901622

15911623
// toAllRevisionRequests returns requests for all revisions of the buffered
1592-
// writes for the key. When the buffer is flushed before the end of a
1593-
// transaction, all revisions must be written to storage to ensure that a future
1594-
// savepoint rollback is properly handled.
1595-
func (bw *bufferedWrite) toAllRevisionRequests() []kvpb.RequestUnion {
1624+
// writes that need to be flushed given the minimum sequence number.
1625+
//
1626+
// When the buffer is flushed before the end of a transaction, previous
1627+
// revisions must be written to storage to ensure that a future savepoint
1628+
// rollback is properly handled.
1629+
//
1630+
// The given sequence number is the smallest sequence number associated with an
1631+
// active savepoint.
1632+
//
1633+
// A write below the the minimum sequence number can be elided if there is a
1634+
// subsequent write also below the minimum sequence number.
1635+
func (bw *bufferedWrite) toAllRevisionRequests(minSeq enginepb.TxnSeq) []kvpb.RequestUnion {
15961636
rus := make([]kvpb.RequestUnion, 0, len(bw.vals))
1597-
for _, val := range bw.vals {
1598-
rus = append(rus, val.toRequestUnion(bw.key, bw.exclusionExpectedSinceTimestamp()))
1637+
maxIdx := len(bw.vals) - 1
1638+
for i, val := range bw.vals {
1639+
nextWriteLessThanSeq := (i+1 < maxIdx) && (bw.vals[i+1].seq < minSeq)
1640+
canElideRevision := val.seq < minSeq && nextWriteLessThanSeq
1641+
if !canElideRevision {
1642+
rus = append(rus, val.toRequestUnion(bw.key, bw.exclusionExpectedSinceTimestamp()))
1643+
}
15991644
}
16001645
return rus
16011646
}

0 commit comments

Comments
 (0)