@@ -26,7 +26,6 @@ import (
2626 "github.com/cockroachdb/cockroach/pkg/keys"
2727 "github.com/cockroachdb/cockroach/pkg/kv"
2828 "github.com/cockroachdb/cockroach/pkg/kv/kvserver/protectedts"
29- "github.com/cockroachdb/cockroach/pkg/kv/kvserver/protectedts/ptpb"
3029 "github.com/cockroachdb/cockroach/pkg/roachpb"
3130 "github.com/cockroachdb/cockroach/pkg/security/username"
3231 "github.com/cockroachdb/cockroach/pkg/server"
@@ -952,172 +951,105 @@ func TestStreamingReplanOnLag(t *testing.T) {
952951 c .WaitUntilReplicatedTime (cutoverTime , jobspb .JobID (ingestionJobID ))
953952}
954953
955- // TestProtectedTimestampManagement tests the active protected
956- // timestamps management on the destination tenant's keyspan.
954+ // TestProtectedTimestampManagement verifies, once for a cutover and once for
955+ // CANCEL JOB, that the PTS record referenced by the ingestion job's payload
956+ // exists while the job is running and is gone once the job has finished.
957+ //
958+ // Note: The PTS formula (protectedTS = replicatedTime - TTL) is unit tested
959+ // in TestResolveHeartbeatTime. Producer PTS management is tested in
960+ // TestTenantStreamingCancelIngestion.
957961func TestProtectedTimestampManagement (t * testing.T ) {
958962 defer leaktest .AfterTest (t )()
959963 defer log .Scope (t ).Close (t )
960964
961- skip .UnderRace (t , "slow test" ) // takes >1mn under race.
962-
963965 ctx := context .Background ()
964966 args := replicationtestutils .DefaultTenantStreamingClustersArgs
965- // Override the replication job details ReplicationTTLSeconds to a small value
966- // so that every progress update results in a protected timestamp update.
967- args .RetentionTTLSeconds = 1
968-
969- testutils .RunTrueAndFalse (t , "pause-before-terminal" , func (t * testing.T , pauseBeforeTerminal bool ) {
970- testutils .RunTrueAndFalse (t , "complete-replication" , func (t * testing.T , completeReplication bool ) {
971-
972- // waitForProducerProtection asserts that there is a PTS record protecting
973- // the source tenant. We ensure the PTS record is protecting a timestamp
974- // greater or equal to the frontier we know we have replicated up until.
975- waitForProducerProtection := func (c * replicationtestutils.TenantStreamingClusters , frontier hlc.Timestamp , producerJobID int ) {
976- testutils .SucceedsSoon (t , func () error {
977- srv := c .SrcSysServer
978- job , err := srv .JobRegistry ().(* jobs.Registry ).LoadJob (ctx , jobspb .JobID (producerJobID ))
979- if err != nil {
980- return err
981- }
982- ptsRecordID := job .Payload ().Details .(* jobspb.Payload_StreamReplication ).StreamReplication .ProtectedTimestampRecordID
983- ptsProvider := srv .ExecutorConfig ().(sql.ExecutorConfig ).ProtectedTimestampProvider
984-
985- var ptsRecord * ptpb.Record
986- if err := srv .InternalDB ().(descs.DB ).Txn (ctx , func (ctx context.Context , txn isql.Txn ) error {
987- var err error
988- ptsRecord , err = ptsProvider .WithTxn (txn ).GetRecord (ctx , ptsRecordID )
989- return err
990- }); err != nil {
991- return err
992- }
993- if ptsRecord .Timestamp .Less (frontier ) {
994- return errors .Newf ("protection is at %s, expected to be >= %s" ,
995- ptsRecord .Timestamp .String (), frontier .String ())
996- }
997- return nil
998- })
999- }
1000-
1001- // checkNoDestinationProtections asserts that there is no PTS record
1002- // protecting the destination tenant.
1003- checkNoDestinationProtection := func (c * replicationtestutils.TenantStreamingClusters , replicationJobID int ) {
1004- execCfg := c .DestSysServer .ExecutorConfig ().(sql.ExecutorConfig )
1005- require .NoError (t , c .DestSysServer .InternalDB ().(isql.DB ).Txn (ctx , func (ctx context.Context , txn isql.Txn ) error {
1006- j , err := execCfg .JobRegistry .LoadJobWithTxn (ctx , jobspb .JobID (replicationJobID ), txn )
1007- require .NoError (t , err )
1008- payload := j .Payload ()
1009- replicationDetails := payload .GetStreamIngestion ()
1010- ptp := execCfg .ProtectedTimestampProvider .WithTxn (txn )
1011- _ , err = ptp .GetRecord (ctx , * replicationDetails .ProtectedTimestampRecordID )
1012- require .EqualError (t , err , protectedts .ErrNotExists .Error ())
1013- return nil
1014- }))
1015- }
1016- checkDestinationProtection := func (c * replicationtestutils.TenantStreamingClusters , frontier hlc.Timestamp , replicationJobID int ) {
1017- execCfg := c .DestSysServer .ExecutorConfig ().(sql.ExecutorConfig )
1018- ptp := execCfg .ProtectedTimestampProvider
1019- require .NoError (t , c .DestSysServer .InternalDB ().(isql.DB ).Txn (ctx , func (ctx context.Context , txn isql.Txn ) error {
1020- j , err := execCfg .JobRegistry .LoadJobWithTxn (ctx , jobspb .JobID (replicationJobID ), txn )
1021- if err != nil {
1022- return err
1023- }
1024- payload := j .Payload ()
1025- progress := j .Progress ()
1026- replicationDetails := payload .GetStreamIngestion ()
1027-
1028- require .NotNil (t , replicationDetails .ProtectedTimestampRecordID )
1029- rec , err := ptp .WithTxn (txn ).GetRecord (ctx , * replicationDetails .ProtectedTimestampRecordID )
1030- if err != nil {
1031- return err
1032- }
1033-
1034- replicatedTime := replicationutils .ReplicatedTimeFromProgress (& progress )
1035- require .True (t , frontier .LessEq (replicatedTime ))
1036-
1037- roundedReplicatedTime := replicatedTime .GoTime ().Round (time .Millisecond )
1038- roundedProtectedTime := rec .Timestamp .GoTime ().Round (time .Millisecond )
1039- window := roundedReplicatedTime .Sub (roundedProtectedTime )
1040- require .Equal (t , time .Second , window )
1041- return nil
1042- }))
1043- }
967+ // Disable metamorphic external connection since we reuse clusters across
968+ // subtests and the external connection would already exist on subsequent runs.
969+ args .NoMetamorphicExternalConnection = true
1044970
1045- c , cleanup := replicationtestutils .CreateTenantStreamingClusters (ctx , t , args )
1046- defer cleanup ()
1047-
1048- c .DestSysSQL .Exec (t , "SET CLUSTER SETTING kv.closed_timestamp.target_duration = '100ms'" )
1049- c .DestSysSQL .Exec (t , "SET CLUSTER SETTING kv.protectedts.reconciliation.interval = '1ms';" )
1050-
1051- producerJobID , replicationJobID := c .StartStreamReplication (ctx )
1052-
1053- jobutils .WaitForJobToRun (c .T , c .SrcSysSQL , jobspb .JobID (producerJobID ))
1054- jobutils .WaitForJobToRun (c .T , c .DestSysSQL , jobspb .JobID (replicationJobID ))
1055-
1056- // Ensure that we wait at least a second so that the gap between the first
1057- // time we write the protected timestamp (t1) during replication job
1058- // startup, and the first progress update (t2) is greater than 1s. This is
1059- // important because if `frontier@t2 - ReplicationTTLSeconds < t1` then we
1060- // will not update the PTS record.
1061- now := c .SrcCluster .Server (0 ).SystemLayer ().Clock ().Now ().Add (int64 (time .Second )* 2 , 0 )
1062- c .WaitUntilReplicatedTime (now , jobspb .JobID (replicationJobID ))
1063-
1064- // Check that the producer and replication job have written a protected
1065- // timestamp.
1066- waitForProducerProtection (c , now , producerJobID )
1067- checkDestinationProtection (c , now , replicationJobID )
1068-
1069- now2 := now .Add (time .Second .Nanoseconds (), 0 )
1070- c .WaitUntilReplicatedTime (now2 , jobspb .JobID (replicationJobID ))
1071- // Let the replication progress for a second before checking that the
1072- // protected timestamp record has also been updated on the destination
1073- // cluster. This update happens in the same txn in which we update the
1074- // replication job's progress.
1075- waitForProducerProtection (c , now2 , producerJobID )
1076- checkDestinationProtection (c , now2 , replicationJobID )
1077-
1078- if pauseBeforeTerminal {
1079- c .DestSysSQL .Exec (t , fmt .Sprintf ("PAUSE JOB %d" , replicationJobID ))
1080- jobutils .WaitForJobToPause (c .T , c .DestSysSQL , jobspb .JobID (replicationJobID ))
1081- }
971+ // Create the clusters once; both subtests below share them via c.
972+ c , cleanup := replicationtestutils .CreateTenantStreamingClusters (ctx , t , args )
973+ defer cleanup ()
1082974
1083- if completeReplication {
1084- c .DestSysSQL .Exec (t , fmt .Sprintf ("RESUME JOB %d" , replicationJobID ))
1085- jobutils .WaitForJobToRun (c .T , c .DestSysSQL , jobspb .JobID (replicationJobID ))
1086- var emptyCutoverTime time.Time
1087- c .Cutover (ctx , producerJobID , replicationJobID , emptyCutoverTime , false )
1088- c .SrcSysSQL .Exec (t , fmt .Sprintf (`ALTER TENANT '%s' SET REPLICATION SOURCE EXPIRATION WINDOW ='100ms'` , c .Args .SrcTenantName ))
1089- }
975+ // checkDestinationPTSExists retries until the ingestion job's payload carries
976+ // a PTS record ID and that record can be read on the destination cluster.
977+ checkDestinationPTSExists := func (t * testing.T , replicationJobID int ) {
978+ t .Helper ()
979+ execCfg := c .DestSysServer .ExecutorConfig ().(sql.ExecutorConfig )
980+ testutils .SucceedsSoon (t , func () error {
981+ return c .DestSysServer .InternalDB ().(isql.DB ).Txn (ctx , func (ctx context.Context , txn isql.Txn ) error {
982+ j , err := execCfg .JobRegistry .LoadJobWithTxn (ctx , jobspb .JobID (replicationJobID ), txn )
983+ if err != nil {
984+ return err
985+ }
986+ payload := j .Payload ()
987+ replicationDetails := payload .GetStreamIngestion ()
988+ if replicationDetails .ProtectedTimestampRecordID == nil { // ID not recorded yet: return an error so SucceedsSoon tries again
989+ return errors .New ("ProtectedTimestampRecordID not yet set" )
990+ }
991+ ptp := execCfg .ProtectedTimestampProvider .WithTxn (txn )
992+ _ , err = ptp .GetRecord (ctx , * replicationDetails .ProtectedTimestampRecordID )
993+ return err // nil only when the record was found; any lookup error is surfaced to the retry loop
994+ })
995+ })
996+ }
1090997
1091- // Set GC TTL low, so that the GC job completes quickly in the test.
1092- c .DestSysSQL .Exec (t , "ALTER RANGE tenants CONFIGURE ZONE USING gc.ttlseconds = 1;" )
1093- c .DestSysSQL .Exec (t , fmt .Sprintf ("DROP TENANT %s" , c .Args .DestTenantName ))
998+ // checkNoDestinationProtection retries until looking up the job's PTS record fails with protectedts.ErrNotExists.
999+ checkNoDestinationProtection := func (t * testing.T , replicationJobID int ) {
1000+ t .Helper ()
1001+ execCfg := c .DestSysServer .ExecutorConfig ().(sql.ExecutorConfig )
1002+ testutils .SucceedsSoon (t , func () error {
1003+ return c .DestSysServer .InternalDB ().(isql.DB ).Txn (ctx , func (ctx context.Context , txn isql.Txn ) error {
1004+ j , err := execCfg .JobRegistry .LoadJobWithTxn (ctx , jobspb .JobID (replicationJobID ), txn )
1005+ if err != nil {
1006+ return err
1007+ }
1008+ payload := j .Payload ()
1009+ replicationDetails := payload .GetStreamIngestion ()
1010+ ptp := execCfg .ProtectedTimestampProvider .WithTxn (txn )
1011+ _ , err = ptp .GetRecord (ctx , * replicationDetails .ProtectedTimestampRecordID ) // NOTE(review): no nil check on the record ID here, unlike checkDestinationPTSExists; the subtest calls that helper first — confirm this ordering is relied upon
1012+ if err == nil {
1013+ return errors .New ("PTS record still exists" )
1014+ }
1015+ if ! errors .Is (err , protectedts .ErrNotExists ) { // only ErrNotExists counts as "released"; other errors are returned as-is
1016+ return err
1017+ }
1018+ return nil
1019+ })
1020+ })
1021+ }
10941022
1095- if ! completeReplication {
1096- c .SrcSysSQL .Exec (t , fmt .Sprintf (`ALTER TENANT '%s' SET REPLICATION SOURCE EXPIRATION WINDOW ='1ms'` , c .Args .SrcTenantName ))
1097- jobutils .WaitForJobToCancel (c .T , c .DestSysSQL , jobspb .JobID (replicationJobID ))
1098- jobutils .WaitForJobToFail (c .T , c .SrcSysSQL , jobspb .JobID (producerJobID ))
1099- }
1023+ // destTenantCounter numbers the subtests so that each one gets its own destination tenant ID and name.
1024+ destTenantCounter := 0
1025+ testutils .RunTrueAndFalse (t , "complete-replication" , func (t * testing.T , completeReplication bool ) {
1026+ // Assign a unique destination tenant for this subtest.
1027+ destTenantCounter ++
1028+ destTenantID := roachpb .MustMakeTenantID (uint64 (destTenantCounter + 1 )) // counter+1, i.e. tenant IDs 2 and 3 across the two subtests
1029+ destTenantName := roachpb .TenantName (fmt .Sprintf ("destination%d" , destTenantCounter ))
1030+ c .Args .DestTenantID = destTenantID // NOTE(review): presumably read by StartStreamReplication below — confirm
1031+ c .Args .DestTenantName = destTenantName
11001032
1101- requireReleasedProducerPTSRecord ( t , ctx , c . SrcSysServer , jobspb . JobID ( producerJobID ) )
1033+ producerJobID , replicationJobID := c . StartStreamReplication ( ctx )
11021034
1103- // Check if the replication job has released protected timestamp.
1104- checkNoDestinationProtection ( c , replicationJobID )
1035+ jobutils . WaitForJobToRun ( t , c . SrcSysSQL , jobspb . JobID ( producerJobID ))
1036+ jobutils . WaitForJobToRun ( t , c . DestSysSQL , jobspb . JobID ( replicationJobID ) )
11051037
1106- // Wait for the GC job to finish, this should happen once the protected
1107- // timestamp has been released.
1108- c .DestSysSQL .Exec (t , "SHOW JOBS WHEN COMPLETE SELECT job_id FROM [SHOW JOBS] WHERE job_type = 'SCHEMA CHANGE GC'" )
1038+ // Verify the PTS record exists on the destination while both jobs are running.
1039+ checkDestinationPTSExists (t , replicationJobID )
11091040
1110- // Check if dest tenant key range is cleaned up.
1111- destTenantSpan := keys .MakeTenantSpan (args .DestTenantID )
1112- rows , err := c .DestSysServer .DB ().
1113- Scan (ctx , destTenantSpan .Key , destTenantSpan .EndKey , 10 )
1114- require .NoError (t , err )
1115- require .Empty (t , rows )
1041+ if completeReplication {
1042+ // Complete replication via cutover.
1043+ var emptyCutoverTime time.Time // zero value; NOTE(review): presumably asks Cutover to pick the cutover time itself — confirm
1044+ c .Cutover (ctx , producerJobID , replicationJobID , emptyCutoverTime , false )
1045+ } else {
1046+ // Cancel the ingestion job on the destination and wait for it to reach the canceled state.
1047+ c .DestSysSQL .Exec (t , fmt .Sprintf ("CANCEL JOB %d" , replicationJobID ))
1048+ jobutils .WaitForJobToCancel (t , c .DestSysSQL , jobspb .JobID (replicationJobID ))
1049+ }
11161050
1117- c .DestSysSQL .CheckQueryResults (t ,
1118- fmt .Sprintf ("SELECT count(*) FROM system.tenants WHERE id = %s" , args .DestTenantID ),
1119- [][]string {{"0" }})
1120- })
1051+ // Verify PTS is released after job completion/cancellation.
1052+ checkNoDestinationProtection (t , replicationJobID )
11211053 })
11221054}
11231055
0 commit comments