Skip to content

Commit f1e8461

Browse files
committed
sql/bulkingest: add range split and scatter utilities
Adds split and scatter utilities for distributed merge from the prototype branch. These utilities prepare the KV keyspace for efficient bulk ingest by pre-splitting ranges at SST boundaries and distributing them across cluster nodes. Closes #156574 Epic: CRDB-48845 Release note: none Co-authored by: @jeffswenson
1 parent 2e5f8a0 commit f1e8461

File tree

11 files changed

+955
-7
lines changed

11 files changed

+955
-7
lines changed

pkg/BUILD.bazel

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -397,6 +397,7 @@ ALL_TESTS = [
397397
"//pkg/sql/appstatspb:appstatspb_test",
398398
"//pkg/sql/auditlogging:auditlogging_test",
399399
"//pkg/sql/backfill:backfill_test",
400+
"//pkg/sql/bulkingest:bulkingest_test",
400401
"//pkg/sql/bulksst:bulksst_test",
401402
"//pkg/sql/bulkutil:bulkutil_test",
402403
"//pkg/sql/cacheutil:cacheutil_test",
@@ -1886,6 +1887,8 @@ GO_TARGETS = [
18861887
"//pkg/sql/auditlogging:auditlogging_test",
18871888
"//pkg/sql/backfill:backfill",
18881889
"//pkg/sql/backfill:backfill_test",
1890+
"//pkg/sql/bulkingest:bulkingest",
1891+
"//pkg/sql/bulkingest:bulkingest_test",
18891892
"//pkg/sql/bulksst:bulksst",
18901893
"//pkg/sql/bulksst:bulksst_test",
18911894
"//pkg/sql/bulkutil:bulkutil",

pkg/kv/db.go

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -676,9 +676,15 @@ func (db *DB) AdminSplit(
676676
// if the range is large.
677677
func (db *DB) AdminScatter(
678678
ctx context.Context, key roachpb.Key, maxSize int64,
679+
) (*kvpb.AdminScatterResponse, error) {
680+
return db.sendAdminScatterRequest(ctx, roachpb.Span{Key: key, EndKey: key.Next()}, maxSize)
681+
}
682+
683+
func (db *DB) sendAdminScatterRequest(
684+
ctx context.Context, span roachpb.Span, maxSize int64,
679685
) (*kvpb.AdminScatterResponse, error) {
680686
scatterReq := &kvpb.AdminScatterRequest{
681-
RequestHeader: kvpb.RequestHeaderFromSpan(roachpb.Span{Key: key, EndKey: key.Next()}),
687+
RequestHeader: kvpb.RequestHeaderFromSpan(span),
682688
RandomizeLeases: true,
683689
MaxSize: maxSize,
684690
}
@@ -693,6 +699,13 @@ func (db *DB) AdminScatter(
693699
return resp, nil
694700
}
695701

702+
// AdminScatterSpan scatters the ranges that overlap the specified span.
703+
func (db *DB) AdminScatterSpan(
704+
ctx context.Context, span roachpb.Span,
705+
) (*kvpb.AdminScatterResponse, error) {
706+
return db.sendAdminScatterRequest(ctx, span, 0 /* maxSize */)
707+
}
708+
696709
// AdminUnsplit removes the sticky bit of the range specified by splitKey.
697710
//
698711
// splitKey is the start key of the range whose sticky bit should be removed.

pkg/sql/bulkingest/BUILD.bazel

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")
2+
3+
go_library(
4+
name = "bulkingest",
5+
srcs = [
6+
"split.go",
7+
"split_picker.go",
8+
],
9+
importpath = "github.com/cockroachdb/cockroach/pkg/sql/bulkingest",
10+
visibility = ["//visibility:public"],
11+
deps = [
12+
"//pkg/keys",
13+
"//pkg/kv",
14+
"//pkg/roachpb",
15+
"//pkg/sql/execinfrapb",
16+
"//pkg/util/log",
17+
"@com_github_cockroachdb_errors//:errors",
18+
],
19+
)
20+
21+
go_test(
22+
name = "bulkingest_test",
23+
srcs = [
24+
"main_test.go",
25+
"split_picker_test.go",
26+
"split_test.go",
27+
],
28+
embed = [":bulkingest"],
29+
deps = [
30+
"//pkg/base",
31+
"//pkg/keys",
32+
"//pkg/kv/kvclient/kvtenant",
33+
"//pkg/roachpb",
34+
"//pkg/security/securityassets",
35+
"//pkg/security/securitytest",
36+
"//pkg/server",
37+
"//pkg/sql/catalog/descpb",
38+
"//pkg/sql/execinfrapb",
39+
"//pkg/testutils/serverutils",
40+
"//pkg/testutils/sqlutils",
41+
"//pkg/testutils/testcluster",
42+
"//pkg/util/encoding",
43+
"//pkg/util/leaktest",
44+
"//pkg/util/log",
45+
"//pkg/util/randutil",
46+
"@com_github_stretchr_testify//require",
47+
],
48+
)

pkg/sql/bulkingest/main_test.go

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
// Copyright 2025 The Cockroach Authors.
2+
//
3+
// Use of this software is governed by the CockroachDB Software License
4+
// included in the /LICENSE file.
5+
6+
package bulkingest
7+
8+
import (
9+
"os"
10+
"testing"
11+
12+
"github.com/cockroachdb/cockroach/pkg/kv/kvclient/kvtenant"
13+
"github.com/cockroachdb/cockroach/pkg/security/securityassets"
14+
"github.com/cockroachdb/cockroach/pkg/security/securitytest"
15+
"github.com/cockroachdb/cockroach/pkg/server"
16+
"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
17+
"github.com/cockroachdb/cockroach/pkg/testutils/testcluster"
18+
"github.com/cockroachdb/cockroach/pkg/util/randutil"
19+
)
20+
21+
//go:generate ../util/leaktest/add-leaktest.sh *_test.go
22+
23+
func TestMain(m *testing.M) {
24+
securityassets.SetLoader(securitytest.EmbeddedAssets)
25+
randutil.SeedForTests()
26+
serverutils.InitTestServerFactory(server.TestServerFactory)
27+
serverutils.InitTestClusterFactory(testcluster.TestClusterFactory)
28+
kvtenant.InitTestConnectorFactory()
29+
os.Exit(m.Run())
30+
}

pkg/sql/bulkingest/split.go

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
// Copyright 2025 The Cockroach Authors.
2+
//
3+
// Use of this software is governed by the CockroachDB Software License
4+
// included in the /LICENSE file.
5+
6+
package bulkingest
7+
8+
import (
9+
"context"
10+
"time"
11+
12+
"github.com/cockroachdb/cockroach/pkg/kv"
13+
"github.com/cockroachdb/cockroach/pkg/roachpb"
14+
"github.com/cockroachdb/cockroach/pkg/util/log"
15+
)
16+
17+
func splitAndScatterSpans(ctx context.Context, db *kv.DB, spans []roachpb.Span) error {
18+
// TODO(#156857): when distributing work, we should re-scatter ranges if
19+
// there are a small number of nodes with remaining work.
20+
for _, span := range spans {
21+
expirationTime := db.Clock().Now().Add(time.Hour.Nanoseconds(), 0)
22+
if err := db.AdminSplit(ctx, span.Key, expirationTime); err != nil {
23+
return err
24+
}
25+
}
26+
for _, span := range spans {
27+
if _, err := db.AdminScatterSpan(ctx, span); err != nil {
28+
log.Dev.Errorf(ctx, "failed to scatter span [%s,%s): %+v",
29+
span.Key, span.EndKey, err)
30+
}
31+
}
32+
return nil
33+
}

pkg/sql/bulkingest/split_picker.go

Lines changed: 162 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,162 @@
1+
// Copyright 2025 The Cockroach Authors.
2+
//
3+
// Use of this software is governed by the CockroachDB Software License
4+
// included in the /LICENSE file.
5+
6+
package bulkingest
7+
8+
import (
9+
"github.com/cockroachdb/cockroach/pkg/keys"
10+
"github.com/cockroachdb/cockroach/pkg/roachpb"
11+
"github.com/cockroachdb/cockroach/pkg/sql/execinfrapb"
12+
"github.com/cockroachdb/errors"
13+
)
14+
15+
// pickSplits picks which spans to split on based on the input SSTs. The splits are chosen
16+
// so that each SST is contained within exactly one output span. The output spans are contiguous
17+
// and non-overlapping. Splits are chosen based on the start key of the following SST.
18+
//
19+
// Every SST must be contained within exactly one of the input spans.
20+
//
21+
// The input spans must be ordered by start key and be non-overlapping. The
22+
// input SSTs must be contained within the input spans, must be ordered by start
23+
// key, and must be non-overlapping.
24+
func pickSplits(
25+
spans []roachpb.Span, ssts []execinfrapb.BulkMergeSpec_SST,
26+
) ([]roachpb.Span, error) {
27+
if len(ssts) == 0 {
28+
return spans, nil
29+
}
30+
if len(spans) == 0 {
31+
return nil, errors.New("no spans provided")
32+
}
33+
34+
// Validate spans are ordered and non-overlapping
35+
for i := 1; i < len(spans); i++ {
36+
if !less(spans[i-1].Key, spans[i].Key) {
37+
return nil, errors.Newf("spans not ordered: %s >= %s", spans[i-1].Key, spans[i].Key)
38+
}
39+
if overlaps(spans[i-1], spans[i]) {
40+
return nil, errors.Newf("spans are overlapping: %s overlaps with %s", spans[i-1].EndKey, spans[i].Key)
41+
}
42+
}
43+
44+
// Validate SSTs are ordered and non-overlapping.
45+
for i := 1; i < len(ssts); i++ {
46+
prev, curr := roachpb.Key(ssts[i-1].StartKey), roachpb.Key(ssts[i].StartKey)
47+
if !less(prev, curr) {
48+
return nil, errors.Newf("SSTs not in order: %s >= %s", prev, curr)
49+
}
50+
if overlaps(spanFromSST(ssts[i-1]), spanFromSST(ssts[i])) {
51+
return nil, errors.Newf("overlapping SSTs: %s overlaps with %s", ssts[i-1].EndKey, ssts[i].StartKey)
52+
}
53+
}
54+
55+
result := make([]roachpb.Span, 0, len(ssts))
56+
sstIdx := 0
57+
58+
for _, span := range spans {
59+
spanSSTStartIdx := sstIdx
60+
for ; sstIdx < len(ssts); sstIdx++ {
61+
sstStart := roachpb.Key(ssts[sstIdx].StartKey)
62+
if !less(sstStart, span.EndKey) {
63+
break
64+
}
65+
66+
sstEnd := roachpb.Key(ssts[sstIdx].EndKey)
67+
if !less(sstEnd, span.EndKey) && !sstEnd.Equal(span.EndKey) {
68+
return nil, errors.Newf("SST ending at %s extends beyond containing span ending at %s",
69+
sstEnd, span.EndKey)
70+
}
71+
if less(sstStart, span.Key) {
72+
return nil, errors.Newf("SST starting at %s begins before containing span starting at %s",
73+
sstStart, span.Key)
74+
}
75+
}
76+
77+
if spanSSTStartIdx == sstIdx {
78+
result = append(result, span)
79+
continue
80+
}
81+
82+
spanSplits, err := pickSplitsForSpan(span, ssts[spanSSTStartIdx:sstIdx])
83+
if err != nil {
84+
return nil, err
85+
}
86+
result = append(result, spanSplits...)
87+
}
88+
89+
// Ensure all SSTs were contained within the input spans.
90+
if sstIdx < len(ssts) {
91+
return nil, errors.Newf("SST starting at %s not contained in any span", ssts[sstIdx].StartKey)
92+
}
93+
94+
return result, nil
95+
}
96+
97+
// pickSplitsForSpan splits a single span based on the SSTs that overlap with
98+
// it. The output spans cover the entire input span, are non-overlapping, and
99+
// are contiguous. Each output span is assigned exactly one SST.
100+
//
101+
// This function validates that SST boundaries are already at safe split points
102+
// (i.e., row boundaries, not column family boundaries).
103+
func pickSplitsForSpan(
104+
span roachpb.Span, ssts []execinfrapb.BulkMergeSpec_SST,
105+
) ([]roachpb.Span, error) {
106+
if len(ssts) == 0 {
107+
return []roachpb.Span{span}, nil
108+
}
109+
110+
result := make([]roachpb.Span, 0, len(ssts))
111+
112+
spanStart := span.Key
113+
114+
for i := 1; i < len(ssts); i++ {
115+
splitPoint := roachpb.Key(ssts[i].StartKey)
116+
117+
// Validate that the split point is already at a safe split point.
118+
safeSplitPoint, err := keys.EnsureSafeSplitKey(splitPoint)
119+
if err != nil {
120+
return nil, errors.NewAssertionErrorWithWrappedErrf(err, "SST %d has unsafe start key %s", i, splitPoint)
121+
}
122+
if !safeSplitPoint.Equal(splitPoint) {
123+
return nil, errors.AssertionFailedf(
124+
"SST %d start key %s is not at a safe split point (safe point would be %s); "+
125+
"SST writer should have ensured safe boundaries",
126+
i, splitPoint, safeSplitPoint)
127+
}
128+
129+
result = append(result, roachpb.Span{
130+
Key: spanStart,
131+
EndKey: splitPoint,
132+
})
133+
spanStart = splitPoint
134+
}
135+
136+
// Create the final span covering from the last split point to the end
137+
result = append(result, roachpb.Span{
138+
Key: spanStart,
139+
EndKey: span.EndKey,
140+
})
141+
142+
return result, nil
143+
}
144+
145+
// less returns true if a is less than b
146+
func less(a, b roachpb.Key) bool {
147+
return a.Compare(b) < 0
148+
}
149+
150+
// overlaps returns true if span a overlaps with span b
151+
func overlaps(a, b roachpb.Span) bool {
152+
// Two spans overlap if one's end key is greater than the other's start key
153+
return a.EndKey.Compare(b.Key) > 0 && b.EndKey.Compare(a.Key) > 0
154+
}
155+
156+
// spanFromSST returns the span that matches the SST's start and end keys.
157+
func spanFromSST(sst execinfrapb.BulkMergeSpec_SST) roachpb.Span {
158+
return roachpb.Span{
159+
Key: roachpb.Key(sst.StartKey),
160+
EndKey: roachpb.Key(sst.EndKey),
161+
}
162+
}

0 commit comments

Comments
 (0)