Skip to content

Commit 20c7483

Browse files
committed
crosscluster/physical: add alter connection watcher
PCR now automatically updates the uri used if it's apart of an external connection that is altered. Informs #153471 Release note: none
1 parent f3c2f0e commit 20c7483

File tree

3 files changed

+82
-5
lines changed

3 files changed

+82
-5
lines changed

pkg/crosscluster/physical/replication_stream_e2e_test.go

Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1800,3 +1800,62 @@ func splitPrimaryKeyIndexSpan(
18001800
require.NoError(t, db.AdminSplit(ctx, pkStartKey, hlc.MaxTimestamp))
18011801
require.NoError(t, db.AdminSplit(ctx, pkEndKey, hlc.MaxTimestamp))
18021802
}
1803+
1804+
func TestAlterExternalConnection(t *testing.T) {
1805+
defer leaktest.AfterTest(t)()
1806+
skip.UnderDeadlock(t)
1807+
skip.UnderRace(t)
1808+
defer log.Scope(t).Close(t)
1809+
1810+
ctx := context.Background()
1811+
pollingInterval := 100 * time.Millisecond
1812+
1813+
var alreadyReplanned atomic.Int32
1814+
1815+
args := replicationtestutils.DefaultTenantStreamingClustersArgs
1816+
args.TestingKnobs = &sql.StreamingTestingKnobs{
1817+
ExternalConnectionPollingInterval: &pollingInterval,
1818+
AfterRetryIteration: func(err error) {
1819+
alreadyReplanned.Add(1)
1820+
},
1821+
}
1822+
1823+
c, cleanup := replicationtestutils.CreateTenantStreamingClusters(ctx, t, args)
1824+
defer cleanup()
1825+
1826+
externalConnection := "replication-source-addr"
1827+
ogConnection := c.SrcURL.String()
1828+
c.DestSysSQL.Exec(c.T, fmt.Sprintf(`CREATE EXTERNAL CONNECTION "%s" AS "%s"`,
1829+
externalConnection, ogConnection))
1830+
c.DestSysSQL.Exec(c.T, c.BuildCreateTenantQuery(externalConnection))
1831+
streamProducerJobID, ingestionJobID := replicationtestutils.GetStreamJobIds(c.T, ctx, c.DestSysSQL, c.Args.DestTenantName)
1832+
1833+
jobutils.WaitForJobToRun(c.T, c.SrcSysSQL, jobspb.JobID(streamProducerJobID))
1834+
jobutils.WaitForJobToRun(c.T, c.DestSysSQL, jobspb.JobID(ingestionJobID))
1835+
c.WaitUntilStartTimeReached(jobspb.JobID(ingestionJobID))
1836+
1837+
// Alter the external connection to break the stream, to test that pcr watches for new connection changes
1838+
c.SrcSysSQL.Exec(t, fmt.Sprintf("CREATE USER %s", username.TestUser))
1839+
srcAppURL, cleanupSinkCert := pgurlutils.PGUrl(t, c.SrcSysServer.AdvSQLAddr(), t.Name(), url.User(username.TestUser))
1840+
defer cleanupSinkCert()
1841+
1842+
beforeReplanCount := alreadyReplanned.Load()
1843+
c.DestSysSQL.Exec(c.T, fmt.Sprintf(`ALTER EXTERNAL CONNECTION "%s" AS "%s"`,
1844+
externalConnection, &srcAppURL))
1845+
1846+
testutils.SucceedsSoon(t, func() error {
1847+
if alreadyReplanned.Load() <= beforeReplanCount+2 {
1848+
return errors.New("not yet replanned twice")
1849+
}
1850+
return nil
1851+
})
1852+
1853+
// Alter the external connection to fix the stream, and ensure replication resumes
1854+
c.DestSysSQL.Exec(c.T, fmt.Sprintf(`ALTER EXTERNAL CONNECTION "%s" AS "%s"`,
1855+
externalConnection, ogConnection))
1856+
c.DestSysSQL.Exec(c.T, fmt.Sprintf("RESUME JOB %d", ingestionJobID))
1857+
jobutils.WaitForJobToRun(c.T, c.DestSysSQL, jobspb.JobID(ingestionJobID))
1858+
// ensure the stream advances
1859+
srcTime := c.SrcCluster.Server(0).Clock().Now()
1860+
c.WaitUntilReplicatedTime(srcTime, jobspb.JobID(ingestionJobID))
1861+
}

pkg/crosscluster/physical/stream_ingestion_dist.go

Lines changed: 23 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -96,12 +96,16 @@ func startDistIngestion(
9696
updateStatus(ctx, ingestionJob, jobspb.InitializingReplication, msg)
9797
}
9898

99-
client, err := connectToActiveClient(ctx, ingestionJob, execCtx.ExecCfg().InternalDB,
100-
streamclient.WithStreamID(streamID))
99+
clusterUris, err := getClusterUris(ctx, ingestionJob, execCtx.ExecCfg().InternalDB)
100+
if err != nil {
101+
return err
102+
}
103+
client, err := streamclient.GetFirstActiveClient(ctx, clusterUris, execCtx.ExecCfg().InternalDB, streamclient.WithStreamID(streamID))
101104
if err != nil {
102105
return err
103106
}
104107
defer closeAndLog(ctx, client)
108+
105109
if err := waitUntilProducerActive(ctx, client, streamID, heartbeatTimestamp, ingestionJob.ID()); err != nil {
106110
return err
107111
}
@@ -197,11 +201,23 @@ func startDistIngestion(
197201
}
198202
return ingestor.ingestSpanConfigs(ctx, details.SourceTenantName)
199203
}
204+
205+
refreshConnStopper := make(chan struct{})
206+
207+
refreshConn := replicationutils.GetAlterConnectionChecker(
208+
ingestionJob.ID(),
209+
clusterUris[0].Serialize(),
210+
geURIFromIngestionJobDetails,
211+
execCtx.ExecCfg(),
212+
refreshConnStopper,
213+
)
214+
200215
execInitialPlan := func(ctx context.Context) error {
201216
defer func() {
202217
stopReplanner()
203218
close(tracingAggCh)
204219
close(spanConfigIngestStopper)
220+
close(refreshConnStopper)
205221
}()
206222
ctx = logtags.AddTag(ctx, "stream-ingest-distsql", nil)
207223

@@ -273,13 +289,17 @@ func startDistIngestion(
273289
return err
274290
}
275291

276-
err = ctxgroup.GoAndWait(ctx, execInitialPlan, replanner, tracingAggLoop, streamSpanConfigs)
292+
err = ctxgroup.GoAndWait(ctx, execInitialPlan, replanner, tracingAggLoop, streamSpanConfigs, refreshConn)
277293
if errors.Is(err, sql.ErrPlanChanged) {
278294
execCtx.ExecCfg().JobRegistry.MetricsStruct().StreamIngest.(*Metrics).ReplanCount.Inc(1)
279295
}
280296
return err
281297
}
282298

299+
func geURIFromIngestionJobDetails(details jobspb.Details) string {
300+
return details.(jobspb.StreamIngestionDetails).SourceClusterConnUri
301+
}
302+
283303
func sortSpans(partitions []streamclient.PartitionInfo) roachpb.Spans {
284304
spansToSort := make(roachpb.Spans, 0)
285305
for i := range partitions {

pkg/sql/exec_util.go

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2160,8 +2160,6 @@ type StreamingTestingKnobs struct {
21602160
// for whether the job record is updated on a progress update.
21612161
CutoverProgressShouldUpdate func() bool
21622162

2163-
//ExternalConnectionPollingInterval override the LDR alter
2164-
// connection polling frequency.
21652163
ExternalConnectionPollingInterval *time.Duration
21662164

21672165
DistSQLRetryPolicy *retry.Options

0 commit comments

Comments
 (0)