Skip to content

Commit 2aa9df8

Browse files
macvincentfacebook-github-bot
authored andcommitted
Support Per Stream String Buffers in Stream Data (facebookincubator#391)
Summary: Adds per-stream string buffer support to Nimble's StreamData infrastructure. Previously, string data buffers were managed externally, but now each MutableStreamData instance can own its own Buffer for string content, enabling better memory isolation and lifecycle management with chunking. Differential Revision: D89933055
1 parent 18a98c5 commit 2aa9df8

File tree

4 files changed

+205
-106
lines changed

4 files changed

+205
-106
lines changed

dwio/nimble/velox/StreamChunker.cpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ template <typename T>
2121
std::unique_ptr<StreamChunker> getStreamChunkerTyped(
2222
StreamData& streamData,
2323
const StreamChunkerOptions& options) {
24+
streamData.materializeStrings();
2425
if (auto* contentStreamChunker =
2526
dynamic_cast<ContentStreamData<T>*>(&streamData)) {
2627
return std::make_unique<ContentStreamChunker<T>>(

dwio/nimble/velox/StreamChunker.h

Lines changed: 76 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,28 @@ inline bool shouldOmitNullStream(
4141
}
4242
return isFirstChunk || streamData.empty();
4343
}
44+
45+
template <typename T>
46+
inline void compactStringBuffer(
47+
velox::memory::MemoryPool* pool,
48+
StringBuffer& stringBuffer,
49+
Vector<T>& mutableData,
50+
size_t dataElementOffset,
51+
uint64_t extraMemory) {
52+
StringBuffer newStringBuffer(*pool);
53+
auto& newBuffer = newStringBuffer.mutableBuffer();
54+
newBuffer.reserve(extraMemory);
55+
56+
auto& newLengths = newStringBuffer.mutableLengths();
57+
newLengths.reserve(mutableData.size() - dataElementOffset);
58+
59+
for (auto i = dataElementOffset; i < mutableData.size(); ++i) {
60+
auto str = mutableData[i];
61+
newBuffer.insert(newBuffer.end(), str.begin(), str.end());
62+
newLengths.push_back(str.size());
63+
}
64+
stringBuffer = std::move(newStringBuffer);
65+
}
4466
} // namespace detail
4567

4668
/**
@@ -94,6 +116,7 @@ class ContentStreamChunker final : public StreamChunker {
94116
minChunkSize_{options.minChunkSize},
95117
maxChunkSize_{options.maxChunkSize},
96118
ensureFullChunks_{options.ensureFullChunks},
119+
useStreamStringBuffer_{!streamData_->mutableStringBuffer().empty()},
97120
extraMemory_{streamData_->extraMemory()} {
98121
static_assert(
99122
std::is_same_v<StreamDataT, ContentStreamData<T>> ||
@@ -194,27 +217,36 @@ class ContentStreamChunker final : public StreamChunker {
194217

195218
// Move and clear existing buffer
196219
auto tempData = std::move(currentData);
220+
auto tempBuffer = std::move(streamData_->mutableStringBuffer());
197221
streamData_->reset();
198222
NIMBLE_DCHECK(
199223
streamData_->empty(), "StreamData should be empty after reset");
200224

201225
auto& mutableData = streamData_->mutableData();
202-
mutableData.reserve(remainingDataCount);
203-
NIMBLE_DCHECK_GE(
204-
mutableData.capacity(),
205-
remainingDataCount,
206-
"Data buffer capacity should be at least new capacity");
207-
208-
mutableData.resize(remainingDataCount);
209-
NIMBLE_DCHECK_EQ(
210-
mutableData.size(),
211-
remainingDataCount,
212-
"Data buffer size should be equal to remaining data count");
213-
214-
std::copy_n(
215-
tempData.begin() + dataElementOffset_,
216-
remainingDataCount,
217-
mutableData.begin());
226+
227+
if constexpr (std::is_same_v<T, std::string_view>) {
228+
if (useStreamStringBuffer_) {
229+
detail::compactStringBuffer<T>(
230+
mutableData.memoryPool(),
231+
streamData_->mutableStringBuffer(),
232+
tempData,
233+
dataElementOffset_,
234+
extraMemory_);
235+
}
236+
}
237+
238+
if (!useStreamStringBuffer_) {
239+
mutableData.resize(remainingDataCount);
240+
NIMBLE_DCHECK_EQ(
241+
mutableData.size(),
242+
remainingDataCount,
243+
"Data buffer size should be equal to remaining data count");
244+
245+
std::copy_n(
246+
tempData.begin() + dataElementOffset_,
247+
remainingDataCount,
248+
mutableData.begin());
249+
}
218250
dataElementOffset_ = 0;
219251
streamData_->extraMemory() = extraMemory_;
220252

@@ -230,6 +262,7 @@ class ContentStreamChunker final : public StreamChunker {
230262
const uint64_t minChunkSize_;
231263
const uint64_t maxChunkSize_;
232264
const bool ensureFullChunks_;
265+
const bool useStreamStringBuffer_;
233266

234267
size_t dataElementOffset_{0};
235268
size_t extraMemory_{0};
@@ -314,16 +347,10 @@ class NullsStreamChunker final : public StreamChunker {
314347
NIMBLE_CHECK(
315348
streamData_->empty(), "StreamData should be empty after reset");
316349

317-
auto& mutableNonNulls = streamData_->mutableNonNulls();
318-
mutableNonNulls.reserve(remainingNonNullsCount);
319-
NIMBLE_DCHECK_GE(
320-
mutableNonNulls.capacity(),
321-
remainingNonNullsCount,
322-
"NonNulls buffer capacity should be at least new capacity");
323-
324350
streamData_->ensureAdditionalNullsCapacity(
325351
hasNulls, static_cast<uint32_t>(remainingNonNullsCount));
326352
if (hasNulls) {
353+
auto& mutableNonNulls = streamData_->mutableNonNulls();
327354
mutableNonNulls.resize(remainingNonNullsCount);
328355
NIMBLE_CHECK_EQ(
329356
mutableNonNulls.size(),
@@ -361,6 +388,7 @@ class NullableContentStreamChunker final : public StreamChunker {
361388
options.minChunkSize,
362389
options.isFirstChunk)},
363390
ensureFullChunks_{options.ensureFullChunks},
391+
useStreamStringBuffer_{!streamData_->mutableStringBuffer().empty()},
364392
extraMemory_{streamData_->extraMemory()} {
365393
static_assert(sizeof(bool) == 1);
366394
NIMBLE_CHECK(
@@ -461,39 +489,40 @@ class NullableContentStreamChunker final : public StreamChunker {
461489
// Move and clear existing buffers
462490
auto tempNonNulls = std::move(currentNonNulls);
463491
auto tempData = std::move(currentData);
492+
auto tempBuffer = std::move(streamData_->mutableStringBuffer());
464493
const bool hasNulls = streamData_->hasNulls();
465494
streamData_->reset();
466495
NIMBLE_CHECK(
467496
streamData_->empty(), "StreamData should be empty after reset");
468497

469498
auto& mutableData = streamData_->mutableData();
470-
mutableData.reserve(remainingDataCount);
471-
NIMBLE_DCHECK_GE(
472-
mutableData.capacity(),
473-
remainingDataCount,
474-
"Data buffer capacity should be at least new capacity");
475-
476-
mutableData.resize(remainingDataCount);
477-
NIMBLE_CHECK_EQ(
478-
mutableData.size(),
479-
remainingDataCount,
480-
"Data buffer size should be equal to remaining data count");
481-
482-
std::copy_n(
483-
tempData.begin() + dataElementOffset_,
484-
remainingDataCount,
485-
mutableData.begin());
486-
487-
auto& mutableNonNulls = streamData_->mutableNonNulls();
488-
mutableNonNulls.reserve(remainingNonNullsCount);
489-
NIMBLE_DCHECK_GE(
490-
mutableNonNulls.capacity(),
491-
remainingNonNullsCount,
492-
"NonNulls buffer capacity should be at least new capacity");
499+
if constexpr (std::is_same_v<T, std::string_view>) {
500+
if (useStreamStringBuffer_) {
501+
detail::compactStringBuffer<T>(
502+
mutableData.memoryPool(),
503+
streamData_->mutableStringBuffer(),
504+
tempData,
505+
dataElementOffset_,
506+
extraMemory_);
507+
}
508+
}
509+
if (!useStreamStringBuffer_) {
510+
mutableData.resize(remainingDataCount);
511+
NIMBLE_DCHECK_EQ(
512+
mutableData.size(),
513+
remainingDataCount,
514+
"Data buffer size should be equal to remaining data count");
515+
516+
std::copy_n(
517+
tempData.begin() + dataElementOffset_,
518+
remainingDataCount,
519+
mutableData.begin());
520+
}
493521

494522
streamData_->ensureAdditionalNullsCapacity(
495523
hasNulls, static_cast<uint32_t>(remainingNonNullsCount));
496524
if (hasNulls) {
525+
auto& mutableNonNulls = streamData_->mutableNonNulls();
497526
mutableNonNulls.resize(remainingNonNullsCount);
498527
NIMBLE_CHECK_EQ(
499528
mutableNonNulls.size(),
@@ -515,6 +544,7 @@ class NullableContentStreamChunker final : public StreamChunker {
515544
const uint64_t maxChunkSize_;
516545
const bool omitStream_;
517546
const bool ensureFullChunks_;
547+
const bool useStreamStringBuffer_;
518548

519549
size_t dataElementOffset_{0};
520550
size_t nonNullsOffset_{0};

dwio/nimble/velox/StreamData.h

Lines changed: 81 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,8 @@ class StreamData {
5454

5555
virtual void materialize() {}
5656

57+
virtual void materializeStrings() {}
58+
5759
const StreamDescriptorBuilder& descriptor() const {
5860
return descriptor_;
5961
}
@@ -84,6 +86,53 @@ class MutableStreamData : public StreamData {
8486
const InputBufferGrowthPolicy& growthPolicy_;
8587
};
8688

89+
class StringBuffer {
90+
public:
91+
explicit StringBuffer(velox::memory::MemoryPool& pool)
92+
: buffer_{&pool}, lengths_{&pool} {}
93+
94+
Vector<char>& mutableBuffer() {
95+
return buffer_;
96+
}
97+
98+
Vector<size_t>& mutableLengths() {
99+
return lengths_;
100+
}
101+
102+
bool empty() const {
103+
return lengths_.empty();
104+
}
105+
106+
uint64_t size() const {
107+
return lengths_.size();
108+
}
109+
110+
uint64_t memoryUsed() const {
111+
return buffer_.size() + lengths_.size() * sizeof(size_t);
112+
}
113+
114+
void clear() {
115+
buffer_.clear();
116+
lengths_.clear();
117+
}
118+
119+
void materialize(Vector<std::string_view>& data) {
120+
if (empty()) {
121+
return;
122+
}
123+
data.resize(lengths_.size());
124+
size_t runningOffset = 0;
125+
for (size_t i = 0; i < lengths_.size(); ++i) {
126+
data[i] = std::string_view(buffer_.data() + runningOffset, lengths_[i]);
127+
runningOffset += lengths_[i];
128+
}
129+
}
130+
131+
private:
132+
Vector<char> buffer_;
133+
Vector<size_t> lengths_;
134+
};
135+
87136
/// Provides a lightweight, non-owning view into a portion of stream data,
88137
/// containing references to the data content and null indicators for efficient
89138
/// processing of large streams without copying data.
@@ -156,7 +205,9 @@ class ContentStreamData final : public MutableStreamData {
156205
velox::memory::MemoryPool& pool,
157206
const StreamDescriptorBuilder& descriptor,
158207
const InputBufferGrowthPolicy& growthPolicy)
159-
: MutableStreamData(descriptor, growthPolicy), data_{&pool} {}
208+
: MutableStreamData(descriptor, growthPolicy),
209+
data_{&pool},
210+
stringBuffer_{pool} {}
160211

161212
inline uint32_t rowCount() const override {
162213
return data_.size();
@@ -180,13 +231,18 @@ class ContentStreamData final : public MutableStreamData {
180231
}
181232

182233
inline uint64_t memoryUsed() const override {
183-
return (data_.size() * sizeof(T)) + extraMemory_;
234+
return (data_.size() * sizeof(T)) +
235+
(stringBuffer_.empty() ? extraMemory_ : stringBuffer_.memoryUsed());
184236
}
185237

186238
inline Vector<T>& mutableData() {
187239
return data_;
188240
}
189241

242+
inline StringBuffer& mutableStringBuffer() {
243+
return stringBuffer_;
244+
}
245+
190246
void ensureMutableDataCapacity(uint64_t newSize) {
191247
this->ensureDataCapacity(data_, newSize);
192248
}
@@ -197,11 +253,19 @@ class ContentStreamData final : public MutableStreamData {
197253

198254
inline virtual void reset() override {
199255
data_.clear();
256+
stringBuffer_.clear();
200257
extraMemory_ = 0;
201258
}
202259

260+
void materializeStrings() override {
261+
if constexpr (std::is_same_v<T, std::string_view>) {
262+
stringBuffer_.materialize(data_);
263+
}
264+
}
265+
203266
private:
204267
Vector<T> data_;
268+
StringBuffer stringBuffer_;
205269
uint64_t extraMemory_{0};
206270
};
207271

@@ -286,6 +350,7 @@ class NullableContentStreamData final : public NullsStreamData {
286350
const InputBufferGrowthPolicy& growthPolicy)
287351
: NullsStreamData(memoryPool, descriptor, growthPolicy),
288352
data_{&memoryPool},
353+
stringBuffer_{memoryPool},
289354
extraMemory_{0} {}
290355

291356
inline std::string_view data() const override {
@@ -298,14 +363,18 @@ class NullableContentStreamData final : public NullsStreamData {
298363
}
299364

300365
inline uint64_t memoryUsed() const override {
301-
return (data_.size() * sizeof(T)) + extraMemory_ +
302-
NullsStreamData::memoryUsed();
366+
return (data_.size() * sizeof(T)) + NullsStreamData::memoryUsed() +
367+
(stringBuffer_.empty() ? extraMemory_ : stringBuffer_.memoryUsed());
303368
}
304369

305370
inline Vector<T>& mutableData() {
306371
return data_;
307372
}
308373

374+
inline StringBuffer& mutableStringBuffer() {
375+
return stringBuffer_;
376+
}
377+
309378
void ensureMutableDataCapacity(uint64_t newSize) {
310379
this->ensureDataCapacity(data_, newSize);
311380
}
@@ -317,11 +386,19 @@ class NullableContentStreamData final : public NullsStreamData {
317386
inline void reset() override {
318387
NullsStreamData::reset();
319388
data_.clear();
389+
stringBuffer_.clear();
320390
extraMemory_ = 0;
321391
}
322392

393+
void materializeStrings() override {
394+
if constexpr (std::is_same_v<T, std::string_view>) {
395+
stringBuffer_.materialize(data_);
396+
}
397+
}
398+
323399
private:
324400
Vector<T> data_;
401+
StringBuffer stringBuffer_;
325402
uint64_t extraMemory_;
326403
};
327404

0 commit comments

Comments
 (0)