Skip to content

Commit e3f3439

Browse files
craig[bot]andrew-r-thomas
andcommitted
Merge #159758
159758: backup: add support for execution locality to backup compactions r=andrew-r-thomas a=andrew-r-thomas This patch teaches backup compactions about execution locality filters, allowing backup compactions to support backup schedules with execution locality set while respecting the locality filter. Fixes: #149592 Release note: None Co-authored-by: Andrew Thomas <[email protected]>
2 parents 854cbf1 + 2bd1c93 commit e3f3439

File tree

4 files changed

+313
-74
lines changed

4 files changed

+313
-74
lines changed

pkg/backup/compaction_dist.go

Lines changed: 16 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ import (
1111
"github.com/cockroachdb/cockroach/pkg/backup/backupinfo"
1212
"github.com/cockroachdb/cockroach/pkg/backup/backuppb"
1313
"github.com/cockroachdb/cockroach/pkg/base"
14+
"github.com/cockroachdb/cockroach/pkg/ccl/kvccl/kvfollowerreadsccl"
1415
"github.com/cockroachdb/cockroach/pkg/cloud"
1516
"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
1617
"github.com/cockroachdb/cockroach/pkg/jobs/jobsprofiler"
@@ -152,10 +153,22 @@ func createCompactionPlan(
152153
return nil, nil, errors.Wrap(err, "counting number of restore span entries")
153154
}
154155

155-
// TODO (kev-cao): Add support for execution locality.
156+
evalCtx := execCtx.ExtendedEvalContext()
157+
locFilter := sql.SingleLocalityFilter(details.ExecutionLocality)
158+
oracle := physicalplan.DefaultReplicaChooser
159+
if useBulkOracle.Get(&evalCtx.Settings.SV) {
160+
oracle, err = kvfollowerreadsccl.NewLocalityFilteringBulkOracle(
161+
dsp.ReplicaOracleConfig(evalCtx.Locality),
162+
locFilter,
163+
)
164+
if err != nil {
165+
return nil, nil, errors.Wrap(err, "failed to create locality filtering bulk oracle")
166+
}
167+
}
168+
156169
planCtx, sqlInstanceIDs, err := dsp.SetupAllNodesPlanningWithOracle(
157-
ctx, execCtx.ExtendedEvalContext(), execCtx.ExecCfg(),
158-
physicalplan.DefaultReplicaChooser, []roachpb.Locality{}, sql.NoStrictLocalityFiltering,
170+
ctx, evalCtx, execCtx.ExecCfg(),
171+
oracle, locFilter, sql.NoStrictLocalityFiltering,
159172
)
160173
if err != nil {
161174
return nil, nil, err

pkg/backup/compaction_job.go

Lines changed: 38 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -73,8 +73,6 @@ func maybeStartCompactionJob(
7373
}
7474

7575
switch {
76-
case len(triggerJob.ExecutionLocality.Tiers) != 0:
77-
return 0, errors.New("execution locality not supported for compaction")
7876
case triggerJob.ScheduleID == 0:
7977
return 0, errors.New("only scheduled backups can be compacted")
8078
case len(triggerJob.SpecificTenantIds) != 0 || triggerJob.IncludeAllSecondaryTenants:
@@ -197,13 +195,47 @@ func StartCompactionJob(
197195
scheduleID jobspb.ScheduleID,
198196
collectionURI []string,
199197
fullBackupPath string,
200-
encryptionOpts jobspb.BackupEncryptionOptions,
198+
options tree.BackupOptions,
201199
start, end hlc.Timestamp,
202200
) (jobspb.JobID, error) {
203201
planHook, ok := planner.(sql.PlanHookState)
204202
if !ok {
205203
return 0, errors.New("missing job execution context")
206204
}
205+
206+
var executionLocality roachpb.Locality
207+
if options.ExecutionLocality != nil {
208+
if strVal, ok := options.ExecutionLocality.(*tree.StrVal); ok {
209+
s := strVal.RawString()
210+
if s != "" {
211+
if err := executionLocality.Set(s); err != nil {
212+
return 0, errors.Wrap(err, "error setting execution locality")
213+
}
214+
}
215+
} else {
216+
return 0, errors.Newf(
217+
"expected string value, got %+v", options.ExecutionLocality,
218+
)
219+
}
220+
}
221+
222+
encryption := jobspb.BackupEncryptionOptions{
223+
Mode: jobspb.EncryptionMode_None,
224+
}
225+
if options.EncryptionPassphrase != nil {
226+
encryption.Mode = jobspb.EncryptionMode_Passphrase
227+
encryption.RawPassphrase = tree.AsStringWithFlags(
228+
options.EncryptionPassphrase,
229+
tree.FmtBareStrings,
230+
)
231+
}
232+
if options.EncryptionKMSURI != nil {
233+
if encryption.Mode != jobspb.EncryptionMode_None {
234+
return 0, errors.Newf("only one encryption mode can be specified")
235+
}
236+
encryption.RawKmsUris = builtins.ExprSliceToStrSlice(options.EncryptionKMSURI)
237+
}
238+
207239
details := jobspb.BackupDetails{
208240
ScheduleID: scheduleID,
209241
StartTime: start,
@@ -213,7 +245,8 @@ func StartCompactionJob(
213245
Subdir: fullBackupPath,
214246
Exists: true,
215247
},
216-
EncryptionOptions: &encryptionOpts,
248+
EncryptionOptions: &encryption,
249+
ExecutionLocality: executionLocality,
217250
Compact: true,
218251
}
219252
jobID := planHook.ExecCfg().JobRegistry.MakeJobID()
@@ -598,6 +631,7 @@ func updateCompactionBackupDetails(
598631
ResolvedCompleteDbs: lastBackup.CompleteDbs,
599632
FullCluster: lastBackup.DescriptorCoverage == tree.AllDescriptors,
600633
ScheduleID: initialDetails.ScheduleID,
634+
ExecutionLocality: initialDetails.ExecutionLocality,
601635
Compact: true,
602636
}
603637
return compactedDetails, nil

0 commit comments

Comments
 (0)