Skip to content

Commit f3c2f0e

Browse files
committed
replicationutils: move external conn loader to utility package
Prepping to use this in PCR. Epic: none Release note: none
1 parent 4ddb3e2 commit f3c2f0e

File tree

3 files changed

+86
-52
lines changed

3 files changed

+86
-52
lines changed

pkg/crosscluster/logical/logical_replication_job.go

Lines changed: 9 additions & 52 deletions
Original file line numberDiff line numberDiff line change
@@ -326,35 +326,13 @@ func (r *logicalReplicationResumer) ingest(
326326
return err
327327
}
328328

329-
refreshConn := func(ctx context.Context) error {
330-
ingestionJob := r.job
331-
details := ingestionJob.Details().(jobspb.LogicalReplicationDetails)
332-
resolvedDest, err := resolveDest(ctx, jobExecCtx.ExecCfg(), details.SourceClusterConnUri)
333-
if err != nil {
334-
return err
335-
}
336-
pollingInterval := 2 * time.Minute
337-
if knobs := jobExecCtx.ExecCfg().StreamingTestingKnobs; knobs != nil && knobs.ExternalConnectionPollingInterval != nil {
338-
pollingInterval = *knobs.ExternalConnectionPollingInterval
339-
}
340-
t := time.NewTicker(pollingInterval)
341-
defer t.Stop()
342-
for {
343-
select {
344-
case <-ctx.Done():
345-
return ctx.Err()
346-
case <-confPoller:
347-
return nil
348-
case <-t.C:
349-
newDest, err := reloadDest(ctx, ingestionJob.ID(), jobExecCtx.ExecCfg())
350-
if err != nil {
351-
log.Dev.Warningf(ctx, "failed to check for updated configuration: %v", err)
352-
} else if newDest != resolvedDest {
353-
return errors.Mark(errors.Newf("replan due to detail change: old=%s, new=%s", resolvedDest, newDest), sql.ErrPlanChanged)
354-
}
355-
}
356-
}
357-
}
329+
refreshConn := replicationutils.GetAlterConnectionChecker(
330+
r.job.ID(),
331+
uris[0].Serialize(),
332+
geURIFromLoadedJobDetails,
333+
execCfg,
334+
confPoller,
335+
)
358336

359337
defer func() {
360338
if l := payload.MetricsLabel; l != "" {
@@ -1089,29 +1067,8 @@ func getRetryPolicy(knobs *sql.StreamingTestingKnobs) retry.Options {
10891067
}
10901068
}
10911069

1092-
func resolveDest(
1093-
ctx context.Context, execCfg *sql.ExecutorConfig, sourceURI string,
1094-
) (string, error) {
1095-
configUri, err := streamclient.ParseConfigUri(sourceURI)
1096-
if err != nil {
1097-
return "", err
1098-
}
1099-
1100-
clusterUri, err := configUri.AsClusterUri(ctx, execCfg.InternalDB)
1101-
if err != nil {
1102-
return "", err
1103-
}
1104-
1105-
return clusterUri.Serialize(), nil
1106-
}
1107-
1108-
func reloadDest(ctx context.Context, id jobspb.JobID, execCfg *sql.ExecutorConfig) (string, error) {
1109-
reloadedJob, err := execCfg.JobRegistry.LoadJob(ctx, id)
1110-
if err != nil {
1111-
return "", err
1112-
}
1113-
newDetails := reloadedJob.Details().(jobspb.LogicalReplicationDetails)
1114-
return resolveDest(ctx, execCfg, newDetails.SourceClusterConnUri)
1070+
func geURIFromLoadedJobDetails(details jobspb.Details) string {
1071+
return details.(jobspb.LogicalReplicationDetails).SourceClusterConnUri
11151072
}
11161073

11171074
func init() {

pkg/crosscluster/replicationutils/BUILD.bazel

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,12 +3,14 @@ load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")
33
go_library(
44
name = "replicationutils",
55
srcs = [
6+
"connection_checker.go",
67
"stats.go",
78
"utils.go",
89
],
910
importpath = "github.com/cockroachdb/cockroach/pkg/crosscluster/replicationutils",
1011
visibility = ["//visibility:public"],
1112
deps = [
13+
"//pkg/crosscluster/streamclient",
1214
"//pkg/jobs",
1315
"//pkg/jobs/jobspb",
1416
"//pkg/kv/kvpb",
@@ -33,6 +35,7 @@ go_library(
3335
"//pkg/testutils/fingerprintutils",
3436
"//pkg/util/ctxgroup",
3537
"//pkg/util/hlc",
38+
"//pkg/util/log",
3639
"//pkg/util/syncutil",
3740
"//pkg/util/timeutil",
3841
"@com_github_cockroachdb_errors//:errors",
Lines changed: 74 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,74 @@
1+
// Copyright 2022 The Cockroach Authors.
2+
//
3+
// Use of this software is governed by the CockroachDB Software License
4+
// included in the /LICENSE file.
5+
6+
package replicationutils
7+
8+
import (
9+
"context"
10+
"time"
11+
12+
"github.com/cockroachdb/cockroach/pkg/crosscluster/streamclient"
13+
"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
14+
"github.com/cockroachdb/cockroach/pkg/sql"
15+
"github.com/cockroachdb/cockroach/pkg/util/log"
16+
"github.com/cockroachdb/errors"
17+
)
18+
19+
// GetAlterConnectionChecker returns a function that will poll for an altered
20+
// external connection. The initial URI passed to this function should be the
21+
// same one passed to subsequent client creation calls.
22+
func GetAlterConnectionChecker(
23+
id jobspb.JobID,
24+
initialURI string,
25+
uriGetter URIGetter,
26+
execCfg *sql.ExecutorConfig,
27+
stopper chan struct{},
28+
) func(ctx context.Context) error {
29+
return func(ctx context.Context) error {
30+
pollingInterval := 2 * time.Minute
31+
if knobs := execCfg.StreamingTestingKnobs; knobs != nil && knobs.ExternalConnectionPollingInterval != nil {
32+
pollingInterval = *knobs.ExternalConnectionPollingInterval
33+
}
34+
t := time.NewTicker(pollingInterval)
35+
defer t.Stop()
36+
for {
37+
select {
38+
case <-ctx.Done():
39+
return ctx.Err()
40+
case <-stopper:
41+
return nil
42+
case <-t.C:
43+
reloadedJob, err := execCfg.JobRegistry.LoadJob(ctx, id)
44+
if err != nil {
45+
return err
46+
}
47+
newURI, err := resolveURI(ctx, execCfg, uriGetter(reloadedJob.Details()))
48+
if err != nil {
49+
log.Dev.Warningf(ctx, "failed to load uri: %v", err)
50+
} else if newURI != initialURI {
51+
return errors.Mark(errors.Newf("uri has been updated: old=%s, new=%s", errors.Redact(initialURI), errors.Redact(newURI)), sql.ErrPlanChanged)
52+
}
53+
}
54+
}
55+
}
56+
}
57+
58+
type URIGetter func(details jobspb.Details) string
59+
60+
func resolveURI(
61+
ctx context.Context, execCfg *sql.ExecutorConfig, sourceURI string,
62+
) (string, error) {
63+
configUri, err := streamclient.ParseConfigUri(sourceURI)
64+
if err != nil {
65+
return "", err
66+
}
67+
68+
clusterUri, err := configUri.AsClusterUri(ctx, execCfg.InternalDB)
69+
if err != nil {
70+
return "", err
71+
}
72+
73+
return clusterUri.Serialize(), nil
74+
}

0 commit comments

Comments
 (0)