Skip to content

Commit 89241b1

Browse files
committed
crosscluster/physical: add reader tenant system table id offset setting
This patch adds the private physical_cluster_replication.reader_system_table_id_offset setting, which a PCR customer can set on the destination system tenant to some very large number, like 1,000,000. When set, the reader tenant is bootstrapped with its dynamically allocated system table ids at offset+i. This setting can be set when the reader tenant fails to start up because a source table id collides with a system table id. Informs #152909. Release note: none
1 parent d998594 commit 89241b1

File tree

2 files changed

+133
-3
lines changed

2 files changed

+133
-3
lines changed

pkg/ccl/crosscluster/physical/standby_read_ts_poller_job_test.go

Lines changed: 119 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,11 @@ func TestStandbyReadTSPollerJob(t *testing.T) {
3737
c, cleanup := replicationtestutils.CreateTenantStreamingClusters(ctx, t, args)
3838
defer cleanup()
3939

40+
c.SrcTenantSQL.Exec(t, `CREATE TABLE foo (i INT PRIMARY KEY)`)
41+
c.SrcTenantSQL.Exec(t, `CREATE TABLE bar (i INT PRIMARY KEY)`)
42+
43+
offset, offsetChecksInReaderTenant := maybeOffsetReaderTenantSystemTables(t, c)
44+
4045
producerJobID, ingestionJobID := c.StartStreamReplication(ctx)
4146

4247
jobutils.WaitForJobToRun(c.T, c.SrcSysSQL, jobspb.JobID(producerJobID))
@@ -68,6 +73,11 @@ INSERT INTO a VALUES (1);
6873
waitForPollerJobToStartDest(t, c, ingestionJobID)
6974
observeValueInReaderTenant(t, c.ReaderTenantSQL)
7075

76+
var idWithOffsetCount int
77+
c.ReaderTenantSQL.QueryRow(t, fmt.Sprintf("SELECT count(*) FROM system.namespace where id = %d", 50+offset)).Scan(&idWithOffsetCount)
78+
require.Equal(t, 1, idWithOffsetCount, "expected to find namespace entry for table a with offset applied")
79+
offsetChecksInReaderTenant(c.ReaderTenantSQL)
80+
7181
// Fail back and set up the standby reader tenant on the original source.
7282
{
7383
c.Cutover(ctx, producerJobID, ingestionJobID, srcTime.GoTime(), false)
@@ -101,7 +111,111 @@ INSERT INTO a VALUES (1);
101111
var numTables int
102112
srcReaderSQL.QueryRow(t, `SELECT count(*) FROM [SHOW TABLES]`).Scan(&numTables)
103113
observeValueInReaderTenant(t, srcReaderSQL)
114+
offsetChecksInReaderTenant(srcReaderSQL)
115+
}
116+
}
117+
118+
func maybeOffsetReaderTenantSystemTables(
119+
t *testing.T, c *replicationtestutils.TenantStreamingClusters,
120+
) (int, func(sql *sqlutils.SQLRunner)) {
121+
if c.Rng.Intn(2) == 0 {
122+
return 0, func(sql *sqlutils.SQLRunner) {}
123+
}
124+
offset := 100000
125+
c.DestSysSQL.Exec(t, fmt.Sprintf(`SET CLUSTER SETTING physical_cluster_replication.reader_system_table_id_offset = %d`, offset))
126+
// Set on source to ensure failback works well too.
127+
c.SrcSysSQL.Exec(t, fmt.Sprintf(`SET CLUSTER SETTING physical_cluster_replication.reader_system_table_id_offset = %d`, offset))
128+
129+
// swap a system table ID and a user table ID to simulate a cluster that has interleaving user and system table ids.
130+
scaryTableIDRemapFunc := `
131+
CREATE OR REPLACE FUNCTION renumber_desc(oldID INT, newID INT) RETURNS BOOL AS
132+
$$
133+
BEGIN
134+
-- Rewrite the ID within the descriptor
135+
SELECT crdb_internal.unsafe_upsert_descriptor(
136+
newid,
137+
crdb_internal.json_to_pb(
138+
'cockroach.sql.sqlbase.Descriptor',
139+
d
140+
),
141+
true
142+
)
143+
FROM (
144+
SELECT id,
145+
json_set(
146+
json_set(
147+
crdb_internal.pb_to_json(
148+
'cockroach.sql.sqlbase.Descriptor',
149+
descriptor,
150+
false
151+
),
152+
ARRAY['table', 'id'],
153+
newid::STRING::JSONB
154+
),
155+
ARRAY['table', 'modificationTime'],
156+
json_build_object(
157+
'wallTime',
158+
(
159+
(
160+
extract('epoch', now())
161+
* 1000000
162+
)::INT8
163+
* 1000
164+
)::STRING
165+
)
166+
) AS d
167+
FROM system.descriptor
168+
WHERE id IN (oldid,)
169+
);
170+
-- Update the namespace entry and delete the old descriptor.
171+
SELECT crdb_internal.unsafe_upsert_namespace_entry("parentID", "parentSchemaID", name, newID, true) FROM (SELECT "parentID", "parentSchemaID", name, id FROM system.namespace where id =oldID) UNION ALL
172+
SELECT crdb_internal.unsafe_delete_descriptor(oldID, true);
173+
174+
RETURN true;
175+
176+
END
177+
$$ LANGUAGE PLpgSQL;`
178+
179+
c.SrcTenantSQL.Exec(t, scaryTableIDRemapFunc)
180+
var txnInsightsID, privilegesID int
181+
c.SrcTenantSQL.QueryRow(t, `SELECT id FROM system.namespace WHERE name = 'transaction_execution_insights'`).Scan(&txnInsightsID)
182+
c.SrcTenantSQL.QueryRow(t, `SELECT id FROM system.namespace WHERE name = 'privileges'`).Scan(&privilegesID)
183+
require.NotEqual(t, 0, txnInsightsID)
184+
require.NotEqual(t, 0, privilegesID)
185+
186+
// renumber these two priv tables to be out of the way
187+
txnInsightIDRemapedID := txnInsightsID + 1000
188+
privilegesIDRemapedID := privilegesID + 1000
189+
c.SrcTenantSQL.Exec(t, `SELECT renumber_desc($1, $2)`, txnInsightsID, txnInsightIDRemapedID)
190+
c.SrcTenantSQL.Exec(t, `SELECT renumber_desc($1, $2)`, privilegesID, privilegesIDRemapedID)
191+
192+
// create two user tables on the source and interleave them with system table ids
193+
var fooID, barID int
194+
c.SrcTenantSQL.QueryRow(t, `SELECT id FROM system.namespace WHERE name = 'foo'`).Scan(&fooID)
195+
c.SrcTenantSQL.QueryRow(t, `SELECT id FROM system.namespace WHERE name = 'bar'`).Scan(&barID)
196+
require.NotEqual(t, 0, fooID)
197+
require.NotEqual(t, 0, barID)
198+
199+
c.SrcTenantSQL.Exec(t, `SELECT renumber_desc($1, $2)`, fooID, txnInsightsID)
200+
c.SrcTenantSQL.Exec(t, `SELECT renumber_desc($1, $2)`, barID, privilegesID)
201+
202+
// Drop the function, to avoid hitting 152978
203+
c.SrcTenantSQL.Exec(t, `DROP FUNCTION renumber_desc`)
204+
205+
offsetChecksInReaderTenant := func(sql *sqlutils.SQLRunner) {
206+
// Check that txn execution insights table is not at the same id as source as it's not replicated.
207+
sql.QueryRow(t, `SELECT id FROM system.namespace WHERE name = 'transaction_execution_insights'`).Scan(&txnInsightsID)
208+
require.NotEqual(t, txnInsightIDRemapedID, txnInsightsID)
209+
210+
// On 25.3, the privs table is not replicated so the ids should differ.
211+
sql.QueryRow(t, `SELECT id FROM system.namespace WHERE name = 'privileges'`).Scan(&privilegesID)
212+
require.NotEqual(t, privilegesIDRemapedID, privilegesID)
213+
var count int
214+
sql.QueryRow(t, `SELECT count(*) FROM system.namespace WHERE name = 'privileges'`).Scan(&count)
215+
require.Equal(t, 1, count)
104216
}
217+
218+
return offset, offsetChecksInReaderTenant
105219
}
106220

107221
func observeValueInReaderTenant(t *testing.T, readerSQL *sqlutils.SQLRunner) {
@@ -112,8 +226,8 @@ func observeValueInReaderTenant(t *testing.T, readerSQL *sqlutils.SQLRunner) {
112226
var numTables int
113227
readerSQL.QueryRow(t, `SELECT count(*) FROM [SHOW TABLES]`).Scan(&numTables)
114228

115-
if numTables != 1 {
116-
return errors.Errorf("expected 1 table to be present in reader tenant, but got %d instead", numTables)
229+
if numTables != 3 {
230+
return errors.Errorf("expected 3 tables to be present in reader tenant, but got %d instead", numTables)
117231
}
118232

119233
var actualQueryResult int
@@ -174,6 +288,9 @@ func TestReaderTenantCutover(t *testing.T) {
174288
c, cleanup := replicationtestutils.CreateTenantStreamingClusters(ctx, t, args)
175289
defer cleanup()
176290

291+
c.SrcTenantSQL.Exec(t, `CREATE TABLE foo (i INT PRIMARY KEY)`)
292+
c.SrcTenantSQL.Exec(t, `CREATE TABLE bar (i INT PRIMARY KEY)`)
293+
177294
producerJobID, ingestionJobID := c.StartStreamReplication(ctx)
178295

179296
jobutils.WaitForJobToRun(c.T, c.SrcSysSQL, jobspb.JobID(producerJobID))

pkg/ccl/crosscluster/physical/stream_ingestion_planning.go

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ package physical
77

88
import (
99
"context"
10+
"math"
1011

1112
"github.com/cockroachdb/cockroach/pkg/ccl/crosscluster"
1213
"github.com/cockroachdb/cockroach/pkg/ccl/crosscluster/streamclient"
@@ -18,6 +19,7 @@ import (
1819
"github.com/cockroachdb/cockroach/pkg/repstream/streampb"
1920
"github.com/cockroachdb/cockroach/pkg/roachpb"
2021
"github.com/cockroachdb/cockroach/pkg/server/telemetry"
22+
"github.com/cockroachdb/cockroach/pkg/settings"
2123
"github.com/cockroachdb/cockroach/pkg/sql"
2224
"github.com/cockroachdb/cockroach/pkg/sql/catalog/colinfo"
2325
"github.com/cockroachdb/cockroach/pkg/sql/exprutil"
@@ -33,6 +35,16 @@ import (
3335
// replicated data will be retained.
3436
const defaultRetentionTTLSeconds = int32(4 * 60 * 60)
3537

38+
var readerTenantSystemTableIDOffset = settings.RegisterIntSetting(
39+
settings.ApplicationLevel,
40+
"physical_cluster_replication.reader_system_table_id_offset",
41+
"the offset added to dynamically allocated system table IDs in the reader tenant",
42+
0,
43+
// Max offset is 1000 less than MaxUint32 to leave room for 1000 dynamically
44+
// allocated system table ids. Hope that never happens.
45+
settings.NonNegativeIntWithMaximum(math.MaxUint32-1000),
46+
)
47+
3648
// CannotSetExpirationWindowErr get returned if the user attempts to specify the
3749
// EXPIRATION WINDOW option to create a replication stream, as this job setting
3850
// should only be set from the producer cluster.
@@ -333,7 +345,8 @@ func createReaderTenant(
333345
}
334346

335347
readerInfo.ID = readerID.ToUint64()
336-
_, err = sql.BootstrapTenant(ctx, p.ExecCfg(), p.Txn(), readerInfo, readerZcfg, 0)
348+
systemTableIDOffset := readerTenantSystemTableIDOffset.Get(&p.ExecCfg().Settings.SV)
349+
_, err = sql.BootstrapTenant(ctx, p.ExecCfg(), p.Txn(), readerInfo, readerZcfg, uint32(systemTableIDOffset))
337350
if err != nil {
338351
return readerID, err
339352
}

0 commit comments

Comments
 (0)