Skip to content

Commit 5ed9912

Browse files
committed
kv/bulk: limit buffering when waiting for row boundaries in SSTBatcher
Previously, SSTBatcher would wait indefinitely for row boundaries before flushing when over the target file size, which could lead to unbounded memory usage for very large rows. This change adds a safeguard that only waits for row boundaries when the SST size is less than 2x the flush limit, forcing a flush of partial rows when substantially over the limit. This prevents excessive memory usage while still maintaining the benefits of row-aligned file boundaries for most cases. Release note: none. Epic: none.
1 parent 43c294e commit 5ed9912

File tree

1 file changed

+14
-7
lines changed

1 file changed

+14
-7
lines changed

pkg/kv/bulk/sst_batcher.go

Lines changed: 14 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -565,7 +565,7 @@ func (b *SSTBatcher) flushIfNeeded(ctx context.Context, nextKey roachpb.Key) err
565565
return nil
566566
}
567567

568-
if b.batch.sstWriter.DataSize >= ingestFileSize(b.settings) {
568+
if flushLimit := ingestFileSize(b.settings); b.batch.sstWriter.DataSize >= flushLimit {
569569
// We're at/over size target, so we want to flush, but first check if we are
570570
// at a new row boundary. Having row-aligned boundaries is not actually
571571
// required by anything, but has the nice property of meaning a split will
@@ -576,13 +576,20 @@ func (b *SSTBatcher) flushIfNeeded(ctx context.Context, nextKey roachpb.Key) err
576576
// starts, so when we split at that row, that overhang into the RHS that we
577577
// just wrote will be rewritten by the subsequent scatter. By waiting for a
578578
// row boundary, we ensure any split is actually between files.
579-
prevRow, prevErr := keys.EnsureSafeSplitKey(b.batch.endKey)
580-
nextRow, nextErr := keys.EnsureSafeSplitKey(nextKey)
581-
if prevErr == nil && nextErr == nil && bytes.Equal(prevRow, nextRow) {
582-
// An error decoding either key implies it is not a valid row key and thus
583-
// not the same row for our purposes; we don't care what the error is.
584-
return nil // keep going to row boundary.
579+
//
580+
// That said, only do this if we are only moderately over the flush target;
581+
// if we are substantially over the limit, just flush the partial row as we
582+
// cannot buffer indefinitely.
583+
if b.batch.sstWriter.DataSize < 2*flushLimit {
584+
prevRow, prevErr := keys.EnsureSafeSplitKey(b.batch.endKey)
585+
nextRow, nextErr := keys.EnsureSafeSplitKey(nextKey)
586+
if prevErr == nil && nextErr == nil && bytes.Equal(prevRow, nextRow) {
587+
// An error decoding either key implies it is not a valid row key and thus
588+
// not the same row for our purposes; we don't care what the error is.
589+
return nil // keep going to row boundary.
590+
}
585591
}
592+
586593
if b.mustSyncBeforeFlush {
587594
err := b.syncFlush()
588595
if err != nil {

0 commit comments

Comments
 (0)