Skip to content

Commit 127f0a7

Browse files
committed
kv/bulk: extract flush implementation into sst_adder.go
This commit moves the parts of the SSTBatcher that will become the sstAdder into a standalone file. Release note: none. Part of: #146828
1 parent 680f175 commit 127f0a7

File tree

3 files changed

+307
-283
lines changed

3 files changed

+307
-283
lines changed

pkg/kv/bulk/BUILD.bazel

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ go_library(
77
"bulk_metrics.go",
88
"kv_buf.go",
99
"setting.go",
10+
"sst_adder.go",
1011
"sst_batcher.go",
1112
],
1213
importpath = "github.com/cockroachdb/cockroach/pkg/kv/bulk",

pkg/kv/bulk/sst_adder.go

Lines changed: 306 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,306 @@
1+
// Copyright 2025 The Cockroach Authors.
2+
//
3+
// Use of this software is governed by the CockroachDB Software License
4+
// included in the /LICENSE file.
5+
6+
package bulk
7+
8+
import (
9+
"context"
10+
"time"
11+
12+
"github.com/cockroachdb/cockroach/pkg/kv/bulk/bulkpb"
13+
"github.com/cockroachdb/cockroach/pkg/kv/kvpb"
14+
"github.com/cockroachdb/cockroach/pkg/roachpb"
15+
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
16+
"github.com/cockroachdb/cockroach/pkg/storage"
17+
"github.com/cockroachdb/cockroach/pkg/storage/enginepb"
18+
"github.com/cockroachdb/cockroach/pkg/util/hlc"
19+
"github.com/cockroachdb/cockroach/pkg/util/log"
20+
"github.com/cockroachdb/cockroach/pkg/util/retry"
21+
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
22+
"github.com/cockroachdb/cockroach/pkg/util/tracing"
23+
"github.com/cockroachdb/errors"
24+
)
25+
26+
type sstSpan struct {
27+
start, end roachpb.Key // [inclusive, exclusive)
28+
sstBytes []byte
29+
stats enginepb.MVCCStats
30+
}
31+
32+
// addSSTable retries db.AddSSTable if retryable errors occur, including if the
33+
// SST spans a split, in which case it is iterated and split into two SSTs, one
34+
// for each side of the split in the error, and each are retried.
35+
func (b *SSTBatcher) addSSTable(
36+
ctx context.Context,
37+
batchTS hlc.Timestamp,
38+
start, end roachpb.Key,
39+
sstBytes []byte,
40+
stats enginepb.MVCCStats,
41+
updatesLastRange bool,
42+
ingestionPerformanceStats *bulkpb.IngestionPerformanceStats,
43+
) error {
44+
ctx, sp := tracing.ChildSpan(ctx, "*SSTBatcher.addSSTable")
45+
defer sp.Finish()
46+
47+
sendStart := timeutil.Now()
48+
if ingestionPerformanceStats == nil {
49+
return errors.AssertionFailedf("ingestionPerformanceStats should not be nil")
50+
}
51+
52+
// Currently, the SSTBatcher cannot ingest range keys, so it is safe to
53+
// ComputeStats with an iterator that only surfaces point keys.
54+
iterOpts := storage.IterOptions{
55+
KeyTypes: storage.IterKeyTypePointsOnly,
56+
LowerBound: start,
57+
UpperBound: end,
58+
}
59+
iter, err := storage.NewMemSSTIterator(sstBytes, true, iterOpts)
60+
if err != nil {
61+
return err
62+
}
63+
defer iter.Close()
64+
65+
if (stats == enginepb.MVCCStats{}) {
66+
iter.SeekGE(storage.MVCCKey{Key: start})
67+
// NB: even though this ComputeStatsForIter call exhausts the iterator, we
68+
// can reuse/re-seek on the iterator, as part of the MVCCIterator contract.
69+
stats, err = storage.ComputeStatsForIter(iter, sendStart.UnixNano())
70+
if err != nil {
71+
return errors.Wrapf(err, "computing stats for SST [%s, %s)", start, end)
72+
}
73+
}
74+
75+
work := []*sstSpan{{start: start, end: end, sstBytes: sstBytes, stats: stats}}
76+
var files int
77+
for len(work) > 0 {
78+
item := work[0]
79+
work = work[1:]
80+
if err := func() error {
81+
var err error
82+
opts := retry.Options{
83+
InitialBackoff: 30 * time.Millisecond,
84+
Multiplier: 2,
85+
MaxRetries: 10,
86+
}
87+
for r := retry.StartWithCtx(ctx, opts); r.Next(); {
88+
log.VEventf(ctx, 4, "sending %s AddSSTable [%s,%s)", sz(len(item.sstBytes)), item.start, item.end)
89+
// If this SST is "too small", the fixed costs associated with adding an
90+
// SST – in terms of triggering flushes, extra compactions, etc – would
91+
// exceed the savings we get from skipping regular, key-by-key writes,
92+
// and we're better off just putting its contents in a regular batch.
93+
// This isn't perfect: We're still incurring extra overhead constructing
94+
// SSTables just for use as a wire-format, but the rest of the
95+
// implementation of bulk-ingestion assumes certainly semantics of the
96+
// AddSSTable API - like ingest at arbitrary timestamps or collision
97+
// detection - making it is simpler to just always use the same API
98+
// and just switch how it writes its result.
99+
ingestAsWriteBatch := false
100+
if b.settings != nil && int64(len(item.sstBytes)) < tooSmallSSTSize.Get(&b.settings.SV) {
101+
log.VEventf(ctx, 3, "ingest data is too small (%d keys/%d bytes) for SSTable, adding via regular batch", item.stats.KeyCount, len(item.sstBytes))
102+
ingestAsWriteBatch = true
103+
ingestionPerformanceStats.AsWrites++
104+
}
105+
106+
req := &kvpb.AddSSTableRequest{
107+
RequestHeader: kvpb.RequestHeader{Key: item.start, EndKey: item.end},
108+
Data: item.sstBytes,
109+
DisallowShadowingBelow: b.disallowShadowingBelow,
110+
MVCCStats: &item.stats,
111+
IngestAsWrites: ingestAsWriteBatch,
112+
ReturnFollowingLikelyNonEmptySpanStart: true,
113+
}
114+
if b.writeAtBatchTS {
115+
req.SSTTimestampToRequestTimestamp = batchTS
116+
}
117+
118+
ba := &kvpb.BatchRequest{
119+
Header: kvpb.Header{Timestamp: batchTS, ClientRangeInfo: roachpb.ClientRangeInfo{ExplicitlyRequested: true}},
120+
AdmissionHeader: kvpb.AdmissionHeader{
121+
Priority: int32(b.priority),
122+
CreateTime: timeutil.Now().UnixNano(),
123+
Source: kvpb.AdmissionHeader_FROM_SQL,
124+
NoMemoryReservedAtSource: true,
125+
},
126+
}
127+
ba.Add(req)
128+
beforeSend := timeutil.Now()
129+
130+
sendCtx, sendSp := tracing.ChildSpan(ctx, "*SSTBatcher.addSSTable/Send")
131+
br, pErr := b.db.NonTransactionalSender().Send(sendCtx, ba)
132+
sendSp.Finish()
133+
134+
sendTime := timeutil.Since(beforeSend)
135+
136+
ingestionPerformanceStats.SendWait += sendTime
137+
if br != nil && len(br.BatchResponse_Header.RangeInfos) > 0 {
138+
// Should only ever really be one iteration but if somehow it isn't,
139+
// e.g. if a request was redirected, go ahead and count it against all
140+
// involved stores; if it is small this edge case is immaterial, and
141+
// if it is large, it's probably one big one but we don't know which
142+
// so just blame them all (averaging it out could hide one big delay).
143+
for i := range br.BatchResponse_Header.RangeInfos {
144+
ingestionPerformanceStats.SendWaitByStore[br.BatchResponse_Header.RangeInfos[i].Lease.Replica.StoreID] += sendTime
145+
}
146+
}
147+
148+
if pErr == nil {
149+
resp := br.Responses[0].GetInner().(*kvpb.AddSSTableResponse)
150+
b.mu.Lock()
151+
if b.writeAtBatchTS {
152+
b.mu.maxWriteTS.Forward(br.Timestamp)
153+
}
154+
b.mu.Unlock()
155+
// If this was sent async then, by the time the reply gets back, it
156+
// might not be the last range anymore. We can just discard the last
157+
// range reply in this case though because async sends are only used
158+
// for SSTs sent due to range boundaries, i.e. when we are done with
159+
// with that range anyway.
160+
if updatesLastRange {
161+
b.lastRange.span = resp.RangeSpan
162+
if resp.RangeSpan.Valid() {
163+
b.lastRange.remaining = sz(resp.AvailableBytes)
164+
b.lastRange.nextExistingKey = resp.FollowingLikelyNonEmptySpanStart
165+
}
166+
}
167+
files++
168+
log.VEventf(ctx, 3, "adding %s AddSSTable [%s,%s) took %v", sz(len(item.sstBytes)), item.start, item.end, sendTime)
169+
return nil
170+
}
171+
172+
err = pErr.GoError()
173+
// Retry on AmbiguousResult.
174+
if errors.HasType(err, (*kvpb.AmbiguousResultError)(nil)) {
175+
log.Warningf(ctx, "addsstable [%s,%s) attempt %d failed: %+v", start, end, r.CurrentAttempt(), err)
176+
continue
177+
}
178+
// This range has split -- we need to split the SST to try again.
179+
if m := (*kvpb.RangeKeyMismatchError)(nil); errors.As(err, &m) {
180+
// TODO(andrei): We just use the first of m.Ranges; presumably we
181+
// should be using all of them to avoid further retries.
182+
mr, err := m.MismatchedRange()
183+
if err != nil {
184+
return err
185+
}
186+
split := mr.Desc.EndKey.AsRawKey()
187+
log.Infof(ctx, "SSTable cannot be added spanning range bounds %v, retrying...", split)
188+
left, right, err := createSplitSSTable(ctx, item.start, split, iter, b.settings)
189+
if err != nil {
190+
return err
191+
}
192+
if err := addStatsToSplitTables(left, right, item, sendStart); err != nil {
193+
return err
194+
}
195+
// Add more work.
196+
work = append([]*sstSpan{left, right}, work...)
197+
return nil
198+
}
199+
}
200+
return err
201+
}(); err != nil {
202+
return errors.Wrapf(err, "addsstable [%s,%s)", item.start, item.end)
203+
}
204+
// explicitly deallocate SST. This will not deallocate the
205+
// top level SST which is kept around to iterate over.
206+
item.sstBytes = nil
207+
}
208+
ingestionPerformanceStats.SplitRetries += int64(files - 1)
209+
210+
log.VEventf(ctx, 3, "AddSSTable [%v, %v) added %d files and took %v", start, end, files, timeutil.Since(sendStart))
211+
return nil
212+
}
213+
214+
// createSplitSSTable is a helper for splitting up SSTs. The iterator
215+
// passed in is over the top level SST passed into AddSSTTable().
216+
func createSplitSSTable(
217+
ctx context.Context,
218+
start, splitKey roachpb.Key,
219+
iter storage.SimpleMVCCIterator,
220+
settings *cluster.Settings,
221+
) (*sstSpan, *sstSpan, error) {
222+
sstFile := &storage.MemObject{}
223+
if start.Compare(splitKey) >= 0 {
224+
return nil, nil, errors.AssertionFailedf("start key %s of original sst must be greater than than split key %s", start, splitKey)
225+
}
226+
w := storage.MakeIngestionSSTWriter(ctx, settings, sstFile)
227+
defer w.Close()
228+
229+
split := false
230+
var first, last roachpb.Key
231+
var left, right *sstSpan
232+
233+
iter.SeekGE(storage.MVCCKey{Key: start})
234+
for {
235+
if ok, err := iter.Valid(); err != nil {
236+
return nil, nil, err
237+
} else if !ok {
238+
break
239+
}
240+
241+
key := iter.UnsafeKey()
242+
243+
if !split && key.Key.Compare(splitKey) >= 0 {
244+
err := w.Finish()
245+
if err != nil {
246+
return nil, nil, err
247+
}
248+
249+
left = &sstSpan{start: first, end: last.Next(), sstBytes: sstFile.Data()}
250+
*sstFile = storage.MemObject{}
251+
w = storage.MakeIngestionSSTWriter(ctx, settings, sstFile)
252+
split = true
253+
first = nil
254+
last = nil
255+
}
256+
257+
if len(first) == 0 {
258+
first = append(first[:0], key.Key...)
259+
}
260+
last = append(last[:0], key.Key...)
261+
262+
v, err := iter.UnsafeValue()
263+
if err != nil {
264+
return nil, nil, err
265+
}
266+
if err := w.Put(key, v); err != nil {
267+
return nil, nil, err
268+
}
269+
270+
iter.Next()
271+
}
272+
273+
err := w.Finish()
274+
if err != nil {
275+
return nil, nil, err
276+
}
277+
if !split {
278+
return nil, nil, errors.AssertionFailedf("split key %s after last key %s", splitKey, last.Next())
279+
}
280+
right = &sstSpan{start: first, end: last.Next(), sstBytes: sstFile.Data()}
281+
return left, right, nil
282+
}
283+
284+
// addStatsToSplitTables computes the stats of the new lhs and rhs SSTs by
285+
// computing the rhs sst stats, then computing the lhs stats as
286+
// originalStats-rhsStats.
287+
func addStatsToSplitTables(left, right, original *sstSpan, sendStartTimestamp time.Time) error {
288+
// Needs a new iterator with new bounds.
289+
statsIter, err := storage.NewMemSSTIterator(original.sstBytes, true, storage.IterOptions{
290+
KeyTypes: storage.IterKeyTypePointsOnly,
291+
LowerBound: right.start,
292+
UpperBound: right.end,
293+
})
294+
if err != nil {
295+
return err
296+
}
297+
statsIter.SeekGE(storage.MVCCKey{Key: right.start})
298+
right.stats, err = storage.ComputeStatsForIter(statsIter, sendStartTimestamp.Unix())
299+
statsIter.Close()
300+
if err != nil {
301+
return err
302+
}
303+
left.stats = original.stats
304+
left.stats.Subtract(right.stats)
305+
return nil
306+
}

0 commit comments

Comments
 (0)