
Commit 0c0f246

craig[bot], kev-cao, msbutler, and RaduBerinde committed
144308: roachtest: fix backup-restore compaction overlap bug r=msbutler a=kev-cao

The previous implementation of the roachtest did not remove backup end times from the list of candidate end times for compaction after performing a compaction. As a result, if multiple compactions were attempted, a later compaction could choose as its start time the end time of a backup that had already been compacted. Because the compaction job compacts backups from an elided backup chain, those compacted backups no longer appear in the list of backups, so the job would fail because it could not find the starting backup.

Informs: #144216

Release note: None

144318: crosscluster/logical: validate schema on resume r=jeffswenson a=msbutler

Because the user can toggle the use of the kv writer via a cluster setting, we need to validate that their replicating tables comply with their writer mode whenever we spin up the LDR distsql flow. This patch also slightly refactors the input to the validation checker during planning to read from the writer type cluster setting. This means immediate mode using the sql writer can replicate even if the column IDs in the tables don't match.

Epic: none

Release note: none

144362: scripts: add 25.2 to scripts/check-pebble-dep.sh r=RaduBerinde a=RaduBerinde

Epic: none

Release note: None

Co-authored-by: Kevin Cao <[email protected]>
Co-authored-by: Michael Butler <[email protected]>
Co-authored-by: Radu Berinde <[email protected]>
4 parents: c33f7aa + 6d85793 + 57830f4 + 4f9d938; commit 0c0f246

File tree

10 files changed: +77 −19 lines changed

pkg/cmd/roachtest/tests/mixed_version_backup.go

Lines changed: 10 additions & 1 deletion

@@ -15,6 +15,7 @@ import (
 	"path/filepath"
 	"reflect"
 	"regexp"
+	"slices"
 	"sort"
 	"strings"
 	"sync"
@@ -2076,7 +2077,7 @@ func (d *BackupRestoreTestDriver) createBackupCollection(
 		numIncrementals = 2
 	}
 	l.Printf("creating %d incremental backups", numIncrementals)
-	for i := 0; i < numIncrementals; i++ {
+	for i := range numIncrementals {
 		d.randomWait(l, rng)
 		if err := d.testUtils.runJobOnOneOf(ctx, l, incBackupSpec.Execute.Nodes, func() error {
 			var err error
@@ -2117,6 +2118,14 @@ func (d *BackupRestoreTestDriver) createBackupCollection(
 		}); err != nil {
 			return nil, err
 		}
+		// Since a compacted backup was made, the backup end times of the
+		// backups it compacted should be removed from the slice. This prevents a
+		// scenario where a later compaction attempts to pick a start time from
+		// one of the backups that were compacted. Since compaction looks at the
+		// elided backup chain, these compacted backups are replaced by the
+		// compacted backup and compaction will fail due to being unable to find
+		// the starting backup.
+		backupEndTimes = slices.Delete(backupEndTimes, startIdx+1, endIdx)
 	}
 }
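
To make the fix concrete, here is a small, runnable Go sketch of what the slices.Delete call does to the candidate list. The timestamps and indices below are invented for illustration; only the slices.Delete call itself mirrors the patch.

package main

import (
	"fmt"
	"slices"
)

func main() {
	// Hypothetical candidate end times: a full backup at t=10 followed by
	// incrementals ending at t=20, 30, 40, and 50.
	backupEndTimes := []int{10, 20, 30, 40, 50}

	// Suppose a compaction merged the incrementals ending at 20, 30, and 40
	// into one backup spanning (10, 40]. startIdx indexes the compaction's
	// start time (10) and endIdx its end time (40).
	startIdx, endIdx := 0, 3

	// Drop the end times of the now-elided backups. slices.Delete removes
	// the half-open range [startIdx+1, endIdx), so the end times of both the
	// compaction's base (10) and the compacted backup itself (40) survive as
	// valid candidates for later compactions.
	backupEndTimes = slices.Delete(backupEndTimes, startIdx+1, endIdx)

	fmt.Println(backupEndTimes) // [10 40 50]
}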

pkg/crosscluster/logical/create_logical_replication_stmt.go

Lines changed: 10 additions & 5 deletions

@@ -323,10 +323,11 @@ func createLogicalReplicationStreamPlanHook(
 			ReverseStreamCommand: reverseStreamCmd,
 			ParentID:             int64(options.ParentID),
 			Command:              stmt.String(),
+			SkipSchemaCheck:      options.SkipSchemaCheck(),
 		},
 		Progress: progress,
 	}
-	if err := doLDRPlan(ctx, p.User(), p.ExecCfg(), jr, spec.ExternalCatalog, resolvedDestObjects, options.SkipSchemaCheck()); err != nil {
+	if err := doLDRPlan(ctx, p.User(), p.ExecCfg(), jr, spec.ExternalCatalog, resolvedDestObjects); err != nil {
 		return err
 	}
 	resultsCh <- tree.Datums{tree.NewDInt(tree.DInt(jr.JobID))}
@@ -430,7 +431,6 @@ func doLDRPlan(
 	jr jobs.Record,
 	srcExternalCatalog externalpb.ExternalCatalog,
 	resolvedDestObjects ResolvedDestObjects,
-	skipSchemaCheck bool,
 ) error {
 	details := jr.Details.(jobspb.LogicalReplicationDetails)
 	return execCfg.InternalDB.DescsTxn(ctx, func(ctx context.Context, txn descs.Txn) error {
@@ -475,16 +475,21 @@ func doLDRPlan(
 			return errors.AssertionFailedf("srcTableDescs and dstTableDescs should have the same length")
 		}
 	}
+
+	writer, err := getWriterType(ctx, details.Mode, execCfg.Settings)
+	if err != nil {
+		return err
+	}
+
 	for i := range srcExternalCatalog.Tables {
 		destTableDesc := dstTableDescs[i]
-		mayUseKVWriter := false
 		if details.Mode != jobspb.LogicalReplicationDetails_Validated {
 			if len(destTableDesc.OutboundForeignKeys()) > 0 || len(destTableDesc.InboundForeignKeys()) > 0 {
 				return pgerror.Newf(pgcode.InvalidParameterValue, "foreign keys are only supported with MODE = 'validated'")
 			}
-			mayUseKVWriter = true
 		}
-		err := tabledesc.CheckLogicalReplicationCompatibility(&srcExternalCatalog.Tables[i], destTableDesc.TableDesc(), skipSchemaCheck || details.CreateTable, mayUseKVWriter)
+
+		err := tabledesc.CheckLogicalReplicationCompatibility(&srcExternalCatalog.Tables[i], destTableDesc.TableDesc(), details.SkipSchemaCheck || details.CreateTable, writer == writerTypeLegacyKV)
 		if err != nil {
 			return err
 		}
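
For readers unfamiliar with getWriterType, the standalone sketch below shows the general shape of such a mode-plus-setting lookup. The helper name pickWriterType and its branching are assumptions for illustration; only the writer values 'sql' and 'legacy-kv' and the cluster setting logical_replication.consumer.immediate_mode_writer come from this patch.

package main

import "fmt"

type writerType string

const (
	writerTypeSQL      writerType = "sql"
	writerTypeLegacyKV writerType = "legacy-kv"
)

// pickWriterType is a hypothetical stand-in for getWriterType: validated
// mode always applies rows through the SQL layer, while immediate mode
// defers to the cluster setting that lets operators toggle the legacy KV
// writer on or off.
func pickWriterType(mode, immediateModeWriter string) (writerType, error) {
	switch mode {
	case "validated":
		return writerTypeSQL, nil
	case "immediate":
		switch immediateModeWriter {
		case "sql":
			return writerTypeSQL, nil
		case "legacy-kv":
			return writerTypeLegacyKV, nil
		}
		return "", fmt.Errorf("unknown immediate mode writer %q", immediateModeWriter)
	}
	return "", fmt.Errorf("unknown apply mode %q", mode)
}

func main() {
	w, _ := pickWriterType("immediate", "legacy-kv")
	fmt.Println(w) // legacy-kv
}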

pkg/crosscluster/logical/logical_replication_dist.go

Lines changed: 2 additions & 0 deletions

@@ -36,6 +36,7 @@ func constructLogicalReplicationWriterSpecs(
 	discard jobspb.LogicalReplicationDetails_Discard,
 	mode jobspb.LogicalReplicationDetails_ApplyMode,
 	metricsLabel string,
+	writer writerType,
 ) (map[base.SQLInstanceID][]execinfrapb.LogicalReplicationWriterSpec, error) {
 	spanGroup := roachpb.SpanGroup{}
 	baseSpec := execinfrapb.LogicalReplicationWriterSpec{
@@ -50,6 +51,7 @@ func constructLogicalReplicationWriterSpecs(
 		Mode:            mode,
 		MetricsLabel:    metricsLabel,
 		TypeDescriptors: srcTypes,
+		WriterType:      string(writer),
 	}

 	writerSpecs := make(map[base.SQLInstanceID][]execinfrapb.LogicalReplicationWriterSpec, len(destSQLInstances))

pkg/crosscluster/logical/logical_replication_job.go

Lines changed: 9 additions & 1 deletion

@@ -510,7 +510,10 @@ func (p *logicalReplicationPlanner) generatePlanImpl(
 	if defaultFnID := payload.DefaultConflictResolution.FunctionId; defaultFnID != 0 {
 		defaultFnOID = catid.FuncIDToOID(catid.DescID(defaultFnID))
 	}
-
+	writer, err := getWriterType(ctx, payload.Mode, execCfg.Settings)
+	if err != nil {
+		return nil, nil, info, err
+	}
 	crossClusterResolver := crosscluster.MakeCrossClusterTypeResolver(plan.SourceTypes)
 	tableMetadataByDestID := make(map[int32]execinfrapb.TableReplicationMetadata)
 	if err := sql.DescsTxn(ctx, execCfg, func(ctx context.Context, txn isql.Txn, descriptors *descs.Collection) error {
@@ -536,6 +539,10 @@ func (p *logicalReplicationPlanner) generatePlanImpl(
 			return errors.Wrapf(err, "failed to look up schema descriptor for table %d", pair.DstDescriptorID)
 		}

+		if err := tabledesc.CheckLogicalReplicationCompatibility(&srcTableDesc, dstTableDesc.TableDesc(), payload.SkipSchemaCheck || payload.CreateTable, writer == writerTypeLegacyKV); err != nil {
+			return err
+		}
+
 		var fnOID oid.Oid
 		if pair.DstFunctionID != 0 {
 			fnOID = catid.FuncIDToOID(catid.DescID(pair.DstFunctionID))
@@ -590,6 +597,7 @@ func (p *logicalReplicationPlanner) generatePlanImpl(
 		payload.Discard,
 		payload.Mode,
 		payload.MetricsLabel,
+		writer,
 	)
 	if err != nil {
 		return nil, nil, info, err
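
Since generatePlanImpl runs both at stream creation and on every job resumption, re-validating each table pair here is what enforces the writer-mode rules after a RESUME. The self-contained sketch below shows the idea; tablePair, validatePairs, and their fields are hypothetical stand-ins for the real descriptors and CheckLogicalReplicationCompatibility.

package main

import "fmt"

type tablePair struct {
	srcName, dstName string
	colIDsMatch      bool // whether source and destination column IDs line up
}

// validatePairs mimics the planning-time loop: the legacy KV writer copies
// raw KVs and therefore needs identical column IDs, while the SQL writer
// re-encodes rows and tolerates mismatches.
func validatePairs(pairs []tablePair, skipSchemaCheck, kvWriter bool) error {
	for _, p := range pairs {
		if kvWriter && !skipSchemaCheck && !p.colIDsMatch {
			return fmt.Errorf("table %s -> %s: column IDs differ, incompatible with the legacy-kv writer",
				p.srcName, p.dstName)
		}
	}
	return nil
}

func main() {
	pairs := []tablePair{{srcName: "foo", dstName: "foo", colIDsMatch: false}}
	// With the KV writer selected, plan generation (and thus a resume) fails fast:
	fmt.Println(validatePairs(pairs, false /* skipSchemaCheck */, true /* kvWriter */))
}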

pkg/crosscluster/logical/logical_replication_job_test.go

Lines changed: 26 additions & 4 deletions

@@ -2428,7 +2428,17 @@ func TestMismatchColIDs(t *testing.T) {
 	defer log.Scope(t).Close(t)

 	ctx := context.Background()
-	tc, s, sqlA, sqlB := setupLogicalTestServer(t, ctx, testClusterBaseClusterArgs, 1)
+
+	streamingKnobs := &sql.StreamingTestingKnobs{
+		DistSQLRetryPolicy: &retry.Options{
+			InitialBackoff: time.Microsecond,
+			MaxBackoff:     2 * time.Microsecond,
+			MaxRetries:     1,
+		},
+	}
+	args := testClusterBaseClusterArgs
+	args.ServerArgs.Knobs.Streaming = streamingKnobs
+	tc, s, sqlA, sqlB := setupLogicalTestServer(t, ctx, args, 1)
 	defer tc.Stopper().Stop(ctx)

 	dbBURL := replicationtestutils.GetExternalConnectionURI(t, s, s, serverutils.DBName("b"))
@@ -2449,17 +2459,29 @@
 	sqlB.Exec(t, "ALTER TABLE foo DROP COLUMN bar")
 	sqlB.Exec(t, "INSERT INTO foo VALUES (4, 'world')")

-	// LDR immediate mode creation should fail because of mismatched column IDs.
+	// When using the kv writer, LDR immediate mode creation should fail because of mismatched column IDs.
+	sqlA.Exec(t, "SET CLUSTER SETTING logical_replication.consumer.immediate_mode_writer = 'legacy-kv'")
 	sqlA.ExpectErr(t,
 		"destination table foo column baz has ID 3, but the source table foo has ID 4",
 		"CREATE LOGICAL REPLICATION STREAM FROM TABLE foo ON $1 INTO TABLE foo WITH MODE = 'immediate'", dbBURL.String())

-	// LDR validated mode creation should succeed because the SQL writer supports mismatched column IDs.
 	var jobID jobspb.JobID
 	sqlA.QueryRow(t, "CREATE LOGICAL REPLICATION STREAM FROM TABLE foo ON $1 INTO TABLE foo WITH MODE = 'validated'", dbBURL.String()).Scan(&jobID)
-
 	now := s.Clock().Now()
 	WaitUntilReplicatedTime(t, now, sqlA, jobID)
+
+	sqlA.Exec(t, "SET CLUSTER SETTING logical_replication.consumer.immediate_mode_writer = 'sql'")
+	sqlA.QueryRow(t, "CREATE LOGICAL REPLICATION STREAM FROM TABLE foo ON $1 INTO TABLE foo WITH MODE = 'immediate'", dbBURL.String()).Scan(&jobID)
+	now = s.Clock().Now()
+	WaitUntilReplicatedTime(t, now, sqlA, jobID)
+
+	// Ensure the validation works on resumption as well.
+	sqlA.Exec(t, "PAUSE JOB $1", jobID)
+	jobutils.WaitForJobToPause(t, sqlA, jobID)
+
+	sqlA.Exec(t, "SET CLUSTER SETTING logical_replication.consumer.immediate_mode_writer = 'legacy-kv'")
+	sqlA.Exec(t, "RESUME JOB $1", jobID)
+	jobutils.WaitForJobToPause(t, sqlA, jobID)
 }

 // TestLogicalReplicationCreationChecks verifies that we check that the table

pkg/crosscluster/logical/logical_replication_writer_processor.go

Lines changed: 12 additions & 5 deletions

@@ -711,17 +711,24 @@ func (lrw *logicalReplicationWriterProcessor) setupBatchHandlers(ctx context.Con
 		b.Close(lrw.Ctx())
 	}

+	writer := writerType(lrw.spec.WriterType)
+	if writer == "" && !lrw.FlowCtx.Cfg.Settings.Version.IsActive(ctx, clusterversion.V25_2) {
+		var err error
+		writer, err = getWriterType(
+			ctx, lrw.spec.Mode, lrw.FlowCtx.Cfg.Settings,
+		)
+		if err != nil {
+			return err
+		}
+	}
+
 	flowCtx := lrw.FlowCtx
 	lrw.bh = make([]BatchHandler, poolSize)
 	for i := range lrw.bh {
 		var rp BatchHandler
 		var err error
 		sd := sql.NewInternalSessionData(ctx, flowCtx.Cfg.Settings, "" /* opName */)

-		writer, err := getWriterType(ctx, lrw.spec.Mode, flowCtx.Cfg.Settings)
-		if err != nil {
-			return err
-		}
 		switch writer {
 		case writerTypeSQL:
 			rp, err = makeSQLProcessor(
@@ -744,7 +751,7 @@ func (lrw *logicalReplicationWriterProcessor) setupBatchHandlers(ctx context.Con
 				return err
 			}
 		default:
-			return errors.AssertionFailedf("unknown logical replication writer type: %s", writer)
+			return errors.AssertionFailedf("unknown logical replication writer type: %s", lrw.spec.WriterType)
 		}

 		if streamingKnobs, ok := flowCtx.TestingKnobs().StreamingTestingKnobs.(*sql.StreamingTestingKnobs); ok {
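
The version gate above exists for mixed-version clusters: an upgraded planner pins the writer type into the processor spec, while a spec from a pre-25.2 planner leaves the field empty. Below is a runnable sketch of that resolution order, with hypothetical names and a simplified error path (the real code lets the empty value fall through to the switch's default case).

package main

import "fmt"

// resolveWriter prefers the writer type pinned in the spec so every
// processor in the flow agrees even if the cluster setting changes
// mid-stream; it only recomputes from local settings when the spec came
// from a node too old to set the field.
func resolveWriter(specWriterType string, v252Active bool, fromSettings func() (string, error)) (string, error) {
	if specWriterType != "" {
		return specWriterType, nil
	}
	if !v252Active {
		// Mixed-version case: the planner predates the field.
		return fromSettings()
	}
	// Post-finalization, an empty writer type is unexpected.
	return "", fmt.Errorf("unknown logical replication writer type: %q", specWriterType)
}

func main() {
	w, err := resolveWriter("", false, func() (string, error) { return "sql", nil })
	fmt.Println(w, err) // sql <nil>
}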

pkg/jobs/jobspb/jobs.proto

Lines changed: 2 additions & 0 deletions

@@ -292,6 +292,8 @@ message LogicalReplicationDetails {

   string command = 16;

+  bool skip_schema_check = 17;
+
   // Next ID: 17.
 }

pkg/sql/catalog/tabledesc/logical_replication_helpers.go

Lines changed: 1 addition & 1 deletion

@@ -218,7 +218,7 @@ func checkSrcDstColsMatch(
 	}

 	if requireKvWriterCompatible && srcCol.ID != dstCol.ID {
-		return errors.Newf("destination table %s column %s has ID %d, but the source table %s has ID %d",
+		return errors.Newf("destination table %s column %s has ID %d, but the source table %s has ID %d. To circumvent this check, unset logical_replication.consumer.immediate_mode_writer from 'legacy-kv'",
 			dst.Name, dstCol.Name, dstCol.ID, src.Name, srcCol.ID,
 		)
 	}

pkg/sql/execinfrapb/processors_bulk_io.proto

Lines changed: 4 additions & 1 deletion

@@ -536,7 +536,10 @@ message LogicalReplicationWriterSpec {
   optional string metrics_label = 11 [(gogoproto.nullable) = false];

   repeated cockroach.sql.sqlbase.TypeDescriptor type_descriptors = 13;
-  // Next ID: 14.
+
+  optional string writer_type = 14 [(gogoproto.nullable) = false];
+
+  // Next ID: 15.
 }

 message LogicalReplicationOfflineScanSpec {
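
A note on why the empty string can serve as the "unset" sentinel in the processor change above: with [(gogoproto.nullable) = false], the generated Go struct holds a plain string rather than a pointer, so a spec serialized by an older node that never sets field 14 decodes to "". A simplified, hypothetical equivalent of the generated type:

package main

import "fmt"

// LogicalReplicationWriterSpec sketches a fragment of what the proto
// compiler generates for the message above; the shape is illustrative,
// not the actual generated code.
type LogicalReplicationWriterSpec struct {
	MetricsLabel string
	WriterType   string // proto field 14; "" when sent by a pre-25.2 node
}

func main() {
	var spec LogicalReplicationWriterSpec // as if decoded from an old node's wire bytes
	fmt.Printf("unset writer type decodes as %q\n", spec.WriterType)
}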

scripts/check-pebble-dep.sh

Lines changed: 1 addition & 1 deletion

@@ -9,7 +9,7 @@
 set -euo pipefail
 #set -x

-RELEASES="23.2 24.1 24.3 25.1 master"
+RELEASES="23.2 24.1 24.3 25.1 25.2 master"

 for REL in $RELEASES; do
 	if [ "$REL" == "master" ]; then
