Skip to content

Commit 380355b

Browse files
macvincent authored and facebook-github-bot committed
Support Per Stream String Buffers in Writer (facebookincubator#415)
Summary: This change enables Nimble to handle string data with per-stream buffers rather than a shared buffer, improving memory locality and enabling independent chunking of string columns. It follows the existing chunker pattern while adding string-specific optimizations for buffer management. Integration with the Velox Writer happens in the next diff. Differential Revision: D90411755
1 parent b5ba120 commit 380355b

File tree

5 files changed

+673
-13
lines changed

5 files changed

+673
-13
lines changed

dwio/nimble/velox/StreamChunker.cpp

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,11 @@ std::unique_ptr<StreamChunker> getStreamChunkerTyped(
3737
}
3838
return std::make_unique<NullableContentStreamChunker<T>>(
3939
*nullableContentStreamData, options);
40+
} else if (
41+
auto* nullableContentStringStreamData =
42+
dynamic_cast<NullableContentStringStreamData*>(&streamData)) {
43+
return std::make_unique<NullableContentStringStreamChunker>(
44+
*nullableContentStringStreamData, options);
4045
} else if (
4146
auto* nullsStreamData = dynamic_cast<NullsStreamData*>(&streamData)) {
4247
return std::make_unique<NullsStreamChunker>(*nullsStreamData, options);

dwio/nimble/velox/StreamChunker.h

Lines changed: 206 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -21,14 +21,15 @@
2121
namespace facebook::nimble {
2222
namespace detail {
2323
// Decides whether the data (values) stream should be omitted.
//
// dataVectorSize: size of the buffered data, compared against minChunkSize.
// minChunkSize:   threshold above which the stream is always kept.
// allNulls:       true when the stream holds no non-null values.
// isFirstChunk:   true when no chunk has been produced for this stream yet.
//
// Returns false whenever dataVectorSize exceeds minChunkSize. Otherwise the
// stream is omitted if this is the first chunk or if every value is null.
inline bool shouldOmitDataStream(
    uint64_t dataVectorSize,
    uint64_t minChunkSize,
    bool allNulls,
    bool isFirstChunk) {
  const bool withinMinChunkSize = dataVectorSize <= minChunkSize;
  // When all values are null, the values stream is omitted.
  return withinMinChunkSize && (isFirstChunk || allNulls);
}
3334

3435
inline bool shouldOmitNullStream(
@@ -357,8 +358,9 @@ class NullableContentStreamChunker final : public StreamChunker {
357358
minChunkSize_{options.minChunkSize},
358359
maxChunkSize_{options.maxChunkSize},
359360
omitStream_{detail::shouldOmitDataStream(
360-
streamData,
361+
streamData.data().size(),
361362
options.minChunkSize,
363+
streamData.nonNulls().empty(),
362364
options.isFirstChunk)},
363365
ensureFullChunks_{options.ensureFullChunks},
364366
extraMemory_{streamData_->extraMemory()} {
@@ -573,4 +575,204 @@ NullableContentStreamChunker<std::string_view>::nextChunkSize() {
573575
}
574576
return chunkSize;
575577
}
578+
579+
class NullableContentStringStreamChunker final : public StreamChunker {
580+
public:
581+
explicit NullableContentStringStreamChunker(
582+
NullableContentStringStreamData& streamData,
583+
const StreamChunkerOptions& options)
584+
: streamData_{&streamData},
585+
minChunkSize_{options.minChunkSize},
586+
maxChunkSize_{options.maxChunkSize},
587+
omitStream_{detail::shouldOmitDataStream(
588+
streamData.bufferSize(),
589+
options.minChunkSize,
590+
streamData.nonNulls().empty(),
591+
options.isFirstChunk)},
592+
ensureFullChunks_{options.ensureFullChunks} {
593+
static_assert(sizeof(bool) == 1);
594+
streamData.materialize();
595+
}
596+
597+
std::optional<StreamDataView> next() override {
598+
if (omitStream_) {
599+
return std::nullopt;
600+
}
601+
const auto& chunkSize = nextChunkSize();
602+
if (chunkSize.rollingChunkSize == 0) {
603+
return std::nullopt;
604+
}
605+
606+
// Content
607+
auto& dataChunkVector = streamData_->mutableStringViews();
608+
dataChunkVector.resize(0);
609+
dataChunkVector.reserve(chunkSize.dataElementCount);
610+
auto mutableData = streamData_->mutableData();
611+
auto& mutableLengths = mutableData.lengths;
612+
auto& mutableBuffer = mutableData.buffer;
613+
auto runningOffset = extraMemoryOffset_;
614+
auto endOffset = dataElementOffset_ + chunkSize.dataElementCount;
615+
for (size_t i = dataElementOffset_; i < endOffset; ++i) {
616+
dataChunkVector.emplace_back(
617+
std::string_view{
618+
mutableBuffer.data() + runningOffset, mutableLengths[i]});
619+
runningOffset += mutableLengths[i];
620+
}
621+
std::string_view dataChunk = {
622+
reinterpret_cast<const char*>(dataChunkVector.data()),
623+
chunkSize.dataElementCount * sizeof(std::string_view)};
624+
625+
// Nulls
626+
std::span<const bool> nonNullsChunk(
627+
streamData_->mutableNonNulls().data() + nonNullsOffset_,
628+
chunkSize.nullElementCount);
629+
630+
dataElementOffset_ += chunkSize.dataElementCount;
631+
nonNullsOffset_ += chunkSize.nullElementCount;
632+
extraMemoryOffset_ += chunkSize.extraMemory;
633+
634+
if (chunkSize.nullElementCount > chunkSize.dataElementCount) {
635+
return StreamDataView{
636+
streamData_->descriptor(),
637+
dataChunk,
638+
static_cast<uint32_t>(chunkSize.nullElementCount),
639+
nonNullsChunk};
640+
}
641+
NIMBLE_CHECK_EQ(chunkSize.dataElementCount, chunkSize.nullElementCount);
642+
return StreamDataView{
643+
streamData_->descriptor(),
644+
dataChunk,
645+
static_cast<uint32_t>(chunkSize.dataElementCount)};
646+
}
647+
648+
private:
649+
ChunkSize nextChunkSize() {
650+
const auto& stringLengths = streamData_->mutableData().lengths;
651+
const auto& nonNulls = streamData_->mutableNonNulls();
652+
const auto bufferedCount = nonNulls.size();
653+
ChunkSize chunkSize;
654+
bool fullChunk{false};
655+
// Calculate how many entries we can fit in the chunk
656+
for (size_t nonNullsIndex = nonNullsOffset_; nonNullsIndex < bufferedCount;
657+
++nonNullsIndex) {
658+
uint64_t elementTotalSize{sizeof(bool)};
659+
uint32_t elementDataCount{0};
660+
size_t elementExtraMemory{0};
661+
if (nonNulls[nonNullsIndex]) {
662+
elementExtraMemory =
663+
stringLengths[dataElementOffset_ + chunkSize.dataElementCount];
664+
elementTotalSize += elementExtraMemory + sizeof(uint64_t);
665+
++elementDataCount;
666+
}
667+
668+
if (chunkSize.rollingChunkSize == 0 && elementTotalSize > maxChunkSize_) {
669+
// Allow a single oversized string as its own chunk.
670+
fullChunk = true;
671+
chunkSize.extraMemory += elementExtraMemory;
672+
chunkSize.dataElementCount += elementDataCount;
673+
chunkSize.rollingChunkSize += elementTotalSize;
674+
++chunkSize.nullElementCount;
675+
break;
676+
}
677+
678+
if (chunkSize.rollingChunkSize + elementTotalSize > maxChunkSize_) {
679+
fullChunk = true;
680+
break;
681+
}
682+
683+
chunkSize.extraMemory += elementExtraMemory;
684+
chunkSize.dataElementCount += elementDataCount;
685+
chunkSize.rollingChunkSize += elementTotalSize;
686+
++chunkSize.nullElementCount;
687+
}
688+
689+
fullChunk = fullChunk || (chunkSize.rollingChunkSize == maxChunkSize_);
690+
if ((ensureFullChunks_ && !fullChunk) ||
691+
(chunkSize.rollingChunkSize < minChunkSize_)) {
692+
chunkSize = ChunkSize{};
693+
}
694+
return chunkSize;
695+
}
696+
697+
void compact() override {
698+
// Clear existing outputvector before beginning compaction.
699+
streamData_->mutableStringViews().clear();
700+
701+
// No changes made to stream data, nothing to compact.
702+
if (nonNullsOffset_ == 0) {
703+
return;
704+
}
705+
706+
const bool hasNulls = streamData_->hasNulls();
707+
// Move and clear existing buffers
708+
auto tempBuffer = std::move(streamData_->mutableData().buffer);
709+
auto tempLengths = std::move(streamData_->mutableData().lengths);
710+
auto tempNonNulls = std::move(streamData_->mutableNonNulls());
711+
streamData_->reset();
712+
NIMBLE_CHECK(
713+
streamData_->empty(), "StreamData should be empty after reset");
714+
715+
{
716+
const auto remainingDataCount = tempLengths.size() - dataElementOffset_;
717+
auto& mutableDataLength = streamData_->mutableData().lengths;
718+
mutableDataLength.resize(remainingDataCount);
719+
NIMBLE_DCHECK_EQ(
720+
mutableDataLength.size(),
721+
remainingDataCount,
722+
"Data length size should be equal to remaining data count");
723+
724+
std::copy_n(
725+
tempLengths.begin() + dataElementOffset_,
726+
remainingDataCount,
727+
mutableDataLength.begin());
728+
}
729+
730+
{
731+
const auto remainingDataBytes = tempBuffer.size() - extraMemoryOffset_;
732+
auto& mutableDataBuffer = streamData_->mutableData().buffer;
733+
mutableDataBuffer.resize(remainingDataBytes);
734+
NIMBLE_DCHECK_EQ(
735+
mutableDataBuffer.size(),
736+
remainingDataBytes,
737+
"Data buffer size should be equal to remaining data bytes");
738+
739+
std::copy_n(
740+
tempBuffer.begin() + extraMemoryOffset_,
741+
remainingDataBytes,
742+
mutableDataBuffer.begin());
743+
}
744+
745+
{
746+
auto& mutableNonNulls = streamData_->mutableNonNulls();
747+
const auto remainingNonNullsCount = tempNonNulls.size() - nonNullsOffset_;
748+
streamData_->ensureAdditionalNullsCapacity(
749+
hasNulls, static_cast<uint32_t>(remainingNonNullsCount));
750+
if (hasNulls) {
751+
mutableNonNulls.resize(remainingNonNullsCount);
752+
NIMBLE_CHECK_EQ(
753+
mutableNonNulls.size(),
754+
remainingNonNullsCount,
755+
"NonNulls buffer size should be equal to remaining non-nulls count");
756+
757+
std::copy_n(
758+
tempNonNulls.begin() + nonNullsOffset_,
759+
remainingNonNullsCount,
760+
mutableNonNulls.begin());
761+
}
762+
}
763+
dataElementOffset_ = 0;
764+
nonNullsOffset_ = 0;
765+
extraMemoryOffset_ = 0;
766+
}
767+
768+
NullableContentStringStreamData* const streamData_;
769+
const uint64_t minChunkSize_;
770+
const uint64_t maxChunkSize_;
771+
const bool omitStream_;
772+
const bool ensureFullChunks_;
773+
774+
size_t dataElementOffset_{0};
775+
size_t nonNullsOffset_{0};
776+
uint64_t extraMemoryOffset_{0};
777+
};
576778
} // namespace facebook::nimble

dwio/nimble/velox/StreamData.h

Lines changed: 109 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -325,4 +325,113 @@ class NullableContentStreamData final : public NullsStreamData {
325325
uint64_t extraMemory_;
326326
};
327327

328+
struct StringBuffer {
329+
Vector<char>& buffer;
330+
Vector<size_t>& lengths;
331+
};
332+
333+
class NullableContentStringStreamData final : public NullsStreamData {
334+
public:
335+
NullableContentStringStreamData(
336+
velox::memory::MemoryPool& memoryPool,
337+
const StreamDescriptorBuilder& descriptor,
338+
const InputBufferGrowthPolicy& dataGrowthPolicy,
339+
const InputBufferGrowthPolicy& bufferGrowthPolicy)
340+
: NullsStreamData(memoryPool, descriptor, dataGrowthPolicy),
341+
buffer_{&memoryPool},
342+
lengths_{&memoryPool},
343+
stringViews_{&memoryPool},
344+
bufferGrowthPolicy_{bufferGrowthPolicy},
345+
lengthGrowthPolicy_{dataGrowthPolicy} {}
346+
347+
inline std::string_view data() const override {
348+
NIMBLE_CHECK(materialized_, "Data must be materialized before access");
349+
return std::string_view{
350+
reinterpret_cast<const char*>(stringViews_.data()),
351+
stringViews_.size() * sizeof(std::string_view)};
352+
}
353+
354+
inline bool empty() const override {
355+
return NullsStreamData::empty() && lengths_.empty();
356+
}
357+
358+
inline uint64_t memoryUsed() const override {
359+
return NullsStreamData::memoryUsed() + buffer_.size() +
360+
lengths_.size() * sizeof(uint64_t) +
361+
stringViews_.size() * sizeof(std::string_view);
362+
}
363+
364+
inline uint64_t bufferSize() const {
365+
return buffer_.size();
366+
}
367+
368+
inline StringBuffer mutableData() {
369+
// When mutableData() is called, it returns direct references to buffer_ and
370+
// lengths_, allowing external code to modify them. Since dataChunkVector_
371+
// contains string_view objects pointing into buffer_, any buffer
372+
// modifications invalidate these views. Therefore, materialize() must be
373+
// called before the next access to rebuild dataChunkVector_ with valid
374+
// string_view references to the updated buffer content.
375+
materialized_ = false;
376+
return {buffer_, lengths_};
377+
}
378+
379+
inline Vector<std::string_view>& mutableStringViews() {
380+
materialized_ = false;
381+
return stringViews_;
382+
}
383+
384+
void materialize() override {
385+
stringViews_.resize(lengths_.size());
386+
auto runningOffset = 0;
387+
for (auto i = 0; i < lengths_.size(); ++i) {
388+
stringViews_[i] = std::string_view(
389+
buffer_.data() + runningOffset, lengths_[i] * sizeof(char));
390+
runningOffset += lengths_[i];
391+
}
392+
NullsStreamData::materialize();
393+
materialized_ = true;
394+
}
395+
396+
void ensureStringBufferCapacity(
397+
uint64_t additionalStrings,
398+
uint64_t additionalBytes) {
399+
if (additionalStrings == 0) {
400+
return;
401+
}
402+
403+
const auto newLengthsSize = lengths_.size() + additionalStrings;
404+
if (newLengthsSize > lengths_.capacity()) {
405+
const auto newCapacity = lengthGrowthPolicy_.getExtendedCapacity(
406+
newLengthsSize, lengths_.capacity());
407+
lengths_.reserve(newCapacity);
408+
}
409+
410+
if (additionalBytes > 0) {
411+
const auto newBufferSize = buffer_.size() + additionalBytes;
412+
if (newBufferSize > buffer_.capacity()) {
413+
const auto newCapacity = bufferGrowthPolicy_.getExtendedCapacity(
414+
newBufferSize, buffer_.capacity());
415+
buffer_.reserve(newCapacity);
416+
}
417+
}
418+
}
419+
420+
inline void reset() override {
421+
NullsStreamData::reset();
422+
buffer_.clear();
423+
lengths_.clear();
424+
stringViews_.clear();
425+
}
426+
427+
private:
428+
Vector<char> buffer_;
429+
Vector<uint64_t> lengths_;
430+
Vector<std::string_view> stringViews_;
431+
const InputBufferGrowthPolicy& bufferGrowthPolicy_;
432+
const InputBufferGrowthPolicy& lengthGrowthPolicy_;
433+
434+
bool materialized_{false};
435+
};
436+
328437
} // namespace facebook::nimble

0 commit comments

Comments
 (0)