Skip to content

Commit da7a4eb

Browse files
committed
kvcoord: fix CPut handling for intra-batch conflicts
Previously, for condition evaluation part of CPuts, we would ignore writes that were performed within the same KV batch. That was the case since we performed the read from the buffer when applying the transformation but would actually buffer writes after the KV server response comes back, so previous writes within the same KV batch would be invisible. This is now fixed by buffering the write when applying the transformation. Also avoid flushing any buffered writes when rolling back the txn. Release note: None
1 parent 2b29b9c commit da7a4eb

File tree

4 files changed

+62
-18
lines changed

4 files changed

+62
-18
lines changed

pkg/kv/kvclient/kvcoord/txn_interceptor_write_buffer.go

Lines changed: 13 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -131,10 +131,12 @@ func (twb *txnWriteBuffer) SendLocked(
131131
return twb.wrapped.SendLocked(ctx, ba)
132132
}
133133

134-
if _, ok := ba.GetArg(kvpb.EndTxn); ok {
135-
// TODO(arul): should we only flush if the transaction is being committed?
136-
// If the transaction is being rolled back, we shouldn't needlessly flush
137-
// writes.
134+
if etArg, ok := ba.GetArg(kvpb.EndTxn); ok {
135+
if !etArg.(*kvpb.EndTxnRequest).Commit {
136+
// We're performing a rollback, so there is no point in flushing
137+
// anything.
138+
return twb.wrapped.SendLocked(ctx, ba)
139+
}
138140
return twb.flushWithEndTxn(ctx, ba)
139141
}
140142

@@ -375,6 +377,9 @@ func (twb *txnWriteBuffer) applyTransformations(
375377
// Send a locking Get request to the KV layer; we'll evaluate the
376378
// condition locally based on the response.
377379
baRemote.Requests = append(baRemote.Requests, getReqU)
380+
// Buffer a Put under the optimistic assumption that the condition
381+
// will be satisfied.
382+
twb.addToBuffer(t.Key, t.Value, t.Sequence)
378383

379384
case *kvpb.PutRequest:
380385
// If the MustAcquireExclusiveLock flag is set on the Put, then we need to
@@ -838,13 +843,14 @@ func (t transformation) toResp(
838843
req.AllowIfDoesNotExist,
839844
)
840845
if condFailedErr != nil {
846+
// TODO(yuzefovich): consider "poisoning" the txnWriteBuffer when we
847+
// hit a condition failed error to avoid mistaken usages (e.g. an
848+
// attempt to flush with the EndTxn request with Commit=true).
841849
pErr := kvpb.NewErrorWithTxn(condFailedErr, txn)
842850
pErr.SetErrorIndex(int32(t.index))
843851
return kvpb.ResponseUnion{}, pErr
844852
}
845-
// The condition was satisfied; buffer a Put, and return a synthesized
846-
// response.
847-
twb.addToBuffer(req.Key, req.Value, req.Sequence)
853+
// The condition was satisfied - return a synthesized response.
848854
ru.MustSetInner(&kvpb.ConditionalPutResponse{})
849855

850856
case *kvpb.PutRequest:
@@ -897,12 +903,6 @@ func (t transformation) toResp(
897903
panic("unimplemented")
898904
}
899905

900-
// TODO(arul): in the future, when we'll evaluate CPuts locally, we'll have
901-
// this function take in the result of the KVGet, save the CPut function
902-
// locally on the transformation, and use these two things to evaluate the
903-
// condition here, on the client. We'll then construct and return the
904-
// appropriate response.
905-
906906
return ru, nil
907907
}
908908

pkg/kv/kvclient/kvcoord/txn_interceptor_write_buffer_test.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1109,11 +1109,11 @@ func TestTxnWriteBufferDecomposesConditionalPuts(t *testing.T) {
11091109
require.IsType(t, &kvpb.ConditionFailedError{}, pErr.GoError())
11101110
}
11111111

1112-
// Lastly, commit the transaction. A put should only be flushed if the condition
1113-
// evaluated successfully.
1112+
// Lastly, commit or rollback the transaction. A put should only be
1113+
// flushed if the condition evaluated successfully.
11141114
ba = &kvpb.BatchRequest{}
11151115
ba.Header = kvpb.Header{Txn: &txn}
1116-
ba.Add(&kvpb.EndTxnRequest{Commit: true})
1116+
ba.Add(&kvpb.EndTxnRequest{Commit: condEvalSuccessful})
11171117

11181118
mockSender.MockSend(func(ba *kvpb.BatchRequest) (*kvpb.BatchResponse, *kvpb.Error) {
11191119
if condEvalSuccessful {

pkg/kv/kvclient/kvcoord/txn_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2011,6 +2011,7 @@ func TestTxnBufferedWritesConditionalPuts(t *testing.T) {
20112011
if expErr {
20122012
require.Error(t, err)
20132013
require.IsType(t, &kvpb.ConditionFailedError{}, err)
2014+
return err
20142015
} else {
20152016
require.NoError(t, err)
20162017
}
@@ -2022,11 +2023,10 @@ func TestTxnBufferedWritesConditionalPuts(t *testing.T) {
20222023
}
20232024
})
20242025

2025-
if commit {
2026+
if commit && !expErr {
20262027
require.NoError(t, err)
20272028
} else {
20282029
require.Error(t, err)
2029-
testutils.IsError(err, "abort")
20302030
}
20312031

20322032
// Verify the values are visible only if the transaction commited

pkg/sql/logictest/testdata/logic_test/buffered_writes

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -82,3 +82,47 @@ query II
8282
SELECT * FROM t1
8383
----
8484
1 1
85+
86+
statement ok
87+
CREATE TABLE t2 (k INT PRIMARY KEY);
88+
89+
statement ok
90+
BEGIN;
91+
92+
statement error pgcode 23505 duplicate key value violates unique constraint "t2_pkey"
93+
INSERT INTO t2 VALUES (1), (1);
94+
95+
statement ok
96+
ROLLBACK;
97+
98+
statement ok
99+
BEGIN;
100+
101+
statement ok
102+
INSERT INTO t2 VALUES (1);
103+
104+
statement error pgcode 23505 duplicate key value violates unique constraint "t2_pkey"
105+
INSERT INTO t2 VALUES (1);
106+
107+
statement ok
108+
ROLLBACK;
109+
110+
statement ok
111+
BEGIN;
112+
113+
statement ok
114+
INSERT INTO t2 VALUES (1);
115+
116+
statement ok
117+
DELETE FROM t2 WHERE k = 1;
118+
119+
statement ok
120+
INSERT INTO t2 VALUES (1);
121+
122+
statement ok
123+
COMMIT;
124+
125+
query I
126+
SELECT * FROM t2
127+
----
128+
1

0 commit comments

Comments
 (0)