
Commit faef9d7

kvcoord: unset AsyncConsensus on split batches
When batches are split downstream of the transaction pipeline interceptor, they must unset AsyncConsensus to avoid cases where a read misses an un-validated, pipelined write that failed. Note that we don't do this when splitting requests _by range_, because those splits won't separate a scan from an overlapping write. This should be very rare since it requires a read/write batch that SQL doesn't typically produce, and both reads must be locking requests so that CanPipeline returns true for all requests in the batch. It seems unlikely that such a batch is ever produced in the standard configuration outside of KVNemesis.

Fixes #150304

Release note: None
1 parent 8765705 commit faef9d7
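
For illustration only (not part of the commit): a minimal sketch of the rare batch shape described above, assuming the kv client batch API that the regression test below also uses and the current repo's import paths; the helper name is hypothetical.

package example

import (
	"context"

	"github.com/cockroachdb/cockroach/pkg/kv"
	"github.com/cockroachdb/cockroach/pkg/kv/kvpb"
	"github.com/cockroachdb/cockroach/pkg/roachpb"
)

// runPutWithBidirectionalLockingScans is a hypothetical helper showing the
// kind of read/write batch the commit message refers to: a pipelinable write
// overlapped by locking scans in both directions.
func runPutWithBidirectionalLockingScans(
	ctx context.Context, txn *kv.Txn, keyA, keyB, keyC roachpb.Key,
) error {
	b := txn.NewBatch()
	// A pipelinable write that both scans overlap.
	b.Put(keyB, "value_b")
	// Locking reads in opposite directions; the opposing scan directions force
	// the DistSender to split the batch downstream of the txnPipeliner.
	b.ScanForUpdate(keyA, keyC, kvpb.GuaranteedDurability)
	b.ReverseScanForShare(keyA, keyC, kvpb.GuaranteedDurability)
	// Before this fix, the split sub-batches could retain AsyncConsensus, so a
	// scan could miss the pipelined Put if its replication ultimately failed.
	return txn.Run(ctx, b)
}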

2 files changed (+126, −13 lines)

pkg/kv/kvclient/kvcoord/dist_sender.go

Lines changed: 32 additions & 13 deletions
@@ -1147,23 +1147,31 @@ func (ds *DistSender) initAndVerifyBatch(ctx context.Context, ba *kvpb.BatchRequ
 // request.
 var errNo1PCTxn = kvpb.NewErrorf("cannot send 1PC txn to multiple ranges")
 
-// splitBatchAndCheckForRefreshSpans splits the batch according to the
-// canSplitET parameter and checks whether the batch can forward its
-// read timestamp. If the batch has its CanForwardReadTimestamp flag
-// set but is being split across multiple sub-batches then the flag in
-// the batch header is unset.
-func splitBatchAndCheckForRefreshSpans(
+// splitBatchAndCheckForIncompatibilities splits the batch according to the
+// canSplitET parameter and checks whether the batch has settings incompatible
+// with being split.
+//
+// - If the batch has its CanForwardReadTimestamp flag set but is being split
+// across multiple sub-batches then the flag in the batch header is unset.
+//
+// - If the batch has AsyncConsensus set to true but is being split, then the
+// flag in the batch header is unset.
+func splitBatchAndCheckForIncompatibilities(
 	ba *kvpb.BatchRequest, canSplitET bool,
 ) [][]kvpb.RequestUnion {
 	parts := ba.Split(canSplitET)
 
-	// If the batch is split and the header has its CanForwardReadTimestamp flag
-	// set then we much check whether any request would need to be refreshed in
-	// the event that the one of the partial batches was to forward its read
-	// timestamp during a server-side refresh. If any such request exists then
-	// we unset the CanForwardReadTimestamp flag.
 	if len(parts) > 1 {
+		// If the batch is split and the header has its CanForwardReadTimestamp flag
+		// set then we much check whether any request would need to be refreshed in
+		// the event that the one of the partial batches was to forward its read
+		// timestamp during a server-side refresh. If any such request exists then
+		// we unset the CanForwardReadTimestamp flag.
 		unsetCanForwardReadTimestampFlag(ba)
+		// If the batch is split and the header has AsyncConsensus, we
+		// unconditionally unset it because it may be unsafe if the write needs to
+		// be observed by different reads that have been split.
+		unsetAsyncConsensus(ba)
 	}
 
 	return parts
@@ -1191,6 +1199,17 @@ func unsetCanForwardReadTimestampFlag(ba *kvpb.BatchRequest) {
 	}
 }
 
+// unsetAsyncConsensus ensures that if a batch that has been split because of
+// incompatible request flags, AsyncConsensus is not set.
+//
+// NOTE(ssd): We could be "smarter" here and only unset this flag if we have a
+// pipeline-able write that intersects requests in two different splits. But,
+// batches that require splitting are uncommon and thus aren't worth unnecessary
+// complexity.
+func unsetAsyncConsensus(ba *kvpb.BatchRequest) {
+	ba.AsyncConsensus = false
+}
+
 // Send implements the batch.Sender interface. It subdivides the Batch
 // into batches admissible for sending (preventing certain illegal
 // mixtures of requests), executes each individual part (which may
@@ -1231,7 +1250,7 @@ func (ds *DistSender) Send(
 	if ba.Txn != nil && ba.Txn.Epoch > 0 && !require1PC {
 		splitET = true
 	}
-	parts := splitBatchAndCheckForRefreshSpans(ba, splitET)
+	parts := splitBatchAndCheckForIncompatibilities(ba, splitET)
 	var singleRplChunk [1]*kvpb.BatchResponse
 	rplChunks := singleRplChunk[:0:1]
 
@@ -1277,7 +1296,7 @@ func (ds *DistSender) Send(
 	} else if require1PC {
 		log.Fatalf(ctx, "required 1PC transaction cannot be split: %s", ba)
 	}
-	parts = splitBatchAndCheckForRefreshSpans(ba, true /* split ET */)
+	parts = splitBatchAndCheckForIncompatibilities(ba, true /* split ET */)
 	onePart = false
 	// Restart transaction of the last chunk as multiple parts with
 	// EndTxn in the last part.
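
To make the new behavior easier to follow: a minimal, self-contained sketch of the flag-clearing rule added above. The types and function below are simplified stand-ins for illustration, not the real kvpb.BatchRequest or splitBatchAndCheckForIncompatibilities.

package main

import "fmt"

// batchHeader is a hypothetical stand-in for the two kvpb.BatchRequest header
// flags that must be dropped once a batch is split into multiple sub-batches.
type batchHeader struct {
	CanForwardReadTimestamp bool
	AsyncConsensus          bool
}

// clearIncompatibleFlagsOnSplit mirrors the shape of the change above: when a
// batch splits into more than one part, both flags are unconditionally unset.
func clearIncompatibleFlagsOnSplit(h *batchHeader, numParts int) {
	if numParts > 1 {
		// A server-side refresh of one sub-batch could require refreshing reads
		// that now live in another sub-batch.
		h.CanForwardReadTimestamp = false
		// A pipelined (async-consensus) write in one sub-batch is not validated
		// before a read in another sub-batch runs, so the read could miss a
		// write whose replication ultimately fails.
		h.AsyncConsensus = false
	}
}

func main() {
	h := batchHeader{CanForwardReadTimestamp: true, AsyncConsensus: true}
	clearIncompatibleFlagsOnSplit(&h, 2) // the batch was split into two parts
	fmt.Printf("%+v\n", h)               // both flags are now false
}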

pkg/kv/kvclient/kvcoord/dist_sender_server_test.go

Lines changed: 94 additions & 0 deletions
@@ -4816,3 +4816,97 @@ func TestUnexpectedCommitOnTxnRecovery(t *testing.T) {
 	close(blockCh)
 	wg.Wait()
 }
+
+// TestBatchPutScanReverseScanWithFailedPutReplication is a regression test for
+// #150304 in which a batch containing a Put, Scan, and ReverseScan, failed to
+// read its own write during one of the Scans. This was the result of the batch
+// being split downstream of the txnPipeliner and thus the Put not being
+// verified before the overlapping scan was issued.
+func TestBatchPutScanReverseScanWithFailedPutReplication(t *testing.T) {
+	defer leaktest.AfterTest(t)()
+	defer log.Scope(t).Close(t)
+
+	keyA := roachpb.Key("a")
+	keyB := roachpb.Key("b")
+	keyC := roachpb.Key("c")
+
+	var (
+		targetTxnIDString atomic.Value
+		cmdID             atomic.Value
+	)
+	cmdID.Store(kvserverbase.CmdIDKey(""))
+	targetTxnIDString.Store("")
+	ctx := context.Background()
+	st := cluster.MakeTestingClusterSettings()
+
+	tc := testcluster.StartTestCluster(t, 3, base.TestClusterArgs{
+		ServerArgs: base.TestServerArgs{
+			Settings: st,
+			Knobs: base.TestingKnobs{
+				Store: &kvserver.StoreTestingKnobs{
+					TestingProposalFilter: func(fArgs kvserverbase.ProposalFilterArgs) *kvpb.Error {
+						if fArgs.Req.Header.Txn == nil ||
+							fArgs.Req.Header.Txn.ID.String() != targetTxnIDString.Load().(string) {
+							return nil // not our txn
+						}
+						putReq, ok := fArgs.Req.Requests[0].GetInner().(*kvpb.PutRequest)
+						// Only fail replication on the Put to keyB.
+						if ok && putReq.Key.Equal(keyB) {
+							t.Logf("will fail application for txn %s; req: %+v; raft cmdID: %x",
+								fArgs.Req.Header.Txn.ID.String(), putReq, fArgs.CmdID)
+							cmdID.Store(fArgs.CmdID)
+						}
+						return nil
+					},
+					TestingApplyCalledTwiceFilter: func(fArgs kvserverbase.ApplyFilterArgs) (int, *kvpb.Error) {
+						if fArgs.CmdID == cmdID.Load().(kvserverbase.CmdIDKey) {
+							t.Logf("failing application for raft cmdID: %x", cmdID)
+							return 0, kvpb.NewErrorf("test injected error")
+						}
+						return 0, nil
+					},
+				},
+			},
+		},
+	})
+	defer tc.Stopper().Stop(ctx)
+
+	db := tc.Server(0).DB()
+
+	err := db.Put(ctx, keyA, "initial_a")
+	require.NoError(t, err)
+
+	err = db.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error {
+		if txnID := targetTxnIDString.Load(); txnID == "" {
+			// Store the txnID for the testing knobs.
+			targetTxnIDString.Store(txn.ID().String())
+			t.Logf("txn ID is: %s", txn.ID())
+		}
+
+		// Create a batch that will be split into two batches because of scan
+		// directions. We've arranged for the put to fail application.
+		b := txn.NewBatch()
+		b.Put(keyB, "value_b")
+		b.ScanForUpdate(keyA, keyC, kvpb.GuaranteedDurability)
+		b.ReverseScanForShare(keyA, keyC, kvpb.GuaranteedDurability)
+
+		// If the batch isn't marked for async consensus, the batch will fail.
+		if err := txn.Run(ctx, b); err != nil {
+			return err
+		}
+
+		// If the batch doesn't fail, then it _must_ have both rows in the second
+		// scan.
+		require.Equal(t, 2, len(b.Results[2].Rows))
+		return nil
+	})
+
+	// The transaction should fail due to the replication failure.
+	require.Error(t, err)
+	require.ErrorContains(t, err, "test injected error")
+
+	// Verify that the Put didn't succeed by checking that keyB doesn't exist.
+	res, err := db.Get(ctx, keyB)
+	require.NoError(t, err)
+	require.False(t, res.Exists(), "keyB should not exist due to failed replication")
+}
