Skip to content

Commit d9c3a94

Browse files
craig[bot]jeffswenson
andcommitted
Merge #148152
148152: kv/bulk: extract sst add logic from sst batcher r=jeffswenson a=jeffswenson ## kv/bulk: extract sst add logic from sst batcher This extracts the logic related to flushing a single sst from the logic that controls batching and flushing. Release note: none Part of: #146828 ## kv/bulk: extract flush implementation into sst_adder.go This commit moves parts of the SSTBatcher that will become the sstAdder into a stand alone file. Release note: none Part of: #146828 Co-authored-by: Jeff Swenson <[email protected]>
2 parents fea6fe5 + 3641703 commit d9c3a94

File tree

4 files changed

+385
-302
lines changed

4 files changed

+385
-302
lines changed

pkg/kv/bulk/BUILD.bazel

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ go_library(
77
"bulk_metrics.go",
88
"kv_buf.go",
99
"setting.go",
10+
"sst_adder.go",
1011
"sst_batcher.go",
1112
],
1213
importpath = "github.com/cockroachdb/cockroach/pkg/kv/bulk",

pkg/kv/bulk/buffering_adder.go

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -111,17 +111,23 @@ func MakeBulkAdder(
111111
name: opts.Name,
112112
importEpoch: opts.ImportEpoch,
113113
sink: SSTBatcher{
114-
name: opts.Name,
115-
db: db,
116-
rc: rangeCache,
114+
name: opts.Name,
115+
db: db,
116+
rc: rangeCache,
117+
adder: newSSTAdder(
118+
db,
119+
settings,
120+
opts.WriteAtBatchTimestamp,
121+
opts.DisallowShadowingBelow,
122+
admissionpb.BulkNormalPri,
123+
),
117124
settings: settings,
118125
skipDuplicates: opts.SkipDuplicates,
119126
disallowShadowingBelow: opts.DisallowShadowingBelow,
120127
batchTS: opts.BatchTimestamp,
121128
writeAtBatchTS: opts.WriteAtBatchTimestamp,
122129
mem: bulkMon.MakeConcurrentBoundAccount(),
123130
limiter: sendLimiter,
124-
priority: admissionpb.BulkNormalPri,
125131
},
126132
timestamp: timestamp,
127133
maxBufferLimit: opts.MaxBufferSize,

pkg/kv/bulk/sst_adder.go

Lines changed: 343 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,343 @@
1+
// Copyright 2025 The Cockroach Authors.
2+
//
3+
// Use of this software is governed by the CockroachDB Software License
4+
// included in the /LICENSE file.
5+
6+
package bulk
7+
8+
import (
9+
"context"
10+
"time"
11+
12+
"github.com/cockroachdb/cockroach/pkg/kv"
13+
"github.com/cockroachdb/cockroach/pkg/kv/bulk/bulkpb"
14+
"github.com/cockroachdb/cockroach/pkg/kv/kvpb"
15+
"github.com/cockroachdb/cockroach/pkg/roachpb"
16+
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
17+
"github.com/cockroachdb/cockroach/pkg/storage"
18+
"github.com/cockroachdb/cockroach/pkg/storage/enginepb"
19+
"github.com/cockroachdb/cockroach/pkg/util/admission/admissionpb"
20+
"github.com/cockroachdb/cockroach/pkg/util/hlc"
21+
"github.com/cockroachdb/cockroach/pkg/util/log"
22+
"github.com/cockroachdb/cockroach/pkg/util/retry"
23+
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
24+
"github.com/cockroachdb/cockroach/pkg/util/tracing"
25+
"github.com/cockroachdb/errors"
26+
)
27+
28+
type sstAdder struct {
29+
settings *cluster.Settings
30+
db *kv.DB
31+
32+
// disallowShadowingBelow is described on kvpb.AddSSTableRequest.
33+
disallowShadowingBelow hlc.Timestamp
34+
35+
// priority is the admission priority used for AddSSTable
36+
// requests.
37+
priority admissionpb.WorkPriority
38+
39+
// writeAtBatchTS is true if the SST should be written at the
40+
// batch timestamp. If this is set to true, then all the keys in
41+
//the sst must have the same timestamp and they must be equal to
42+
// the batch timestamp.
43+
// TODO(jeffswenson): remove this from `sstAdder` in a stand alone PR.
44+
writeAtBatchTS bool
45+
}
46+
47+
func newSSTAdder(
48+
db *kv.DB,
49+
settings *cluster.Settings,
50+
writeAtBatchTS bool,
51+
disallowShadowingBelow hlc.Timestamp,
52+
priority admissionpb.WorkPriority,
53+
) *sstAdder {
54+
return &sstAdder{
55+
db: db,
56+
disallowShadowingBelow: disallowShadowingBelow,
57+
priority: priority,
58+
settings: settings,
59+
writeAtBatchTS: writeAtBatchTS,
60+
}
61+
}
62+
63+
type sstSpan struct {
64+
start, end roachpb.Key // [inclusive, exclusive)
65+
sstBytes []byte
66+
stats enginepb.MVCCStats
67+
}
68+
69+
type addSSTResult struct {
70+
timestamp hlc.Timestamp
71+
availableBytes int64
72+
followingLikelyNonEmptySpanStart roachpb.Key
73+
rangeSpan roachpb.Span
74+
}
75+
76+
// addSSTable retries db.AddSSTable if retryable errors occur, including if the
77+
// SST spans a split, in which case it is iterated and split into two SSTs, one
78+
// for each side of the split in the error, and each are retried.
79+
func (a *sstAdder) AddSSTable(
80+
ctx context.Context,
81+
batchTS hlc.Timestamp,
82+
start, end roachpb.Key,
83+
sstBytes []byte,
84+
stats enginepb.MVCCStats,
85+
ingestionPerformanceStats *bulkpb.IngestionPerformanceStats,
86+
) ([]addSSTResult, error) {
87+
ctx, sp := tracing.ChildSpan(ctx, "*SSTBatcher.addSSTable")
88+
defer sp.Finish()
89+
90+
sendStart := timeutil.Now()
91+
if ingestionPerformanceStats == nil {
92+
return nil, errors.AssertionFailedf("ingestionPerformanceStats should not be nil")
93+
}
94+
95+
// Currently, the SSTBatcher cannot ingest range keys, so it is safe to
96+
// ComputeStats with an iterator that only surfaces point keys.
97+
iterOpts := storage.IterOptions{
98+
KeyTypes: storage.IterKeyTypePointsOnly,
99+
LowerBound: start,
100+
UpperBound: end,
101+
}
102+
iter, err := storage.NewMemSSTIterator(sstBytes, true, iterOpts)
103+
if err != nil {
104+
return nil, err
105+
}
106+
defer iter.Close()
107+
108+
if stats == (enginepb.MVCCStats{}) {
109+
// TODO(jeffswenson): Audit AddSST callers to see if they generate
110+
// server side stats now. Accurately computing stats in the face of replays
111+
// requires the server to do it.
112+
iter.SeekGE(storage.MVCCKey{Key: start})
113+
// NB: even though this ComputeStatsForIter call exhausts the iterator, we
114+
// can reuse/re-seek on the iterator, as part of the MVCCIterator contract.
115+
stats, err = storage.ComputeStatsForIter(iter, sendStart.UnixNano())
116+
if err != nil {
117+
return nil, errors.Wrapf(err, "computing stats for SST [%s, %s)", start, end)
118+
}
119+
}
120+
121+
work := []*sstSpan{{start: start, end: end, sstBytes: sstBytes, stats: stats}}
122+
var results []addSSTResult
123+
var files int
124+
for len(work) > 0 {
125+
item := work[0]
126+
work = work[1:]
127+
if err := func() error {
128+
var err error
129+
opts := retry.Options{
130+
InitialBackoff: 30 * time.Millisecond,
131+
Multiplier: 2,
132+
MaxRetries: 10,
133+
}
134+
for r := retry.StartWithCtx(ctx, opts); r.Next(); {
135+
log.VEventf(ctx, 4, "sending %s AddSSTable [%s,%s)", sz(len(item.sstBytes)), item.start, item.end)
136+
// If this SST is "too small", the fixed costs associated with adding an
137+
// SST – in terms of triggering flushes, extra compactions, etc – would
138+
// exceed the savings we get from skipping regular, key-by-key writes,
139+
// and we're better off just putting its contents in a regular batch.
140+
// This isn't perfect: We're still incurring extra overhead constructing
141+
// SSTables just for use as a wire-format, but the rest of the
142+
// implementation of bulk-ingestion assumes certainly semantics of the
143+
// AddSSTable API - like ingest at arbitrary timestamps or collision
144+
// detection - making it is simpler to just always use the same API
145+
// and just switch how it writes its result.
146+
ingestAsWriteBatch := false
147+
if a.settings != nil && int64(len(item.sstBytes)) < tooSmallSSTSize.Get(&a.settings.SV) {
148+
log.VEventf(ctx, 3, "ingest data is too small (%d keys/%d bytes) for SSTable, adding via regular batch", item.stats.KeyCount, len(item.sstBytes))
149+
ingestAsWriteBatch = true
150+
ingestionPerformanceStats.AsWrites++
151+
}
152+
153+
req := &kvpb.AddSSTableRequest{
154+
RequestHeader: kvpb.RequestHeader{Key: item.start, EndKey: item.end},
155+
Data: item.sstBytes,
156+
DisallowShadowingBelow: a.disallowShadowingBelow,
157+
MVCCStats: &item.stats,
158+
IngestAsWrites: ingestAsWriteBatch,
159+
ReturnFollowingLikelyNonEmptySpanStart: true,
160+
}
161+
if a.writeAtBatchTS {
162+
req.SSTTimestampToRequestTimestamp = batchTS
163+
}
164+
165+
ba := &kvpb.BatchRequest{
166+
Header: kvpb.Header{Timestamp: batchTS, ClientRangeInfo: roachpb.ClientRangeInfo{ExplicitlyRequested: true}},
167+
AdmissionHeader: kvpb.AdmissionHeader{
168+
Priority: int32(a.priority),
169+
CreateTime: timeutil.Now().UnixNano(),
170+
Source: kvpb.AdmissionHeader_FROM_SQL,
171+
NoMemoryReservedAtSource: true,
172+
},
173+
}
174+
175+
ba.Add(req)
176+
beforeSend := timeutil.Now()
177+
178+
sendCtx, sendSp := tracing.ChildSpan(ctx, "*SSTBatcher.addSSTable/Send")
179+
br, pErr := a.db.NonTransactionalSender().Send(sendCtx, ba)
180+
sendSp.Finish()
181+
182+
sendTime := timeutil.Since(beforeSend)
183+
184+
ingestionPerformanceStats.SendWait += sendTime
185+
if br != nil && len(br.BatchResponse_Header.RangeInfos) > 0 {
186+
// Should only ever really be one iteration but if somehow it isn't,
187+
// e.g. if a request was redirected, go ahead and count it against all
188+
// involved stores; if it is small this edge case is immaterial, and
189+
// if it is large, it's probably one big one but we don't know which
190+
// so just blame them all (averaging it out could hide one big delay).
191+
for i := range br.BatchResponse_Header.RangeInfos {
192+
ingestionPerformanceStats.SendWaitByStore[br.BatchResponse_Header.RangeInfos[i].Lease.Replica.StoreID] += sendTime
193+
}
194+
}
195+
196+
if pErr == nil {
197+
resp := br.Responses[0].GetInner().(*kvpb.AddSSTableResponse)
198+
results = append(results, addSSTResult{
199+
timestamp: br.Timestamp,
200+
availableBytes: resp.AvailableBytes,
201+
followingLikelyNonEmptySpanStart: resp.FollowingLikelyNonEmptySpanStart,
202+
rangeSpan: resp.RangeSpan,
203+
})
204+
files++
205+
log.VEventf(ctx, 3, "adding %s AddSSTable [%s,%s) took %v", sz(len(item.sstBytes)), item.start, item.end, sendTime)
206+
return nil
207+
}
208+
209+
err = pErr.GoError()
210+
// Retry on AmbiguousResult.
211+
if errors.HasType(err, (*kvpb.AmbiguousResultError)(nil)) {
212+
log.Warningf(ctx, "addsstable [%s,%s) attempt %d failed: %+v", start, end, r.CurrentAttempt(), err)
213+
continue
214+
}
215+
// This range has split -- we need to split the SST to try again.
216+
if m := (*kvpb.RangeKeyMismatchError)(nil); errors.As(err, &m) {
217+
// TODO(andrei): We just use the first of m.Ranges; presumably we
218+
// should be using all of them to avoid further retries.
219+
mr, err := m.MismatchedRange()
220+
if err != nil {
221+
return err
222+
}
223+
split := mr.Desc.EndKey.AsRawKey()
224+
log.Infof(ctx, "SSTable cannot be added spanning range bounds %v, retrying...", split)
225+
left, right, err := createSplitSSTable(ctx, item.start, split, iter, a.settings)
226+
if err != nil {
227+
return err
228+
}
229+
if err := addStatsToSplitTables(left, right, item, sendStart); err != nil {
230+
return err
231+
}
232+
// Add more work.
233+
work = append([]*sstSpan{left, right}, work...)
234+
return nil
235+
}
236+
}
237+
return err
238+
}(); err != nil {
239+
return nil, errors.Wrapf(err, "addsstable [%s,%s)", item.start, item.end)
240+
}
241+
// explicitly deallocate SST. This will not deallocate the
242+
// top level SST which is kept around to iterate over.
243+
item.sstBytes = nil
244+
}
245+
ingestionPerformanceStats.SplitRetries += int64(files - 1)
246+
247+
log.VEventf(ctx, 3, "AddSSTable [%v, %v) added %d files and took %v", start, end, files, timeutil.Since(sendStart))
248+
return results, nil
249+
}
250+
251+
// createSplitSSTable is a helper for splitting up SSTs. The iterator
252+
// passed in is over the top level SST passed into AddSSTTable().
253+
func createSplitSSTable(
254+
ctx context.Context,
255+
start, splitKey roachpb.Key,
256+
iter storage.SimpleMVCCIterator,
257+
settings *cluster.Settings,
258+
) (*sstSpan, *sstSpan, error) {
259+
sstFile := &storage.MemObject{}
260+
if start.Compare(splitKey) >= 0 {
261+
return nil, nil, errors.AssertionFailedf("start key %s of original sst must be greater than than split key %s", start, splitKey)
262+
}
263+
w := storage.MakeIngestionSSTWriter(ctx, settings, sstFile)
264+
defer w.Close()
265+
266+
split := false
267+
var first, last roachpb.Key
268+
var left, right *sstSpan
269+
270+
iter.SeekGE(storage.MVCCKey{Key: start})
271+
for {
272+
if ok, err := iter.Valid(); err != nil {
273+
return nil, nil, err
274+
} else if !ok {
275+
break
276+
}
277+
278+
key := iter.UnsafeKey()
279+
280+
if !split && key.Key.Compare(splitKey) >= 0 {
281+
err := w.Finish()
282+
if err != nil {
283+
return nil, nil, err
284+
}
285+
286+
left = &sstSpan{start: first, end: last.Next(), sstBytes: sstFile.Data()}
287+
*sstFile = storage.MemObject{}
288+
w = storage.MakeIngestionSSTWriter(ctx, settings, sstFile)
289+
split = true
290+
first = nil
291+
last = nil
292+
}
293+
294+
if len(first) == 0 {
295+
first = append(first[:0], key.Key...)
296+
}
297+
last = append(last[:0], key.Key...)
298+
299+
v, err := iter.UnsafeValue()
300+
if err != nil {
301+
return nil, nil, err
302+
}
303+
if err := w.Put(key, v); err != nil {
304+
return nil, nil, err
305+
}
306+
307+
iter.Next()
308+
}
309+
310+
err := w.Finish()
311+
if err != nil {
312+
return nil, nil, err
313+
}
314+
if !split {
315+
return nil, nil, errors.AssertionFailedf("split key %s after last key %s", splitKey, last.Next())
316+
}
317+
right = &sstSpan{start: first, end: last.Next(), sstBytes: sstFile.Data()}
318+
return left, right, nil
319+
}
320+
321+
// addStatsToSplitTables computes the stats of the new lhs and rhs SSTs by
322+
// computing the rhs sst stats, then computing the lhs stats as
323+
// originalStats-rhsStats.
324+
func addStatsToSplitTables(left, right, original *sstSpan, sendStartTimestamp time.Time) error {
325+
// Needs a new iterator with new bounds.
326+
statsIter, err := storage.NewMemSSTIterator(original.sstBytes, true, storage.IterOptions{
327+
KeyTypes: storage.IterKeyTypePointsOnly,
328+
LowerBound: right.start,
329+
UpperBound: right.end,
330+
})
331+
if err != nil {
332+
return err
333+
}
334+
statsIter.SeekGE(storage.MVCCKey{Key: right.start})
335+
right.stats, err = storage.ComputeStatsForIter(statsIter, sendStartTimestamp.Unix())
336+
statsIter.Close()
337+
if err != nil {
338+
return err
339+
}
340+
left.stats = original.stats
341+
left.stats.Subtract(right.stats)
342+
return nil
343+
}

0 commit comments

Comments
 (0)