14 changes: 11 additions & 3 deletions pkg/backup/backupsink/sst_sink_key_writer.go
@@ -29,15 +29,22 @@ type SSTSinkKeyWriter struct {
// Caching the targetFileSize from the cluster settings to avoid multiple
// lookups during writes.
targetFileSize int64

// The FileSSTSink gets its locality information from the ExportedSpan passed to Write.
// Since we're doing things more manually here, we need to keep this information ourselves.
LocalityKV string
}

func MakeSSTSinkKeyWriter(conf SSTSinkConf, dest cloud.ExternalStorage) (*SSTSinkKeyWriter, error) {
func MakeSSTSinkKeyWriter(
conf SSTSinkConf, dest cloud.ExternalStorage, localityKV string,
) (*SSTSinkKeyWriter, error) {
if conf.ElideMode == execinfrapb.ElidePrefix_None {
return nil, errors.New("KeyWriter does not support ElidePrefix_None")
}
return &SSTSinkKeyWriter{
FileSSTSink: *MakeFileSSTSink(conf, dest, nil),
targetFileSize: targetFileSize.Get(conf.Settings),
LocalityKV: localityKV,
}, nil
}

@@ -166,8 +173,9 @@ func (s *SSTSinkKeyWriter) Reset(ctx context.Context, newSpan roachpb.Span) erro
// the span is reused to optimize memory usage, we need to clone the keys
// to ensure that the BackupManifest_File's span is not unintentionally
// mutated outside of the SSTSinkKeyWriter.
Span: newSpan.Clone(),
Path: s.outName,
Span: newSpan.Clone(),
Path: s.outName,
LocalityKV: s.LocalityKV,
},
)
s.prevKey = nil
2 changes: 1 addition & 1 deletion pkg/backup/backupsink/sst_sink_key_writer_test.go
@@ -527,7 +527,7 @@ func sstSinkKeyWriterTestSetup(
t *testing.T, settings *cluster.Settings, elideMode execinfrapb.ElidePrefix,
) (*SSTSinkKeyWriter, cloud.ExternalStorage) {
conf, store := sinkTestSetup(t, settings, elideMode)
sink, err := MakeSSTSinkKeyWriter(conf, store)
sink, err := MakeSSTSinkKeyWriter(conf, store, "" /* localityKV */)
require.NoError(t, err)
return sink, store
}
207 changes: 143 additions & 64 deletions pkg/backup/compaction_dist.go
@@ -7,6 +7,8 @@ package backup

import (
"context"
"net/url"
"slices"

"github.com/cockroachdb/cockroach/pkg/backup/backupinfo"
"github.com/cockroachdb/cockroach/pkg/backup/backuppb"
@@ -148,15 +150,11 @@ func createCompactionPlan(
targetSize int64,
maxFiles int64,
) (*sql.PhysicalPlan, *sql.PlanningCtx, error) {
numEntries, err := countRestoreSpanEntries(ctx, genSpan)
if err != nil {
return nil, nil, errors.Wrap(err, "counting number of restore span entries")
}

evalCtx := execCtx.ExtendedEvalContext()
locFilter := sql.SingleLocalityFilter(details.ExecutionLocality)
oracle := physicalplan.DefaultReplicaChooser
locFilter := sql.SingleLocalityFilter(details.ExecutionLocality)
if useBulkOracle.Get(&evalCtx.Settings.SV) {
var err error
oracle, err = kvfollowerreadsccl.NewLocalityFilteringBulkOracle(
dsp.ReplicaOracleConfig(evalCtx.Locality),
locFilter,
@@ -173,15 +171,19 @@
if err != nil {
return nil, nil, err
}

plan := planCtx.NewPhysicalPlan()
localitySets, err := buildLocalitySets(ctx, dsp, sqlInstanceIDs, genSpan)
if err != nil {
return nil, nil, err
}
corePlacements, err := createCompactionCorePlacements(
ctx, jobID, execCtx.User(), details, manifest.ElidedPrefix, genSpan,
spansToCompact, sqlInstanceIDs, targetSize, maxFiles, numEntries,
spansToCompact, localitySets, targetSize, maxFiles,
)
if err != nil {
return nil, nil, err
}

plan := planCtx.NewPhysicalPlan()
plan.AddNoInputStage(
corePlacements,
execinfrapb.PostProcessSpec{},
@@ -192,18 +194,40 @@
return plan, planCtx, nil
}

// countRestoreSpanEntries counts the number of restore span entries that will be
// be delivered by the given generator.
func countRestoreSpanEntries(
type localitySet struct {
instanceIDs []base.SQLInstanceID
totalEntries int

currInstanceIdx int
currEntriesAdded int
}

// buildLocalitySets does a dry run of genSpan in order to build a locality-to-localitySet
// mapping containing the available nodes and the number of expected entries per locality.
// The localitySet keyed by "default" contains all available nodes; in a non-locality-aware
// setting, its totalEntries count is the global total number of entries.
func buildLocalitySets(
ctx context.Context,
dsp *sql.DistSQLPlanner,
instanceIDs []base.SQLInstanceID,
genSpan func(ctx context.Context, spanCh chan execinfrapb.RestoreSpanEntry) error,
) (int, error) {
) (map[string]*localitySet, error) {
localitySets := make(map[string]*localitySet)
localitySets["default"] = &localitySet{instanceIDs: instanceIDs}

countSpansCh := make(chan execinfrapb.RestoreSpanEntry, 1000)
var numImportSpans int
countTasks := []func(ctx context.Context) error{
func(ctx context.Context) error {
for range countSpansCh {
numImportSpans++
for entry := range countSpansCh {
locality, err := entryLocality(entry)
if err != nil {
return err
}
if set, ok := localitySets[locality]; ok {
set.totalEntries += 1
} else {
localitySets[locality] = &localitySet{totalEntries: 1}
}
}
return nil
},
@@ -213,13 +237,39 @@
},
}
if err := ctxgroup.GoAndWait(ctx, countTasks...); err != nil {
return 0, errors.Wrapf(err, "counting number of spans to compact")
return nil, errors.Wrapf(err, "counting number of spans to compact")
}

instancesByLocality := make(map[string][]base.SQLInstanceID)
instancesByLocality["default"] = instanceIDs
for _, id := range instanceIDs {
Collaborator:
I think you're trying to avoid a double for loop (outer over localitySets, inner over instances) by looping over the instances first and then over the locality sets, but this code could more easily reuse the locality matching semantics (e.g. locality.Match()) if you just used a double for loop. PartitionSpans loops over all instances for a given span, so we should too!
https://github.com/andrew-r-thomas/cockroach/blob/compaction-loc-aware/pkg/sql/distsql_physical_planner.go#L1356
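
For illustration only, a minimal sketch of the double-loop shape suggested here, built from pieces already visible in this diff (`base.SQLInstanceID`, `desc.Locality.Tiers`, `tier.String()`, the non-strict fallback); the function name and parameters are invented for the sketch and are not part of the PR:

```go
// instancesForLocality scans every instance for the given locality filter
// (e.g. "region=us-east1") and keeps those with a matching tier, falling back
// to all instances when none match, mirroring the non-strict behavior in this
// PR. The caller would invoke it once per locality set (the outer loop).
func instancesForLocality(
	all []base.SQLInstanceID,
	localities map[base.SQLInstanceID]roachpb.Locality,
	filter string,
) []base.SQLInstanceID {
	matched := make([]base.SQLInstanceID, 0, len(all))
	for _, id := range all {
		if filter == "default" {
			matched = append(matched, id)
			continue
		}
		for _, tier := range localities[id].Tiers {
			if tier.String() == filter {
				matched = append(matched, id)
				break
			}
		}
	}
	if len(matched) == 0 {
		// Non-strict fallback: no matching instances, use everything.
		matched = append(matched, all...)
	}
	slices.Sort(matched)
	return matched
}
```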

desc, err := dsp.GetSQLInstanceInfo(id)
if err != nil {
return nil, errors.Wrapf(err, "getting sql instance info for %d", id)
}
for _, tier := range desc.Locality.Tiers {
tierStr := tier.String()
instancesByLocality[tierStr] = append(instancesByLocality[tierStr], id)
}
}
for locality, set := range localitySets {
instances := instancesByLocality[locality]
if len(instances) == 0 {
// In a non-strict setting, if we don't have any nodes available that match the locality
// filter, we just evenly distribute the work across all available nodes.
//
// TODO(at): add WITH STRICT STORAGE LOCALITY support
instances = instancesByLocality["default"]
}
slices.Sort(instances)
set.instanceIDs = instances
}
return numImportSpans, nil

return localitySets, nil
}
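
As a concrete illustration (hypothetical values, not taken from this PR): with instances 1–3 in region=us-east1, instance 4 in region=us-west1, and ten restore span entries of which four carry a region=us-west1 locality, buildLocalitySets would produce roughly:

```go
localitySets := map[string]*localitySet{
	// "default" always holds every instance; entries whose URI has no
	// COCKROACH_LOCALITY filter (or "default") are counted here.
	"default": {instanceIDs: []base.SQLInstanceID{1, 2, 3, 4}, totalEntries: 6},
	// Only localities that actually appear in entries get a set; its
	// instances are those with a matching tier.
	"region=us-west1": {instanceIDs: []base.SQLInstanceID{4}, totalEntries: 4},
}
```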

// createCompactionCorePlacements takes spans from a generator and evenly
// distributes them across nodes in the cluster, returning the core placements
// createCompactionCorePlacements takes spans from a generator and, per localitySet, evenly
// distributes matching spans across nodes in the set, returning the core placements
// reflecting that distribution.
func createCompactionCorePlacements(
ctx context.Context,
Expand All @@ -229,28 +279,18 @@ func createCompactionCorePlacements(
elideMode execinfrapb.ElidePrefix,
genSpan func(ctx context.Context, spanCh chan execinfrapb.RestoreSpanEntry) error,
spansToCompact roachpb.Spans,
sqlInstanceIDs []base.SQLInstanceID,
localitySets map[string]*localitySet,
targetSize int64,
maxFiles int64,
numEntries int,
) ([]physicalplan.ProcessorCorePlacement, error) {
numNodes := len(sqlInstanceIDs)
corePlacements := make([]physicalplan.ProcessorCorePlacement, numNodes)
for i := range corePlacements {
corePlacements[i].SQLInstanceID = sqlInstanceIDs[i]
corePlacements[i].Core.CompactBackups = &execinfrapb.CompactBackupsSpec{
JobID: int64(jobID),
DefaultURI: details.URI,
Destination: details.Destination,
Encryption: details.EncryptionOptions,
StartTime: details.StartTime,
EndTime: details.EndTime,
ElideMode: elideMode,
UserProto: user.EncodeProto(),
Spans: spansToCompact,
TargetSize: targetSize,
MaxFiles: maxFiles,
}
defaultSet, ok := localitySets["default"]
if !ok {
return nil, errors.AssertionFailedf("no default set available in compaction placement planning")
}
instanceIDToIdx := make(map[base.SQLInstanceID]int)
instanceEntries := make([]roachpb.SpanGroup, len(defaultSet.instanceIDs))
Collaborator:
Why not create a map[base.SQLInstanceID]roachpb.SpanGroup and avoid the middleman (instanceIDToIdx)?
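
A rough sketch of that suggestion, assuming the surrounding routing loop stays as in this PR; the CompactBackupsSpec fields are elided here and only the keying changes:

```go
// Key the accumulated spans directly by instance ID so no index indirection
// (instanceIDToIdx) is needed when routing an entry.
instanceEntries := make(map[base.SQLInstanceID]*roachpb.SpanGroup, len(defaultSet.instanceIDs))
for _, id := range defaultSet.instanceIDs {
	instanceEntries[id] = &roachpb.SpanGroup{}
}

// In the distribution task, routing an entry becomes:
//   instanceEntries[set.instanceIDs[set.currInstanceIdx]].Add(entry.Span)

// When building placements, iterate the sorted slice of instance IDs (not the
// map) so the placement order stays deterministic.
corePlacements := make([]physicalplan.ProcessorCorePlacement, 0, len(defaultSet.instanceIDs))
for _, id := range defaultSet.instanceIDs {
	corePlacements = append(corePlacements, physicalplan.ProcessorCorePlacement{
		SQLInstanceID: id,
		// Core.CompactBackups is built exactly as in this PR, with
		// AssignedSpans: instanceEntries[id].Slice().
	})
}
```

Either way works; the slice-plus-index approach in the PR and the map above carry the same information, the map just removes one lookup table.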

for i, id := range defaultSet.instanceIDs {
instanceIDToIdx[id] = i
}

spanEntryCh := make(chan execinfrapb.RestoreSpanEntry, 1000)
@@ -260,45 +300,84 @@
return genSpan(ctx, spanEntryCh)
})
tasks = append(tasks, func(ctx context.Context) error {
numEntriesPerNode := numEntries / numNodes
leftoverEntries := numEntries % numNodes
getTargetNumEntries := func(nodeIdx int) int {
if nodeIdx < leftoverEntries {
// This more evenly distributes the leftover entries across the nodes
// after doing integer division to assign the entries to the nodes.
return numEntriesPerNode + 1
for entry := range spanEntryCh {
entryLocality, err := entryLocality(entry)
if err != nil {
return err
}
return numEntriesPerNode
}
currNode := 0
entriesAdded := 0
currEntries := roachpb.SpanGroup{}
targetNumEntries := getTargetNumEntries(currNode)

for entry := range spanEntryCh {
currEntries.Add(entry.Span)
entriesAdded++
if entriesAdded == targetNumEntries {
corePlacements[currNode].SQLInstanceID = sqlInstanceIDs[currNode]
corePlacements[currNode].Core.CompactBackups.AssignedSpans = currEntries.Slice()
set, ok := localitySets[entryLocality]
if !ok {
return errors.AssertionFailedf("no nodes match entry with locality %s", entryLocality)
}

instanceID := set.instanceIDs[set.currInstanceIdx]
instanceIdx := instanceIDToIdx[instanceID]
instanceEntries[instanceIdx].Add(entry.Span)
set.currEntriesAdded++

currNode++
targetNumEntries = getTargetNumEntries(currNode)
currEntries.Clear()
entriesAdded = 0
var targetNumEntries int
Collaborator:
nit: could all the logic below get tidied into a localitySet.MaybeUpdateTargetInstance() helper?
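
One possible shape for that helper, written against the localitySet fields added in this PR; the method name follows the reviewer's suggestion and is not part of the change:

```go
// maybeUpdateTargetInstance records that one entry was routed to the current
// instance and advances to the next instance once the current one has received
// its share: totalEntries/len(instanceIDs) entries each, with the first
// totalEntries%len(instanceIDs) instances taking one extra entry.
func (s *localitySet) maybeUpdateTargetInstance() {
	s.currEntriesAdded++
	target := s.totalEntries / len(s.instanceIDs)
	if s.currInstanceIdx < s.totalEntries%len(s.instanceIDs) {
		target++
	}
	if s.currEntriesAdded >= target {
		s.currInstanceIdx++
		s.currEntriesAdded = 0
	}
}
```

The routing loop would then shrink to picking the instance, adding the span, and calling set.maybeUpdateTargetInstance().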

numEntriesPerNode := set.totalEntries / len(set.instanceIDs)
leftoverEntries := set.totalEntries % len(set.instanceIDs)
if set.currInstanceIdx < leftoverEntries {
// This more evenly distributes the leftover entries across the nodes
// after doing integer division to assign the entries to the nodes.
targetNumEntries = numEntriesPerNode + 1
} else {
targetNumEntries = numEntriesPerNode
}
if currNode == numNodes {
return nil
if set.currEntriesAdded == targetNumEntries {
set.currInstanceIdx++
set.currEntriesAdded = 0
}
}
return nil
})
if err := ctxgroup.GoAndWait(ctx, tasks...); err != nil {
return nil, errors.Wrapf(err, "distributing span entries to processors")
}

corePlacements := make([]physicalplan.ProcessorCorePlacement, len(defaultSet.instanceIDs))
for i, entries := range instanceEntries {
id := defaultSet.instanceIDs[i]
corePlacements[i].SQLInstanceID = id
corePlacements[i].Core.CompactBackups = &execinfrapb.CompactBackupsSpec{
JobID: int64(jobID),
DefaultURI: details.URI,
Destination: details.Destination,
Encryption: details.EncryptionOptions,
StartTime: details.StartTime,
EndTime: details.EndTime,
ElideMode: elideMode,
UserProto: user.EncodeProto(),
Spans: spansToCompact,
TargetSize: targetSize,
MaxFiles: maxFiles,
URIsByLocalityKV: details.URIsByLocalityKV,
AssignedSpans: entries.Slice(),
}
}

return corePlacements, nil
}
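
Tying it together with the hypothetical cluster from the buildLocalitySets illustration above (instances 1–4, six entries routed to "default" and four to "region=us-west1"), the per-instance assignment would come out roughly as follows; these are illustrative counts, not output from this PR:

```go
// Entry counts per placement (the corresponding spans land in AssignedSpans):
expected := map[base.SQLInstanceID]int{
	1: 2, // 2 "default" entries
	2: 2, // 2 "default" entries
	3: 1, // 1 "default" entry
	4: 5, // 1 "default" entry + all 4 "region=us-west1" entries
}
```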

// entryLocality returns the locality filter string attached to the URI of the most recent file in
// the entry. If the URI does not have a locality filter attached to it, it will return "default".
func entryLocality(entry execinfrapb.RestoreSpanEntry) (string, error) {
// Note that the last file in entry.Files will be the most recent.
uri, err := url.Parse(entry.Files[len(entry.Files)-1].Dir.URI)
if err != nil {
return "", err
}
loc := uri.Query().Get("COCKROACH_LOCALITY")
switch loc {
case "", "default":
return "default", nil
default:
return loc, nil
}
}
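
For reference, a small sketch of the URI convention this relies on; the URI below is an example in the documented locality-aware form, not one taken from this PR:

```go
func exampleEntryLocality() (string, error) {
	// A locality-aware backup URI carries its filter in the COCKROACH_LOCALITY
	// query parameter, URL-encoded; here it decodes to "region=us-east1".
	u, err := url.Parse("nodelocal://1/backup?COCKROACH_LOCALITY=region%3Dus-east1")
	if err != nil {
		return "", err
	}
	loc := u.Query().Get("COCKROACH_LOCALITY")
	if loc == "" || loc == "default" {
		return "default", nil
	}
	return loc, nil // "region=us-east1"
}
```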

// getSpansToCompact returns all remaining spans in the backup manifest that
// need to be compacted.
func getSpansToCompact(