Skip to content

Commit 9a7b7cc

Browse files
macvincent authored and facebook-github-bot committed
feat(Nimble): New Flush Policy Implementation With Chunking (facebookincubator#240)
Summary: X-link: facebookincubator/velox#14846 This is an implementation of the new chunking policy described in this [doc](https://fburl.com/gdoc/gkdwwju1). It has two phases: **Phase 1 - Memory Pressure Management (shouldChunk)** The policy monitors the total in-memory data size: * When memory usage exceeds the maximum threshold, it initiates chunking to reduce the memory footprint while continuing data ingestion. * When previous chunking attempts succeeded and memory remains above the minimum threshold, it continues chunking to further reduce memory usage. **Phase 2 - Storage Size Optimization (shouldFlush)** Implements compression-aware stripe size prediction: * When chunking fails to reduce memory usage effectively and memory stays above the minimum threshold, it forces a full stripe flush to guarantee memory relief. * It calculates the anticipated final compressed stripe size by applying the estimated compression ratio to the unencoded data. * It triggers a stripe flush when the predicted compressed size reaches the target stripe size threshold. `shouldChunk` is also now a separate method required by all flush policies. We updated all previous tests and code references. NOTE: The Velox repo change here is just a test copied into an experimental directory that references the flush policy. Differential Revision: D81516697
1 parent 3b619cb commit 9a7b7cc

File tree

8 files changed

+359
-78
lines changed

8 files changed

+359
-78
lines changed

dwio/nimble/velox/FlushPolicy.cpp

Lines changed: 34 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,10 +17,41 @@
1717

1818
namespace facebook::nimble {
1919

20-
FlushDecision StripeRawSizeFlushPolicy::shouldFlush(
20+
bool StripeRawSizeFlushPolicy::shouldFlush(
2121
const StripeProgress& stripeProgress) {
22-
return stripeProgress.stripeRawSize >= stripeRawSize_ ? FlushDecision::Stripe
23-
: FlushDecision::None;
22+
return stripeProgress.stripeRawSize >= stripeRawSize_;
23+
}
24+
25+
// Relieve memory pressure with chunking. Tracks state between calls.
26+
bool ChunkFlushPolicy::shouldChunk(const StripeProgress& stripeProgress) {
27+
const uint64_t inMemoryBytes =
28+
stripeProgress.stripeRawSize + stripeProgress.stripeEncodedSize;
29+
const auto writerMemoryThreshold = (lastChunkDecision_ == false)
30+
? config_.writerMemoryHighThresholdBytes
31+
: config_.writerMemoryLowThresholdBytes;
32+
lastChunkDecision_ = inMemoryBytes > writerMemoryThreshold;
33+
return lastChunkDecision_;
34+
}
35+
36+
// Optimize for expected storage stripe size.
37+
// Does not track state between calls.
38+
bool ChunkFlushPolicy::shouldFlush(const StripeProgress& stripeProgress) {
39+
// When chunking is unable to relieve memory pressure, we flush stripe.
40+
if (stripeProgress.stripeRawSize + stripeProgress.stripeEncodedSize >
41+
config_.writerMemoryHighThresholdBytes) {
42+
return true;
43+
}
44+
45+
double compressionFactor = config_.estimatedCompressionFactor;
46+
// Use historical compression ratio as a heuristic when available.
47+
if (stripeProgress.stripeEncodedSize > 0) {
48+
compressionFactor =
49+
static_cast<double>(stripeProgress.stripeEncodedLogicalSize) /
50+
stripeProgress.stripeEncodedSize;
51+
}
52+
double expectedEncodedStripeSize = stripeProgress.stripeEncodedSize +
53+
stripeProgress.stripeRawSize / std::max(compressionFactor, 1.0);
54+
return (expectedEncodedStripeSize >= config_.targetStripeSizeBytes);
2455
}
2556

2657
} // namespace facebook::nimble

dwio/nimble/velox/FlushPolicy.h

Lines changed: 51 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -20,31 +20,43 @@
2020

2121
namespace facebook::nimble {
2222

23+
// Thresholds and estimates consumed by the chunking flush policy.
// TODO: Set default values for these parameters based on DISCO experiments.
// Use arbitrary values for now.
//
// Size defaults are computed with unsigned 64-bit literals so the arithmetic
// does not depend on the platform width of `long`.
struct ChunkFlushPolicyConfig {
  // Threshold to trigger chunking to relieve memory pressure (200 MiB).
  const uint64_t writerMemoryHighThresholdBytes{200ULL * 1024 * 1024};
  // Threshold below which chunking stops and stripe size optimization resumes
  // (100 MiB).
  const uint64_t writerMemoryLowThresholdBytes{100ULL * 1024 * 1024};
  // Target size for encoded stripes (100 MiB).
  const uint64_t targetStripeSizeBytes{100ULL * 1024 * 1024};
  // Expected ratio of raw to encoded data.
  const double estimatedCompressionFactor{1.3};
};
35+
2336
struct StripeProgress {
2437
// Size of the stripe data when it's fully decompressed and decoded
2538
const uint64_t stripeRawSize;
2639
// Size of the stripe after buffered data is encoded and optionally compressed
2740
const uint64_t stripeEncodedSize;
28-
};
29-
30-
enum class FlushDecision : uint8_t {
31-
None = 0,
32-
Stripe = 1,
33-
Chunk = 2,
41+
// Logical size of the now encoded stripe data
42+
const uint64_t stripeEncodedLogicalSize;
3443
};
3544

3645
class FlushPolicy {
3746
public:
3847
virtual ~FlushPolicy() = default;
39-
virtual FlushDecision shouldFlush(const StripeProgress& stripeProgress) = 0;
48+
virtual bool shouldFlush(const StripeProgress& stripeProgress) = 0;
49+
virtual bool shouldChunk(const StripeProgress&) {
50+
return false;
51+
}
4052
};
4153

4254
class StripeRawSizeFlushPolicy final : public FlushPolicy {
4355
public:
4456
explicit StripeRawSizeFlushPolicy(uint64_t stripeRawSize)
4557
: stripeRawSize_{stripeRawSize} {}
4658

47-
FlushDecision shouldFlush(const StripeProgress& stripeProgress) override;
59+
bool shouldFlush(const StripeProgress& stripeProgress) override;
4860

4961
private:
5062
const uint64_t stripeRawSize_;
@@ -53,15 +65,41 @@ class StripeRawSizeFlushPolicy final : public FlushPolicy {
5365
class LambdaFlushPolicy : public FlushPolicy {
5466
public:
5567
explicit LambdaFlushPolicy(
56-
std::function<FlushDecision(const StripeProgress&)> lambda)
57-
: lambda_{std::move(lambda)} {}
68+
std::function<bool(const StripeProgress&)> flushLambda =
69+
[](const StripeProgress&) { return false; },
70+
std::function<bool(const StripeProgress&)> chunkLambda =
71+
[](const StripeProgress&) { return false; })
72+
: flushLambda_{std::move(flushLambda)},
73+
chunkLambda_{std::move(chunkLambda)} {}
74+
75+
bool shouldFlush(const StripeProgress& stripeProgress) override {
76+
return flushLambda_(stripeProgress);
77+
}
5878

59-
FlushDecision shouldFlush(const StripeProgress& stripeProgress) override {
60-
return lambda_(stripeProgress);
79+
bool shouldChunk(const StripeProgress& stripeProgress) override {
80+
return chunkLambda_(stripeProgress);
6181
}
6282

6383
private:
64-
std::function<FlushDecision(const StripeProgress&)> lambda_;
84+
std::function<bool(const StripeProgress&)> flushLambda_;
85+
std::function<bool(const StripeProgress&)> chunkLambda_;
86+
};
87+
88+
class ChunkFlushPolicy : public FlushPolicy {
89+
public:
90+
explicit ChunkFlushPolicy(ChunkFlushPolicyConfig config)
91+
: config_{std::move(config)}, lastChunkDecision_{false} {}
92+
93+
// Optimize for expected storage stripe size.
94+
// Does not track state between calls.
95+
bool shouldFlush(const StripeProgress& stripeProgress) override;
96+
97+
// Relieve memory pressure with chunking. Tracks state between calls.
98+
bool shouldChunk(const StripeProgress& stripeProgress) override;
99+
100+
private:
101+
const ChunkFlushPolicyConfig config_;
102+
bool lastChunkDecision_;
65103
};
66104

67105
} // namespace facebook::nimble

dwio/nimble/velox/VeloxWriter.cpp

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -846,19 +846,19 @@ bool VeloxWriter::tryWriteStripe(bool force) {
846846
.stripeEncodedSize = context_->stripeSize});
847847
};
848848

849-
auto decision = force ? FlushDecision::Stripe : shouldFlush();
850-
if (decision == FlushDecision::None) {
851-
return false;
852-
}
849+
auto shouldChunk = [&]() {
850+
return context_->flushPolicy->shouldChunk(StripeProgress{
851+
.stripeRawSize = context_->memoryUsed,
852+
.stripeEncodedSize = context_->stripeSize});
853+
};
853854

854855
try {
855856
// TODO: we can improve merge the last chunk write with stripe
856-
if (decision == FlushDecision::Chunk && context_->options.enableChunking) {
857+
if (context_->options.enableChunking && shouldChunk()) {
857858
writeChunk(false);
858-
decision = shouldFlush();
859859
}
860860

861-
if (decision != FlushDecision::Stripe) {
861+
if (!(force || shouldFlush())) {
862862
return false;
863863
}
864864

dwio/nimble/velox/selective/tests/E2EFilterTest.cpp

Lines changed: 4 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -128,14 +128,11 @@ class E2EFilterTest : public dwio::common::E2EFilterTestBase {
128128
writeSchema_ = rowType_;
129129
VeloxWriterOptions options;
130130
options.enableChunking = true;
131-
options.flushPolicyFactory = [] {
131+
auto i = 0;
132+
options.flushPolicyFactory = [&] {
132133
return std::make_unique<LambdaFlushPolicy>(
133-
[i = 0](const StripeProgress&) mutable {
134-
if (i++ % 3 == 2) {
135-
return FlushDecision::Stripe;
136-
}
137-
return FlushDecision::Chunk;
138-
});
134+
[&](const StripeProgress&) { return (i++ % 3 == 2) ? true : false; },
135+
[&](const StripeProgress&) { return (i++ % 3 == 2) ? true : false; });
139136
};
140137
if (!flatMapColumns_.empty()) {
141138
setUpFlatMapColumns();

dwio/nimble/velox/selective/tests/SelectiveNimbleReaderTest.cpp

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -566,7 +566,8 @@ TEST_F(SelectiveNimbleReaderTest, multiChunkNulls) {
566566
options.minStreamChunkRawSize = 0;
567567
options.flushPolicyFactory = [] {
568568
return std::make_unique<LambdaFlushPolicy>(
569-
[](const StripeProgress&) { return FlushDecision::Chunk; });
569+
[](const StripeProgress&) { return false; },
570+
[](const StripeProgress&) { return true; });
570571
};
571572
auto file = test::createNimbleFile(
572573
*rootPool(), {chunk1, chunk2, chunk3}, options, false);
@@ -612,7 +613,8 @@ TEST_F(SelectiveNimbleReaderTest, multiChunkInt16RowSetOverBoundary) {
612613
options.minStreamChunkRawSize = 0;
613614
options.flushPolicyFactory = [] {
614615
return std::make_unique<LambdaFlushPolicy>(
615-
[](const StripeProgress&) { return FlushDecision::Chunk; });
616+
[](const StripeProgress&) { return false; },
617+
[](const StripeProgress&) { return true; });
616618
};
617619
auto file =
618620
test::createNimbleFile(*rootPool(), {chunk1, chunk2}, options, false);

0 commit comments

Comments
 (0)