
Commit 4b9c7ab

craig[bot], msbutler, tbg, and pav-kv committed

154298: crosscluster/physical: add alter connection watcher r=jeffswenson a=msbutler

PCR now automatically updates the URI it uses if that URI is part of an external connection that is altered.

Informs #153471

Release note: none

154427: admission: correctly reset diskReadTokensAllocated r=tbg a=tbg

Epic: none

154444: kvserver: deflake TestProcessSplitAfterRightHandSideHasBeenRemoved r=tbg a=pav-kv

The test relies on the LHS replica being caught up via the raft log, so that it runs the split trigger. In the flake, the replica was instead caught up by a snapshot, most likely because the leader's log was truncated while the replica was partitioned away. The PR disables log truncation for this test, so that catch-up via the log is guaranteed.

Fixes #154313

Co-authored-by: Michael Butler <[email protected]>
Co-authored-by: Tobias Grieger <[email protected]>
Co-authored-by: Pavel Kalinnikov <[email protected]>
4 parents 1a7fc9d + 20c7483 + 9aae12a + 6e387b6 commit 4b9c7ab

File tree: 8 files changed, +188 −70 lines
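For context on 154298, the operator-facing flow that the new watcher reacts to looks roughly like the sketch below. It mirrors TestAlterExternalConnection further down in this commit; the hostnames, cluster names, and the lib/pq driver are illustrative, and the CREATE VIRTUAL CLUSTER statement follows the documented PCR syntax rather than anything added here.

package main

import (
	"database/sql"
	"log"

	_ "github.com/lib/pq" // CockroachDB speaks the Postgres wire protocol.
)

func main() {
	// Connect to the standby (destination) cluster's system tenant. Address and
	// credentials are placeholders.
	db, err := sql.Open("postgres", "postgresql://root@standby-host:26257/defaultdb?sslmode=disable")
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()

	for _, stmt := range []string{
		// Point an external connection at the source cluster and start PCR through it.
		`CREATE EXTERNAL CONNECTION "replication-source" AS 'postgresql://user@old-source-host:26257'`,
		`CREATE VIRTUAL CLUSTER standby FROM REPLICATION OF main ON 'external://replication-source'`,
		// Later, repoint the connection (shown in the same script for brevity).
		// With this commit, the running ingestion job polls its details, notices
		// the resolved URI changed, and replans onto the new address.
		`ALTER EXTERNAL CONNECTION "replication-source" AS 'postgresql://user@new-source-host:26257'`,
	} {
		if _, err := db.Exec(stmt); err != nil {
			log.Fatal(err)
		}
	}
}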

pkg/crosscluster/logical/logical_replication_job.go

Lines changed: 9 additions & 52 deletions
@@ -326,35 +326,13 @@ func (r *logicalReplicationResumer) ingest(
 		return err
 	}
 
-	refreshConn := func(ctx context.Context) error {
-		ingestionJob := r.job
-		details := ingestionJob.Details().(jobspb.LogicalReplicationDetails)
-		resolvedDest, err := resolveDest(ctx, jobExecCtx.ExecCfg(), details.SourceClusterConnUri)
-		if err != nil {
-			return err
-		}
-		pollingInterval := 2 * time.Minute
-		if knobs := jobExecCtx.ExecCfg().StreamingTestingKnobs; knobs != nil && knobs.ExternalConnectionPollingInterval != nil {
-			pollingInterval = *knobs.ExternalConnectionPollingInterval
-		}
-		t := time.NewTicker(pollingInterval)
-		defer t.Stop()
-		for {
-			select {
-			case <-ctx.Done():
-				return ctx.Err()
-			case <-confPoller:
-				return nil
-			case <-t.C:
-				newDest, err := reloadDest(ctx, ingestionJob.ID(), jobExecCtx.ExecCfg())
-				if err != nil {
-					log.Dev.Warningf(ctx, "failed to check for updated configuration: %v", err)
-				} else if newDest != resolvedDest {
-					return errors.Mark(errors.Newf("replan due to detail change: old=%s, new=%s", resolvedDest, newDest), sql.ErrPlanChanged)
-				}
-			}
-		}
-	}
+	refreshConn := replicationutils.GetAlterConnectionChecker(
+		r.job.ID(),
+		uris[0].Serialize(),
+		geURIFromLoadedJobDetails,
+		execCfg,
+		confPoller,
+	)
 
 	defer func() {
 		if l := payload.MetricsLabel; l != "" {
@@ -1089,29 +1067,8 @@ func getRetryPolicy(knobs *sql.StreamingTestingKnobs) retry.Options {
 	}
 }
 
-func resolveDest(
-	ctx context.Context, execCfg *sql.ExecutorConfig, sourceURI string,
-) (string, error) {
-	configUri, err := streamclient.ParseConfigUri(sourceURI)
-	if err != nil {
-		return "", err
-	}
-
-	clusterUri, err := configUri.AsClusterUri(ctx, execCfg.InternalDB)
-	if err != nil {
-		return "", err
-	}
-
-	return clusterUri.Serialize(), nil
-}
-
-func reloadDest(ctx context.Context, id jobspb.JobID, execCfg *sql.ExecutorConfig) (string, error) {
-	reloadedJob, err := execCfg.JobRegistry.LoadJob(ctx, id)
-	if err != nil {
-		return "", err
-	}
-	newDetails := reloadedJob.Details().(jobspb.LogicalReplicationDetails)
-	return resolveDest(ctx, execCfg, newDetails.SourceClusterConnUri)
+func geURIFromLoadedJobDetails(details jobspb.Details) string {
+	return details.(jobspb.LogicalReplicationDetails).SourceClusterConnUri
 }
 
 func init() {

pkg/crosscluster/physical/replication_stream_e2e_test.go

Lines changed: 59 additions & 0 deletions
@@ -1800,3 +1800,62 @@ func splitPrimaryKeyIndexSpan(
 	require.NoError(t, db.AdminSplit(ctx, pkStartKey, hlc.MaxTimestamp))
 	require.NoError(t, db.AdminSplit(ctx, pkEndKey, hlc.MaxTimestamp))
 }
+
+func TestAlterExternalConnection(t *testing.T) {
+	defer leaktest.AfterTest(t)()
+	skip.UnderDeadlock(t)
+	skip.UnderRace(t)
+	defer log.Scope(t).Close(t)
+
+	ctx := context.Background()
+	pollingInterval := 100 * time.Millisecond
+
+	var alreadyReplanned atomic.Int32
+
+	args := replicationtestutils.DefaultTenantStreamingClustersArgs
+	args.TestingKnobs = &sql.StreamingTestingKnobs{
+		ExternalConnectionPollingInterval: &pollingInterval,
+		AfterRetryIteration: func(err error) {
+			alreadyReplanned.Add(1)
+		},
+	}
+
+	c, cleanup := replicationtestutils.CreateTenantStreamingClusters(ctx, t, args)
+	defer cleanup()
+
+	externalConnection := "replication-source-addr"
+	ogConnection := c.SrcURL.String()
+	c.DestSysSQL.Exec(c.T, fmt.Sprintf(`CREATE EXTERNAL CONNECTION "%s" AS "%s"`,
+		externalConnection, ogConnection))
+	c.DestSysSQL.Exec(c.T, c.BuildCreateTenantQuery(externalConnection))
+	streamProducerJobID, ingestionJobID := replicationtestutils.GetStreamJobIds(c.T, ctx, c.DestSysSQL, c.Args.DestTenantName)
+
+	jobutils.WaitForJobToRun(c.T, c.SrcSysSQL, jobspb.JobID(streamProducerJobID))
+	jobutils.WaitForJobToRun(c.T, c.DestSysSQL, jobspb.JobID(ingestionJobID))
+	c.WaitUntilStartTimeReached(jobspb.JobID(ingestionJobID))
+
+	// Alter the external connection to break the stream, to test that pcr watches for new connection changes
+	c.SrcSysSQL.Exec(t, fmt.Sprintf("CREATE USER %s", username.TestUser))
+	srcAppURL, cleanupSinkCert := pgurlutils.PGUrl(t, c.SrcSysServer.AdvSQLAddr(), t.Name(), url.User(username.TestUser))
+	defer cleanupSinkCert()
+
+	beforeReplanCount := alreadyReplanned.Load()
+	c.DestSysSQL.Exec(c.T, fmt.Sprintf(`ALTER EXTERNAL CONNECTION "%s" AS "%s"`,
+		externalConnection, &srcAppURL))
+
+	testutils.SucceedsSoon(t, func() error {
+		if alreadyReplanned.Load() <= beforeReplanCount+2 {
+			return errors.New("not yet replanned twice")
+		}
+		return nil
+	})
+
+	// Alter the external connection to fix the stream, and ensure replication resumes
+	c.DestSysSQL.Exec(c.T, fmt.Sprintf(`ALTER EXTERNAL CONNECTION "%s" AS "%s"`,
+		externalConnection, ogConnection))
+	c.DestSysSQL.Exec(c.T, fmt.Sprintf("RESUME JOB %d", ingestionJobID))
+	jobutils.WaitForJobToRun(c.T, c.DestSysSQL, jobspb.JobID(ingestionJobID))
+	// ensure the stream advances
+	srcTime := c.SrcCluster.Server(0).Clock().Now()
+	c.WaitUntilReplicatedTime(srcTime, jobspb.JobID(ingestionJobID))
+}

pkg/crosscluster/physical/stream_ingestion_dist.go

Lines changed: 23 additions & 3 deletions
@@ -96,12 +96,16 @@ func startDistIngestion(
 		updateStatus(ctx, ingestionJob, jobspb.InitializingReplication, msg)
 	}
 
-	client, err := connectToActiveClient(ctx, ingestionJob, execCtx.ExecCfg().InternalDB,
-		streamclient.WithStreamID(streamID))
+	clusterUris, err := getClusterUris(ctx, ingestionJob, execCtx.ExecCfg().InternalDB)
+	if err != nil {
+		return err
+	}
+	client, err := streamclient.GetFirstActiveClient(ctx, clusterUris, execCtx.ExecCfg().InternalDB, streamclient.WithStreamID(streamID))
 	if err != nil {
 		return err
 	}
 	defer closeAndLog(ctx, client)
+
 	if err := waitUntilProducerActive(ctx, client, streamID, heartbeatTimestamp, ingestionJob.ID()); err != nil {
 		return err
 	}
@@ -197,11 +201,23 @@ func startDistIngestion(
 		}
 		return ingestor.ingestSpanConfigs(ctx, details.SourceTenantName)
 	}
+
+	refreshConnStopper := make(chan struct{})
+
+	refreshConn := replicationutils.GetAlterConnectionChecker(
+		ingestionJob.ID(),
+		clusterUris[0].Serialize(),
+		geURIFromIngestionJobDetails,
+		execCtx.ExecCfg(),
+		refreshConnStopper,
+	)
+
 	execInitialPlan := func(ctx context.Context) error {
 		defer func() {
 			stopReplanner()
 			close(tracingAggCh)
 			close(spanConfigIngestStopper)
+			close(refreshConnStopper)
 		}()
 		ctx = logtags.AddTag(ctx, "stream-ingest-distsql", nil)
 
@@ -273,13 +289,17 @@ func startDistIngestion(
 		return err
 	}
 
-	err = ctxgroup.GoAndWait(ctx, execInitialPlan, replanner, tracingAggLoop, streamSpanConfigs)
+	err = ctxgroup.GoAndWait(ctx, execInitialPlan, replanner, tracingAggLoop, streamSpanConfigs, refreshConn)
 	if errors.Is(err, sql.ErrPlanChanged) {
 		execCtx.ExecCfg().JobRegistry.MetricsStruct().StreamIngest.(*Metrics).ReplanCount.Inc(1)
 	}
 	return err
 }
 
+func geURIFromIngestionJobDetails(details jobspb.Details) string {
+	return details.(jobspb.StreamIngestionDetails).SourceClusterConnUri
+}
+
 func sortSpans(partitions []streamclient.PartitionInfo) roachpb.Spans {
 	spansToSort := make(roachpb.Spans, 0)
 	for i := range partitions {

pkg/crosscluster/replicationutils/BUILD.bazel

Lines changed: 3 additions & 0 deletions
@@ -3,12 +3,14 @@ load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")
 go_library(
     name = "replicationutils",
     srcs = [
+        "connection_checker.go",
        "stats.go",
        "utils.go",
    ],
    importpath = "github.com/cockroachdb/cockroach/pkg/crosscluster/replicationutils",
    visibility = ["//visibility:public"],
    deps = [
+        "//pkg/crosscluster/streamclient",
        "//pkg/jobs",
        "//pkg/jobs/jobspb",
        "//pkg/kv/kvpb",
@@ -33,6 +35,7 @@ go_library(
        "//pkg/testutils/fingerprintutils",
        "//pkg/util/ctxgroup",
        "//pkg/util/hlc",
+        "//pkg/util/log",
        "//pkg/util/syncutil",
        "//pkg/util/timeutil",
        "@com_github_cockroachdb_errors//:errors",
pkg/crosscluster/replicationutils/connection_checker.go (new file)

Lines changed: 74 additions & 0 deletions

@@ -0,0 +1,74 @@
+// Copyright 2022 The Cockroach Authors.
+//
+// Use of this software is governed by the CockroachDB Software License
+// included in the /LICENSE file.
+
+package replicationutils
+
+import (
+	"context"
+	"time"
+
+	"github.com/cockroachdb/cockroach/pkg/crosscluster/streamclient"
+	"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
+	"github.com/cockroachdb/cockroach/pkg/sql"
+	"github.com/cockroachdb/cockroach/pkg/util/log"
+	"github.com/cockroachdb/errors"
+)
+
+// GetAlterConnectionChecker returns a function that will poll for an altered
+// external connection. The initial URI passed to this function should be the
+// same one passed to subsequent client creation calls.
+func GetAlterConnectionChecker(
+	id jobspb.JobID,
+	initialURI string,
+	uriGetter URIGetter,
+	execCfg *sql.ExecutorConfig,
+	stopper chan struct{},
+) func(ctx context.Context) error {
+	return func(ctx context.Context) error {
+		pollingInterval := 2 * time.Minute
+		if knobs := execCfg.StreamingTestingKnobs; knobs != nil && knobs.ExternalConnectionPollingInterval != nil {
+			pollingInterval = *knobs.ExternalConnectionPollingInterval
+		}
+		t := time.NewTicker(pollingInterval)
+		defer t.Stop()
+		for {
+			select {
+			case <-ctx.Done():
+				return ctx.Err()
+			case <-stopper:
+				return nil
+			case <-t.C:
+				reloadedJob, err := execCfg.JobRegistry.LoadJob(ctx, id)
+				if err != nil {
+					return err
+				}
+				newURI, err := resolveURI(ctx, execCfg, uriGetter(reloadedJob.Details()))
+				if err != nil {
+					log.Dev.Warningf(ctx, "failed to load uri: %v", err)
+				} else if newURI != initialURI {
+					return errors.Mark(errors.Newf("uri has been updated: old=%s, new=%s", errors.Redact(initialURI), errors.Redact(newURI)), sql.ErrPlanChanged)
+				}
+			}
+		}
+	}
+}
+
+type URIGetter func(details jobspb.Details) string
+
+func resolveURI(
+	ctx context.Context, execCfg *sql.ExecutorConfig, sourceURI string,
+) (string, error) {
+	configUri, err := streamclient.ParseConfigUri(sourceURI)
+	if err != nil {
+		return "", err
+	}
+
+	clusterUri, err := configUri.AsClusterUri(ctx, execCfg.InternalDB)
+	if err != nil {
+		return "", err
+	}
+
+	return clusterUri.Serialize(), nil
+}
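The checker above is meant to run alongside a job's main worker goroutines, as the logical_replication_job.go and stream_ingestion_dist.go hunks do. A minimal sketch of that wiring is below, assuming physical-replication job details; the wrapper name runWithConnectionWatcher and the runFlow callback are illustrative and not part of this commit.

package replicationexample

import (
	"context"

	"github.com/cockroachdb/cockroach/pkg/crosscluster/replicationutils"
	"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
	"github.com/cockroachdb/cockroach/pkg/sql"
	"github.com/cockroachdb/cockroach/pkg/util/ctxgroup"
)

// runWithConnectionWatcher runs the main flow and the alter-connection poller
// side by side. If the external connection behind the job is altered, the
// poller returns an error marked with sql.ErrPlanChanged, tearing down the
// group so the job can retry against the freshly resolved URI.
func runWithConnectionWatcher(
	ctx context.Context,
	jobID jobspb.JobID,
	initialURI string, // the same serialized URI handed to the stream client
	execCfg *sql.ExecutorConfig,
	runFlow func(context.Context) error,
) error {
	stopper := make(chan struct{})
	refreshConn := replicationutils.GetAlterConnectionChecker(
		jobID,
		initialURI,
		// URIGetter for a physical replication job; LDR would read
		// LogicalReplicationDetails instead.
		func(details jobspb.Details) string {
			return details.(jobspb.StreamIngestionDetails).SourceClusterConnUri
		},
		execCfg,
		stopper,
	)
	flow := func(ctx context.Context) error {
		// Stop the poller once the flow exits on its own, mirroring the
		// close(refreshConnStopper) defer in startDistIngestion.
		defer close(stopper)
		return runFlow(ctx)
	}
	return ctxgroup.GoAndWait(ctx, flow, refreshConn)
}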

pkg/kv/kvserver/client_raft_test.go

Lines changed: 19 additions & 12 deletions
@@ -33,6 +33,7 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverpb"
 	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/liveness"
 	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/liveness/livenesspb"
+	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/logstore"
 	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/rditer"
 	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/stateloader"
 	raft "github.com/cockroachdb/cockroach/pkg/raft"
@@ -5326,13 +5327,13 @@ func TestProcessSplitAfterRightHandSideHasBeenRemoved(t *testing.T) {
 		t *testing.T, store *kvserver.Store, rangeID roachpb.RangeID,
 	) raftpb.HardState {
 		t.Helper()
-		hs, err := stateloader.Make(rangeID).LoadHardState(ctx, store.TODOEngine())
+		hs, err := logstore.NewStateLoader(rangeID).LoadHardState(ctx, store.LogEngine())
 		require.NoError(t, err)
 		return hs
 	}
 	partitionReplicaOnSplit := func(t *testing.T, tc *testcluster.TestCluster, key roachpb.Key, basePartition *testClusterPartitionedRange, partRange **testClusterPartitionedRange) {
-		// Set up a hook to partition the RHS range at its initial range ID
-		// before proposing the split trigger.
+		// Set up a hook to partition away the first store of the RHS range at the
+		// first opportunity (when the split trigger is proposed).
 		var setupOnce sync.Once
 		f := kvserverbase.ReplicaProposalFilter(func(args kvserverbase.ProposalFilterArgs) *kvpb.Error {
 			req, ok := args.Req.GetArg(kvpb.EndTxn)
@@ -5363,9 +5364,10 @@ func TestProcessSplitAfterRightHandSideHasBeenRemoved(t *testing.T) {
 		proposalFilter.Store(f)
 	}
 
-	// The basic setup for all of these tests are that we have a LHS range on 3
-	// nodes and we've partitioned store 0 for the LHS range. The tests will now
-	// perform a split, remove the RHS, add it back and validate assumptions.
+	// The basic setup for all of these tests are that we have an LHS range on 3
+	// nodes (lease on the last one), and we've partitioned store 0 for the LHS
+	// range. The tests will now perform a split, remove the RHS, add it back
+	// and validate assumptions.
 	//
 	// Different outcomes will occur depending on whether and how the RHS is
 	// partitioned and whether the server is killed. In all cases we want the
@@ -5400,6 +5402,11 @@ func TestProcessSplitAfterRightHandSideHasBeenRemoved(t *testing.T) {
 					// Newly-started stores (including the "rogue" one) should not GC
 					// their replicas. We'll turn this back on when needed.
 					DisableReplicaGCQueue: true,
+					// Some subtests, e.g. (4), expect that n1 catches up on the raft
+					// log after a partition, and runs the split trigger. Disable raft
+					// log truncation to make sure that it doesn't miss the split
+					// trigger by being caught up by a later snapshot. See #154313.
+					DisableRaftLogQueue: true,
 					TestingProposalFilter: testingProposalFilter,
 				},
 			},
@@ -5486,9 +5493,9 @@ func TestProcessSplitAfterRightHandSideHasBeenRemoved(t *testing.T) {
 		target := tc.Target(0)
 		log.KvExec.Infof(ctx, "removing voter: %v", target)
 		tc.RemoveVotersOrFatal(t, keyB, target)
-		// Unsuccessful because the RHS will not accept the learner snapshot
-		// and will be rolled back. Nevertheless it will have learned that it
-		// has been removed at the old replica ID.
+		// Unsuccessful because the RHS will not accept the learner snapshot and
+		// will be rolled back. Nevertheless, it will have learned that it has
+		// been removed at the old replica ID.
 		_, err = tc.Servers[0].DB().AdminChangeReplicas(
 			ctx, keyB, tc.LookupRangeOrFatal(t, keyB),
 			kvpb.MakeReplicationChanges(roachpb.ADD_VOTER, target),
@@ -5701,8 +5708,8 @@ func TestProcessSplitAfterRightHandSideHasBeenRemoved(t *testing.T) {
 		target := tc.Target(0)
 		log.KvExec.Infof(ctx, "removing voter: %v", target)
 		tc.RemoveVotersOrFatal(t, keyB, target)
-		// Unsuccessfuly because the RHS will not accept the learner snapshot
-		// and will be rolled back. Nevertheless it will have learned that it
+		// Unsuccessful because the RHS will not accept the learner snapshot
+		// and will be rolled back. Nevertheless, it will have learned that it
 		// has been removed at the old replica ID.
 		//
 		// Not using tc.AddVoters because we expect an error, but that error
@@ -5722,7 +5729,7 @@ func TestProcessSplitAfterRightHandSideHasBeenRemoved(t *testing.T) {
 		log.KvExec.Infof(ctx, "added %d to RHS partition: %s", rhsInfo.Desc.NextReplicaID, rhsPartition)
 
 		// We do all of this incrementing to ensure that nobody will ever
-		// succeed in sending a message the new RHS replica after we restart
+		// succeed in sending a message to the new RHS replica after we restart
 		// the store. Previously there were races which could happen if we
 		// stopped the store immediately. Sleeps worked but this feels somehow
 		// more principled.

pkg/sql/exec_util.go

Lines changed: 0 additions & 2 deletions
@@ -2176,8 +2176,6 @@ type StreamingTestingKnobs struct {
 	// for whether the job record is updated on a progress update.
 	CutoverProgressShouldUpdate func() bool
 
-	//ExternalConnectionPollingInterval override the LDR alter
-	// connection polling frequency.
 	ExternalConnectionPollingInterval *time.Duration
 
 	DistSQLRetryPolicy *retry.Options

pkg/util/admission/io_load_listener.go

Lines changed: 1 addition & 1 deletion
@@ -745,7 +745,7 @@ func (io *ioLoadListener) adjustTokens(ctx context.Context, metrics StoreMetrics
 		io.diskWriteTokens = tokens.writeByteTokens
 		io.diskWriteTokensAllocated = 0
 		io.diskReadTokens = tokens.readByteTokens
-		io.diskWriteTokensAllocated = 0
+		io.diskReadTokensAllocated = 0
 	}
 	io.diskBandwidthLimiter.unlimitedTokensOverride = false
 	if metrics.DiskStats.ProvisionedBandwidth == 0 ||
