Skip to content

Commit eec520b

Browse files
committed
kvclient: de-flake tests due to metamorphic max_buffer_size
This commit adjusts the relevant tests that can produce a different output if the txnWriteBuffer size is set too low (by ensuring a lower bound on the cluster setting). All affected tests need at most 2KiB which is the value I decided to use everywhere for consistency. Additionally, in order to improve the test coverage, it reduces the allowed range of metamorphic values to be in 1B-10KiB range. `buffered_writes_lock_loss` test seemingly deadlocks with 1B buffer size, so a separate issue is filed to investigate that. Release note: None
1 parent 84856d9 commit eec520b

File tree

12 files changed

+59
-12
lines changed

12 files changed

+59
-12
lines changed

pkg/ccl/logictestccl/testdata/logic_test/buffered_writes_lock_loss

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,10 @@
11
# LogicTest: !3node-tenant !local-mixed-25.2
22
# cluster-opt: disable-mvcc-range-tombstones-for-point-deletes
33

4+
# TODO(#151895): investigate why setting it to 1B deadlocks the test.
5+
statement ok
6+
SET CLUSTER SETTING kv.transaction.write_buffering.max_buffer_size = '2KiB';
7+
48
statement ok
59
SET CLUSTER SETTING sql.txn.write_buffering_for_weak_isolation.enabled=true
610

pkg/ccl/logictestccl/testdata/logic_test/cluster_locks_tenant_write_buffering

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,9 @@
33
statement ok
44
SET kv_transaction_buffered_writes_enabled = true;
55

6+
statement ok
7+
SET CLUSTER SETTING kv.transaction.write_buffering.max_buffer_size = '2KiB';
8+
69
# Create a table, write a row, lock it, then switch users.
710
statement ok
811
CREATE TABLE t (k STRING PRIMARY KEY, v STRING, FAMILY (k,v))

pkg/kv/kvclient/kvcoord/txn_interceptor_write_buffer.go

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -59,15 +59,18 @@ var bufferedWritesGetTransformEnabled = settings.RegisterBoolSetting(
5959
)
6060

6161
const defaultBufferSize = 1 << 22 // 4MB
62-
var bufferedWritesMaxBufferSize = settings.RegisterByteSizeSetting(
62+
63+
// BufferedWritesMaxBufferSize controls the per-txn buffer size. It's exported
64+
// only to be accessed from tests.
65+
var BufferedWritesMaxBufferSize = settings.RegisterByteSizeSetting(
6366
settings.ApplicationLevel,
6467
"kv.transaction.write_buffering.max_buffer_size",
6568
"if non-zero, defines that maximum size of the "+
6669
"buffer that will be used to buffer transactional writes per-transaction",
6770
int64(metamorphic.ConstantWithTestRange("kv.transaction.write_buffering.max_buffer_size",
6871
defaultBufferSize, // default
6972
1, // min
70-
defaultBufferSize, // max
73+
10<<10, // max, 10KiB
7174
)),
7275
settings.NonNegativeInt,
7376
settings.WithPublic,
@@ -298,10 +301,10 @@ func (twb *txnWriteBuffer) SendLocked(
298301
// Check if buffering writes from the supplied batch will run us over
299302
// budget. If it will, we shouldn't buffer writes from the current batch,
300303
// and flush the buffer.
301-
maxSize := bufferedWritesMaxBufferSize.Get(&twb.st.SV)
304+
maxSize := BufferedWritesMaxBufferSize.Get(&twb.st.SV)
302305
bufSize := twb.estimateSize(ba, cfg) + twb.bufferSize
303306

304-
// NB: if bufferedWritesMaxBufferSize is set to 0 then we effectively disable
307+
// NB: if BufferedWritesMaxBufferSize is set to 0 then we effectively disable
305308
// any buffer limiting.
306309
if maxSize != 0 && bufSize > maxSize {
307310
twb.txnMetrics.TxnWriteBufferMemoryLimitExceeded.Inc(1)
@@ -1051,9 +1054,9 @@ func (twb *txnWriteBuffer) maybeMutateBatchMaxSpanRequestKeys(
10511054
}
10521055

10531056
// If the user has disabled a maximum buffer size, respect that.
1054-
maxSize := bufferedWritesMaxBufferSize.Get(&twb.st.SV)
1057+
maxSize := BufferedWritesMaxBufferSize.Get(&twb.st.SV)
10551058
if maxSize == 0 {
1056-
log.VEventf(ctx, 2, "allowing unbounded transformed locking scan because %s=0", bufferedWritesMaxBufferSize.Name())
1059+
log.VEventf(ctx, 2, "allowing unbounded transformed locking scan because %s=0", BufferedWritesMaxBufferSize.Name())
10571060
return
10581061
}
10591062

pkg/kv/kvclient/kvcoord/txn_interceptor_write_buffer_client_test.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,7 @@ func TestTxnCoordSenderWriteBufferingDisablesPipelining(t *testing.T) {
6060
require.NoError(t, db.Put(ctx, "test-key-a", "hello"))
6161

6262
bufferedWritesScanTransformEnabled.Override(ctx, &st.SV, false)
63+
BufferedWritesMaxBufferSize.Override(ctx, &st.SV, defaultBufferSize)
6364

6465
// Without write buffering
6566
require.NoError(t, db.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error {

pkg/kv/kvclient/kvcoord/txn_interceptor_write_buffer_test.go

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ func makeMockTxnWriteBuffer(
3232
st := cluster.MakeClusterSettings()
3333
bufferedWritesScanTransformEnabled.Override(ctx, &st.SV, true)
3434
bufferedWritesGetTransformEnabled.Override(ctx, &st.SV, true)
35-
bufferedWritesMaxBufferSize.Override(ctx, &st.SV, defaultBufferSize)
35+
BufferedWritesMaxBufferSize.Override(ctx, &st.SV, defaultBufferSize)
3636

3737
var metrics TxnMetrics
3838
if len(optionalMetrics) > 0 {
@@ -1586,7 +1586,7 @@ func TestTxnWriteBufferFlushesWhenOverBudget(t *testing.T) {
15861586

15871587
putAEstimate := int64(len(keyA)+valA.Size()) + bufferedWriteStructOverhead + bufferedValueStructOverhead
15881588

1589-
bufferedWritesMaxBufferSize.Override(ctx, &st.SV, putAEstimate)
1589+
BufferedWritesMaxBufferSize.Override(ctx, &st.SV, putAEstimate)
15901590

15911591
ba := &kvpb.BatchRequest{}
15921592
ba.Header = kvpb.Header{Txn: &txn}
@@ -1739,7 +1739,7 @@ func TestTxnWriteBufferLimitsSizeOfScans(t *testing.T) {
17391739
txn.Sequence = 10
17401740

17411741
bufferedWritesScanTransformEnabled.Override(ctx, &st.SV, true)
1742-
bufferedWritesMaxBufferSize.Override(ctx, &st.SV, tc.bufferSize)
1742+
BufferedWritesMaxBufferSize.Override(ctx, &st.SV, tc.bufferSize)
17431743

17441744
ba := &kvpb.BatchRequest{Header: kvpb.Header{Txn: &txn}}
17451745
if isReverse {
@@ -1777,7 +1777,7 @@ func TestTxnWriteBufferLimitsSizeOfScans(t *testing.T) {
17771777
txn := makeTxnProto()
17781778
txn.Sequence = 10
17791779

1780-
bufferedWritesMaxBufferSize.Override(ctx, &st.SV, 10*(lockKeyInfoSize+int64(len(keyA))))
1780+
BufferedWritesMaxBufferSize.Override(ctx, &st.SV, 10*(lockKeyInfoSize+int64(len(keyA))))
17811781

17821782
ba := &kvpb.BatchRequest{Header: kvpb.Header{Txn: &txn}}
17831783
ba.Add(&kvpb.ScanRequest{
@@ -2132,7 +2132,7 @@ func TestTxnWriteBufferSplitsBatchesWithSkipLocked(t *testing.T) {
21322132
br.Txn = ba.Txn
21332133
return br, nil
21342134
})
2135-
bufferedWritesMaxBufferSize.Override(ctx, &st.SV, 1)
2135+
BufferedWritesMaxBufferSize.Override(ctx, &st.SV, 1)
21362136
ba = &kvpb.BatchRequest{
21372137
Header: kvpb.Header{
21382138
Txn: &txn,
@@ -2533,7 +2533,7 @@ func TestTxnWriteBufferHasBufferedAllPrecedingWrites(t *testing.T) {
25332533
{
25342534
name: "flushed due to size limit",
25352535
setup: func(twb *txnWriteBuffer) {
2536-
bufferedWritesMaxBufferSize.Override(context.Background(), &twb.st.SV, 1)
2536+
BufferedWritesMaxBufferSize.Override(context.Background(), &twb.st.SV, 1)
25372537
},
25382538
ba: func(ba *kvpb.BatchRequest) {
25392539
putA := putArgs(keyA, "valA", txn.Sequence)

pkg/kv/kvclient/kvcoord/txn_test.go

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1057,6 +1057,7 @@ func TestTxnContinueAfterCputError(t *testing.T) {
10571057
ctx := context.Background()
10581058
s := createTestDB(t)
10591059
defer s.Stop()
1060+
kvcoord.BufferedWritesMaxBufferSize.Override(context.Background(), s.DB.SettingsValues(), 2<<10 /* 2KiB */)
10601061

10611062
txn := s.DB.NewTxn(ctx, "test txn")
10621063

@@ -1097,6 +1098,7 @@ func TestTxnDeleteResponse(t *testing.T) {
10971098
ctx := context.Background()
10981099
s := createTestDB(t)
10991100
defer s.Stop()
1101+
kvcoord.BufferedWritesMaxBufferSize.Override(context.Background(), s.DB.SettingsValues(), 2<<10 /* 2KiB */)
11001102

11011103
txn := s.DB.NewTxn(ctx, "test txn")
11021104

@@ -1610,6 +1612,7 @@ func TestTxnBasicBufferedWrites(t *testing.T) {
16101612
defer log.Scope(t).Close(t)
16111613
s := createTestDB(t)
16121614
defer s.Stop()
1615+
kvcoord.BufferedWritesMaxBufferSize.Override(context.Background(), s.DB.SettingsValues(), 2<<10 /* 2KiB */)
16131616

16141617
testutils.RunTrueAndFalse(t, "commit", func(t *testing.T, commit bool) {
16151618
ctx := context.Background()
@@ -1791,6 +1794,7 @@ func TestTxnBufferedWritesOverlappingScan(t *testing.T) {
17911794
defer log.Scope(t).Close(t)
17921795
s := createTestDB(t)
17931796
defer s.Stop()
1797+
kvcoord.BufferedWritesMaxBufferSize.Override(context.Background(), s.DB.SettingsValues(), 2<<10 /* 2KiB */)
17941798

17951799
extractKVs := func(rows []roachpb.KeyValue, batchResponses [][]byte) []roachpb.KeyValue {
17961800
if rows != nil {
@@ -2031,6 +2035,7 @@ func TestTxnBufferedWritesConditionalPuts(t *testing.T) {
20312035
defer log.Scope(t).Close(t)
20322036
s := createTestDB(t)
20332037
defer s.Stop()
2038+
kvcoord.BufferedWritesMaxBufferSize.Override(context.Background(), s.DB.SettingsValues(), 2<<10 /* 2KiB */)
20342039

20352040
testutils.RunTrueAndFalse(t, "commit", func(t *testing.T, commit bool) {
20362041
ctx := context.Background()
@@ -2137,6 +2142,7 @@ func TestTxnBufferedWritesRollbackToSavepointAllBuffered(t *testing.T) {
21372142
defer log.Scope(t).Close(t)
21382143
s := createTestDB(t)
21392144
defer s.Stop()
2145+
kvcoord.BufferedWritesMaxBufferSize.Override(context.Background(), s.DB.SettingsValues(), 2<<10 /* 2KiB */)
21402146

21412147
ctx := context.Background()
21422148
value1 := []byte("value1")
@@ -2196,6 +2202,7 @@ func TestTxnBufferedWriteRetriesCorrectly(t *testing.T) {
21962202
TestingRequestFilter: reqFilter,
21972203
})
21982204
defer s.Stop()
2205+
kvcoord.BufferedWritesMaxBufferSize.Override(context.Background(), s.DB.SettingsValues(), 2<<10 /* 2KiB */)
21992206

22002207
rng, _ := randutil.NewTestRand()
22012208

@@ -2233,6 +2240,7 @@ func TestTxnBufferedWriteReadYourOwnWrites(t *testing.T) {
22332240

22342241
s := createTestDB(t)
22352242
defer s.Stop()
2243+
kvcoord.BufferedWritesMaxBufferSize.Override(context.Background(), s.DB.SettingsValues(), 2<<10 /* 2KiB */)
22362244

22372245
value1 := []byte("value1")
22382246
value21 := []byte("value21")
@@ -2412,6 +2420,7 @@ func TestTxnBufferedWritesOmitAbortSpanChecks(t *testing.T) {
24122420
},
24132421
})
24142422
defer s.Stop()
2423+
kvcoord.BufferedWritesMaxBufferSize.Override(context.Background(), s.DB.SettingsValues(), 2<<10 /* 2KiB */)
24152424

24162425
value1 := []byte("value1")
24172426
valueConflict := []byte("conflict")

pkg/sql/logictest/testdata/logic_test/as_of

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -154,6 +154,10 @@ ROLLBACK
154154
statement ok
155155
SET kv_transaction_buffered_writes_enabled = true;
156156

157+
# Ensure that the write below is buffered and not flushed.
158+
statement ok
159+
SET CLUSTER SETTING kv.transaction.write_buffering.max_buffer_size = '2KiB';
160+
157161
statement ok
158162
BEGIN TRANSACTION ISOLATION LEVEL SERIALIZABLE
159163

pkg/sql/logictest/testdata/logic_test/buffered_writes

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,13 @@
33
statement ok
44
SET kv_transaction_buffered_writes_enabled=true
55

6+
# The buffer size can be changed metamoprhically, and if it happens to be too
7+
# small (around 500B), then some of the txns below might flush the buffer,
8+
# resulting in different KV requests. To ensure deterministic traces we always
9+
# update the buffer size to be large enough for txns in this file.
10+
statement ok
11+
SET CLUSTER SETTING kv.transaction.write_buffering.max_buffer_size = '2KiB';
12+
613
statement ok
714
CREATE TABLE t1 (pk int primary key, v int, FAMILY (pk, v))
815

pkg/sql/logictest/testdata/logic_test/cluster_locks_write_buffering

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,9 @@
33
statement ok
44
SET kv_transaction_buffered_writes_enabled = true;
55

6+
statement ok
7+
SET CLUSTER SETTING kv.transaction.write_buffering.max_buffer_size = '2KiB';
8+
69
# This test uses local-read-committed so that it can also test locking behavior
710
# with READ COMMITTED transactions. However, we'll use a default of SERIALIZABLE
811
# for all transactions.

pkg/sql/opt/exec/execbuilder/testdata/autocommit

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,13 @@
1010
statement ok
1111
SET kv_transaction_buffered_writes_enabled = true
1212

13+
# The buffer size can be changed metamoprhically, and if it happens to be too
14+
# small (around 500B), then some of the txns below might flush the buffer,
15+
# resulting in different KV requests. To ensure deterministic traces we always
16+
# update the buffer size to be large enough for txns in this file.
17+
statement ok
18+
SET CLUSTER SETTING kv.transaction.write_buffering.max_buffer_size = '2KiB';
19+
1320
statement ok
1421
CREATE TABLE ab (a INT PRIMARY KEY, b INT, FAMILY f1 (a, b))
1522

0 commit comments

Comments
 (0)