Skip to content

Commit f27fada

Browse files
macvincentfacebook-github-bot
authored andcommitted
Support Per Stream String Buffers in Writer (facebookincubator#415)
Summary: This change enables Nimble to handle string data with per-stream buffers rather than a shared buffer, improving memory locality and enabling independent chunking of string columns. It follows the existing chunker pattern while adding string-specific optimizations for buffer management. Integration with the Velox Writer happens in the next diff. Reviewed By: helfman Differential Revision: D90411755
1 parent 2118a20 commit f27fada

File tree

5 files changed

+670
-16
lines changed

5 files changed

+670
-16
lines changed

dwio/nimble/velox/StreamChunker.cpp

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,11 @@ std::unique_ptr<StreamChunker> getStreamChunkerTyped(
3737
}
3838
return std::make_unique<NullableContentStreamChunker<T>>(
3939
*nullableContentStreamData, options);
40+
} else if (
41+
auto* nullableContentStringStreamData =
42+
dynamic_cast<NullableContentStringStreamData*>(&streamData)) {
43+
return std::make_unique<NullableContentStringStreamChunker>(
44+
*nullableContentStringStreamData, options);
4045
} else if (
4146
auto* nullsStreamData = dynamic_cast<NullsStreamData*>(&streamData)) {
4247
return std::make_unique<NullsStreamChunker>(*nullsStreamData, options);

dwio/nimble/velox/StreamChunker.h

Lines changed: 206 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -21,14 +21,15 @@
2121
namespace facebook::nimble {
2222
namespace detail {
2323
inline bool shouldOmitDataStream(
24-
const StreamData& streamData,
24+
uint64_t dataVectorSize,
2525
uint64_t minChunkSize,
26+
bool allNulls,
2627
bool isFirstChunk) {
27-
if (streamData.data().size() > minChunkSize) {
28+
if (dataVectorSize > minChunkSize) {
2829
return false;
2930
}
3031
// When all values are null, the values stream is omitted.
31-
return isFirstChunk || streamData.nonNulls().empty();
32+
return isFirstChunk || allNulls;
3233
}
3334

3435
inline bool shouldOmitNullStream(
@@ -184,7 +185,7 @@ class ContentStreamChunker final : public StreamChunker {
184185
}
185186

186187
void compact() override {
187-
// No changes made to stream data, nothing to compact.
188+
// No chunks consumed from stream data, we should not compact.
188189
if (dataElementOffset_ == 0) {
189190
return;
190191
}
@@ -300,7 +301,7 @@ class NullsStreamChunker final : public StreamChunker {
300301

301302
private:
302303
void compact() override {
303-
// No changes made to stream data, nothing to compact.
304+
// No chunks consumed from stream data, we should not compact.
304305
if (nonNullsOffset_ == 0) {
305306
return;
306307
}
@@ -357,8 +358,9 @@ class NullableContentStreamChunker final : public StreamChunker {
357358
minChunkSize_{options.minChunkSize},
358359
maxChunkSize_{options.maxChunkSize},
359360
omitStream_{detail::shouldOmitDataStream(
360-
streamData,
361+
streamData.data().size(),
361362
options.minChunkSize,
363+
streamData.nonNulls().empty(),
362364
options.isFirstChunk)},
363365
ensureFullChunks_{options.ensureFullChunks},
364366
extraMemory_{streamData_->extraMemory()} {
@@ -448,7 +450,7 @@ class NullableContentStreamChunker final : public StreamChunker {
448450
}
449451

450452
void compact() override {
451-
// No changes made to stream data, nothing to compact.
453+
// No chunks consumed from stream data, we should not compact.
452454
if (nonNullsOffset_ == 0) {
453455
return;
454456
}
@@ -573,4 +575,201 @@ NullableContentStreamChunker<std::string_view>::nextChunkSize() {
573575
}
574576
return chunkSize;
575577
}
578+
579+
class NullableContentStringStreamChunker final : public StreamChunker {
580+
public:
581+
explicit NullableContentStringStreamChunker(
582+
NullableContentStringStreamData& streamData,
583+
const StreamChunkerOptions& options)
584+
: streamData_{&streamData},
585+
minChunkSize_{options.minChunkSize},
586+
maxChunkSize_{options.maxChunkSize},
587+
omitStream_{detail::shouldOmitDataStream(
588+
streamData.bufferSize(),
589+
options.minChunkSize,
590+
streamData.nonNulls().empty(),
591+
options.isFirstChunk)},
592+
ensureFullChunks_{options.ensureFullChunks} {
593+
static_assert(sizeof(bool) == 1);
594+
}
595+
596+
std::optional<StreamDataView> next() override {
597+
if (omitStream_) {
598+
return std::nullopt;
599+
}
600+
const auto& chunkSize = nextChunkSize();
601+
if (chunkSize.rollingChunkSize == 0) {
602+
return std::nullopt;
603+
}
604+
605+
// Content
606+
auto& output = streamData_->mutableStringViews();
607+
output.resize(0);
608+
output.reserve(chunkSize.dataElementCount);
609+
auto mutableData = streamData_->mutableData();
610+
auto& mutableLengths = mutableData.lengths;
611+
auto& mutableBuffer = mutableData.buffer;
612+
auto currentBufferOffset = bufferOffset_;
613+
for (size_t i = 0; i < chunkSize.dataElementCount; ++i) {
614+
const auto currentLength = mutableLengths[lengthsOffset_ + i];
615+
output.emplace_back(
616+
std::string_view{
617+
mutableBuffer.data() + currentBufferOffset, currentLength});
618+
currentBufferOffset += currentLength;
619+
}
620+
std::string_view dataChunk = {
621+
reinterpret_cast<const char*>(output.data()),
622+
chunkSize.dataElementCount * sizeof(std::string_view)};
623+
624+
// Nulls
625+
std::span<const bool> nonNullsChunk(
626+
streamData_->mutableNonNulls().data() + nonNullsOffset_,
627+
chunkSize.nullElementCount);
628+
629+
lengthsOffset_ += chunkSize.dataElementCount;
630+
nonNullsOffset_ += chunkSize.nullElementCount;
631+
bufferOffset_ += chunkSize.extraMemory;
632+
633+
if (chunkSize.nullElementCount > chunkSize.dataElementCount) {
634+
return StreamDataView{
635+
streamData_->descriptor(),
636+
dataChunk,
637+
static_cast<uint32_t>(chunkSize.nullElementCount),
638+
nonNullsChunk};
639+
}
640+
NIMBLE_DCHECK_EQ(chunkSize.dataElementCount, chunkSize.nullElementCount);
641+
return StreamDataView{
642+
streamData_->descriptor(),
643+
dataChunk,
644+
static_cast<uint32_t>(chunkSize.dataElementCount)};
645+
}
646+
647+
private:
648+
ChunkSize nextChunkSize() {
649+
const auto& stringLengths = streamData_->mutableData().lengths;
650+
const auto& nonNulls = streamData_->mutableNonNulls();
651+
ChunkSize chunkSize;
652+
bool fullChunk{false};
653+
// Calculate how many entries we can fit in the chunk
654+
for (size_t idx = nonNullsOffset_; idx < nonNulls.size(); ++idx) {
655+
uint64_t currentTotalSize{sizeof(bool)};
656+
uint32_t currentDataCount{0};
657+
size_t currentExtraMemory{0};
658+
if (nonNulls[idx]) {
659+
currentExtraMemory =
660+
stringLengths[lengthsOffset_ + chunkSize.dataElementCount];
661+
currentTotalSize += currentExtraMemory + sizeof(uint64_t);
662+
++currentDataCount;
663+
}
664+
665+
if (chunkSize.rollingChunkSize == 0 && currentTotalSize > maxChunkSize_) {
666+
// Allow a single oversized string as its own chunk.
667+
fullChunk = true;
668+
chunkSize.extraMemory += currentExtraMemory;
669+
chunkSize.dataElementCount += currentDataCount;
670+
chunkSize.rollingChunkSize += currentTotalSize;
671+
++chunkSize.nullElementCount;
672+
break;
673+
}
674+
675+
if (chunkSize.rollingChunkSize + currentTotalSize > maxChunkSize_) {
676+
fullChunk = true;
677+
break;
678+
}
679+
680+
chunkSize.extraMemory += currentExtraMemory;
681+
chunkSize.dataElementCount += currentDataCount;
682+
chunkSize.rollingChunkSize += currentTotalSize;
683+
++chunkSize.nullElementCount;
684+
}
685+
686+
fullChunk = fullChunk || (chunkSize.rollingChunkSize == maxChunkSize_);
687+
if ((ensureFullChunks_ && !fullChunk) ||
688+
(chunkSize.rollingChunkSize < minChunkSize_)) {
689+
chunkSize = ChunkSize{};
690+
}
691+
return chunkSize;
692+
}
693+
694+
void compact() override {
695+
// Clear existing outputvector before beginning compaction.
696+
streamData_->mutableStringViews().clear();
697+
698+
// No chunks consumed from stream data, we should not compact.
699+
if (nonNullsOffset_ == 0) {
700+
return;
701+
}
702+
703+
const bool hasNulls = streamData_->hasNulls();
704+
// Move and clear existing buffers
705+
auto tempBuffer = std::move(streamData_->mutableData().buffer);
706+
auto tempLengths = std::move(streamData_->mutableData().lengths);
707+
auto tempNonNulls = std::move(streamData_->mutableNonNulls());
708+
streamData_->reset();
709+
NIMBLE_DCHECK(
710+
streamData_->empty(), "StreamData should be empty after reset");
711+
712+
{
713+
const auto remainingDataCount = tempLengths.size() - lengthsOffset_;
714+
auto& mutableDataLength = streamData_->mutableData().lengths;
715+
mutableDataLength.resize(remainingDataCount);
716+
NIMBLE_DCHECK_EQ(
717+
mutableDataLength.size(),
718+
remainingDataCount,
719+
"Data length size should be equal to remaining data count");
720+
721+
std::copy_n(
722+
tempLengths.begin() + lengthsOffset_,
723+
remainingDataCount,
724+
mutableDataLength.begin());
725+
}
726+
727+
{
728+
const auto remainingDataBytes = tempBuffer.size() - bufferOffset_;
729+
auto& mutableDataBuffer = streamData_->mutableData().buffer;
730+
mutableDataBuffer.resize(remainingDataBytes);
731+
NIMBLE_DCHECK_EQ(
732+
mutableDataBuffer.size(),
733+
remainingDataBytes,
734+
"Data buffer size should be equal to remaining data bytes");
735+
736+
std::copy_n(
737+
tempBuffer.begin() + bufferOffset_,
738+
remainingDataBytes,
739+
mutableDataBuffer.begin());
740+
}
741+
742+
{
743+
auto& mutableNonNulls = streamData_->mutableNonNulls();
744+
const auto remainingNonNullsCount = tempNonNulls.size() - nonNullsOffset_;
745+
streamData_->ensureAdditionalNullsCapacity(
746+
hasNulls, static_cast<uint32_t>(remainingNonNullsCount));
747+
if (hasNulls) {
748+
mutableNonNulls.resize(remainingNonNullsCount);
749+
NIMBLE_DCHECK_EQ(
750+
mutableNonNulls.size(),
751+
remainingNonNullsCount,
752+
"NonNulls buffer size should be equal to remaining non-nulls count");
753+
754+
std::copy_n(
755+
tempNonNulls.begin() + nonNullsOffset_,
756+
remainingNonNullsCount,
757+
mutableNonNulls.begin());
758+
}
759+
}
760+
lengthsOffset_ = 0;
761+
nonNullsOffset_ = 0;
762+
bufferOffset_ = 0;
763+
}
764+
765+
NullableContentStringStreamData* const streamData_;
766+
const uint64_t minChunkSize_;
767+
const uint64_t maxChunkSize_;
768+
const bool omitStream_;
769+
const bool ensureFullChunks_;
770+
771+
size_t lengthsOffset_{0};
772+
size_t nonNullsOffset_{0};
773+
uint64_t bufferOffset_{0};
774+
};
576775
} // namespace facebook::nimble

dwio/nimble/velox/StreamData.h

Lines changed: 109 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -325,4 +325,113 @@ class NullableContentStreamData final : public NullsStreamData {
325325
uint64_t extraMemory_;
326326
};
327327

328+
struct StringBuffer {
329+
Vector<char>& buffer;
330+
Vector<size_t>& lengths;
331+
};
332+
333+
class NullableContentStringStreamData final : public NullsStreamData {
334+
public:
335+
NullableContentStringStreamData(
336+
velox::memory::MemoryPool& memoryPool,
337+
const StreamDescriptorBuilder& descriptor,
338+
const InputBufferGrowthPolicy& lengthsGrowthPolicy,
339+
const InputBufferGrowthPolicy& bufferGrowthPolicy)
340+
: NullsStreamData(memoryPool, descriptor, lengthsGrowthPolicy),
341+
buffer_{&memoryPool},
342+
lengths_{&memoryPool},
343+
stringViews_{&memoryPool},
344+
bufferGrowthPolicy_{bufferGrowthPolicy},
345+
lengthGrowthPolicy_{lengthsGrowthPolicy} {}
346+
347+
inline std::string_view data() const override {
348+
NIMBLE_DCHECK(materialized_, "Data must be materialized before access");
349+
return std::string_view{
350+
reinterpret_cast<const char*>(stringViews_.data()),
351+
stringViews_.size() * sizeof(std::string_view)};
352+
}
353+
354+
inline bool empty() const override {
355+
return NullsStreamData::empty() && lengths_.empty();
356+
}
357+
358+
inline uint64_t memoryUsed() const override {
359+
return NullsStreamData::memoryUsed() + buffer_.size() +
360+
lengths_.size() * sizeof(uint64_t) +
361+
stringViews_.size() * sizeof(std::string_view);
362+
}
363+
364+
inline uint64_t bufferSize() const {
365+
return lengths_.size() * sizeof(uint64_t) + buffer_.size();
366+
}
367+
368+
inline StringBuffer mutableData() {
369+
// When mutableData() is called, it returns direct references to buffer_ and
370+
// lengths_, allowing external code to modify them. Since dataChunkVector_
371+
// contains string_view objects pointing into buffer_, any buffer
372+
// modifications invalidate these views. Therefore, materialize() must be
373+
// called before the next access to rebuild dataChunkVector_ with valid
374+
// string_view references to the updated buffer content.
375+
materialized_ = false;
376+
return {buffer_, lengths_};
377+
}
378+
379+
inline Vector<std::string_view>& mutableStringViews() {
380+
materialized_ = false;
381+
return stringViews_;
382+
}
383+
384+
void materialize() override {
385+
stringViews_.resize(lengths_.size());
386+
auto runningOffset = 0;
387+
for (auto i = 0; i < lengths_.size(); ++i) {
388+
stringViews_[i] =
389+
std::string_view(buffer_.data() + runningOffset, lengths_[i]);
390+
runningOffset += lengths_[i];
391+
}
392+
NullsStreamData::materialize();
393+
materialized_ = true;
394+
}
395+
396+
void ensureStringBufferCapacity(
397+
uint64_t additionalStrings,
398+
uint64_t additionalBytes) {
399+
if (additionalStrings == 0) {
400+
return;
401+
}
402+
403+
const auto newLengthsSize = lengths_.size() + additionalStrings;
404+
if (newLengthsSize > lengths_.capacity()) {
405+
const auto newCapacity = lengthGrowthPolicy_.getExtendedCapacity(
406+
newLengthsSize, lengths_.capacity());
407+
lengths_.reserve(newCapacity);
408+
}
409+
410+
if (additionalBytes > 0) {
411+
const auto newBufferSize = buffer_.size() + additionalBytes;
412+
if (newBufferSize > buffer_.capacity()) {
413+
const auto newCapacity = bufferGrowthPolicy_.getExtendedCapacity(
414+
newBufferSize, buffer_.capacity());
415+
buffer_.reserve(newCapacity);
416+
}
417+
}
418+
}
419+
420+
inline void reset() override {
421+
NullsStreamData::reset();
422+
buffer_.clear();
423+
lengths_.clear();
424+
stringViews_.clear();
425+
}
426+
427+
private:
428+
Vector<char> buffer_;
429+
Vector<uint64_t> lengths_;
430+
Vector<std::string_view> stringViews_;
431+
const InputBufferGrowthPolicy& bufferGrowthPolicy_;
432+
const InputBufferGrowthPolicy& lengthGrowthPolicy_;
433+
434+
bool materialized_{false};
435+
};
436+
328437
} // namespace facebook::nimble

0 commit comments

Comments
 (0)