Skip to content

Commit b2b1034

Browse files
arulajmaniyuzefovich
authored andcommitted
kvserver: omit AbortSpan checks for buffered writes transactions
Transactions that use buffered writes do not rely on the AbortSpan to correctly uphold read-your-own-writes semantics. As such, we can omit AbortSpan checks for transactions that have buffered all writes, from preceding batches, on the client. Fixes #140593 Release note: None
1 parent c12ca7e commit b2b1034

File tree

4 files changed

+112
-2
lines changed

4 files changed

+112
-2
lines changed

pkg/kv/kvclient/kvcoord/txn_test.go

Lines changed: 89 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,10 +38,20 @@ import (
3838
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
3939
"github.com/cockroachdb/cockroach/pkg/util/log"
4040
"github.com/cockroachdb/cockroach/pkg/util/randutil"
41+
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
42+
"github.com/cockroachdb/cockroach/pkg/util/uuid"
4143
"github.com/cockroachdb/errors"
4244
"github.com/stretchr/testify/require"
4345
)
4446

47+
func checkGetResults(t *testing.T, expected map[string][]byte, results ...kv.Result) {
48+
for _, result := range results {
49+
require.Equal(t, 1, len(result.Rows))
50+
require.Equal(t, expected[string(result.Rows[0].Key)], result.Rows[0].ValueBytes())
51+
}
52+
require.Len(t, expected, len(results))
53+
}
54+
4555
// TestTxnDBBasics verifies that a simple transaction can be run and
4656
// either committed or aborted. On commit, mutations are visible; on
4757
// abort, mutations are never visible. During the txn, verify that
@@ -2371,3 +2381,82 @@ func TestLeafTransactionAdmissionHeader(t *testing.T) {
23712381
}
23722382
require.Equal(t, expectedLeafHeader, leafHeader)
23732383
}
2384+
2385+
// TestTxnBufferedWritesOmitAbortSpanChecks verifies that transactions that use
2386+
// buffered writes do not check the AbortSpan, while still upholding
2387+
// read-your-own-writes semantics.
2388+
func TestTxnBufferedWritesOmitAbortSpanChecks(t *testing.T) {
2389+
defer leaktest.AfterTest(t)()
2390+
defer log.Scope(t).Close(t)
2391+
ctx := context.Background()
2392+
2393+
var mu struct {
2394+
syncutil.Mutex
2395+
txnID uuid.UUID
2396+
}
2397+
s := createTestDBWithKnobs(t, &kvserver.StoreTestingKnobs{
2398+
EvalKnobs: kvserverbase.BatchEvalTestingKnobs{
2399+
BeforeAbortSpanCheck: func(id uuid.UUID) {
2400+
mu.Lock()
2401+
defer mu.Unlock()
2402+
2403+
if mu.txnID == id {
2404+
t.Fatal("transactions using buffered writes should not check the AbortSpan")
2405+
}
2406+
},
2407+
},
2408+
})
2409+
defer s.Stop()
2410+
2411+
value1 := []byte("value1")
2412+
valueConflict := []byte("conflict")
2413+
2414+
keyA := []byte("keyA")
2415+
2416+
txn := kv.NewTxn(ctx, s.DB, 0 /* gatewayNodeID */)
2417+
txn.SetBufferedWritesEnabled(true)
2418+
mu.Lock()
2419+
mu.txnID = txn.ID()
2420+
mu.Unlock()
2421+
2422+
// Fix the transaction's commit timestamp.
2423+
_, err := txn.CommitTimestamp()
2424+
require.NoError(t, err)
2425+
2426+
// Put transactional value at keyA.
2427+
require.NoError(t, txn.Put(ctx, keyA, value1))
2428+
2429+
// Read what we just wrote.
2430+
b := txn.NewBatch()
2431+
b.Get(keyA)
2432+
require.NoError(t, txn.Run(ctx, b))
2433+
expected := map[string][]byte{
2434+
"keyA": value1,
2435+
}
2436+
checkGetResults(t, expected, b.Results...)
2437+
2438+
// Start another transaction that writes to keyA. This prevents us from
2439+
// committing at our original timestamp. Moreover, had we not been buffering
2440+
// our writes, this transaction would have resulted in aborting us and
2441+
// removing our intent.
2442+
err = s.DB.Txn(context.Background(), func(ctx context.Context, txn *kv.Txn) error {
2443+
require.NoError(t, txn.SetUserPriority(roachpb.MaxUserPriority))
2444+
return txn.Put(ctx, keyA, valueConflict)
2445+
})
2446+
require.NoError(t, err)
2447+
2448+
// Perform another read again. We should still see our previous write, not what
2449+
// the conflicting transaction wrote.
2450+
b = txn.NewBatch()
2451+
b.Get(keyA)
2452+
require.NoError(t, txn.Run(ctx, b))
2453+
expected = map[string][]byte{
2454+
"keyA": value1,
2455+
}
2456+
checkGetResults(t, expected, b.Results...)
2457+
2458+
// Try to commit the transaction. We should encounter a WriteTooOldError.
2459+
err = txn.Commit(ctx)
2460+
require.Error(t, err)
2461+
require.Regexp(t, "TransactionRetryWithProtoRefreshError: .*WriteTooOldError", err)
2462+
}

pkg/kv/kvserver/kvserverbase/BUILD.bazel

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ go_library(
2828
"//pkg/util/quotapool",
2929
"//pkg/util/syncutil",
3030
"//pkg/util/timeutil",
31+
"//pkg/util/uuid",
3132
"@com_github_cockroachdb_errors//:errors",
3233
"@com_github_cockroachdb_pebble//vfs",
3334
"@com_github_cockroachdb_redact//:redact",

pkg/kv/kvserver/kvserverbase/knobs.go

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,11 @@
99

1010
package kvserverbase
1111

12-
import "time"
12+
import (
13+
"time"
14+
15+
"github.com/cockroachdb/cockroach/pkg/util/uuid"
16+
)
1317

1418
// BatchEvalTestingKnobs contains testing helpers that are used during batch evaluation.
1519
type BatchEvalTestingKnobs struct {
@@ -60,6 +64,10 @@ type BatchEvalTestingKnobs struct {
6064

6165
// CommitTriggerError is called at commit triggers to simulate errors.
6266
CommitTriggerError func() error
67+
68+
// BeforeAbortSpanCheck is called before a request checks the abort span with
69+
// the request's txn ID.
70+
BeforeAbortSpanCheck func(id uuid.UUID)
6371
}
6472

6573
// IntentResolverTestingKnobs contains testing helpers that are used during

pkg/kv/kvserver/replica_evaluate.go

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -240,16 +240,28 @@ func evaluateBatch(
240240
// transactions on reads). Note that 1PC transactions have had their
241241
// transaction field cleared by this point so we do not execute this
242242
// check in that case.
243+
//
244+
// TODO(arul): this check assumes lock == Intent, which isn't true any
245+
// longer. We could optimize this by making a distinction between locks
246+
// acquired and previous writes performed.
243247
if baHeader.Txn.IsLocking() {
244248
// We don't check the abort span for a couple of special requests:
245249
// - if the request is asking to abort the transaction, then don't check the
246250
// AbortSpan; we don't want the request to be rejected if the transaction
247251
// has already been aborted.
248252
// - heartbeats don't check the abort span. If the txn is aborted, they'll
249253
// return an aborted proto in their otherwise successful response.
254+
// - if the request belongs to a transaction that has buffered all
255+
// preceding writes on the client, we don't rely on the AbortSpan to
256+
// correctly uphold read-your-own-write semantics.
257+
//
250258
// TODO(nvanbenschoten): Let's remove heartbeats from this allowlist when
251259
// we rationalize the TODO in txnHeartbeater.heartbeat.
252-
if !ba.IsSingleAbortTxnRequest() && !ba.IsSingleHeartbeatTxnRequest() {
260+
if !ba.IsSingleAbortTxnRequest() && !ba.IsSingleHeartbeatTxnRequest() &&
261+
!ba.HasBufferedAllPrecedingWrites {
262+
if fn := rec.EvalKnobs().BeforeAbortSpanCheck; fn != nil {
263+
fn(ba.Txn.ID)
264+
}
253265
if pErr := checkIfTxnAborted(ctx, rec, readWriter, *baHeader.Txn); pErr != nil {
254266
return nil, result.Result{}, pErr
255267
}

0 commit comments

Comments
 (0)