
Commit 202d7ad

kvclient: flush the write buffer if it gets too large
This patch introduces a new cluster setting, kv.transaction.write_buffering.max_buffer_size, which dictates how large a transaction's write buffer can get before we flush all buffered writes to KV. It defaults to 4MB for now. Once a transaction's buffer has been flushed, subsequent writes are no longer buffered on the client; instead, the transaction writes intents, as it would have in a pre-buffered-writes world.

I briefly considered other schemes that don't disable buffered writes entirely once a transaction goes over budget -- either flushing the buffer only partially, or flushing it in its entirety while continuing to buffer subsequent writes as long as the transaction has budget. I decided against both: many of the benefits of buffered writes (e.g. 1PC) are no longer possible after the first flush, and other benefits (e.g. batching, cheaper read-your-own-writes) don't generalize either. For now, we do the simple thing.

Resolves #139056

Release note: None
1 parent 2733799 commit 202d7ad
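
To make the policy above concrete, here is a small self-contained Go sketch of the decision the buffer makes per batch. This is not the kvcoord implementation -- writeBuffer, maxBufferSize, and send below are simplified stand-ins for the interceptor, the cluster setting, and SendLocked -- but it captures the shape of the logic: estimate the incoming batch's contribution, compare it against the budget, and once the budget would be exceeded, flush and stop buffering for the rest of the transaction.

// Sketch of the flush-on-overflow policy described in the commit message.
// Simplified stand-ins, not kvcoord code.
package main

import "fmt"

const maxBufferSize = 4 << 20 // mirrors the 4MB default of the new cluster setting

type writeBuffer struct {
    enabled    bool  // once false, writes go straight to KV as intents
    bufferSize int64 // bytes currently buffered on the client
}

// send decides what to do with a batch whose buffered footprint is estimated
// at estimatedBatchSize bytes.
func (b *writeBuffer) send(estimatedBatchSize int64) string {
    if !b.enabled {
        return "write intents directly (buffering already disabled)"
    }
    if b.bufferSize+estimatedBatchSize > maxBufferSize {
        // Over budget: flush everything buffered so far and disable
        // buffering for the rest of the transaction.
        b.bufferSize = 0
        b.enabled = false
        return "flush buffer, then write intents from here on"
    }
    b.bufferSize += estimatedBatchSize
    return "buffer the writes on the client"
}

func main() {
    b := &writeBuffer{enabled: true}
    fmt.Println(b.send(1 << 20)) // buffered
    fmt.Println(b.send(2 << 20)) // buffered
    fmt.Println(b.send(2 << 20)) // would exceed 4MB: flush and disable
    fmt.Println(b.send(1 << 10)) // stays disabled
}

Note how, after the first flush, every later batch takes the intent-writing path; that is the simple scheme the commit message settles on.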


5 files changed, +394 -92 lines changed

docs/generated/settings/settings-for-tenants.txt

Lines changed: 1 addition & 0 deletions
@@ -98,6 +98,7 @@ kv.transaction.max_refresh_spans_bytes integer 4194304 maximum number of bytes u
 kv.transaction.randomized_anchor_key.enabled boolean false dictates whether a transactions anchor key is randomized or not application
 kv.transaction.reject_over_max_intents_budget.enabled boolean false if set, transactions that exceed their lock tracking budget (kv.transaction.max_intents_bytes) are rejected instead of having their lock spans imprecisely compressed application
 kv.transaction.write_buffering.enabled boolean false if enabled, transactional writes are buffered on the client application
+kv.transaction.write_buffering.max_buffer_size integer 4194304 if non-zero, defines the maximum size of the buffer that will be used to buffer transactional writes per-transaction application
 kv.transaction.write_pipelining.locking_reads.enabled boolean true if enabled, transactional locking reads are pipelined through Raft consensus application
 kv.transaction.write_pipelining.ranged_writes.enabled boolean true if enabled, transactional ranged writes are pipelined through Raft consensus application
 kv.transaction.write_pipelining.enabled (alias: kv.transaction.write_pipelining_enabled) boolean true if enabled, transactional writes are pipelined through Raft consensus application

docs/generated/settings/settings.html

Lines changed: 1 addition & 0 deletions
@@ -127,6 +127,7 @@
 <tr><td><div id="setting-kv-transaction-randomized-anchor-key-enabled" class="anchored"><code>kv.transaction.randomized_anchor_key.enabled</code></div></td><td>boolean</td><td><code>false</code></td><td>dictates whether a transactions anchor key is randomized or not</td><td>Serverless/Dedicated/Self-Hosted</td></tr>
 <tr><td><div id="setting-kv-transaction-reject-over-max-intents-budget-enabled" class="anchored"><code>kv.transaction.reject_over_max_intents_budget.enabled</code></div></td><td>boolean</td><td><code>false</code></td><td>if set, transactions that exceed their lock tracking budget (kv.transaction.max_intents_bytes) are rejected instead of having their lock spans imprecisely compressed</td><td>Serverless/Dedicated/Self-Hosted</td></tr>
 <tr><td><div id="setting-kv-transaction-write-buffering-enabled" class="anchored"><code>kv.transaction.write_buffering.enabled</code></div></td><td>boolean</td><td><code>false</code></td><td>if enabled, transactional writes are buffered on the client</td><td>Serverless/Dedicated/Self-Hosted</td></tr>
+<tr><td><div id="setting-kv-transaction-write-buffering-max-buffer-size" class="anchored"><code>kv.transaction.write_buffering.max_buffer_size</code></div></td><td>integer</td><td><code>4194304</code></td><td>if non-zero, defines the maximum size of the buffer that will be used to buffer transactional writes per-transaction</td><td>Serverless/Dedicated/Self-Hosted</td></tr>
 <tr><td><div id="setting-kv-transaction-write-pipelining-locking-reads-enabled" class="anchored"><code>kv.transaction.write_pipelining.locking_reads.enabled</code></div></td><td>boolean</td><td><code>true</code></td><td>if enabled, transactional locking reads are pipelined through Raft consensus</td><td>Serverless/Dedicated/Self-Hosted</td></tr>
 <tr><td><div id="setting-kv-transaction-write-pipelining-ranged-writes-enabled" class="anchored"><code>kv.transaction.write_pipelining.ranged_writes.enabled</code></div></td><td>boolean</td><td><code>true</code></td><td>if enabled, transactional ranged writes are pipelined through Raft consensus</td><td>Serverless/Dedicated/Self-Hosted</td></tr>
 <tr><td><div id="setting-kv-transaction-write-pipelining-enabled" class="anchored"><code>kv.transaction.write_pipelining.enabled<br />(alias: kv.transaction.write_pipelining_enabled)</code></div></td><td>boolean</td><td><code>true</code></td><td>if enabled, transactional writes are pipelined through Raft consensus</td><td>Serverless/Dedicated/Self-Hosted</td></tr>

pkg/kv/kvclient/kvcoord/txn_coord_sender.go

Lines changed: 1 addition & 0 deletions
@@ -321,6 +321,7 @@ func (tc *TxnCoordSender) initCommonInterceptors(
   if ds, ok := tcf.wrapped.(*DistSender); ok {
     riGen.ds = ds
   }
+  tc.interceptorAlloc.txnWriteBuffer.st = tcf.st
   tc.interceptorAlloc.txnPipeliner = txnPipeliner{
     st:    tcf.st,
     riGen: riGen,

pkg/kv/kvclient/kvcoord/txn_interceptor_write_buffer.go

Lines changed: 152 additions & 19 deletions
@@ -9,11 +9,13 @@ import (
   "context"
   "encoding/binary"
   "slices"
+  "unsafe"

   "github.com/cockroachdb/cockroach/pkg/kv/kvpb"
   "github.com/cockroachdb/cockroach/pkg/kv/kvserver/concurrency/lock"
   "github.com/cockroachdb/cockroach/pkg/roachpb"
   "github.com/cockroachdb/cockroach/pkg/settings"
+  "github.com/cockroachdb/cockroach/pkg/settings/cluster"
   "github.com/cockroachdb/cockroach/pkg/storage/enginepb"
   "github.com/cockroachdb/cockroach/pkg/storage/mvccencoding"
   "github.com/cockroachdb/cockroach/pkg/storage/mvcceval"
@@ -30,6 +32,16 @@ var BufferedWritesEnabled = settings.RegisterBoolSetting(
   settings.WithPublic,
 )

+var bufferedWritesMaxBufferSize = settings.RegisterIntSetting(
+  settings.ApplicationLevel,
+  "kv.transaction.write_buffering.max_buffer_size",
+  "if non-zero, defines the maximum size of the "+
+    "buffer that will be used to buffer transactional writes per-transaction",
+  1<<22, // 4MB
+  settings.NonNegativeInt,
+  settings.WithPublic,
+)
+
 // txnWriteBuffer is a txnInterceptor that buffers transactional writes until
 // commit time. Moreover, it also decomposes read-write KV operations (e.g.
 // CPuts, InitPuts) into separate (locking) read and write operations, buffering
@@ -111,11 +123,40 @@ var BufferedWritesEnabled = settings.RegisterBoolSetting(
 // TODO(arul): In various places below, there's potential to optimize things by
 // batch allocating misc objects and pre-allocating some slices.
 type txnWriteBuffer struct {
+  st *cluster.Settings
+  // enabled indicates whether write buffering is currently enabled for the
+  // transaction or not. Write buffering may only be enabled on RootTxns, and
+  // before the transaction has sent any requests. However, a transaction that
+  // has previously buffered writes may flush its buffer and disable write
+  // buffering for subsequent requests. This can happen for a few different
+  // reasons:
+  //
+  // 1. If the buffer has exceeded its configured budget, or if the transaction
+  // issues a DeleteRange request, we flush the buffer and disable write
+  // buffering going forward. In either case, we're dealing with large writing
+  // transactions, and there isn't much benefit from write buffering.
+  // 2. If the transaction is performing a DDL operation, we flush the buffer
+  // and disable write buffering going forward out of an abundance of caution.
+  // This is opted into by SQL.
+  //
+  // As a result, we have a nice invariant: if write buffering is enabled, then
+  // all writes performed by the transaction are buffered in memory. We can
+  // never have the case where a part of the write set is buffered, and the
+  // other part is replicated.
+  //
+  // In the future, the invariant above allows us to omit checking the AbortSpan
+  // for transactions that have buffered writes enabled. The AbortSpan is used
+  // to ensure we don't violate read-your-own-write semantics for transactions
+  // that have been aborted by a conflicting transaction. As read-your-own-write
+  // semantics are upheld by the client, not the server, for transactions that
+  // use buffered writes, we can skip the AbortSpan check on the server.
   enabled bool

   buffer        btree
-  bufferSeek    bufferedWrite // re-use while seeking
   bufferIDAlloc uint64
+  bufferSize    int64
+
+  bufferSeek bufferedWrite // re-use while seeking

   wrapped lockedSender

@@ -137,7 +178,15 @@ func (twb *txnWriteBuffer) SendLocked(
       // anything.
       return twb.wrapped.SendLocked(ctx, ba)
     }
-    return twb.flushWithEndTxn(ctx, ba)
+    return twb.flushBufferAndSendBatch(ctx, ba)
+  }
+
+  // Check if buffering writes from the supplied batch will run us over
+  // budget. If it will, we shouldn't buffer writes from the current batch,
+  // and flush the buffer.
+  if twb.estimateSize(ba)+twb.bufferSize > bufferedWritesMaxBufferSize.Get(&twb.st.SV) {
+    // TODO(arul): add some metrics for this case.
+    return twb.flushBufferAndSendBatch(ctx, ba)
   }

   transformedBa, ts := twb.applyTransformations(ctx, ba)
@@ -165,6 +214,50 @@ func (twb *txnWriteBuffer) SendLocked(
   return twb.mergeResponseWithTransformations(ctx, ts, br)
 }

+// estimateSize returns a conservative estimate by which the buffer will grow in
+// size if the writes from the supplied batch request are buffered.
+func (twb *txnWriteBuffer) estimateSize(ba *kvpb.BatchRequest) int64 {
+  var scratch bufferedWrite
+  estimate := int64(0)
+  scratch.vals = make([]bufferedValue, 1)
+  for _, ru := range ba.Requests {
+    req := ru.GetInner()
+    switch t := req.(type) {
+    case *kvpb.ConditionalPutRequest:
+      // At this point, we don't know whether the condition will evaluate
+      // successfully or not, and by extension, whether the KV will be added to
+      // the buffer. We therefore assume the worst case scenario (where the KV
+      // is added to the buffer) in our estimate.
+      scratch.key = t.Key
+      scratch.vals[0] = bufferedValue{
+        val: t.Value,
+        seq: t.Sequence,
+      }
+      estimate += scratch.size()
+    case *kvpb.PutRequest:
+      // NB: when estimating, we're being conservative by assuming the Put is to
+      // a key that isn't already present in the buffer. If it were, we could
+      // omit the key's size from the estimate.
+      scratch.key = t.Key
+      scratch.vals[0] = bufferedValue{
+        val: t.Value,
+        seq: t.Sequence,
+      }
+      estimate += scratch.size()
+    case *kvpb.DeleteRequest:
+      // NB: Similar to Put, we're assuming we're deleting a key that isn't
+      // already present in the buffer.
+      scratch.key = t.Key
+      scratch.vals[0] = bufferedValue{
+        seq: t.Sequence,
+      }
+      estimate += scratch.size()
+    }
+    // No other request is buffered.
+  }
+  return estimate
+}
+
 // adjustError adjusts the provided error based on the transformations made by
 // the txnWriteBuffer to the batch request before sending it to KV.
 func (twb *txnWriteBuffer) adjustError(
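
To see what a single request contributes to the running total, here is a small self-contained sketch of the same accounting idea. The types are simplified stand-ins for kvcoord's bufferedWrite and bufferedValue (the real ones also carry an end key and an ID, and use the existing keySize helper, assumed here to be roughly the key's length in bytes): each entry is charged a fixed struct overhead obtained via unsafe.Sizeof plus the raw key and value bytes it references, mirroring the quantities estimateSize sums per Put, CPut, and Delete.

// Standalone sketch of the size accounting that estimateSize and the size()
// methods later in this diff rely on. The types here are simplified stand-ins
// for kvcoord's bufferedWrite/bufferedValue (no end key, no ID).
package main

import (
    "fmt"
    "unsafe"
)

type bufferedValue struct {
    raw []byte // stand-in for roachpb.Value.RawBytes
    seq int32  // stand-in for the write's sequence number
}

type bufferedWrite struct {
    key  []byte
    vals []bufferedValue
}

// Fixed per-entry overhead, taken from the in-memory struct size.
const (
    bufferedWriteStructOverhead = int64(unsafe.Sizeof(bufferedWrite{}))
    bufferedValueStructOverhead = int64(unsafe.Sizeof(bufferedValue{}))
)

func (bv *bufferedValue) size() int64 {
    return int64(len(bv.raw)) + bufferedValueStructOverhead
}

func (bw *bufferedWrite) size() int64 {
    size := int64(len(bw.key)) + bufferedWriteStructOverhead
    for i := range bw.vals {
        size += bw.vals[i].size()
    }
    return size
}

func main() {
    // A Put of a 10-byte key and a 100-byte value is charged the two struct
    // overheads plus 110 bytes of payload -- regardless of whether the key is
    // already in the buffer, which is what makes the estimate conservative.
    bw := bufferedWrite{
        key:  make([]byte, 10),
        vals: []bufferedValue{{raw: make([]byte, 100)}},
    }
    fmt.Println("estimated contribution:", bw.size(), "bytes")
}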
@@ -922,36 +1015,61 @@ func (twb *txnWriteBuffer) addToBuffer(key roachpb.Key, val roachpb.Value, seq e
   if it.Valid() {
     // We've already seen a write for this key.
     bw := it.Cur()
-    bw.vals = append(bw.vals, bufferedValue{val: val, seq: seq})
+    val := bufferedValue{val: val, seq: seq}
+    bw.vals = append(bw.vals, val)
+    twb.bufferSize += val.size()
   } else {
     twb.bufferIDAlloc++
-    twb.buffer.Set(&bufferedWrite{
+    bw := &bufferedWrite{
       id:   twb.bufferIDAlloc,
       key:  key,
       vals: []bufferedValue{{val: val, seq: seq}},
-    })
+    }
+    twb.buffer.Set(bw)
+    twb.bufferSize += bw.size()
   }
 }

-// flushWithEndTxn flushes all buffered writes to the KV layer along with the
-// EndTxn request. Responses from the flushing are stripped before returning.
-func (twb *txnWriteBuffer) flushWithEndTxn(
+// flushBufferAndSendBatch flushes all buffered writes when sending the supplied
+// batch request to the KV layer. This is done by pre-pending the buffered
+// writes to the requests in the batch.
+//
+// The response is transformed to hide the fact that requests were added to the
+// batch to flush the buffer. Upper layers remain oblivious to the flush and any
+// buffering in general.
+func (twb *txnWriteBuffer) flushBufferAndSendBatch(
   ctx context.Context, ba *kvpb.BatchRequest,
 ) (*kvpb.BatchResponse, *kvpb.Error) {
+  defer func() {
+    assertTrue(twb.buffer.Len() == 0, "buffer should be empty after flush")
+    assertTrue(twb.bufferSize == 0, "buffer size should be 0 after flush")
+  }()
+
   numBuffered := twb.buffer.Len()
   if numBuffered == 0 {
     return twb.wrapped.SendLocked(ctx, ba) // nothing to flush
   }
-  // Iterate over the buffered writes and flush all buffered writes to the KV
-  // layer by adding them to the batch.
-  //
-  // TODO(arul): If the batch request with the EndTxn request also contains an
-  // overlapping write to a key that's already in the buffer, we could exclude
-  // that write from the buffer.
-  reqs := make([]kvpb.RequestUnion, 0, numBuffered+len(ba.Requests))
+
+  // Once we've flushed the buffer, we disable write buffering going forward.
+  twb.enabled = false
+
+  // Flush all buffered writes by pre-pending them to the requests being sent
+  // in the batch.
+  // First, collect the requests we'll need to flush.
+  toFlushBufferedWrites := make([]bufferedWrite, 0, twb.buffer.Len())
+
   it := twb.buffer.MakeIter()
   for it.First(); it.Valid(); it.Next() {
-    reqs = append(reqs, it.Cur().toRequest())
+    toFlushBufferedWrites = append(toFlushBufferedWrites, *it.Cur())
+  }
+
+  reqs := make([]kvpb.RequestUnion, 0, numBuffered+len(ba.Requests))
+
+  // Next, remove the buffered writes from the buffer and collect them into requests.
+  for _, bw := range toFlushBufferedWrites {
+    reqs = append(reqs, bw.toRequest())
+    twb.buffer.Delete(&bw)
+    twb.bufferSize -= bw.size()
   }

   // Layers below us expect that writes inside a batch are in sequence number
@@ -970,13 +1088,12 @@ func (twb *txnWriteBuffer) flushWithEndTxn(
   })

   ba = ba.ShallowCopy()
-  reqs = append(reqs, ba.Requests...)
-  ba.Requests = reqs
-
+  ba.Requests = append(reqs, ba.Requests...)
   br, pErr := twb.wrapped.SendLocked(ctx, ba)
   if pErr != nil {
     return nil, twb.adjustErrorUponFlush(ctx, numBuffered, pErr)
   }
+
   // Strip out responses for all the flushed buffered writes.
   br.Responses = br.Responses[numBuffered:]
   return br, nil
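
The two hunks above amount to batch surgery: collect one request per buffered write, sort those requests in sequence-number order (per the "Layers below us expect..." comment), prepend them to the caller's batch, send, and strip the first numBuffered responses so callers never observe the flush. A minimal sketch of that reshaping, with plain strings standing in for kvpb requests and responses and a stub send in place of the wrapped lockedSender:

// Sketch of the flush-and-strip reshaping done by flushBufferAndSendBatch.
// Strings stand in for kvpb requests/responses; send stands in for the
// wrapped lockedSender. Not the real kvcoord code.
package main

import "fmt"

// send pretends to evaluate a batch, producing one response per request.
func send(reqs []string) []string {
    resps := make([]string, len(reqs))
    for i, r := range reqs {
        resps[i] = "resp(" + r + ")"
    }
    return resps
}

// flushAndSend prepends the buffered writes to the caller's batch and hides
// their responses from the caller.
func flushAndSend(buffered, batch []string) []string {
    numBuffered := len(buffered)
    reqs := make([]string, 0, numBuffered+len(batch))
    reqs = append(reqs, buffered...) // flushed writes go first
    reqs = append(reqs, batch...)
    resps := send(reqs)
    // Mirrors br.Responses = br.Responses[numBuffered:] in the diff: the
    // responses to the flushed writes are stripped before returning.
    return resps[numBuffered:]
}

func main() {
    buffered := []string{"Put(a)", "Put(b)"}  // previously buffered writes
    batch := []string{"Scan(c, d)", "EndTxn"} // the caller's batch
    fmt.Println(flushAndSend(buffered, batch))
    // Output: [resp(Scan(c, d)) resp(EndTxn)]
}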
@@ -1003,6 +1120,8 @@ func (twb *txnWriteBuffer) testingBufferedWritesAsSlice() []bufferedWrite {
   return writes
 }

+const bufferedWriteStructOverhead = int64(unsafe.Sizeof(bufferedWrite{}))
+
 // bufferedWrite is a buffered write operation to a given key. It maps a key to
 // possibly multiple values[1], each with an associated sequence number.
 //
@@ -1022,6 +1141,16 @@ type bufferedWrite struct {
   vals []bufferedValue // sorted in increasing sequence number order
 }

+func (bw *bufferedWrite) size() int64 {
+  size := keySize(bw.key) + keySize(bw.endKey) + bufferedWriteStructOverhead
+  for _, v := range bw.vals {
+    size += v.size()
+  }
+  return size
+}
+
+const bufferedValueStructOverhead = int64(unsafe.Sizeof(bufferedValue{}))
+
 // bufferedValue is a value written to a key at a given sequence number.
 type bufferedValue struct {
   val roachpb.Value
@@ -1037,6 +1166,10 @@ func (bv *bufferedValue) valPtr() *roachpb.Value {
   return &valCpy
 }

+func (bv *bufferedValue) size() int64 {
+  return int64(len(bv.val.RawBytes)) + bufferedValueStructOverhead
+}
+
 //go:generate ../../../util/interval/generic/gen.sh *bufferedWrite kvcoord

 // Methods required by util/interval/generic type contract.
