Skip to content

Commit 3832c80

Browse files
macvincentfacebook-github-bot
authored andcommitted
Support Per Stream String Buffers in Stream Data (facebookincubator#391)
Summary: Adds per-stream string buffer support to Nimble's StreamData infrastructure. Previously, string data buffers were managed externally, but now each MutableStreamData instance can own its own Buffer for string content, enabling better memory isolation and lifecycle management with chunking. Differential Revision: D89933055
1 parent b3b68ad commit 3832c80

File tree

3 files changed

+212
-89
lines changed

3 files changed

+212
-89
lines changed

dwio/nimble/velox/StreamChunker.h

Lines changed: 79 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,28 @@ inline bool shouldOmitNullStream(
4141
}
4242
return isFirstChunk || streamData.empty();
4343
}
44+
45+
template <typename T>
46+
inline void compactStringBuffer(
47+
velox::memory::MemoryPool* pool,
48+
StringBuffer& stringBuffer,
49+
Vector<T>& mutableData,
50+
size_t dataElementOffset,
51+
uint64_t extraMemory) {
52+
StringBuffer newStringBuffer(*pool);
53+
auto& newBuffer = newStringBuffer.mutableBuffer();
54+
newBuffer.reserve(extraMemory);
55+
56+
auto& newLengths = newStringBuffer.mutableLengths();
57+
newLengths.reserve(mutableData.size() - dataElementOffset);
58+
59+
for (auto i = dataElementOffset; i < mutableData.size(); ++i) {
60+
auto str = mutableData[i];
61+
newBuffer.insert(newBuffer.end(), str.begin(), str.end());
62+
newLengths.push_back(str.size());
63+
}
64+
stringBuffer = std::move(newStringBuffer);
65+
}
4466
} // namespace detail
4567

4668
/**
@@ -106,6 +128,9 @@ class ContentStreamChunker final : public StreamChunker {
106128
maxChunkSize_,
107129
sizeof(T),
108130
"MaxChunkSize must be at least the size of a single data element.");
131+
if constexpr (std::is_same_v<T, std::string_view>) {
132+
streamData_->materializeStringBuffer();
133+
}
109134
}
110135

111136
std::optional<StreamDataView> next() override {
@@ -194,27 +219,36 @@ class ContentStreamChunker final : public StreamChunker {
194219

195220
// Move and clear existing buffer
196221
auto tempData = std::move(currentData);
222+
auto tempBuffer = std::move(streamData_->mutableStringBuffer());
197223
streamData_->reset();
198224
NIMBLE_DCHECK(
199225
streamData_->empty(), "StreamData should be empty after reset");
200226

201227
auto& mutableData = streamData_->mutableData();
202-
mutableData.reserve(remainingDataCount);
203-
NIMBLE_DCHECK_GE(
204-
mutableData.capacity(),
205-
remainingDataCount,
206-
"Data buffer capacity should be at least new capacity");
207-
208-
mutableData.resize(remainingDataCount);
209-
NIMBLE_DCHECK_EQ(
210-
mutableData.size(),
211-
remainingDataCount,
212-
"Data buffer size should be equal to remaining data count");
213-
214-
std::copy_n(
215-
tempData.begin() + dataElementOffset_,
216-
remainingDataCount,
217-
mutableData.begin());
228+
229+
bool compactedStringBuffer{false};
230+
if constexpr (std::is_same_v<T, std::string_view>) {
231+
if (!tempBuffer.empty()) {
232+
detail::compactStringBuffer<T>(
233+
mutableData.memoryPool(),
234+
streamData_->mutableStringBuffer(),
235+
tempData,
236+
dataElementOffset_,
237+
extraMemory_);
238+
compactedStringBuffer = true;
239+
}
240+
}
241+
if (!compactedStringBuffer) {
242+
mutableData.resize(remainingDataCount);
243+
NIMBLE_DCHECK_EQ(
244+
mutableData.size(),
245+
remainingDataCount,
246+
"Data buffer size should be equal to remaining data count");
247+
std::copy_n(
248+
tempData.begin() + dataElementOffset_,
249+
remainingDataCount,
250+
mutableData.begin());
251+
}
218252
dataElementOffset_ = 0;
219253
streamData_->extraMemory() = extraMemory_;
220254

@@ -314,16 +348,10 @@ class NullsStreamChunker final : public StreamChunker {
314348
NIMBLE_CHECK(
315349
streamData_->empty(), "StreamData should be empty after reset");
316350

317-
auto& mutableNonNulls = streamData_->mutableNonNulls();
318-
mutableNonNulls.reserve(remainingNonNullsCount);
319-
NIMBLE_DCHECK_GE(
320-
mutableNonNulls.capacity(),
321-
remainingNonNullsCount,
322-
"NonNulls buffer capacity should be at least new capacity");
323-
324351
streamData_->ensureAdditionalNullsCapacity(
325352
hasNulls, static_cast<uint32_t>(remainingNonNullsCount));
326353
if (hasNulls) {
354+
auto& mutableNonNulls = streamData_->mutableNonNulls();
327355
mutableNonNulls.resize(remainingNonNullsCount);
328356
NIMBLE_CHECK_EQ(
329357
mutableNonNulls.size(),
@@ -372,6 +400,9 @@ class NullableContentStreamChunker final : public StreamChunker {
372400
"MaxChunkSize must be at least the size of a single element.");
373401

374402
streamData.materialize();
403+
if constexpr (std::is_same_v<T, std::string_view>) {
404+
streamData_->materializeStringBuffer();
405+
}
375406
}
376407

377408
std::optional<StreamDataView> next() override {
@@ -461,39 +492,41 @@ class NullableContentStreamChunker final : public StreamChunker {
461492
// Move and clear existing buffers
462493
auto tempNonNulls = std::move(currentNonNulls);
463494
auto tempData = std::move(currentData);
495+
auto tempBuffer = std::move(streamData_->mutableStringBuffer());
464496
const bool hasNulls = streamData_->hasNulls();
465497
streamData_->reset();
466498
NIMBLE_CHECK(
467499
streamData_->empty(), "StreamData should be empty after reset");
468500

469501
auto& mutableData = streamData_->mutableData();
470-
mutableData.reserve(remainingDataCount);
471-
NIMBLE_DCHECK_GE(
472-
mutableData.capacity(),
473-
remainingDataCount,
474-
"Data buffer capacity should be at least new capacity");
475-
476-
mutableData.resize(remainingDataCount);
477-
NIMBLE_CHECK_EQ(
478-
mutableData.size(),
479-
remainingDataCount,
480-
"Data buffer size should be equal to remaining data count");
481-
482-
std::copy_n(
483-
tempData.begin() + dataElementOffset_,
484-
remainingDataCount,
485-
mutableData.begin());
486-
487-
auto& mutableNonNulls = streamData_->mutableNonNulls();
488-
mutableNonNulls.reserve(remainingNonNullsCount);
489-
NIMBLE_DCHECK_GE(
490-
mutableNonNulls.capacity(),
491-
remainingNonNullsCount,
492-
"NonNulls buffer capacity should be at least new capacity");
502+
bool compactedStringBuffer{false};
503+
if constexpr (std::is_same_v<T, std::string_view>) {
504+
if (!tempBuffer.empty()) {
505+
detail::compactStringBuffer<T>(
506+
mutableData.memoryPool(),
507+
streamData_->mutableStringBuffer(),
508+
tempData,
509+
dataElementOffset_,
510+
extraMemory_);
511+
compactedStringBuffer = true;
512+
}
513+
}
514+
if (!compactedStringBuffer) {
515+
mutableData.resize(remainingDataCount);
516+
NIMBLE_DCHECK_EQ(
517+
mutableData.size(),
518+
remainingDataCount,
519+
"Data buffer size should be equal to remaining data count");
520+
std::copy_n(
521+
tempData.begin() + dataElementOffset_,
522+
remainingDataCount,
523+
mutableData.begin());
524+
}
493525

494526
streamData_->ensureAdditionalNullsCapacity(
495527
hasNulls, static_cast<uint32_t>(remainingNonNullsCount));
496528
if (hasNulls) {
529+
auto& mutableNonNulls = streamData_->mutableNonNulls();
497530
mutableNonNulls.resize(remainingNonNullsCount);
498531
NIMBLE_CHECK_EQ(
499532
mutableNonNulls.size(),

dwio/nimble/velox/StreamData.h

Lines changed: 79 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,8 @@ class StreamData {
5454

5555
virtual void materialize() {}
5656

57+
virtual void materializeStringBuffer() {}
58+
5759
const StreamDescriptorBuilder& descriptor() const {
5860
return descriptor_;
5961
}
@@ -84,6 +86,50 @@ class MutableStreamData : public StreamData {
8486
const InputBufferGrowthPolicy& growthPolicy_;
8587
};
8688

89+
class StringBuffer {
90+
public:
91+
explicit StringBuffer(velox::memory::MemoryPool& pool)
92+
: buffer{&pool}, lengths{&pool} {}
93+
94+
Vector<char>& mutableBuffer() {
95+
return buffer;
96+
}
97+
98+
Vector<size_t>& mutableLengths() {
99+
return lengths;
100+
}
101+
102+
bool empty() const {
103+
return lengths.empty();
104+
}
105+
106+
uint64_t size() const {
107+
return lengths.size();
108+
}
109+
110+
void clear() {
111+
buffer.clear();
112+
lengths.clear();
113+
}
114+
115+
void materialize(Vector<std::string_view>& data) {
116+
if (empty()) {
117+
return;
118+
}
119+
data.clear();
120+
data.resize(lengths.size());
121+
size_t runningOffset = 0;
122+
for (size_t i = 0; i < lengths.size(); ++i) {
123+
data[i] = std::string_view(buffer.data() + runningOffset, lengths[i]);
124+
runningOffset += lengths[i];
125+
}
126+
}
127+
128+
private:
129+
Vector<char> buffer;
130+
Vector<size_t> lengths;
131+
};
132+
87133
/// Provides a lightweight, non-owning view into a portion of stream data,
88134
/// containing references to the data content and null indicators for efficient
89135
/// processing of large streams without copying data.
@@ -156,7 +202,9 @@ class ContentStreamData final : public MutableStreamData {
156202
velox::memory::MemoryPool& pool,
157203
const StreamDescriptorBuilder& descriptor,
158204
const InputBufferGrowthPolicy& growthPolicy)
159-
: MutableStreamData(descriptor, growthPolicy), data_{&pool} {}
205+
: MutableStreamData(descriptor, growthPolicy),
206+
data_{&pool},
207+
stringBuffer_{pool} {}
160208

161209
inline uint32_t rowCount() const override {
162210
return data_.size();
@@ -180,13 +228,18 @@ class ContentStreamData final : public MutableStreamData {
180228
}
181229

182230
inline uint64_t memoryUsed() const override {
183-
return (data_.size() * sizeof(T)) + extraMemory_;
231+
return (data_.size() * sizeof(T)) + extraMemory_ +
232+
(stringBuffer_.size() * sizeof(size_t));
184233
}
185234

186235
inline Vector<T>& mutableData() {
187236
return data_;
188237
}
189238

239+
inline StringBuffer& mutableStringBuffer() {
240+
return stringBuffer_;
241+
}
242+
190243
void ensureMutableDataCapacity(uint64_t newSize) {
191244
this->ensureDataCapacity(data_, newSize);
192245
}
@@ -197,11 +250,19 @@ class ContentStreamData final : public MutableStreamData {
197250

198251
inline virtual void reset() override {
199252
data_.clear();
253+
stringBuffer_.clear();
200254
extraMemory_ = 0;
201255
}
202256

257+
void materializeStringBuffer() override {
258+
if constexpr (std::is_same_v<T, std::string_view>) {
259+
stringBuffer_.materialize(data_);
260+
}
261+
}
262+
203263
private:
204264
Vector<T> data_;
265+
StringBuffer stringBuffer_;
205266
uint64_t extraMemory_{0};
206267
};
207268

@@ -286,6 +347,7 @@ class NullableContentStreamData final : public NullsStreamData {
286347
const InputBufferGrowthPolicy& growthPolicy)
287348
: NullsStreamData(memoryPool, descriptor, growthPolicy),
288349
data_{&memoryPool},
350+
stringBuffer_{memoryPool},
289351
extraMemory_{0} {}
290352

291353
inline std::string_view data() const override {
@@ -298,14 +360,19 @@ class NullableContentStreamData final : public NullsStreamData {
298360
}
299361

300362
inline uint64_t memoryUsed() const override {
301-
return (data_.size() * sizeof(T)) + extraMemory_ +
302-
NullsStreamData::memoryUsed();
363+
return (data_.size() * sizeof(T)) +
364+
(stringBuffer_.size() * sizeof(size_t)) +
365+
NullsStreamData::memoryUsed() + extraMemory_;
303366
}
304367

305368
inline Vector<T>& mutableData() {
306369
return data_;
307370
}
308371

372+
inline StringBuffer& mutableStringBuffer() {
373+
return stringBuffer_;
374+
}
375+
309376
void ensureMutableDataCapacity(uint64_t newSize) {
310377
this->ensureDataCapacity(data_, newSize);
311378
}
@@ -317,11 +384,19 @@ class NullableContentStreamData final : public NullsStreamData {
317384
inline void reset() override {
318385
NullsStreamData::reset();
319386
data_.clear();
387+
stringBuffer_.clear();
320388
extraMemory_ = 0;
321389
}
322390

391+
void materializeStringBuffer() override {
392+
if constexpr (std::is_same_v<T, std::string_view>) {
393+
stringBuffer_.materialize(data_);
394+
}
395+
}
396+
323397
private:
324398
Vector<T> data_;
399+
StringBuffer stringBuffer_;
325400
uint64_t extraMemory_;
326401
};
327402

0 commit comments

Comments
 (0)