
Commit 7c744ff

kvcoord: Revert "kvcoord: unset AsyncConsensus on split batches"
This reverts commit faef9d7. We just merged a change that prohibits including Scans and ReverseScans in a single batch request, which makes the commit that we're reverting obsolete.

Release note: None
1 parent 2d73bfc commit 7c744ff
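For context, the regression test deleted below (see the dist_sender_server_test.go diff) exercised exactly the batch shape this code path guarded: a Put combined with a forward Scan and a ReverseScan over the same span in one client batch. The sketch below, assembled from that deleted test, illustrates the shape; the package and helper name are hypothetical, and it assumes a live *kv.Txn. With the newly merged restriction mentioned in the commit message, a Scan and a ReverseScan can no longer be combined in a single batch request, so this batch never reaches the DistSender splitting logic reverted here.

package kvcoordexample // hypothetical package, for illustration only

import (
	"context"

	"github.com/cockroachdb/cockroach/pkg/kv"
	"github.com/cockroachdb/cockroach/pkg/kv/kvpb"
	"github.com/cockroachdb/cockroach/pkg/roachpb"
)

// runMixedDirectionBatch (hypothetical helper) issues the batch shape from the
// deleted regression test: one Put plus a forward Scan and a ReverseScan over
// the same span, all in a single client batch. Per the commit message, such
// Scan+ReverseScan combinations are now rejected before they reach the
// DistSender splitting logic that this commit reverts.
func runMixedDirectionBatch(ctx context.Context, txn *kv.Txn) error {
	keyA, keyB, keyC := roachpb.Key("a"), roachpb.Key("b"), roachpb.Key("c")

	b := txn.NewBatch()
	b.Put(keyB, "value_b")
	b.ScanForUpdate(keyA, keyC, kvpb.GuaranteedDurability)
	b.ReverseScanForShare(keyA, keyC, kvpb.GuaranteedDurability)
	return txn.Run(ctx, b)
}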

File tree

2 files changed: +13 -129 lines changed


pkg/kv/kvclient/kvcoord/dist_sender.go

Lines changed: 13 additions & 32 deletions
@@ -1157,31 +1157,23 @@ func (ds *DistSender) initAndVerifyBatch(ctx context.Context, ba *kvpb.BatchRequest
 // request.
 var errNo1PCTxn = kvpb.NewErrorf("cannot send 1PC txn to multiple ranges")
 
-// splitBatchAndCheckForIncompatibilities splits the batch according to the
-// canSplitET parameter and checks whether the batch has settings incompatible
-// with being split.
-//
-//   - If the batch has its CanForwardReadTimestamp flag set but is being split
-//     across multiple sub-batches then the flag in the batch header is unset.
-//
-//   - If the batch has AsyncConsensus set to true but is being split, then the
-//     flag in the batch header is unset.
-func splitBatchAndCheckForIncompatibilities(
+// splitBatchAndCheckForRefreshSpans splits the batch according to the
+// canSplitET parameter and checks whether the batch can forward its
+// read timestamp. If the batch has its CanForwardReadTimestamp flag
+// set but is being split across multiple sub-batches then the flag in
+// the batch header is unset.
+func splitBatchAndCheckForRefreshSpans(
 	ba *kvpb.BatchRequest, canSplitET bool,
 ) [][]kvpb.RequestUnion {
 	parts := ba.Split(canSplitET)
 
+	// If the batch is split and the header has its CanForwardReadTimestamp flag
+	// set then we much check whether any request would need to be refreshed in
+	// the event that the one of the partial batches was to forward its read
+	// timestamp during a server-side refresh. If any such request exists then
+	// we unset the CanForwardReadTimestamp flag.
 	if len(parts) > 1 {
-		// If the batch is split and the header has its CanForwardReadTimestamp flag
-		// set then we much check whether any request would need to be refreshed in
-		// the event that the one of the partial batches was to forward its read
-		// timestamp during a server-side refresh. If any such request exists then
-		// we unset the CanForwardReadTimestamp flag.
 		unsetCanForwardReadTimestampFlag(ba)
-		// If the batch is split and the header has AsyncConsensus, we
-		// unconditionally unset it because it may be unsafe if the write needs to
-		// be observed by different reads that have been split.
-		unsetAsyncConsensus(ba)
 	}
 
 	return parts
@@ -1209,17 +1201,6 @@ func unsetCanForwardReadTimestampFlag(ba *kvpb.BatchRequest) {
 	}
 }
 
-// unsetAsyncConsensus ensures that if a batch that has been split because of
-// incompatible request flags, AsyncConsensus is not set.
-//
-// NOTE(ssd): We could be "smarter" here and only unset this flag if we have a
-// pipeline-able write that intersects requests in two different splits. But,
-// batches that require splitting are uncommon and thus aren't worth unnecessary
-// complexity.
-func unsetAsyncConsensus(ba *kvpb.BatchRequest) {
-	ba.AsyncConsensus = false
-}
-
 // Send implements the batch.Sender interface. It subdivides the Batch
 // into batches admissible for sending (preventing certain illegal
 // mixtures of requests), executes each individual part (which may
@@ -1260,7 +1241,7 @@ func (ds *DistSender) Send(
 	if ba.Txn != nil && ba.Txn.Epoch > 0 && !require1PC {
 		splitET = true
 	}
-	parts := splitBatchAndCheckForIncompatibilities(ba, splitET)
+	parts := splitBatchAndCheckForRefreshSpans(ba, splitET)
 	var singleRplChunk [1]*kvpb.BatchResponse
 	rplChunks := singleRplChunk[:0:1]
 
@@ -1305,7 +1286,7 @@ func (ds *DistSender) Send(
 	} else if require1PC {
 		log.Fatalf(ctx, "required 1PC transaction cannot be split: %s", ba)
 	}
-	parts = splitBatchAndCheckForIncompatibilities(ba, true /* split ET */)
+	parts = splitBatchAndCheckForRefreshSpans(ba, true /* split ET */)
 	onePart = false
 	// Restart transaction of the last chunk as multiple parts with
 	// EndTxn in the last part.
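Net of this revert, the split helper reduces to the shape below. This is a sketch reassembled from the added and context lines of the first hunk above (not copied from the repository), showing that only the CanForwardReadTimestamp flag is adjusted when a batch is split.

// splitBatchAndCheckForRefreshSpans splits the batch according to the
// canSplitET parameter and checks whether the batch can forward its read
// timestamp. If the batch has its CanForwardReadTimestamp flag set but is
// being split across multiple sub-batches then the flag in the batch header
// is unset.
func splitBatchAndCheckForRefreshSpans(
	ba *kvpb.BatchRequest, canSplitET bool,
) [][]kvpb.RequestUnion {
	parts := ba.Split(canSplitET)

	// If the batch is split and CanForwardReadTimestamp is set, check whether
	// any request would need to be refreshed if one of the partial batches
	// forwarded its read timestamp during a server-side refresh; if so, unset
	// the flag.
	if len(parts) > 1 {
		unsetCanForwardReadTimestampFlag(ba)
	}

	return parts
}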

pkg/kv/kvclient/kvcoord/dist_sender_server_test.go

Lines changed: 0 additions & 97 deletions
@@ -4535,100 +4535,3 @@ func TestUnexpectedCommitOnTxnRecovery(t *testing.T) {
 	close(blockCh)
 	wg.Wait()
 }
-
-// TestBatchPutScanReverseScanWithFailedPutReplication is a regression test for
-// #150304 in which a batch containing a Put, Scan, and ReverseScan, failed to
-// read its own write during one of the Scans. This was the result of the batch
-// being split downstream of the txnPipeliner and thus the Put not being
-// verified before the overlapping scan was issued.
-func TestBatchPutScanReverseScanWithFailedPutReplication(t *testing.T) {
-	defer leaktest.AfterTest(t)()
-	defer log.Scope(t).Close(t)
-
-	// TODO(yuzefovich): figure out whether we should just delete this test.
-	skip.IgnoreLint(t, "this test constructs no longer valid scenario")
-
-	keyA := roachpb.Key("a")
-	keyB := roachpb.Key("b")
-	keyC := roachpb.Key("c")
-
-	var (
-		targetTxnIDString atomic.Value
-		cmdID             atomic.Value
-	)
-	cmdID.Store(kvserverbase.CmdIDKey(""))
-	targetTxnIDString.Store("")
-	ctx := context.Background()
-	st := cluster.MakeTestingClusterSettings()
-
-	tc := testcluster.StartTestCluster(t, 3, base.TestClusterArgs{
-		ServerArgs: base.TestServerArgs{
-			Settings: st,
-			Knobs: base.TestingKnobs{
-				Store: &kvserver.StoreTestingKnobs{
-					TestingProposalFilter: func(fArgs kvserverbase.ProposalFilterArgs) *kvpb.Error {
-						if fArgs.Req.Header.Txn == nil ||
-							fArgs.Req.Header.Txn.ID.String() != targetTxnIDString.Load().(string) {
-							return nil // not our txn
-						}
-						putReq, ok := fArgs.Req.Requests[0].GetInner().(*kvpb.PutRequest)
-						// Only fail replication on the Put to keyB.
-						if ok && putReq.Key.Equal(keyB) {
-							t.Logf("will fail application for txn %s; req: %+v; raft cmdID: %x",
-								fArgs.Req.Header.Txn.ID.String(), putReq, fArgs.CmdID)
-							cmdID.Store(fArgs.CmdID)
-						}
-						return nil
-					},
-					TestingApplyCalledTwiceFilter: func(fArgs kvserverbase.ApplyFilterArgs) (int, *kvpb.Error) {
-						if fArgs.CmdID == cmdID.Load().(kvserverbase.CmdIDKey) {
-							t.Logf("failing application for raft cmdID: %x", cmdID)
-							return 0, kvpb.NewErrorf("test injected error")
-						}
-						return 0, nil
-					},
-				},
-			},
-		},
-	})
-	defer tc.Stopper().Stop(ctx)
-
-	db := tc.Server(0).DB()
-
-	err := db.Put(ctx, keyA, "initial_a")
-	require.NoError(t, err)
-
-	err = db.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error {
-		if txnID := targetTxnIDString.Load(); txnID == "" {
-			// Store the txnID for the testing knobs.
-			targetTxnIDString.Store(txn.ID().String())
-			t.Logf("txn ID is: %s", txn.ID())
-		}
-
-		// Create a batch that will be split into two batches because of scan
-		// directions. We've arranged for the put to fail application.
-		b := txn.NewBatch()
-		b.Put(keyB, "value_b")
-		b.ScanForUpdate(keyA, keyC, kvpb.GuaranteedDurability)
-		b.ReverseScanForShare(keyA, keyC, kvpb.GuaranteedDurability)
-
-		// If the batch isn't marked for async consensus, the batch will fail.
-		if err := txn.Run(ctx, b); err != nil {
-			return err
-		}
-
-		// If the batch doesn't fail, then it _must_ have both rows in the second
-		// scan.
-		require.Equal(t, 2, len(b.Results[2].Rows))
-		return nil
-	})
-
-	// The transaction should fail due to the replication failure.
-	require.Error(t, err)
-	require.ErrorContains(t, err, "test injected error")
-
-	// Verify that the Put didn't succeed by checking that keyB doesn't exist.
-	res, err := db.Get(ctx, keyB)
-	require.NoError(t, err)
-	require.False(t, res.Exists(), "keyB should not exist due to failed replication")
-}
