Skip to content

Commit 1fc8f79

Browse files
committed
sql/bulksst: coordinate distributed SST metadata merge with CombineFileInfo
Implements CombineFileInfo(), a coordinator utility that aggregates SST metadata from distributed workers and determines merge task spans based on sampled keys. The function combines SST file metadata from multiple workers and uses their row samples to split schema spans into merge task spans. This will be used by the new distributed merge pipeline. Resolves: #156662 Epic: CRDB-48845 Release note: none Co-authored by: @jeffswenson
1 parent 5b5000c commit 1fc8f79

File tree

4 files changed

+566
-0
lines changed

4 files changed

+566
-0
lines changed

pkg/sql/bulksst/BUILD.bazel

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ go_proto_library(
2222
go_library(
2323
name = "bulksst",
2424
srcs = [
25+
"combine_file_info.go",
2526
"sst_file_allocator.go",
2627
"sst_writer.go",
2728
],
@@ -35,11 +36,13 @@ go_library(
3536
"//pkg/roachpb",
3637
"//pkg/settings",
3738
"//pkg/settings/cluster",
39+
"//pkg/sql/execinfrapb",
3840
"//pkg/storage",
3941
"//pkg/util/hlc",
4042
"//pkg/util/log",
4143
"//pkg/util/randutil",
4244
"//pkg/util/timeutil",
45+
"@com_github_cockroachdb_errors//:errors",
4346
"@com_github_cockroachdb_pebble//objstorage",
4447
"@com_github_cockroachdb_pebble//objstorage/objstorageprovider",
4548
],
@@ -48,6 +51,7 @@ go_library(
4851
go_test(
4952
name = "bulksst_test",
5053
srcs = [
54+
"combine_file_info_test.go",
5155
"main_test.go",
5256
"sst_file_allocator_test.go",
5357
"sst_writer_test.go",
@@ -64,6 +68,7 @@ go_test(
6468
"//pkg/security/securitytest",
6569
"//pkg/server",
6670
"//pkg/settings/cluster",
71+
"//pkg/sql/execinfrapb",
6772
"//pkg/storage",
6873
"//pkg/testutils/serverutils",
6974
"//pkg/testutils/testcluster",
Lines changed: 171 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,171 @@
1+
// Copyright 2025 The Cockroach Authors.
2+
//
3+
// Use of this software is governed by the CockroachDB Software License
4+
// included in the /LICENSE file.
5+
6+
package bulksst
7+
8+
import (
9+
"bytes"
10+
"slices"
11+
12+
"github.com/cockroachdb/cockroach/pkg/roachpb"
13+
"github.com/cockroachdb/cockroach/pkg/sql/execinfrapb"
14+
"github.com/cockroachdb/errors"
15+
)
16+
17+
// CombineFileInfo combines SST file metadata from multiple workers and
18+
// determines how to split merge work across processors.
19+
//
20+
// The function takes SST files and row samples collected earlier, and produces:
21+
// 1. A list of all SST files to be merged (consolidating metadata from all workers)
22+
// 2. Merge task spans that partition the work across merge processors
23+
//
24+
// The row samples represent keys seen during import/backfill and are used to
25+
// split schema spans into smaller merge tasks. Each merge processor will be
26+
// assigned one or more spans and will merge only the SST data within those
27+
// spans.
28+
//
29+
// For example, if schema spans are [a,z) and samples are [c, m, r], the merge
30+
// spans will be [a,c), [c,m), [m,r), [r,z), allowing four processors to work in
31+
// parallel on roughly equal amounts of data.
32+
func CombineFileInfo(
33+
files []SSTFiles, schemaSpans []roachpb.Span,
34+
) ([]execinfrapb.BulkMergeSpec_SST, []roachpb.Span, error) {
35+
// Validate that schema spans are properly ordered. This is a critical
36+
// precondition for the algorithm to work correctly.
37+
if err := validateSchemaSpansOrdered(schemaSpans); err != nil {
38+
return nil, nil, err
39+
}
40+
41+
result := make([]execinfrapb.BulkMergeSpec_SST, 0)
42+
samples := make([]roachpb.Key, 0)
43+
for _, file := range files {
44+
for _, sst := range file.SST {
45+
result = append(result, execinfrapb.BulkMergeSpec_SST{
46+
StartKey: string(sst.StartKey),
47+
EndKey: string(sst.EndKey),
48+
URI: sst.URI,
49+
})
50+
}
51+
for _, sample := range file.RowSamples {
52+
samples = append(samples, roachpb.Key(sample))
53+
}
54+
}
55+
// Sort samples to ensure merge spans are non-overlapping and contiguous.
56+
// Samples are collected from multiple workers and arrive in arbitrary order.
57+
// getMergeSpans uses these samples as split points to create merge task spans.
58+
// If samples are unsorted (e.g., ["k", "d", "a"]), getMergeSpans would create
59+
// overlapping spans that cause the same keys to be processed multiple times,
60+
// resulting in duplicate data in the output SSTs.
61+
slices.SortFunc(samples, func(i, j roachpb.Key) int {
62+
return bytes.Compare(i, j)
63+
})
64+
65+
mergeSpans, err := getMergeSpans(schemaSpans, samples)
66+
if err != nil {
67+
return nil, nil, err
68+
}
69+
70+
return result, mergeSpans, nil
71+
}
72+
73+
// getMergeSpans determines which spans should be used as merge tasks. The
74+
// output spans must fully cover the input spans. The samples are used to
75+
// determine where schema spans should be split.
76+
//
77+
// Precondition: schemaSpans must be sorted by start key and non-overlapping.
78+
// This precondition is validated in validateSchemaSpansOrdered.
79+
func getMergeSpans(schemaSpans []roachpb.Span, sortedSample []roachpb.Key) ([]roachpb.Span, error) {
80+
result := make([]roachpb.Span, 0, len(schemaSpans)+len(sortedSample))
81+
82+
for _, span := range schemaSpans {
83+
samples, consumed := getCoveredSamples(span, sortedSample)
84+
85+
// Validate: if we consumed more samples than we returned, it means some
86+
// samples were outside this span (either before it or in a gap). Since
87+
// schema spans are processed in order, any sample before this span indicates
88+
// a sample not covered by the schema spans.
89+
if consumed > len(samples) {
90+
// Find the first skipped sample for a clear error message
91+
for i := 0; i < consumed; i++ {
92+
if i >= len(samples) || !sortedSample[i].Equal(samples[i]) {
93+
return nil, errors.AssertionFailedf(
94+
"sample %q is before schema span [%q, %q); this indicates samples were collected for keys outside schema spans",
95+
sortedSample[i], span.Key, span.EndKey)
96+
}
97+
}
98+
}
99+
100+
sortedSample = sortedSample[consumed:]
101+
102+
startKey := span.Key
103+
for _, sample := range samples {
104+
// Skip samples that would create invalid (zero-length) spans.
105+
// This handles duplicates and samples at span boundaries.
106+
if bytes.Compare(sample, startKey) <= 0 {
107+
continue
108+
}
109+
result = append(result, roachpb.Span{
110+
Key: startKey,
111+
EndKey: sample,
112+
})
113+
startKey = sample
114+
}
115+
result = append(result, roachpb.Span{
116+
Key: startKey,
117+
EndKey: span.EndKey,
118+
})
119+
}
120+
121+
// Validate that all samples were contained within schema spans. Any remaining
122+
// samples indicate they were collected after the last span.
123+
if len(sortedSample) > 0 {
124+
return nil, errors.AssertionFailedf(
125+
"samples outside schema spans: %d samples remain after processing all spans, first uncovered sample: %q",
126+
len(sortedSample), sortedSample[0])
127+
}
128+
129+
return result, nil
130+
}
131+
132+
// getCoveredSamples returns the samples within the given span and the total
133+
// number of samples consumed (including any that were before the span start).
134+
func getCoveredSamples(schemaSpan roachpb.Span, sortedSamples []roachpb.Key) ([]roachpb.Key, int) {
135+
// Count how many samples are before the span start.
136+
// Since sortedSamples are sorted and schema spans are processed in order,
137+
// samples before this span's start either:
138+
// 1. Should have been covered by a previous span, or
139+
// 2. Fall in a gap between spans.
140+
startIdx := 0
141+
for startIdx < len(sortedSamples) && bytes.Compare(sortedSamples[startIdx], schemaSpan.Key) < 0 {
142+
startIdx++
143+
}
144+
145+
// Find samples within this span: [schemaSpan.Key, schemaSpan.EndKey)
146+
endIdx := startIdx
147+
for endIdx < len(sortedSamples) && bytes.Compare(sortedSamples[endIdx], schemaSpan.EndKey) < 0 {
148+
endIdx++
149+
}
150+
151+
return sortedSamples[startIdx:endIdx], endIdx
152+
}
153+
154+
// validateSchemaSpansOrdered checks that schema spans are sorted by start key
155+
// and non-overlapping. This is a precondition for getMergeSpans to work correctly.
156+
func validateSchemaSpansOrdered(schemaSpans []roachpb.Span) error {
157+
for i := 1; i < len(schemaSpans); i++ {
158+
if bytes.Compare(schemaSpans[i-1].Key, schemaSpans[i].Key) >= 0 {
159+
return errors.AssertionFailedf(
160+
"schema spans not ordered: span %d [%q, %q) >= span %d [%q, %q)",
161+
i-1, schemaSpans[i-1].Key, schemaSpans[i-1].EndKey,
162+
i, schemaSpans[i].Key, schemaSpans[i].EndKey)
163+
}
164+
if bytes.Compare(schemaSpans[i-1].EndKey, schemaSpans[i].Key) > 0 {
165+
return errors.AssertionFailedf(
166+
"schema spans overlapping: span %d ends at %q but span %d starts at %q",
167+
i-1, schemaSpans[i-1].EndKey, i, schemaSpans[i].Key)
168+
}
169+
}
170+
return nil
171+
}

0 commit comments

Comments
 (0)