Skip to content

Commit 1402239

Browse files
macvincentfacebook-github-bot
authored andcommitted
Support Chunking in StreamData (facebookincubator#248)
Summary: Another feature in the new chunking policy described in this [doc](https://fburl.com/gdoc/gkdwwju1) is the ability to split large streams above a specified limit into smaller chunks. In this diff, we implement a `popChunk` method in each `StreamData` class to handle this functionality. With this feature we are not forced to encode extremely large streams into a single chunk. Integration will happen in the next diff. Differential Revision: D81824143
1 parent c4dc1a6 commit 1402239

File tree

5 files changed

+1228
-1
lines changed

5 files changed

+1228
-1
lines changed
Lines changed: 106 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,106 @@
1+
/*
2+
* Copyright (c) Meta Platforms, Inc. and affiliates.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
#include "dwio/nimble/velox/StreamChunker.h"
18+
19+
namespace facebook::nimble {
20+
bool StreamChunker::omitStream(
21+
const StreamData& streamData,
22+
uint64_t minChunkSize,
23+
bool isNullStream,
24+
bool hasEmptyStreamContent) {
25+
bool shouldChunkStream;
26+
if (isNullStream) {
27+
// When all values are non-nulls, we omit the entire null stream.
28+
shouldChunkStream =
29+
streamData.hasNulls() && streamData.nonNulls().size() > minChunkSize;
30+
if (!shouldChunkStream && !hasEmptyStreamContent) {
31+
shouldChunkStream = (minChunkSize == 0) && !streamData.empty();
32+
}
33+
} else {
34+
// When all values are null, the values stream is omitted.
35+
shouldChunkStream = streamData.data().size() > minChunkSize;
36+
if (!shouldChunkStream && !hasEmptyStreamContent) {
37+
shouldChunkStream = (minChunkSize == 0) && !streamData.nonNulls().empty();
38+
}
39+
}
40+
41+
return !shouldChunkStream;
42+
}
43+
44+
template <typename T>
45+
std::unique_ptr<StreamChunker> getStreamChunkerTyped(
46+
StreamData& streamData,
47+
uint64_t maxChunkSize,
48+
uint64_t minChunkSize,
49+
bool emptyStreamContent,
50+
bool isNullStream) {
51+
const auto& streamDataType = streamData.type();
52+
if (streamDataType == ContentStreamData<T>::TYPE_NAME) {
53+
return std::make_unique<ContentStreamChunker<T>>(
54+
static_cast<ContentStreamData<T>&>(streamData),
55+
maxChunkSize,
56+
minChunkSize);
57+
} else if (streamDataType == NullsStreamData::TYPE_NAME) {
58+
return std::make_unique<NullsStreamChunker>(
59+
static_cast<NullsStreamData&>(streamData),
60+
maxChunkSize,
61+
minChunkSize,
62+
emptyStreamContent);
63+
} else if (streamDataType == NullableContentStreamData<T>::TYPE_NAME) {
64+
return std::make_unique<NullableContentStreamChunker<T>>(
65+
static_cast<NullableContentStreamData<T>&>(streamData),
66+
maxChunkSize,
67+
minChunkSize,
68+
emptyStreamContent,
69+
isNullStream);
70+
}
71+
NIMBLE_UNREACHABLE(
72+
fmt::format("Unsupported streamData type {}", streamDataType))
73+
}
74+
75+
std::unique_ptr<StreamChunker> StreamChunker::getStreamChunker(
76+
StreamData& streamData,
77+
uint64_t maxChunkSize,
78+
uint64_t minChunkSize,
79+
bool emptyStreamContent,
80+
bool isNullStream) {
81+
const auto scalarKind = streamData.descriptor().scalarKind();
82+
switch (scalarKind) {
83+
#define HANDLE_SCALAR_KIND(kind, type) \
84+
case ScalarKind::kind: \
85+
return getStreamChunkerTyped<type>( \
86+
streamData, \
87+
maxChunkSize, \
88+
minChunkSize, \
89+
emptyStreamContent, \
90+
isNullStream);
91+
HANDLE_SCALAR_KIND(Bool, bool);
92+
HANDLE_SCALAR_KIND(Int8, int8_t);
93+
HANDLE_SCALAR_KIND(Int16, int16_t);
94+
HANDLE_SCALAR_KIND(Int32, int32_t);
95+
HANDLE_SCALAR_KIND(UInt32, uint32_t);
96+
HANDLE_SCALAR_KIND(Int64, int64_t);
97+
HANDLE_SCALAR_KIND(Float, float);
98+
HANDLE_SCALAR_KIND(Double, double);
99+
HANDLE_SCALAR_KIND(String, std::string_view);
100+
HANDLE_SCALAR_KIND(Binary, std::string_view);
101+
default:
102+
NIMBLE_UNREACHABLE(
103+
fmt::format("Unsupported scalar kind {}", toString(scalarKind)));
104+
}
105+
}
106+
} // namespace facebook::nimble

0 commit comments

Comments
 (0)