@@ -784,8 +784,7 @@ func (rd *replicationDriver) ensureStandbyPollerAdvances(ctx context.Context, in
 
 	info, err := getStreamIngestionJobInfo(rd.setup.dst.db, ingestionJobID)
 	require.NoError(rd.t, err)
-	pcrReplicatedTime := info.GetHighWater()
-	require.False(rd.t, pcrReplicatedTime.IsZero(), "PCR job has no replicated time")
+	initialPCRReplicatedTime := info.GetHighWater()
 
 	// Connect to the reader tenant
 	readerTenantName := fmt.Sprintf("%s-readonly", rd.setup.dst.name)
@@ -795,28 +794,28 @@ func (rd *replicationDriver) ensureStandbyPollerAdvances(ctx context.Context, in
 
 	// Poll the standby poller job until its high water timestamp matches the PCR job's replicated time
 	testutils.SucceedsWithin(rd.t, func() error {
-		var standbyHighWaterStr string
+		var standbyTimeStr string
 		readerTenantSQL.QueryRow(rd.t,
 			`SELECT COALESCE(high_water_timestamp, '0')
 			FROM crdb_internal.jobs
-			WHERE job_type = 'STANDBY READ TS POLLER'`).Scan(&standbyHighWaterStr)
+			WHERE job_type = 'STANDBY READ TS POLLER'`).Scan(&standbyTimeStr)
 
-		if standbyHighWaterStr == "0" {
+		if standbyTimeStr == "0" {
 			return errors.New("standby poller job not found or has no high water timestamp")
 		}
 
-		standbyHighWater := DecimalTimeToHLC(rd.t, standbyHighWaterStr)
-		standbyHighWaterTime := standbyHighWater.GoTime()
+		standbyHLC := DecimalTimeToHLC(rd.t, standbyTimeStr)
+		standbyTime := standbyHLC.GoTime()
 
-		rd.t.L().Printf("Standby poller high water: %s; replicated time %s", standbyHighWaterTime, pcrReplicatedTime)
+		rd.t.L().Printf("Standby poller high water: %s; replicated time %s", standbyTime, initialPCRReplicatedTime)
 
-		if standbyHighWaterTime.Compare(pcrReplicatedTime) >= 0 {
+		if standbyTime.Compare(initialPCRReplicatedTime) >= 0 {
 			rd.t.L().Printf("Standby poller has advanced to PCR replicated time")
 			return nil
 		}
 
 		return errors.Newf("standby poller high water %s not yet at PCR replicated time %s",
-			standbyHighWaterTime, pcrReplicatedTime)
+			standbyTime, initialPCRReplicatedTime)
 	}, 5*time.Minute)
 }
 
@@ -1019,6 +1018,61 @@ func (rd *replicationDriver) maybeRunSchemaChangeWorkload(
 	}
 }
 
+// maybeRestartReaderTenantService restarts the reader tenant service if
+// physical_cluster_replication.reader_system_table_id_offset was set, as the
+// namespace cache needs to be rehydrated after the reader tenant ingests the
+// privilege table at a higher ID.
+func (rd *replicationDriver) maybeRestartReaderTenantService(ctx context.Context) {
+	if rd.rs.withReaderWorkload == nil {
+		// No reader tenant configured, nothing to do
+		return
+	}
+
+	// Check if the reader system table ID offset setting is configured
+	var offsetValue int
+	rd.setup.dst.sysSQL.QueryRow(rd.t, "SHOW CLUSTER SETTING physical_cluster_replication.reader_system_table_id_offset").Scan(&offsetValue)
+
+	if offsetValue == 0 {
+		rd.t.L().Printf("reader_system_table_id_offset not set, skipping reader tenant service restart")
+		return
+	}
+	readerTenantName := fmt.Sprintf("%s-readonly", rd.setup.dst.name)
+
+	// Wait for the reader tenant to be in the correct data state and service mode before restarting.
+	testutils.SucceedsSoon(rd.t, func() error {
+		var dataState, serviceMode string
+		rd.setup.dst.sysSQL.QueryRow(rd.t, fmt.Sprintf("SELECT data_state, service_mode FROM [SHOW TENANTS] WHERE name = '%s'", readerTenantName)).Scan(&dataState, &serviceMode)
+		if dataState != "ready" {
+			return errors.Newf("reader tenant %q data state is %q, expected 'ready'", readerTenantName, dataState)
+		}
+		if serviceMode != "shared" {
+			return errors.Newf("reader tenant %q service mode is %q, expected 'shared'", readerTenantName, serviceMode)
+		}
+		return nil
+	})
+
+	rd.t.Status("restarting reader tenant service")
+
+	// Stop the reader tenant service
+	rd.setup.dst.sysSQL.Exec(rd.t, fmt.Sprintf("ALTER VIRTUAL CLUSTER '%s' STOP SERVICE", readerTenantName))
+
+	// Wait for the service to fully stop
+	testutils.SucceedsSoon(rd.t, func() error {
+		// Try to connect to the reader tenant - if it fails, the service is stopped
+		conn := rd.c.Conn(ctx, rd.t.L(), rd.setup.dst.gatewayNodes[0], option.VirtualClusterName(readerTenantName))
+		defer conn.Close()
+		if err := conn.Ping(); err == nil {
+			return errors.Newf("reader tenant %q still accepting connections", readerTenantName)
+		}
+		return nil
+	})
+
+	// Start the service back up
+	rd.setup.dst.sysSQL.Exec(rd.t, fmt.Sprintf("ALTER VIRTUAL CLUSTER '%s' START SERVICE SHARED", readerTenantName))
+
+	rd.t.L().Printf("successfully restarted reader tenant service")
+}
+
 // checkParticipatingNodes asserts that multiple nodes in the source and dest cluster are
 // participating in the replication stream.
 //
@@ -1136,6 +1190,7 @@ func (rd *replicationDriver) main(ctx context.Context) {
 	rd.t.Status(fmt.Sprintf(`initial scan complete. run workload and repl. stream for another %s minutes`,
 		rd.rs.additionalDuration))
 
+	rd.maybeRestartReaderTenantService(ctx)
 	rd.maybeRunSchemaChangeWorkload(ctx, workloadMonitor)
 	rd.maybeRunReaderTenantWorkload(ctx, workloadMonitor)
 
@@ -1153,7 +1208,6 @@ func (rd *replicationDriver) main(ctx context.Context) {
 		return
 	}
 	rd.ensureStandbyPollerAdvances(ctx, ingestionJobID)
-
 	rd.checkParticipatingNodes(ctx, ingestionJobID)
 
 	retainedTime := rd.getReplicationRetainedTime()