Commit 559572d

crosscluster/physical: slim down TestProtectedTimestampManagement
This test was doing far too much for a unit test: it ran multiple test clusters and waited for several systems (PCR, GC, etc.) to interact. This change slashes the test's scope to verifying PCR's direct behavior only.

Release note: none

Epic: none
1 parent 7302175 commit 559572d

File tree

3 files changed (+151, -153 lines)

pkg/crosscluster/physical/BUILD.bazel

Lines changed: 0 additions & 1 deletion
@@ -151,7 +151,6 @@ go_test(
         "//pkg/kv/kvpb",
         "//pkg/kv/kvserver",
         "//pkg/kv/kvserver/protectedts",
-        "//pkg/kv/kvserver/protectedts/ptpb",
         "//pkg/repstream/streampb",
         "//pkg/roachpb",
         "//pkg/security/securityassets",

pkg/crosscluster/physical/replication_stream_e2e_test.go

Lines changed: 84 additions & 152 deletions
@@ -26,7 +26,6 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/keys"
 	"github.com/cockroachdb/cockroach/pkg/kv"
 	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/protectedts"
-	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/protectedts/ptpb"
 	"github.com/cockroachdb/cockroach/pkg/roachpb"
 	"github.com/cockroachdb/cockroach/pkg/security/username"
 	"github.com/cockroachdb/cockroach/pkg/server"
@@ -952,172 +951,105 @@ func TestStreamingReplanOnLag(t *testing.T) {
 	c.WaitUntilReplicatedTime(cutoverTime, jobspb.JobID(ingestionJobID))
 }

-// TestProtectedTimestampManagement tests the active protected
-// timestamps management on the destination tenant's keyspan.
+// TestProtectedTimestampManagement tests that the destination tenant's
+// protected timestamp is created when the ingestion job starts and
+// released when the job completes or is canceled.
+//
+// Note: The PTS formula (protectedTS = replicatedTime - TTL) is unit tested
+// in TestResolveHeartbeatTime. Producer PTS management is tested in
+// TestTenantStreamingCancelIngestion.
 func TestProtectedTimestampManagement(t *testing.T) {
 	defer leaktest.AfterTest(t)()
 	defer log.Scope(t).Close(t)

-	skip.UnderRace(t, "slow test") // takes >1mn under race.
-
 	ctx := context.Background()
 	args := replicationtestutils.DefaultTenantStreamingClustersArgs
-	// Override the replication job details ReplicationTTLSeconds to a small value
-	// so that every progress update results in a protected timestamp update.
-	args.RetentionTTLSeconds = 1
-
-	testutils.RunTrueAndFalse(t, "pause-before-terminal", func(t *testing.T, pauseBeforeTerminal bool) {
-		testutils.RunTrueAndFalse(t, "complete-replication", func(t *testing.T, completeReplication bool) {
-
-			// waitForProducerProtection asserts that there is a PTS record protecting
-			// the source tenant. We ensure the PTS record is protecting a timestamp
-			// greater or equal to the frontier we know we have replicated up until.
-			waitForProducerProtection := func(c *replicationtestutils.TenantStreamingClusters, frontier hlc.Timestamp, producerJobID int) {
-				testutils.SucceedsSoon(t, func() error {
-					srv := c.SrcSysServer
-					job, err := srv.JobRegistry().(*jobs.Registry).LoadJob(ctx, jobspb.JobID(producerJobID))
-					if err != nil {
-						return err
-					}
-					ptsRecordID := job.Payload().Details.(*jobspb.Payload_StreamReplication).StreamReplication.ProtectedTimestampRecordID
-					ptsProvider := srv.ExecutorConfig().(sql.ExecutorConfig).ProtectedTimestampProvider
-
-					var ptsRecord *ptpb.Record
-					if err := srv.InternalDB().(descs.DB).Txn(ctx, func(ctx context.Context, txn isql.Txn) error {
-						var err error
-						ptsRecord, err = ptsProvider.WithTxn(txn).GetRecord(ctx, ptsRecordID)
-						return err
-					}); err != nil {
-						return err
-					}
-					if ptsRecord.Timestamp.Less(frontier) {
-						return errors.Newf("protection is at %s, expected to be >= %s",
-							ptsRecord.Timestamp.String(), frontier.String())
-					}
-					return nil
-				})
-			}
-
-			// checkNoDestinationProtections asserts that there is no PTS record
-			// protecting the destination tenant.
-			checkNoDestinationProtection := func(c *replicationtestutils.TenantStreamingClusters, replicationJobID int) {
-				execCfg := c.DestSysServer.ExecutorConfig().(sql.ExecutorConfig)
-				require.NoError(t, c.DestSysServer.InternalDB().(isql.DB).Txn(ctx, func(ctx context.Context, txn isql.Txn) error {
-					j, err := execCfg.JobRegistry.LoadJobWithTxn(ctx, jobspb.JobID(replicationJobID), txn)
-					require.NoError(t, err)
-					payload := j.Payload()
-					replicationDetails := payload.GetStreamIngestion()
-					ptp := execCfg.ProtectedTimestampProvider.WithTxn(txn)
-					_, err = ptp.GetRecord(ctx, *replicationDetails.ProtectedTimestampRecordID)
-					require.EqualError(t, err, protectedts.ErrNotExists.Error())
-					return nil
-				}))
-			}
-			checkDestinationProtection := func(c *replicationtestutils.TenantStreamingClusters, frontier hlc.Timestamp, replicationJobID int) {
-				execCfg := c.DestSysServer.ExecutorConfig().(sql.ExecutorConfig)
-				ptp := execCfg.ProtectedTimestampProvider
-				require.NoError(t, c.DestSysServer.InternalDB().(isql.DB).Txn(ctx, func(ctx context.Context, txn isql.Txn) error {
-					j, err := execCfg.JobRegistry.LoadJobWithTxn(ctx, jobspb.JobID(replicationJobID), txn)
-					if err != nil {
-						return err
-					}
-					payload := j.Payload()
-					progress := j.Progress()
-					replicationDetails := payload.GetStreamIngestion()
-
-					require.NotNil(t, replicationDetails.ProtectedTimestampRecordID)
-					rec, err := ptp.WithTxn(txn).GetRecord(ctx, *replicationDetails.ProtectedTimestampRecordID)
-					if err != nil {
-						return err
-					}
-
-					replicatedTime := replicationutils.ReplicatedTimeFromProgress(&progress)
-					require.True(t, frontier.LessEq(replicatedTime))
-
-					roundedReplicatedTime := replicatedTime.GoTime().Round(time.Millisecond)
-					roundedProtectedTime := rec.Timestamp.GoTime().Round(time.Millisecond)
-					window := roundedReplicatedTime.Sub(roundedProtectedTime)
-					require.Equal(t, time.Second, window)
-					return nil
-				}))
-			}
+	// Disable metamorphic external connection since we reuse clusters across
+	// subtests and the external connection would already exist on subsequent runs.
+	args.NoMetamorphicExternalConnection = true

-			c, cleanup := replicationtestutils.CreateTenantStreamingClusters(ctx, t, args)
-			defer cleanup()
-
-			c.DestSysSQL.Exec(t, "SET CLUSTER SETTING kv.closed_timestamp.target_duration = '100ms'")
-			c.DestSysSQL.Exec(t, "SET CLUSTER SETTING kv.protectedts.reconciliation.interval = '1ms';")
-
-			producerJobID, replicationJobID := c.StartStreamReplication(ctx)
-
-			jobutils.WaitForJobToRun(c.T, c.SrcSysSQL, jobspb.JobID(producerJobID))
-			jobutils.WaitForJobToRun(c.T, c.DestSysSQL, jobspb.JobID(replicationJobID))
-
-			// Ensure that we wait at least a second so that the gap between the first
-			// time we write the protected timestamp (t1) during replication job
-			// startup, and the first progress update (t2) is greater than 1s. This is
-			// important because if `frontier@t2 - ReplicationTTLSeconds < t1` then we
-			// will not update the PTS record.
-			now := c.SrcCluster.Server(0).SystemLayer().Clock().Now().Add(int64(time.Second)*2, 0)
-			c.WaitUntilReplicatedTime(now, jobspb.JobID(replicationJobID))
-
-			// Check that the producer and replication job have written a protected
-			// timestamp.
-			waitForProducerProtection(c, now, producerJobID)
-			checkDestinationProtection(c, now, replicationJobID)
-
-			now2 := now.Add(time.Second.Nanoseconds(), 0)
-			c.WaitUntilReplicatedTime(now2, jobspb.JobID(replicationJobID))
-			// Let the replication progress for a second before checking that the
-			// protected timestamp record has also been updated on the destination
-			// cluster. This update happens in the same txn in which we update the
-			// replication job's progress.
-			waitForProducerProtection(c, now2, producerJobID)
-			checkDestinationProtection(c, now2, replicationJobID)
-
-			if pauseBeforeTerminal {
-				c.DestSysSQL.Exec(t, fmt.Sprintf("PAUSE JOB %d", replicationJobID))
-				jobutils.WaitForJobToPause(c.T, c.DestSysSQL, jobspb.JobID(replicationJobID))
-			}
+	// Create clusters once and reuse them across all subtests.
+	c, cleanup := replicationtestutils.CreateTenantStreamingClusters(ctx, t, args)
+	defer cleanup()

-			if completeReplication {
-				c.DestSysSQL.Exec(t, fmt.Sprintf("RESUME JOB %d", replicationJobID))
-				jobutils.WaitForJobToRun(c.T, c.DestSysSQL, jobspb.JobID(replicationJobID))
-				var emptyCutoverTime time.Time
-				c.Cutover(ctx, producerJobID, replicationJobID, emptyCutoverTime, false)
-				c.SrcSysSQL.Exec(t, fmt.Sprintf(`ALTER TENANT '%s' SET REPLICATION SOURCE EXPIRATION WINDOW ='100ms'`, c.Args.SrcTenantName))
-			}
+	// checkDestinationPTSExists verifies that a PTS record exists for the
+	// destination tenant's keyspan.
+	checkDestinationPTSExists := func(t *testing.T, replicationJobID int) {
+		t.Helper()
+		execCfg := c.DestSysServer.ExecutorConfig().(sql.ExecutorConfig)
+		testutils.SucceedsSoon(t, func() error {
+			return c.DestSysServer.InternalDB().(isql.DB).Txn(ctx, func(ctx context.Context, txn isql.Txn) error {
+				j, err := execCfg.JobRegistry.LoadJobWithTxn(ctx, jobspb.JobID(replicationJobID), txn)
+				if err != nil {
+					return err
+				}
+				payload := j.Payload()
+				replicationDetails := payload.GetStreamIngestion()
+				if replicationDetails.ProtectedTimestampRecordID == nil {
+					return errors.New("ProtectedTimestampRecordID not yet set")
+				}
+				ptp := execCfg.ProtectedTimestampProvider.WithTxn(txn)
+				_, err = ptp.GetRecord(ctx, *replicationDetails.ProtectedTimestampRecordID)
+				return err
+			})
+		})
+	}

-			// Set GC TTL low, so that the GC job completes quickly in the test.
-			c.DestSysSQL.Exec(t, "ALTER RANGE tenants CONFIGURE ZONE USING gc.ttlseconds = 1;")
-			c.DestSysSQL.Exec(t, fmt.Sprintf("DROP TENANT %s", c.Args.DestTenantName))
+	// checkNoDestinationProtection asserts that the PTS record has been released.
+	checkNoDestinationProtection := func(t *testing.T, replicationJobID int) {
+		t.Helper()
+		execCfg := c.DestSysServer.ExecutorConfig().(sql.ExecutorConfig)
+		testutils.SucceedsSoon(t, func() error {
+			return c.DestSysServer.InternalDB().(isql.DB).Txn(ctx, func(ctx context.Context, txn isql.Txn) error {
+				j, err := execCfg.JobRegistry.LoadJobWithTxn(ctx, jobspb.JobID(replicationJobID), txn)
+				if err != nil {
+					return err
+				}
+				payload := j.Payload()
+				replicationDetails := payload.GetStreamIngestion()
+				ptp := execCfg.ProtectedTimestampProvider.WithTxn(txn)
+				_, err = ptp.GetRecord(ctx, *replicationDetails.ProtectedTimestampRecordID)
+				if err == nil {
+					return errors.New("PTS record still exists")
+				}
+				if !errors.Is(err, protectedts.ErrNotExists) {
+					return err
+				}
+				return nil
+			})
+		})
+	}

-			if !completeReplication {
-				c.SrcSysSQL.Exec(t, fmt.Sprintf(`ALTER TENANT '%s' SET REPLICATION SOURCE EXPIRATION WINDOW ='1ms'`, c.Args.SrcTenantName))
-				jobutils.WaitForJobToCancel(c.T, c.DestSysSQL, jobspb.JobID(replicationJobID))
-				jobutils.WaitForJobToFail(c.T, c.SrcSysSQL, jobspb.JobID(producerJobID))
-			}
+	// Use a counter to assign unique destination tenant IDs for each subtest.
+	destTenantCounter := 0
+	testutils.RunTrueAndFalse(t, "complete-replication", func(t *testing.T, completeReplication bool) {
+		// Assign a unique destination tenant for this subtest.
+		destTenantCounter++
+		destTenantID := roachpb.MustMakeTenantID(uint64(destTenantCounter + 1))
+		destTenantName := roachpb.TenantName(fmt.Sprintf("destination%d", destTenantCounter))
+		c.Args.DestTenantID = destTenantID
+		c.Args.DestTenantName = destTenantName

-			requireReleasedProducerPTSRecord(t, ctx, c.SrcSysServer, jobspb.JobID(producerJobID))
+		producerJobID, replicationJobID := c.StartStreamReplication(ctx)

-			// Check if the replication job has released protected timestamp.
-			checkNoDestinationProtection(c, replicationJobID)
+		jobutils.WaitForJobToRun(t, c.SrcSysSQL, jobspb.JobID(producerJobID))
+		jobutils.WaitForJobToRun(t, c.DestSysSQL, jobspb.JobID(replicationJobID))

-			// Wait for the GC job to finish, this should happen once the protected
-			// timestamp has been released.
-			c.DestSysSQL.Exec(t, "SHOW JOBS WHEN COMPLETE SELECT job_id FROM [SHOW JOBS] WHERE job_type = 'SCHEMA CHANGE GC'")
+		// Verify PTS exists on the destination.
+		checkDestinationPTSExists(t, replicationJobID)

-			// Check if dest tenant key range is cleaned up.
-			destTenantSpan := keys.MakeTenantSpan(args.DestTenantID)
-			rows, err := c.DestSysServer.DB().
-				Scan(ctx, destTenantSpan.Key, destTenantSpan.EndKey, 10)
-			require.NoError(t, err)
-			require.Empty(t, rows)
+		if completeReplication {
+			// Complete replication via cutover.
+			var emptyCutoverTime time.Time
+			c.Cutover(ctx, producerJobID, replicationJobID, emptyCutoverTime, false)
+		} else {
+			// Cancel replication.
+			c.DestSysSQL.Exec(t, fmt.Sprintf("CANCEL JOB %d", replicationJobID))
+			jobutils.WaitForJobToCancel(t, c.DestSysSQL, jobspb.JobID(replicationJobID))
+		}

-			c.DestSysSQL.CheckQueryResults(t,
-				fmt.Sprintf("SELECT count(*) FROM system.tenants WHERE id = %s", args.DestTenantID),
-				[][]string{{"0"}})
-		})
+		// Verify PTS is released after job completion/cancellation.
+		checkNoDestinationProtection(t, replicationJobID)
 	})
 }

pkg/crosscluster/replicationutils/utils_test.go

Lines changed: 67 additions & 0 deletions
@@ -134,3 +134,70 @@ func TestScanSST(t *testing.T) {
 	checkScan(roachpb.Span{Key: roachpb.Key("da"), EndKey: roachpb.Key("e")},
 		[]storage.MVCCKeyValue{}, []storage.MVCCRangeKey{})
 }
+
+func TestResolveHeartbeatTime(t *testing.T) {
+	defer leaktest.AfterTest(t)()
+	defer log.Scope(t).Close(t)
+
+	// Helper to create timestamps more easily.
+	ts := func(seconds int64) hlc.Timestamp {
+		return hlc.Timestamp{WallTime: seconds * 1e9}
+	}
+
+	testCases := []struct {
+		name                 string
+		replicatedTime       hlc.Timestamp
+		replicationStartTime hlc.Timestamp
+		cutoverTime          hlc.Timestamp
+		ttlSeconds           int32
+		expected             hlc.Timestamp
+	}{
+		{
+			name:                 "basic formula: replicated - TTL",
+			replicatedTime:       ts(100),
+			replicationStartTime: ts(50),
+			cutoverTime:          hlc.Timestamp{},
+			ttlSeconds:           10,
+			expected:             ts(90),
+		},
+		{
+			name:                 "clamped to start time when result would be earlier",
+			replicatedTime:       ts(55),
+			replicationStartTime: ts(50),
+			cutoverTime:          hlc.Timestamp{},
+			ttlSeconds:           10,
+			expected:             ts(50), // 55-10=45, but clamped to 50
+		},
+		{
+			name:                 "clamped to cutover time when cutover is earlier",
+			replicatedTime:       ts(100),
+			replicationStartTime: ts(50),
+			cutoverTime:          ts(85),
+			ttlSeconds:           10,
+			expected:             ts(85), // 100-10=90, but clamped to cutover 85
+		},
+		{
+			name:                 "cutover later than calculated time has no effect",
+			replicatedTime:       ts(100),
+			replicationStartTime: ts(50),
+			cutoverTime:          ts(95),
+			ttlSeconds:           10,
+			expected:             ts(90), // 100-10=90, cutover 95 doesn't affect
+		},
+		{
+			name:                 "zero TTL returns replicated time",
+			replicatedTime:       ts(100),
+			replicationStartTime: ts(50),
+			cutoverTime:          hlc.Timestamp{},
+			ttlSeconds:           0,
+			expected:             ts(100),
+		},
+	}
+
+	for _, tc := range testCases {
+		t.Run(tc.name, func(t *testing.T) {
+			result := ResolveHeartbeatTime(tc.replicatedTime, tc.replicationStartTime, tc.cutoverTime, tc.ttlSeconds)
+			require.Equal(t, tc.expected, result)
+		})
+	}
+}
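
The cases above pin down the clamping rule the helper is expected to follow: start from replicatedTime - TTL, never go below the replication start time, and cap the result at the cutover time once one is set. A minimal sketch of that rule, assuming the usual hlc and time imports; the function name and the wall-clock-only arithmetic below are illustrative, not the actual implementation in pkg/crosscluster/replicationutils:

// resolveHeartbeatTimeSketch mirrors the clamping behavior exercised by the
// test cases above. Sketch only, not the production code.
func resolveHeartbeatTimeSketch(
	replicatedTime, replicationStartTime, cutoverTime hlc.Timestamp, ttlSeconds int32,
) hlc.Timestamp {
	// Start from replicatedTime - TTL (wall clock only, for illustration).
	hb := hlc.Timestamp{WallTime: replicatedTime.WallTime - int64(ttlSeconds)*time.Second.Nanoseconds()}
	// Never protect earlier than the replication start time.
	if hb.Less(replicationStartTime) {
		hb = replicationStartTime
	}
	// Once a cutover time is set, do not protect beyond it.
	if !cutoverTime.IsEmpty() && cutoverTime.Less(hb) {
		hb = cutoverTime
	}
	return hb
}

For example, the "clamped to cutover time" case computes 100s - 10s = 90s and then caps it at the 85s cutover, matching the expected ts(85).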
