Skip to content

Commit 9feed6a

Browse files
macvincent and facebook-github-bot
authored and committed
feat(Nimble): New Flush Policy Implementation With Chunking (facebookincubator#240)
Summary: X-link: facebookincubator/velox#14846

This is an implementation of the new chunking policy described in this [doc](https://fburl.com/gdoc/gkdwwju1). It has two phases:

**Phase 1 - Memory Pressure Management (shouldChunk)**
The policy monitors the total in-memory data size:
* When memory usage exceeds the maximum threshold, it initiates chunking to reduce the memory footprint while continuing data ingestion.
* When previous chunking attempts succeeded and memory remains above the minimum threshold, it continues chunking to further reduce memory usage.

**Phase 2 - Storage Size Optimization (shouldFlush)**
Implements compression-aware stripe size prediction:
* When chunking fails to reduce memory usage effectively and memory stays above the minimum threshold, it forces a full stripe flush to guarantee memory relief.
* It calculates the anticipated final compressed stripe size by applying the estimated compression ratio to unencoded data.
* It triggers a stripe flush when the predicted compressed size reaches the target stripe size threshold.

`shouldChunk` is also now a separate method required by all flush policies. We updated all previous tests and code references.

NOTE: The Velox repo change here is just a test copied into an experimental directory that references the flush policy.

Differential Revision: D81516697
1 parent 2af921d commit 9feed6a

File tree

8 files changed

+387
-69
lines changed

8 files changed

+387
-69
lines changed

dwio/nimble/velox/FlushPolicy.cpp

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,4 +23,42 @@ FlushDecision StripeRawSizeFlushPolicy::shouldFlush(
2323
: FlushDecision::None;
2424
}
2525

26+
// Relieve memory pressure with chunking. Tracks state between calls.
27+
ChunkDecision ChunkFlushPolicy::shouldChunk(
28+
const StripeProgress& stripeProgress) {
29+
const uint64_t inMemoryBytes =
30+
stripeProgress.stripeRawSize + stripeProgress.stripeEncodedSize;
31+
const auto writerMemoryThreshold = (lastChunkDecision_ == ChunkDecision::None)
32+
? config_.writerMemoryHighThresholdBytes
33+
: config_.writerMemoryLowThresholdBytes;
34+
lastChunkDecision_ = inMemoryBytes > writerMemoryThreshold
35+
? ChunkDecision::Chunk
36+
: ChunkDecision::None;
37+
return lastChunkDecision_;
38+
}
39+
40+
// Optimize for expected storage stripe size.
41+
// Does not track state between calls.
42+
FlushDecision ChunkFlushPolicy::shouldFlush(
43+
const StripeProgress& stripeProgress) {
44+
// When chunking is unable to relieve memory pressure, we flush stripe.
45+
if (stripeProgress.stripeRawSize + stripeProgress.stripeEncodedSize >
46+
config_.writerMemoryHighThresholdBytes) {
47+
return FlushDecision::Stripe;
48+
}
49+
50+
double compressionFactor = config_.estimatedCompressionFactor;
51+
// Use historical compression ratio as a heuristic when available.
52+
if (stripeProgress.stripeEncodedSize > 0) {
53+
compressionFactor =
54+
static_cast<double>(stripeProgress.stripeEncodedLogicalSize) /
55+
stripeProgress.stripeEncodedSize;
56+
}
57+
double expectedEncodedStripeSize = stripeProgress.stripeEncodedSize +
58+
stripeProgress.stripeRawSize / std::max(compressionFactor, 1.0);
59+
return (expectedEncodedStripeSize >= config_.targetStripeSizeBytes)
60+
? FlushDecision::Stripe
61+
: FlushDecision::None;
62+
}
63+
2664
} // namespace facebook::nimble

dwio/nimble/velox/FlushPolicy.h

Lines changed: 58 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -20,23 +20,45 @@
2020

2121
namespace facebook::nimble {
2222

23+
// Tunables for the chunk-aware flush policy.
// TODO: Set default values for these parameters based on DISCO experiments.
// The defaults below are arbitrary placeholders for now.
//
// The byte sizes are written with `ULL` literals so the arithmetic is done in
// a 64-bit unsigned type on every platform (`long` is only 32 bits on LLP64
// targets, which would silently overflow for defaults of 2 GiB or more).
struct ChunkFlushPolicyConfig {
  // Threshold to trigger chunking to relieve memory pressure (200 MiB).
  const uint64_t writerMemoryHighThresholdBytes{200ULL * 1024 * 1024};
  // Threshold below which chunking stops and stripe size optimization resumes
  // (100 MiB).
  const uint64_t writerMemoryLowThresholdBytes{100ULL * 1024 * 1024};
  // Target size for encoded stripes (100 MiB).
  const uint64_t targetStripeSizeBytes{100ULL * 1024 * 1024};
  // Expected ratio of raw to encoded data.
  const double estimatedCompressionFactor{1.3};
};
35+
36+
// Result of FlushPolicy::shouldFlush. Backed by bool: None is false, Stripe
// is true.
enum class FlushDecision : bool { None = false, Stripe = true };
40+
41+
// Result of FlushPolicy::shouldChunk. Backed by bool: None is false, Chunk is
// true.
enum class ChunkDecision : bool { None = false, Chunk = true };
45+
2346
struct StripeProgress {
2447
// Size of the stripe data when it's fully decompressed and decoded
2548
const uint64_t stripeRawSize;
2649
// Size of the stripe after buffered data is encoded and optionally compressed
2750
const uint64_t stripeEncodedSize;
28-
};
29-
30-
enum class FlushDecision : uint8_t {
31-
None = 0,
32-
Stripe = 1,
33-
Chunk = 2,
51+
// Logical size of the now encoded stripe data
52+
const uint64_t stripeEncodedLogicalSize;
3453
};
3554

3655
class FlushPolicy {
3756
public:
3857
virtual ~FlushPolicy() = default;
3958
virtual FlushDecision shouldFlush(const StripeProgress& stripeProgress) = 0;
59+
virtual ChunkDecision shouldChunk(const StripeProgress&) {
60+
return ChunkDecision::None;
61+
}
4062
};
4163

4264
class StripeRawSizeFlushPolicy final : public FlushPolicy {
@@ -53,15 +75,41 @@ class StripeRawSizeFlushPolicy final : public FlushPolicy {
5375
class LambdaFlushPolicy : public FlushPolicy {
5476
public:
5577
explicit LambdaFlushPolicy(
56-
std::function<FlushDecision(const StripeProgress&)> lambda)
57-
: lambda_{std::move(lambda)} {}
78+
std::function<FlushDecision(const StripeProgress&)> flushLambda =
79+
[](const StripeProgress&) { return FlushDecision::None; },
80+
std::function<ChunkDecision(const StripeProgress&)> chunkLambda =
81+
[](const StripeProgress&) { return ChunkDecision::None; })
82+
: flushLambda_{std::move(flushLambda)},
83+
chunkLambda_{std::move(chunkLambda)} {}
5884

5985
FlushDecision shouldFlush(const StripeProgress& stripeProgress) override {
60-
return lambda_(stripeProgress);
86+
return flushLambda_(stripeProgress);
6187
}
6288

89+
ChunkDecision shouldChunk(const StripeProgress& stripeProgress) override {
90+
return chunkLambda_(stripeProgress);
91+
}
92+
93+
private:
94+
std::function<FlushDecision(const StripeProgress&)> flushLambda_;
95+
std::function<ChunkDecision(const StripeProgress&)> chunkLambda_;
96+
};
97+
98+
class ChunkFlushPolicy : public FlushPolicy {
99+
public:
100+
explicit ChunkFlushPolicy(ChunkFlushPolicyConfig config)
101+
: config_{std::move(config)}, lastChunkDecision_{ChunkDecision::None} {}
102+
103+
// Optimize for expected storage stripe size.
104+
// Does not track state between calls.
105+
FlushDecision shouldFlush(const StripeProgress& stripeProgress) override;
106+
107+
// Relieve memory pressure with chunking. Tracks state between calls.
108+
ChunkDecision shouldChunk(const StripeProgress& stripeProgress) override;
109+
63110
private:
64-
std::function<FlushDecision(const StripeProgress&)> lambda_;
111+
const ChunkFlushPolicyConfig config_;
112+
ChunkDecision lastChunkDecision_;
65113
};
66114

67115
} // namespace facebook::nimble

dwio/nimble/velox/VeloxWriter.cpp

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -846,19 +846,20 @@ bool VeloxWriter::tryWriteStripe(bool force) {
846846
.stripeEncodedSize = context_->stripeSize});
847847
};
848848

849-
auto decision = force ? FlushDecision::Stripe : shouldFlush();
850-
if (decision == FlushDecision::None) {
851-
return false;
852-
}
849+
auto shouldChunk = [&]() {
850+
return context_->flushPolicy->shouldChunk(StripeProgress{
851+
.stripeRawSize = context_->memoryUsed,
852+
.stripeEncodedSize = context_->stripeSize});
853+
};
853854

854855
try {
855856
// TODO: we can improve merge the last chunk write with stripe
856-
if (decision == FlushDecision::Chunk && context_->options.enableChunking) {
857+
if (context_->options.enableChunking &&
858+
shouldChunk() == ChunkDecision::Chunk) {
857859
writeChunk(false);
858-
decision = shouldFlush();
859860
}
860861

861-
if (decision != FlushDecision::Stripe) {
862+
if (!(force || shouldFlush() == FlushDecision::Stripe)) {
862863
return false;
863864
}
864865

dwio/nimble/velox/selective/tests/E2EFilterTest.cpp

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -128,13 +128,14 @@ class E2EFilterTest : public dwio::common::E2EFilterTestBase {
128128
writeSchema_ = rowType_;
129129
VeloxWriterOptions options;
130130
options.enableChunking = true;
131-
options.flushPolicyFactory = [] {
131+
auto i = 0;
132+
options.flushPolicyFactory = [&] {
132133
return std::make_unique<LambdaFlushPolicy>(
133-
[i = 0](const StripeProgress&) mutable {
134-
if (i++ % 3 == 2) {
135-
return FlushDecision::Stripe;
136-
}
137-
return FlushDecision::Chunk;
134+
[&](const StripeProgress&) {
135+
return (i++ % 3 == 2) ? FlushDecision::Stripe : FlushDecision::None;
136+
},
137+
[&](const StripeProgress&) {
138+
return (i++ % 3 == 2) ? ChunkDecision::Chunk : ChunkDecision::None;
138139
});
139140
};
140141
if (!flatMapColumns_.empty()) {

dwio/nimble/velox/selective/tests/SelectiveNimbleReaderTest.cpp

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -566,7 +566,8 @@ TEST_F(SelectiveNimbleReaderTest, multiChunkNulls) {
566566
options.minStreamChunkRawSize = 0;
567567
options.flushPolicyFactory = [] {
568568
return std::make_unique<LambdaFlushPolicy>(
569-
[](const StripeProgress&) { return FlushDecision::Chunk; });
569+
[](const StripeProgress&) { return FlushDecision::None; },
570+
[](const StripeProgress&) { return ChunkDecision::Chunk; });
570571
};
571572
auto file = test::createNimbleFile(
572573
*rootPool(), {chunk1, chunk2, chunk3}, options, false);
@@ -612,7 +613,8 @@ TEST_F(SelectiveNimbleReaderTest, multiChunkInt16RowSetOverBoundary) {
612613
options.minStreamChunkRawSize = 0;
613614
options.flushPolicyFactory = [] {
614615
return std::make_unique<LambdaFlushPolicy>(
615-
[](const StripeProgress&) { return FlushDecision::Chunk; });
616+
[](const StripeProgress&) { return FlushDecision::None; },
617+
[](const StripeProgress&) { return ChunkDecision::Chunk; });
616618
};
617619
auto file =
618620
test::createNimbleFile(*rootPool(), {chunk1, chunk2}, options, false);

0 commit comments

Comments
 (0)