Skip to content

Commit 3930c67

Browse files
macvincent authored and meta-codesync[bot] committed
Support Per Stream Chunking to Relieve Memory Pressure (facebookincubator#243)
Summary: Pull Request resolved: facebookincubator#243 This is an implementation of a detail in the new chunking policy described in this [doc](https://fburl.com/gdoc/gkdwwju1). Rather than chunking all eligible streams, we chunk individual streams in the order of their raw size until memory pressure is relieved. For our unit tests, the maximum number of chunks produced is identical to the previous implementation. But there may be differences for large file sizes. This requires more experimentation and tuning to determine the right threshold value that takes advantage of this. Differential Revision: D81715655
1 parent 92978c7 commit 3930c67

File tree

4 files changed

+83
-7
lines changed

4 files changed

+83
-7
lines changed

dwio/nimble/velox/VeloxWriter.cpp

Lines changed: 51 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -791,7 +791,9 @@ void VeloxWriter::writeChunk(bool lastChunk) {
791791
<< ", chunk bytes: " << chunkSize;
792792
}
793793

794-
bool VeloxWriter::writeChunks(bool lastChunk) {
794+
bool VeloxWriter::writeChunks(
795+
bool lastChunk,
796+
std::span<const uint32_t> streamIndices) {
795797
uint64_t previousFlushWallTime = context_->stripeFlushTiming.wallNanos;
796798
std::atomic<uint64_t> chunkSize = 0;
797799
std::atomic<uint64_t> logicalSizeBeforeEncoding = 0;
@@ -865,15 +867,26 @@ bool VeloxWriter::writeChunks(bool lastChunk) {
865867
}
866868
};
867869

870+
const auto& streams = context_->streams();
871+
std::vector<uint32_t> streamIndicesVec;
872+
if (streamIndices.empty()) {
873+
// Chunk all streams if no stream indices are provided
874+
streamIndicesVec.resize(streams.size());
875+
std::iota(streamIndicesVec.begin(), streamIndicesVec.end(), 0);
876+
streamIndices = streamIndicesVec;
877+
}
878+
868879
if (context_->options.encodingExecutor) {
869880
velox::dwio::common::ExecutorBarrier barrier{
870881
context_->options.encodingExecutor};
871-
for (auto& streamData : context_->streams()) {
882+
for (auto streamIndex : streamIndices) {
883+
auto& streamData = streams[streamIndex];
872884
barrier.add([&] { processStream(*streamData); });
873885
}
874886
barrier.waitAll();
875887
} else {
876-
for (auto& streamData : context_->streams()) {
888+
for (auto streamIndex : streamIndices) {
889+
auto& streamData = streams[streamIndex];
877890
processStream(*streamData);
878891
}
879892
}
@@ -972,8 +985,41 @@ bool VeloxWriter::tryWriteStripe(bool force) {
972985

973986
try {
974987
// TODO: we can improve merge the last chunk write with stripe
975-
if (context_->options.enableChunking) {
976-
while (shouldChunk() == ChunkDecision::Chunk && writeChunks(false)) {
988+
if (context_->options.enableChunking &&
989+
shouldChunk() == ChunkDecision::Chunk) {
990+
const auto& streams = context_->streams();
991+
const uint32_t streamCount = streams.size();
992+
// Sort streams for chunking based on raw memory usage.
993+
// TODO(T240072104): Improve performance by bucketing the streams by size
994+
// (most significant bit) instead of sorting.
995+
std::vector<uint32_t> streamIndices(streamCount);
996+
std::iota(streamIndices.begin(), streamIndices.end(), 0);
997+
std::sort(
998+
streamIndices.begin(),
999+
streamIndices.end(),
1000+
[&](const uint32_t& a, const uint32_t& b) {
1001+
return streams[a]->memoryUsed() > streams[b]->memoryUsed();
1002+
});
1003+
1004+
// Chunk streams in batches.
1005+
uint32_t currentIndex = 0;
1006+
ChunkDecision decision = ChunkDecision::Chunk;
1007+
NIMBLE_DASSERT(
1008+
context_->options.chunkedStreamBatchSize > 0,
1009+
"streamEncodingBatchSize must be greater than 0");
1010+
while (currentIndex < streams.size() &&
1011+
decision == ChunkDecision::Chunk) {
1012+
uint32_t endStreamIndex = std::min(
1013+
streamCount,
1014+
currentIndex + context_->options.chunkedStreamBatchSize);
1015+
std::span<const uint32_t> batchIndices(
1016+
streamIndices.data() + currentIndex, endStreamIndex - currentIndex);
1017+
// Stop attempting chunking once streams are too small to chunk.
1018+
if (!writeChunks(false, batchIndices)) {
1019+
break;
1020+
}
1021+
currentIndex = endStreamIndex;
1022+
decision = shouldChunk();
9771023
}
9781024
}
9791025

dwio/nimble/velox/VeloxWriter.h

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -88,7 +88,9 @@ class VeloxWriter {
8888
bool tryWriteStripe(bool force = false);
8989
void writeChunk(bool lastChunk = true);
9090
// Returns 'true' if chunks were written.
91-
bool writeChunks(bool lastChunk = true);
91+
bool writeChunks(
92+
bool lastChunk = true,
93+
std::span<const uint32_t> streamIndices = {});
9294
uint32_t writeStripe();
9395
};
9496

dwio/nimble/velox/VeloxWriterOptions.h

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -96,6 +96,10 @@ struct VeloxWriterOptions {
9696
// Note: this threshold is ignored when it is time to flush a stripe.
9797
uint64_t minStreamChunkRawSize = 1024;
9898

99+
// Number of streams to try chunking between memory pressure evaluations.
100+
// Note: this is ignored when it is time to flush a stripe.
101+
uint32_t chunkedStreamBatchSize = 1024;
102+
99103
// The factory function that produces the root encoding selection policy.
100104
// Encoding selection policy is the way to balance the tradeoffs of
101105
// different performance factors (at both read and write times). Heuristics

dwio/nimble/velox/tests/VeloxWriterTests.cpp

Lines changed: 25 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1965,6 +1965,7 @@ struct ChunkFlushPolicyTestCase {
19651965
const uint32_t expectedStripeCount{0};
19661966
const uint32_t expectedMaxChunkCount{0};
19671967
const uint32_t expectedMinChunkCount{0};
1968+
const uint32_t chunkedStreamBatchSize{2};
19681969
};
19691970

19701971
class ChunkFlushPolicyTest
@@ -1976,6 +1977,7 @@ TEST_P(ChunkFlushPolicyTest, ChunkFlushPolicyIntegration) {
19761977
{{"BIGINT", velox::BIGINT()}, {"SMALLINT", velox::SMALLINT()}});
19771978
nimble::VeloxWriterOptions writerOptions{
19781979
.minStreamChunkRawSize = GetParam().minStreamChunkRawSize,
1980+
.chunkedStreamBatchSize = GetParam().chunkedStreamBatchSize,
19791981
.flushPolicyFactory = GetParam().enableChunking
19801982
? []() -> std::unique_ptr<nimble::FlushPolicy> {
19811983
return std::make_unique<nimble::ChunkFlushPolicy>(
@@ -2097,6 +2099,7 @@ INSTANTIATE_TEST_CASE_P(
20972099
.expectedStripeCount = 4,
20982100
.expectedMaxChunkCount = 1,
20992101
.expectedMinChunkCount = 1,
2102+
.chunkedStreamBatchSize = 10,
21002103
},
21012104
// Base case with default settings (has chunking)
21022105
ChunkFlushPolicyTestCase{
@@ -2110,6 +2113,7 @@ INSTANTIATE_TEST_CASE_P(
21102113
.expectedStripeCount = 7,
21112114
.expectedMaxChunkCount = 2,
21122115
.expectedMinChunkCount = 1,
2116+
.chunkedStreamBatchSize = 10,
21132117
},
21142118
// High memory regression threshold
21152119
// Produces file identical to RawStripeSizeFlushPolicy
@@ -2124,6 +2128,7 @@ INSTANTIATE_TEST_CASE_P(
21242128
.expectedStripeCount = 4,
21252129
.expectedMaxChunkCount = 1,
21262130
.expectedMinChunkCount = 1,
2131+
.chunkedStreamBatchSize = 10,
21272132
},
21282133
// Low memory regression threshold
21292134
// Produces file with more chunks per stripe
@@ -2138,6 +2143,7 @@ INSTANTIATE_TEST_CASE_P(
21382143
.expectedStripeCount = 10,
21392144
.expectedMaxChunkCount = 2,
21402145
.expectedMinChunkCount = 2,
2146+
.chunkedStreamBatchSize = 10,
21412147
},
21422148
// High target stripe size bytes (with disabled memory pressure
21432149
// optimization) produces fewer stripes. Single chunks.
@@ -2152,6 +2158,8 @@ INSTANTIATE_TEST_CASE_P(
21522158
.expectedStripeCount = 1, // -2 stripes
21532159
.expectedMaxChunkCount = 1,
21542160
.expectedMinChunkCount = 1,
2161+
.chunkedStreamBatchSize = 10,
2162+
21552163
},
21562164
// Low target stripe size bytes (with disabled memory pressure
21572165
// optimization) produces more stripes. Single chunks.
@@ -2166,5 +2174,21 @@ INSTANTIATE_TEST_CASE_P(
21662174
.expectedStripeCount = 7, // +6 stripes
21672175
.expectedMaxChunkCount = 1,
21682176
.expectedMinChunkCount = 1,
2169-
}));
2177+
.chunkedStreamBatchSize = 10,
2178+
2179+
},
2180+
// Higher chunked stream batch size (no change in policy)
2181+
ChunkFlushPolicyTestCase{
2182+
.batchCount = 20,
2183+
.enableChunking = true,
2184+
.targetStripeSizeBytes = 250 << 10, // 250KB
2185+
.writerMemoryHighThresholdBytes = 80 << 10,
2186+
.writerMemoryLowThresholdBytes = 75 << 10,
2187+
.estimatedCompressionFactor = 1.0,
2188+
.minStreamChunkRawSize = 100,
2189+
.expectedStripeCount = 7,
2190+
.expectedMaxChunkCount = 2,
2191+
.expectedMinChunkCount = 1,
2192+
.chunkedStreamBatchSize = 3} // +1
2193+
));
21702194
} // namespace facebook

0 commit comments

Comments (0)