Skip to content

Commit 9272cdc

Browse files
macvincent authored and facebook-github-bot committed
Support Per Stream Chunking to Relieve Memory Pressure (facebookincubator#243)
Summary: This is an implementation of a detail in the new chunking policy described in this [doc](https://fburl.com/gdoc/gkdwwju1). Rather than chunking all eligible streams, we chunk individual streams in the order of their raw size until memory pressure is relieved. For our unit tests, the maximum number of chunks produced is identical to the previous implementation. But there may be differences for large file sizes. This requires more experimentation and tuning to determine the right threshold value that takes advantage of this. Differential Revision: D81715655
1 parent e431a82 commit 9272cdc

File tree

4 files changed

+83
-7
lines changed

4 files changed

+83
-7
lines changed

dwio/nimble/velox/VeloxWriter.cpp

Lines changed: 51 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -791,7 +791,9 @@ void VeloxWriter::writeChunk(bool lastChunk) {
791791
<< ", chunk bytes: " << chunkSize;
792792
}
793793

794-
bool VeloxWriter::writeChunks(bool lastChunk) {
794+
bool VeloxWriter::writeChunks(
795+
bool lastChunk,
796+
std::span<const uint32_t> streamIndices) {
795797
uint64_t previousFlushWallTime = context_->stripeFlushTiming.wallNanos;
796798
std::atomic<uint64_t> chunkSize = 0;
797799
std::atomic<uint64_t> logicalSizeBeforeEncoding = 0;
@@ -858,15 +860,26 @@ bool VeloxWriter::writeChunks(bool lastChunk) {
858860
}
859861
};
860862

863+
const auto& streams = context_->streams();
864+
std::vector<uint32_t> streamIndicesVec;
865+
if (streamIndices.empty()) {
866+
// Chunk all streams if no stream indices are provided
867+
streamIndicesVec.resize(streams.size());
868+
std::iota(streamIndicesVec.begin(), streamIndicesVec.end(), 0);
869+
streamIndices = streamIndicesVec;
870+
}
871+
861872
if (context_->options.encodingExecutor) {
862873
velox::dwio::common::ExecutorBarrier barrier{
863874
context_->options.encodingExecutor};
864-
for (auto& streamData : context_->streams()) {
875+
for (auto streamIndex : streamIndices) {
876+
auto& streamData = streams[streamIndex];
865877
barrier.add([&] { processStream(*streamData); });
866878
}
867879
barrier.waitAll();
868880
} else {
869-
for (auto& streamData : context_->streams()) {
881+
for (auto streamIndex : streamIndices) {
882+
auto& streamData = streams[streamIndex];
870883
processStream(*streamData);
871884
}
872885
}
@@ -965,8 +978,41 @@ bool VeloxWriter::tryWriteStripe(bool force) {
965978

966979
try {
967980
// TODO: we can improve merge the last chunk write with stripe
968-
if (context_->options.enableChunking) {
969-
while (shouldChunk() == ChunkDecision::Chunk && writeChunks(false)) {
981+
if (context_->options.enableChunking &&
982+
shouldChunk() == ChunkDecision::Chunk) {
983+
const auto& streams = context_->streams();
984+
const size_t streamCount = streams.size();
985+
// Sort streams for chunking based on raw memory usage.
986+
// TODO(T240072104): Improve performance by bucketing the streams by size
987+
// (most significant bit) instead of sorting.
988+
std::vector<uint32_t> streamIndices(streamCount);
989+
std::iota(streamIndices.begin(), streamIndices.end(), 0);
990+
std::sort(
991+
streamIndices.begin(),
992+
streamIndices.end(),
993+
[&](const uint32_t& a, const uint32_t& b) {
994+
return streams[a]->memoryUsed() > streams[b]->memoryUsed();
995+
});
996+
997+
// Chunk streams in batches.
998+
size_t currentIndex = 0;
999+
ChunkDecision decision = ChunkDecision::Chunk;
1000+
NIMBLE_DASSERT(
1001+
context_->options.chunkedStreamBatchSize > 0,
1002+
"streamEncodingBatchSize must be greater than 0");
1003+
while (currentIndex < streams.size() &&
1004+
decision == ChunkDecision::Chunk) {
1005+
size_t endStreamIndex = std::min(
1006+
streamCount,
1007+
currentIndex + context_->options.chunkedStreamBatchSize);
1008+
std::span<const uint32_t> batchIndices(
1009+
streamIndices.data() + currentIndex, endStreamIndex - currentIndex);
1010+
// Stop attempting chunking once streams are too small to chunk.
1011+
if (!writeChunks(false, batchIndices)) {
1012+
break;
1013+
}
1014+
currentIndex = endStreamIndex;
1015+
decision = shouldChunk();
9701016
}
9711017
}
9721018

dwio/nimble/velox/VeloxWriter.h

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -88,7 +88,9 @@ class VeloxWriter {
8888
bool tryWriteStripe(bool force = false);
8989
void writeChunk(bool lastChunk = true);
9090
// Returns 'true' if chunks were written.
91-
bool writeChunks(bool lastChunk = true);
91+
bool writeChunks(
92+
bool lastChunk = true,
93+
std::span<const uint32_t> streamIndices = {});
9294
uint32_t writeStripe();
9395
};
9496

dwio/nimble/velox/VeloxWriterOptions.h

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -96,6 +96,10 @@ struct VeloxWriterOptions {
9696
// Note: this threshold is ignored when it is time to flush a stripe.
9797
uint64_t minStreamChunkRawSize = 1024;
9898

99+
// Number of streams to try chunking between memory pressure evaluations.
100+
// Note: this is ignored when it is time to flush a stripe.
101+
size_t chunkedStreamBatchSize = 1024;
102+
99103
// The factory function that produces the root encoding selection policy.
100104
// Encoding selection policy is the way to balance the tradeoffs of
101105
// different performance factors (at both read and write times). Heuristics

dwio/nimble/velox/tests/VeloxWriterTests.cpp

Lines changed: 25 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1965,6 +1965,7 @@ struct ChunkFlushPolicyTestCase {
19651965
const uint32_t expectedStripeCount{0};
19661966
const uint32_t expectedMaxChunkCount{0};
19671967
const uint32_t expectedMinChunkCount{0};
1968+
const uint32_t chunkedStreamBatchSize{2};
19681969
};
19691970

19701971
class ChunkFlushPolicyTest
@@ -1976,6 +1977,7 @@ TEST_P(ChunkFlushPolicyTest, ChunkFlushPolicyIntegration) {
19761977
{{"BIGINT", velox::BIGINT()}, {"SMALLINT", velox::SMALLINT()}});
19771978
nimble::VeloxWriterOptions writerOptions{
19781979
.minStreamChunkRawSize = GetParam().minStreamChunkRawSize,
1980+
.chunkedStreamBatchSize = GetParam().chunkedStreamBatchSize,
19791981
.flushPolicyFactory = GetParam().enableChunking
19801982
? []() -> std::unique_ptr<nimble::FlushPolicy> {
19811983
return std::make_unique<nimble::ChunkFlushPolicy>(
@@ -2097,6 +2099,7 @@ INSTANTIATE_TEST_CASE_P(
20972099
.expectedStripeCount = 4,
20982100
.expectedMaxChunkCount = 1,
20992101
.expectedMinChunkCount = 1,
2102+
.chunkedStreamBatchSize = 10,
21002103
},
21012104
// Base case with default settings (has chunking)
21022105
ChunkFlushPolicyTestCase{
@@ -2110,6 +2113,7 @@ INSTANTIATE_TEST_CASE_P(
21102113
.expectedStripeCount = 7,
21112114
.expectedMaxChunkCount = 2,
21122115
.expectedMinChunkCount = 1,
2116+
.chunkedStreamBatchSize = 10,
21132117
},
21142118
// High memory regression threshold
21152119
// Produces file identical to RawStripeSizeFlushPolicy
@@ -2124,6 +2128,7 @@ INSTANTIATE_TEST_CASE_P(
21242128
.expectedStripeCount = 4,
21252129
.expectedMaxChunkCount = 1,
21262130
.expectedMinChunkCount = 1,
2131+
.chunkedStreamBatchSize = 10,
21272132
},
21282133
// Low memory regression threshold
21292134
// Produces file with more chunks per stripe
@@ -2138,6 +2143,7 @@ INSTANTIATE_TEST_CASE_P(
21382143
.expectedStripeCount = 10,
21392144
.expectedMaxChunkCount = 2,
21402145
.expectedMinChunkCount = 2,
2146+
.chunkedStreamBatchSize = 10,
21412147
},
21422148
// High target stripe size bytes (with disabled memory pressure
21432149
// optimization) produces fewer stripes. Single chunks.
@@ -2152,6 +2158,8 @@ INSTANTIATE_TEST_CASE_P(
21522158
.expectedStripeCount = 1, // -2 stripes
21532159
.expectedMaxChunkCount = 1,
21542160
.expectedMinChunkCount = 1,
2161+
.chunkedStreamBatchSize = 10,
2162+
21552163
},
21562164
// Low target stripe size bytes (with disabled memory pressure
21572165
// optimization) produces more stripes. Single chunks.
@@ -2166,5 +2174,21 @@ INSTANTIATE_TEST_CASE_P(
21662174
.expectedStripeCount = 7, // +6 stripes
21672175
.expectedMaxChunkCount = 1,
21682176
.expectedMinChunkCount = 1,
2169-
}));
2177+
.chunkedStreamBatchSize = 10,
2178+
2179+
},
2180+
// Higher chunked stream batch size (no change in policy)
2181+
ChunkFlushPolicyTestCase{
2182+
.batchCount = 20,
2183+
.enableChunking = true,
2184+
.targetStripeSizeBytes = 250 << 10, // 250KB
2185+
.writerMemoryHighThresholdBytes = 80 << 10,
2186+
.writerMemoryLowThresholdBytes = 75 << 10,
2187+
.estimatedCompressionFactor = 1.0,
2188+
.minStreamChunkRawSize = 100,
2189+
.expectedStripeCount = 7,
2190+
.expectedMaxChunkCount = 2,
2191+
.expectedMinChunkCount = 1,
2192+
.chunkedStreamBatchSize = 3} // +1
2193+
));
21702194
} // namespace facebook

0 commit comments

Comments
 (0)