Skip to content

Commit 87430bc

Browse files
macvincentfacebook-github-bot
authored andcommitted
Support Chunking in StreamData (facebookincubator#248)
Summary: Another feature in the new chunking policy described in this [doc](https://fburl.com/gdoc/gkdwwju1) is the ability to split large streams above a specified limit into smaller chunks. In this diff, we implement a `popChunk` method in each `StreamData` class to handle this functionality. With this feature we are not forced to encode extremely large streams into a single chunk. Integration will happen in the next diff. Differential Revision: D81824143
1 parent f6c51c6 commit 87430bc

File tree

6 files changed

+1151
-1
lines changed

6 files changed

+1151
-1
lines changed

dwio/nimble/common/Vector.h

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -182,6 +182,16 @@ class Vector {
182182
}
183183
}
184184

185+
// Removes the first |count| elements from the vector.
186+
// Does NOT shrink capacity to fit the new size.
187+
void trimLeft(uint64_t count) {
188+
NIMBLE_DASSERT(
189+
count <= size_, "Cannot trim more elements than exist in vector");
190+
const uint64_t remainingSize = size_ - count;
191+
std::move(begin() + count, end(), begin());
192+
resize(remainingSize);
193+
}
194+
185195
// Add |copies| copies of |value| to the end of the vector.
186196
void extend(uint64_t copies, T value) {
187197
reserve(size_ + copies);
Lines changed: 128 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,128 @@
1+
/*
2+
* Copyright (c) Meta Platforms, Inc. and affiliates.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
#include "dwio/nimble/velox/StreamChunker.h"
18+
19+
namespace facebook::nimble {
20+
template <typename T>
21+
std::unique_ptr<StreamChunker> encodeStreamTyped(
22+
StreamData& streamData,
23+
uint64_t maxChunkSize,
24+
uint64_t minChunkSize,
25+
bool emptyStreamContent,
26+
bool isNullStream) {
27+
const auto& streamDataType = streamData.type();
28+
if (streamDataType == ContentStreamData<T>::TYPE_NAME) {
29+
return std::make_unique<ContentStreamChunker<T>>(
30+
static_cast<ContentStreamData<T>&>(streamData),
31+
maxChunkSize,
32+
minChunkSize);
33+
} else if (streamDataType == NullsStreamData::TYPE_NAME) {
34+
return std::make_unique<NullsStreamChunker>(
35+
static_cast<NullsStreamData&>(streamData),
36+
maxChunkSize,
37+
minChunkSize,
38+
emptyStreamContent);
39+
} else if (streamDataType == NullableContentStreamData<T>::TYPE_NAME) {
40+
return std::make_unique<NullableContentStreamChunker<T>>(
41+
static_cast<NullableContentStreamData<T>&>(streamData),
42+
maxChunkSize,
43+
minChunkSize,
44+
emptyStreamContent,
45+
isNullStream);
46+
}
47+
NIMBLE_UNREACHABLE(
48+
fmt::format("Unsupported streamData type {}", streamDataType))
49+
}
50+
51+
std::unique_ptr<StreamChunker> getStreamChunker(
52+
StreamData& streamData,
53+
uint64_t maxChunkSize,
54+
uint64_t minChunkSize,
55+
bool emptyStreamContent,
56+
bool isNullStream) {
57+
const auto scalarKind = streamData.descriptor().scalarKind();
58+
switch (scalarKind) {
59+
case ScalarKind::Bool:
60+
return encodeStreamTyped<bool>(
61+
streamData,
62+
maxChunkSize,
63+
minChunkSize,
64+
emptyStreamContent,
65+
isNullStream);
66+
case ScalarKind::Int8:
67+
return encodeStreamTyped<int8_t>(
68+
streamData,
69+
maxChunkSize,
70+
minChunkSize,
71+
emptyStreamContent,
72+
isNullStream);
73+
case ScalarKind::Int16:
74+
return encodeStreamTyped<int16_t>(
75+
streamData,
76+
maxChunkSize,
77+
minChunkSize,
78+
emptyStreamContent,
79+
isNullStream);
80+
case ScalarKind::Int32:
81+
return encodeStreamTyped<int32_t>(
82+
streamData,
83+
maxChunkSize,
84+
minChunkSize,
85+
emptyStreamContent,
86+
isNullStream);
87+
case ScalarKind::UInt32:
88+
return encodeStreamTyped<uint32_t>(
89+
streamData,
90+
maxChunkSize,
91+
minChunkSize,
92+
emptyStreamContent,
93+
isNullStream);
94+
case ScalarKind::Int64:
95+
return encodeStreamTyped<int64_t>(
96+
streamData,
97+
maxChunkSize,
98+
minChunkSize,
99+
emptyStreamContent,
100+
isNullStream);
101+
case ScalarKind::Float:
102+
return encodeStreamTyped<float>(
103+
streamData,
104+
maxChunkSize,
105+
minChunkSize,
106+
emptyStreamContent,
107+
isNullStream);
108+
case ScalarKind::Double:
109+
return encodeStreamTyped<double>(
110+
streamData,
111+
maxChunkSize,
112+
minChunkSize,
113+
emptyStreamContent,
114+
isNullStream);
115+
case ScalarKind::String:
116+
case ScalarKind::Binary:
117+
return encodeStreamTyped<std::string_view>(
118+
streamData,
119+
maxChunkSize,
120+
minChunkSize,
121+
emptyStreamContent,
122+
isNullStream);
123+
default:
124+
NIMBLE_UNREACHABLE(
125+
fmt::format("Unsupported scalar kind {}", toString(scalarKind)));
126+
}
127+
}
128+
} // namespace facebook::nimble

0 commit comments

Comments
 (0)