Skip to content

Commit 7e041d5

Browse files
stevendannayuzefovich
authored andcommitted
kvcoord: add request validation to txnWriteBuffer
This adds early detection of unsupported request types and request options. SendLocked now errors on any request that the txnWriteBuffer doesn't explicitly support. We may consider flushing the buffer instead to increase the set of transactions that can use this feature. Fixes #143894 Release note: None
1 parent b5e5b88 commit 7e041d5

File tree

2 files changed

+228
-13
lines changed

2 files changed

+228
-13
lines changed

pkg/kv/kvclient/kvcoord/txn_interceptor_write_buffer.go

Lines changed: 92 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ import (
2020
"github.com/cockroachdb/cockroach/pkg/storage/enginepb"
2121
"github.com/cockroachdb/cockroach/pkg/storage/mvccencoding"
2222
"github.com/cockroachdb/cockroach/pkg/storage/mvcceval"
23+
"github.com/cockroachdb/cockroach/pkg/util/buildutil"
2324
"github.com/cockroachdb/cockroach/pkg/util/humanizeutil"
2425
"github.com/cockroachdb/cockroach/pkg/util/log"
2526
"github.com/cockroachdb/errors"
@@ -46,8 +47,8 @@ var bufferedWritesMaxBufferSize = settings.RegisterIntSetting(
4647

4748
// txnWriteBuffer is a txnInterceptor that buffers transactional writes until
4849
// commit time. Moreover, it also decomposes read-write KV operations (e.g.
49-
// CPuts, InitPuts) into separate (locking) read and write operations, buffering
50-
// the latter until commit time.
50+
// CPuts) into separate (locking) read and write operations, buffering the
51+
// latter until commit time.
5152
//
5253
// Buffering writes until commit time has four main benefits:
5354
//
@@ -253,7 +254,16 @@ func (twb *txnWriteBuffer) SendLocked(
253254
return twb.flushBufferAndSendBatch(ctx, ba)
254255
}
255256

256-
transformedBa, ts := twb.applyTransformations(ctx, ba)
257+
if err := twb.validateBatch(ba); err != nil {
258+
// We could choose to twb.flushBufferAndSendBatch
259+
// here. For now, we return an error.
260+
return nil, kvpb.NewError(err)
261+
}
262+
263+
transformedBa, ts, pErr := twb.applyTransformations(ctx, ba)
264+
if pErr != nil {
265+
return nil, pErr
266+
}
257267

258268
if len(transformedBa.Requests) == 0 {
259269
// Lower layers (the DistSender and the KVServer) do not expect/handle empty
@@ -278,6 +288,80 @@ func (twb *txnWriteBuffer) SendLocked(
278288
return twb.mergeResponseWithTransformations(ctx, ts, br)
279289
}
280290

291+
// validateBatch returns an error if the batch is unsupported
292+
// by the txnWriteBuffer.
293+
func (twb *txnWriteBuffer) validateBatch(ba *kvpb.BatchRequest) error {
294+
if ba.WriteOptions != nil {
295+
// OriginTimestamp and OriginID are currently only used by Logical Data
296+
// Replication (LDR). These options are unsupported at the moment as we
297+
// don't store the inbound batch options in the buffer.
298+
if ba.WriteOptions.OriginTimestamp.IsSet() {
299+
return errors.AssertionFailedf("transaction write buffer does not support batches with OriginTimestamp set")
300+
}
301+
if ba.WriteOptions.OriginID != 0 {
302+
return errors.AssertionFailedf("transaction write buffer does not support batches with OriginID set")
303+
}
304+
}
305+
return twb.validateRequests(ba)
306+
}
307+
308+
// validateRequests returns an error if any of the requests in the batch
309+
// are unsupported by the txnWriteBuffer.
310+
func (twb *txnWriteBuffer) validateRequests(ba *kvpb.BatchRequest) error {
311+
for _, ru := range ba.Requests {
312+
req := ru.GetInner()
313+
switch t := req.(type) {
314+
case *kvpb.ConditionalPutRequest:
315+
// Our client side ConditionalPutRequest evaluation does not know how to
316+
// handle the origin timestamp setting. Doing so would require sending a
317+
// GetRequest with RawMVCCValues set and parsing the MVCCValueHeader.
318+
if t.OriginTimestamp.IsSet() {
319+
return unsupportedOptionError(t.Method(), "OriginTimestamp")
320+
}
321+
case *kvpb.PutRequest:
322+
case *kvpb.DeleteRequest:
323+
case *kvpb.GetRequest:
324+
// ReturnRawMVCCValues is unsupported because we don't know how to serve
325+
// such reads from the write buffer currently.
326+
if t.ReturnRawMVCCValues {
327+
return unsupportedOptionError(t.Method(), "ReturnRawMVCCValue")
328+
}
329+
case *kvpb.ScanRequest:
330+
// ReturnRawMVCCValues is unsupported because we don't know how to serve
331+
// such reads from the write buffer currently.
332+
if t.ReturnRawMVCCValues {
333+
return unsupportedOptionError(t.Method(), "ReturnRawMVCCValue")
334+
}
335+
if t.ScanFormat == kvpb.COL_BATCH_RESPONSE {
336+
return unsupportedOptionError(t.Method(), "COL_BATCH_RESPONSE scan format")
337+
}
338+
case *kvpb.ReverseScanRequest:
339+
// ReturnRawMVCCValues is unsupported because we don't know how to serve
340+
// such reads from the write buffer currently.
341+
if t.ReturnRawMVCCValues {
342+
return unsupportedOptionError(t.Method(), "ReturnRawMVCCValue")
343+
}
344+
if t.ScanFormat == kvpb.COL_BATCH_RESPONSE {
345+
return unsupportedOptionError(t.Method(), "COL_BATCH_RESPONSE scan format")
346+
}
347+
default:
348+
// All other requests are unsupported. Note that we assume EndTxn and
349+
// DeleteRange requests were handled explicitly before this method was
350+
// called.
351+
return unsupportedMethodError(t.Method())
352+
}
353+
}
354+
return nil
355+
}
356+
357+
func unsupportedMethodError(m kvpb.Method) error {
358+
return errors.AssertionFailedf("transaction write buffer does not support %s requests", m)
359+
}
360+
361+
func unsupportedOptionError(m kvpb.Method, option string) error {
362+
return errors.AssertionFailedf("transaction write buffer does not support %s requests with %s", m, option)
363+
}
364+
281365
// estimateSize returns a conservative estimate by which the buffer will grow in
282366
// size if the writes from the supplied batch request are buffered.
283367
func (twb *txnWriteBuffer) estimateSize(ba *kvpb.BatchRequest) int64 {
@@ -539,7 +623,7 @@ func (twb *txnWriteBuffer) closeLocked() {}
539623
// TODO(arul): Augment this comment as these expand.
540624
func (twb *txnWriteBuffer) applyTransformations(
541625
ctx context.Context, ba *kvpb.BatchRequest,
542-
) (*kvpb.BatchRequest, transformations) {
626+
) (*kvpb.BatchRequest, transformations, *kvpb.Error) {
543627
baRemote := ba.ShallowCopy()
544628
// TODO(arul): We could improve performance here by pre-allocating
545629
// baRemote.Requests to the correct size by counting the number of Puts/Dels
@@ -699,9 +783,6 @@ func (twb *txnWriteBuffer) applyTransformations(
699783
// We've constructed a response that we'll stitch together with the
700784
// result on the response path; eschew sending the request to the KV
701785
// layer.
702-
//
703-
// TODO(arul): if the ReturnRawMVCCValues flag is set, we'll need to
704-
// flush the buffer.
705786
continue
706787
}
707788
// Wasn't served locally; send the request to the KV layer.
@@ -736,10 +817,10 @@ func (twb *txnWriteBuffer) applyTransformations(
736817
baRemote.Requests = append(baRemote.Requests, ru)
737818

738819
default:
739-
baRemote.Requests = append(baRemote.Requests, ru)
820+
return nil, nil, kvpb.NewError(unsupportedMethodError(t.Method()))
740821
}
741822
}
742-
return baRemote, ts
823+
return baRemote, ts, nil
743824
}
744825

745826
// seekItemForSpan returns a bufferedWrite appropriate for use with a
@@ -1103,9 +1184,7 @@ func (t transformation) toResp(
11031184
ru.MustSetInner(reverseScanResp)
11041185

11051186
default:
1106-
// This is only possible once we start decomposing read-write requests into
1107-
// separate bits.
1108-
panic("unimplemented")
1187+
return ru, kvpb.NewError(unsupportedMethodError(req.Method()))
11091188
}
11101189

11111190
return ru, nil
@@ -1725,7 +1804,7 @@ func (m *respMerger) toReverseScanResp(
17251804

17261805
// assertTrue panics with a message if the supplied condition isn't true.
17271806
func assertTrue(cond bool, msg string) {
1728-
if !cond {
1807+
if !cond && buildutil.CrdbTestBuild {
17291808
panic(msg)
17301809
}
17311810
}

pkg/kv/kvclient/kvcoord/txn_interceptor_write_buffer_test.go

Lines changed: 136 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1851,3 +1851,139 @@ func TestTxnWriteBufferClearsBufferOnEpochBump(t *testing.T) {
18511851
require.Equal(t, 0, int(twb.bufferSize))
18521852
require.Equal(t, numCalled, mockSender.NumCalled())
18531853
}
1854+
1855+
// TestTxnWriteBufferBatchRequestValidation verifies that the txnWriteBuffer
1856+
// rejects requests that it doesn't know how to support.
1857+
func TestTxnWriteBufferBatchRequestValidation(t *testing.T) {
1858+
defer leaktest.AfterTest(t)()
1859+
defer log.Scope(t).Close(t)
1860+
ctx := context.Background()
1861+
twb, mockSender := makeMockTxnWriteBuffer(cluster.MakeClusterSettings())
1862+
1863+
type testCase struct {
1864+
name string
1865+
ba func() *kvpb.BatchRequest
1866+
}
1867+
1868+
txn := makeTxnProto()
1869+
txn.Sequence = 1
1870+
keyA, keyC := roachpb.Key("a"), roachpb.Key("c")
1871+
1872+
tests := []testCase{
1873+
{
1874+
name: "batch with OriginTimestamp",
1875+
ba: func() *kvpb.BatchRequest {
1876+
header := kvpb.Header{
1877+
Txn: &txn,
1878+
WriteOptions: &kvpb.WriteOptions{
1879+
OriginTimestamp: hlc.Timestamp{WallTime: 1},
1880+
},
1881+
}
1882+
return &kvpb.BatchRequest{Header: header}
1883+
},
1884+
},
1885+
{
1886+
name: "batch with OriginID",
1887+
ba: func() *kvpb.BatchRequest {
1888+
header := kvpb.Header{
1889+
Txn: &txn,
1890+
WriteOptions: &kvpb.WriteOptions{
1891+
OriginID: 1,
1892+
},
1893+
}
1894+
return &kvpb.BatchRequest{Header: header}
1895+
},
1896+
},
1897+
{
1898+
name: "batch with InitPut",
1899+
ba: func() *kvpb.BatchRequest {
1900+
b := &kvpb.BatchRequest{Header: kvpb.Header{Txn: &txn}}
1901+
b.Add(&kvpb.InitPutRequest{
1902+
RequestHeader: kvpb.RequestHeader{Key: keyA, Sequence: txn.Sequence},
1903+
Value: roachpb.Value{},
1904+
})
1905+
return b
1906+
},
1907+
},
1908+
{
1909+
name: "batch with Increment",
1910+
ba: func() *kvpb.BatchRequest {
1911+
b := &kvpb.BatchRequest{Header: kvpb.Header{Txn: &txn}}
1912+
b.Add(&kvpb.IncrementRequest{
1913+
RequestHeader: kvpb.RequestHeader{Key: keyA, Sequence: txn.Sequence},
1914+
})
1915+
return b
1916+
},
1917+
},
1918+
{
1919+
name: "batch with ReturnRawMVCCValues Scan",
1920+
ba: func() *kvpb.BatchRequest {
1921+
b := &kvpb.BatchRequest{Header: kvpb.Header{Txn: &txn}}
1922+
r := &kvpb.ScanRequest{
1923+
ReturnRawMVCCValues: true,
1924+
RequestHeader: kvpb.RequestHeader{Key: keyA, EndKey: keyC, Sequence: txn.Sequence},
1925+
}
1926+
b.Add(r)
1927+
return b
1928+
},
1929+
},
1930+
{
1931+
name: "batch with ReturnRawMVCCValues ReverseScan",
1932+
ba: func() *kvpb.BatchRequest {
1933+
b := &kvpb.BatchRequest{Header: kvpb.Header{Txn: &txn}}
1934+
r := &kvpb.ReverseScanRequest{
1935+
ReturnRawMVCCValues: true,
1936+
RequestHeader: kvpb.RequestHeader{Key: keyA, EndKey: keyC, Sequence: txn.Sequence},
1937+
}
1938+
b.Add(r)
1939+
return b
1940+
},
1941+
},
1942+
{
1943+
name: "batch with ReturnRawMVCCValues Get",
1944+
ba: func() *kvpb.BatchRequest {
1945+
b := &kvpb.BatchRequest{Header: kvpb.Header{Txn: &txn}}
1946+
r := &kvpb.GetRequest{
1947+
ReturnRawMVCCValues: true,
1948+
RequestHeader: kvpb.RequestHeader{Key: keyA, Sequence: txn.Sequence},
1949+
}
1950+
b.Add(r)
1951+
return b
1952+
},
1953+
},
1954+
{
1955+
name: "batch with COL_BATCH_RESPONSE Scan",
1956+
ba: func() *kvpb.BatchRequest {
1957+
b := &kvpb.BatchRequest{Header: kvpb.Header{Txn: &txn}}
1958+
r := &kvpb.ScanRequest{
1959+
ScanFormat: kvpb.COL_BATCH_RESPONSE,
1960+
RequestHeader: kvpb.RequestHeader{Key: keyA, EndKey: keyC, Sequence: txn.Sequence},
1961+
}
1962+
b.Add(r)
1963+
return b
1964+
},
1965+
},
1966+
{
1967+
name: "batch with COL_BATCH_RESPONSE ReverseScan",
1968+
ba: func() *kvpb.BatchRequest {
1969+
b := &kvpb.BatchRequest{Header: kvpb.Header{Txn: &txn}}
1970+
r := &kvpb.ReverseScanRequest{
1971+
ScanFormat: kvpb.COL_BATCH_RESPONSE,
1972+
RequestHeader: kvpb.RequestHeader{Key: keyA, EndKey: keyC, Sequence: txn.Sequence},
1973+
}
1974+
b.Add(r)
1975+
return b
1976+
},
1977+
},
1978+
}
1979+
1980+
for _, tc := range tests {
1981+
t.Run(tc.name, func(t *testing.T) {
1982+
numCalledBefore := mockSender.NumCalled()
1983+
_, pErr := twb.SendLocked(ctx, tc.ba())
1984+
require.NotNil(t, pErr)
1985+
require.Equal(t, numCalledBefore, mockSender.NumCalled())
1986+
1987+
})
1988+
}
1989+
}

0 commit comments

Comments
 (0)