3333#include " dwio/nimble/velox/SchemaSerialization.h"
3434#include " dwio/nimble/velox/SchemaTypes.h"
3535#include " dwio/nimble/velox/StatsGenerated.h"
36+ #include " dwio/nimble/velox/StreamChunker.h"
3637#include " velox/common/time/CpuWallTimer.h"
3738#include " velox/dwio/common/ExecutorBarrier.h"
3839#include " velox/type/Type.h"
@@ -808,6 +809,7 @@ void VeloxWriter::writeChunk(bool lastChunk) {
808809
809810bool VeloxWriter::writeChunks (
810811 std::span<const uint32_t > streamIndices,
812+ bool ensureFullChunks,
811813 bool lastChunk) {
812814 uint64_t previousFlushWallTime = context_->stripeFlushTiming .wallNanos ;
813815 std::atomic<uint64_t > chunkSize = 0 ;
@@ -823,56 +825,34 @@ bool VeloxWriter::writeChunks(
823825 streams_.resize (context_->schemaBuilder .nodeCount ());
824826
825827 auto processStream = [&](StreamData& streamData) {
826- // TODO: Breakdown large streams above a threshold into smaller chunks.
827- const auto minStreamSize =
828- lastChunk ? 0 : context_->options .minStreamChunkRawSize ;
829- const auto * context =
830- streamData.descriptor ().context <WriterStreamContext>();
831- bool isNullStream = context && context->isNullStream ;
832- bool shouldChunkStream = false ;
833- if (isNullStream) {
834- // We apply the same null logic, where if all values
835- // are non-nulls, we omit the entire stream.
836- shouldChunkStream = streamData.hasNulls () &&
837- streamData.nonNulls ().size () > minStreamSize;
838- } else {
839- shouldChunkStream = streamData.data ().size () > minStreamSize;
840- }
841-
842- // If we have previous written chunks for this stream, during final
843- // chunk, always write any remaining data.
844- const auto offset = streamData.descriptor ().offset ();
845- NIMBLE_DASSERT (offset < streams_.size (), " Stream offset out of range." );
846- auto & stream = streams_[offset];
847- if (lastChunk && !shouldChunkStream && !stream.content .empty ()) {
848- shouldChunkStream =
849- !streamData.empty () || !streamData.nonNulls ().empty ();
850- }
851-
852- if (shouldChunkStream) {
853- std::string_view encoded;
854- if (isNullStream) {
855- // For null streams we promote the null values to be written as
856- // boolean data.
857- encoded = encodeStream (
858- *context_, *encodingBuffer_, NullsAsDataStreamData (streamData));
859- } else {
860- encoded = encodeStream (*context_, *encodingBuffer_, streamData);
861- }
862-
828+ const auto & offset = streamData.descriptor ().offset ();
829+ auto & streamSize = context_->columnStats [offset].physicalSize ;
830+ logicalSizeBeforeEncoding += streamData.memoryUsed ();
831+ auto & streamContent = streams_[offset].content ;
832+ auto chunker = getStreamChunker (
833+ streamData,
834+ StreamChunkerOptions{
835+ .minChunkSize =
836+ lastChunk ? 0 : context_->options .minStreamChunkRawSize ,
837+ .maxChunkSize = context_->options .maxStreamChunkRawSize ,
838+ .ensureFullChunks = ensureFullChunks,
839+ .isFirstChunk = streamContent.empty ()});
840+ while (auto streamDataView = chunker->next ()) {
841+ std::string_view encoded =
842+ encodeStream (*context_, *encodingBuffer_, *streamDataView);
863843 if (!encoded.empty ()) {
864- auto & streamSize = context_->columnStats [offset].physicalSize ;
865844 ChunkedStreamWriter chunkWriter{*encodingBuffer_};
866845 for (auto & buffer : chunkWriter.encode (encoded)) {
867846 streamSize += buffer.size ();
868847 chunkSize += buffer.size ();
869- stream. content .push_back (std::move (buffer));
848+ streamContent .push_back (std::move (buffer));
870849 }
871850 }
872851 wroteChunk = true ;
873- logicalSizeBeforeEncoding += streamData.memoryUsed ();
874- streamData.reset ();
875852 }
853+ // Compact erases processed stream data to reclaim memory.
854+ chunker->compact ();
855+ logicalSizeBeforeEncoding -= streamData.memoryUsed ();
876856 };
877857
878858 const auto & streams = context_->streams ();
@@ -918,7 +898,7 @@ bool VeloxWriter::writeStripe() {
918898 // Chunk all streams.
919899 std::vector<uint32_t > streamIndices (context_->streams ().size ());
920900 std::iota (streamIndices.begin (), streamIndices.end (), 0 );
921- writeChunks (streamIndices, true );
901+ writeChunks (streamIndices, /* ensureFullChunks= */ false , /* lastChunk= */ true );
922902 } else {
923903 writeChunk (true );
924904 }
@@ -997,32 +977,50 @@ bool VeloxWriter::evalauateFlushPolicy() {
997977 };
998978
999979 if (context_->options .enableChunking && shouldChunk ()) {
1000- const auto & streams = context_->streams ();
1001- const size_t streamCount = streams.size ();
1002- // Sort streams for chunking based on raw memory usage.
1003- // TODO(T240072104): Improve performance by bucketing the streams by size
1004- // (most significant bit) instead of sorting.
1005- std::vector<uint32_t > streamIndices (streamCount);
1006- std::iota (streamIndices.begin (), streamIndices.end (), 0 );
1007- std::sort (
1008- streamIndices.begin (),
1009- streamIndices.end (),
1010- [&](const uint32_t & a, const uint32_t & b) {
1011- return streams[a]->memoryUsed () > streams[b]->memoryUsed ();
1012- });
980+ auto batchChunkStreams = [&](const std::vector<uint32_t >& indices,
981+ bool ensureFullChunks) {
982+ const size_t indicesCount = indices.size ();
983+ const auto batchSize = context_->options .chunkedStreamBatchSize ;
984+ for (size_t index = 0 ; index < indicesCount; index += batchSize) {
985+ size_t currentBatchSize = std::min (batchSize, indicesCount - index);
986+ std::span<const uint32_t > batchIndices (
987+ indices.begin () + index, currentBatchSize);
988+ // Stop attempting chunking once streams are too small to chunk or
989+ // memory pressure is relieved.
990+ if (!writeChunks (batchIndices, ensureFullChunks) || !shouldChunk ()) {
991+ return false ;
992+ }
993+ }
994+ return true ;
995+ };
1013996
1014- // Chunk streams in batches.
1015- const auto batchSize = context_->options .chunkedStreamBatchSize ;
1016- for (size_t index = 0 ; index < streamCount; index += batchSize) {
1017- const size_t currentBatchSize = std::min (batchSize, streamCount - index);
1018- std::span<const uint32_t > batchIndices (
1019- streamIndices.begin () + index, currentBatchSize);
1020- // Stop attempting chunking once streams are too small to chunk or
1021- // memory pressure is relieved.
1022- if (!(writeChunks (batchIndices, false ) && shouldChunk ())) {
1023- break ;
997+ // Relieve memory pressure by chunking streams above max size.
998+ const auto & streams = context_->streams ();
999+ std::vector<uint32_t > streamIndices;
1000+ streamIndices.reserve (streams.size ());
1001+ for (auto streamIndex = 0 ; streamIndex < streams.size (); ++streamIndex) {
1002+ if (streams[streamIndex]->memoryUsed () >=
1003+ context_->options .maxStreamChunkRawSize ) {
1004+ streamIndices.push_back (streamIndex);
10241005 }
10251006 }
1007+ const bool continueChunking =
1008+ batchChunkStreams (streamIndices, /* ensureFullChunks=*/ true );
1009+ if (continueChunking) {
1010+ // Relieve memory pressure by chunking small streams.
1011+ // Sort streams for chunking based on raw memory usage.
1012+ // TODO(T240072104): Improve performance by bucketing the streams
1013+ // by size (by most significant bit) instead of sorting them.
1014+ streamIndices.resize (streams.size ());
1015+ std::iota (streamIndices.begin (), streamIndices.end (), 0 );
1016+ std::sort (
1017+ streamIndices.begin (),
1018+ streamIndices.end (),
1019+ [&](const uint32_t & a, const uint32_t & b) {
1020+ return streams[a]->memoryUsed () > streams[b]->memoryUsed ();
1021+ });
1022+ batchChunkStreams (streamIndices, /* ensureFullChunks=*/ false );
1023+ }
10261024 }
10271025
10281026 if (shouldFlush ()) {
0 commit comments