Skip to content

Commit edc4191

Browse files
craig[bot]jeffswenson
andcommitted
Merge #153902
153902: logical: deflake TestRandomStream r=jeffswenson a=jeffswenson TestRandomStream passes a non-external URI. This causes the external connection polling logic added by #149261 to return an error which results in the processor shutting down right away. The only reason the test mostly works is the tear down takes some time and we are usually able to replicate a few rows before shutdown completes. Release note: none Fixes: #153666 Fixes: #152435 Co-authored-by: Jeff Swenson <[email protected]>
2 parents c46724d + fd2ed0c commit edc4191

File tree

2 files changed

+7
-14
lines changed

2 files changed

+7
-14
lines changed

pkg/crosscluster/logical/BUILD.bazel

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,6 @@ go_library(
3434
"//pkg/ccl/changefeedccl/changefeedbase",
3535
"//pkg/ccl/utilccl",
3636
"//pkg/cloud",
37-
"//pkg/cloud/externalconn",
3837
"//pkg/clusterversion",
3938
"//pkg/crosscluster",
4039
"//pkg/crosscluster/physical",

pkg/crosscluster/logical/logical_replication_job.go

Lines changed: 7 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -8,11 +8,9 @@ package logical
88
import (
99
"context"
1010
"fmt"
11-
"net/url"
1211
"time"
1312

1413
"github.com/cockroachdb/cockroach/pkg/base"
15-
"github.com/cockroachdb/cockroach/pkg/cloud/externalconn"
1614
"github.com/cockroachdb/cockroach/pkg/crosscluster"
1715
"github.com/cockroachdb/cockroach/pkg/crosscluster/physical"
1816
"github.com/cockroachdb/cockroach/pkg/crosscluster/replicationutils"
@@ -1092,21 +1090,17 @@ func getRetryPolicy(knobs *sql.StreamingTestingKnobs) retry.Options {
10921090
func resolveDest(
10931091
ctx context.Context, execCfg *sql.ExecutorConfig, sourceURI string,
10941092
) (string, error) {
1095-
u, err := url.Parse(sourceURI)
1093+
configUri, err := streamclient.ParseConfigUri(sourceURI)
10961094
if err != nil {
10971095
return "", err
10981096
}
10991097

1100-
resolved := ""
1101-
err = execCfg.InternalDB.Txn(ctx, func(ctx context.Context, txn isql.Txn) error {
1102-
conn, err := externalconn.LoadExternalConnection(ctx, u.Host, txn)
1103-
if err != nil {
1104-
return err
1105-
}
1106-
resolved = conn.UnredactedConnectionStatement()
1107-
return nil
1108-
})
1109-
return resolved, err
1098+
clusterUri, err := configUri.AsClusterUri(ctx, execCfg.InternalDB)
1099+
if err != nil {
1100+
return "", err
1101+
}
1102+
1103+
return clusterUri.Serialize(), nil
11101104
}
11111105

11121106
func reloadDest(ctx context.Context, id jobspb.JobID, execCfg *sql.ExecutorConfig) (string, error) {

0 commit comments

Comments
 (0)