|
21 | 21 | namespace facebook::nimble { |
22 | 22 | namespace detail { |
23 | 23 | inline bool shouldOmitDataStream( |
24 | | - const StreamData& streamData, |
| 24 | + uint64_t dataVectorSize, |
25 | 25 | uint64_t minChunkSize, |
| 26 | + bool allNulls, |
26 | 27 | bool isFirstChunk) { |
27 | | - if (streamData.data().size() > minChunkSize) { |
| 28 | + if (dataVectorSize > minChunkSize) { |
28 | 29 | return false; |
29 | 30 | } |
30 | 31 | // When all values are null, the values stream is omitted. |
31 | | - return isFirstChunk || streamData.nonNulls().empty(); |
| 32 | + return isFirstChunk || allNulls; |
32 | 33 | } |
33 | 34 |
|
34 | 35 | inline bool shouldOmitNullStream( |
@@ -357,8 +358,9 @@ class NullableContentStreamChunker final : public StreamChunker { |
357 | 358 | minChunkSize_{options.minChunkSize}, |
358 | 359 | maxChunkSize_{options.maxChunkSize}, |
359 | 360 | omitStream_{detail::shouldOmitDataStream( |
360 | | - streamData, |
| 361 | + streamData.data().size(), |
361 | 362 | options.minChunkSize, |
| 363 | + streamData.nonNulls().empty(), |
362 | 364 | options.isFirstChunk)}, |
363 | 365 | ensureFullChunks_{options.ensureFullChunks}, |
364 | 366 | extraMemory_{streamData_->extraMemory()} { |
@@ -573,4 +575,204 @@ NullableContentStreamChunker<std::string_view>::nextChunkSize() { |
573 | 575 | } |
574 | 576 | return chunkSize; |
575 | 577 | } |
| 578 | + |
| 579 | +class NullableContentStringStreamChunker final : public StreamChunker { |
| 580 | + public: |
| 581 | + explicit NullableContentStringStreamChunker( |
| 582 | + NullableContentStringStreamData& streamData, |
| 583 | + const StreamChunkerOptions& options) |
| 584 | + : streamData_{&streamData}, |
| 585 | + minChunkSize_{options.minChunkSize}, |
| 586 | + maxChunkSize_{options.maxChunkSize}, |
| 587 | + omitStream_{detail::shouldOmitDataStream( |
| 588 | + streamData.elementCount(), |
| 589 | + options.minChunkSize, |
| 590 | + streamData.nonNulls().empty(), |
| 591 | + options.isFirstChunk)}, |
| 592 | + ensureFullChunks_{options.ensureFullChunks} { |
| 593 | + static_assert(sizeof(bool) == 1); |
| 594 | + streamData.materialize(); |
| 595 | + } |
| 596 | + |
| 597 | + std::optional<StreamDataView> next() override { |
| 598 | + if (omitStream_) { |
| 599 | + return std::nullopt; |
| 600 | + } |
| 601 | + const auto& chunkSize = nextChunkSize(); |
| 602 | + if (chunkSize.rollingChunkSize == 0) { |
| 603 | + return std::nullopt; |
| 604 | + } |
| 605 | + |
| 606 | + // Content |
| 607 | + auto& dataChunkVector = streamData_->mutableStringViews(); |
| 608 | + dataChunkVector.resize(0); |
| 609 | + dataChunkVector.reserve(chunkSize.dataElementCount); |
| 610 | + auto mutableData = streamData_->mutableData(); |
| 611 | + auto& mutableLengths = mutableData.lengths; |
| 612 | + auto& mutableBuffer = mutableData.buffer; |
| 613 | + auto runningOffset = extraMemoryOffset_; |
| 614 | + auto endOffset = dataElementOffset_ + chunkSize.dataElementCount; |
| 615 | + for (size_t i = dataElementOffset_; i < endOffset; ++i) { |
| 616 | + dataChunkVector.emplace_back( |
| 617 | + std::string_view{ |
| 618 | + mutableBuffer.data() + runningOffset, mutableLengths[i]}); |
| 619 | + runningOffset += mutableLengths[i]; |
| 620 | + } |
| 621 | + std::string_view dataChunk = { |
| 622 | + reinterpret_cast<const char*>(dataChunkVector.data()), |
| 623 | + chunkSize.dataElementCount * sizeof(std::string_view)}; |
| 624 | + |
| 625 | + // Nulls |
| 626 | + std::span<const bool> nonNullsChunk( |
| 627 | + streamData_->mutableNonNulls().data() + nonNullsOffset_, |
| 628 | + chunkSize.nullElementCount); |
| 629 | + |
| 630 | + dataElementOffset_ += chunkSize.dataElementCount; |
| 631 | + nonNullsOffset_ += chunkSize.nullElementCount; |
| 632 | + extraMemoryOffset_ += chunkSize.extraMemory; |
| 633 | + |
| 634 | + if (chunkSize.nullElementCount > chunkSize.dataElementCount) { |
| 635 | + return StreamDataView{ |
| 636 | + streamData_->descriptor(), |
| 637 | + dataChunk, |
| 638 | + static_cast<uint32_t>(chunkSize.nullElementCount), |
| 639 | + nonNullsChunk}; |
| 640 | + } |
| 641 | + NIMBLE_CHECK_EQ(chunkSize.dataElementCount, chunkSize.nullElementCount); |
| 642 | + return StreamDataView{ |
| 643 | + streamData_->descriptor(), |
| 644 | + dataChunk, |
| 645 | + static_cast<uint32_t>(chunkSize.dataElementCount)}; |
| 646 | + } |
| 647 | + |
| 648 | + private: |
| 649 | + ChunkSize nextChunkSize() { |
| 650 | + const auto& stringLengths = streamData_->mutableData().lengths; |
| 651 | + const auto& nonNulls = streamData_->mutableNonNulls(); |
| 652 | + const auto bufferedCount = nonNulls.size(); |
| 653 | + ChunkSize chunkSize; |
| 654 | + bool fullChunk{false}; |
| 655 | + // Calculate how many entries we can fit in the chunk |
| 656 | + for (size_t nonNullsIndex = nonNullsOffset_; nonNullsIndex < bufferedCount; |
| 657 | + ++nonNullsIndex) { |
| 658 | + uint64_t elementTotalSize{sizeof(bool)}; |
| 659 | + uint32_t elementDataCount{0}; |
| 660 | + size_t elementExtraMemory{0}; |
| 661 | + if (nonNulls[nonNullsIndex]) { |
| 662 | + elementExtraMemory = |
| 663 | + stringLengths[dataElementOffset_ + chunkSize.dataElementCount]; |
| 664 | + elementTotalSize += elementExtraMemory + sizeof(uint64_t); |
| 665 | + ++elementDataCount; |
| 666 | + } |
| 667 | + |
| 668 | + if (chunkSize.rollingChunkSize == 0 && elementTotalSize > maxChunkSize_) { |
| 669 | + // Allow a single oversized string as its own chunk. |
| 670 | + fullChunk = true; |
| 671 | + chunkSize.extraMemory += elementExtraMemory; |
| 672 | + chunkSize.dataElementCount += elementDataCount; |
| 673 | + chunkSize.rollingChunkSize += elementTotalSize; |
| 674 | + ++chunkSize.nullElementCount; |
| 675 | + break; |
| 676 | + } |
| 677 | + |
| 678 | + if (chunkSize.rollingChunkSize + elementTotalSize > maxChunkSize_) { |
| 679 | + fullChunk = true; |
| 680 | + break; |
| 681 | + } |
| 682 | + |
| 683 | + chunkSize.extraMemory += elementExtraMemory; |
| 684 | + chunkSize.dataElementCount += elementDataCount; |
| 685 | + chunkSize.rollingChunkSize += elementTotalSize; |
| 686 | + ++chunkSize.nullElementCount; |
| 687 | + } |
| 688 | + |
| 689 | + fullChunk = fullChunk || (chunkSize.rollingChunkSize == maxChunkSize_); |
| 690 | + if ((ensureFullChunks_ && !fullChunk) || |
| 691 | + (chunkSize.rollingChunkSize < minChunkSize_)) { |
| 692 | + chunkSize = ChunkSize{}; |
| 693 | + } |
| 694 | + return chunkSize; |
| 695 | + } |
| 696 | + |
| 697 | + void compact() override { |
| 698 | + // Clear existing outputvector before beginning compaction. |
| 699 | + streamData_->mutableStringViews().clear(); |
| 700 | + |
| 701 | + // No changes made to stream data, nothing to compact. |
| 702 | + if (nonNullsOffset_ == 0) { |
| 703 | + return; |
| 704 | + } |
| 705 | + |
| 706 | + const bool hasNulls = streamData_->hasNulls(); |
| 707 | + // Move and clear existing buffers |
| 708 | + auto tempBuffer = std::move(streamData_->mutableData().buffer); |
| 709 | + auto tempLengths = std::move(streamData_->mutableData().lengths); |
| 710 | + auto tempNonNulls = std::move(streamData_->mutableNonNulls()); |
| 711 | + streamData_->reset(); |
| 712 | + NIMBLE_CHECK( |
| 713 | + streamData_->empty(), "StreamData should be empty after reset"); |
| 714 | + |
| 715 | + { |
| 716 | + const auto remainingDataCount = tempLengths.size() - dataElementOffset_; |
| 717 | + auto& mutableDataLength = streamData_->mutableData().lengths; |
| 718 | + mutableDataLength.resize(remainingDataCount); |
| 719 | + NIMBLE_DCHECK_EQ( |
| 720 | + mutableDataLength.size(), |
| 721 | + remainingDataCount, |
| 722 | + "Data length size should be equal to remaining data count"); |
| 723 | + |
| 724 | + std::copy_n( |
| 725 | + tempLengths.begin() + dataElementOffset_, |
| 726 | + remainingDataCount, |
| 727 | + mutableDataLength.begin()); |
| 728 | + } |
| 729 | + |
| 730 | + { |
| 731 | + const auto remainingDataBytes = tempBuffer.size() - extraMemoryOffset_; |
| 732 | + auto& mutableDataBuffer = streamData_->mutableData().buffer; |
| 733 | + mutableDataBuffer.resize(remainingDataBytes); |
| 734 | + NIMBLE_DCHECK_EQ( |
| 735 | + mutableDataBuffer.size(), |
| 736 | + remainingDataBytes, |
| 737 | + "Data buffer size should be equal to remaining data bytes"); |
| 738 | + |
| 739 | + std::copy_n( |
| 740 | + tempBuffer.begin() + extraMemoryOffset_, |
| 741 | + remainingDataBytes, |
| 742 | + mutableDataBuffer.begin()); |
| 743 | + } |
| 744 | + |
| 745 | + { |
| 746 | + auto& mutableNonNulls = streamData_->mutableNonNulls(); |
| 747 | + const auto remainingNonNullsCount = tempNonNulls.size() - nonNullsOffset_; |
| 748 | + streamData_->ensureAdditionalNullsCapacity( |
| 749 | + hasNulls, static_cast<uint32_t>(remainingNonNullsCount)); |
| 750 | + if (hasNulls) { |
| 751 | + mutableNonNulls.resize(remainingNonNullsCount); |
| 752 | + NIMBLE_CHECK_EQ( |
| 753 | + mutableNonNulls.size(), |
| 754 | + remainingNonNullsCount, |
| 755 | + "NonNulls buffer size should be equal to remaining non-nulls count"); |
| 756 | + |
| 757 | + std::copy_n( |
| 758 | + tempNonNulls.begin() + nonNullsOffset_, |
| 759 | + remainingNonNullsCount, |
| 760 | + mutableNonNulls.begin()); |
| 761 | + } |
| 762 | + } |
| 763 | + dataElementOffset_ = 0; |
| 764 | + nonNullsOffset_ = 0; |
| 765 | + extraMemoryOffset_ = 0; |
| 766 | + } |
| 767 | + |
| 768 | + NullableContentStringStreamData* const streamData_; |
| 769 | + const uint64_t minChunkSize_; |
| 770 | + const uint64_t maxChunkSize_; |
| 771 | + const bool omitStream_; |
| 772 | + const bool ensureFullChunks_; |
| 773 | + |
| 774 | + size_t dataElementOffset_{0}; |
| 775 | + size_t nonNullsOffset_{0}; |
| 776 | + uint64_t extraMemoryOffset_{0}; |
| 777 | +}; |
576 | 778 | } // namespace facebook::nimble |
0 commit comments