
Commit fed0025

kv/bulk: extract fields into batch
This extracts per-batch state tracking into a `batch` struct. The intent is to make it clear what state needs to be reset before accepting writes for the next batch.

Release note: none

Part of: #146828
1 parent 975861f commit fed0025
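The diff below groups the per-batch fields behind a single `batch` value. As a rough Go sketch of the idea only, not the commit's actual definition: the field set and import paths are assumptions based on the `ts` and `stats` references visible in this diff, and the real struct lives elsewhere in pkg/kv/bulk.

package bulk

import (
	"github.com/cockroachdb/cockroach/pkg/kv/bulk/bulkpb"
	"github.com/cockroachdb/cockroach/pkg/util/hlc"
)

// batch groups state that is only valid for the batch currently being
// built, so starting the next batch is a single, obvious reset rather
// than a checklist of individual fields.
type batch struct {
	ts    hlc.Timestamp                    // timestamp the batch is written at
	stats bulkpb.IngestionPerformanceStats // per-batch counters (FillWait, SortWait, ...)
}

// reset clears all per-batch state before accepting writes for the next
// batch. Illustrative helper; the commit's actual reset lives on the sink.
func (b *batch) reset() {
	*b = batch{}
}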

File tree

3 files changed: +191, -155 lines

pkg/kv/bulk/buffering_adder.go

Lines changed: 24 additions & 16 deletions
@@ -124,10 +124,18 @@ func MakeBulkAdder(
 			settings:               settings,
 			skipDuplicates:         opts.SkipDuplicates,
 			disallowShadowingBelow: opts.DisallowShadowingBelow,
-			batchTS:                opts.BatchTimestamp,
-			writeAtBatchTS:         opts.WriteAtBatchTimestamp,
-			mem:                    bulkMon.MakeConcurrentBoundAccount(),
-			limiter:                sendLimiter,
+			batch: batch{
+				// TODO(jeffswenson): As far as I can tell, setting the batch timestamp here does nothing. The API
+				// is actively misleading.
+				//
+				// It used to do something before #75275 was merged, but now it is always set to hlc.Timestamp{}
+				// by reset before the first flush. I think we should clean this up, but cleaning it up properly
+				// requires cleanups throughout the schema changer code base to replace "WriteAsOf" with "ReadAsOf".
+				ts: opts.BatchTimestamp,
+			},
+			writeAtBatchTS: opts.WriteAtBatchTimestamp,
+			mem:            bulkMon.MakeConcurrentBoundAccount(),
+			limiter:        sendLimiter,
 		},
 		timestamp:      timestamp,
 		maxBufferLimit: opts.MaxBufferSize,
@@ -201,7 +209,7 @@ func (b *BufferingAdder) Add(ctx context.Context, key roachpb.Key, value []byte)
 		return b.curBuf.append(key, value)
 	}
 
-	b.sink.currentStats.FlushesDueToSize++
+	b.sink.batch.stats.FlushesDueToSize++
 	log.VEventf(ctx, 3, "%s adder triggering flush of %s of KVs in %s buffer",
 		b.name, b.curBuf.KVSize(), b.bufferedMemSize())
 
@@ -264,7 +272,7 @@ func (b *BufferingAdder) Flush(ctx context.Context) error {
 }
 
 func (b *BufferingAdder) doFlush(ctx context.Context, forSize bool) error {
-	b.sink.currentStats.FillWait += timeutil.Since(b.lastFlush)
+	b.sink.batch.stats.FillWait += timeutil.Since(b.lastFlush)
 
 	if b.bufferedKeys() == 0 {
 		if b.onFlush != nil {
@@ -275,7 +283,7 @@ func (b *BufferingAdder) doFlush(ctx context.Context, forSize bool) error {
 		return nil
 	}
 	b.sink.Reset(ctx)
-	b.sink.currentStats.BufferFlushes++
+	b.sink.batch.stats.BufferFlushes++
 
 	var before *bulkpb.IngestionPerformanceStats
 	var beforeSize int64
@@ -284,7 +292,7 @@ func (b *BufferingAdder) doFlush(ctx context.Context, forSize bool) error {
 		b.sink.mu.Lock()
 		before = b.sink.mu.totalStats.Identity().(*bulkpb.IngestionPerformanceStats)
 		before.Combine(&b.sink.mu.totalStats)
-		before.Combine(&b.sink.currentStats)
+		before.Combine(&b.sink.batch.stats)
 		beforeSize = b.sink.mu.totalBulkOpSummary.DataSize
 		b.sink.mu.Unlock()
 	}
@@ -297,7 +305,7 @@ func (b *BufferingAdder) doFlush(ctx context.Context, forSize bool) error {
 	mvccKey := storage.MVCCKey{Timestamp: b.timestamp}
 
 	beforeFlush := timeutil.Now()
-	b.sink.currentStats.SortWait += beforeFlush.Sub(beforeSort)
+	b.sink.batch.stats.SortWait += beforeFlush.Sub(beforeSort)
 
 	// If this is the first flush and is due to size, if it was unsorted then
 	// create initial splits if requested before flushing.
@@ -340,14 +348,14 @@ func (b *BufferingAdder) doFlush(ctx context.Context, forSize bool) error {
 		}
 	}
 
-	b.sink.currentStats.FlushWait += timeutil.Since(beforeFlush)
+	b.sink.batch.stats.FlushWait += timeutil.Since(beforeFlush)
 
 	if log.V(3) && before != nil {
 		b.sink.mu.Lock()
 		written := b.sink.mu.totalBulkOpSummary.DataSize - beforeSize
 		afterStats := b.sink.mu.totalStats.Identity().(*bulkpb.IngestionPerformanceStats)
 		afterStats.Combine(&b.sink.mu.totalStats)
-		afterStats.Combine(&b.sink.currentStats)
+		afterStats.Combine(&b.sink.batch.stats)
 		b.sink.mu.Unlock()
 
 		files := afterStats.Batches - before.Batches
@@ -457,24 +465,24 @@ func (b *BufferingAdder) createInitialSplits(ctx context.Context) error {
 	splitsWait := beforeScatters.Sub(beforeSplits)
 	log.Infof(ctx, "%s adder created %d initial splits in %v from %d keys in %s buffer",
 		b.name, len(toScatter), timing(splitsWait), b.curBuf.Len(), b.curBuf.MemSize())
-	b.sink.currentStats.Splits += int64(len(toScatter))
-	b.sink.currentStats.SplitWait += splitsWait
+	b.sink.batch.stats.Splits += int64(len(toScatter))
+	b.sink.batch.stats.SplitWait += splitsWait
 
 	for _, splitKey := range toScatter {
 		resp, err := b.sink.db.AdminScatter(ctx, splitKey, 0 /* maxSize */)
 		if err != nil {
 			log.Warningf(ctx, "failed to scatter: %v", err)
 			continue
 		}
-		b.sink.currentStats.Scatters++
-		b.sink.currentStats.ScatterMoved += resp.ReplicasScatteredBytes
+		b.sink.batch.stats.Scatters++
+		b.sink.batch.stats.ScatterMoved += resp.ReplicasScatteredBytes
 		if resp.ReplicasScatteredBytes > 0 {
 			log.VEventf(ctx, 1, "pre-split scattered %s in non-empty range %s",
 				sz(resp.ReplicasScatteredBytes), resp.RangeInfos[0].Desc.KeySpan().AsRawSpanWithNoLocals())
 		}
 	}
 	scattersWait := timeutil.Since(beforeScatters)
-	b.sink.currentStats.ScatterWait += scattersWait
+	b.sink.batch.stats.ScatterWait += scattersWait
 	log.Infof(ctx, "%s adder scattered %d initial split spans in %v",
 		b.name, len(toScatter), timing(scattersWait))
 
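Apart from the constructor change above, the hunks in this file are mechanical: every `b.sink.currentStats.X` call site becomes `b.sink.batch.stats.X`. The payoff, sketched below with made-up names (`sink` and `startNewBatch` are illustrative, not the actual SSTBatcher API), is that preparing for the next batch touches exactly one field.

// Illustrative only, building on the batch sketch above: with per-batch
// state behind one field, the sink resets it in a single assignment and
// nothing batch-scoped can be forgotten.
type sink struct {
	batch batch // per-batch state; discarded when a new batch starts
	// long-lived state (memory account, limiter, running totals) stays outside.
}

func (s *sink) startNewBatch(ts hlc.Timestamp) {
	s.batch = batch{ts: ts} // one assignment resets all per-batch state
}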