Skip to content

Commit 3fd72bb

Browse files
craig[bot]edwardguo-crlDarrylWongrafiss
committed
149261: crosscluster/logical: auto-update config on external conn changes r=edwardguo-crl a=edwardguo-crl Release note (sql change): Automatically reflect external connection changes in logical clusters. Epic: none Reference: #98610 151745: mixedversion: don't mark shared process tenant availability in monitor r=Dev-Kyle,herkolategan a=DarrylWong The monitor does not support monitoring shared process tenants in isolation, i.e. it can only monitor both system and the secondary shared process. However, the mixed version framework was reporting availability of shared process tenants to the monitor. This broke an assumption that the monitor makes that any registered process is either the system tenant or a separate process tenant. This would cause the monitor to mistakenly believe there was a running separate process tenant. Fixes: #151450 151749: roachtest: update rust_postgres test for LTREE r=rafiss a=rafiss Two tests are passing now that support for LTREE was added. fixes #151637 Release note: None 151751: bench/rttanalysis: skip BenchmarkShowGrants under short config r=rafiss a=rafiss Previously, this benchmark was included in CI runs and would take more than 20 minutes to complete a single iteration (--bench-time=1s --count=1). This is because the benchmark aims for a measured runtime of 1 second, but the unmeasured operations within each iteration take over 20 minutes combined to reach that target. This change skips the benchmark in CI runs. Fixes: #151653 Release note: None Co-authored-by: Edward Guo <[email protected]> Co-authored-by: DarrylWong <[email protected]> Co-authored-by: Rafi Shamim <[email protected]>
5 parents 7a7aab5 + 9a156da + d109d2a + 9a26c2f + 38628f6 commit 3fd72bb

File tree

8 files changed

+187
-11
lines changed

8 files changed

+187
-11
lines changed

pkg/bench/rttanalysis/grant_revoke_role_bench_test.go

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,11 @@
55

66
package rttanalysis
77

8-
import "testing"
8+
import (
9+
"testing"
10+
11+
"github.com/cockroachdb/cockroach/pkg/testutils/skip"
12+
)
913

1014
func BenchmarkGrantRole(b *testing.B) { reg.Run(b) }
1115
func init() {
@@ -28,7 +32,11 @@ CREATE ROLE c;`,
2832
})
2933
}
3034

31-
func BenchmarkShowGrants(b *testing.B) { reg.Run(b) }
35+
func BenchmarkShowGrants(b *testing.B) {
36+
skip.UnderShort(b, "skipping long benchmark")
37+
reg.Run(b)
38+
}
39+
3240
func init() {
3341
reg.Register("ShowGrants", []RoundTripBenchTestCase{
3442
{

pkg/cmd/roachtest/roachtestutil/mixedversion/helper.go

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -97,13 +97,19 @@ func (s *Service) RandomDB(rng *rand.Rand) (int, *gosql.DB) {
9797
return node, s.Connect(node)
9898
}
9999

100-
// prepareQuery returns a connection to one of the `nodes` provided
101-
// and logs the query and gateway node in the step's log file. Called
100+
// prepareQuery returns a connection to one of the available nodes in `nodes`
101+
// provided and logs the query and gateway node in the step's log file. Called
102102
// before the query is actually performed.
103103
func (s *Service) prepareQuery(
104104
rng *rand.Rand, nodes option.NodeListOption, query string, args ...any,
105105
) (*gosql.DB, error) {
106106
availableNodes := s.AvailableNodes().Intersect(nodes)
107+
if len(availableNodes) == 0 {
108+
return nil, errors.Newf(
109+
"no available nodes in the intersection of %s and %s",
110+
s.AvailableNodes(), nodes,
111+
)
112+
}
107113
node := availableNodes.SeededRandNode(rng)[0]
108114
db := s.Connect(node)
109115

@@ -137,8 +143,7 @@ func (s *Service) Exec(rng *rand.Rand, query string, args ...interface{}) error
137143
func (s *Service) ExecWithGateway(
138144
rng *rand.Rand, nodes option.NodeListOption, query string, args ...interface{},
139145
) error {
140-
availableNodes := s.AvailableNodes().Intersect(nodes)
141-
db, err := s.prepareQuery(rng, availableNodes, query, args...)
146+
db, err := s.prepareQuery(rng, nodes, query, args...)
142147
if err != nil {
143148
return err
144149
}

pkg/cmd/roachtest/roachtestutil/mixedversion/steps.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -903,7 +903,7 @@ func (s networkPartitionInjectStep) Run(
903903
ctx context.Context, l *logger.Logger, _ *rand.Rand, h *Helper,
904904
) error {
905905
h.runner.monitor.ExpectProcessDead(s.targetNode)
906-
if h.Tenant != nil {
906+
if h.DeploymentMode() == SeparateProcessDeployment {
907907
opt := option.VirtualClusterName(h.Tenant.Descriptor.Name)
908908
h.runner.monitor.ExpectProcessDead(s.targetNode, opt)
909909
}
@@ -958,7 +958,7 @@ func (s networkPartitionRecoveryStep) Run(
958958
}
959959

960960
h.runner.monitor.ExpectProcessAlive(s.targetNode)
961-
if h.Tenant != nil {
961+
if h.DeploymentMode() == SeparateProcessDeployment {
962962
opt := option.VirtualClusterName(h.Tenant.Descriptor.Name)
963963
h.runner.monitor.ExpectProcessAlive(s.targetNode, opt)
964964
}

pkg/cmd/roachtest/tests/rust_postgres_blocklist.go

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -56,8 +56,6 @@ var rustPostgresBlocklist = blocklist{
5656
"types.enum_": "experimental feature - https://github.com/cockroachdb/cockroach/issues/46260",
5757
"types.lquery": "unsupported datatype - https://github.com/cockroachdb/cockroach/issues/44657",
5858
"types.lquery_any": "unsupported datatype - https://github.com/cockroachdb/cockroach/issues/44657",
59-
"types.ltree": "unsupported datatype - https://github.com/cockroachdb/cockroach/issues/44657",
60-
"types.ltree_any": "unsupported datatype - https://github.com/cockroachdb/cockroach/issues/44657",
6159
"types.ltxtquery": "unsupported datatype - https://github.com/cockroachdb/cockroach/issues/44657",
6260
"types.ltxtquery_any": "unsupported datatype - https://github.com/cockroachdb/cockroach/issues/44657",
6361
"types.test_array_vec_params": "default int size (int4 vs int8) mismatch",

pkg/crosscluster/logical/BUILD.bazel

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ go_library(
3434
"//pkg/ccl/changefeedccl/changefeedbase",
3535
"//pkg/ccl/utilccl",
3636
"//pkg/cloud",
37+
"//pkg/cloud/externalconn",
3738
"//pkg/clusterversion",
3839
"//pkg/crosscluster",
3940
"//pkg/crosscluster/physical",

pkg/crosscluster/logical/logical_replication_job.go

Lines changed: 65 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,9 +8,11 @@ package logical
88
import (
99
"context"
1010
"fmt"
11+
"net/url"
1112
"time"
1213

1314
"github.com/cockroachdb/cockroach/pkg/base"
15+
"github.com/cockroachdb/cockroach/pkg/cloud/externalconn"
1416
"github.com/cockroachdb/cockroach/pkg/crosscluster"
1517
"github.com/cockroachdb/cockroach/pkg/crosscluster/physical"
1618
"github.com/cockroachdb/cockroach/pkg/crosscluster/replicationutils"
@@ -279,7 +281,9 @@ func (r *logicalReplicationResumer) ingest(
279281
stopReplanner()
280282
}()
281283

284+
confPoller := make(chan struct{})
282285
execPlan := func(ctx context.Context) error {
286+
defer close(confPoller)
283287
rh := rowHandler{
284288
replicatedTimeAtStart: replicatedTimeAtStart,
285289
frontier: frontier,
@@ -322,6 +326,36 @@ func (r *logicalReplicationResumer) ingest(
322326
return err
323327
}
324328

329+
refreshConn := func(ctx context.Context) error {
330+
ingestionJob := r.job
331+
details := ingestionJob.Details().(jobspb.LogicalReplicationDetails)
332+
resolvedDest, err := resolveDest(ctx, jobExecCtx.ExecCfg(), details.SourceClusterConnUri)
333+
if err != nil {
334+
return err
335+
}
336+
pollingInterval := 2 * time.Minute
337+
if knobs := jobExecCtx.ExecCfg().StreamingTestingKnobs; knobs != nil && knobs.ExternalConnectionPollingInterval != nil {
338+
pollingInterval = *knobs.ExternalConnectionPollingInterval
339+
}
340+
t := time.NewTicker(pollingInterval)
341+
defer t.Stop()
342+
for {
343+
select {
344+
case <-ctx.Done():
345+
return ctx.Err()
346+
case <-confPoller:
347+
return nil
348+
case <-t.C:
349+
newDest, err := reloadDest(ctx, ingestionJob.ID(), jobExecCtx.ExecCfg())
350+
if err != nil {
351+
log.Warningf(ctx, "failed to check for updated configuration: %v", err)
352+
} else if newDest != resolvedDest {
353+
return errors.Mark(errors.Newf("replan due to detail change: old=%s, new=%s", resolvedDest, newDest), sql.ErrPlanChanged)
354+
}
355+
}
356+
}
357+
}
358+
325359
defer func() {
326360
if l := payload.MetricsLabel; l != "" {
327361
metrics.LabeledScanningRanges.Update(map[string]string{"label": l}, 0)
@@ -331,7 +365,7 @@ func (r *logicalReplicationResumer) ingest(
331365
metrics.CatchupRanges.Update(0)
332366
}()
333367

334-
err = ctxgroup.GoAndWait(ctx, execPlan, replanner, startHeartbeat)
368+
err = ctxgroup.GoAndWait(ctx, execPlan, replanner, startHeartbeat, refreshConn)
335369
if errors.Is(err, sql.ErrPlanChanged) {
336370
metrics.ReplanCount.Inc(1)
337371
}
@@ -890,6 +924,7 @@ func (r *logicalReplicationResumer) ingestWithRetries(
890924
ro := getRetryPolicy(execCtx.ExecCfg().StreamingTestingKnobs)
891925
var err error
892926
var lastReplicatedTime hlc.Timestamp
927+
893928
for retrier := retry.Start(ro); retrier.Next(); {
894929
err = r.ingest(ctx, execCtx)
895930
if err == nil {
@@ -1054,6 +1089,35 @@ func getRetryPolicy(knobs *sql.StreamingTestingKnobs) retry.Options {
10541089
}
10551090
}
10561091

1092+
func resolveDest(
1093+
ctx context.Context, execCfg *sql.ExecutorConfig, sourceURI string,
1094+
) (string, error) {
1095+
u, err := url.Parse(sourceURI)
1096+
if err != nil {
1097+
return "", err
1098+
}
1099+
1100+
resolved := ""
1101+
err = execCfg.InternalDB.Txn(ctx, func(ctx context.Context, txn isql.Txn) error {
1102+
conn, err := externalconn.LoadExternalConnection(ctx, u.Host, txn)
1103+
if err != nil {
1104+
return err
1105+
}
1106+
resolved = conn.UnredactedConnectionStatement()
1107+
return nil
1108+
})
1109+
return resolved, err
1110+
}
1111+
1112+
func reloadDest(ctx context.Context, id jobspb.JobID, execCfg *sql.ExecutorConfig) (string, error) {
1113+
reloadedJob, err := execCfg.JobRegistry.LoadJob(ctx, id)
1114+
if err != nil {
1115+
return "", err
1116+
}
1117+
newDetails := reloadedJob.Details().(jobspb.LogicalReplicationDetails)
1118+
return resolveDest(ctx, execCfg, newDetails.SourceClusterConnUri)
1119+
}
1120+
10571121
func init() {
10581122
m := MakeMetrics(base.DefaultHistogramWindowInterval())
10591123
jobs.RegisterConstructor(

pkg/crosscluster/logical/logical_replication_job_test.go

Lines changed: 96 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2443,6 +2443,102 @@ func TestLogicalReplicationGatewayRoute(t *testing.T) {
24432443
require.Empty(t, progress.Details.(*jobspb.Progress_LogicalReplication).LogicalReplication.PartitionConnUris)
24442444
}
24452445

2446+
// TestAlterExternalConnection tests that logical replication streams can
2447+
// dynamically switch between different source nodes when the external
2448+
// connection URI is updated. It verifies that data continues to replicate
2449+
// correctly after the connection change.
2450+
func TestAlterExternalConnection(t *testing.T) {
2451+
defer leaktest.AfterTest(t)()
2452+
skip.UnderDeadlock(t)
2453+
skip.UnderRace(t)
2454+
defer log.Scope(t).Close(t)
2455+
2456+
ctx := context.Background()
2457+
pollingInterval := 100 * time.Millisecond
2458+
2459+
clusterArgs := base.TestClusterArgs{
2460+
ServerArgs: base.TestServerArgs{
2461+
DefaultTestTenant: base.TestControlsTenantsExplicitly,
2462+
Knobs: base.TestingKnobs{
2463+
JobsTestingKnobs: jobs.NewTestingKnobsWithShortIntervals(),
2464+
Streaming: &sql.StreamingTestingKnobs{
2465+
ExternalConnectionPollingInterval: &pollingInterval,
2466+
},
2467+
},
2468+
},
2469+
}
2470+
2471+
activeLogicalSessionSQL := "SELECT count(*) > 0 FROM crdb_internal.node_sessions WHERE application_name like '$ internal repstream job id=%' AND status='ACTIVE'"
2472+
countLogicalSessionSQL := "SELECT count(*) FROM crdb_internal.node_sessions WHERE application_name like '$ internal repstream job id=%' AND status='ACTIVE'"
2473+
server, node0, runners, dbNames := setupServerWithNumDBs(t, ctx, clusterArgs, 3, 2)
2474+
defer server.Stopper().Stop(ctx)
2475+
2476+
dbA := runners[0]
2477+
dbB := runners[1]
2478+
2479+
dbANode0URL, cleanup := node0.PGUrl(t, serverutils.DBName(dbNames[0]))
2480+
defer cleanup()
2481+
dbANode0 := sqlutils.MakeSQLRunner(node0.SQLConn(t, serverutils.DBName(dbNames[0])))
2482+
node1 := server.Server(1).ApplicationLayer()
2483+
dbANode1URL, cleanup := node1.PGUrl(t, serverutils.DBName(dbNames[0]))
2484+
defer cleanup()
2485+
dbANode1 := sqlutils.MakeSQLRunner(node1.SQLConn(t, serverutils.DBName(dbNames[0])))
2486+
2487+
q0 := dbANode0URL.Query()
2488+
q0.Set(streamclient.RoutingModeKey, string(streamclient.RoutingModeGateway))
2489+
dbANode0URL.RawQuery = q0.Encode()
2490+
2491+
q1 := dbANode1URL.Query()
2492+
q1.Set(streamclient.RoutingModeKey, string(streamclient.RoutingModeGateway))
2493+
dbANode1URL.RawQuery = q1.Encode()
2494+
2495+
// We want to make sure operations for cluster B is on seperate node from cluster A.
2496+
node2 := server.Server(2).ApplicationLayer()
2497+
dbBNode2 := sqlutils.MakeSQLRunner(node2.SQLConn(t, serverutils.DBName(dbNames[1])))
2498+
2499+
require.NotEqual(t, dbANode0URL.String(), dbANode1URL.String())
2500+
2501+
externalConnName := "test_conn"
2502+
dbBNode2.Exec(t, fmt.Sprintf("CREATE EXTERNAL CONNECTION '%s' AS '%s'", externalConnName, dbANode0URL.String()))
2503+
2504+
var jobID jobspb.JobID
2505+
dbBNode2.QueryRow(t, fmt.Sprintf(
2506+
"CREATE LOGICAL REPLICATION STREAM FROM TABLE tab ON 'external://%s' INTO TABLE tab",
2507+
externalConnName)).Scan(&jobID)
2508+
2509+
dbANode0.Exec(t, "INSERT INTO tab VALUES (1, 'via_node_0')")
2510+
2511+
now := node0.Clock().Now()
2512+
WaitUntilReplicatedTime(t, now, dbB, jobID)
2513+
2514+
dbANode0.CheckQueryResults(t,
2515+
activeLogicalSessionSQL,
2516+
[][]string{{"true"}})
2517+
dbANode1.CheckQueryResults(t,
2518+
countLogicalSessionSQL,
2519+
[][]string{{"0"}})
2520+
2521+
dbBNode2.CheckQueryResults(t, "SELECT * FROM tab WHERE pk = 1", [][]string{
2522+
{"1", "via_node_0"},
2523+
})
2524+
2525+
dbBNode2.Exec(t, fmt.Sprintf("ALTER EXTERNAL CONNECTION '%s' AS '%s'", externalConnName, dbANode1URL.String()))
2526+
dbANode1.CheckQueryResultsRetry(t,
2527+
activeLogicalSessionSQL,
2528+
[][]string{{"true"}})
2529+
dbANode0.CheckQueryResultsRetry(t,
2530+
countLogicalSessionSQL,
2531+
[][]string{{"0"}})
2532+
2533+
dbA.Exec(t, "INSERT INTO tab VALUES (2, 'via_node_1')")
2534+
now = node0.Clock().Now()
2535+
WaitUntilReplicatedTime(t, now, dbB, jobID)
2536+
2537+
dbBNode2.CheckQueryResults(t, "SELECT * FROM tab WHERE pk = 2", [][]string{
2538+
{"2", "via_node_1"},
2539+
})
2540+
}
2541+
24462542
func TestMismatchColIDs(t *testing.T) {
24472543
defer leaktest.AfterTest(t)()
24482544
skip.UnderDeadlock(t)

pkg/sql/exec_util.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2048,6 +2048,10 @@ type StreamingTestingKnobs struct {
20482048
// for whether the job record is updated on a progress update.
20492049
CutoverProgressShouldUpdate func() bool
20502050

2051+
//ExternalConnectionPollingInterval override the LDR alter
2052+
// connection polling frequency.
2053+
ExternalConnectionPollingInterval *time.Duration
2054+
20512055
DistSQLRetryPolicy *retry.Options
20522056

20532057
AfterRetryIteration func(err error)

0 commit comments

Comments
 (0)