Skip to content

Commit 0429616

Browse files
committed
crosscluster/physical: add reader tenant system table id offset setting
This patch adds the private physical_cluster_replication.reader_system_table_id_offset setting, which a pcr customer can set on the destination system tenant to some very large number, like 1,000,000, which will bootstrap the reader tenant with dynamically allocated system table ids to be offset+i. This setting can be set when the reader tenant fails to start up because a source table id collides with a system table id. Informs #152909 Release note: none
1 parent 008a5f4 commit 0429616

File tree

2 files changed

+132
-3
lines changed

2 files changed

+132
-3
lines changed

pkg/crosscluster/physical/standby_read_ts_poller_job_test.go

Lines changed: 118 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,11 @@ func TestStandbyReadTSPollerJob(t *testing.T) {
3737
c, cleanup := replicationtestutils.CreateTenantStreamingClusters(ctx, t, args)
3838
defer cleanup()
3939

40+
c.SrcTenantSQL.Exec(t, `CREATE TABLE foo (i INT PRIMARY KEY)`)
41+
c.SrcTenantSQL.Exec(t, `CREATE TABLE bar (i INT PRIMARY KEY)`)
42+
43+
offset, offsetChecksInReaderTenant := maybeOffsetReaderTenantSystemTables(t, c)
44+
4045
producerJobID, ingestionJobID := c.StartStreamReplication(ctx)
4146

4247
jobutils.WaitForJobToRun(c.T, c.SrcSysSQL, jobspb.JobID(producerJobID))
@@ -71,6 +76,11 @@ INSERT INTO a VALUES (1);
7176
observeValueInReaderTenant(t, c.ReaderTenantSQL)
7277
waitForPollerTimeToAdvance(t, c.ReaderTenantSQL, pollerResolvedTime)
7378

79+
var idWithOffsetCount int
80+
c.ReaderTenantSQL.QueryRow(t, fmt.Sprintf("SELECT count(*) FROM system.namespace where id = %d", 50+offset)).Scan(&idWithOffsetCount)
81+
require.Equal(t, 1, idWithOffsetCount, "expected to find namespace entry for table a with offset applied")
82+
offsetChecksInReaderTenant(c.ReaderTenantSQL)
83+
7484
// Failback and setup stanby reader tenant on the og source.
7585
{
7686
c.Cutover(ctx, producerJobID, ingestionJobID, srcTime.GoTime(), false)
@@ -105,9 +115,115 @@ INSERT INTO a VALUES (1);
105115
var numTables int
106116
srcReaderSQL.QueryRow(t, `SELECT count(*) FROM [SHOW TABLES]`).Scan(&numTables)
107117
observeValueInReaderTenant(t, srcReaderSQL)
118+
offsetChecksInReaderTenant(srcReaderSQL)
108119
}
109120
}
110121

122+
func maybeOffsetReaderTenantSystemTables(
123+
t *testing.T, c *replicationtestutils.TenantStreamingClusters,
124+
) (int, func(sql *sqlutils.SQLRunner)) {
125+
if c.Rng.Intn(2) == 0 {
126+
return 0, func(sql *sqlutils.SQLRunner) {}
127+
}
128+
offset := 100000
129+
c.DestSysSQL.Exec(t, fmt.Sprintf(`SET CLUSTER SETTING physical_cluster_replication.reader_system_table_id_offset = %d`, offset))
130+
// Set on source to ensure failback works well too.
131+
c.SrcSysSQL.Exec(t, fmt.Sprintf(`SET CLUSTER SETTING physical_cluster_replication.reader_system_table_id_offset = %d`, offset))
132+
133+
// swap a system table ID and a user table ID to simulate a cluster that has interleaving user and system table ids.
134+
scaryTableIDRemapFunc := `
135+
CREATE OR REPLACE FUNCTION renumber_desc(oldID INT, newID INT) RETURNS BOOL AS
136+
$$
137+
BEGIN
138+
-- Rewrite the ID within the descriptor
139+
SELECT crdb_internal.unsafe_upsert_descriptor(
140+
newid,
141+
crdb_internal.json_to_pb(
142+
'cockroach.sql.sqlbase.Descriptor',
143+
d
144+
),
145+
true
146+
)
147+
FROM (
148+
SELECT id,
149+
json_set(
150+
json_set(
151+
crdb_internal.pb_to_json(
152+
'cockroach.sql.sqlbase.Descriptor',
153+
descriptor,
154+
false
155+
),
156+
ARRAY['table', 'id'],
157+
newid::STRING::JSONB
158+
),
159+
ARRAY['table', 'modificationTime'],
160+
json_build_object(
161+
'wallTime',
162+
(
163+
(
164+
extract('epoch', now())
165+
* 1000000
166+
)::INT8
167+
* 1000
168+
)::STRING
169+
)
170+
) AS d
171+
FROM system.descriptor
172+
WHERE id IN (oldid,)
173+
);
174+
-- Update the namespace entry and delete the old descriptor.
175+
SELECT crdb_internal.unsafe_upsert_namespace_entry("parentID", "parentSchemaID", name, newID, true) FROM (SELECT "parentID", "parentSchemaID", name, id FROM system.namespace where id =oldID) UNION ALL
176+
SELECT crdb_internal.unsafe_delete_descriptor(oldID, true);
177+
178+
RETURN true;
179+
180+
END
181+
$$ LANGUAGE PLpgSQL;`
182+
183+
c.SrcTenantSQL.Exec(t, scaryTableIDRemapFunc)
184+
var txnInsightsID, privilegesID int
185+
c.SrcTenantSQL.QueryRow(t, `SELECT id FROM system.namespace WHERE name = 'transaction_execution_insights'`).Scan(&txnInsightsID)
186+
c.SrcTenantSQL.QueryRow(t, `SELECT id FROM system.namespace WHERE name = 'privileges'`).Scan(&privilegesID)
187+
require.NotEqual(t, 0, txnInsightsID)
188+
require.NotEqual(t, 0, privilegesID)
189+
190+
// renumber these two priv tables to be out of the way
191+
txnInsightIDRemapedID := txnInsightsID + 1000
192+
privilegesIDRemapedID := privilegesID + 1000
193+
c.SrcTenantSQL.Exec(t, `SELECT renumber_desc($1, $2)`, txnInsightsID, txnInsightIDRemapedID)
194+
c.SrcTenantSQL.Exec(t, `SELECT renumber_desc($1, $2)`, privilegesID, privilegesIDRemapedID)
195+
196+
// create two user tables on the source and interleave them with system table ids
197+
var fooID, barID int
198+
c.SrcTenantSQL.QueryRow(t, `SELECT id FROM system.namespace WHERE name = 'foo'`).Scan(&fooID)
199+
c.SrcTenantSQL.QueryRow(t, `SELECT id FROM system.namespace WHERE name = 'bar'`).Scan(&barID)
200+
require.NotEqual(t, 0, fooID)
201+
require.NotEqual(t, 0, barID)
202+
203+
c.SrcTenantSQL.Exec(t, `SELECT renumber_desc($1, $2)`, fooID, txnInsightsID)
204+
c.SrcTenantSQL.Exec(t, `SELECT renumber_desc($1, $2)`, barID, privilegesID)
205+
206+
// Drop the function, to avoid hitting 152978
207+
c.SrcTenantSQL.Exec(t, `DROP FUNCTION renumber_desc`)
208+
209+
offsetChecksInReaderTenant := func(sql *sqlutils.SQLRunner) {
210+
// Check that txn execution insights table is not at the same id as source as it's not replicated.
211+
sql.QueryRow(t, `SELECT id FROM system.namespace WHERE name = 'transaction_execution_insights'`).Scan(&txnInsightsID)
212+
require.NotEqual(t, txnInsightIDRemapedID, txnInsightsID)
213+
214+
// Check that the privildges table is at the same id as source, since it is
215+
// replicated. This also implies that the og priviliges table created during
216+
// reader tenant bootstrapping at id+offset was removed.
217+
sql.QueryRow(t, `SELECT id FROM system.namespace WHERE name = 'privileges'`).Scan(&privilegesID)
218+
require.Equal(t, privilegesIDRemapedID, privilegesID)
219+
var count int
220+
sql.QueryRow(t, `SELECT count(*) FROM system.namespace WHERE name = 'privileges'`).Scan(&count)
221+
require.Equal(t, 1, count)
222+
}
223+
224+
return offset, offsetChecksInReaderTenant
225+
}
226+
111227
func observeValueInReaderTenant(t *testing.T, readerSQL *sqlutils.SQLRunner) {
112228
now := timeutil.Now()
113229
// Verify that updates have been replicated to reader tenant. This may take a
@@ -116,8 +232,8 @@ func observeValueInReaderTenant(t *testing.T, readerSQL *sqlutils.SQLRunner) {
116232
var numTables int
117233
readerSQL.QueryRow(t, `SELECT count(*) FROM [SHOW TABLES]`).Scan(&numTables)
118234

119-
if numTables != 1 {
120-
return errors.Errorf("expected 1 table to be present in reader tenant, but got %d instead", numTables)
235+
if numTables != 3 {
236+
return errors.Errorf("expected 3 tables to be present in reader tenant, but got %d instead", numTables)
121237
}
122238

123239
var actualQueryResult int

pkg/crosscluster/physical/stream_ingestion_planning.go

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ package physical
77

88
import (
99
"context"
10+
"math"
1011

1112
"github.com/cockroachdb/cockroach/pkg/ccl/utilccl"
1213
"github.com/cockroachdb/cockroach/pkg/crosscluster/streamclient"
@@ -17,6 +18,7 @@ import (
1718
"github.com/cockroachdb/cockroach/pkg/repstream/streampb"
1819
"github.com/cockroachdb/cockroach/pkg/roachpb"
1920
"github.com/cockroachdb/cockroach/pkg/server/telemetry"
21+
"github.com/cockroachdb/cockroach/pkg/settings"
2022
"github.com/cockroachdb/cockroach/pkg/sql"
2123
"github.com/cockroachdb/cockroach/pkg/sql/catalog/colinfo"
2224
"github.com/cockroachdb/cockroach/pkg/sql/exprutil"
@@ -34,6 +36,16 @@ import (
3436
// replicated data will be retained.
3537
const defaultRetentionTTLSeconds = int32(4 * 60 * 60)
3638

39+
var readerTenantSystemTableIDOffset = settings.RegisterIntSetting(
40+
settings.ApplicationLevel,
41+
"physical_cluster_replication.reader_system_table_id_offset",
42+
"the offset added to dynamically allocated system table IDs in the reader tenant",
43+
0,
44+
// Max offset is 1000 less than MaxUint32 to leave room 1000 dynamically
45+
// allocated system table ids. Hope that never happens.
46+
settings.NonNegativeIntWithMaximum(math.MaxUint32-1000),
47+
)
48+
3749
func streamIngestionJobDescription(
3850
p sql.PlanHookState,
3951
source streamclient.ConfigUri,
@@ -331,7 +343,8 @@ func createReaderTenant(
331343
}
332344

333345
readerInfo.ID = readerID.ToUint64()
334-
_, err = sql.BootstrapTenant(ctx, p.ExecCfg(), p.Txn(), readerInfo, readerZcfg, 0)
346+
systemTableIDOffset := readerTenantSystemTableIDOffset.Get(&p.ExecCfg().Settings.SV)
347+
_, err = sql.BootstrapTenant(ctx, p.ExecCfg(), p.Txn(), readerInfo, readerZcfg, uint32(systemTableIDOffset))
335348
if err != nil {
336349
return readerID, err
337350
}

0 commit comments

Comments
 (0)