Skip to content

Commit cf7e2ee

Browse files
committed
kvcoord: allow write buffer to re-enable pipelining
This is an attempt to allow the pipeliner to re-enable pipelining when it is flushing its buffer. Epic: none Release note: None
1 parent 704e340 commit cf7e2ee

File tree

3 files changed

+37
-4
lines changed

3 files changed

+37
-4
lines changed

pkg/kv/kvclient/kvcoord/txn_coord_sender.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -281,6 +281,10 @@ func newRootTxnCoordSender(
281281
timeSource: timeutil.DefaultTimeSource{},
282282
txn: &tcs.mu.txn,
283283
}
284+
tcs.interceptorAlloc.txnWriteBuffer.init(
285+
&tcs.interceptorAlloc.txnPipeliner,
286+
)
287+
284288
tcs.initCommonInterceptors(tcf, txn, kv.RootTxn)
285289

286290
// Once the interceptors are initialized, piece them all together in the
@@ -446,6 +450,7 @@ func (tc *TxnCoordSender) DisablePipelining() error {
446450
if tc.mu.active {
447451
return errors.Errorf("cannot disable pipelining on a running transaction")
448452
}
453+
tc.interceptorAlloc.txnPipeliner.disabledExplicitly = true
449454
tc.interceptorAlloc.txnPipeliner.disabled = true
450455
return nil
451456
}

pkg/kv/kvclient/kvcoord/txn_interceptor_pipeliner.go

Lines changed: 14 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -226,9 +226,14 @@ var rejectTxnMaxCount = settings.RegisterIntSetting(
226226
// attached to any end transaction request that is passed through the pipeliner
227227
// to ensure that they the locks within them are released.
228228
type txnPipeliner struct {
229-
st *cluster.Settings
230-
riGen rangeIteratorFactory // used to condense lock spans, if provided
231-
wrapped lockedSender
229+
st *cluster.Settings
230+
riGen rangeIteratorFactory // used to condense lock spans, if provided
231+
wrapped lockedSender
232+
// disabledExplicitly tracks whether the user called txn.DisablePipelining().
233+
// This is separate from disabled so that the txnWriteBuffer can enable
234+
// pipelining if it has flushed its buffer iff pipelining wasn't previously
235+
// explicitly disabled.
236+
disabledExplicitly bool
232237
disabled bool
233238
txnMetrics *TxnMetrics
234239
condensedIntentsEveryN *log.EveryN
@@ -587,6 +592,12 @@ func (tp *txnPipeliner) canUseAsyncConsensus(ctx context.Context, ba *kvpb.Batch
587592
return true
588593
}
589594

595+
// enableImplicitPipelining enables pipelining unless pipelining was explicitly
596+
// disabled previously.
597+
func (tp *txnPipeliner) enableImplicitPipelining() {
598+
tp.disabled = tp.disabledExplicitly
599+
}
600+
590601
// chainToInFlightWrites ensures that we "chain" on to any in-flight writes that
591602
// overlap the keys we're trying to read/write. We do this by prepending
592603
// QueryIntent requests with the ErrorIfMissing option before each request that

pkg/kv/kvclient/kvcoord/txn_interceptor_write_buffer.go

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -183,6 +183,8 @@ type txnWriteBuffer struct {
183183
// `flushed` tracks whether the buffer has been previously flushed.
184184
flushed bool
185185

186+
pipelineEnabler pipelineEnabler
187+
186188
// flushOnNextBatch, if set, indicates that write buffering has just been
187189
// disabled, and the interceptor should flush any buffered writes when it
188190
// sees the next BatchRequest.
@@ -215,6 +217,14 @@ type txnWriteBuffer struct {
215217
testingOverrideCPutEvalFn func(expBytes []byte, actVal *roachpb.Value, actValPresent bool, allowNoExisting bool) *kvpb.ConditionFailedError
216218
}
217219

220+
type pipelineEnabler interface {
221+
enableImplicitPipelining()
222+
}
223+
224+
func (twb *txnWriteBuffer) init(pe pipelineEnabler) {
225+
twb.pipelineEnabler = pe
226+
}
227+
218228
func (twb *txnWriteBuffer) setEnabled(enabled bool) {
219229
if !enabled && twb.buffer.Len() > 0 {
220230
// When disabling write buffering, if we evaluated any requests, we need
@@ -1678,7 +1688,14 @@ func (twb *txnWriteBuffer) flushBufferAndSendBatch(
16781688
// Once we've flushed the buffer, we disable write buffering going forward. We
16791689
// do this even if the buffer is empty since once we've called this function,
16801690
// our buffer no longer represents all of the writes in the transaction.
1681-
log.VEventf(ctx, 2, "disabling write buffering for this epoch")
1691+
log.VEventf(ctx, 2, "disabling write buffering")
1692+
if twb.pipelineEnabler != nil {
1693+
// We enable pipelining, but only after this request returns.
1694+
//
1695+
// TODO(ssd): Consider enabling pipelining for this batch as well once we
1696+
// have more discipline around the invariants of the batches we are sending.
1697+
defer twb.pipelineEnabler.enableImplicitPipelining()
1698+
}
16821699
twb.flushed = true
16831700

16841701
numKeysBuffered := twb.buffer.Len()

0 commit comments

Comments
 (0)