Skip to content

Commit 2da4c5e

Browse files
macvincentfacebook-github-bot
authored andcommitted
Support Per Stream String Buffers in Velox Writer (facebookincubator#392)
Summary: Integrates the per-stream string buffer infrastructure (from D89933055) into the Velox Writer, enabling string fields to use per-field buffers instead of a shared global buffer. This architectural change enables incremental memory reclamation during chunking operations, improving memory efficiency for string-heavy workloads. The feature is controlled by a global [JustKnob](https://www.internalfb.com/intern/justknobs/?name=dwio%2Fnimble%3Adisable_shared_string_buffers). When the knob returns true, per-stream string buffers are enabled. When set to false (default), the existing shared buffer behavior is preserved. Reviewed By: helfman Differential Revision: D89938211
1 parent 4596136 commit 2da4c5e

File tree

11 files changed

+163
-16
lines changed

11 files changed

+163
-16
lines changed

dwio/nimble/velox/BufferGrowthPolicy.h

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,15 @@ class DefaultInputBufferGrowthPolicy : public InputBufferGrowthPolicy {
5656
{32UL, 4.0}, {512UL, 1.414}, {4096UL, 1.189}});
5757
}
5858

59+
// Growth policy for per-stream string buffers when disableSharedStringBuffers
60+
// is enabled. Uses byte-based thresholds since string data is stored as raw
61+
// bytes.
62+
static std::unique_ptr<InputBufferGrowthPolicy> withStringBufferRanges() {
63+
return std::make_unique<DefaultInputBufferGrowthPolicy>(
64+
std::map<uint64_t, float>{
65+
{4096UL, 2.0}, {4194304UL, 1.414}, {12582912UL, 1.189}});
66+
}
67+
5968
private:
6069
// Map of range lowerbounds and the growth factor for the range.
6170
// The first lowerbound is the smallest unit of allocation and the last range

dwio/nimble/velox/FieldWriter.cpp

Lines changed: 83 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -338,6 +338,7 @@ class SimpleFieldWriter : public FieldWriter {
338338
nullCount = size - nonNullCount;
339339
}
340340

341+
// NOTE: This logic is wrong. Will be removed with new stats changes.
341342
columnStats_.logicalSize += nullCount +
342343
((K == velox::TypeKind::VARCHAR || K == velox::TypeKind::VARBINARY)
343344
? valuesStream_.extraMemory()
@@ -355,6 +356,72 @@ class SimpleFieldWriter : public FieldWriter {
355356
ColumnStats& columnStats_;
356357
};
357358

359+
template <velox::TypeKind K>
360+
class StringFieldWriter : public FieldWriter {
361+
public:
362+
explicit StringFieldWriter(FieldWriterContext& context)
363+
: FieldWriter(
364+
context,
365+
context.schemaBuilder().createScalarTypeBuilder(
366+
NimbleTypeTraits<K>::scalarKind)),
367+
valuesStream_{context.createNullableContentStringStreamData(
368+
typeBuilder_->asScalar().scalarDescriptor())},
369+
columnStats_{context.columnStats(valuesStream_.descriptor().offset())} {
370+
}
371+
372+
void write(
373+
const velox::VectorPtr& vector,
374+
const OrderedRanges& ranges,
375+
folly::Executor*) override {
376+
// Ensure string buffer capacity.
377+
auto size = ranges.size();
378+
const uint64_t totalBytes = getRawSizeFromVector(vector, ranges);
379+
valuesStream_.ensureStringBufferCapacity(size, totalBytes);
380+
381+
// Append to string buffer.
382+
uint64_t memoryUsed = 0;
383+
auto stringBuffer = valuesStream_.mutableData();
384+
auto appendToStringBuffer = [&](velox::StringView sv) {
385+
memoryUsed += sv.size();
386+
auto& buffer = stringBuffer.buffer;
387+
buffer.insert(buffer.end(), sv.begin(), sv.end());
388+
auto& mutableLengths = stringBuffer.lengths;
389+
mutableLengths.push_back(sv.size());
390+
};
391+
392+
uint64_t nonNullCount = 0;
393+
if (auto flat = vector->asFlatVector<velox::StringView>()) {
394+
valuesStream_.ensureAdditionalNullsCapacity(flat->mayHaveNulls(), size);
395+
nonNullCount = iterateNonNullValues(
396+
ranges,
397+
valuesStream_.mutableNonNulls(),
398+
Flat<velox::StringView>{vector},
399+
appendToStringBuffer);
400+
} else {
401+
auto decodingContext = context_.decodingContext();
402+
auto& decoded = decodingContext.decode(vector, ranges);
403+
valuesStream_.ensureAdditionalNullsCapacity(decoded.mayHaveNulls(), size);
404+
nonNullCount = iterateNonNullValues(
405+
ranges,
406+
valuesStream_.mutableNonNulls(),
407+
Decoded<velox::StringView>{decoded},
408+
appendToStringBuffer);
409+
}
410+
uint64_t nullCount = size - nonNullCount;
411+
columnStats_.logicalSize += nullCount + memoryUsed;
412+
columnStats_.nullCount += nullCount;
413+
columnStats_.valueCount += size;
414+
}
415+
416+
void reset() override {
417+
valuesStream_.reset();
418+
}
419+
420+
private:
421+
NullableContentStringStreamData& valuesStream_;
422+
ColumnStats& columnStats_;
423+
};
424+
358425
class TimestampFieldWriter : public FieldWriter {
359426
public:
360427
explicit TimestampFieldWriter(FieldWriterContext& context)
@@ -1964,15 +2031,25 @@ std::unique_ptr<FieldWriter> FieldWriter::create(
19642031
break;
19652032
}
19662033
case velox::TypeKind::VARCHAR: {
1967-
field = std::make_unique<
1968-
SimpleFieldWriter<velox::TypeKind::VARCHAR, StringConverter>>(
1969-
context);
2034+
if (context.disableSharedStringBuffers()) {
2035+
field = std::make_unique<StringFieldWriter<velox::TypeKind::VARCHAR>>(
2036+
context);
2037+
} else {
2038+
field = std::make_unique<
2039+
SimpleFieldWriter<velox::TypeKind::VARCHAR, StringConverter>>(
2040+
context);
2041+
}
19702042
break;
19712043
}
19722044
case velox::TypeKind::VARBINARY: {
1973-
field = std::make_unique<
1974-
SimpleFieldWriter<velox::TypeKind::VARBINARY, StringConverter>>(
1975-
context);
2045+
if (context.disableSharedStringBuffers()) {
2046+
field = std::make_unique<StringFieldWriter<velox::TypeKind::VARBINARY>>(
2047+
context);
2048+
} else {
2049+
field = std::make_unique<
2050+
SimpleFieldWriter<velox::TypeKind::VARBINARY, StringConverter>>(
2051+
context);
2052+
}
19762053
break;
19772054
}
19782055
case velox::TypeKind::TIMESTAMP: {

dwio/nimble/velox/FieldWriter.h

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -296,6 +296,22 @@ class FieldWriterContext {
296296
*bufferMemoryPool_, descriptor, *inputBufferGrowthPolicy_)));
297297
}
298298

299+
nimble::NullableContentStringStreamData&
300+
createNullableContentStringStreamData(
301+
const nimble::StreamDescriptorBuilder& descriptor) {
302+
return static_cast<nimble::NullableContentStringStreamData&>(
303+
*streams_.emplace_back(
304+
std::make_unique<nimble::NullableContentStringStreamData>(
305+
*bufferMemoryPool_,
306+
descriptor,
307+
*inputBufferGrowthPolicy_,
308+
*stringBufferGrowthPolicy_)));
309+
}
310+
311+
inline bool disableSharedStringBuffers() const {
312+
return disableSharedStringBuffers_;
313+
}
314+
299315
protected:
300316
MemoryPoolHolder bufferMemoryPool_;
301317
std::mutex flatMapSchemaMutex_;
@@ -305,8 +321,10 @@ class FieldWriterContext {
305321
folly::F14FastSet<uint32_t> dictionaryArrayNodeIds_;
306322
folly::F14FastSet<uint32_t> deduplicatedMapNodeIds_;
307323
bool ignoreTopLevelNulls_{false};
324+
bool disableSharedStringBuffers_{false};
308325

309326
std::unique_ptr<InputBufferGrowthPolicy> inputBufferGrowthPolicy_;
327+
std::unique_ptr<InputBufferGrowthPolicy> stringBufferGrowthPolicy_;
310328
InputBufferGrowthStats inputBufferGrowthStats_;
311329

312330
std::unordered_map<offset_size, ColumnStats> columnStats_;

dwio/nimble/velox/StreamChunker.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -591,6 +591,7 @@ class NullableContentStringStreamChunker final : public StreamChunker {
591591
options.isFirstChunk)},
592592
ensureFullChunks_{options.ensureFullChunks} {
593593
static_assert(sizeof(bool) == 1);
594+
streamData_->materializeNulls();
594595
}
595596

596597
std::optional<StreamDataView> next() override {

dwio/nimble/velox/StreamData.h

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -381,6 +381,10 @@ class NullableContentStringStreamData final : public NullsStreamData {
381381
return stringViews_;
382382
}
383383

384+
void materializeNulls() {
385+
NullsStreamData::materialize();
386+
}
387+
384388
void materialize() override {
385389
stringViews_.resize(lengths_.size());
386390
auto runningOffset = 0;
@@ -389,7 +393,7 @@ class NullableContentStringStreamData final : public NullsStreamData {
389393
std::string_view(buffer_.data() + runningOffset, lengths_[i]);
390394
runningOffset += lengths_[i];
391395
}
392-
NullsStreamData::materialize();
396+
materializeNulls();
393397
materialized_ = true;
394398
}
395399

dwio/nimble/velox/VeloxWriter.cpp

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,11 @@ class WriterContext : public FieldWriterContext {
5757
inputBufferGrowthPolicy_ = this->options_.lowMemoryMode
5858
? std::make_unique<ExactGrowthPolicy>()
5959
: this->options_.inputGrowthPolicyFactory();
60+
stringBufferGrowthPolicy_ = this->options_.lowMemoryMode
61+
? std::make_unique<ExactGrowthPolicy>()
62+
: this->options_.stringBufferGrowthPolicyFactory();
6063
ignoreTopLevelNulls_ = options_.ignoreTopLevelNulls;
64+
disableSharedStringBuffers_ = options_.disableSharedStringBuffers;
6165
}
6266

6367
const VeloxWriterOptions& options() const {
@@ -991,6 +995,7 @@ void VeloxWriter::processStream(
991995
encodeStream(nullsStreamData, streamSize, chunkSize);
992996
}
993997
} else {
998+
streamData.materialize();
994999
if (!streamData.data().empty()) {
9951000
encodeStream(streamData, streamSize, chunkSize);
9961001
}

dwio/nimble/velox/VeloxWriterOptions.h

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -142,6 +142,14 @@ struct VeloxWriterOptions {
142142
return DefaultInputBufferGrowthPolicy::withDefaultRanges();
143143
};
144144

145+
// When per-stream string buffers are enabled (disableSharedStringBuffers),
146+
// this policy controls how the string buffer vectors grow.
147+
std::function<std::unique_ptr<InputBufferGrowthPolicy>()>
148+
stringBufferGrowthPolicyFactory =
149+
[]() -> std::unique_ptr<InputBufferGrowthPolicy> {
150+
return DefaultInputBufferGrowthPolicy::withStringBufferRanges();
151+
};
152+
145153
std::function<std::unique_ptr<velox::memory::MemoryReclaimer>()>
146154
reclaimerFactory = []() { return nullptr; };
147155

@@ -178,6 +186,10 @@ struct VeloxWriterOptions {
178186
bool ignoreTopLevelNulls{false};
179187

180188
bool enableStreamDeduplication{true};
189+
190+
// When true, string fields use per-field buffers instead of a shared buffer.
191+
// This enables incremental memory reclamation during chunking.
192+
bool disableSharedStringBuffers{false};
181193
};
182194

183195
} // namespace facebook::nimble

dwio/nimble/velox/stats/ColumnStatsUtils.cpp

Lines changed: 19 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -18,17 +18,30 @@
1818
#include "velox/dwio/common/Range.h"
1919

2020
namespace facebook::nimble {
21-
uint64_t getRawSizeFromVector(
22-
const velox::VectorPtr& vector,
23-
const OrderedRanges& ranges,
24-
RawSizeContext& context) {
21+
22+
namespace {
23+
velox::common::Ranges toVeloxRanges(const OrderedRanges& ranges) {
2524
velox::common::Ranges veloxRanges;
2625
for (auto& range : ranges.ranges()) {
2726
veloxRanges.add(
2827
std::get<0>(range), std::get<0>(range) + std::get<1>(range));
2928
}
30-
return getRawSizeFromVector(vector, veloxRanges, context);
31-
};
29+
return veloxRanges;
30+
}
31+
} // namespace
32+
33+
uint64_t getRawSizeFromVector(
34+
const velox::VectorPtr& vector,
35+
const OrderedRanges& ranges,
36+
RawSizeContext& context) {
37+
return getRawSizeFromVector(vector, toVeloxRanges(ranges), context);
38+
}
39+
40+
uint64_t getRawSizeFromVector(
41+
const velox::VectorPtr& vector,
42+
const OrderedRanges& ranges) {
43+
return getRawSizeFromVector(vector, toVeloxRanges(ranges));
44+
}
3245

3346
void aggregateStats(
3447
const TypeBuilder& builder,

dwio/nimble/velox/stats/ColumnStatsUtils.h

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,10 @@ uint64_t getRawSizeFromVector(
3939
const OrderedRanges& ranges,
4040
RawSizeContext& context);
4141

42+
uint64_t getRawSizeFromVector(
43+
const velox::VectorPtr& vector,
44+
const OrderedRanges& ranges);
45+
4246
void aggregateStats(
4347
const TypeBuilder& builder,
4448
std::unordered_map<offset_size, ColumnStats>& columnStats,

dwio/nimble/velox/tests/FieldWriterStatsTests.cpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,7 @@ class FieldWriterStatsTests : public ::testing::Test {
6666
rowType = input->type();
6767
}
6868
options.enableChunking = true;
69+
options.disableSharedStringBuffers = true;
6970
nimble::VeloxWriter writer(
7071
rowType, std::move(writeFile), *rootPool_, options);
7172
writer.write(input);

0 commit comments

Comments
 (0)