Skip to content

Commit ec3f495

Browse files
macvincentfacebook-github-bot
authored andcommitted
Support Per Stream String Buffers in Velox Writer (facebookincubator#392)
Summary: Integrates the per-stream string buffer infrastructure (from D89933055) into the Velox Writer, enabling string fields to use per-field buffers instead of a shared global buffer. This architectural change enables incremental memory reclamation during chunking operations, improving memory efficiency for string-heavy workloads. The feature is controlled by a global [JustKnob](https://www.internalfb.com/intern/justknobs/?name=dwio%2Fnimble%3Adisable_shared_string_buffers). When the knob returns true, per-stream string buffers are enabled. When set to false (default), the existing shared buffer behavior is preserved. Reviewed By: helfman Differential Revision: D89938211
1 parent 8b90fcb commit ec3f495

File tree

9 files changed

+161
-15
lines changed

9 files changed

+161
-15
lines changed

dwio/nimble/stats/ColumnStatsUtils.cpp

Lines changed: 19 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -18,17 +18,30 @@
1818
#include "velox/dwio/common/Range.h"
1919

2020
namespace facebook::nimble {
21-
uint64_t getRawSizeFromVector(
22-
const velox::VectorPtr& vector,
23-
const OrderedRanges& ranges,
24-
RawSizeContext& context) {
21+
22+
namespace {
23+
velox::common::Ranges toVeloxRanges(const OrderedRanges& ranges) {
2524
velox::common::Ranges veloxRanges;
2625
for (auto& range : ranges.ranges()) {
2726
veloxRanges.add(
2827
std::get<0>(range), std::get<0>(range) + std::get<1>(range));
2928
}
30-
return getRawSizeFromVector(vector, veloxRanges, context);
31-
};
29+
return veloxRanges;
30+
}
31+
} // namespace
32+
33+
uint64_t getRawSizeFromVector(
34+
const velox::VectorPtr& vector,
35+
const OrderedRanges& ranges,
36+
RawSizeContext& context) {
37+
return getRawSizeFromVector(vector, toVeloxRanges(ranges), context);
38+
}
39+
40+
uint64_t getRawSizeFromVector(
41+
const velox::VectorPtr& vector,
42+
const OrderedRanges& ranges) {
43+
return getRawSizeFromVector(vector, toVeloxRanges(ranges));
44+
}
3245

3346
void aggregateStats(
3447
const TypeBuilder& builder,

dwio/nimble/stats/ColumnStatsUtils.h

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,10 @@ uint64_t getRawSizeFromVector(
3939
const OrderedRanges& ranges,
4040
RawSizeContext& context);
4141

42+
uint64_t getRawSizeFromVector(
43+
const velox::VectorPtr& vector,
44+
const OrderedRanges& ranges);
45+
4246
void aggregateStats(
4347
const TypeBuilder& builder,
4448
std::unordered_map<offset_size, ColumnStats>& columnStats,

dwio/nimble/velox/BufferGrowthPolicy.h

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,15 @@ class DefaultInputBufferGrowthPolicy : public InputBufferGrowthPolicy {
5656
{32UL, 4.0}, {512UL, 1.414}, {4096UL, 1.189}});
5757
}
5858

59+
// Growth policy for per-stream string buffers when disableSharedStringBuffers
60+
// is enabled. Uses byte-based thresholds since string data is stored as raw
61+
// bytes.
62+
static std::unique_ptr<InputBufferGrowthPolicy> withStringBufferRanges() {
63+
return std::make_unique<DefaultInputBufferGrowthPolicy>(
64+
std::map<uint64_t, float>{
65+
{4096UL, 2.0}, {4194304UL, 1.414}, {12582912UL, 1.189}});
66+
}
67+
5968
private:
6069
// Map of range lowerbounds and the growth factor for the range.
6170
// The first lowerbound is the smallest unit of allocation and the last range

dwio/nimble/velox/FieldWriter.cpp

Lines changed: 87 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -338,6 +338,7 @@ class SimpleFieldWriter : public FieldWriter {
338338
nullCount = size - nonNullCount;
339339
}
340340

341+
// NOTE: This logic is wrong. Will be removed with new stats changes.
341342
columnStats_.logicalSize += nullCount +
342343
((K == velox::TypeKind::VARCHAR || K == velox::TypeKind::VARBINARY)
343344
? valuesStream_.extraMemory()
@@ -355,6 +356,76 @@ class SimpleFieldWriter : public FieldWriter {
355356
ColumnStats& columnStats_;
356357
};
357358

359+
template <velox::TypeKind K>
360+
class StringFieldWriter : public FieldWriter {
361+
using SourceType = typename velox::TypeTraits<K>::NativeType;
362+
363+
public:
364+
explicit StringFieldWriter(FieldWriterContext& context)
365+
: FieldWriter(
366+
context,
367+
context.schemaBuilder().createScalarTypeBuilder(
368+
NimbleTypeTraits<K>::scalarKind)),
369+
valuesStream_{context.createNullableContentStringStreamData(
370+
typeBuilder_->asScalar().scalarDescriptor())},
371+
columnStats_{context.columnStats(valuesStream_.descriptor().offset())} {
372+
}
373+
374+
void write(
375+
const velox::VectorPtr& vector,
376+
const OrderedRanges& ranges,
377+
folly::Executor*) override {
378+
// Ensure string buffer capacity.
379+
auto size = ranges.size();
380+
uint64_t totalBytes = getRawSizeFromVector(vector, ranges);
381+
valuesStream_.ensureStringBufferCapacity(size, totalBytes);
382+
383+
// Append to string buffer.
384+
uint64_t memoryUsed = 0;
385+
auto appendToStringBuffer = [&](SourceType sv) {
386+
memoryUsed += sv.size();
387+
auto stringBuffer = valuesStream_.mutableData();
388+
auto& buffer = stringBuffer.buffer;
389+
buffer.insert(buffer.end(), sv.begin(), sv.end());
390+
auto& mutableLengths = stringBuffer.lengths;
391+
mutableLengths.push_back(sv.size());
392+
};
393+
394+
uint64_t nonNullCount = 0;
395+
if (auto flat = vector->asFlatVector<SourceType>()) {
396+
valuesStream_.ensureAdditionalNullsCapacity(flat->mayHaveNulls(), size);
397+
nonNullCount = iterateNonNullValues(
398+
ranges,
399+
valuesStream_.mutableNonNulls(),
400+
Flat<SourceType>{vector},
401+
appendToStringBuffer);
402+
} else {
403+
auto decodingContext = context_.decodingContext();
404+
auto& decoded = decodingContext.decode(vector, ranges);
405+
valuesStream_.ensureAdditionalNullsCapacity(decoded.mayHaveNulls(), size);
406+
nonNullCount = iterateNonNullValues(
407+
ranges,
408+
valuesStream_.mutableNonNulls(),
409+
Decoded<SourceType>{decoded},
410+
appendToStringBuffer);
411+
}
412+
uint64_t nullCount = size - nonNullCount;
413+
414+
// TODO: Validate that this logic is correct.
415+
columnStats_.logicalSize += nullCount + memoryUsed;
416+
columnStats_.nullCount += nullCount;
417+
columnStats_.valueCount += size;
418+
}
419+
420+
void reset() override {
421+
valuesStream_.reset();
422+
}
423+
424+
private:
425+
NullableContentStringStreamData& valuesStream_;
426+
ColumnStats& columnStats_;
427+
};
428+
358429
class TimestampFieldWriter : public FieldWriter {
359430
public:
360431
explicit TimestampFieldWriter(FieldWriterContext& context)
@@ -1964,15 +2035,25 @@ std::unique_ptr<FieldWriter> FieldWriter::create(
19642035
break;
19652036
}
19662037
case velox::TypeKind::VARCHAR: {
1967-
field = std::make_unique<
1968-
SimpleFieldWriter<velox::TypeKind::VARCHAR, StringConverter>>(
1969-
context);
2038+
if (context.disableSharedStringBuffers()) {
2039+
field = std::make_unique<StringFieldWriter<velox::TypeKind::VARCHAR>>(
2040+
context);
2041+
} else {
2042+
field = std::make_unique<
2043+
SimpleFieldWriter<velox::TypeKind::VARCHAR, StringConverter>>(
2044+
context);
2045+
}
19702046
break;
19712047
}
19722048
case velox::TypeKind::VARBINARY: {
1973-
field = std::make_unique<
1974-
SimpleFieldWriter<velox::TypeKind::VARBINARY, StringConverter>>(
1975-
context);
2049+
if (context.disableSharedStringBuffers()) {
2050+
field = std::make_unique<StringFieldWriter<velox::TypeKind::VARBINARY>>(
2051+
context);
2052+
} else {
2053+
field = std::make_unique<
2054+
SimpleFieldWriter<velox::TypeKind::VARBINARY, StringConverter>>(
2055+
context);
2056+
}
19762057
break;
19772058
}
19782059
case velox::TypeKind::TIMESTAMP: {

dwio/nimble/velox/FieldWriter.h

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -296,6 +296,22 @@ class FieldWriterContext {
296296
*bufferMemoryPool_, descriptor, *inputBufferGrowthPolicy_)));
297297
}
298298

299+
nimble::NullableContentStringStreamData&
300+
createNullableContentStringStreamData(
301+
const nimble::StreamDescriptorBuilder& descriptor) {
302+
return static_cast<nimble::NullableContentStringStreamData&>(
303+
*streams_.emplace_back(
304+
std::make_unique<nimble::NullableContentStringStreamData>(
305+
*bufferMemoryPool_,
306+
descriptor,
307+
*inputBufferGrowthPolicy_,
308+
*stringBufferGrowthPolicy_)));
309+
}
310+
311+
inline bool disableSharedStringBuffers() const {
312+
return disableSharedStringBuffers_;
313+
}
314+
299315
protected:
300316
MemoryPoolHolder bufferMemoryPool_;
301317
std::mutex flatMapSchemaMutex_;
@@ -305,8 +321,10 @@ class FieldWriterContext {
305321
folly::F14FastSet<uint32_t> dictionaryArrayNodeIds_;
306322
folly::F14FastSet<uint32_t> deduplicatedMapNodeIds_;
307323
bool ignoreTopLevelNulls_{false};
324+
bool disableSharedStringBuffers_{false};
308325

309326
std::unique_ptr<InputBufferGrowthPolicy> inputBufferGrowthPolicy_;
327+
std::unique_ptr<InputBufferGrowthPolicy> stringBufferGrowthPolicy_;
310328
InputBufferGrowthStats inputBufferGrowthStats_;
311329

312330
std::unordered_map<offset_size, ColumnStats> columnStats_;

dwio/nimble/velox/VeloxWriter.cpp

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,11 @@ class WriterContext : public FieldWriterContext {
5757
inputBufferGrowthPolicy_ = this->options_.lowMemoryMode
5858
? std::make_unique<ExactGrowthPolicy>()
5959
: this->options_.inputGrowthPolicyFactory();
60+
stringBufferGrowthPolicy_ = this->options_.lowMemoryMode
61+
? std::make_unique<ExactGrowthPolicy>()
62+
: this->options_.stringBufferGrowthPolicyFactory();
6063
ignoreTopLevelNulls_ = options_.ignoreTopLevelNulls;
64+
disableSharedStringBuffers_ = options_.disableSharedStringBuffers;
6165
}
6266

6367
const VeloxWriterOptions& options() const {
@@ -991,6 +995,7 @@ void VeloxWriter::processStream(
991995
encodeStream(nullsStreamData, streamSize, chunkSize);
992996
}
993997
} else {
998+
streamData.materialize();
994999
if (!streamData.data().empty()) {
9951000
encodeStream(streamData, streamSize, chunkSize);
9961001
}

dwio/nimble/velox/VeloxWriterOptions.h

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -142,6 +142,14 @@ struct VeloxWriterOptions {
142142
return DefaultInputBufferGrowthPolicy::withDefaultRanges();
143143
};
144144

145+
// When per-stream string buffers are enabled (disableSharedStringBuffers),
146+
// this policy controls how the string buffer vectors grow.
147+
std::function<std::unique_ptr<InputBufferGrowthPolicy>()>
148+
stringBufferGrowthPolicyFactory =
149+
[]() -> std::unique_ptr<InputBufferGrowthPolicy> {
150+
return DefaultInputBufferGrowthPolicy::withStringBufferRanges();
151+
};
152+
145153
std::function<std::unique_ptr<velox::memory::MemoryReclaimer>()>
146154
reclaimerFactory = []() { return nullptr; };
147155

@@ -178,6 +186,10 @@ struct VeloxWriterOptions {
178186
bool ignoreTopLevelNulls{false};
179187

180188
bool enableStreamDeduplication{true};
189+
190+
// When true, string fields use per-field buffers instead of a shared buffer.
191+
// This enables incremental memory reclamation during chunking.
192+
bool disableSharedStringBuffers{false};
181193
};
182194

183195
} // namespace facebook::nimble

dwio/nimble/velox/tests/FieldWriterStatsTests.cpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,7 @@ class FieldWriterStatsTests : public ::testing::Test {
6666
rowType = input->type();
6767
}
6868
options.enableChunking = true;
69+
options.disableSharedStringBuffers = true;
6970
nimble::VeloxWriter writer(
7071
rowType, std::move(writeFile), *rootPool_, options);
7172
writer.write(input);

dwio/nimble/velox/tests/VeloxWriterTest.cpp

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2184,7 +2184,7 @@ TEST_P(ChunkFlushPolicyTest, ChunkFlushPolicyIntegration) {
21842184

21852185
TEST_F(VeloxWriterTest, fuzzComplex) {
21862186
auto type = velox::ROW(
2187-
{{"array", velox::ARRAY(velox::REAL())},
2187+
{{"array", velox::ARRAY(velox::VARCHAR())},
21882188
{"dict_array", velox::ARRAY(velox::REAL())},
21892189
{"map", velox::MAP(velox::INTEGER(), velox::DOUBLE())},
21902190
{"row",
@@ -2204,13 +2204,14 @@ TEST_F(VeloxWriterTest, fuzzComplex) {
22042204
velox::ARRAY(
22052205
velox::ROW({
22062206
{"a", velox::INTEGER()},
2207-
{"b", velox::MAP(velox::REAL(), velox::REAL())},
2207+
{"b", velox::MAP(velox::REAL(), velox::VARBINARY())},
22082208
}))},
22092209
{"nested_map_array1",
22102210
velox::MAP(velox::INTEGER(), velox::ARRAY(velox::REAL()))},
22112211
{"nested_map_array2",
22122212
velox::MAP(velox::INTEGER(), velox::ARRAY(velox::INTEGER()))},
22132213
{"dict_map", velox::MAP(velox::INTEGER(), velox::INTEGER())}});
2214+
22142215
auto rowType = std::dynamic_pointer_cast<const velox::RowType>(type);
22152216
const uint32_t seed = FLAGS_writer_tests_seed > 0 ? FLAGS_writer_tests_seed
22162217
: folly::Random::rand32();
@@ -2220,6 +2221,7 @@ TEST_F(VeloxWriterTest, fuzzComplex) {
22202221
std::shared_ptr<folly::CPUThreadPoolExecutor> executor;
22212222
nimble::VeloxWriterOptions writerOptions;
22222223
writerOptions.enableChunking = true;
2224+
writerOptions.disableSharedStringBuffers = true;
22232225
writerOptions.flushPolicyFactory =
22242226
[]() -> std::unique_ptr<nimble::FlushPolicy> {
22252227
return std::make_unique<nimble::ChunkFlushPolicy>(
@@ -2302,7 +2304,7 @@ TEST_F(VeloxWriterTest, batchedChunkingRelievesMemoryPressure) {
23022304
ranges.add(0, rowCount);
23032305
const uint64_t stringColumnRawSize =
23042306
nimble::getRawSizeFromVector(stringColumn, ranges, context) +
2305-
sizeof(std::string_view) * rowCount;
2307+
sizeof(uint64_t) * rowCount;
23062308
const uint64_t intColumnRawSize =
23072309
nimble::getRawSizeFromVector(intColumn, ranges, context);
23082310

@@ -2334,6 +2336,7 @@ TEST_F(VeloxWriterTest, batchedChunkingRelievesMemoryPressure) {
23342336
nimble::VeloxWriterOptions writerOptions;
23352337
writerOptions.chunkedStreamBatchSize = kBatchSize;
23362338
writerOptions.enableChunking = true;
2339+
writerOptions.disableSharedStringBuffers = true;
23372340
writerOptions.minStreamChunkRawSize = minChunkSize;
23382341
writerOptions.maxStreamChunkRawSize = maxChunkSize;
23392342
writerOptions.flushPolicyFactory =

0 commit comments

Comments
 (0)