Skip to content

Commit 0f0ee05

Browse files
macvincentfacebook-github-bot
authored and committed
Support Chunking in StreamData (facebookincubator#248)
Summary: Another feature of the new chunking policy described in this [doc](https://fburl.com/gdoc/gkdwwju1) is the ability to split large streams that exceed a specified limit into smaller chunks. In this diff, we implement a `popChunk` method in each `StreamData` class to handle this functionality. With this feature, we are no longer forced to encode an extremely large stream as a single chunk. Integration will happen in the next diff. Differential Revision: D81824143
1 parent 5c7ecee commit 0f0ee05

File tree

6 files changed

+1123
-1
lines changed

6 files changed

+1123
-1
lines changed

dwio/nimble/common/Vector.h

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -182,6 +182,16 @@ class Vector {
182182
}
183183
}
184184

185+
// Removes the first |count| elements from the vector.
186+
// Does NOT shrink capacity to fit the new size.
187+
void trimLeft(uint64_t count) {
188+
NIMBLE_DASSERT(
189+
count <= size_, "Cannot trim more elements than exist in vector");
190+
const uint64_t remainingSize = size_ - count;
191+
std::move(begin() + count, end(), begin());
192+
resize(remainingSize);
193+
}
194+
185195
// Add |copies| copies of |value| to the end of the vector.
186196
void extend(uint64_t copies, T value) {
187197
reserve(size_ + copies);
Lines changed: 114 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,114 @@
1+
// (c) Meta Platforms, Inc. and affiliates. Confidential and proprietary.
2+
3+
#include "dwio/nimble/velox/StreamChunker.h"
4+
5+
namespace facebook::nimble {
6+
template <typename T>
7+
std::unique_ptr<StreamChunker> encodeStreamTyped(
8+
StreamData& streamData,
9+
uint64_t maxChunkSize,
10+
uint64_t minChunkSize,
11+
bool emptyStreamContent,
12+
bool isNullStream) {
13+
const auto& streamDataType = streamData.type();
14+
if (streamDataType == ContentStreamData<T>::TYPE_NAME) {
15+
return std::make_unique<ContentStreamChunker<T>>(
16+
static_cast<ContentStreamData<T>&>(streamData),
17+
maxChunkSize,
18+
minChunkSize);
19+
} else if (streamDataType == NullsStreamData::TYPE_NAME) {
20+
return std::make_unique<NullsStreamChunker>(
21+
static_cast<NullsStreamData&>(streamData),
22+
maxChunkSize,
23+
minChunkSize,
24+
emptyStreamContent);
25+
} else if (streamDataType == NullableContentStreamData<T>::TYPE_NAME) {
26+
return std::make_unique<NullableContentStreamChunker<T>>(
27+
static_cast<NullableContentStreamData<T>&>(streamData),
28+
maxChunkSize,
29+
minChunkSize,
30+
emptyStreamContent,
31+
isNullStream);
32+
}
33+
NIMBLE_UNREACHABLE(
34+
fmt::format("Unsupported streamData type {}", streamDataType))
35+
}
36+
37+
// Entry point for obtaining a StreamChunker for |streamData|.
//
// Reads the scalar kind from the stream's descriptor and dispatches to
// encodeStreamTyped<T> with the C++ type used for that kind. All remaining
// arguments are forwarded unchanged. String and Binary both map to
// std::string_view. A scalar kind without a case below is reported through
// NIMBLE_UNREACHABLE.
std::unique_ptr<StreamChunker> getStreamChunker(
    StreamData& streamData,
    uint64_t maxChunkSize,
    uint64_t minChunkSize,
    bool emptyStreamContent,
    bool isNullStream) {
  switch (const auto kind = streamData.descriptor().scalarKind(); kind) {
    case ScalarKind::Bool: {
      return encodeStreamTyped<bool>(
          streamData, maxChunkSize, minChunkSize, emptyStreamContent, isNullStream);
    }
    case ScalarKind::Int8: {
      return encodeStreamTyped<int8_t>(
          streamData, maxChunkSize, minChunkSize, emptyStreamContent, isNullStream);
    }
    case ScalarKind::Int16: {
      return encodeStreamTyped<int16_t>(
          streamData, maxChunkSize, minChunkSize, emptyStreamContent, isNullStream);
    }
    case ScalarKind::Int32: {
      return encodeStreamTyped<int32_t>(
          streamData, maxChunkSize, minChunkSize, emptyStreamContent, isNullStream);
    }
    case ScalarKind::UInt32: {
      return encodeStreamTyped<uint32_t>(
          streamData, maxChunkSize, minChunkSize, emptyStreamContent, isNullStream);
    }
    case ScalarKind::Int64: {
      return encodeStreamTyped<int64_t>(
          streamData, maxChunkSize, minChunkSize, emptyStreamContent, isNullStream);
    }
    case ScalarKind::Float: {
      return encodeStreamTyped<float>(
          streamData, maxChunkSize, minChunkSize, emptyStreamContent, isNullStream);
    }
    case ScalarKind::Double: {
      return encodeStreamTyped<double>(
          streamData, maxChunkSize, minChunkSize, emptyStreamContent, isNullStream);
    }
    // Both textual kinds share the string_view-based chunkers.
    case ScalarKind::String:
    case ScalarKind::Binary: {
      return encodeStreamTyped<std::string_view>(
          streamData, maxChunkSize, minChunkSize, emptyStreamContent, isNullStream);
    }
    default: {
      NIMBLE_UNREACHABLE(
          fmt::format("Unsupported scalar kind {}", toString(kind)));
    }
  }
}
114+
} // namespace facebook::nimble

0 commit comments

Comments
 (0)