Skip to content

Commit 2891020

Browse files
macvincent
authored and facebook-github-bot committed
Integrate Max Stream Size Chunking in Velox Writer (facebookincubator#249)
Summary: This is the last feature of the new chunking policy described in this [doc](https://fburl.com/gdoc/gkdwwju1). Here, we break down large streams into multiple chunks of size up to `maxStreamChunkRawSize`. This protects the reader from attempting to materialize huge chunks. We included StreamData support for this in the previous diff. In this diff, we integrate with the VeloxWriter. With this change, while memory pressure is detected, we: 1. Chunk large streams above `maxStreamChunkRawSize`, retaining stream data below the limit. 2. If there is still memory pressure after the first step, chunk streams with size above `minStreamChunkRawSize`. During stripe flush, we chunk all remaining data, breaking down streams above `maxStreamChunkRawSize` into smaller chunks. --- The general chunking policy has two phases: ## **Phase 1 - Memory Pressure Management (shouldChunk)** The policy monitors total in-memory data size: - When memory usage exceeds the maximum threshold, it initiates chunking to reduce memory footprint while continuing data ingestion. - When previous chunking attempts succeeded and memory remains above the minimum threshold, it continues chunking to further reduce memory usage. - When chunking fails to reduce memory usage effectively and memory stays above the minimum threshold, it forces a full stripe flush to guarantee memory relief. ## **Phase 2 - Storage Size Optimization (shouldFlush)** Implements compression-aware stripe size prediction: - Calculates the anticipated final compressed stripe size by applying the estimated compression ratio to unencoded data. - Triggers stripe flush when the predicted compressed size reaches the target stripe size threshold. Differential Revision: D82175496
1 parent 54f4862 commit 2891020

File tree

3 files changed

+102
-65
lines changed

3 files changed

+102
-65
lines changed

dwio/nimble/velox/VeloxWriter.cpp

Lines changed: 68 additions & 60 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333
#include "dwio/nimble/velox/SchemaSerialization.h"
3434
#include "dwio/nimble/velox/SchemaTypes.h"
3535
#include "dwio/nimble/velox/StatsGenerated.h"
36+
#include "dwio/nimble/velox/StreamChunker.h"
3637
#include "velox/common/time/CpuWallTimer.h"
3738
#include "velox/dwio/common/ExecutorBarrier.h"
3839
#include "velox/type/Type.h"
@@ -813,45 +814,34 @@ bool VeloxWriter::writeChunks(
813814
streams_.resize(context_->schemaBuilder.nodeCount());
814815

815816
auto processStream = [&](StreamData& streamData) {
816-
// TODO: Breakdown large streams above a threshold into smaller chunks.
817-
const auto minStreamSize =
818-
lastChunk ? 0 : context_->options.minStreamChunkRawSize;
819817
const auto* context =
820818
streamData.descriptor().context<WriterStreamContext>();
821819
bool isNullStream = context && context->isNullStream;
822-
bool shouldChunkStream = false;
823-
if (isNullStream) {
824-
// We apply the same null logic, where if all values
825-
// are non-nulls, we omit the entire stream.
826-
shouldChunkStream = streamData.hasNulls() &&
827-
streamData.nonNulls().size() > minStreamSize;
828-
} else {
829-
shouldChunkStream = streamData.data().size() > minStreamSize;
830-
}
831-
832-
// If we have previous written chunks for this stream, during final
833-
// chunk, always write any remaining data.
834-
const auto offset = streamData.descriptor().offset();
820+
const auto& offset = streamData.descriptor().offset();
821+
auto& streamSize = context_->columnStats[offset].physicalSize;
822+
logicalSizeBeforeEncoding += streamData.memoryUsed();
835823
NIMBLE_DASSERT(offset < streams_.size(), "Stream offset out of range.");
836824
auto& stream = streams_[offset];
837-
if (lastChunk && !shouldChunkStream && !stream.content.empty()) {
838-
shouldChunkStream =
839-
!streamData.empty() || !streamData.nonNulls().empty();
840-
}
841-
842-
if (shouldChunkStream) {
825+
auto chunker = getStreamChunker(
826+
streamData,
827+
context_->options.maxStreamChunkRawSize,
828+
lastChunk ? 0 : context_->options.minStreamChunkRawSize,
829+
stream.content.empty(),
830+
isNullStream);
831+
while (chunker->hasNext()) {
832+
auto streamDataView = chunker->next();
843833
std::string_view encoded;
844834
if (isNullStream) {
845-
// For null streams we promote the null values to be written as
846-
// boolean data.
835+
// Null stream values are converted to boolean data for encoding.
847836
encoded = encodeStream(
848-
*context_, *encodingBuffer_, NullsAsDataStreamData{streamData});
837+
*context_,
838+
*encodingBuffer_,
839+
NullsAsDataStreamData{streamDataView});
849840
} else {
850-
encoded = encodeStream(*context_, *encodingBuffer_, streamData);
841+
encoded = encodeStream(*context_, *encodingBuffer_, streamDataView);
851842
}
852843

853844
if (!encoded.empty()) {
854-
auto& streamSize = context_->columnStats[offset].physicalSize;
855845
ChunkedStreamWriter chunkWriter{*encodingBuffer_};
856846
for (auto& buffer : chunkWriter.encode(encoded)) {
857847
streamSize += buffer.size();
@@ -860,9 +850,10 @@ bool VeloxWriter::writeChunks(
860850
}
861851
}
862852
wroteChunk = true;
863-
logicalSizeBeforeEncoding += streamData.memoryUsed();
864-
streamData.reset();
865853
}
854+
// Remove processed stream data to reclaim memory.
855+
chunker->compact();
856+
logicalSizeBeforeEncoding -= streamData.memoryUsed();
866857
};
867858

868859
const auto& streams = context_->streams();
@@ -985,39 +976,56 @@ bool VeloxWriter::tryWriteStripe(bool force) {
985976
// TODO: we can improve merge the last chunk write with stripe
986977
if (context_->options.enableChunking &&
987978
shouldChunk() == ChunkDecision::Chunk) {
988-
const auto& streams = context_->streams();
989-
const size_t streamCount = streams.size();
990-
// Sort streams for chunking based on raw memory usage.
991-
// TODO(T240072104): Improve performance by bucketing the streams by size
992-
// (most significant bit) instead of sorting.
993-
std::vector<uint32_t> streamIndices(streamCount);
994-
std::iota(streamIndices.begin(), streamIndices.end(), 0);
995-
std::sort(
996-
streamIndices.begin(),
997-
streamIndices.end(),
998-
[&](const uint32_t& a, const uint32_t& b) {
999-
return streams[a]->memoryUsed() > streams[b]->memoryUsed();
1000-
});
979+
bool continueChunking = true;
980+
auto batchChunkStreams = [&](const std::vector<uint32_t>& indices) {
981+
size_t currentIndex = 0;
982+
size_t indicesCount = indices.size();
983+
NIMBLE_DASSERT(
984+
context_->options.chunkedStreamBatchSize > 0,
985+
"streamEncodingBatchSize must be greater than 0");
986+
while (currentIndex < indicesCount && continueChunking) {
987+
size_t endStreamIndex = std::min(
988+
indicesCount,
989+
currentIndex + context_->options.chunkedStreamBatchSize);
990+
std::span<const uint32_t> batchIndices(
991+
indices.begin() + currentIndex, endStreamIndex - currentIndex);
992+
// Stop attempting chunking once streams are too small to chunk.
993+
if (!writeChunks(
994+
/*lastChunk=*/false, batchIndices)) {
995+
continueChunking = false;
996+
break;
997+
}
998+
currentIndex = endStreamIndex;
999+
continueChunking = (shouldChunk() == ChunkDecision::Chunk);
1000+
}
1001+
};
10011002

1002-
// Chunk streams in batches.
1003-
size_t currentIndex = 0;
1004-
ChunkDecision decision = ChunkDecision::Chunk;
1005-
NIMBLE_DASSERT(
1006-
context_->options.chunkedStreamBatchSize > 0,
1007-
"streamEncodingBatchSize must be greater than 0");
1008-
while (currentIndex < streams.size() &&
1009-
decision == ChunkDecision::Chunk) {
1010-
size_t endStreamIndex = std::min(
1011-
streamCount,
1012-
currentIndex + context_->options.chunkedStreamBatchSize);
1013-
std::span<const uint32_t> batchIndices(
1014-
streamIndices.data() + currentIndex, endStreamIndex - currentIndex);
1015-
// Stop attempting chunking once streams are too small to chunk.
1016-
if (!writeChunks(false, batchIndices)) {
1017-
break;
1003+
// Relieve memory pressure by chunking streams above max size.
1004+
const auto& streams = context_->streams();
1005+
std::vector<uint32_t> streamsAboveMaxChunkSize;
1006+
streamsAboveMaxChunkSize.reserve(streams.size());
1007+
for (auto streamIndex = 0; streamIndex < streams.size(); ++streamIndex) {
1008+
if (streams[streamIndex]->memoryUsed() >=
1009+
context_->options.maxStreamChunkRawSize) {
1010+
streamsAboveMaxChunkSize.push_back(streamIndex);
10181011
}
1019-
currentIndex = endStreamIndex;
1020-
decision = shouldChunk();
1012+
}
1013+
batchChunkStreams(streamsAboveMaxChunkSize);
1014+
1015+
if (continueChunking) {
1016+
// Relieve memory pressure by chunking small streams.
1017+
// Sort streams for chunking based on raw memory usage.
1018+
// TODO(T240072104): Improve performance by bucketing the streams
1019+
// by size (by most significant bit) instead of sorting them.
1020+
std::vector<uint32_t> streamIndices(streams.size());
1021+
std::iota(streamIndices.begin(), streamIndices.end(), 0);
1022+
std::sort(
1023+
streamIndices.begin(),
1024+
streamIndices.end(),
1025+
[&](const uint32_t& a, const uint32_t& b) {
1026+
return streams[a]->memoryUsed() > streams[b]->memoryUsed();
1027+
});
1028+
batchChunkStreams(streamIndices);
10211029
}
10221030
}
10231031

dwio/nimble/velox/VeloxWriterOptions.h

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -100,6 +100,11 @@ struct VeloxWriterOptions {
100100
// Note: this is ignored when it is time to flush a stripe.
101101
size_t chunkedStreamBatchSize = 1024;
102102

103+
// When flushing data streams into chunks, streams with raw data size larger
104+
// than this threshold will be broken down into multiple smaller chunks. Each
105+
// chunk will be at most this size.
106+
uint64_t maxStreamChunkRawSize = 4 << 20;
107+
103108
// The factory function that produces the root encoding selection policy.
104109
// Encoding selection policy is the way to balance the tradeoffs of
105110
// different performance factors (at both read and write times). Heuristics

dwio/nimble/velox/tests/VeloxWriterTests.cpp

Lines changed: 29 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1962,6 +1962,7 @@ struct ChunkFlushPolicyTestCase {
19621962
const uint64_t writerMemoryLowThresholdBytes{75 << 10};
19631963
const double estimatedCompressionFactor{1.0};
19641964
const uint32_t minStreamChunkRawSize{100};
1965+
const uint32_t maxStreamChunkRawSize{128 << 10};
19651966
const uint32_t expectedStripeCount{0};
19661967
const uint32_t expectedMaxChunkCount{0};
19671968
const uint32_t expectedMinChunkCount{0};
@@ -1977,6 +1978,7 @@ TEST_P(ChunkFlushPolicyTest, ChunkFlushPolicyIntegration) {
19771978
{{"BIGINT", velox::BIGINT()}, {"SMALLINT", velox::SMALLINT()}});
19781979
nimble::VeloxWriterOptions writerOptions{
19791980
.minStreamChunkRawSize = GetParam().minStreamChunkRawSize,
1981+
.maxStreamChunkRawSize = GetParam().maxStreamChunkRawSize,
19801982
.chunkedStreamBatchSize = GetParam().chunkedStreamBatchSize,
19811983
.flushPolicyFactory = GetParam().enableChunking
19821984
? []() -> std::unique_ptr<nimble::FlushPolicy> {
@@ -2096,6 +2098,7 @@ INSTANTIATE_TEST_CASE_P(
20962098
.writerMemoryLowThresholdBytes = 75 << 10,
20972099
.estimatedCompressionFactor = 1.0,
20982100
.minStreamChunkRawSize = 100,
2101+
.maxStreamChunkRawSize = 128 << 10,
20992102
.expectedStripeCount = 4,
21002103
.expectedMaxChunkCount = 1,
21012104
.expectedMinChunkCount = 1,
@@ -2110,13 +2113,29 @@ INSTANTIATE_TEST_CASE_P(
21102113
.writerMemoryLowThresholdBytes = 75 << 10,
21112114
.estimatedCompressionFactor = 1.0,
21122115
.minStreamChunkRawSize = 100,
2116+
.maxStreamChunkRawSize = 128 << 10,
21132117
.expectedStripeCount = 7,
21142118
.expectedMaxChunkCount = 2,
21152119
.expectedMinChunkCount = 1,
21162120
.chunkedStreamBatchSize = 10,
21172121
},
2122+
// Reducing maxStreamChunkRawSize produces more chunks
2123+
ChunkFlushPolicyTestCase{
2124+
.batchCount = 20,
2125+
.enableChunking = true,
2126+
.targetStripeSizeBytes = 250 << 10, // 250KB
2127+
.writerMemoryHighThresholdBytes = 80 << 10,
2128+
.writerMemoryLowThresholdBytes = 75 << 10,
2129+
.estimatedCompressionFactor = 1.0,
2130+
.minStreamChunkRawSize = 100,
2131+
.maxStreamChunkRawSize = 12 << 10, // -116KB
2132+
.expectedStripeCount = 7,
2133+
.expectedMaxChunkCount = 9, // +7
2134+
.expectedMinChunkCount = 2, // +1
2135+
.chunkedStreamBatchSize = 10,
2136+
},
21182137
// High memory regression threshold
2119-
// Produces file identical to RawStripeSizeFlushPolicy
2138+
// Stripe count identical to RawStripeSizeFlushPolicy
21202139
ChunkFlushPolicyTestCase{
21212140
.batchCount = 20,
21222141
.enableChunking = true,
@@ -2125,8 +2144,9 @@ INSTANTIATE_TEST_CASE_P(
21252144
.writerMemoryLowThresholdBytes = 495 << 10, // +420KB
21262145
.estimatedCompressionFactor = 1.0,
21272146
.minStreamChunkRawSize = 100,
2147+
.maxStreamChunkRawSize = 128 << 10,
21282148
.expectedStripeCount = 4,
2129-
.expectedMaxChunkCount = 1,
2149+
.expectedMaxChunkCount = 2,
21302150
.expectedMinChunkCount = 1,
21312151
.chunkedStreamBatchSize = 10,
21322152
},
@@ -2140,13 +2160,14 @@ INSTANTIATE_TEST_CASE_P(
21402160
.writerMemoryLowThresholdBytes = 35 << 10, // -40KB
21412161
.estimatedCompressionFactor = 1.0,
21422162
.minStreamChunkRawSize = 100,
2163+
.maxStreamChunkRawSize = 128 << 10,
21432164
.expectedStripeCount = 10,
21442165
.expectedMaxChunkCount = 2,
21452166
.expectedMinChunkCount = 2,
21462167
.chunkedStreamBatchSize = 10,
21472168
},
21482169
// High target stripe size bytes (with disabled memory pressure
2149-
// optimization) produces fewer stripes. Single chunks.
2170+
// optimization) produces fewer stripes.
21502171
ChunkFlushPolicyTestCase{
21512172
.batchCount = 20,
21522173
.enableChunking = true,
@@ -2155,9 +2176,10 @@ INSTANTIATE_TEST_CASE_P(
21552176
.writerMemoryLowThresholdBytes = 1 << 20, // +1MB
21562177
.estimatedCompressionFactor = 1.0,
21572178
.minStreamChunkRawSize = 100,
2179+
.maxStreamChunkRawSize = 128 << 10,
21582180
.expectedStripeCount = 1, // -2 stripes
2159-
.expectedMaxChunkCount = 1,
2160-
.expectedMinChunkCount = 1,
2181+
.expectedMaxChunkCount = 5,
2182+
.expectedMinChunkCount = 2,
21612183
.chunkedStreamBatchSize = 10,
21622184

21632185
},
@@ -2171,6 +2193,7 @@ INSTANTIATE_TEST_CASE_P(
21712193
.writerMemoryLowThresholdBytes = 1 << 20, // +1MB
21722194
.estimatedCompressionFactor = 1.0,
21732195
.minStreamChunkRawSize = 100,
2196+
.maxStreamChunkRawSize = 128 << 10,
21742197
.expectedStripeCount = 7, // +6 stripes
21752198
.expectedMaxChunkCount = 1,
21762199
.expectedMinChunkCount = 1,
@@ -2186,6 +2209,7 @@ INSTANTIATE_TEST_CASE_P(
21862209
.writerMemoryLowThresholdBytes = 75 << 10,
21872210
.estimatedCompressionFactor = 1.0,
21882211
.minStreamChunkRawSize = 100,
2212+
.maxStreamChunkRawSize = 128 << 10,
21892213
.expectedStripeCount = 7,
21902214
.expectedMaxChunkCount = 2,
21912215
.expectedMinChunkCount = 1,

0 commit comments

Comments
 (0)