Skip to content

Commit 9985f68

Browse files
craig[bot]stevendannafqazi
committed
144225: kvcoord: add request validation to txnWriteBuffer r=yuzefovich a=stevendanna This adds early detection of unsupported request types and request options. SendLocked now errors on any request that the txnWriteBuffer doesn't explicitly support. We may consider flushing the buffer instead to increase the set of transactions that can use this feature. Fixes #143894 Release note: None 144240: sql/schemachanger: avoid falling back when adding multiple pks r=fqazi a=fqazi Previously, the declarative schema changer would incorrectly fallback if the user was attempting to add multiple primary keys. This was a gap in the logic for automatically cleaning up default pks, which return a not implemented error if the rowid column was not found. To address this, this patch will always generates the correct error. Fixes: #144235 Release note: None Co-authored-by: Steven Danna <[email protected]> Co-authored-by: Faizan Qazi <[email protected]>
3 parents d5b0b3a + 7e041d5 + 735b918 commit 9985f68

File tree

7 files changed

+260
-22
lines changed

7 files changed

+260
-22
lines changed

pkg/kv/kvclient/kvcoord/txn_interceptor_write_buffer.go

Lines changed: 92 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ import (
2020
"github.com/cockroachdb/cockroach/pkg/storage/enginepb"
2121
"github.com/cockroachdb/cockroach/pkg/storage/mvccencoding"
2222
"github.com/cockroachdb/cockroach/pkg/storage/mvcceval"
23+
"github.com/cockroachdb/cockroach/pkg/util/buildutil"
2324
"github.com/cockroachdb/cockroach/pkg/util/humanizeutil"
2425
"github.com/cockroachdb/cockroach/pkg/util/log"
2526
"github.com/cockroachdb/errors"
@@ -46,8 +47,8 @@ var bufferedWritesMaxBufferSize = settings.RegisterIntSetting(
4647

4748
// txnWriteBuffer is a txnInterceptor that buffers transactional writes until
4849
// commit time. Moreover, it also decomposes read-write KV operations (e.g.
49-
// CPuts, InitPuts) into separate (locking) read and write operations, buffering
50-
// the latter until commit time.
50+
// CPuts) into separate (locking) read and write operations, buffering the
51+
// latter until commit time.
5152
//
5253
// Buffering writes until commit time has four main benefits:
5354
//
@@ -253,7 +254,16 @@ func (twb *txnWriteBuffer) SendLocked(
253254
return twb.flushBufferAndSendBatch(ctx, ba)
254255
}
255256

256-
transformedBa, ts := twb.applyTransformations(ctx, ba)
257+
if err := twb.validateBatch(ba); err != nil {
258+
// We could choose to twb.flushBufferAndSendBatch
259+
// here. For now, we return an error.
260+
return nil, kvpb.NewError(err)
261+
}
262+
263+
transformedBa, ts, pErr := twb.applyTransformations(ctx, ba)
264+
if pErr != nil {
265+
return nil, pErr
266+
}
257267

258268
if len(transformedBa.Requests) == 0 {
259269
// Lower layers (the DistSender and the KVServer) do not expect/handle empty
@@ -278,6 +288,80 @@ func (twb *txnWriteBuffer) SendLocked(
278288
return twb.mergeResponseWithTransformations(ctx, ts, br)
279289
}
280290

291+
// validateBatch returns an error if the batch is unsupported
292+
// by the txnWriteBuffer.
293+
func (twb *txnWriteBuffer) validateBatch(ba *kvpb.BatchRequest) error {
294+
if ba.WriteOptions != nil {
295+
// OriginTimestamp and OriginID are currently only used by Logical Data
296+
// Replication (LDR). These options are unsupported at the moment as we
297+
// don't store the inbound batch options in the buffer.
298+
if ba.WriteOptions.OriginTimestamp.IsSet() {
299+
return errors.AssertionFailedf("transaction write buffer does not support batches with OriginTimestamp set")
300+
}
301+
if ba.WriteOptions.OriginID != 0 {
302+
return errors.AssertionFailedf("transaction write buffer does not support batches with OriginID set")
303+
}
304+
}
305+
return twb.validateRequests(ba)
306+
}
307+
308+
// validateRequests returns an error if any of the requests in the batch
309+
// are unsupported by the txnWriteBuffer.
310+
func (twb *txnWriteBuffer) validateRequests(ba *kvpb.BatchRequest) error {
311+
for _, ru := range ba.Requests {
312+
req := ru.GetInner()
313+
switch t := req.(type) {
314+
case *kvpb.ConditionalPutRequest:
315+
// Our client side ConditionalPutRequest evaluation does not know how to
316+
// handle the origin timestamp setting. Doing so would require sending a
317+
// GetRequest with RawMVCCValues set and parsing the MVCCValueHeader.
318+
if t.OriginTimestamp.IsSet() {
319+
return unsupportedOptionError(t.Method(), "OriginTimestamp")
320+
}
321+
case *kvpb.PutRequest:
322+
case *kvpb.DeleteRequest:
323+
case *kvpb.GetRequest:
324+
// ReturnRawMVCCValues is unsupported because we don't know how to serve
325+
// such reads from the write buffer currently.
326+
if t.ReturnRawMVCCValues {
327+
return unsupportedOptionError(t.Method(), "ReturnRawMVCCValue")
328+
}
329+
case *kvpb.ScanRequest:
330+
// ReturnRawMVCCValues is unsupported because we don't know how to serve
331+
// such reads from the write buffer currently.
332+
if t.ReturnRawMVCCValues {
333+
return unsupportedOptionError(t.Method(), "ReturnRawMVCCValue")
334+
}
335+
if t.ScanFormat == kvpb.COL_BATCH_RESPONSE {
336+
return unsupportedOptionError(t.Method(), "COL_BATCH_RESPONSE scan format")
337+
}
338+
case *kvpb.ReverseScanRequest:
339+
// ReturnRawMVCCValues is unsupported because we don't know how to serve
340+
// such reads from the write buffer currently.
341+
if t.ReturnRawMVCCValues {
342+
return unsupportedOptionError(t.Method(), "ReturnRawMVCCValue")
343+
}
344+
if t.ScanFormat == kvpb.COL_BATCH_RESPONSE {
345+
return unsupportedOptionError(t.Method(), "COL_BATCH_RESPONSE scan format")
346+
}
347+
default:
348+
// All other requests are unsupported. Note that we assume EndTxn and
349+
// DeleteRange requests were handled explicitly before this method was
350+
// called.
351+
return unsupportedMethodError(t.Method())
352+
}
353+
}
354+
return nil
355+
}
356+
357+
func unsupportedMethodError(m kvpb.Method) error {
358+
return errors.AssertionFailedf("transaction write buffer does not support %s requests", m)
359+
}
360+
361+
func unsupportedOptionError(m kvpb.Method, option string) error {
362+
return errors.AssertionFailedf("transaction write buffer does not support %s requests with %s", m, option)
363+
}
364+
281365
// estimateSize returns a conservative estimate by which the buffer will grow in
282366
// size if the writes from the supplied batch request are buffered.
283367
func (twb *txnWriteBuffer) estimateSize(ba *kvpb.BatchRequest) int64 {
@@ -539,7 +623,7 @@ func (twb *txnWriteBuffer) closeLocked() {}
539623
// TODO(arul): Augment this comment as these expand.
540624
func (twb *txnWriteBuffer) applyTransformations(
541625
ctx context.Context, ba *kvpb.BatchRequest,
542-
) (*kvpb.BatchRequest, transformations) {
626+
) (*kvpb.BatchRequest, transformations, *kvpb.Error) {
543627
baRemote := ba.ShallowCopy()
544628
// TODO(arul): We could improve performance here by pre-allocating
545629
// baRemote.Requests to the correct size by counting the number of Puts/Dels
@@ -699,9 +783,6 @@ func (twb *txnWriteBuffer) applyTransformations(
699783
// We've constructed a response that we'll stitch together with the
700784
// result on the response path; eschew sending the request to the KV
701785
// layer.
702-
//
703-
// TODO(arul): if the ReturnRawMVCCValues flag is set, we'll need to
704-
// flush the buffer.
705786
continue
706787
}
707788
// Wasn't served locally; send the request to the KV layer.
@@ -736,10 +817,10 @@ func (twb *txnWriteBuffer) applyTransformations(
736817
baRemote.Requests = append(baRemote.Requests, ru)
737818

738819
default:
739-
baRemote.Requests = append(baRemote.Requests, ru)
820+
return nil, nil, kvpb.NewError(unsupportedMethodError(t.Method()))
740821
}
741822
}
742-
return baRemote, ts
823+
return baRemote, ts, nil
743824
}
744825

745826
// seekItemForSpan returns a bufferedWrite appropriate for use with a
@@ -1103,9 +1184,7 @@ func (t transformation) toResp(
11031184
ru.MustSetInner(reverseScanResp)
11041185

11051186
default:
1106-
// This is only possible once we start decomposing read-write requests into
1107-
// separate bits.
1108-
panic("unimplemented")
1187+
return ru, kvpb.NewError(unsupportedMethodError(req.Method()))
11091188
}
11101189

11111190
return ru, nil
@@ -1725,7 +1804,7 @@ func (m *respMerger) toReverseScanResp(
17251804

17261805
// assertTrue panics with a message if the supplied condition isn't true.
17271806
func assertTrue(cond bool, msg string) {
1728-
if !cond {
1807+
if !cond && buildutil.CrdbTestBuild {
17291808
panic(msg)
17301809
}
17311810
}

pkg/kv/kvclient/kvcoord/txn_interceptor_write_buffer_test.go

Lines changed: 136 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1851,3 +1851,139 @@ func TestTxnWriteBufferClearsBufferOnEpochBump(t *testing.T) {
18511851
require.Equal(t, 0, int(twb.bufferSize))
18521852
require.Equal(t, numCalled, mockSender.NumCalled())
18531853
}
1854+
1855+
// TestTxnWriteBufferBatchRequestValidation verifies that the txnWriteBuffer
1856+
// rejects requests that it doesn't know how to support.
1857+
func TestTxnWriteBufferBatchRequestValidation(t *testing.T) {
1858+
defer leaktest.AfterTest(t)()
1859+
defer log.Scope(t).Close(t)
1860+
ctx := context.Background()
1861+
twb, mockSender := makeMockTxnWriteBuffer(cluster.MakeClusterSettings())
1862+
1863+
type testCase struct {
1864+
name string
1865+
ba func() *kvpb.BatchRequest
1866+
}
1867+
1868+
txn := makeTxnProto()
1869+
txn.Sequence = 1
1870+
keyA, keyC := roachpb.Key("a"), roachpb.Key("c")
1871+
1872+
tests := []testCase{
1873+
{
1874+
name: "batch with OriginTimestamp",
1875+
ba: func() *kvpb.BatchRequest {
1876+
header := kvpb.Header{
1877+
Txn: &txn,
1878+
WriteOptions: &kvpb.WriteOptions{
1879+
OriginTimestamp: hlc.Timestamp{WallTime: 1},
1880+
},
1881+
}
1882+
return &kvpb.BatchRequest{Header: header}
1883+
},
1884+
},
1885+
{
1886+
name: "batch with OriginID",
1887+
ba: func() *kvpb.BatchRequest {
1888+
header := kvpb.Header{
1889+
Txn: &txn,
1890+
WriteOptions: &kvpb.WriteOptions{
1891+
OriginID: 1,
1892+
},
1893+
}
1894+
return &kvpb.BatchRequest{Header: header}
1895+
},
1896+
},
1897+
{
1898+
name: "batch with InitPut",
1899+
ba: func() *kvpb.BatchRequest {
1900+
b := &kvpb.BatchRequest{Header: kvpb.Header{Txn: &txn}}
1901+
b.Add(&kvpb.InitPutRequest{
1902+
RequestHeader: kvpb.RequestHeader{Key: keyA, Sequence: txn.Sequence},
1903+
Value: roachpb.Value{},
1904+
})
1905+
return b
1906+
},
1907+
},
1908+
{
1909+
name: "batch with Increment",
1910+
ba: func() *kvpb.BatchRequest {
1911+
b := &kvpb.BatchRequest{Header: kvpb.Header{Txn: &txn}}
1912+
b.Add(&kvpb.IncrementRequest{
1913+
RequestHeader: kvpb.RequestHeader{Key: keyA, Sequence: txn.Sequence},
1914+
})
1915+
return b
1916+
},
1917+
},
1918+
{
1919+
name: "batch with ReturnRawMVCCValues Scan",
1920+
ba: func() *kvpb.BatchRequest {
1921+
b := &kvpb.BatchRequest{Header: kvpb.Header{Txn: &txn}}
1922+
r := &kvpb.ScanRequest{
1923+
ReturnRawMVCCValues: true,
1924+
RequestHeader: kvpb.RequestHeader{Key: keyA, EndKey: keyC, Sequence: txn.Sequence},
1925+
}
1926+
b.Add(r)
1927+
return b
1928+
},
1929+
},
1930+
{
1931+
name: "batch with ReturnRawMVCCValues ReverseScan",
1932+
ba: func() *kvpb.BatchRequest {
1933+
b := &kvpb.BatchRequest{Header: kvpb.Header{Txn: &txn}}
1934+
r := &kvpb.ReverseScanRequest{
1935+
ReturnRawMVCCValues: true,
1936+
RequestHeader: kvpb.RequestHeader{Key: keyA, EndKey: keyC, Sequence: txn.Sequence},
1937+
}
1938+
b.Add(r)
1939+
return b
1940+
},
1941+
},
1942+
{
1943+
name: "batch with ReturnRawMVCCValues Get",
1944+
ba: func() *kvpb.BatchRequest {
1945+
b := &kvpb.BatchRequest{Header: kvpb.Header{Txn: &txn}}
1946+
r := &kvpb.GetRequest{
1947+
ReturnRawMVCCValues: true,
1948+
RequestHeader: kvpb.RequestHeader{Key: keyA, Sequence: txn.Sequence},
1949+
}
1950+
b.Add(r)
1951+
return b
1952+
},
1953+
},
1954+
{
1955+
name: "batch with COL_BATCH_RESPONSE Scan",
1956+
ba: func() *kvpb.BatchRequest {
1957+
b := &kvpb.BatchRequest{Header: kvpb.Header{Txn: &txn}}
1958+
r := &kvpb.ScanRequest{
1959+
ScanFormat: kvpb.COL_BATCH_RESPONSE,
1960+
RequestHeader: kvpb.RequestHeader{Key: keyA, EndKey: keyC, Sequence: txn.Sequence},
1961+
}
1962+
b.Add(r)
1963+
return b
1964+
},
1965+
},
1966+
{
1967+
name: "batch with COL_BATCH_RESPONSE ReverseScan",
1968+
ba: func() *kvpb.BatchRequest {
1969+
b := &kvpb.BatchRequest{Header: kvpb.Header{Txn: &txn}}
1970+
r := &kvpb.ReverseScanRequest{
1971+
ScanFormat: kvpb.COL_BATCH_RESPONSE,
1972+
RequestHeader: kvpb.RequestHeader{Key: keyA, EndKey: keyC, Sequence: txn.Sequence},
1973+
}
1974+
b.Add(r)
1975+
return b
1976+
},
1977+
},
1978+
}
1979+
1980+
for _, tc := range tests {
1981+
t.Run(tc.name, func(t *testing.T) {
1982+
numCalledBefore := mockSender.NumCalled()
1983+
_, pErr := twb.SendLocked(ctx, tc.ba())
1984+
require.NotNil(t, pErr)
1985+
require.Equal(t, numCalledBefore, mockSender.NumCalled())
1986+
1987+
})
1988+
}
1989+
}

pkg/sql/logictest/testdata/logic_test/new_schema_changer

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1540,3 +1540,23 @@ statement ok
15401540
EXPLAIN (DDL) CREATE SCHEMA sc1
15411541

15421542
subtest end
1543+
1544+
1545+
subtest multiple_primary_key
1546+
1547+
statement ok
1548+
CREATE TABLE multiple_pk_attempt(n INT PRIMARY KEY, j INT)
1549+
1550+
let $use_decl_sc
1551+
SHOW use_declarative_schema_changer
1552+
1553+
statement ok
1554+
SET use_declarative_schema_changer = 'unsafe_always'
1555+
1556+
statement error pgcode 42611 multiple primary keys for table \"multiple_pk_attempt\" are not allowed
1557+
ALTER TABLE multiple_pk_attempt ADD PRIMARY KEY (j);
1558+
1559+
statement ok
1560+
SET use_declarative_schema_changer = $use_decl_sc
1561+
1562+
subtest end

pkg/sql/schemachanger/scbuild/internal/scbuildstmt/alter_table_add_constraint.go

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -85,7 +85,12 @@ func alterTableAddPrimaryKey(
8585
if getPrimaryIndexDefaultRowIDColumn(
8686
b, tbl.TableID, oldPrimaryIndex.IndexID,
8787
) == nil {
88-
panic(scerrors.NotImplementedError(t))
88+
// If the constraint already exists then nothing to do here.
89+
if oldPrimaryIndex != nil && d.IfNotExists {
90+
return
91+
}
92+
panic(pgerror.Newf(pgcode.InvalidColumnDefinition,
93+
"multiple primary keys for table %q are not allowed", tn.Object()))
8994
}
9095
alterPrimaryKey(b, tn, tbl, stmt, alterPrimaryKeySpec{
9196
n: t,

pkg/sql/schemachanger/scbuild/internal/scbuildstmt/alter_table_alter_primary_key.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -447,12 +447,18 @@ func fallBackIfRegionalByRowTable(b BuildCtx, t tree.NodeFormatter, tableID cati
447447
}
448448
}
449449

450+
// mustRetrieveCurrentPrimaryIndexElement retrieves the current primary index,
451+
// which must be public.
450452
func mustRetrieveCurrentPrimaryIndexElement(
451453
b BuildCtx, tableID catid.DescID,
452454
) (res *scpb.PrimaryIndex) {
453455
scpb.ForEachPrimaryIndex(b.QueryByID(tableID), func(
454456
current scpb.Status, target scpb.TargetStatus, e *scpb.PrimaryIndex,
455457
) {
458+
// TODO(fqazi): We don't support DROP CONSTRAINT PRIMARY KEY, so there is no
459+
// risk of ever seeing a non-public PrimaryIndex element. In the future when
460+
// we do support DROP CONSTRAINT PRIMARY KEY, we should adapt callers of
461+
// this function to handle the absent primary index case.
456462
if current == scpb.Status_PUBLIC {
457463
res = e
458464
}

pkg/sql/schemachanger/scbuild/testdata/unimplemented_alter_table

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -32,10 +32,6 @@ unimplemented
3232
ALTER TABLE defaultdb.foo EXPERIMENTAL_AUDIT SET READ WRITE
3333
----
3434

35-
unimplemented
36-
ALTER TABLE defaultdb.foo ADD PRIMARY KEY (l);
37-
----
38-
3935
unimplemented
4036
ALTER TABLE defaultdb.foo PARTITION BY NOTHING
4137
----

pkg/sql/schemachanger/scbuild/testdata/zone_cfg

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -46,10 +46,6 @@ ALTER TABLE defaultdb.foo_index_zone_cfg ADD COLUMN j INT
4646
- [[IndexColumn:{DescID: 105, ColumnID: 7, IndexID: 1}, PUBLIC], ABSENT]
4747
{columnId: 7, indexId: 1, kind: STORED, ordinalInKind: 5, tableId: 105}
4848

49-
unimplemented
50-
ALTER TABLE defaultdb.foo_index_zone_cfg ADD PRIMARY KEY (i, n)
51-
----
52-
5349
build
5450
ALTER TABLE defaultdb.foo_index_zone_cfg DROP COLUMN k
5551
----

0 commit comments

Comments
 (0)