Skip to content

Commit a300655

Browse files
macvincent authored and facebook-github-bot committed
Support Per Stream String Buffers in Stream Data (facebookincubator#391)
Summary: Adds per-stream string buffer support to Nimble's StreamData infrastructure. Previously, string data buffers were managed externally; now each MutableStreamData instance can own its own Buffer for string content. This gives better memory isolation and lets the lifetime of the string data be managed together with chunking. Differential Revision: D89933055
1 parent 43994c7 commit a300655

File tree

3 files changed

+86
-48
lines changed

3 files changed

+86
-48
lines changed

dwio/nimble/velox/StreamChunker.h

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -194,6 +194,7 @@ class ContentStreamChunker final : public StreamChunker {
194194

195195
// Move and clear existing buffer
196196
auto tempData = std::move(currentData);
197+
auto tempBuffer = streamData_->takeStringBuffer();
197198
streamData_->reset();
198199
NIMBLE_DCHECK(
199200
streamData_->empty(), "StreamData should be empty after reset");
@@ -215,6 +216,11 @@ class ContentStreamChunker final : public StreamChunker {
215216
tempData.begin() + dataElementOffset_,
216217
remainingDataCount,
217218
mutableData.begin());
219+
if constexpr (std::is_same_v<T, std::string_view>) {
220+
if (tempBuffer) {
221+
streamData_->compactStringBuffer(mutableData);
222+
}
223+
}
218224
dataElementOffset_ = 0;
219225
streamData_->extraMemory() = extraMemory_;
220226

@@ -461,6 +467,7 @@ class NullableContentStreamChunker final : public StreamChunker {
461467
// Move and clear existing buffers
462468
auto tempNonNulls = std::move(currentNonNulls);
463469
auto tempData = std::move(currentData);
470+
auto tempBuffer = streamData_->takeStringBuffer();
464471
const bool hasNulls = streamData_->hasNulls();
465472
streamData_->reset();
466473
NIMBLE_CHECK(
@@ -505,6 +512,11 @@ class NullableContentStreamChunker final : public StreamChunker {
505512
remainingNonNullsCount,
506513
mutableNonNulls.begin());
507514
}
515+
if constexpr (std::is_same_v<T, std::string_view>) {
516+
if (tempBuffer) {
517+
streamData_->compactStringBuffer(mutableData);
518+
}
519+
}
508520
streamData_->extraMemory() = extraMemory_;
509521
dataElementOffset_ = 0;
510522
nonNullsOffset_ = 0;

dwio/nimble/velox/StreamData.h

Lines changed: 45 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
#include <span>
2020
#include <string_view>
2121

22+
#include "dwio/nimble/common/Buffer.h"
2223
#include "dwio/nimble/common/Vector.h"
2324
#include "dwio/nimble/velox/BufferGrowthPolicy.h"
2425
#include "dwio/nimble/velox/SchemaBuilder.h"
@@ -65,9 +66,12 @@ class StreamData {
6566
class MutableStreamData : public StreamData {
6667
protected:
6768
MutableStreamData(
69+
velox::memory::MemoryPool& memoryPool,
6870
const StreamDescriptorBuilder& descriptor,
6971
const InputBufferGrowthPolicy& growthPolicy)
70-
: StreamData(descriptor), growthPolicy_{growthPolicy} {}
72+
: StreamData(descriptor),
73+
growthPolicy_{growthPolicy},
74+
memoryPool_{memoryPool} {}
7175

7276
virtual ~MutableStreamData() = default;
7377

@@ -80,8 +84,44 @@ class MutableStreamData : public StreamData {
8084
}
8185
}
8286

87+
/// Releases the string buffer owned by this stream, if one exists, leaving
/// stringBuffer_ null. Note that this resets the owning unique_ptr (destroying
/// the Buffer), it does not call a method on the Buffer itself.
void resetStringBuffer() {
88+
if (stringBuffer_) {
89+
stringBuffer_.reset();
90+
}
91+
}
92+
93+
public:
94+
/// Returns a reference to the string buffer owned by this stream.
95+
/// The buffer is created lazily, from memoryPool_, on first access.
/// The returned reference stops being valid once the buffer is released or
/// replaced (resetStringBuffer(), takeStringBuffer(), compactStringBuffer()).
96+
Buffer& stringBuffer() {
97+
if (!stringBuffer_) {
98+
stringBuffer_ = std::make_unique<Buffer>(memoryPool_);
99+
}
100+
return *stringBuffer_;
101+
}
102+
103+
/// Transfers ownership of the string buffer from this stream to the caller.
/// Returns null if no string buffer has been created. After the call this
/// stream holds no string buffer; stringBuffer() would create a fresh one.
104+
std::unique_ptr<Buffer> takeStringBuffer() {
105+
return std::move(stringBuffer_);
106+
}
107+
108+
/// Compacts the string buffer: writes every string referenced by `data` into
109+
/// a newly created buffer and replaces each string_view in `data` with the
/// view returned by writeString(). The new buffer then becomes stringBuffer_.
///
/// Each view in `data` is read during the call, so the memory it points to
/// must still be alive (for example, the buffer previously obtained from
/// takeStringBuffer() must be kept by the caller until this returns).
/// If `data` is empty, nothing is allocated and stringBuffer_ is left as is.
110+
void compactStringBuffer(Vector<std::string_view>& data) {
111+
if (data.empty()) {
112+
return;
113+
}
114+
auto newStringBuffer = std::make_unique<Buffer>(memoryPool_);
115+
for (size_t i = 0; i < data.size(); ++i) {
116+
// data[i] is read (old storage) before being overwritten with the new view.
data[i] = newStringBuffer->writeString(data[i]);
117+
}
118+
// Move-assigning drops whatever buffer this stream owned before.
stringBuffer_ = std::move(newStringBuffer);
119+
}
120+
83121
private:
84122
const InputBufferGrowthPolicy& growthPolicy_;
123+
velox::memory::MemoryPool& memoryPool_;
124+
std::unique_ptr<Buffer> stringBuffer_;
85125
};
86126

87127
/// Provides a lightweight, non-owning view into a portion of stream data,
@@ -156,7 +196,7 @@ class ContentStreamData final : public MutableStreamData {
156196
velox::memory::MemoryPool& pool,
157197
const StreamDescriptorBuilder& descriptor,
158198
const InputBufferGrowthPolicy& growthPolicy)
159-
: MutableStreamData(descriptor, growthPolicy), data_{&pool} {}
199+
: MutableStreamData(pool, descriptor, growthPolicy), data_{&pool} {}
160200

161201
inline uint32_t rowCount() const override {
162202
return data_.size();
@@ -198,6 +238,7 @@ class ContentStreamData final : public MutableStreamData {
198238
inline virtual void reset() override {
199239
data_.clear();
200240
extraMemory_ = 0;
241+
resetStringBuffer();
201242
}
202243

203244
private:
@@ -218,7 +259,7 @@ class NullsStreamData : public MutableStreamData {
218259
velox::memory::MemoryPool& pool,
219260
const StreamDescriptorBuilder& descriptor,
220261
const InputBufferGrowthPolicy& growthPolicy)
221-
: MutableStreamData(descriptor, growthPolicy), nonNulls_{&pool} {}
262+
: MutableStreamData(pool, descriptor, growthPolicy), nonNulls_{&pool} {}
222263

223264
inline uint32_t rowCount() const override {
224265
return bufferedCount_;
@@ -252,6 +293,7 @@ class NullsStreamData : public MutableStreamData {
252293
nonNulls_.clear();
253294
hasNulls_ = false;
254295
bufferedCount_ = 0;
296+
resetStringBuffer();
255297
}
256298

257299
void materialize() override {

dwio/nimble/velox/tests/StreamChunkerTests.cpp

Lines changed: 29 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,18 @@ void populateData(Vector<T>& vec, const std::vector<T>& data) {
3535
}
3636
}
3737

38+
// Test helper: writes each string in `testData` into `stringBuffer` via
// writeString() and appends the returned view to `data`.
// Returns the sum of the string sizes; callers add it to the stream's
// extraMemory().
uint64_t populateStringData(
39+
Buffer& stringBuffer,
40+
Vector<std::string_view>& data,
41+
const std::vector<std::string_view>& testData) {
42+
uint64_t extraMemory = 0;
43+
for (const auto& str : testData) {
44+
extraMemory += str.size();
45+
data.emplace_back(stringBuffer.writeString(str));
46+
}
47+
return extraMemory;
48+
}
49+
3850
template <typename T>
3951
std::vector<T> toVector(std::string_view data) {
4052
const T* chunkData = reinterpret_cast<const T*>(data.data());
@@ -508,13 +520,8 @@ TEST_F(StreamChunkerTestsBase, contentStreamStringChunking) {
508520
{
509521
std::vector<std::string_view> testData = {
510522
"short", "a_longer_string", "x", "medium_size", "tiny"};
511-
auto& data = stream.mutableData();
512-
populateData(data, testData);
513-
514-
// Calculate extra memory for string content
515-
for (const auto& str : testData) {
516-
stream.extraMemory() += str.size();
517-
}
523+
stream.extraMemory() += populateStringData(
524+
stream.stringBuffer(), stream.mutableData(), testData);
518525

519526
// Total string content sizes:
520527
// "short"(5) + "a_longer_string"(15) + "x"(1) + "medium_size"(11) +
@@ -582,16 +589,11 @@ TEST_F(StreamChunkerTestsBase, contentStreamStringChunking) {
582589

583590
// Test 4: We can reuse a stream post compaction.
584591
{
585-
auto& data = stream.mutableData();
586592
stream.extraMemory() = 0; // Reset extra memory
587593
std::vector<std::string_view> testData = {
588594
"hello", "world", "hello", "world"};
589-
populateData(data, testData);
590-
591-
// Calculate extra memory for string content
592-
for (const auto& str : testData) {
593-
stream.extraMemory() += str.size();
594-
}
595+
stream.extraMemory() += populateStringData(
596+
stream.stringBuffer(), stream.mutableData(), testData);
595597

596598
auto chunker = getStreamChunker(
597599
stream,
@@ -619,17 +621,12 @@ TEST_F(StreamChunkerTestsBase, contentStreamStringChunking) {
619621
{
620622
std::vector<std::string_view> testData = {
621623
"a", "short", "really really large string", "small"};
622-
auto& data = stream.mutableData();
623-
populateData(data, testData);
624624

625625
// Two bytes less than the size of "really really large string" (26 bytes) plus its string view.
626626
maxChunkSize = 24 + sizeof(std::string_view);
627627

628-
// Calculate extra memory for string content.
629-
for (const auto& str : testData) {
630-
stream.extraMemory() += str.size();
631-
}
632-
628+
stream.extraMemory() += populateStringData(
629+
stream.stringBuffer(), stream.mutableData(), testData);
633630
// "a"(1), "short"(5), "really really large string"(26), "small"(5) = 37
634631
// bytes of extra memory.
635632
ASSERT_EQ(stream.extraMemory(), 37);
@@ -888,9 +885,7 @@ TEST_F(StreamChunkerTestsBase, nullableContentStreamStringChunking) {
888885
// insertion
889886
stream.ensureAdditionalNullsCapacity(
890887
/*mayHaveNulls=*/true, static_cast<uint32_t>(nonNullsData.size()));
891-
auto& data = stream.mutableData();
892888
auto& nonNulls = stream.mutableNonNulls();
893-
populateData(data, testData);
894889
populateData(nonNulls, nonNullsData);
895890

896891
// Set extra memory for string overhead
@@ -900,9 +895,9 @@ TEST_F(StreamChunkerTestsBase, nullableContentStreamStringChunking) {
900895
// size of nulls = 11 * 1 = 11 bytes
901896
// total size of stored string data = 36 bytes
902897
// total size of stream data = 56 + 56 + 11 + 36 = 159 bytes
903-
for (const auto& entry : testData) {
904-
stream.extraMemory() += entry.size();
905-
}
898+
stream.extraMemory() +=
899+
populateStringData(stream.stringBuffer(), stream.mutableData(), testData);
900+
906901
ASSERT_EQ(stream.memoryUsed(), 159);
907902

908903
// Test 1: Not last chunk
@@ -982,16 +977,13 @@ TEST_F(StreamChunkerTestsBase, nullableContentStreamStringChunking) {
982977
stream.ensureAdditionalNullsCapacity(
983978
/*mayHaveNulls=*/false,
984979
static_cast<uint32_t>(smallNonNullsData.size()));
985-
auto& smallData = stream.mutableData();
986980
auto& smallNonNulls = stream.mutableNonNulls();
987-
populateData(smallData, smallTestData);
988981
populateData(smallNonNulls, smallNonNullsData);
989982

990983
// Reset extra memory for new test data
991984
stream.extraMemory() = 0;
992-
for (const auto& entry : smallTestData) {
993-
stream.extraMemory() += entry.size();
994-
}
985+
stream.extraMemory() += populateStringData(
986+
stream.stringBuffer(), stream.mutableData(), smallTestData);
995987

996988
const uint64_t minChunkSize =
997989
smallTestData.at(0).size() + sizeof(std::string_view);
@@ -1025,7 +1017,6 @@ TEST_F(StreamChunkerTestsBase, nullableContentStreamStringChunking) {
10251017
// Test 5: Single string exceeds maxChunkSize.
10261018
{
10271019
testData = {"a", "short", "really really large string", "small"};
1028-
populateData(data, testData);
10291020

10301021
nonNullsData = {true, false, true, true, false, true, false};
10311022
stream.ensureAdditionalNullsCapacity(
@@ -1037,9 +1028,8 @@ TEST_F(StreamChunkerTestsBase, nullableContentStreamStringChunking) {
10371028
const uint64_t maxChunkSize = 24 + sizeof(std::string_view) + sizeof(bool);
10381029

10391030
// Calculate extra memory for string content.
1040-
for (const auto& str : testData) {
1041-
stream.extraMemory() += str.size();
1042-
}
1031+
stream.extraMemory() += populateStringData(
1032+
stream.stringBuffer(), stream.mutableData(), testData);
10431033

10441034
// "a"(1), "short"(5), "really really large string"(26), "small"(5) =
10451035
// 37 bytes of extra memory.
@@ -1113,11 +1103,8 @@ TEST_F(StreamChunkerTestsBase, contentStreamStringChunkerRowCount) {
11131103
// Add strings with varying sizes.
11141104
std::vector<std::string_view> testData = {
11151105
"a", "bb", "ccc", "dddd", "eeeee", "ffffff"};
1116-
auto& data = stream.mutableData();
1117-
populateData(data, testData);
1118-
for (const auto& str : testData) {
1119-
stream.extraMemory() += str.size();
1120-
}
1106+
stream.extraMemory() +=
1107+
populateStringData(stream.stringBuffer(), stream.mutableData(), testData);
11211108

11221109
// Max chunk size that fits ~2-3 strings per chunk.
11231110
auto chunker = getStreamChunker(
@@ -1228,13 +1215,10 @@ TEST_F(StreamChunkerTestsBase, nullableContentStreamStringChunkerRowCount) {
12281215
std::vector<bool> nonNullsData = {true, false, true, true, false, true};
12291216
stream.ensureAdditionalNullsCapacity(
12301217
/*mayHaveNulls=*/true, static_cast<uint32_t>(nonNullsData.size()));
1231-
auto& data = stream.mutableData();
12321218
auto& nonNulls = stream.mutableNonNulls();
1233-
populateData(data, testData);
12341219
populateData(nonNulls, nonNullsData);
1235-
for (const auto& str : testData) {
1236-
stream.extraMemory() += str.size();
1237-
}
1220+
stream.extraMemory() +=
1221+
populateStringData(stream.stringBuffer(), stream.mutableData(), testData);
12381222

12391223
auto chunker = getStreamChunker(
12401224
stream,

0 commit comments

Comments
 (0)