Skip to content

Commit 2bd1c93

Browse files
backup: add support for execution locality to backup compactions
This patch teaches backup compactions about execution locality filters, allowing backup compactions to support backup schedules with execution locality set while respecting the locality filter. Fixes: #149592 Release note: None
1 parent 843d488 commit 2bd1c93

File tree

4 files changed

+313
-74
lines changed

4 files changed

+313
-74
lines changed

pkg/backup/compaction_dist.go

Lines changed: 16 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ import (
1111
"github.com/cockroachdb/cockroach/pkg/backup/backupinfo"
1212
"github.com/cockroachdb/cockroach/pkg/backup/backuppb"
1313
"github.com/cockroachdb/cockroach/pkg/base"
14+
"github.com/cockroachdb/cockroach/pkg/ccl/kvccl/kvfollowerreadsccl"
1415
"github.com/cockroachdb/cockroach/pkg/cloud"
1516
"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
1617
"github.com/cockroachdb/cockroach/pkg/jobs/jobsprofiler"
@@ -152,10 +153,22 @@ func createCompactionPlan(
152153
return nil, nil, errors.Wrap(err, "counting number of restore span entries")
153154
}
154155

155-
// TODO (kev-cao): Add support for execution locality.
156+
evalCtx := execCtx.ExtendedEvalContext()
157+
locFilter := sql.SingleLocalityFilter(details.ExecutionLocality)
158+
oracle := physicalplan.DefaultReplicaChooser
159+
if useBulkOracle.Get(&evalCtx.Settings.SV) {
160+
oracle, err = kvfollowerreadsccl.NewLocalityFilteringBulkOracle(
161+
dsp.ReplicaOracleConfig(evalCtx.Locality),
162+
locFilter,
163+
)
164+
if err != nil {
165+
return nil, nil, errors.Wrap(err, "failed to create locality filtering bulk oracle")
166+
}
167+
}
168+
156169
planCtx, sqlInstanceIDs, err := dsp.SetupAllNodesPlanningWithOracle(
157-
ctx, execCtx.ExtendedEvalContext(), execCtx.ExecCfg(),
158-
physicalplan.DefaultReplicaChooser, []roachpb.Locality{}, sql.NoStrictLocalityFiltering,
170+
ctx, evalCtx, execCtx.ExecCfg(),
171+
oracle, locFilter, sql.NoStrictLocalityFiltering,
159172
)
160173
if err != nil {
161174
return nil, nil, err

pkg/backup/compaction_job.go

Lines changed: 38 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -73,8 +73,6 @@ func maybeStartCompactionJob(
7373
}
7474

7575
switch {
76-
case len(triggerJob.ExecutionLocality.Tiers) != 0:
77-
return 0, errors.New("execution locality not supported for compaction")
7876
case triggerJob.ScheduleID == 0:
7977
return 0, errors.New("only scheduled backups can be compacted")
8078
case len(triggerJob.SpecificTenantIds) != 0 || triggerJob.IncludeAllSecondaryTenants:
@@ -197,13 +195,47 @@ func StartCompactionJob(
197195
scheduleID jobspb.ScheduleID,
198196
collectionURI []string,
199197
fullBackupPath string,
200-
encryptionOpts jobspb.BackupEncryptionOptions,
198+
options tree.BackupOptions,
201199
start, end hlc.Timestamp,
202200
) (jobspb.JobID, error) {
203201
planHook, ok := planner.(sql.PlanHookState)
204202
if !ok {
205203
return 0, errors.New("missing job execution context")
206204
}
205+
206+
var executionLocality roachpb.Locality
207+
if options.ExecutionLocality != nil {
208+
if strVal, ok := options.ExecutionLocality.(*tree.StrVal); ok {
209+
s := strVal.RawString()
210+
if s != "" {
211+
if err := executionLocality.Set(s); err != nil {
212+
return 0, errors.Wrap(err, "error setting execution locality")
213+
}
214+
}
215+
} else {
216+
return 0, errors.Newf(
217+
"expected string value, got %+v", options.ExecutionLocality,
218+
)
219+
}
220+
}
221+
222+
encryption := jobspb.BackupEncryptionOptions{
223+
Mode: jobspb.EncryptionMode_None,
224+
}
225+
if options.EncryptionPassphrase != nil {
226+
encryption.Mode = jobspb.EncryptionMode_Passphrase
227+
encryption.RawPassphrase = tree.AsStringWithFlags(
228+
options.EncryptionPassphrase,
229+
tree.FmtBareStrings,
230+
)
231+
}
232+
if options.EncryptionKMSURI != nil {
233+
if encryption.Mode != jobspb.EncryptionMode_None {
234+
return 0, errors.Newf("only one encryption mode can be specified")
235+
}
236+
encryption.RawKmsUris = builtins.ExprSliceToStrSlice(options.EncryptionKMSURI)
237+
}
238+
207239
details := jobspb.BackupDetails{
208240
ScheduleID: scheduleID,
209241
StartTime: start,
@@ -213,7 +245,8 @@ func StartCompactionJob(
213245
Subdir: fullBackupPath,
214246
Exists: true,
215247
},
216-
EncryptionOptions: &encryptionOpts,
248+
EncryptionOptions: &encryption,
249+
ExecutionLocality: executionLocality,
217250
Compact: true,
218251
}
219252
jobID := planHook.ExecCfg().JobRegistry.MakeJobID()
@@ -598,6 +631,7 @@ func updateCompactionBackupDetails(
598631
ResolvedCompleteDbs: lastBackup.CompleteDbs,
599632
FullCluster: lastBackup.DescriptorCoverage == tree.AllDescriptors,
600633
ScheduleID: initialDetails.ScheduleID,
634+
ExecutionLocality: initialDetails.ExecutionLocality,
601635
Compact: true,
602636
}
603637
return compactedDetails, nil

0 commit comments

Comments
 (0)