Skip to content

Commit 3641703

Browse files
committed
kv/bulk: extract sst add logic from sst batcher
This extracts the logic related to flushing a single sst from the logic that controls batching and flushing. Release note: none Part of: #146828
1 parent 127f0a7 commit 3641703

File tree

3 files changed

+110
-51
lines changed

3 files changed

+110
-51
lines changed

pkg/kv/bulk/buffering_adder.go

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -111,17 +111,23 @@ func MakeBulkAdder(
111111
name: opts.Name,
112112
importEpoch: opts.ImportEpoch,
113113
sink: SSTBatcher{
114-
name: opts.Name,
115-
db: db,
116-
rc: rangeCache,
114+
name: opts.Name,
115+
db: db,
116+
rc: rangeCache,
117+
adder: newSSTAdder(
118+
db,
119+
settings,
120+
opts.WriteAtBatchTimestamp,
121+
opts.DisallowShadowingBelow,
122+
admissionpb.BulkNormalPri,
123+
),
117124
settings: settings,
118125
skipDuplicates: opts.SkipDuplicates,
119126
disallowShadowingBelow: opts.DisallowShadowingBelow,
120127
batchTS: opts.BatchTimestamp,
121128
writeAtBatchTS: opts.WriteAtBatchTimestamp,
122129
mem: bulkMon.MakeConcurrentBoundAccount(),
123130
limiter: sendLimiter,
124-
priority: admissionpb.BulkNormalPri,
125131
},
126132
timestamp: timestamp,
127133
maxBufferLimit: opts.MaxBufferSize,

pkg/kv/bulk/sst_adder.go

Lines changed: 69 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -9,12 +9,14 @@ import (
99
"context"
1010
"time"
1111

12+
"github.com/cockroachdb/cockroach/pkg/kv"
1213
"github.com/cockroachdb/cockroach/pkg/kv/bulk/bulkpb"
1314
"github.com/cockroachdb/cockroach/pkg/kv/kvpb"
1415
"github.com/cockroachdb/cockroach/pkg/roachpb"
1516
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
1617
"github.com/cockroachdb/cockroach/pkg/storage"
1718
"github.com/cockroachdb/cockroach/pkg/storage/enginepb"
19+
"github.com/cockroachdb/cockroach/pkg/util/admission/admissionpb"
1820
"github.com/cockroachdb/cockroach/pkg/util/hlc"
1921
"github.com/cockroachdb/cockroach/pkg/util/log"
2022
"github.com/cockroachdb/cockroach/pkg/util/retry"
@@ -23,30 +25,71 @@ import (
2325
"github.com/cockroachdb/errors"
2426
)
2527

28+
type sstAdder struct {
29+
settings *cluster.Settings
30+
db *kv.DB
31+
32+
// disallowShadowingBelow is described on kvpb.AddSSTableRequest.
33+
disallowShadowingBelow hlc.Timestamp
34+
35+
// priority is the admission priority used for AddSSTable
36+
// requests.
37+
priority admissionpb.WorkPriority
38+
39+
// writeAtBatchTS is true if the SST should be written at the
40+
// batch timestamp. If this is set to true, then all the keys in
41+
//the sst must have the same timestamp and they must be equal to
42+
// the batch timestamp.
43+
// TODO(jeffswenson): remove this from `sstAdder` in a stand alone PR.
44+
writeAtBatchTS bool
45+
}
46+
47+
func newSSTAdder(
48+
db *kv.DB,
49+
settings *cluster.Settings,
50+
writeAtBatchTS bool,
51+
disallowShadowingBelow hlc.Timestamp,
52+
priority admissionpb.WorkPriority,
53+
) *sstAdder {
54+
return &sstAdder{
55+
db: db,
56+
disallowShadowingBelow: disallowShadowingBelow,
57+
priority: priority,
58+
settings: settings,
59+
writeAtBatchTS: writeAtBatchTS,
60+
}
61+
}
62+
2663
type sstSpan struct {
2764
start, end roachpb.Key // [inclusive, exclusive)
2865
sstBytes []byte
2966
stats enginepb.MVCCStats
3067
}
3168

69+
type addSSTResult struct {
70+
timestamp hlc.Timestamp
71+
availableBytes int64
72+
followingLikelyNonEmptySpanStart roachpb.Key
73+
rangeSpan roachpb.Span
74+
}
75+
3276
// addSSTable retries db.AddSSTable if retryable errors occur, including if the
3377
// SST spans a split, in which case it is iterated and split into two SSTs, one
3478
// for each side of the split in the error, and each are retried.
35-
func (b *SSTBatcher) addSSTable(
79+
func (a *sstAdder) AddSSTable(
3680
ctx context.Context,
3781
batchTS hlc.Timestamp,
3882
start, end roachpb.Key,
3983
sstBytes []byte,
4084
stats enginepb.MVCCStats,
41-
updatesLastRange bool,
4285
ingestionPerformanceStats *bulkpb.IngestionPerformanceStats,
43-
) error {
86+
) ([]addSSTResult, error) {
4487
ctx, sp := tracing.ChildSpan(ctx, "*SSTBatcher.addSSTable")
4588
defer sp.Finish()
4689

4790
sendStart := timeutil.Now()
4891
if ingestionPerformanceStats == nil {
49-
return errors.AssertionFailedf("ingestionPerformanceStats should not be nil")
92+
return nil, errors.AssertionFailedf("ingestionPerformanceStats should not be nil")
5093
}
5194

5295
// Currently, the SSTBatcher cannot ingest range keys, so it is safe to
@@ -58,21 +101,25 @@ func (b *SSTBatcher) addSSTable(
58101
}
59102
iter, err := storage.NewMemSSTIterator(sstBytes, true, iterOpts)
60103
if err != nil {
61-
return err
104+
return nil, err
62105
}
63106
defer iter.Close()
64107

65-
if (stats == enginepb.MVCCStats{}) {
108+
if stats == (enginepb.MVCCStats{}) {
109+
// TODO(jeffswenson): Audit AddSST callers to see if they generate
110+
// server side stats now. Accurately computing stats in the face of replays
111+
// requires the server to do it.
66112
iter.SeekGE(storage.MVCCKey{Key: start})
67113
// NB: even though this ComputeStatsForIter call exhausts the iterator, we
68114
// can reuse/re-seek on the iterator, as part of the MVCCIterator contract.
69115
stats, err = storage.ComputeStatsForIter(iter, sendStart.UnixNano())
70116
if err != nil {
71-
return errors.Wrapf(err, "computing stats for SST [%s, %s)", start, end)
117+
return nil, errors.Wrapf(err, "computing stats for SST [%s, %s)", start, end)
72118
}
73119
}
74120

75121
work := []*sstSpan{{start: start, end: end, sstBytes: sstBytes, stats: stats}}
122+
var results []addSSTResult
76123
var files int
77124
for len(work) > 0 {
78125
item := work[0]
@@ -97,7 +144,7 @@ func (b *SSTBatcher) addSSTable(
97144
// detection - making it is simpler to just always use the same API
98145
// and just switch how it writes its result.
99146
ingestAsWriteBatch := false
100-
if b.settings != nil && int64(len(item.sstBytes)) < tooSmallSSTSize.Get(&b.settings.SV) {
147+
if a.settings != nil && int64(len(item.sstBytes)) < tooSmallSSTSize.Get(&a.settings.SV) {
101148
log.VEventf(ctx, 3, "ingest data is too small (%d keys/%d bytes) for SSTable, adding via regular batch", item.stats.KeyCount, len(item.sstBytes))
102149
ingestAsWriteBatch = true
103150
ingestionPerformanceStats.AsWrites++
@@ -106,29 +153,30 @@ func (b *SSTBatcher) addSSTable(
106153
req := &kvpb.AddSSTableRequest{
107154
RequestHeader: kvpb.RequestHeader{Key: item.start, EndKey: item.end},
108155
Data: item.sstBytes,
109-
DisallowShadowingBelow: b.disallowShadowingBelow,
156+
DisallowShadowingBelow: a.disallowShadowingBelow,
110157
MVCCStats: &item.stats,
111158
IngestAsWrites: ingestAsWriteBatch,
112159
ReturnFollowingLikelyNonEmptySpanStart: true,
113160
}
114-
if b.writeAtBatchTS {
161+
if a.writeAtBatchTS {
115162
req.SSTTimestampToRequestTimestamp = batchTS
116163
}
117164

118165
ba := &kvpb.BatchRequest{
119166
Header: kvpb.Header{Timestamp: batchTS, ClientRangeInfo: roachpb.ClientRangeInfo{ExplicitlyRequested: true}},
120167
AdmissionHeader: kvpb.AdmissionHeader{
121-
Priority: int32(b.priority),
168+
Priority: int32(a.priority),
122169
CreateTime: timeutil.Now().UnixNano(),
123170
Source: kvpb.AdmissionHeader_FROM_SQL,
124171
NoMemoryReservedAtSource: true,
125172
},
126173
}
174+
127175
ba.Add(req)
128176
beforeSend := timeutil.Now()
129177

130178
sendCtx, sendSp := tracing.ChildSpan(ctx, "*SSTBatcher.addSSTable/Send")
131-
br, pErr := b.db.NonTransactionalSender().Send(sendCtx, ba)
179+
br, pErr := a.db.NonTransactionalSender().Send(sendCtx, ba)
132180
sendSp.Finish()
133181

134182
sendTime := timeutil.Since(beforeSend)
@@ -147,23 +195,12 @@ func (b *SSTBatcher) addSSTable(
147195

148196
if pErr == nil {
149197
resp := br.Responses[0].GetInner().(*kvpb.AddSSTableResponse)
150-
b.mu.Lock()
151-
if b.writeAtBatchTS {
152-
b.mu.maxWriteTS.Forward(br.Timestamp)
153-
}
154-
b.mu.Unlock()
155-
// If this was sent async then, by the time the reply gets back, it
156-
// might not be the last range anymore. We can just discard the last
157-
// range reply in this case though because async sends are only used
158-
// for SSTs sent due to range boundaries, i.e. when we are done with
159-
// with that range anyway.
160-
if updatesLastRange {
161-
b.lastRange.span = resp.RangeSpan
162-
if resp.RangeSpan.Valid() {
163-
b.lastRange.remaining = sz(resp.AvailableBytes)
164-
b.lastRange.nextExistingKey = resp.FollowingLikelyNonEmptySpanStart
165-
}
166-
}
198+
results = append(results, addSSTResult{
199+
timestamp: br.Timestamp,
200+
availableBytes: resp.AvailableBytes,
201+
followingLikelyNonEmptySpanStart: resp.FollowingLikelyNonEmptySpanStart,
202+
rangeSpan: resp.RangeSpan,
203+
})
167204
files++
168205
log.VEventf(ctx, 3, "adding %s AddSSTable [%s,%s) took %v", sz(len(item.sstBytes)), item.start, item.end, sendTime)
169206
return nil
@@ -185,7 +222,7 @@ func (b *SSTBatcher) addSSTable(
185222
}
186223
split := mr.Desc.EndKey.AsRawKey()
187224
log.Infof(ctx, "SSTable cannot be added spanning range bounds %v, retrying...", split)
188-
left, right, err := createSplitSSTable(ctx, item.start, split, iter, b.settings)
225+
left, right, err := createSplitSSTable(ctx, item.start, split, iter, a.settings)
189226
if err != nil {
190227
return err
191228
}
@@ -199,7 +236,7 @@ func (b *SSTBatcher) addSSTable(
199236
}
200237
return err
201238
}(); err != nil {
202-
return errors.Wrapf(err, "addsstable [%s,%s)", item.start, item.end)
239+
return nil, errors.Wrapf(err, "addsstable [%s,%s)", item.start, item.end)
203240
}
204241
// explicitly deallocate SST. This will not deallocate the
205242
// top level SST which is kept around to iterate over.
@@ -208,7 +245,7 @@ func (b *SSTBatcher) addSSTable(
208245
ingestionPerformanceStats.SplitRetries += int64(files - 1)
209246

210247
log.VEventf(ctx, 3, "AddSSTable [%v, %v) added %d files and took %v", start, end, files, timeutil.Since(sendStart))
211-
return nil
248+
return results, nil
212249
}
213250

214251
// createSplitSSTable is a helper for splitting up SSTs. The iterator

pkg/kv/bulk/sst_batcher.go

Lines changed: 31 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -94,15 +94,12 @@ func MakeAndRegisterConcurrencyLimiter(sv *settings.Values) limit.ConcurrentRequ
9494
type SSTBatcher struct {
9595
name string
9696
db *kv.DB
97+
adder *sstAdder
9798
rc *rangecache.RangeCache
9899
settings *cluster.Settings
99100
mem *mon.ConcurrentBoundAccount
100101
limiter limit.ConcurrentRequestLimiter
101102

102-
// priority is the admission priority used for AddSSTable
103-
// requests.
104-
priority admissionpb.WorkPriority
105-
106103
// disallowShadowingBelow is described on kvpb.AddSSTableRequest.
107104
disallowShadowingBelow hlc.Timestamp
108105

@@ -222,13 +219,13 @@ func MakeSSTBatcher(
222219
b := &SSTBatcher{
223220
name: name,
224221
db: db,
222+
adder: newSSTAdder(db, settings, writeAtBatchTs, disallowShadowingBelow, admissionpb.BulkNormalPri),
225223
settings: settings,
226224
disallowShadowingBelow: disallowShadowingBelow,
227225
writeAtBatchTS: writeAtBatchTs,
228226
disableScatters: !scatterSplitRanges,
229227
mem: mem,
230228
limiter: sendLimiter,
231-
priority: admissionpb.BulkNormalPri,
232229
}
233230
b.mu.lastFlush = timeutil.Now()
234231
b.mu.tracingSpan = tracing.SpanFromContext(ctx)
@@ -248,8 +245,13 @@ func MakeStreamSSTBatcher(
248245
onFlush func(summary kvpb.BulkOpSummary),
249246
) (*SSTBatcher, error) {
250247
b := &SSTBatcher{
251-
db: db,
252-
rc: rc,
248+
db: db,
249+
rc: rc,
250+
// We use NormalPri since anything lower than normal priority is assumed to
251+
// be able to handle reduced throughput. We are OK with his for now since
252+
// the consuming cluster of a replication stream does not have a latency
253+
// sensitive workload running against it.
254+
adder: newSSTAdder(db, settings, false /*writeAtBatchTS*/, hlc.Timestamp{}, admissionpb.BulkNormalPri),
253255
settings: settings,
254256
ingestAll: true,
255257
mem: mem,
@@ -263,11 +265,6 @@ func MakeStreamSSTBatcher(
263265
// does not however make sense to scatter that range as the RHS maybe
264266
// non-empty.
265267
disableScatters: true,
266-
// We use NormalPri since anything lower than normal priority is assumed to
267-
// be able to handle reduced throughput. We are OK with his for now since
268-
// the consuming cluster of a replication stream does not have a latency
269-
// sensitive workload running against it.
270-
priority: admissionpb.NormalPri,
271268
}
272269
b.mu.lastFlush = timeutil.Now()
273270
b.mu.tracingSpan = tracing.SpanFromContext(ctx)
@@ -289,12 +286,12 @@ func MakeTestingSSTBatcher(
289286
) (*SSTBatcher, error) {
290287
b := &SSTBatcher{
291288
db: db,
289+
adder: newSSTAdder(db, settings, false, hlc.Timestamp{}, admissionpb.BulkNormalPri),
292290
settings: settings,
293291
skipDuplicates: skipDuplicates,
294292
ingestAll: ingestAll,
295293
mem: mem,
296294
limiter: sendLimiter,
297-
priority: admissionpb.BulkNormalPri,
298295
}
299296
b.Reset(ctx)
300297
return b, nil
@@ -724,15 +721,34 @@ func (b *SSTBatcher) doFlush(ctx context.Context, reason int) error {
724721
fn := func(ctx context.Context) error {
725722
defer res.Release()
726723
defer b.mem.Shrink(ctx, reserved)
727-
if err := b.addSSTable(ctx, batchTS, start, end, data, stats, !flushAsync, currentBatchStatsCopy); err != nil {
724+
results, err := b.adder.AddSSTable(ctx, batchTS, start, end, data, stats, currentBatchStatsCopy)
725+
if err != nil {
728726
return err
729727
}
730728

731729
// Now that we have completed ingesting the SSTables we take a lock and
732-
// update the statistics on the SSTBatcher.
730+
// process the flush results.
733731
b.mu.Lock()
734732
defer b.mu.Unlock()
735733

734+
for _, addResult := range results {
735+
if b.writeAtBatchTS {
736+
b.mu.maxWriteTS.Forward(addResult.timestamp)
737+
}
738+
if !flushAsync {
739+
// If this was sent async then, by the time the reply gets back, it
740+
// might not be the last range anymore. We can just discard the last
741+
// range reply in this case though because async sends are only used
742+
// for SSTs sent due to range boundaries, i.e. when we are done with
743+
// with that range anyway.
744+
b.lastRange.span = addResult.rangeSpan
745+
if addResult.rangeSpan.Valid() {
746+
b.lastRange.remaining = sz(addResult.availableBytes)
747+
b.lastRange.nextExistingKey = addResult.followingLikelyNonEmptySpanStart
748+
}
749+
}
750+
}
751+
736752
// Update the statistics associated with the current batch. We do this on
737753
// our captured copy of the currentBatchSummary instead of the
738754
// b.batchRowCounter since the caller may have reset the batcher by the time

0 commit comments

Comments
 (0)