Commit ebcf35e

Merge pull request #151838 from fqazi/blathers/backport-release-25.3-150238

release-25.3: sql: add support for generating split points from table statistics

2 parents (91744d2 + a408fb1), commit ebcf35e

6 files changed (+277, −77 lines)

pkg/cmd/roachtest/tests/schemachange.go (4 additions, 0 deletions)

```diff
@@ -391,6 +391,10 @@ func makeSchemaChangeBulkIngestTest(
 	db := c.Conn(ctx, t.L(), 1)
 	defer db.Close()
 
+	t.L().Printf("Computing table statistics manually")
+	if _, err := db.Exec("CREATE STATISTICS stats from bulkingest.bulkingest"); err != nil {
+		t.Fatal(err)
+	}
 	if !c.IsLocal() {
 		// Wait for the load generator to run for a few minutes before creating the index.
 		sleepInterval := time.Minute * 5
```
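The statistics created here feed the stats-based split-point generation added below in pkg/sql/index_split_scatter.go, so the histogram must exist before the index build begins. As a minimal, standalone sketch of verifying that precondition (the connection string is hypothetical, and this verification step is illustrative rather than part of the committed change; `SHOW STATISTICS FOR TABLE` is standard CockroachDB SQL):

```go
package main

import (
	"database/sql"
	"log"

	_ "github.com/lib/pq" // Postgres-wire driver; CockroachDB speaks pgwire.
)

func main() {
	// Hypothetical connection string; adjust for the cluster under test.
	db, err := sql.Open("postgres", "postgresql://root@localhost:26257/bulkingest?sslmode=disable")
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()

	// Create the statistic, mirroring the roachtest step above.
	if _, err := db.Exec("CREATE STATISTICS stats FROM bulkingest.bulkingest"); err != nil {
		log.Fatal(err)
	}
	// Confirm the statistic is visible before the index build; the selected
	// columns follow SHOW STATISTICS output.
	var name, cols string
	var rowCount int64
	if err := db.QueryRow(
		"SELECT statistics_name, column_names::STRING, row_count " +
			"FROM [SHOW STATISTICS FOR TABLE bulkingest.bulkingest] LIMIT 1",
	).Scan(&name, &cols, &rowCount); err != nil {
		log.Fatal(err)
	}
	log.Printf("statistic %s over columns %s covers %d rows", name, cols, rowCount)
}
```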

pkg/sql/BUILD.bazel (2 additions, 0 deletions)

```diff
@@ -484,6 +484,7 @@ go_library(
         "//pkg/sql/row",
         "//pkg/sql/rowcontainer",
         "//pkg/sql/rowenc",
+        "//pkg/sql/rowenc/keyside",
         "//pkg/sql/rowexec",
         "//pkg/sql/rowinfra",
         "//pkg/sql/scheduledlogging",
@@ -695,6 +696,7 @@ go_test(
         "grant_revoke_test.go",
         "grant_role_test.go",
         "index_mutation_test.go",
+        "index_split_scatter_test.go",
         "indexbackfiller_test.go",
         "instrumentation_test.go",
         "internal_test.go",
```

pkg/sql/exec_util.go (4 additions, 0 deletions)

```diff
@@ -1896,6 +1896,10 @@ type ExecutorTestingKnobs struct {
 	// AfterArbiterRead, if set, will be called after each row read from an arbiter index
 	// for an UPSERT or INSERT.
 	AfterArbiterRead func()
+
+	// BeforeIndexSplitAndScatter is invoked before the split and scatter of an
+	// index occurs.
+	BeforeIndexSplitAndScatter func(splitPoints [][]byte)
 }
 
 // PGWireTestingKnobs contains knobs for the pgwire module.
```

pkg/sql/index_split_scatter.go (174 additions, 11 deletions)

```diff
@@ -6,8 +6,10 @@
 package sql
 
 import (
+	"bytes"
 	"context"
 	"math/rand"
+	"sort"
 	"time"
 
 	"github.com/cockroachdb/cockroach/pkg/keys"
@@ -19,31 +21,177 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/sql/catalog"
 	"github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb"
 	"github.com/cockroachdb/cockroach/pkg/sql/rowenc"
+	"github.com/cockroachdb/cockroach/pkg/sql/rowenc/keyside"
 	"github.com/cockroachdb/cockroach/pkg/sql/schemachanger/scexec"
 	"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
+	"github.com/cockroachdb/cockroach/pkg/sql/stats"
 	"github.com/cockroachdb/cockroach/pkg/util/encoding"
 	"github.com/cockroachdb/cockroach/pkg/util/log"
 	"github.com/cockroachdb/cockroach/pkg/util/rangedesc"
+	"github.com/cockroachdb/errors"
 )
 
 type indexSplitAndScatter struct {
-	db        *kv.DB
-	codec     keys.SQLCodec
-	sv        *settings.Values
-	rangeIter rangedesc.IteratorFactory
-	nodeDescs kvclient.NodeDescStore
+	db           *kv.DB
+	codec        keys.SQLCodec
+	sv           *settings.Values
+	rangeIter    rangedesc.IteratorFactory
+	nodeDescs    kvclient.NodeDescStore
+	statsCache   *stats.TableStatisticsCache
+	testingKnobs *ExecutorTestingKnobs
 }
 
+var SplitAndScatterWithStats = settings.RegisterBoolSetting(
+	settings.ApplicationLevel,
+	"schemachanger.backfiller.split_with_stats.enabled",
+	"when enabled the index backfiller will generate split and "+
+		"scatter points based on table statistics",
+	false,
+)
+
 // NewIndexSplitAndScatter creates a new scexec.IndexSpanSplitter implementation.
 func NewIndexSplitAndScatter(execCfg *ExecutorConfig) scexec.IndexSpanSplitter {
-
 	return &indexSplitAndScatter{
-		db:        execCfg.DB,
-		codec:     execCfg.Codec,
-		sv:        &execCfg.Settings.SV,
-		rangeIter: execCfg.RangeDescIteratorFactory,
-		nodeDescs: execCfg.NodeDescs,
+		db:           execCfg.DB,
+		codec:        execCfg.Codec,
+		sv:           &execCfg.Settings.SV,
+		rangeIter:    execCfg.RangeDescIteratorFactory,
+		nodeDescs:    execCfg.NodeDescs,
+		statsCache:   execCfg.TableStatsCache,
+		testingKnobs: &execCfg.TestingKnobs,
+	}
+}
+
+func (is *indexSplitAndScatter) getSplitPointsWithStats(
+	ctx context.Context, table catalog.TableDescriptor, indexToBackfill catalog.Index, nSplits int,
+) ([][]byte, error) {
+	// Split and scatter with statistics is disabled.
+	if !SplitAndScatterWithStats.Get(is.sv) {
+		return nil, nil
+	}
+	// Fetch the current statistics for this table.
+	tableStats, err := is.statsCache.GetTableStats(ctx, table, nil)
+	if err != nil {
+		return nil, err
+	}
+	// Nothing can be done since no stats exist.
+	if len(tableStats) == 0 {
+		return nil, errors.New("no stats exist for this table")
+	}
+	// Gather the latest stats for each column.
+	keyCols := indexToBackfill.CollectKeyColumnIDs()
+	statsForColumns := make(map[descpb.ColumnID]*stats.TableStatistic)
+	keyCols.ForEach(func(col descpb.ColumnID) {
+		for _, stat := range tableStats {
+			// Skip stats that:
+			// 1) Do not contain this column.
+			// 2) Consist of multiple columns.
+			// 3) Have no histogram information.
+			if stat.Histogram == nil || len(stat.ColumnIDs) != 1 || stat.ColumnIDs[0] != col {
+				continue
+			}
+			statsForColumns[col] = stat
+			break
+		}
+	})
+	rowsPerRange := tableStats[0].RowCount / uint64(nSplits)
+	// Helper function that appends split points and, if necessary,
+	// downsamples them when they get too numerous.
+	var splitPoints [][]byte
+	appendAndShrinkSplitPoint := func(existing [][]byte, add []byte) [][]byte {
+		maxSplitPoints := nSplits * 2
+		if len(existing) < maxSplitPoints {
+			return append(existing, add)
+		}
+		// Otherwise, we can sample these split points.
+		sort.Slice(existing, func(i, j int) bool {
+			return bytes.Compare(existing[i], existing[j]) < 0
+		})
+		// Next get this down to capacity again by taking a uniform sample of the
+		// existing split points.
+		newSplitPoints := make([][]byte, 0, nSplits+1)
+		step := float64(len(existing)) / float64(nSplits)
+		for i := 0; i < nSplits; i++ {
+			newSplitPoints = append(newSplitPoints, existing[int(float64(i)*step)])
+		}
+		newSplitPoints = append(newSplitPoints, add)
+		return newSplitPoints
 	}
+	// The following code generates split points for an index by iterating through
+	// each column of the index. For each column, it uses histogram statistics to
+	// identify points where the data can be divided into chunks of a target size
+	// (`rowsPerRange`).
+	//
+	// For the first column, it creates initial split points. For each subsequent
+	// column, it expands on the previously generated split points. It does this by
+	// appending the new column's split values to each of the existing split points from
+	// prior columns. This causes us to iterate combinatorially over all possible split points,
+	// so the `appendAndShrinkSplitPoint` function is used to downsample and keep the total number
+	// of points controlled.
+
+	// Note: Sadly, only the primary key or columns in indexes will have
+	// detailed information that we can use. All other columns will have
+	// limited splits.
+	for colIdx := 0; colIdx < indexToBackfill.NumKeyColumns(); colIdx++ {
+		lastSplitPoints := append([][]byte{}, splitPoints...)
+		splitPoints = splitPoints[:0]
+		keyColID := indexToBackfill.GetKeyColumnID(colIdx)
+		// Look up the stats and skip if they are missing.
+		stat, ok := statsForColumns[keyColID]
+		if !ok {
+			break
+		}
+		numInBucket := uint64(0)
+		for bucketIdx, bucket := range stat.Histogram {
+			numInBucket += uint64(bucket.NumRange) + uint64(bucket.NumEq)
+			// If we have hit the target rows, then emit a split point. Or
+			// if we are on the last bucket, we should always emit one.
+			if numInBucket >= rowsPerRange || bucketIdx == len(stat.Histogram)-1 {
+				var prevKeys [][]byte
+				// For the first column, we are going to start fresh with the base index prefix.
+				if colIdx == 0 {
+					prevKeys = [][]byte{is.codec.IndexPrefix(uint32(table.GetID()), uint32(indexToBackfill.GetID()))}
+				} else {
+					// For later columns we are going to start with the previous sets of splits.
+					prevKeys = lastSplitPoints
+				}
+				// We don't know where later columns fall, so we will encode these
+				// against all the previous split points (sadly, this will have an exponential
+				// cost). Our limit on the number of split points will resample these if they
+				// become excessive.
+				for _, prevKey := range prevKeys {
+					// Copy the base value before appending the next part of the key.
+					if colIdx > 0 {
+						tempKey := make([]byte, len(prevKey), cap(prevKey))
+						copy(tempKey, prevKey)
+						prevKey = tempKey
+					}
+					newSplit, err := keyside.Encode(prevKey, bucket.UpperBound, encoding.Direction(indexToBackfill.GetKeyColumnDirection(colIdx)+1))
+					if err != nil {
+						return nil, err
+					}
+					splitPoints = appendAndShrinkSplitPoint(splitPoints, newSplit)
+				}
+				numInBucket = 0
+				continue
+			}
+		}
+		// Stop once enough partitions have been created. Or if no partitions exist,
+		// then there is insufficient data for an educated guess. As we process later
+		// columns, we end up creating all possible permutations of the previous split
+		// points we selected, which means the statistical likelihood of a valid split
+		// point getting selected only gets lower.
+		if len(splitPoints) >= nSplits || len(splitPoints) == 0 {
+			break
+		}
+	}
+	// Always emit a split point at the start of the index span if
+	// we generated any split points above.
+	if len(splitPoints) > 0 {
+		splitPoints = append(splitPoints, is.codec.IndexPrefix(uint32(table.GetID()), uint32(indexToBackfill.GetID())))
+		log.Infof(ctx, "generated %d split points from statistics for tableId=%d index=%d", len(splitPoints), table.GetID(), indexToBackfill.GetID())
+	}
+	return splitPoints, nil
 }
 
 // MaybeSplitIndexSpans implements the scexec.IndexSpanSplitter interface.
```
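The bucket-accumulation loop above is the core of the technique: walk the histogram in key order and, every time roughly `rowsPerRange` rows have accumulated (or the final bucket is reached), emit the bucket's upper bound as a split value. Below is a self-contained sketch with simplified stand-in types; the production code instead appends `tree.Datum` upper bounds onto the index key prefix via `keyside.Encode`:

```go
package main

import "fmt"

// bucket is a simplified stand-in for a histogram bucket as consumed above:
// NumEq rows equal the upper bound, NumRange rows fall strictly inside the
// bucket. The real code works with stats.TableStatistic histograms.
type bucket struct {
	NumRange, NumEq uint64
	UpperBound      string // the production code holds a tree.Datum here
}

// splitValues mirrors the accumulation loop in getSplitPointsWithStats:
// accumulate bucket row counts and emit an upper bound whenever the target
// rows-per-range is crossed, always emitting one for the last bucket.
func splitValues(hist []bucket, rowCount, nSplits uint64) []string {
	rowsPerRange := rowCount / nSplits
	var out []string
	var numInBucket uint64
	for i, b := range hist {
		numInBucket += b.NumRange + b.NumEq
		if numInBucket >= rowsPerRange || i == len(hist)-1 {
			out = append(out, b.UpperBound)
			numInBucket = 0
		}
	}
	return out
}

func main() {
	// 10,000 rows over four buckets with nSplits = 4, so a split value is
	// emitted each time ~2,500 accumulated rows are crossed.
	hist := []bucket{
		{NumRange: 2400, NumEq: 100, UpperBound: "d"},
		{NumRange: 2000, NumEq: 0, UpperBound: "k"},
		{NumRange: 3000, NumEq: 500, UpperBound: "r"},
		{NumRange: 1900, NumEq: 100, UpperBound: "z"},
	}
	// Prints [d r z]: bucket "k" alone holds too few rows, so its range is
	// folded into the split ending at "r".
	fmt.Println(splitValues(hist, 10000, 4))
}
```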
The remaining hunks wire the stats-based fallback into MaybeSplitIndexSpans and invoke the new testing knob before splits are issued:

```diff
@@ -121,6 +269,13 @@ func (is *indexSplitAndScatter) MaybeSplitIndexSpans(
 		splitPoints = append(splitPoints, newStartKey)
 	}
 
+	if len(splitPoints) == 0 {
+		splitPoints, err = is.getSplitPointsWithStats(ctx, table, indexToBackfill, nSplits)
+		if err != nil {
+			log.Warningf(ctx, "unable to get split points from stats for tableID=%d index=%d due to %v", tableID, indexToBackfill.GetID(), err)
+		}
+	}
+
 	if len(splitPoints) == 0 {
 		// If we can't sample splits from another index, just add one split.
 		log.Infof(ctx, "making a single split point in tableId=%d index=%d", tableID, indexToBackfill.GetID())
@@ -130,6 +285,10 @@
 		if err != nil {
 			return err
 		}
+		// Execute the testing knob before adding a split.
+		if is.testingKnobs.BeforeIndexSplitAndScatter != nil {
+			is.testingKnobs.BeforeIndexSplitAndScatter([][]byte{splitKey})
+		}
 		// We split without scattering here because there is only one split point,
 		// so scattering wouldn't spread that much load.
 		return is.db.AdminSplit(ctx, splitKey, expirationTime)
@@ -143,6 +302,10 @@
 	if step < 1 {
 		step = 1
 	}
+	// Execute the testing knob before the split and scatter.
+	if is.testingKnobs.BeforeIndexSplitAndScatter != nil {
+		is.testingKnobs.BeforeIndexSplitAndScatter(splitPoints)
+	}
 	for i := 0; i < nSplits; i++ {
 		// Evenly space out the ranges that we select from the ranges that are
 		// returned.
```
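Because each later key column appends its split values to every point carried over from the previous column, the candidate set can grow combinatorially; `appendAndShrinkSplitPoint` therefore caps it at `2 * nSplits` and, on overflow, sorts the candidates and keeps a uniform sample. A standalone sketch of that resampling step follows (the `downsample` helper is a hypothetical simplification of the committed function):

```go
package main

import (
	"bytes"
	"fmt"
	"sort"
)

// downsample appends a candidate split point, but once the working set
// reaches 2*nSplits it first sorts the candidates and keeps a uniform
// sample of nSplits of them, bounding memory and split-request count.
func downsample(existing [][]byte, add []byte, nSplits int) [][]byte {
	if len(existing) < nSplits*2 {
		return append(existing, add)
	}
	sort.Slice(existing, func(i, j int) bool {
		return bytes.Compare(existing[i], existing[j]) < 0
	})
	step := float64(len(existing)) / float64(nSplits)
	sampled := make([][]byte, 0, nSplits+1)
	for i := 0; i < nSplits; i++ {
		sampled = append(sampled, existing[int(float64(i)*step)])
	}
	return append(sampled, add)
}

func main() {
	var points [][]byte
	// With nSplits = 4 the set grows to 8 candidates, then each overflow
	// resamples back down to 5, so it never exceeds 2*nSplits points.
	for c := byte('a'); c <= 'p'; c++ {
		points = downsample(points, []byte{c}, 4)
		fmt.Printf("after %c: %d points\n", c, len(points))
	}
}
```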
pkg/sql/index_split_scatter_test.go (81 additions, 0 deletions)

```diff
@@ -0,0 +1,81 @@
+// Copyright 2025 The Cockroach Authors.
+//
+// Use of this software is governed by the CockroachDB Software License
+// included in the /LICENSE file.
+
+package sql_test
+
+import (
+	"context"
+	"sync/atomic"
+	"testing"
+
+	"github.com/cockroachdb/cockroach/pkg/base"
+	"github.com/cockroachdb/cockroach/pkg/sql"
+	"github.com/cockroachdb/cockroach/pkg/testutils"
+	"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
+	"github.com/cockroachdb/cockroach/pkg/testutils/skip"
+	"github.com/cockroachdb/cockroach/pkg/testutils/sqlutils"
+	"github.com/cockroachdb/cockroach/pkg/util/leaktest"
+	"github.com/cockroachdb/cockroach/pkg/util/log"
+	"github.com/stretchr/testify/require"
+)
+
+// TestIndexSplitAndScatterWithStats tests the creation of indexes on tables
+// with statistics, where the splits will be generated using statistics on
+// the table.
+func TestIndexSplitAndScatterWithStats(t *testing.T) {
+	defer leaktest.AfterTest(t)()
+	defer log.Scope(t).Close(t)
+	// This test can be fairly slow and time out under race / duress.
+	skip.UnderDuress(t)
+
+	testutils.RunTrueAndFalse(t, "StatsCreated", func(t *testing.T, statsExist bool) {
+		ctx := context.Background()
+		var splitHookEnabled atomic.Bool
+		var observedSplitPoints atomic.Int64
+		const numNodes = 3
+		cluster := serverutils.StartCluster(t, numNodes, base.TestClusterArgs{
+			ServerArgs: base.TestServerArgs{
+				Knobs: base.TestingKnobs{
+					SQLExecutor: &sql.ExecutorTestingKnobs{
+						BeforeIndexSplitAndScatter: func(splitPoints [][]byte) {
+							if !splitHookEnabled.Load() {
+								return
+							}
+							observedSplitPoints.Swap(int64(len(splitPoints)))
+						},
+					},
+				},
+			},
+		})
+		defer cluster.Stopper().Stop(ctx)
+		runner := sqlutils.MakeSQLRunner(cluster.ServerConn(0))
+		// Enable split and scatter with stats.
+		runner.Exec(t, "SET CLUSTER SETTING schemachanger.backfiller.split_with_stats.enabled = true")
+		// Disable automatic statistics.
+		runner.Exec(t, "SET CLUSTER SETTING sql.stats.automatic_collection.enabled = false")
+		// Create and populate the table.
+		runner.Exec(t, "CREATE TABLE multi_column_split (b bool, n uuid PRIMARY KEY)")
+		runner.Exec(t, "INSERT INTO multi_column_split (SELECT true, uuid_generate_v1() FROM generate_series(1, 5000))")
+		runner.Exec(t, "INSERT INTO multi_column_split (SELECT false, uuid_generate_v1() FROM generate_series(1, 5000))")
+		// Generate statistics for the table.
+		if statsExist {
+			runner.Exec(t, "CREATE STATISTICS st FROM multi_column_split")
+		}
+		// Next create an index on the table.
+		splitHookEnabled.Store(true)
+		observedSplitPoints.Store(0)
+		runner.Exec(t, "CREATE INDEX ON multi_column_split (b, n)")
+		// Assert that we generated the target number of split points
+		// automatically.
+		if !statsExist {
+			require.Equal(t, int64(1), observedSplitPoints.Load())
+		} else {
+			expectedCount := sql.PreservedSplitCountMultiple.Get(&cluster.Server(0).ClusterSettings().SV) * numNodes
+			require.Greaterf(t, observedSplitPoints.Load(), expectedCount,
+				"expected more than %d split points, got %d", expectedCount, observedSplitPoints.Load())
+		}
+		splitHookEnabled.Swap(false)
+	})
+
+}
```
