Skip to content

Commit 1e839ac

Browse files
macvincentfacebook-github-bot
authored andcommitted
Support Chunking in StreamData (facebookincubator#248)
Summary: Another feature in the new chunking policy described in this [doc](https://fburl.com/gdoc/gkdwwju1) is the ability to split large streams above a specified limit into smaller chunks. In this diff, we implement a `popChunk` method in each `StreamData` class to handle this functionality. With this feature we are not forced to encode extremely large streams into a single chunk. Integration will happen in the next diff. Differential Revision: D81824143
1 parent 699e7eb commit 1e839ac

File tree

6 files changed

+1119
-1
lines changed

6 files changed

+1119
-1
lines changed

dwio/nimble/common/Vector.h

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -182,6 +182,16 @@ class Vector {
182182
}
183183
}
184184

185+
// Removes the first |count| elements from the vector.
186+
// Does NOT shrink capacity to fit the new size.
187+
void trimLeft(uint64_t count) {
188+
NIMBLE_DASSERT(
189+
count <= size_, "Cannot trim more elements than exist in vector");
190+
const uint64_t remainingSize = size_ - count;
191+
std::move(begin() + count, end(), begin());
192+
resize(remainingSize);
193+
}
194+
185195
// Add |copies| copies of |value| to the end of the vector.
186196
void extend(uint64_t copies, T value) {
187197
reserve(size_ + copies);
Lines changed: 76 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,76 @@
1+
// (c) Meta Platforms, Inc. and affiliates. Confidential and proprietary.
2+
3+
#include "dwio/nimble/velox/StreamChunker.h"
4+
5+
namespace facebook::nimble {
6+
template <typename T>
7+
std::unique_ptr<StreamChunker> encodeStreamTyped(
8+
StreamData& streamData,
9+
uint64_t maxChunkSize,
10+
uint64_t minChunkSize,
11+
bool lastChunk) {
12+
const auto& streamDataType = streamData.type();
13+
if (streamDataType == ContentStreamData<T>::TYPE_NAME) {
14+
return std::make_unique<ContentStreamChunker<T>>(
15+
static_cast<ContentStreamData<T>&>(streamData),
16+
maxChunkSize,
17+
minChunkSize,
18+
lastChunk);
19+
} else if (streamDataType == NullsStreamData::TYPE_NAME) {
20+
return std::make_unique<NullsStreamChunker>(
21+
static_cast<NullsStreamData&>(streamData),
22+
maxChunkSize,
23+
minChunkSize,
24+
lastChunk);
25+
} else if (streamDataType == NullableContentStreamData<T>::TYPE_NAME) {
26+
return std::make_unique<NullableContentStreamChunker<T>>(
27+
static_cast<NullableContentStreamData<T>&>(streamData),
28+
maxChunkSize,
29+
minChunkSize,
30+
lastChunk);
31+
}
32+
NIMBLE_UNREACHABLE(
33+
fmt::format("Unsupported streamData type {}", streamDataType))
34+
}
35+
36+
std::unique_ptr<StreamChunker> getStreamChunker(
37+
StreamData& streamData,
38+
uint64_t maxChunkSize,
39+
uint64_t minChunkSize,
40+
bool lastChunk) {
41+
const auto scalarKind = streamData.descriptor().scalarKind();
42+
switch (scalarKind) {
43+
case ScalarKind::Bool:
44+
return encodeStreamTyped<bool>(
45+
streamData, maxChunkSize, minChunkSize, lastChunk);
46+
case ScalarKind::Int8:
47+
return encodeStreamTyped<int8_t>(
48+
streamData, maxChunkSize, minChunkSize, lastChunk);
49+
case ScalarKind::Int16:
50+
return encodeStreamTyped<int16_t>(
51+
streamData, maxChunkSize, minChunkSize, lastChunk);
52+
case ScalarKind::Int32:
53+
return encodeStreamTyped<int32_t>(
54+
streamData, maxChunkSize, minChunkSize, lastChunk);
55+
case ScalarKind::UInt32:
56+
return encodeStreamTyped<uint32_t>(
57+
streamData, maxChunkSize, minChunkSize, lastChunk);
58+
case ScalarKind::Int64:
59+
return encodeStreamTyped<int64_t>(
60+
streamData, maxChunkSize, minChunkSize, lastChunk);
61+
case ScalarKind::Float:
62+
return encodeStreamTyped<float>(
63+
streamData, maxChunkSize, minChunkSize, lastChunk);
64+
case ScalarKind::Double:
65+
return encodeStreamTyped<double>(
66+
streamData, maxChunkSize, minChunkSize, lastChunk);
67+
case ScalarKind::String:
68+
case ScalarKind::Binary:
69+
return encodeStreamTyped<std::string_view>(
70+
streamData, maxChunkSize, minChunkSize, lastChunk);
71+
default:
72+
NIMBLE_UNREACHABLE(
73+
fmt::format("Unsupported scalar kind {}", toString(scalarKind)));
74+
}
75+
}
76+
} // namespace facebook::nimble

0 commit comments

Comments
 (0)