Skip to content

Commit cae868a

Browse files
macvincentmeta-codesync[bot]
authored andcommitted
Integrate Max Stream Size Chunking in Velox Writer (facebookincubator#249)
Summary: Pull Request resolved: facebookincubator#249 This is the last feature of the new chunking policy described in this [doc](https://fburl.com/gdoc/gkdwwju1). Here, we break down large streams into multiple chunks of size up to `maxStreamChunkRawSize`. This protects the reader from attempting to materialize huge chunks. We included StreamData support for this in the previous diff. In this diff, we integrate with the VeloxWriter. With this change, while memory pressure is detected, we: 1. Chunk large streams above `maxStreamChunkRawSize`, retaining stream data below the limit. 2. If there is still memory pressure after the first step, chunk streams with size above `minStreamChunkRawSize`. During stripe flush, we chunk all remaining data, breaking down streams above `maxStreamChunkRawSize` into smaller chunks. --- The general chunking policy has two phases: ## **Phase 1 - Memory Pressure Management (shouldChunk)** The policy monitors total in-memory data size: - When memory usage exceeds the maximum threshold, it initiates chunking to reduce memory footprint while continuing data ingestion. - When previous chunking attempts succeeded and memory remains above the minimum threshold, it continues chunking to further reduce memory usage. - When chunking fails to reduce memory usage effectively and memory stays above the minimum threshold, it forces a full stripe flush to guarantee memory relief. ## **Phase 2 - Storage Size Optimization (shouldFlush)** Implements compression-aware stripe size prediction: - Calculates the anticipated final compressed stripe size by applying the estimated compression ratio to unencoded data. - Triggers stripe flush when the predicted compressed size reaches the target stripe size threshold. Differential Revision: D82175496
1 parent 2c18ca7 commit cae868a

File tree

7 files changed

+388
-344
lines changed

7 files changed

+388
-344
lines changed

dwio/nimble/velox/StreamChunker.cpp

Lines changed: 53 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -8,26 +8,27 @@ std::unique_ptr<StreamChunker> encodeStreamTyped(
88
StreamData& streamData,
99
uint64_t maxChunkSize,
1010
uint64_t minChunkSize,
11-
bool lastChunk) {
11+
bool emptyStreamContent,
12+
bool isNullStream) {
1213
const auto& streamDataType = streamData.type();
1314
if (streamDataType == ContentStreamData<T>::TYPE_NAME) {
1415
return std::make_unique<ContentStreamChunker<T>>(
1516
static_cast<ContentStreamData<T>&>(streamData),
1617
maxChunkSize,
17-
minChunkSize,
18-
lastChunk);
18+
minChunkSize);
1919
} else if (streamDataType == NullsStreamData::TYPE_NAME) {
2020
return std::make_unique<NullsStreamChunker>(
2121
static_cast<NullsStreamData&>(streamData),
2222
maxChunkSize,
2323
minChunkSize,
24-
lastChunk);
24+
emptyStreamContent);
2525
} else if (streamDataType == NullableContentStreamData<T>::TYPE_NAME) {
2626
return std::make_unique<NullableContentStreamChunker<T>>(
2727
static_cast<NullableContentStreamData<T>&>(streamData),
2828
maxChunkSize,
2929
minChunkSize,
30-
lastChunk);
30+
emptyStreamContent,
31+
isNullStream);
3132
}
3233
NIMBLE_UNREACHABLE(
3334
fmt::format("Unsupported streamData type {}", streamDataType))
@@ -37,37 +38,74 @@ std::unique_ptr<StreamChunker> getStreamChunker(
3738
StreamData& streamData,
3839
uint64_t maxChunkSize,
3940
uint64_t minChunkSize,
40-
bool lastChunk) {
41+
bool emptyStreamContent,
42+
bool isNullStream) {
4143
const auto scalarKind = streamData.descriptor().scalarKind();
4244
switch (scalarKind) {
4345
case ScalarKind::Bool:
4446
return encodeStreamTyped<bool>(
45-
streamData, maxChunkSize, minChunkSize, lastChunk);
47+
streamData,
48+
maxChunkSize,
49+
minChunkSize,
50+
emptyStreamContent,
51+
isNullStream);
4652
case ScalarKind::Int8:
4753
return encodeStreamTyped<int8_t>(
48-
streamData, maxChunkSize, minChunkSize, lastChunk);
54+
streamData,
55+
maxChunkSize,
56+
minChunkSize,
57+
emptyStreamContent,
58+
isNullStream);
4959
case ScalarKind::Int16:
5060
return encodeStreamTyped<int16_t>(
51-
streamData, maxChunkSize, minChunkSize, lastChunk);
61+
streamData,
62+
maxChunkSize,
63+
minChunkSize,
64+
emptyStreamContent,
65+
isNullStream);
5266
case ScalarKind::Int32:
5367
return encodeStreamTyped<int32_t>(
54-
streamData, maxChunkSize, minChunkSize, lastChunk);
68+
streamData,
69+
maxChunkSize,
70+
minChunkSize,
71+
emptyStreamContent,
72+
isNullStream);
5573
case ScalarKind::UInt32:
5674
return encodeStreamTyped<uint32_t>(
57-
streamData, maxChunkSize, minChunkSize, lastChunk);
75+
streamData,
76+
maxChunkSize,
77+
minChunkSize,
78+
emptyStreamContent,
79+
isNullStream);
5880
case ScalarKind::Int64:
5981
return encodeStreamTyped<int64_t>(
60-
streamData, maxChunkSize, minChunkSize, lastChunk);
82+
streamData,
83+
maxChunkSize,
84+
minChunkSize,
85+
emptyStreamContent,
86+
isNullStream);
6187
case ScalarKind::Float:
6288
return encodeStreamTyped<float>(
63-
streamData, maxChunkSize, minChunkSize, lastChunk);
89+
streamData,
90+
maxChunkSize,
91+
minChunkSize,
92+
emptyStreamContent,
93+
isNullStream);
6494
case ScalarKind::Double:
6595
return encodeStreamTyped<double>(
66-
streamData, maxChunkSize, minChunkSize, lastChunk);
96+
streamData,
97+
maxChunkSize,
98+
minChunkSize,
99+
emptyStreamContent,
100+
isNullStream);
67101
case ScalarKind::String:
68102
case ScalarKind::Binary:
69103
return encodeStreamTyped<std::string_view>(
70-
streamData, maxChunkSize, minChunkSize, lastChunk);
104+
streamData,
105+
maxChunkSize,
106+
minChunkSize,
107+
emptyStreamContent,
108+
isNullStream);
71109
default:
72110
NIMBLE_UNREACHABLE(
73111
fmt::format("Unsupported scalar kind {}", toString(scalarKind)));

0 commit comments

Comments
 (0)