Skip to content

Commit b0f5e8c

Browse files
craig[bot], spilchen, arulajmani, miraradeva, yuzefovich
committed
154276: sql/inspect: add checkpointing tracking to INSPECT r=spilchen a=spilchen Previously, INSPECT jobs did not support checkpointing, meaning job restarts would reprocess all data from the beginning. This change adds span based checkpointing to allow jobs to resume from their last completed spans. Progress is tracked using jobfrontier to store completed spans. On job restart, the resumer loads completed spans and filters them from the work to be done. Closes #148297 Release note: None Epic: CRDB-30356 154391: kv: make TestFollowerReadsWithStaleDescriptor work with multi-tenancy r=stevendanna a=arulajmani This previously didn't work for a slew of reasons, commented in-line. References #142800 Release note: None 154404: kvcoord: treat non-transactional writes as non-idempotent r=stevendanna a=miraradeva Previously, non-transactional requests were treated as idempotent by the `DistSender`, meaning they were retried in the presence of RPC errors. This could lead to double-evaluating these requests, including writes. This can result in, for example, an increment applying twice, or more subtle problems like a blind write evaluating twice, overwriting another unrelated write that fell in-between. This issue is known and documented in the `DistSender` code. It is not a production correctness issue since SQL always uses the transactional KV API. This commit treats non-transactional write batches the same way as transactional batches that contain a commit: if they experience an RPC error, they should not be retried, and should return an ambiguous error. The new logic is currently behind an off-by-default cluster setting as it will require deflaking many tests. The plan is to enable this in some kvnemesis tests first. Informs: #154389 Informs: #114814 Release note: None 154557: colcontainer: harden ctx capture in diskQueueWriter r=yuzefovich a=yuzefovich In ffb4666 we added more memory accounting to the disk queue. 
This required capturing the context inside the `diskQueueWriter` in order to preserve the `io.Writer` signature. We just saw a test failure where this capture was problematic - it's possible that when we close the disk queue as a whole (which happens _after_ closing all operators) that the captured context contains the tracing span that has been finished (since the relevant operator has already been closed). To go around this issue we now update the captured context before finishing each file. An alternative solution could've been to use `context.Background` since we only need the context object for memory accounting purposes (in a single `Resize` call) so losing some contextual information there doesn't seem like a big deal. Fixes: #154347. Release note: None Co-authored-by: Matt Spilchen <[email protected]> Co-authored-by: Arul Ajmani <[email protected]> Co-authored-by: Mira Radeva <[email protected]> Co-authored-by: Yahor Yuzefovich <[email protected]>
5 parents 2a1771e + 8b39b69 + 47ed376 + af9a222 + 6858087 commit b0f5e8c

File tree

11 files changed

+882
-221
lines changed

11 files changed

+882
-221
lines changed

pkg/ccl/kvccl/kvfollowerreadsccl/BUILD.bazel

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,7 @@ go_test(
6060
"//pkg/kv/kvserver/concurrency/lock",
6161
"//pkg/kv/kvserver/kvserverbase",
6262
"//pkg/kv/kvtestutils",
63+
"//pkg/multitenant/tenantcapabilitiespb",
6364
"//pkg/roachpb",
6465
"//pkg/rpc",
6566
"//pkg/rpc/rpcbase",

pkg/ccl/kvccl/kvfollowerreadsccl/followerreads_test.go

Lines changed: 33 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ import (
2727
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/concurrency/lock"
2828
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverbase"
2929
"github.com/cockroachdb/cockroach/pkg/kv/kvtestutils"
30+
"github.com/cockroachdb/cockroach/pkg/multitenant/tenantcapabilitiespb"
3031
"github.com/cockroachdb/cockroach/pkg/roachpb"
3132
"github.com/cockroachdb/cockroach/pkg/rpc"
3233
"github.com/cockroachdb/cockroach/pkg/rpc/rpcbase"
@@ -804,9 +805,8 @@ func TestFollowerReadsWithStaleDescriptor(t *testing.T) {
804805
base.TestClusterArgs{
805806
ReplicationMode: base.ReplicationManual,
806807
ServerArgs: base.TestServerArgs{
807-
Settings: settings,
808-
DefaultTestTenant: base.TODOTestTenantDisabled,
809-
UseDatabase: "t",
808+
Settings: settings,
809+
UseDatabase: "t",
810810
},
811811
// n4 pretends to have low latency to n2 and n3, so that it tries to use
812812
// them for follower reads.
@@ -847,15 +847,24 @@ func TestFollowerReadsWithStaleDescriptor(t *testing.T) {
847847
// load based rebalancing to make sure it doesn't move.
848848
kvserverbase.LoadBasedRebalancingMode.Override(ctx, &settings.SV, kvserverbase.LBRebalancingOff)
849849

850+
// NB: Tenants need capabilities to be able to relocate ranges.
851+
if !tc.Server(0).DeploymentMode().IsSingleTenant() {
852+
require.NoError(t, tc.Server(0).GrantTenantCapabilities(
853+
context.Background(), serverutils.TestTenantID(),
854+
map[tenantcapabilitiespb.ID]string{
855+
tenantcapabilitiespb.CanAdminRelocateRange: "true",
856+
}))
857+
}
858+
850859
n1 := sqlutils.MakeSQLRunner(tc.Conns[0])
851860
n1.Exec(t, `CREATE DATABASE t`)
852861
n1.Exec(t, `CREATE TABLE test (k INT PRIMARY KEY)`)
853862
n1.Exec(t, `ALTER TABLE test EXPERIMENTAL_RELOCATE VOTERS VALUES (ARRAY[1,2], 1)`)
854863
// Speed up closing of timestamps, in order to sleep less below before we can
855864
// use follower_read_timestamp(). follower_read_timestamp() uses the sum of
856865
// the following settings.
857-
n1.Exec(t, `SET CLUSTER SETTING kv.closed_timestamp.target_duration = '0.1s'`)
858-
n1.Exec(t, `SET CLUSTER SETTING kv.closed_timestamp.side_transport_interval = '0.1s'`)
866+
closedts.TargetDuration.Override(ctx, &settings.SV, 100*time.Millisecond)
867+
closedts.SideTransportCloseInterval.Override(ctx, &settings.SV, 100*time.Millisecond)
859868

860869
// Sleep so that we can perform follower reads. The read timestamp needs to be
861870
// above the timestamp when the table was created.
@@ -865,11 +874,16 @@ func TestFollowerReadsWithStaleDescriptor(t *testing.T) {
865874

866875
// Run a query on n4 to populate its cache.
867876
n4 := sqlutils.MakeSQLRunner(tc.Conns[3])
868-
n4.Exec(t, "SELECT * from test WHERE k=1")
869877
// Check that the cache was indeed populated.
870878
var tableID uint32
871879
n1.QueryRow(t, `SELECT id from system.namespace WHERE name='test'`).Scan(&tableID)
872-
tablePrefix := keys.MustAddr(keys.SystemSQLCodec.TablePrefix(tableID))
880+
tablePrefix := keys.MustAddr(tc.Server(0).Codec().TablePrefix(tableID))
881+
// NB: Splitting a range helps prevent tenants from getting an out of band
882+
// RangeDescriptor update to the RangeCache. Unclear about the exact mechanics
883+
// at play here, but they don't matter for our test.
884+
_, _, err := tc.SplitRange(roachpb.Key(tablePrefix))
885+
require.NoError(t, err)
886+
n4.Exec(t, "SELECT * from test WHERE k=1")
873887
n4Cache := tc.Server(3).DistSenderI().(*kvcoord.DistSender).RangeDescriptorCache()
874888
entry, err := n4Cache.TestingGetCached(ctx, tablePrefix, false, roachpb.LAG_BY_CLUSTER_SETTING)
875889
require.NoError(t, err)
@@ -964,8 +978,18 @@ func TestFollowerReadsWithStaleDescriptor(t *testing.T) {
964978

965979
// Sanity check that the plan was distributed.
966980
require.True(t, strings.Contains(rec.String(), "creating DistSQL plan with isLocal=false"))
967-
// Look at the trace and check that we've served a follower read.
968-
require.True(t, kvtestutils.OnlyFollowerReads(rec), "query was not served through follower reads: %s", rec)
981+
// NB: We're distributing the plan here, and it (the DistSender) is running on
982+
// n3. Note that we've only injected latencies on n4, so any replica is fair
983+
// game for n3. When run in normal or shared-process multi-tenancy
984+
// deployments, n3 will route to its local replica (which is a follower), as
985+
// that replica always sorts first. However, for external process multi-tenancy,
986+
// there's no such concept of a local replica. Requests can therefore be
987+
// routed to either n1 or n3, so we can't make any assertions about whether
988+
// there'll be a follower read or not.
989+
if !tc.Server(0).DeploymentMode().IsExternal() {
990+
// Look at the trace and check that we've served a follower read.
991+
require.True(t, kvtestutils.OnlyFollowerReads(rec), "query was not served through follower reads: %s", rec)
992+
}
969993
// Verify that we didn't produce the "misplanned ranges" metadata that would
970994
// purge the non-stale entries from the range cache on n4.
971995
require.False(t, strings.Contains(rec.String(), "clearing entries overlapping"))

pkg/kv/kvclient/kvcoord/dist_sender.go

Lines changed: 32 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -419,6 +419,20 @@ var ProxyBatchRequest = settings.RegisterBoolSetting(
419419
true,
420420
)
421421

422+
// NonTransactionalWritesNotIdempotent controls whether non-transactional writes
423+
// are considered idempotent or not. When this setting is true, a
424+
// non-transactional write that experiences an RPC error is not retried, and
425+
// returns an ambiguous error. This is the same behavior as commit batches (or
426+
// batches issued concurrently with a commit batch). This is arguably the
427+
// correct behavior for non-transactional writes, but it's behind a default-off
428+
// cluster setting to get some kvnemesis mileage first.
429+
var NonTransactionalWritesNotIdempotent = settings.RegisterBoolSetting(
430+
settings.ApplicationLevel,
431+
"kv.dist_sender.non_transactional_writes_not_idempotent.enabled",
432+
"when true, non-transactional writes are not retried and may return an ambiguous error",
433+
false,
434+
)
435+
422436
// DistSenderMetrics is the set of metrics for a given distributed sender.
423437
type DistSenderMetrics struct {
424438
BatchCount *metric.Counter
@@ -2541,7 +2555,13 @@ const slowDistSenderReplicaThreshold = 10 * time.Second
25412555
func (ds *DistSender) sendToReplicas(
25422556
ctx context.Context, ba *kvpb.BatchRequest, routing rangecache.EvictionToken, withCommit bool,
25432557
) (*kvpb.BatchResponse, error) {
2544-
2558+
// In addition to batches where withCommit is true, non-transactional write
2559+
// batches are also not safe to be retried as they are not guaranteed to be
2560+
// idempotent. Returning ambiguous errors for those batches is controlled by a
2561+
// cluster setting for now.
2562+
nonIdempotentWrite :=
2563+
ba.Txn == nil && ba.IsWrite() && NonTransactionalWritesNotIdempotent.Get(&ds.st.SV)
2564+
nonIdempotent := withCommit || nonIdempotentWrite
25452565
// If this request can be sent to a follower to perform a consistent follower
25462566
// read under the closed timestamp, promote its routing policy to NEAREST.
25472567
// If we don't know the closed timestamp policy, we ought to optimistically
@@ -2759,7 +2779,9 @@ func (ds *DistSender) sendToReplicas(
27592779
}
27602780

27612781
tBegin := crtime.NowMono() // for slow log message
2762-
sendCtx, cbToken, cbErr := ds.circuitBreakers.ForReplica(desc, &curReplica).Track(ctx, ba, withCommit, tBegin)
2782+
sendCtx, cbToken, cbErr := ds.circuitBreakers.ForReplica(desc, &curReplica).Track(
2783+
ctx, ba, nonIdempotent, tBegin,
2784+
)
27632785
if cbErr != nil {
27642786
// Circuit breaker is tripped. err will be handled below.
27652787
err = cbErr
@@ -2861,12 +2883,12 @@ func (ds *DistSender) sendToReplicas(
28612883
// prevents them from double evaluation. This can result in, for example,
28622884
// an increment applying twice, or more subtle problems like a blind write
28632885
// evaluating twice, overwriting another unrelated write that fell
2864-
// in-between.
2886+
// in-between. This is fixed under the cluster setting
2887+
// NonTransactionalWritesNotIdempotent. Consider enabling it by default.
28652888
//
2866-
// NB: If this partial batch does not contain the EndTxn request but the
2867-
// batch contains a commit, the ambiguous error should be caught on
2868-
// retrying the writes, should it need to be propagated.
2869-
if withCommit && !grpcutil.RequestDidNotStart(err) {
2889+
// NB: If this partial batch is not idempotent, the ambiguous error should
2890+
// be caught on retrying the writes, should it need to be propagated.
2891+
if nonIdempotent && !grpcutil.RequestDidNotStart(err) {
28702892
ambiguousError = err
28712893
}
28722894
// If we get a gRPC error against the leaseholder, we don't want to
@@ -2987,9 +3009,9 @@ func (ds *DistSender) sendToReplicas(
29873009
// return it if all other replicas fail (regardless of error).
29883010
replicaUnavailableError = br.Error.GoError()
29893011
}
2990-
// The circuit breaker may have tripped while a commit proposal was in
2991-
// flight, so we have to mark it as ambiguous as well.
2992-
if withCommit && ambiguousError == nil {
3012+
// The circuit breaker may have tripped while a non-idempotent request
3013+
// was in flight, so we have to mark it as ambiguous as well.
3014+
if nonIdempotent && ambiguousError == nil {
29933015
ambiguousError = br.Error.GoError()
29943016
}
29953017
case *kvpb.NotLeaseHolderError:

pkg/kv/kvclient/kvcoord/dist_sender_test.go

Lines changed: 82 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -3664,6 +3664,7 @@ func TestReplicaErrorsMerged(t *testing.T) {
36643664

36653665
notLeaseHolderErr := kvpb.NewError(kvpb.NewNotLeaseHolderError(lease3, 0, &descriptor2, ""))
36663666
startedRequestError := errors.New("request might have started")
3667+
notStartedRequestError := grpcstatus.Errorf(codes.PermissionDenied, "request did not start")
36673668
unavailableError1 := kvpb.NewError(kvpb.NewReplicaUnavailableError(errors.New("unavailable"), &initDescriptor, initDescriptor.InternalReplicas[0]))
36683669
unavailableError2 := kvpb.NewError(kvpb.NewReplicaUnavailableError(errors.New("unavailable"), &initDescriptor, initDescriptor.InternalReplicas[1]))
36693670

@@ -3675,52 +3676,101 @@ func TestReplicaErrorsMerged(t *testing.T) {
36753676
// See https://cockroachlabs.com/blog/demonic-nondeterminism/#appendix for
36763677
// the gory details.
36773678
testCases := []struct {
3679+
transactional bool
36783680
withCommit bool
36793681
sendErr1, sendErr2 error
36803682
err1, err2 *kvpb.Error
36813683
expErr string
36823684
}{
36833685
// The ambiguous error is returned with higher priority for withCommit.
36843686
{
3685-
withCommit: true,
3686-
sendErr1: startedRequestError,
3687-
err2: notLeaseHolderErr,
3688-
expErr: "result is ambiguous",
3687+
transactional: true,
3688+
withCommit: true,
3689+
sendErr1: startedRequestError,
3690+
err2: notLeaseHolderErr,
3691+
expErr: "result is ambiguous",
36893692
},
36903693
// The not leaseholder error is the last error.
36913694
{
3692-
withCommit: false,
3693-
sendErr1: startedRequestError,
3694-
err2: notLeaseHolderErr,
3695-
expErr: "leaseholder not found in transport",
3695+
transactional: true,
3696+
withCommit: false,
3697+
sendErr1: startedRequestError,
3698+
err2: notLeaseHolderErr,
3699+
expErr: "leaseholder not found in transport",
36963700
},
36973701
// The ambiguous error is returned with higher priority for withCommit.
36983702
{
3699-
withCommit: true,
3700-
sendErr1: startedRequestError,
3701-
err2: unavailableError2,
3702-
expErr: "result is ambiguous",
3703+
transactional: true,
3704+
withCommit: true,
3705+
sendErr1: startedRequestError,
3706+
err2: unavailableError2,
3707+
expErr: "result is ambiguous",
37033708
},
37043709
// The unavailable error is the last error.
37053710
{
3706-
withCommit: false,
3707-
sendErr1: startedRequestError,
3708-
err2: unavailableError2,
3709-
expErr: "unavailable",
3711+
transactional: true,
3712+
withCommit: false,
3713+
sendErr1: startedRequestError,
3714+
err2: unavailableError2,
3715+
expErr: "unavailable",
3716+
},
3717+
// The ambiguous error is returned with higher priority for
3718+
// non-transactional batches (next 2 test cases). This is the case only
3719+
// because the test sets NonTransactionalWritesNotIdempotent = true.
3720+
// Otherwise, the non-transactional requests would be treated like they are
3721+
// idempotent and the NLHE/RUE would be returned as the last error.
3722+
{
3723+
transactional: false,
3724+
withCommit: false,
3725+
sendErr1: startedRequestError,
3726+
err2: notLeaseHolderErr,
3727+
expErr: "result is ambiguous",
37103728
},
3711-
// The unavailable error is returned with higher priority regardless of withCommit.
37123729
{
3713-
withCommit: true,
3714-
err1: unavailableError1,
3715-
err2: notLeaseHolderErr,
3716-
expErr: "unavailable",
3730+
transactional: false,
3731+
withCommit: false,
3732+
sendErr1: startedRequestError,
3733+
err2: unavailableError2,
3734+
expErr: "result is ambiguous",
3735+
},
3736+
// If we know the request did not start, do not return an ambiguous error
3737+
// (next 2 test cases).
3738+
{
3739+
transactional: false,
3740+
withCommit: false,
3741+
sendErr1: notStartedRequestError,
3742+
err2: notLeaseHolderErr,
3743+
expErr: "leaseholder not found in transport",
3744+
},
3745+
{
3746+
transactional: false,
3747+
withCommit: false,
3748+
sendErr1: notStartedRequestError,
3749+
err2: unavailableError2,
3750+
expErr: "unavailable",
3751+
},
3752+
// The unavailable error is returned with higher priority regardless of
3753+
// withCommit and transactional (next 3 test cases).
3754+
{
3755+
transactional: true,
3756+
withCommit: true,
3757+
err1: unavailableError1,
3758+
err2: notLeaseHolderErr,
3759+
expErr: "unavailable",
37173760
},
3718-
// The unavailable error is returned with higher priority regardless of withCommit.
37193761
{
3720-
withCommit: false,
3721-
err1: unavailableError1,
3722-
err2: notLeaseHolderErr,
3723-
expErr: "unavailable",
3762+
transactional: true,
3763+
withCommit: false,
3764+
err1: unavailableError1,
3765+
err2: notLeaseHolderErr,
3766+
expErr: "unavailable",
3767+
},
3768+
{
3769+
transactional: false,
3770+
withCommit: false,
3771+
err1: unavailableError1,
3772+
err2: notLeaseHolderErr,
3773+
expErr: "unavailable",
37243774
},
37253775
}
37263776
clock := hlc.NewClockForTesting(nil)
@@ -3744,6 +3794,7 @@ func TestReplicaErrorsMerged(t *testing.T) {
37443794
stopper := stop.NewStopper()
37453795
defer stopper.Stop(ctx)
37463796
st := cluster.MakeTestingClusterSettings()
3797+
NonTransactionalWritesNotIdempotent.Override(ctx, &st.SV, true)
37473798
rc := rangecache.NewRangeCache(st, nil /* db */, func() int64 { return 100 }, stopper)
37483799
rc.Insert(ctx, roachpb.RangeInfo{
37493800
Desc: initDescriptor,
@@ -3786,12 +3837,16 @@ func TestReplicaErrorsMerged(t *testing.T) {
37863837
return nil, nil, errors.New("range desc db unexpectedly used")
37873838
}),
37883839
TransportFactory: adaptSimpleTransport(transportFn),
3789-
Settings: cluster.MakeTestingClusterSettings(),
3840+
Settings: st,
37903841
}
37913842
ds := NewDistSender(cfg)
37923843

37933844
ba := &kvpb.BatchRequest{}
37943845
ba.Add(kvpb.NewGet(roachpb.Key("a")))
3846+
ba.Add(kvpb.NewPut(roachpb.Key("b"), roachpb.MakeValueFromString("value")))
3847+
if tc.transactional {
3848+
ba.Txn = &roachpb.Transaction{Name: "test"}
3849+
}
37953850
tok, err := rc.LookupWithEvictionToken(ctx, roachpb.RKeyMin, rangecache.EvictionToken{}, false)
37963851
require.NoError(t, err)
37973852
br, err := ds.sendToReplicas(ctx, ba, tok, tc.withCommit)

pkg/sql/colcontainer/diskqueue.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -597,6 +597,12 @@ func (d *diskQueue) writeFooterAndFlush(ctx context.Context) (err error) {
597597
d.serializer = nil
598598
}
599599
}()
600+
if d.writer != nil {
601+
// The context that we captured when we created the diskQueueWriter
602+
// might have a tracing span that has already been finished. To work
603+
// around this, we capture the fresh context.
604+
d.writer.ctx = ctx
605+
}
600606
if err := d.serializer.Finish(); err != nil {
601607
return err
602608
}

pkg/sql/inspect/BUILD.bazel

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ go_library(
1818
visibility = ["//visibility:public"],
1919
deps = [
2020
"//pkg/jobs",
21+
"//pkg/jobs/jobfrontier",
2122
"//pkg/jobs/jobspb",
2223
"//pkg/keys",
2324
"//pkg/kv/kvserver/protectedts/ptpb",
@@ -47,7 +48,8 @@ go_library(
4748
"//pkg/util/ctxgroup",
4849
"//pkg/util/hlc",
4950
"//pkg/util/log",
50-
"//pkg/util/stop",
51+
"//pkg/util/protoutil",
52+
"//pkg/util/span",
5153
"//pkg/util/syncutil",
5254
"//pkg/util/timeutil",
5355
"//pkg/util/tracing",
@@ -77,6 +79,7 @@ go_test(
7779
deps = [
7880
"//pkg/base",
7981
"//pkg/jobs",
82+
"//pkg/jobs/jobfrontier",
8083
"//pkg/jobs/jobspb",
8184
"//pkg/keys",
8285
"//pkg/kv",

0 commit comments

Comments
 (0)