Skip to content

Commit af9a222

Browse files
committed
kvcoord: treat non-transactional writes as non-idempotent
Previously, non-transactional requests were treated as idempotent by the `DistSender`, meaning they were retried in the presence of RPC errors. This could lead to double-evaluating these requests, including writes. This can result in, for example, an increment applying twice, or more subtle problems like a blind write evaluating twice, overwriting another unrelated write that fell in-between. This issue is known and documented in the `DistSender` code. It is not a production correctness issue since SQL always uses the transactional KV API. This commit treats non-transactional write batches the same way as transactional batches that contain a commit: if they experience an RPC error, they should not be retried, and should return an ambiguous error. The new logic is currently behind an off-by-default cluster setting as it will require deflaking many tests. The plan is to enable this in some kvnemesis tests first. Informs: #154389 Informs: #114814 Release note: None
1 parent e5f821c commit af9a222

File tree

2 files changed

+114
-37
lines changed

2 files changed

+114
-37
lines changed

pkg/kv/kvclient/kvcoord/dist_sender.go

Lines changed: 32 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -419,6 +419,20 @@ var ProxyBatchRequest = settings.RegisterBoolSetting(
419419
true,
420420
)
421421

422+
// NonTransactionalWritesNotIdempotent controls whether non-transactional writes
423+
// are considered idempotent or not. When this setting is true, a
424+
// non-transactional write that experiences an RPC error is not retried, and
425+
// returns an ambiguous error. This is the same behavior as commit batches (or
426+
// batched issued concurrently with a commit batch). This is arguably the
427+
// correct behavior for non-transactional writes, but it's behind a default-off
428+
// cluster setting to get some kvnemesis mileage first.
429+
var NonTransactionalWritesNotIdempotent = settings.RegisterBoolSetting(
430+
settings.ApplicationLevel,
431+
"kv.dist_sender.non_transactional_writes_not_idempotent.enabled",
432+
"when true, non-transactional writes are not retried and may return an ambiguous error",
433+
false,
434+
)
435+
422436
// DistSenderMetrics is the set of metrics for a given distributed sender.
423437
type DistSenderMetrics struct {
424438
BatchCount *metric.Counter
@@ -2541,7 +2555,13 @@ const slowDistSenderReplicaThreshold = 10 * time.Second
25412555
func (ds *DistSender) sendToReplicas(
25422556
ctx context.Context, ba *kvpb.BatchRequest, routing rangecache.EvictionToken, withCommit bool,
25432557
) (*kvpb.BatchResponse, error) {
2544-
2558+
// In addition to batches where withCommit is true, non-transactional write
2559+
// batches are also not safe to be retried as they are not guaranteed to be
2560+
// idempotent. Returning ambiguous errors for those batches is controlled by a
2561+
// cluster setting for now.
2562+
nonIdempotentWrite :=
2563+
ba.Txn == nil && ba.IsWrite() && NonTransactionalWritesNotIdempotent.Get(&ds.st.SV)
2564+
nonIdempotent := withCommit || nonIdempotentWrite
25452565
// If this request can be sent to a follower to perform a consistent follower
25462566
// read under the closed timestamp, promote its routing policy to NEAREST.
25472567
// If we don't know the closed timestamp policy, we ought to optimistically
@@ -2759,7 +2779,9 @@ func (ds *DistSender) sendToReplicas(
27592779
}
27602780

27612781
tBegin := crtime.NowMono() // for slow log message
2762-
sendCtx, cbToken, cbErr := ds.circuitBreakers.ForReplica(desc, &curReplica).Track(ctx, ba, withCommit, tBegin)
2782+
sendCtx, cbToken, cbErr := ds.circuitBreakers.ForReplica(desc, &curReplica).Track(
2783+
ctx, ba, nonIdempotent, tBegin,
2784+
)
27632785
if cbErr != nil {
27642786
// Circuit breaker is tripped. err will be handled below.
27652787
err = cbErr
@@ -2861,12 +2883,12 @@ func (ds *DistSender) sendToReplicas(
28612883
// prevents them from double evaluation. This can result in, for example,
28622884
// an increment applying twice, or more subtle problems like a blind write
28632885
// evaluating twice, overwriting another unrelated write that fell
2864-
// in-between.
2886+
// in-between. This is fixed under the cluster setting
2887+
// NonTransactionalWritesNotIdempotent. Consider enabling it by default.
28652888
//
2866-
// NB: If this partial batch does not contain the EndTxn request but the
2867-
// batch contains a commit, the ambiguous error should be caught on
2868-
// retrying the writes, should it need to be propagated.
2869-
if withCommit && !grpcutil.RequestDidNotStart(err) {
2889+
// NB: If this partial batch is not idempotent, the ambiguous error should
2890+
// be caught on retrying the writes, should it need to be propagated.
2891+
if nonIdempotent && !grpcutil.RequestDidNotStart(err) {
28702892
ambiguousError = err
28712893
}
28722894
// If we get a gRPC error against the leaseholder, we don't want to
@@ -2987,9 +3009,9 @@ func (ds *DistSender) sendToReplicas(
29873009
// return it if all other replicas fail (regardless of error).
29883010
replicaUnavailableError = br.Error.GoError()
29893011
}
2990-
// The circuit breaker may have tripped while a commit proposal was in
2991-
// flight, so we have to mark it as ambiguous as well.
2992-
if withCommit && ambiguousError == nil {
3012+
// The circuit breaker may have tripped while a non-idempotent request
3013+
// was in flight, so we have to mark it as ambiguous as well.
3014+
if nonIdempotent && ambiguousError == nil {
29933015
ambiguousError = br.Error.GoError()
29943016
}
29953017
case *kvpb.NotLeaseHolderError:

pkg/kv/kvclient/kvcoord/dist_sender_test.go

Lines changed: 82 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -3664,6 +3664,7 @@ func TestReplicaErrorsMerged(t *testing.T) {
36643664

36653665
notLeaseHolderErr := kvpb.NewError(kvpb.NewNotLeaseHolderError(lease3, 0, &descriptor2, ""))
36663666
startedRequestError := errors.New("request might have started")
3667+
notStartedRequestError := grpcstatus.Errorf(codes.PermissionDenied, "request did not start")
36673668
unavailableError1 := kvpb.NewError(kvpb.NewReplicaUnavailableError(errors.New("unavailable"), &initDescriptor, initDescriptor.InternalReplicas[0]))
36683669
unavailableError2 := kvpb.NewError(kvpb.NewReplicaUnavailableError(errors.New("unavailable"), &initDescriptor, initDescriptor.InternalReplicas[1]))
36693670

@@ -3675,52 +3676,101 @@ func TestReplicaErrorsMerged(t *testing.T) {
36753676
// See https://cockroachlabs.com/blog/demonic-nondeterminism/#appendix for
36763677
// the gory details.
36773678
testCases := []struct {
3679+
transactional bool
36783680
withCommit bool
36793681
sendErr1, sendErr2 error
36803682
err1, err2 *kvpb.Error
36813683
expErr string
36823684
}{
36833685
// The ambiguous error is returned with higher priority for withCommit.
36843686
{
3685-
withCommit: true,
3686-
sendErr1: startedRequestError,
3687-
err2: notLeaseHolderErr,
3688-
expErr: "result is ambiguous",
3687+
transactional: true,
3688+
withCommit: true,
3689+
sendErr1: startedRequestError,
3690+
err2: notLeaseHolderErr,
3691+
expErr: "result is ambiguous",
36893692
},
36903693
// The not leaseholder errors is the last error.
36913694
{
3692-
withCommit: false,
3693-
sendErr1: startedRequestError,
3694-
err2: notLeaseHolderErr,
3695-
expErr: "leaseholder not found in transport",
3695+
transactional: true,
3696+
withCommit: false,
3697+
sendErr1: startedRequestError,
3698+
err2: notLeaseHolderErr,
3699+
expErr: "leaseholder not found in transport",
36963700
},
36973701
// The ambiguous error is returned with higher priority for withCommit.
36983702
{
3699-
withCommit: true,
3700-
sendErr1: startedRequestError,
3701-
err2: unavailableError2,
3702-
expErr: "result is ambiguous",
3703+
transactional: true,
3704+
withCommit: true,
3705+
sendErr1: startedRequestError,
3706+
err2: unavailableError2,
3707+
expErr: "result is ambiguous",
37033708
},
37043709
// The unavailable error is the last error.
37053710
{
3706-
withCommit: false,
3707-
sendErr1: startedRequestError,
3708-
err2: unavailableError2,
3709-
expErr: "unavailable",
3711+
transactional: true,
3712+
withCommit: false,
3713+
sendErr1: startedRequestError,
3714+
err2: unavailableError2,
3715+
expErr: "unavailable",
3716+
},
3717+
// The ambiguous error is returned with higher priority for
3718+
// non-transactional batches (next 2 test cases). This is the case only
3719+
// because the test sets NonTransactionalWritesNotIdempotent = true.
3720+
// Otherwise, the non-transactional requests would be treated like they are
3721+
// idempotent and the NLHE/RUE would be returned as the last error.
3722+
{
3723+
transactional: false,
3724+
withCommit: false,
3725+
sendErr1: startedRequestError,
3726+
err2: notLeaseHolderErr,
3727+
expErr: "result is ambiguous",
37103728
},
3711-
// The unavailable error is returned with higher priority regardless of withCommit.
37123729
{
3713-
withCommit: true,
3714-
err1: unavailableError1,
3715-
err2: notLeaseHolderErr,
3716-
expErr: "unavailable",
3730+
transactional: false,
3731+
withCommit: false,
3732+
sendErr1: startedRequestError,
3733+
err2: unavailableError2,
3734+
expErr: "result is ambiguous",
3735+
},
3736+
// If we know the request did not start, do not return an ambiguous error
3737+
// (next 2 test cases).
3738+
{
3739+
transactional: false,
3740+
withCommit: false,
3741+
sendErr1: notStartedRequestError,
3742+
err2: notLeaseHolderErr,
3743+
expErr: "leaseholder not found in transport",
3744+
},
3745+
{
3746+
transactional: false,
3747+
withCommit: false,
3748+
sendErr1: notStartedRequestError,
3749+
err2: unavailableError2,
3750+
expErr: "unavailable",
3751+
},
3752+
// The unavailable error is returned with higher priority regardless of
3753+
// withCommit and transactional (next 3 test cases).
3754+
{
3755+
transactional: true,
3756+
withCommit: true,
3757+
err1: unavailableError1,
3758+
err2: notLeaseHolderErr,
3759+
expErr: "unavailable",
37173760
},
3718-
// The unavailable error is returned with higher priority regardless of withCommit.
37193761
{
3720-
withCommit: false,
3721-
err1: unavailableError1,
3722-
err2: notLeaseHolderErr,
3723-
expErr: "unavailable",
3762+
transactional: true,
3763+
withCommit: false,
3764+
err1: unavailableError1,
3765+
err2: notLeaseHolderErr,
3766+
expErr: "unavailable",
3767+
},
3768+
{
3769+
transactional: false,
3770+
withCommit: false,
3771+
err1: unavailableError1,
3772+
err2: notLeaseHolderErr,
3773+
expErr: "unavailable",
37243774
},
37253775
}
37263776
clock := hlc.NewClockForTesting(nil)
@@ -3744,6 +3794,7 @@ func TestReplicaErrorsMerged(t *testing.T) {
37443794
stopper := stop.NewStopper()
37453795
defer stopper.Stop(ctx)
37463796
st := cluster.MakeTestingClusterSettings()
3797+
NonTransactionalWritesNotIdempotent.Override(ctx, &st.SV, true)
37473798
rc := rangecache.NewRangeCache(st, nil /* db */, func() int64 { return 100 }, stopper)
37483799
rc.Insert(ctx, roachpb.RangeInfo{
37493800
Desc: initDescriptor,
@@ -3786,12 +3837,16 @@ func TestReplicaErrorsMerged(t *testing.T) {
37863837
return nil, nil, errors.New("range desc db unexpectedly used")
37873838
}),
37883839
TransportFactory: adaptSimpleTransport(transportFn),
3789-
Settings: cluster.MakeTestingClusterSettings(),
3840+
Settings: st,
37903841
}
37913842
ds := NewDistSender(cfg)
37923843

37933844
ba := &kvpb.BatchRequest{}
37943845
ba.Add(kvpb.NewGet(roachpb.Key("a")))
3846+
ba.Add(kvpb.NewPut(roachpb.Key("b"), roachpb.MakeValueFromString("value")))
3847+
if tc.transactional {
3848+
ba.Txn = &roachpb.Transaction{Name: "test"}
3849+
}
37953850
tok, err := rc.LookupWithEvictionToken(ctx, roachpb.RKeyMin, rangecache.EvictionToken{}, false)
37963851
require.NoError(t, err)
37973852
br, err := ds.sendToReplicas(ctx, ba, tok, tc.withCommit)

0 commit comments

Comments
 (0)