Skip to content

Commit 9a156da

Browse files
committed
crosscluster/logical: auto-update config on external conn changes
Release note (sql change): logical cluster now uses external connection and automatically updates its configuration when that connection changes. Epic: none
1 parent ed9b0e5 commit 9a156da

File tree

4 files changed

+166
-1
lines changed

4 files changed

+166
-1
lines changed

pkg/crosscluster/logical/BUILD.bazel

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ go_library(
3434
"//pkg/ccl/changefeedccl/changefeedbase",
3535
"//pkg/ccl/utilccl",
3636
"//pkg/cloud",
37+
"//pkg/cloud/externalconn",
3738
"//pkg/clusterversion",
3839
"//pkg/crosscluster",
3940
"//pkg/crosscluster/physical",

pkg/crosscluster/logical/logical_replication_job.go

Lines changed: 65 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,9 +8,11 @@ package logical
88
import (
99
"context"
1010
"fmt"
11+
"net/url"
1112
"time"
1213

1314
"github.com/cockroachdb/cockroach/pkg/base"
15+
"github.com/cockroachdb/cockroach/pkg/cloud/externalconn"
1416
"github.com/cockroachdb/cockroach/pkg/crosscluster"
1517
"github.com/cockroachdb/cockroach/pkg/crosscluster/physical"
1618
"github.com/cockroachdb/cockroach/pkg/crosscluster/replicationutils"
@@ -279,7 +281,9 @@ func (r *logicalReplicationResumer) ingest(
279281
stopReplanner()
280282
}()
281283

284+
confPoller := make(chan struct{})
282285
execPlan := func(ctx context.Context) error {
286+
defer close(confPoller)
283287
rh := rowHandler{
284288
replicatedTimeAtStart: replicatedTimeAtStart,
285289
frontier: frontier,
@@ -322,6 +326,36 @@ func (r *logicalReplicationResumer) ingest(
322326
return err
323327
}
324328

329+
refreshConn := func(ctx context.Context) error {
330+
ingestionJob := r.job
331+
details := ingestionJob.Details().(jobspb.LogicalReplicationDetails)
332+
resolvedDest, err := resolveDest(ctx, jobExecCtx.ExecCfg(), details.SourceClusterConnUri)
333+
if err != nil {
334+
return err
335+
}
336+
pollingInterval := 2 * time.Minute
337+
if knobs := jobExecCtx.ExecCfg().StreamingTestingKnobs; knobs != nil && knobs.ExternalConnectionPollingInterval != nil {
338+
pollingInterval = *knobs.ExternalConnectionPollingInterval
339+
}
340+
t := time.NewTicker(pollingInterval)
341+
defer t.Stop()
342+
for {
343+
select {
344+
case <-ctx.Done():
345+
return ctx.Err()
346+
case <-confPoller:
347+
return nil
348+
case <-t.C:
349+
newDest, err := reloadDest(ctx, ingestionJob.ID(), jobExecCtx.ExecCfg())
350+
if err != nil {
351+
log.Warningf(ctx, "failed to check for updated configuration: %v", err)
352+
} else if newDest != resolvedDest {
353+
return errors.Mark(errors.Newf("replan due to detail change: old=%s, new=%s", resolvedDest, newDest), sql.ErrPlanChanged)
354+
}
355+
}
356+
}
357+
}
358+
325359
defer func() {
326360
if l := payload.MetricsLabel; l != "" {
327361
metrics.LabeledScanningRanges.Update(map[string]string{"label": l}, 0)
@@ -331,7 +365,7 @@ func (r *logicalReplicationResumer) ingest(
331365
metrics.CatchupRanges.Update(0)
332366
}()
333367

334-
err = ctxgroup.GoAndWait(ctx, execPlan, replanner, startHeartbeat)
368+
err = ctxgroup.GoAndWait(ctx, execPlan, replanner, startHeartbeat, refreshConn)
335369
if errors.Is(err, sql.ErrPlanChanged) {
336370
metrics.ReplanCount.Inc(1)
337371
}
@@ -890,6 +924,7 @@ func (r *logicalReplicationResumer) ingestWithRetries(
890924
ro := getRetryPolicy(execCtx.ExecCfg().StreamingTestingKnobs)
891925
var err error
892926
var lastReplicatedTime hlc.Timestamp
927+
893928
for retrier := retry.Start(ro); retrier.Next(); {
894929
err = r.ingest(ctx, execCtx)
895930
if err == nil {
@@ -1054,6 +1089,35 @@ func getRetryPolicy(knobs *sql.StreamingTestingKnobs) retry.Options {
10541089
}
10551090
}
10561091

1092+
func resolveDest(
1093+
ctx context.Context, execCfg *sql.ExecutorConfig, sourceURI string,
1094+
) (string, error) {
1095+
u, err := url.Parse(sourceURI)
1096+
if err != nil {
1097+
return "", err
1098+
}
1099+
1100+
resolved := ""
1101+
err = execCfg.InternalDB.Txn(ctx, func(ctx context.Context, txn isql.Txn) error {
1102+
conn, err := externalconn.LoadExternalConnection(ctx, u.Host, txn)
1103+
if err != nil {
1104+
return err
1105+
}
1106+
resolved = conn.UnredactedConnectionStatement()
1107+
return nil
1108+
})
1109+
return resolved, err
1110+
}
1111+
1112+
func reloadDest(ctx context.Context, id jobspb.JobID, execCfg *sql.ExecutorConfig) (string, error) {
1113+
reloadedJob, err := execCfg.JobRegistry.LoadJob(ctx, id)
1114+
if err != nil {
1115+
return "", err
1116+
}
1117+
newDetails := reloadedJob.Details().(jobspb.LogicalReplicationDetails)
1118+
return resolveDest(ctx, execCfg, newDetails.SourceClusterConnUri)
1119+
}
1120+
10571121
func init() {
10581122
m := MakeMetrics(base.DefaultHistogramWindowInterval())
10591123
jobs.RegisterConstructor(

pkg/crosscluster/logical/logical_replication_job_test.go

Lines changed: 96 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2443,6 +2443,102 @@ func TestLogicalReplicationGatewayRoute(t *testing.T) {
24432443
require.Empty(t, progress.Details.(*jobspb.Progress_LogicalReplication).LogicalReplication.PartitionConnUris)
24442444
}
24452445

2446+
// TestAlterExternalConnection tests that logical replication streams can
2447+
// dynamically switch between different source nodes when the external
2448+
// connection URI is updated. It verifies that data continues to replicate
2449+
// correctly after the connection change.
2450+
func TestAlterExternalConnection(t *testing.T) {
2451+
defer leaktest.AfterTest(t)()
2452+
skip.UnderDeadlock(t)
2453+
skip.UnderRace(t)
2454+
defer log.Scope(t).Close(t)
2455+
2456+
ctx := context.Background()
2457+
pollingInterval := 100 * time.Millisecond
2458+
2459+
clusterArgs := base.TestClusterArgs{
2460+
ServerArgs: base.TestServerArgs{
2461+
DefaultTestTenant: base.TestControlsTenantsExplicitly,
2462+
Knobs: base.TestingKnobs{
2463+
JobsTestingKnobs: jobs.NewTestingKnobsWithShortIntervals(),
2464+
Streaming: &sql.StreamingTestingKnobs{
2465+
ExternalConnectionPollingInterval: &pollingInterval,
2466+
},
2467+
},
2468+
},
2469+
}
2470+
2471+
activeLogicalSessionSQL := "SELECT count(*) > 0 FROM crdb_internal.node_sessions WHERE application_name like '$ internal repstream job id=%' AND status='ACTIVE'"
2472+
countLogicalSessionSQL := "SELECT count(*) FROM crdb_internal.node_sessions WHERE application_name like '$ internal repstream job id=%' AND status='ACTIVE'"
2473+
server, node0, runners, dbNames := setupServerWithNumDBs(t, ctx, clusterArgs, 3, 2)
2474+
defer server.Stopper().Stop(ctx)
2475+
2476+
dbA := runners[0]
2477+
dbB := runners[1]
2478+
2479+
dbANode0URL, cleanup := node0.PGUrl(t, serverutils.DBName(dbNames[0]))
2480+
defer cleanup()
2481+
dbANode0 := sqlutils.MakeSQLRunner(node0.SQLConn(t, serverutils.DBName(dbNames[0])))
2482+
node1 := server.Server(1).ApplicationLayer()
2483+
dbANode1URL, cleanup := node1.PGUrl(t, serverutils.DBName(dbNames[0]))
2484+
defer cleanup()
2485+
dbANode1 := sqlutils.MakeSQLRunner(node1.SQLConn(t, serverutils.DBName(dbNames[0])))
2486+
2487+
q0 := dbANode0URL.Query()
2488+
q0.Set(streamclient.RoutingModeKey, string(streamclient.RoutingModeGateway))
2489+
dbANode0URL.RawQuery = q0.Encode()
2490+
2491+
q1 := dbANode1URL.Query()
2492+
q1.Set(streamclient.RoutingModeKey, string(streamclient.RoutingModeGateway))
2493+
dbANode1URL.RawQuery = q1.Encode()
2494+
2495+
// We want to make sure operations for cluster B is on seperate node from cluster A.
2496+
node2 := server.Server(2).ApplicationLayer()
2497+
dbBNode2 := sqlutils.MakeSQLRunner(node2.SQLConn(t, serverutils.DBName(dbNames[1])))
2498+
2499+
require.NotEqual(t, dbANode0URL.String(), dbANode1URL.String())
2500+
2501+
externalConnName := "test_conn"
2502+
dbBNode2.Exec(t, fmt.Sprintf("CREATE EXTERNAL CONNECTION '%s' AS '%s'", externalConnName, dbANode0URL.String()))
2503+
2504+
var jobID jobspb.JobID
2505+
dbBNode2.QueryRow(t, fmt.Sprintf(
2506+
"CREATE LOGICAL REPLICATION STREAM FROM TABLE tab ON 'external://%s' INTO TABLE tab",
2507+
externalConnName)).Scan(&jobID)
2508+
2509+
dbANode0.Exec(t, "INSERT INTO tab VALUES (1, 'via_node_0')")
2510+
2511+
now := node0.Clock().Now()
2512+
WaitUntilReplicatedTime(t, now, dbB, jobID)
2513+
2514+
dbANode0.CheckQueryResults(t,
2515+
activeLogicalSessionSQL,
2516+
[][]string{{"true"}})
2517+
dbANode1.CheckQueryResults(t,
2518+
countLogicalSessionSQL,
2519+
[][]string{{"0"}})
2520+
2521+
dbBNode2.CheckQueryResults(t, "SELECT * FROM tab WHERE pk = 1", [][]string{
2522+
{"1", "via_node_0"},
2523+
})
2524+
2525+
dbBNode2.Exec(t, fmt.Sprintf("ALTER EXTERNAL CONNECTION '%s' AS '%s'", externalConnName, dbANode1URL.String()))
2526+
dbANode1.CheckQueryResultsRetry(t,
2527+
activeLogicalSessionSQL,
2528+
[][]string{{"true"}})
2529+
dbANode0.CheckQueryResultsRetry(t,
2530+
countLogicalSessionSQL,
2531+
[][]string{{"0"}})
2532+
2533+
dbA.Exec(t, "INSERT INTO tab VALUES (2, 'via_node_1')")
2534+
now = node0.Clock().Now()
2535+
WaitUntilReplicatedTime(t, now, dbB, jobID)
2536+
2537+
dbBNode2.CheckQueryResults(t, "SELECT * FROM tab WHERE pk = 2", [][]string{
2538+
{"2", "via_node_1"},
2539+
})
2540+
}
2541+
24462542
func TestMismatchColIDs(t *testing.T) {
24472543
defer leaktest.AfterTest(t)()
24482544
skip.UnderDeadlock(t)

pkg/sql/exec_util.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2048,6 +2048,10 @@ type StreamingTestingKnobs struct {
20482048
// for whether the job record is updated on a progress update.
20492049
CutoverProgressShouldUpdate func() bool
20502050

2051+
//ExternalConnectionPollingInterval override the LDR alter
2052+
// connection polling frequency.
2053+
ExternalConnectionPollingInterval *time.Duration
2054+
20512055
DistSQLRetryPolicy *retry.Options
20522056

20532057
AfterRetryIteration func(err error)

0 commit comments

Comments
 (0)