@@ -7,11 +7,13 @@ package kvcoord
7
7
8
8
import (
9
9
"context"
10
+ "fmt"
10
11
"testing"
11
12
12
13
"github.com/cockroachdb/cockroach/pkg/base"
13
14
"github.com/cockroachdb/cockroach/pkg/kv"
14
15
"github.com/cockroachdb/cockroach/pkg/kv/kvpb"
16
+ "github.com/cockroachdb/cockroach/pkg/roachpb"
15
17
"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
16
18
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
17
19
"github.com/cockroachdb/cockroach/pkg/util/log"
@@ -91,3 +93,108 @@ func TestTxnCoordSenderWriteBufferingDisablesPipelining(t *testing.T) {
91
93
kvpb .Scan , kvpb .Put , kvpb .EndTxn ,
92
94
}, calls )
93
95
}
96
+
97
+ // TestTxnWriteBufferFlushedWithMaxKeysOnBatch is a regression test for a bug
98
+ // related to flushing the write buffer in response to a batch with a MaxKeys or
99
+ // TargetBytes set.
100
+ //
101
+ // The bug requires that:
102
+ //
103
+ // 0. Buffered writes is enabled, either buffered writes for weak isolation is
104
+ // enabled or durable locks for serializable is enabled.
105
+ //
106
+ // 1. The user makes replicated, locking Get requests. This can occur via SELECT
107
+ // FOR UPDATE statements whose predicate contains all primary key columns and
108
+ // which isn't transformed to a Scan. These Get's will be transformed to unreplicated
109
+ // locking Get's and a replicated locking Get request will be buffered.
110
+ //
111
+ // 2. The user also writes some rows.
112
+ //
113
+ // 3. The Get's and writes are split over more than 1 range.
114
+ //
115
+ // 4. At least some of the buffered Get's are not replaced with later writes.
116
+ //
117
+ // 5. A batch with TargetBytes or MaxSpanRequestKeys set causes the buffer to flush.
118
+ //
119
+ // 6. The number of buffered Get's exceeds the MaxSpanRequestKeys.
120
+ func TestTxnWriteBufferFlushedWithMaxKeysOnBatch (t * testing.T ) {
121
+ defer leaktest .AfterTest (t )()
122
+ defer log .Scope (t ).Close (t )
123
+
124
+ ctx := context .Background ()
125
+ s , _ , db := serverutils .StartServer (t , base.TestServerArgs {})
126
+ defer s .Stopper ().Stop (ctx )
127
+
128
+ scratchStart , err := s .ScratchRange ()
129
+ require .NoError (t , err )
130
+
131
+ scratchKey := func (idx int ) roachpb.Key {
132
+ key := scratchStart .Clone ()
133
+ key = append (key , []byte (fmt .Sprintf ("key-%03d" , idx ))... )
134
+ return key
135
+ }
136
+
137
+ // We split the scratch range at a known place so that we can arrange for bug
138
+ // requirement (3).
139
+ _ , _ , err = s .SplitRange (scratchKey (6 ))
140
+ require .NoError (t , err )
141
+
142
+ st := s .ClusterSettings ()
143
+
144
+ // The bug requires that we transform Gets. Here, we disable it to prove that
145
+ // this tets passes.
146
+ bufferedWritesGetTransformEnabled .Override (ctx , & st .SV , false )
147
+
148
+ // The locks need to actually be taken, so let's write to every key we are
149
+ // going to lock.
150
+ for i := range []int {1 , 2 , 3 , 7 , 8 , 9 } {
151
+ require .NoError (t , db .Put (ctx , scratchKey (i ), "before-txn-value" ))
152
+ }
153
+
154
+ txnCtx := ctx
155
+ // To trace the transaction:
156
+ //
157
+ // tracer := s.TracerI().(*tracing.Tracer)
158
+ // txnCtx, collectAndFinish := tracing.ContextWithRecordingSpan(context.Background(), tracer, "test")
159
+ err = db .Txn (txnCtx , func (ctx context.Context , txn * kv.Txn ) error {
160
+ txn .SetBufferedWritesEnabled (true )
161
+ // 1. Replicated locking Gets. We are putting 3 on both sides of the split
162
+ // to ensure we satisfy (3)
163
+ b := txn .NewBatch ()
164
+ b .GetForUpdate (scratchKey (1 ), kvpb .GuaranteedDurability )
165
+ b .GetForUpdate (scratchKey (2 ), kvpb .GuaranteedDurability )
166
+ b .GetForUpdate (scratchKey (3 ), kvpb .GuaranteedDurability )
167
+ b .GetForUpdate (scratchKey (7 ), kvpb .GuaranteedDurability )
168
+ b .GetForUpdate (scratchKey (8 ), kvpb .GuaranteedDurability )
169
+ b .GetForUpdate (scratchKey (9 ), kvpb .GuaranteedDurability )
170
+ if err := txn .Run (ctx , b ); err != nil {
171
+ return err
172
+ }
173
+
174
+ // 2. Our write that will be lost if we hit the bug.
175
+ if err := txn .Put (ctx , scratchKey (10 ), "from-txn-value" ); err != nil {
176
+ return err
177
+ }
178
+
179
+ // 3. We force the flush of the buffer with a DeleteRange request that
180
+ // has MaxSpanRequestKeys set
181
+ b = txn .NewBatch ()
182
+ b .Header .MaxSpanRequestKeys = 2
183
+ b .DelRange (scratchKey (21 ), scratchKey (24 ), true )
184
+ if err := txn .Run (ctx , b ); err != nil {
185
+ return err
186
+ }
187
+
188
+ return nil
189
+ })
190
+ // To print the trace:
191
+ // recording := collectAndFinish()
192
+ // t.Logf("TRACE: %s", recording)
193
+ require .NoError (t , err )
194
+ actualKV , err := db .Get (ctx , scratchKey (10 ))
195
+ require .NoError (t , err )
196
+ require .NotNil (t , actualKV .Value )
197
+ actualValue , err := actualKV .Value .GetBytes ()
198
+ require .NoError (t , err )
199
+ require .Equal (t , []byte ("from-txn-value" ), actualValue )
200
+ }
0 commit comments