@@ -41,6 +41,28 @@ inline bool shouldOmitNullStream(
4141 }
4242 return isFirstChunk || streamData.empty ();
4343}
44+
45+ template <typename T>
46+ inline void compactStringBuffer (
47+ velox::memory::MemoryPool* pool,
48+ StringBuffer& stringBuffer,
49+ Vector<T>& mutableData,
50+ size_t dataElementOffset,
51+ uint64_t extraMemory) {
52+ StringBuffer newStringBuffer (*pool);
53+ auto & newBuffer = newStringBuffer.mutableBuffer ();
54+ newBuffer.reserve (extraMemory);
55+
56+ auto & newLengths = newStringBuffer.mutableLengths ();
57+ newLengths.reserve (mutableData.size () - dataElementOffset);
58+
59+ for (auto i = dataElementOffset; i < mutableData.size (); ++i) {
60+ auto str = mutableData[i];
61+ newBuffer.insert (newBuffer.end (), str.begin (), str.end ());
62+ newLengths.push_back (str.size ());
63+ }
64+ stringBuffer = std::move (newStringBuffer);
65+ }
4466} // namespace detail
4567
4668/* *
@@ -94,6 +116,7 @@ class ContentStreamChunker final : public StreamChunker {
94116 minChunkSize_{options.minChunkSize },
95117 maxChunkSize_{options.maxChunkSize },
96118 ensureFullChunks_{options.ensureFullChunks },
119+ useStreamStringBuffer_{!streamData_->mutableStringBuffer ().empty ()},
97120 extraMemory_{streamData_->extraMemory ()} {
98121 static_assert (
99122 std::is_same_v<StreamDataT, ContentStreamData<T>> ||
@@ -194,27 +217,36 @@ class ContentStreamChunker final : public StreamChunker {
194217
195218 // Move and clear existing buffer
196219 auto tempData = std::move (currentData);
220+ auto tempBuffer = std::move (streamData_->mutableStringBuffer ());
197221 streamData_->reset ();
198222 NIMBLE_DCHECK (
199223 streamData_->empty (), " StreamData should be empty after reset" );
200224
201225 auto & mutableData = streamData_->mutableData ();
202- mutableData.reserve (remainingDataCount);
203- NIMBLE_DCHECK_GE (
204- mutableData.capacity (),
205- remainingDataCount,
206- " Data buffer capacity should be at least new capacity" );
207-
208- mutableData.resize (remainingDataCount);
209- NIMBLE_DCHECK_EQ (
210- mutableData.size (),
211- remainingDataCount,
212- " Data buffer size should be equal to remaining data count" );
213-
214- std::copy_n (
215- tempData.begin () + dataElementOffset_,
216- remainingDataCount,
217- mutableData.begin ());
226+
227+ if constexpr (std::is_same_v<T, std::string_view>) {
228+ if (useStreamStringBuffer_) {
229+ detail::compactStringBuffer<T>(
230+ mutableData.memoryPool (),
231+ streamData_->mutableStringBuffer (),
232+ tempData,
233+ dataElementOffset_,
234+ extraMemory_);
235+ }
236+ }
237+
238+ if (!useStreamStringBuffer_) {
239+ mutableData.resize (remainingDataCount);
240+ NIMBLE_DCHECK_EQ (
241+ mutableData.size (),
242+ remainingDataCount,
243+ " Data buffer size should be equal to remaining data count" );
244+
245+ std::copy_n (
246+ tempData.begin () + dataElementOffset_,
247+ remainingDataCount,
248+ mutableData.begin ());
249+ }
218250 dataElementOffset_ = 0 ;
219251 streamData_->extraMemory () = extraMemory_;
220252
@@ -230,6 +262,7 @@ class ContentStreamChunker final : public StreamChunker {
230262 const uint64_t minChunkSize_;
231263 const uint64_t maxChunkSize_;
232264 const bool ensureFullChunks_;
265+ const bool useStreamStringBuffer_;
233266
234267 size_t dataElementOffset_{0 };
235268 size_t extraMemory_{0 };
@@ -314,16 +347,10 @@ class NullsStreamChunker final : public StreamChunker {
314347 NIMBLE_CHECK (
315348 streamData_->empty (), " StreamData should be empty after reset" );
316349
317- auto & mutableNonNulls = streamData_->mutableNonNulls ();
318- mutableNonNulls.reserve (remainingNonNullsCount);
319- NIMBLE_DCHECK_GE (
320- mutableNonNulls.capacity (),
321- remainingNonNullsCount,
322- " NonNulls buffer capacity should be at least new capacity" );
323-
324350 streamData_->ensureAdditionalNullsCapacity (
325351 hasNulls, static_cast <uint32_t >(remainingNonNullsCount));
326352 if (hasNulls) {
353+ auto & mutableNonNulls = streamData_->mutableNonNulls ();
327354 mutableNonNulls.resize (remainingNonNullsCount);
328355 NIMBLE_CHECK_EQ (
329356 mutableNonNulls.size (),
@@ -361,6 +388,7 @@ class NullableContentStreamChunker final : public StreamChunker {
361388 options.minChunkSize ,
362389 options.isFirstChunk )},
363390 ensureFullChunks_{options.ensureFullChunks },
391+ useStreamStringBuffer_{!streamData_->mutableStringBuffer ().empty ()},
364392 extraMemory_{streamData_->extraMemory ()} {
365393 static_assert (sizeof (bool ) == 1 );
366394 NIMBLE_CHECK (
@@ -461,39 +489,40 @@ class NullableContentStreamChunker final : public StreamChunker {
461489 // Move and clear existing buffers
462490 auto tempNonNulls = std::move (currentNonNulls);
463491 auto tempData = std::move (currentData);
492+ auto tempBuffer = std::move (streamData_->mutableStringBuffer ());
464493 const bool hasNulls = streamData_->hasNulls ();
465494 streamData_->reset ();
466495 NIMBLE_CHECK (
467496 streamData_->empty (), " StreamData should be empty after reset" );
468497
469498 auto & mutableData = streamData_->mutableData ();
470- mutableData.reserve (remainingDataCount);
471- NIMBLE_DCHECK_GE (
472- mutableData.capacity (),
473- remainingDataCount,
474- " Data buffer capacity should be at least new capacity" );
475-
476- mutableData.resize (remainingDataCount);
477- NIMBLE_CHECK_EQ (
478- mutableData.size (),
479- remainingDataCount,
480- " Data buffer size should be equal to remaining data count" );
481-
482- std::copy_n (
483- tempData.begin () + dataElementOffset_,
484- remainingDataCount,
485- mutableData.begin ());
486-
487- auto & mutableNonNulls = streamData_->mutableNonNulls ();
488- mutableNonNulls.reserve (remainingNonNullsCount);
489- NIMBLE_DCHECK_GE (
490- mutableNonNulls.capacity (),
491- remainingNonNullsCount,
492- " NonNulls buffer capacity should be at least new capacity" );
499+ if constexpr (std::is_same_v<T, std::string_view>) {
500+ if (useStreamStringBuffer_) {
501+ detail::compactStringBuffer<T>(
502+ mutableData.memoryPool (),
503+ streamData_->mutableStringBuffer (),
504+ tempData,
505+ dataElementOffset_,
506+ extraMemory_);
507+ }
508+ }
509+ if (!useStreamStringBuffer_) {
510+ mutableData.resize (remainingDataCount);
511+ NIMBLE_DCHECK_EQ (
512+ mutableData.size (),
513+ remainingDataCount,
514+ " Data buffer size should be equal to remaining data count" );
515+
516+ std::copy_n (
517+ tempData.begin () + dataElementOffset_,
518+ remainingDataCount,
519+ mutableData.begin ());
520+ }
493521
494522 streamData_->ensureAdditionalNullsCapacity (
495523 hasNulls, static_cast <uint32_t >(remainingNonNullsCount));
496524 if (hasNulls) {
525+ auto & mutableNonNulls = streamData_->mutableNonNulls ();
497526 mutableNonNulls.resize (remainingNonNullsCount);
498527 NIMBLE_CHECK_EQ (
499528 mutableNonNulls.size (),
@@ -515,6 +544,7 @@ class NullableContentStreamChunker final : public StreamChunker {
515544 const uint64_t maxChunkSize_;
516545 const bool omitStream_;
517546 const bool ensureFullChunks_;
547+ const bool useStreamStringBuffer_;
518548
519549 size_t dataElementOffset_{0 };
520550 size_t nonNullsOffset_{0 };
0 commit comments