Skip to content

Commit 32f846f

Browse files
craig[bot]msbutlerrimadeodhar
committed
107671: c2c: add SetupSpanConfigsStream(tenantName) to stream client r=stevendanna a=msbutler This patch adds a new call to the stream client, which uses the new crdb_internal.setup_span_configs_stream() builtin, to create a replication stream specification to stream span config updates relevant to the replicating tenant. This new client call essentially combines the client.Create() and client.Plan() into one call, as it returns a streamID and topology to the user. Note this new call differs from client.Create() as it neither creates a job nor lays a protected timestamp on the source cluster. In other words, the user should treat this span configuration stream as ephemeral. A future PR will teach client.Subscribe to stream these updates and ingest them into the destination. Informs cockroachdb#106823 Release note: None 107915: sql: Fix mutex leak within TestCheckConstraintDropAndColumn r=rimadeodhar a=rimadeodhar The test does not unlock the jobControlMu mutex in the case of an error. This PR fixes that. Epic: none Fixes: cockroachdb#107433 Release note: none Co-authored-by: Michael Butler <[email protected]> Co-authored-by: rimadeodhar <[email protected]>
3 parents 00abaf4 + ffea5af + e578000 commit 32f846f

File tree

21 files changed

+309
-51
lines changed

21 files changed

+309
-51
lines changed

pkg/ccl/logictestccl/testdata/logic_test/crdb_internal

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -168,8 +168,6 @@ subtest replication-builtins
168168

169169
user root
170170

171-
query error pq: crdb_internal\.replication_stream_spec\(\): job.*is not a replication stream job
172-
SELECT crdb_internal.replication_stream_spec(crdb_internal.create_sql_schema_telemetry_job())
173171

174172
query error pq: crdb_internal\.stream_ingestion_stats_json\(\): unimplemented
175173
SELECT crdb_internal.stream_ingestion_stats_json(1);

pkg/ccl/streamingccl/replicationtestutils/BUILD.bazel

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@ go_library(
4343
"//pkg/util/ctxgroup",
4444
"//pkg/util/hlc",
4545
"//pkg/util/protoutil",
46+
"//pkg/util/randutil",
4647
"//pkg/util/retry",
4748
"//pkg/util/timeutil",
4849
"@com_github_cockroachdb_apd_v3//:apd",

pkg/ccl/streamingccl/replicationtestutils/replication_helpers.go

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,9 @@ package replicationtestutils
1111
import (
1212
"bytes"
1313
"context"
14+
"math/rand"
1415
"net/url"
16+
"os"
1517
"strings"
1618
"testing"
1719
"time"
@@ -29,6 +31,7 @@ import (
2931
"github.com/cockroachdb/cockroach/pkg/testutils/sqlutils"
3032
"github.com/cockroachdb/cockroach/pkg/util/hlc"
3133
"github.com/cockroachdb/cockroach/pkg/util/protoutil"
34+
"github.com/cockroachdb/cockroach/pkg/util/randutil"
3235
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
3336
"github.com/cockroachdb/errors"
3437
"github.com/stretchr/testify/require"
@@ -185,6 +188,8 @@ type ReplicationHelper struct {
185188
SysSQL *sqlutils.SQLRunner
186189
// PGUrl is the pgurl of this server.
187190
PGUrl url.URL
191+
192+
rng *rand.Rand
188193
}
189194

190195
// NewReplicationHelper starts test server with the required cluster settings for streming
@@ -211,10 +216,14 @@ SET CLUSTER SETTING cross_cluster_replication.enabled = true;
211216
// Sink to read data from.
212217
sink, cleanupSink := sqlutils.PGUrl(t, s.AdvSQLAddr(), t.Name(), url.User(username.RootUser))
213218

219+
rng, seed := randutil.NewPseudoRand()
220+
t.Logf("Replication helper seed %d", seed)
221+
214222
h := &ReplicationHelper{
215223
SysServer: s,
216224
SysSQL: sqlutils.MakeSQLRunner(db),
217225
PGUrl: sink,
226+
rng: rng,
218227
}
219228

220229
return h, func() {
@@ -262,3 +271,35 @@ func (rh *ReplicationHelper) StartReplicationStream(
262271
require.NoError(t, err)
263272
return replicationProducerSpec
264273
}
274+
275+
func (rh *ReplicationHelper) SetupSpanConfigsReplicationStream(
276+
t *testing.T, sourceTenantName roachpb.TenantName,
277+
) streampb.ReplicationStreamSpec {
278+
var rawSpec []byte
279+
row := rh.SysSQL.QueryRow(t, `SELECT crdb_internal.setup_span_configs_stream($1)`, sourceTenantName)
280+
row.Scan(&rawSpec)
281+
var spec streampb.ReplicationStreamSpec
282+
err := protoutil.Unmarshal(rawSpec, &spec)
283+
require.NoError(t, err)
284+
return spec
285+
}
286+
287+
func (rh *ReplicationHelper) MaybeGenerateInlineURL(t *testing.T) *url.URL {
288+
if rh.rng.Float64() > 0.5 {
289+
return &rh.PGUrl
290+
}
291+
292+
t.Log("using inline certificates")
293+
ret := rh.PGUrl
294+
v := ret.Query()
295+
for _, opt := range []string{"sslcert", "sslkey", "sslrootcert"} {
296+
path := v.Get(opt)
297+
content, err := os.ReadFile(path)
298+
require.NoError(t, err)
299+
v.Set(opt, string(content))
300+
301+
}
302+
v.Set("sslinline", "true")
303+
ret.RawQuery = v.Encode()
304+
return &ret
305+
}

pkg/ccl/streamingccl/streamclient/client.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,12 @@ type Client interface {
5151
// can be used to interact with this stream in the future.
5252
Create(ctx context.Context, tenant roachpb.TenantName) (streampb.ReplicationProducerSpec, error)
5353

54+
// SetupSpanConfigsStream creates a stream for the span configs
55+
// that apply to the passed in tenant, and returns the subscriptions the
56+
// client can subscribe to. No protected timestamp or job is persisted to the
57+
// source cluster.
58+
SetupSpanConfigsStream(ctx context.Context, tenant roachpb.TenantName) (streampb.StreamID, Topology, error)
59+
5460
// Dial checks if the source is able to be connected to for queries
5561
Dial(ctx context.Context) error
5662

pkg/ccl/streamingccl/streamclient/client_test.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,13 @@ func (sc testStreamClient) Create(
4747
}, nil
4848
}
4949

50+
// SetupSpanConfigsStream implements the Client interface.
51+
func (sc testStreamClient) SetupSpanConfigsStream(
52+
ctx context.Context, tenant roachpb.TenantName,
53+
) (streampb.StreamID, Topology, error) {
54+
panic("not implemented")
55+
}
56+
5057
// Plan implements the Client interface.
5158
func (sc testStreamClient) Plan(_ context.Context, _ streampb.StreamID) (Topology, error) {
5259
return Topology{

pkg/ccl/streamingccl/streamclient/partitioned_stream_client.go

Lines changed: 29 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -84,7 +84,6 @@ func (p *partitionedStreamClient) Create(
8484
) (streampb.ReplicationProducerSpec, error) {
8585
ctx, sp := tracing.ChildSpan(ctx, "streamclient.Client.Create")
8686
defer sp.Finish()
87-
8887
p.mu.Lock()
8988
defer p.mu.Unlock()
9089
var rawReplicationProducerSpec []byte
@@ -101,6 +100,30 @@ func (p *partitionedStreamClient) Create(
101100
return replicationProducerSpec, err
102101
}
103102

103+
func (p *partitionedStreamClient) SetupSpanConfigsStream(
104+
ctx context.Context, tenantName roachpb.TenantName,
105+
) (streampb.StreamID, Topology, error) {
106+
ctx, sp := tracing.ChildSpan(ctx, "streamclient.Client.SetupSpanConfigsStream")
107+
defer sp.Finish()
108+
var spec streampb.ReplicationStreamSpec
109+
110+
{
111+
p.mu.Lock()
112+
defer p.mu.Unlock()
113+
114+
row := p.mu.srcConn.QueryRow(ctx, `SELECT crdb_internal.setup_span_configs_stream($1)`, tenantName)
115+
var rawSpec []byte
116+
if err := row.Scan(&rawSpec); err != nil {
117+
return 0, Topology{}, errors.Wrapf(err, "cannot setup span config replication stream for tenant %s", tenantName)
118+
}
119+
if err := protoutil.Unmarshal(rawSpec, &spec); err != nil {
120+
return 0, Topology{}, err
121+
}
122+
}
123+
topology, err := p.createTopology(spec)
124+
return spec.SpanConfigStreamID, topology, err
125+
}
126+
104127
// Dial implements Client interface.
105128
func (p *partitionedStreamClient) Dial(ctx context.Context) error {
106129
p.mu.Lock()
@@ -161,7 +184,12 @@ func (p *partitionedStreamClient) Plan(
161184
return Topology{}, err
162185
}
163186
}
187+
return p.createTopology(spec)
188+
}
164189

190+
func (p *partitionedStreamClient) createTopology(
191+
spec streampb.ReplicationStreamSpec,
192+
) (Topology, error) {
165193
topology := Topology{
166194
SourceTenantID: spec.SourceTenantID,
167195
}

pkg/ccl/streamingccl/streamclient/partitioned_stream_client_test.go

Lines changed: 48 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -11,8 +11,6 @@ package streamclient_test
1111
import (
1212
"context"
1313
"fmt"
14-
"net/url"
15-
"os"
1614
"strings"
1715
"testing"
1816
"time"
@@ -35,7 +33,6 @@ import (
3533
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
3634
"github.com/cockroachdb/cockroach/pkg/util/log"
3735
"github.com/cockroachdb/cockroach/pkg/util/protoutil"
38-
"github.com/cockroachdb/cockroach/pkg/util/randutil"
3936
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
4037
"github.com/cockroachdb/errors"
4138
"github.com/lib/pq"
@@ -62,6 +59,48 @@ func (f *subscriptionFeedSource) Error() error {
6259
// Close implements the streamingtest.FeedSource interface.
6360
func (f *subscriptionFeedSource) Close(ctx context.Context) {}
6461

62+
func TestPartitionedSpanConfigReplicationClient(t *testing.T) {
63+
defer leaktest.AfterTest(t)()
64+
defer log.Scope(t).Close(t)
65+
66+
h, cleanup := replicationtestutils.NewReplicationHelper(t,
67+
base.TestServerArgs{
68+
DefaultTestTenant: base.TestControlsTenantsExplicitly,
69+
Knobs: base.TestingKnobs{
70+
JobsTestingKnobs: jobs.NewTestingKnobsWithShortIntervals(),
71+
},
72+
},
73+
)
74+
75+
defer cleanup()
76+
77+
testTenantName := roachpb.TenantName("test-tenant")
78+
tenant, cleanupTenant := h.CreateTenant(t, serverutils.TestTenantID(), testTenantName)
79+
defer cleanupTenant()
80+
81+
ctx := context.Background()
82+
83+
maybeInlineURL := h.MaybeGenerateInlineURL(t)
84+
client, err := streamclient.NewPartitionedStreamClient(ctx, maybeInlineURL)
85+
require.NoError(t, err)
86+
defer func() {
87+
require.NoError(t, client.Close(ctx))
88+
}()
89+
90+
streamID, topology, err := client.SetupSpanConfigsStream(ctx, testTenantName)
91+
require.NoError(t, err)
92+
93+
require.NotEqual(t, 0, streamID)
94+
require.Equal(t, 1, len(topology.Partitions))
95+
require.Equal(t, tenant.ID, topology.SourceTenantID)
96+
97+
// Since a span config replication stream does not create a job, the HeartBeat
98+
// call will deem the stream inactive.
99+
status, err := client.Heartbeat(ctx, streamID, hlc.Timestamp{WallTime: timeutil.Now().UnixNano()})
100+
require.NoError(t, err)
101+
require.Equal(t, streampb.StreamReplicationStatus_STREAM_INACTIVE, status.StreamStatus)
102+
}
103+
65104
func TestPartitionedStreamReplicationClient(t *testing.T) {
66105
defer leaktest.AfterTest(t)()
67106
defer log.Scope(t).Close(t)
@@ -96,28 +135,7 @@ INSERT INTO d.t1 (i) VALUES (42);
96135
INSERT INTO d.t2 VALUES (2);
97136
`)
98137

99-
rng, _ := randutil.NewPseudoRand()
100-
maybeGenerateInlineURL := func(orig *url.URL) *url.URL {
101-
if rng.Float64() > 0.5 {
102-
return orig
103-
}
104-
105-
t.Log("using inline certificates")
106-
ret := *orig
107-
v := ret.Query()
108-
for _, opt := range []string{"sslcert", "sslkey", "sslrootcert"} {
109-
path := v.Get(opt)
110-
content, err := os.ReadFile(path)
111-
require.NoError(t, err)
112-
v.Set(opt, string(content))
113-
114-
}
115-
v.Set("sslinline", "true")
116-
ret.RawQuery = v.Encode()
117-
return &ret
118-
}
119-
120-
maybeInlineURL := maybeGenerateInlineURL(&h.PGUrl)
138+
maybeInlineURL := h.MaybeGenerateInlineURL(t)
121139
client, err := streamclient.NewPartitionedStreamClient(ctx, maybeInlineURL)
122140
defer func() {
123141
require.NoError(t, client.Close(ctx))
@@ -142,6 +160,11 @@ INSERT INTO d.t2 VALUES (2);
142160
_, err = client.Plan(ctx, 999)
143161
require.True(t, testutils.IsError(err, fmt.Sprintf("job with ID %d does not exist", 999)), err)
144162

163+
var telemetryJobID int64
164+
h.SysSQL.QueryRow(t, "SELECT crdb_internal.create_sql_schema_telemetry_job()").Scan(&telemetryJobID)
165+
_, err = client.Plan(ctx, streampb.StreamID(telemetryJobID))
166+
require.True(t, testutils.IsError(err, fmt.Sprintf("job with id %d is not a replication stream job", telemetryJobID)), err)
167+
145168
expectStreamState(streamID, jobs.StatusRunning)
146169
status, err := client.Heartbeat(ctx, streamID, hlc.Timestamp{WallTime: timeutil.Now().UnixNano()})
147170
require.NoError(t, err)

pkg/ccl/streamingccl/streamclient/random_stream_client.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -393,6 +393,13 @@ func (m *RandomStreamClient) Create(
393393
}, nil
394394
}
395395

396+
// SetupSpanConfigsStream implements the Client interface.
397+
func (m *RandomStreamClient) SetupSpanConfigsStream(
398+
ctx context.Context, tenant roachpb.TenantName,
399+
) (streampb.StreamID, Topology, error) {
400+
panic("SetupSpanConfigsStream not implemented")
401+
}
402+
396403
// Heartbeat implements the Client interface.
397404
func (m *RandomStreamClient) Heartbeat(
398405
ctx context.Context, _ streampb.StreamID, ts hlc.Timestamp,

pkg/ccl/streamingccl/streamingest/stream_ingestion_processor_test.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,13 @@ func (m *mockStreamClient) Create(
7171
panic("unimplemented")
7272
}
7373

74+
// SetupSpanConfigsStream implements the Client interface.
75+
func (m *mockStreamClient) SetupSpanConfigsStream(
76+
ctx context.Context, tenant roachpb.TenantName,
77+
) (streampb.StreamID, streamclient.Topology, error) {
78+
panic("unimplemented")
79+
}
80+
7481
// Dial implements the Client interface.
7582
func (m *mockStreamClient) Dial(_ context.Context) error {
7683
panic("unimplemented")

pkg/ccl/streamingccl/streamproducer/BUILD.bazel

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,10 +32,14 @@ go_library(
3232
"//pkg/security/username",
3333
"//pkg/settings/cluster",
3434
"//pkg/sql",
35+
"//pkg/sql/catalog/descpb",
36+
"//pkg/sql/catalog/descs",
37+
"//pkg/sql/catalog/systemschema",
3538
"//pkg/sql/isql",
3639
"//pkg/sql/pgwire/pgcode",
3740
"//pkg/sql/pgwire/pgerror",
3841
"//pkg/sql/privilege",
42+
"//pkg/sql/sem/builtins",
3943
"//pkg/sql/sem/eval",
4044
"//pkg/sql/sem/tree",
4145
"//pkg/sql/syntheticprivilege",
@@ -96,6 +100,7 @@ go_test(
96100
"//pkg/sql",
97101
"//pkg/sql/catalog/descs",
98102
"//pkg/sql/catalog/desctestutils",
103+
"//pkg/sql/catalog/systemschema",
99104
"//pkg/sql/distsql",
100105
"//pkg/sql/isql",
101106
"//pkg/sql/sem/eval",

0 commit comments

Comments
 (0)