Skip to content

Commit 26dbd3e

Browse files
macvincentfacebook-github-bot
authored andcommitted
Support Per Stream String Buffers in Stream Data (facebookincubator#391)
Summary: Adds per-stream string buffer support to Nimble's StreamData infrastructure. Previously, string data buffers were managed externally, but now each MutableStreamData instance can own its own Buffer for string content, enabling better memory isolation and lifecycle management with chunking. Differential Revision: D89933055
1 parent b3b68ad commit 26dbd3e

File tree

3 files changed

+221
-89
lines changed

3 files changed

+221
-89
lines changed

dwio/nimble/velox/StreamChunker.h

Lines changed: 79 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,28 @@ inline bool shouldOmitNullStream(
4141
}
4242
return isFirstChunk || streamData.empty();
4343
}
44+
45+
template <typename T>
46+
inline void compactStringBuffer(
47+
velox::memory::MemoryPool* pool,
48+
StringBuffer& stringBuffer,
49+
Vector<T>& mutableData,
50+
uint32_t dataElementOffset,
51+
uint64_t extraMemory) {
52+
StringBuffer newStringBuffer(*pool);
53+
auto& newBuffer = newStringBuffer.mutableBuffer();
54+
newBuffer.reserve(extraMemory);
55+
56+
auto& newLengths = newStringBuffer.mutableLengths();
57+
newLengths.reserve(mutableData.size() - dataElementOffset);
58+
59+
for (auto i = dataElementOffset; i < mutableData.size(); ++i) {
60+
auto str = mutableData[i];
61+
newBuffer.insert(newBuffer.end(), str.begin(), str.end());
62+
newLengths.push_back(str.size());
63+
}
64+
stringBuffer = std::move(newStringBuffer);
65+
}
4466
} // namespace detail
4567

4668
/**
@@ -106,6 +128,9 @@ class ContentStreamChunker final : public StreamChunker {
106128
maxChunkSize_,
107129
sizeof(T),
108130
"MaxChunkSize must be at least the size of a single data element.");
131+
if constexpr (std::is_same_v<T, std::string_view>) {
132+
streamData_->materializeStringViews();
133+
}
109134
}
110135

111136
std::optional<StreamDataView> next() override {
@@ -194,27 +219,36 @@ class ContentStreamChunker final : public StreamChunker {
194219

195220
// Move and clear existing buffer
196221
auto tempData = std::move(currentData);
222+
auto tempBuffer = std::move(streamData_->mutableStringBuffer());
197223
streamData_->reset();
198224
NIMBLE_DCHECK(
199225
streamData_->empty(), "StreamData should be empty after reset");
200226

201227
auto& mutableData = streamData_->mutableData();
202-
mutableData.reserve(remainingDataCount);
203-
NIMBLE_DCHECK_GE(
204-
mutableData.capacity(),
205-
remainingDataCount,
206-
"Data buffer capacity should be at least new capacity");
207-
208-
mutableData.resize(remainingDataCount);
209-
NIMBLE_DCHECK_EQ(
210-
mutableData.size(),
211-
remainingDataCount,
212-
"Data buffer size should be equal to remaining data count");
213-
214-
std::copy_n(
215-
tempData.begin() + dataElementOffset_,
216-
remainingDataCount,
217-
mutableData.begin());
228+
229+
bool compactedStringBuffer{false};
230+
if constexpr (std::is_same_v<T, std::string_view>) {
231+
if (!tempBuffer.empty()) {
232+
detail::compactStringBuffer<T>(
233+
mutableData.memoryPool(),
234+
streamData_->mutableStringBuffer(),
235+
tempData,
236+
dataElementOffset_,
237+
extraMemory_);
238+
compactedStringBuffer = true;
239+
}
240+
}
241+
if (!compactedStringBuffer) {
242+
mutableData.resize(remainingDataCount);
243+
NIMBLE_DCHECK_EQ(
244+
mutableData.size(),
245+
remainingDataCount,
246+
"Data buffer size should be equal to remaining data count");
247+
std::copy_n(
248+
tempData.begin() + dataElementOffset_,
249+
remainingDataCount,
250+
mutableData.begin());
251+
}
218252
dataElementOffset_ = 0;
219253
streamData_->extraMemory() = extraMemory_;
220254

@@ -314,16 +348,10 @@ class NullsStreamChunker final : public StreamChunker {
314348
NIMBLE_CHECK(
315349
streamData_->empty(), "StreamData should be empty after reset");
316350

317-
auto& mutableNonNulls = streamData_->mutableNonNulls();
318-
mutableNonNulls.reserve(remainingNonNullsCount);
319-
NIMBLE_DCHECK_GE(
320-
mutableNonNulls.capacity(),
321-
remainingNonNullsCount,
322-
"NonNulls buffer capacity should be at least new capacity");
323-
324351
streamData_->ensureAdditionalNullsCapacity(
325352
hasNulls, static_cast<uint32_t>(remainingNonNullsCount));
326353
if (hasNulls) {
354+
auto& mutableNonNulls = streamData_->mutableNonNulls();
327355
mutableNonNulls.resize(remainingNonNullsCount);
328356
NIMBLE_CHECK_EQ(
329357
mutableNonNulls.size(),
@@ -372,6 +400,9 @@ class NullableContentStreamChunker final : public StreamChunker {
372400
"MaxChunkSize must be at least the size of a single element.");
373401

374402
streamData.materialize();
403+
if constexpr (std::is_same_v<T, std::string_view>) {
404+
streamData_->materializeStringViews();
405+
}
375406
}
376407

377408
std::optional<StreamDataView> next() override {
@@ -461,39 +492,41 @@ class NullableContentStreamChunker final : public StreamChunker {
461492
// Move and clear existing buffers
462493
auto tempNonNulls = std::move(currentNonNulls);
463494
auto tempData = std::move(currentData);
495+
auto tempBuffer = std::move(streamData_->mutableStringBuffer());
464496
const bool hasNulls = streamData_->hasNulls();
465497
streamData_->reset();
466498
NIMBLE_CHECK(
467499
streamData_->empty(), "StreamData should be empty after reset");
468500

469501
auto& mutableData = streamData_->mutableData();
470-
mutableData.reserve(remainingDataCount);
471-
NIMBLE_DCHECK_GE(
472-
mutableData.capacity(),
473-
remainingDataCount,
474-
"Data buffer capacity should be at least new capacity");
475-
476-
mutableData.resize(remainingDataCount);
477-
NIMBLE_CHECK_EQ(
478-
mutableData.size(),
479-
remainingDataCount,
480-
"Data buffer size should be equal to remaining data count");
481-
482-
std::copy_n(
483-
tempData.begin() + dataElementOffset_,
484-
remainingDataCount,
485-
mutableData.begin());
486-
487-
auto& mutableNonNulls = streamData_->mutableNonNulls();
488-
mutableNonNulls.reserve(remainingNonNullsCount);
489-
NIMBLE_DCHECK_GE(
490-
mutableNonNulls.capacity(),
491-
remainingNonNullsCount,
492-
"NonNulls buffer capacity should be at least new capacity");
502+
bool compactedStringBuffer{false};
503+
if constexpr (std::is_same_v<T, std::string_view>) {
504+
if (!tempBuffer.empty()) {
505+
detail::compactStringBuffer<T>(
506+
mutableData.memoryPool(),
507+
streamData_->mutableStringBuffer(),
508+
tempData,
509+
dataElementOffset_,
510+
extraMemory_);
511+
compactedStringBuffer = true;
512+
}
513+
}
514+
if (!compactedStringBuffer) {
515+
mutableData.resize(remainingDataCount);
516+
NIMBLE_DCHECK_EQ(
517+
mutableData.size(),
518+
remainingDataCount,
519+
"Data buffer size should be equal to remaining data count");
520+
std::copy_n(
521+
tempData.begin() + dataElementOffset_,
522+
remainingDataCount,
523+
mutableData.begin());
524+
}
493525

494526
streamData_->ensureAdditionalNullsCapacity(
495527
hasNulls, static_cast<uint32_t>(remainingNonNullsCount));
496528
if (hasNulls) {
529+
auto& mutableNonNulls = streamData_->mutableNonNulls();
497530
mutableNonNulls.resize(remainingNonNullsCount);
498531
NIMBLE_CHECK_EQ(
499532
mutableNonNulls.size(),

dwio/nimble/velox/StreamData.h

Lines changed: 88 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -84,6 +84,41 @@ class MutableStreamData : public StreamData {
8484
const InputBufferGrowthPolicy& growthPolicy_;
8585
};
8686

87+
class StringBuffer {
88+
public:
89+
StringBuffer(velox::memory::MemoryPool& pool)
90+
: buffer{&pool}, lengths{&pool} {}
91+
92+
Vector<char>& mutableBuffer() {
93+
return buffer;
94+
}
95+
96+
Vector<size_t>& mutableLengths() {
97+
return lengths;
98+
}
99+
100+
bool empty() const {
101+
return lengths.empty();
102+
}
103+
104+
uint64_t size() const {
105+
return lengths.size();
106+
}
107+
108+
uint64_t memoryUsed() const {
109+
return sizeof(size_t) * lengths.size() + buffer.size();
110+
}
111+
112+
void clear() {
113+
buffer.clear();
114+
lengths.clear();
115+
}
116+
117+
private:
118+
Vector<char> buffer;
119+
Vector<size_t> lengths;
120+
};
121+
87122
/// Provides a lightweight, non-owning view into a portion of stream data,
88123
/// containing references to the data content and null indicators for efficient
89124
/// processing of large streams without copying data.
@@ -156,7 +191,9 @@ class ContentStreamData final : public MutableStreamData {
156191
velox::memory::MemoryPool& pool,
157192
const StreamDescriptorBuilder& descriptor,
158193
const InputBufferGrowthPolicy& growthPolicy)
159-
: MutableStreamData(descriptor, growthPolicy), data_{&pool} {}
194+
: MutableStreamData(descriptor, growthPolicy),
195+
data_{&pool},
196+
stringBuffer_{pool} {}
160197

161198
inline uint32_t rowCount() const override {
162199
return data_.size();
@@ -180,13 +217,18 @@ class ContentStreamData final : public MutableStreamData {
180217
}
181218

182219
inline uint64_t memoryUsed() const override {
183-
return (data_.size() * sizeof(T)) + extraMemory_;
220+
return (data_.size() * sizeof(T)) + extraMemory_ +
221+
(stringBuffer_.size() * sizeof(size_t));
184222
}
185223

186224
inline Vector<T>& mutableData() {
187225
return data_;
188226
}
189227

228+
inline StringBuffer& mutableStringBuffer() {
229+
return stringBuffer_;
230+
}
231+
190232
void ensureMutableDataCapacity(uint64_t newSize) {
191233
this->ensureDataCapacity(data_, newSize);
192234
}
@@ -197,11 +239,29 @@ class ContentStreamData final : public MutableStreamData {
197239

198240
inline virtual void reset() override {
199241
data_.clear();
242+
stringBuffer_.clear();
200243
extraMemory_ = 0;
201244
}
202245

246+
void materializeStringViews() {
247+
if (stringBuffer_.empty()) {
248+
return;
249+
}
250+
const auto& buffer = stringBuffer_.mutableBuffer();
251+
const auto& lengths = stringBuffer_.mutableLengths();
252+
size_t runningOffset = 0;
253+
data_.resize(lengths.size());
254+
for (size_t i = 0; i < lengths.size(); ++i) {
255+
data_[i] = std::string_view(
256+
reinterpret_cast<const char*>(buffer.data() + runningOffset),
257+
lengths[i]);
258+
runningOffset += lengths[i];
259+
}
260+
}
261+
203262
private:
204263
Vector<T> data_;
264+
StringBuffer stringBuffer_;
205265
uint64_t extraMemory_{0};
206266
};
207267

@@ -286,6 +346,7 @@ class NullableContentStreamData final : public NullsStreamData {
286346
const InputBufferGrowthPolicy& growthPolicy)
287347
: NullsStreamData(memoryPool, descriptor, growthPolicy),
288348
data_{&memoryPool},
349+
stringBuffer_{memoryPool},
289350
extraMemory_{0} {}
290351

291352
inline std::string_view data() const override {
@@ -298,14 +359,19 @@ class NullableContentStreamData final : public NullsStreamData {
298359
}
299360

300361
inline uint64_t memoryUsed() const override {
301-
return (data_.size() * sizeof(T)) + extraMemory_ +
302-
NullsStreamData::memoryUsed();
362+
return (data_.size() * sizeof(T)) +
363+
(stringBuffer_.size() * sizeof(size_t)) +
364+
NullsStreamData::memoryUsed() + extraMemory_;
303365
}
304366

305367
inline Vector<T>& mutableData() {
306368
return data_;
307369
}
308370

371+
inline StringBuffer& mutableStringBuffer() {
372+
return stringBuffer_;
373+
}
374+
309375
void ensureMutableDataCapacity(uint64_t newSize) {
310376
this->ensureDataCapacity(data_, newSize);
311377
}
@@ -317,11 +383,29 @@ class NullableContentStreamData final : public NullsStreamData {
317383
inline void reset() override {
318384
NullsStreamData::reset();
319385
data_.clear();
386+
stringBuffer_.clear();
320387
extraMemory_ = 0;
321388
}
322389

390+
void materializeStringViews() {
391+
if (stringBuffer_.empty()) {
392+
return;
393+
}
394+
const auto& buffer = stringBuffer_.mutableBuffer();
395+
const auto& lengths = stringBuffer_.mutableLengths();
396+
size_t runningOffset = 0;
397+
data_.resize(lengths.size());
398+
for (size_t i = 0; i < lengths.size(); ++i) {
399+
data_[i] = std::string_view(
400+
reinterpret_cast<const char*>(buffer.data() + runningOffset),
401+
lengths[i]);
402+
runningOffset += lengths[i];
403+
}
404+
}
405+
323406
private:
324407
Vector<T> data_;
408+
StringBuffer stringBuffer_;
325409
uint64_t extraMemory_;
326410
};
327411

0 commit comments

Comments
 (0)