@@ -37,6 +37,11 @@ func TestStandbyReadTSPollerJob(t *testing.T) {
37
37
c , cleanup := replicationtestutils .CreateTenantStreamingClusters (ctx , t , args )
38
38
defer cleanup ()
39
39
40
+ c .SrcTenantSQL .Exec (t , `CREATE TABLE foo (i INT PRIMARY KEY)` )
41
+ c .SrcTenantSQL .Exec (t , `CREATE TABLE bar (i INT PRIMARY KEY)` )
42
+
43
+ offset , offsetChecksInReaderTenant := maybeOffsetReaderTenantSystemTables (t , c )
44
+
40
45
producerJobID , ingestionJobID := c .StartStreamReplication (ctx )
41
46
42
47
jobutils .WaitForJobToRun (c .T , c .SrcSysSQL , jobspb .JobID (producerJobID ))
@@ -68,6 +73,11 @@ INSERT INTO a VALUES (1);
68
73
waitForPollerJobToStartDest (t , c , ingestionJobID )
69
74
observeValueInReaderTenant (t , c .ReaderTenantSQL )
70
75
76
+ var idWithOffsetCount int
77
+ c .ReaderTenantSQL .QueryRow (t , fmt .Sprintf ("SELECT count(*) FROM system.namespace where id = %d" , 50 + offset )).Scan (& idWithOffsetCount )
78
+ require .Equal (t , 1 , idWithOffsetCount , "expected to find namespace entry for table a with offset applied" )
79
+ offsetChecksInReaderTenant (c .ReaderTenantSQL )
80
+
71
81
// Failback and setup stanby reader tenant on the og source.
72
82
{
73
83
c .Cutover (ctx , producerJobID , ingestionJobID , srcTime .GoTime (), false )
@@ -102,7 +112,111 @@ INSERT INTO a VALUES (1);
102
112
var numTables int
103
113
srcReaderSQL .QueryRow (t , `SELECT count(*) FROM [SHOW TABLES]` ).Scan (& numTables )
104
114
observeValueInReaderTenant (t , srcReaderSQL )
115
+ offsetChecksInReaderTenant (srcReaderSQL )
116
+ }
117
+ }
118
+
119
+ func maybeOffsetReaderTenantSystemTables (
120
+ t * testing.T , c * replicationtestutils.TenantStreamingClusters ,
121
+ ) (int , func (sql * sqlutils.SQLRunner )) {
122
+ if c .Rng .Intn (2 ) == 0 {
123
+ return 0 , func (sql * sqlutils.SQLRunner ) {}
124
+ }
125
+ offset := 100000
126
+ c .DestSysSQL .Exec (t , fmt .Sprintf (`SET CLUSTER SETTING physical_cluster_replication.reader_system_table_id_offset = %d` , offset ))
127
+ // Set on source to ensure failback works well too.
128
+ c .SrcSysSQL .Exec (t , fmt .Sprintf (`SET CLUSTER SETTING physical_cluster_replication.reader_system_table_id_offset = %d` , offset ))
129
+
130
+ // swap a system table ID and a user table ID to simulate a cluster that has interleaving user and system table ids.
131
+ scaryTableIDRemapFunc := `
132
+ CREATE OR REPLACE FUNCTION renumber_desc(oldID INT, newID INT) RETURNS BOOL AS
133
+ $$
134
+ BEGIN
135
+ -- Rewrite the ID within the descriptor
136
+ SELECT crdb_internal.unsafe_upsert_descriptor(
137
+ newid,
138
+ crdb_internal.json_to_pb(
139
+ 'cockroach.sql.sqlbase.Descriptor',
140
+ d
141
+ ),
142
+ true
143
+ )
144
+ FROM (
145
+ SELECT id,
146
+ json_set(
147
+ json_set(
148
+ crdb_internal.pb_to_json(
149
+ 'cockroach.sql.sqlbase.Descriptor',
150
+ descriptor,
151
+ false
152
+ ),
153
+ ARRAY['table', 'id'],
154
+ newid::STRING::JSONB
155
+ ),
156
+ ARRAY['table', 'modificationTime'],
157
+ json_build_object(
158
+ 'wallTime',
159
+ (
160
+ (
161
+ extract('epoch', now())
162
+ * 1000000
163
+ )::INT8
164
+ * 1000
165
+ )::STRING
166
+ )
167
+ ) AS d
168
+ FROM system.descriptor
169
+ WHERE id IN (oldid,)
170
+ );
171
+ -- Update the namespace entry and delete the old descriptor.
172
+ SELECT crdb_internal.unsafe_upsert_namespace_entry("parentID", "parentSchemaID", name, newID, true) FROM (SELECT "parentID", "parentSchemaID", name, id FROM system.namespace where id =oldID) UNION ALL
173
+ SELECT crdb_internal.unsafe_delete_descriptor(oldID, true);
174
+
175
+ RETURN true;
176
+
177
+ END
178
+ $$ LANGUAGE PLpgSQL;`
179
+
180
+ c .SrcTenantSQL .Exec (t , scaryTableIDRemapFunc )
181
+ var txnInsightsID , privilegesID int
182
+ c .SrcTenantSQL .QueryRow (t , `SELECT id FROM system.namespace WHERE name = 'transaction_execution_insights'` ).Scan (& txnInsightsID )
183
+ c .SrcTenantSQL .QueryRow (t , `SELECT id FROM system.namespace WHERE name = 'privileges'` ).Scan (& privilegesID )
184
+ require .NotEqual (t , 0 , txnInsightsID )
185
+ require .NotEqual (t , 0 , privilegesID )
186
+
187
+ // renumber these two priv tables to be out of the way
188
+ txnInsightIDRemapedID := txnInsightsID + 1000
189
+ privilegesIDRemapedID := privilegesID + 1000
190
+ c .SrcTenantSQL .Exec (t , `SELECT renumber_desc($1, $2)` , txnInsightsID , txnInsightIDRemapedID )
191
+ c .SrcTenantSQL .Exec (t , `SELECT renumber_desc($1, $2)` , privilegesID , privilegesIDRemapedID )
192
+
193
+ // create two user tables on the source and interleave them with system table ids
194
+ var fooID , barID int
195
+ c .SrcTenantSQL .QueryRow (t , `SELECT id FROM system.namespace WHERE name = 'foo'` ).Scan (& fooID )
196
+ c .SrcTenantSQL .QueryRow (t , `SELECT id FROM system.namespace WHERE name = 'bar'` ).Scan (& barID )
197
+ require .NotEqual (t , 0 , fooID )
198
+ require .NotEqual (t , 0 , barID )
199
+
200
+ c .SrcTenantSQL .Exec (t , `SELECT renumber_desc($1, $2)` , fooID , txnInsightsID )
201
+ c .SrcTenantSQL .Exec (t , `SELECT renumber_desc($1, $2)` , barID , privilegesID )
202
+
203
+ // Drop the function, to avoid hitting 152978
204
+ c .SrcTenantSQL .Exec (t , `DROP FUNCTION renumber_desc` )
205
+
206
+ offsetChecksInReaderTenant := func (sql * sqlutils.SQLRunner ) {
207
+ // Check that txn execution insights table is not at the same id as source as it's not replicated.
208
+ sql .QueryRow (t , `SELECT id FROM system.namespace WHERE name = 'transaction_execution_insights'` ).Scan (& txnInsightsID )
209
+ require .NotEqual (t , txnInsightIDRemapedID , txnInsightsID )
210
+
211
+ // On 25.3, the privs table is not replicated so the ids should differ.
212
+ sql .QueryRow (t , `SELECT id FROM system.namespace WHERE name = 'privileges'` ).Scan (& privilegesID )
213
+ require .NotEqual (t , privilegesIDRemapedID , privilegesID )
214
+ var count int
215
+ sql .QueryRow (t , `SELECT count(*) FROM system.namespace WHERE name = 'privileges'` ).Scan (& count )
216
+ require .Equal (t , 1 , count )
105
217
}
218
+
219
+ return offset , offsetChecksInReaderTenant
106
220
}
107
221
108
222
func observeValueInReaderTenant (t * testing.T , readerSQL * sqlutils.SQLRunner ) {
@@ -113,8 +227,8 @@ func observeValueInReaderTenant(t *testing.T, readerSQL *sqlutils.SQLRunner) {
113
227
var numTables int
114
228
readerSQL .QueryRow (t , `SELECT count(*) FROM [SHOW TABLES]` ).Scan (& numTables )
115
229
116
- if numTables != 1 {
117
- return errors .Errorf ("expected 1 table to be present in reader tenant, but got %d instead" , numTables )
230
+ if numTables != 3 {
231
+ return errors .Errorf ("expected 3 tables to be present in reader tenant, but got %d instead" , numTables )
118
232
}
119
233
120
234
var actualQueryResult int
@@ -175,6 +289,9 @@ func TestReaderTenantCutover(t *testing.T) {
175
289
c , cleanup := replicationtestutils .CreateTenantStreamingClusters (ctx , t , args )
176
290
defer cleanup ()
177
291
292
+ c .SrcTenantSQL .Exec (t , `CREATE TABLE foo (i INT PRIMARY KEY)` )
293
+ c .SrcTenantSQL .Exec (t , `CREATE TABLE bar (i INT PRIMARY KEY)` )
294
+
178
295
producerJobID , ingestionJobID := c .StartStreamReplication (ctx )
179
296
180
297
jobutils .WaitForJobToRun (c .T , c .SrcSysSQL , jobspb .JobID (producerJobID ))
0 commit comments