Skip to content

Commit 33e1034

Browse files
craig[bot] and jeffswenson committed
Merge #148395
148395: kv/bulk: enable async flushing r=jeffswenson a=jeffswenson This change re-enables async flushing in the SST batcher. Async flushing is a throughput win for large (1 TiB+) out-of-order writers. Such writes occur during index builds, out-of-order imports, and PCR. Previously, async flushing was a special case in the batcher. This change reworks async flushing so that all flushes are async. This ensures that context handling and error handling are consistent. This should reduce the probability of an async-only bug in the SST writer that is hard to exercise. Fixes: #146828 Co-authored-by: Jeff Swenson <[email protected]>
2 parents 40c4c0d + 107f155 commit 33e1034

File tree

7 files changed

+523
-82
lines changed

7 files changed

+523
-82
lines changed

pkg/backup/restore_data_processor.go

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -514,6 +514,7 @@ func (rd *restoreDataProcessor) processRestoreSpanEntry(
514514
// tests to fail.
515515
rd.FlowCtx.Cfg.BackupMonitor.MakeConcurrentBoundAccount(),
516516
rd.FlowCtx.Cfg.BulkSenderLimiter,
517+
nil,
517518
)
518519
if err != nil {
519520
return summary, err
@@ -717,7 +718,6 @@ func reserveRestoreWorkerMemory(
717718
// implement a mock SSTBatcher used purely for job progress tracking.
718719
type SSTBatcherExecutor interface {
719720
AddMVCCKey(ctx context.Context, key storage.MVCCKey, value []byte) error
720-
Reset(ctx context.Context)
721721
Flush(ctx context.Context) error
722722
Close(ctx context.Context)
723723
GetSummary() kvpb.BulkOpSummary
@@ -735,9 +735,6 @@ func (b *sstBatcherNoop) AddMVCCKey(ctx context.Context, key storage.MVCCKey, va
735735
return b.totalRows.Count(key.Key)
736736
}
737737

738-
// Reset resets the counter
739-
func (b *sstBatcherNoop) Reset(ctx context.Context) {}
740-
741738
// Flush noops.
742739
func (b *sstBatcherNoop) Flush(ctx context.Context) error {
743740
return nil

pkg/crosscluster/logical/offline_initial_scan_processor.go

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -141,7 +141,9 @@ func (o *offlineInitialScanProcessor) setup(ctx context.Context) error {
141141
true, /* writeAtBatchTs */
142142
true, /* splitAndScatterRanges */
143143
o.FlowCtx.Cfg.BackupMonitor.MakeConcurrentBoundAccount(),
144-
o.FlowCtx.Cfg.BulkSenderLimiter)
144+
o.FlowCtx.Cfg.BulkSenderLimiter,
145+
nil,
146+
)
145147
if err != nil {
146148
return err
147149
}
@@ -377,7 +379,9 @@ func (o *offlineInitialScanProcessor) checkpoint(
377379
if err := o.flushBatch(ctx); err != nil {
378380
return errors.Wrap(err, "flushing batcher on checkpoint")
379381
}
380-
o.batcher.Reset(ctx)
382+
if err := o.batcher.Reset(ctx); err != nil {
383+
return errors.Wrap(err, "resetting batcher on checkpoint")
384+
}
381385

382386
select {
383387
case o.checkpointCh <- offlineCheckpoint{
@@ -409,7 +413,9 @@ func (o *offlineInitialScanProcessor) flushBatch(ctx context.Context) error {
409413
if err := o.batcher.Flush(ctx); err != nil {
410414
return err
411415
}
412-
o.batcher.Reset(ctx)
416+
if err := o.batcher.Reset(ctx); err != nil {
417+
return errors.Wrap(err, "resetting batcher after flush")
418+
}
413419
o.lastKeyAdded = roachpb.Key{}
414420
return nil
415421
}

pkg/crosscluster/physical/stream_ingestion_processor.go

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1273,11 +1273,15 @@ type flushableBuffer struct {
12731273

12741274
// flushBuffer flushes the given streamIngestionBuffer via the SST
12751275
// batchers and returns the underlying streamIngestionBuffer to the pool.
1276-
func (sip *streamIngestionProcessor) flushBuffer(b flushableBuffer) (*jobspb.ResolvedSpans, error) {
1276+
func (sip *streamIngestionProcessor) flushBuffer(
1277+
b flushableBuffer,
1278+
) (_ *jobspb.ResolvedSpans, err error) {
12771279
ctx, sp := tracing.ChildSpan(sip.Ctx(), "stream-ingestion-flush")
12781280
defer sp.Finish()
1279-
// Ensure the batcher is always reset, even on early error returns.
1280-
defer sip.batcher.Reset(ctx)
1281+
defer func() {
1282+
// Ensure the batcher is always reset, even on early error returns.
1283+
err = errors.CombineErrors(err, sip.batcher.Reset(ctx))
1284+
}()
12811285

12821286
// First process the point KVs.
12831287
//

pkg/kv/bulk/BUILD.bazel

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ go_library(
2525
"//pkg/storage",
2626
"//pkg/storage/enginepb",
2727
"//pkg/util/admission/admissionpb",
28+
"//pkg/util/buildutil",
2829
"//pkg/util/ctxgroup",
2930
"//pkg/util/hlc",
3031
"//pkg/util/humanizeutil",
@@ -57,15 +58,23 @@ go_test(
5758
"//pkg/kv/kvclient/kvcoord",
5859
"//pkg/kv/kvclient/rangecache",
5960
"//pkg/kv/kvpb",
61+
"//pkg/kv/kvserver",
6062
"//pkg/kv/kvserver/kvserverbase",
6163
"//pkg/roachpb",
6264
"//pkg/security/securityassets",
6365
"//pkg/security/securitytest",
6466
"//pkg/server",
6567
"//pkg/settings/cluster",
68+
"//pkg/sql/catalog",
69+
"//pkg/sql/catalog/descpb",
70+
"//pkg/sql/catalog/desctestutils",
71+
"//pkg/sql/catalog/tabledesc",
72+
"//pkg/sql/rowenc",
73+
"//pkg/sql/sem/tree",
6674
"//pkg/storage",
6775
"//pkg/storage/enginepb",
6876
"//pkg/testutils/serverutils",
77+
"//pkg/testutils/sqlutils",
6978
"//pkg/testutils/storageutils",
7079
"//pkg/testutils/testcluster",
7180
"//pkg/util/encoding",
@@ -77,6 +86,8 @@ go_test(
7786
"//pkg/util/randutil",
7887
"//pkg/util/timeutil",
7988
"//pkg/util/tracing",
89+
"@com_github_cockroachdb_apd_v3//:apd",
90+
"@com_github_cockroachdb_errors//:errors",
8091
"@com_github_stretchr_testify//require",
8192
],
8293
)

pkg/kv/bulk/buffering_adder.go

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -145,6 +145,7 @@ func MakeBulkAdder(
145145
lastFlush: timeutil.Now(),
146146
curBufSummary: kvpb.BulkOpSummary{},
147147
}
148+
b.sink.init(ctx)
148149

149150
// Register a callback with the underlying sink to accumulate the summary for
150151
// the current buffered KVs. The curBufSummary is reset when the buffering
@@ -282,7 +283,9 @@ func (b *BufferingAdder) doFlush(ctx context.Context, forSize bool) error {
282283
b.curBufSummary.Reset()
283284
return nil
284285
}
285-
b.sink.Reset(ctx)
286+
if err := b.sink.Reset(ctx); err != nil {
287+
return errors.Wrapf(err, "failed to reset %s adder before flush", b.name)
288+
}
286289
b.sink.batch.stats.BufferFlushes++
287290

288291
var before *bulkpb.IngestionPerformanceStats

0 commit comments

Comments
 (0)