@@ -37,6 +37,11 @@ func TestStandbyReadTSPollerJob(t *testing.T) {
37
37
c , cleanup := replicationtestutils .CreateTenantStreamingClusters (ctx , t , args )
38
38
defer cleanup ()
39
39
40
+ c .SrcTenantSQL .Exec (t , `CREATE TABLE foo (i INT PRIMARY KEY)` )
41
+ c .SrcTenantSQL .Exec (t , `CREATE TABLE bar (i INT PRIMARY KEY)` )
42
+
43
+ offset , offsetChecksInReaderTenant := maybeOffsetReaderTenantSystemTables (t , c )
44
+
40
45
producerJobID , ingestionJobID := c .StartStreamReplication (ctx )
41
46
42
47
jobutils .WaitForJobToRun (c .T , c .SrcSysSQL , jobspb .JobID (producerJobID ))
@@ -68,6 +73,11 @@ INSERT INTO a VALUES (1);
68
73
waitForPollerJobToStartDest (t , c , ingestionJobID )
69
74
observeValueInReaderTenant (t , c .ReaderTenantSQL )
70
75
76
+ var idWithOffsetCount int
77
+ c .ReaderTenantSQL .QueryRow (t , fmt .Sprintf ("SELECT count(*) FROM system.namespace where id = %d" , 50 + offset )).Scan (& idWithOffsetCount )
78
+ require .Equal (t , 1 , idWithOffsetCount , "expected to find namespace entry for table a with offset applied" )
79
+ offsetChecksInReaderTenant (c .ReaderTenantSQL )
80
+
71
81
// Failback and setup stanby reader tenant on the og source.
72
82
{
73
83
c .Cutover (ctx , producerJobID , ingestionJobID , srcTime .GoTime (), false )
@@ -101,7 +111,111 @@ INSERT INTO a VALUES (1);
101
111
var numTables int
102
112
srcReaderSQL .QueryRow (t , `SELECT count(*) FROM [SHOW TABLES]` ).Scan (& numTables )
103
113
observeValueInReaderTenant (t , srcReaderSQL )
114
+ offsetChecksInReaderTenant (srcReaderSQL )
115
+ }
116
+ }
117
+
118
+ func maybeOffsetReaderTenantSystemTables (
119
+ t * testing.T , c * replicationtestutils.TenantStreamingClusters ,
120
+ ) (int , func (sql * sqlutils.SQLRunner )) {
121
+ if c .Rng .Intn (2 ) == 0 {
122
+ return 0 , func (sql * sqlutils.SQLRunner ) {}
123
+ }
124
+ offset := 100000
125
+ c .DestSysSQL .Exec (t , fmt .Sprintf (`SET CLUSTER SETTING physical_cluster_replication.reader_system_table_id_offset = %d` , offset ))
126
+ // Set on source to ensure failback works well too.
127
+ c .SrcSysSQL .Exec (t , fmt .Sprintf (`SET CLUSTER SETTING physical_cluster_replication.reader_system_table_id_offset = %d` , offset ))
128
+
129
+ // swap a system table ID and a user table ID to simulate a cluster that has interleaving user and system table ids.
130
+ scaryTableIDRemapFunc := `
131
+ CREATE OR REPLACE FUNCTION renumber_desc(oldID INT, newID INT) RETURNS BOOL AS
132
+ $$
133
+ BEGIN
134
+ -- Rewrite the ID within the descriptor
135
+ SELECT crdb_internal.unsafe_upsert_descriptor(
136
+ newid,
137
+ crdb_internal.json_to_pb(
138
+ 'cockroach.sql.sqlbase.Descriptor',
139
+ d
140
+ ),
141
+ true
142
+ )
143
+ FROM (
144
+ SELECT id,
145
+ json_set(
146
+ json_set(
147
+ crdb_internal.pb_to_json(
148
+ 'cockroach.sql.sqlbase.Descriptor',
149
+ descriptor,
150
+ false
151
+ ),
152
+ ARRAY['table', 'id'],
153
+ newid::STRING::JSONB
154
+ ),
155
+ ARRAY['table', 'modificationTime'],
156
+ json_build_object(
157
+ 'wallTime',
158
+ (
159
+ (
160
+ extract('epoch', now())
161
+ * 1000000
162
+ )::INT8
163
+ * 1000
164
+ )::STRING
165
+ )
166
+ ) AS d
167
+ FROM system.descriptor
168
+ WHERE id IN (oldid,)
169
+ );
170
+ -- Update the namespace entry and delete the old descriptor.
171
+ SELECT crdb_internal.unsafe_upsert_namespace_entry("parentID", "parentSchemaID", name, newID, true) FROM (SELECT "parentID", "parentSchemaID", name, id FROM system.namespace where id =oldID) UNION ALL
172
+ SELECT crdb_internal.unsafe_delete_descriptor(oldID, true);
173
+
174
+ RETURN true;
175
+
176
+ END
177
+ $$ LANGUAGE PLpgSQL;`
178
+
179
+ c .SrcTenantSQL .Exec (t , scaryTableIDRemapFunc )
180
+ var txnInsightsID , privilegesID int
181
+ c .SrcTenantSQL .QueryRow (t , `SELECT id FROM system.namespace WHERE name = 'transaction_execution_insights'` ).Scan (& txnInsightsID )
182
+ c .SrcTenantSQL .QueryRow (t , `SELECT id FROM system.namespace WHERE name = 'privileges'` ).Scan (& privilegesID )
183
+ require .NotEqual (t , 0 , txnInsightsID )
184
+ require .NotEqual (t , 0 , privilegesID )
185
+
186
+ // renumber these two priv tables to be out of the way
187
+ txnInsightIDRemapedID := txnInsightsID + 1000
188
+ privilegesIDRemapedID := privilegesID + 1000
189
+ c .SrcTenantSQL .Exec (t , `SELECT renumber_desc($1, $2)` , txnInsightsID , txnInsightIDRemapedID )
190
+ c .SrcTenantSQL .Exec (t , `SELECT renumber_desc($1, $2)` , privilegesID , privilegesIDRemapedID )
191
+
192
+ // create two user tables on the source and interleave them with system table ids
193
+ var fooID , barID int
194
+ c .SrcTenantSQL .QueryRow (t , `SELECT id FROM system.namespace WHERE name = 'foo'` ).Scan (& fooID )
195
+ c .SrcTenantSQL .QueryRow (t , `SELECT id FROM system.namespace WHERE name = 'bar'` ).Scan (& barID )
196
+ require .NotEqual (t , 0 , fooID )
197
+ require .NotEqual (t , 0 , barID )
198
+
199
+ c .SrcTenantSQL .Exec (t , `SELECT renumber_desc($1, $2)` , fooID , txnInsightsID )
200
+ c .SrcTenantSQL .Exec (t , `SELECT renumber_desc($1, $2)` , barID , privilegesID )
201
+
202
+ // Drop the function, to avoid hitting 152978
203
+ c .SrcTenantSQL .Exec (t , `DROP FUNCTION renumber_desc` )
204
+
205
+ offsetChecksInReaderTenant := func (sql * sqlutils.SQLRunner ) {
206
+ // Check that txn execution insights table is not at the same id as source as it's not replicated.
207
+ sql .QueryRow (t , `SELECT id FROM system.namespace WHERE name = 'transaction_execution_insights'` ).Scan (& txnInsightsID )
208
+ require .NotEqual (t , txnInsightIDRemapedID , txnInsightsID )
209
+
210
+ // On 25.3, the privs table is not replicated so the ids should differ.
211
+ sql .QueryRow (t , `SELECT id FROM system.namespace WHERE name = 'privileges'` ).Scan (& privilegesID )
212
+ require .NotEqual (t , privilegesIDRemapedID , privilegesID )
213
+ var count int
214
+ sql .QueryRow (t , `SELECT count(*) FROM system.namespace WHERE name = 'privileges'` ).Scan (& count )
215
+ require .Equal (t , 1 , count )
104
216
}
217
+
218
+ return offset , offsetChecksInReaderTenant
105
219
}
106
220
107
221
func observeValueInReaderTenant (t * testing.T , readerSQL * sqlutils.SQLRunner ) {
@@ -112,8 +226,8 @@ func observeValueInReaderTenant(t *testing.T, readerSQL *sqlutils.SQLRunner) {
112
226
var numTables int
113
227
readerSQL .QueryRow (t , `SELECT count(*) FROM [SHOW TABLES]` ).Scan (& numTables )
114
228
115
- if numTables != 1 {
116
- return errors .Errorf ("expected 1 table to be present in reader tenant, but got %d instead" , numTables )
229
+ if numTables != 3 {
230
+ return errors .Errorf ("expected 3 tables to be present in reader tenant, but got %d instead" , numTables )
117
231
}
118
232
119
233
var actualQueryResult int
@@ -174,6 +288,9 @@ func TestReaderTenantCutover(t *testing.T) {
174
288
c , cleanup := replicationtestutils .CreateTenantStreamingClusters (ctx , t , args )
175
289
defer cleanup ()
176
290
291
+ c .SrcTenantSQL .Exec (t , `CREATE TABLE foo (i INT PRIMARY KEY)` )
292
+ c .SrcTenantSQL .Exec (t , `CREATE TABLE bar (i INT PRIMARY KEY)` )
293
+
177
294
producerJobID , ingestionJobID := c .StartStreamReplication (ctx )
178
295
179
296
jobutils .WaitForJobToRun (c .T , c .SrcSysSQL , jobspb .JobID (producerJobID ))
0 commit comments