Skip to content

Commit 7a46917

Browse files
craig[bot] and dt
committed
Merge #151550
151550: kv/bulk: limit buffering when waiting for row boundaries in SSTBatcher r=dt a=dt Previously, SSTBatcher would wait indefinitely for row boundaries before flushing when over the target file size, which could lead to unbounded memory usage for very large rows. This change adds a safeguard that only waits for row boundaries when the SST size is less than 2x the flush limit, forcing a flush of partial rows when substantially over the limit. This prevents excessive memory usage while still maintaining the benefits of row-aligned file boundaries for most cases. Release note: none. Epic: none. Co-authored-by: David Taylor <[email protected]>
2 parents 4dee3bc + 5ed9912 commit 7a46917

File tree

1 file changed

+14
-7
lines changed

1 file changed

+14
-7
lines changed

pkg/kv/bulk/sst_batcher.go

Lines changed: 14 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -565,7 +565,7 @@ func (b *SSTBatcher) flushIfNeeded(ctx context.Context, nextKey roachpb.Key) err
565565
return nil
566566
}
567567

568-
if b.batch.sstWriter.DataSize >= ingestFileSize(b.settings) {
568+
if flushLimit := ingestFileSize(b.settings); b.batch.sstWriter.DataSize >= flushLimit {
569569
// We're at/over size target, so we want to flush, but first check if we are
570570
// at a new row boundary. Having row-aligned boundaries is not actually
571571
// required by anything, but has the nice property of meaning a split will
@@ -576,13 +576,20 @@ func (b *SSTBatcher) flushIfNeeded(ctx context.Context, nextKey roachpb.Key) err
576576
// starts, so when we split at that row, that overhang into the RHS that we
577577
// just wrote will be rewritten by the subsequent scatter. By waiting for a
578578
// row boundary, we ensure any split is actually between files.
579-
prevRow, prevErr := keys.EnsureSafeSplitKey(b.batch.endKey)
580-
nextRow, nextErr := keys.EnsureSafeSplitKey(nextKey)
581-
if prevErr == nil && nextErr == nil && bytes.Equal(prevRow, nextRow) {
582-
// An error decoding either key implies it is not a valid row key and thus
583-
// not the same row for our purposes; we don't care what the error is.
584-
return nil // keep going to row boundary.
579+
//
580+
// That said, only do this if we are only moderately over the flush target;
581+
// if we are substantially over the limit, just flush the partial row as we
582+
// cannot buffer indefinitely.
583+
if b.batch.sstWriter.DataSize < 2*flushLimit {
584+
prevRow, prevErr := keys.EnsureSafeSplitKey(b.batch.endKey)
585+
nextRow, nextErr := keys.EnsureSafeSplitKey(nextKey)
586+
if prevErr == nil && nextErr == nil && bytes.Equal(prevRow, nextRow) {
587+
// An error decoding either key implies it is not a valid row key and thus
588+
// not the same row for our purposes; we don't care what the error is.
589+
return nil // keep going to row boundary.
590+
}
585591
}
592+
586593
if b.mustSyncBeforeFlush {
587594
err := b.syncFlush()
588595
if err != nil {

0 commit comments

Comments
 (0)