@@ -44,7 +44,6 @@ namespace detail {
4444class WriterContext : public FieldWriterContext {
4545 public:
4646 const VeloxWriterOptions options;
47- std::unique_ptr<FlushPolicy> flushPolicy;
4847 velox::CpuWallTiming totalFlushTiming;
4948 velox::CpuWallTiming stripeFlushTiming;
5049 velox::CpuWallTiming encodingSelectionTiming;
@@ -56,8 +55,11 @@ class WriterContext : public FieldWriterContext {
5655 uint64_t bytesWritten{0 };
5756 uint64_t rowsInFile{0 };
5857 uint64_t rowsInStripe{0 };
59- uint64_t stripeSize{0 };
60- uint64_t rawSize{0 };
58+ // Physical size of the encoded stripe data.
59+ uint64_t stripeEncodedPhysicalSize{0 };
60+ // Logical size of the encoded stripe data.
61+ uint64_t stripeEncodedLogicalSize{0 };
62+ uint64_t fileRawSize{0 };
6163 std::vector<uint64_t > rowsPerStripe;
6264
6365 WriterContext (
@@ -66,7 +68,6 @@ class WriterContext : public FieldWriterContext {
6668 : FieldWriterContext{memoryPool, options.reclaimerFactory (), options.vectorDecoderVisitor },
6769 options{std::move (options)},
6870 logger{this ->options .metricsLogger } {
69- flushPolicy = this ->options .flushPolicyFactory ();
7071 inputBufferGrowthPolicy = this ->options .lowMemoryMode
7172 ? std::make_unique<ExactGrowthPolicy>()
7273 : this ->options .inputGrowthPolicyFactory ();
@@ -81,7 +82,8 @@ class WriterContext : public FieldWriterContext {
8182 rowsPerStripe.push_back (rowsInStripe);
8283 memoryUsed = 0 ;
8384 rowsInStripe = 0 ;
84- stripeSize = 0 ;
85+ stripeEncodedPhysicalSize = 0 ;
86+ stripeEncodedLogicalSize = 0 ;
8587 ++stripeIndex_;
8688 }
8789
@@ -99,6 +101,45 @@ namespace {
99101
100102constexpr uint32_t kInitialSchemaSectionSize = 1 << 20 ; // 1MB
101103
104+ // When writing null streams, we write the nulls as data, and the stream itself
105+ // is non-nullable. This adapter class is how we expose the nulls as values.
106+ class NullsAsDataStreamData : public StreamData {
107+ public:
108+ explicit NullsAsDataStreamData (StreamData& streamData) // Wraps streamData by reference (non-owning) and reuses its descriptor.
109+ : StreamData(streamData.descriptor()), streamData_{streamData} {
110+ streamData_.materialize (); // NOTE(review): presumably ensures nonNulls() is populated before data() reads it — confirm.
111+ }
112+
113+ inline virtual std::string_view data () const override { // Views the wrapped stream's nonNulls() flags as raw bytes.
114+ return {
115+ reinterpret_cast <const char *>(streamData_.nonNulls ().data ()),
116+ streamData_.nonNulls ().size ()}; // Length is the number of flags in nonNulls().
117+ }
118+
119+ inline virtual std::span<const bool > nonNulls () const override { // Always empty: the flags are exposed through data() instead.
120+ return {};
121+ }
122+
123+ inline virtual bool hasNulls () const override { // Always false: this view reports itself as non-nullable.
124+ return false ;
125+ }
126+
127+ inline virtual bool empty () const override { // Forwarded to the wrapped stream.
128+ return streamData_.empty ();
129+ }
130+
131+ inline virtual uint64_t memoryUsed () const override { // Forwarded to the wrapped stream.
132+ return streamData_.memoryUsed ();
133+ }
134+
135+ inline virtual void reset () override { // Resets the wrapped stream itself; this adapter holds no state of its own.
136+ streamData_.reset ();
137+ }
138+
139+ private:
140+ StreamData& streamData_; // Non-owning reference; the wrapped stream must outlive this adapter.
141+ };
142+
102143class WriterStreamContext : public StreamContext {
103144 public:
104145 bool isNullStream = false ;
@@ -132,7 +173,7 @@ std::string_view encode(
132173 std::unique_ptr<EncodingSelectionPolicy<T>> policy;
133174 if (encodingLayout.has_value ()) {
134175 policy = std::make_unique<ReplayedEncodingSelectionPolicy<T>>(
135- encodingLayout.value (),
176+ std::move ( encodingLayout) .value (),
136177 context.options .compressionOptions ,
137178 context.options .encodingSelectionPolicyFactory );
138179
@@ -167,7 +208,7 @@ std::string_view encodeStreamTyped(
167208 }
168209
169210 try {
170- return encode<T>(encodingLayout, context, buffer, streamData);
211+ return encode<T>(std::move ( encodingLayout) , context, buffer, streamData);
171212 } catch (const NimbleUserError& e) {
172213 if (e.errorCode () != error_code::IncompatibleEncoding ||
173214 !encodingLayout.has_value ()) {
@@ -214,7 +255,8 @@ template <typename Set>
214255void findNodeIds (
215256 const velox::dwio::common::TypeWithId& typeWithId,
216257 Set& output,
217- std::function<bool (const velox::dwio::common::TypeWithId&)> predicate) {
258+ const std::function<bool (const velox::dwio::common::TypeWithId&)>&
259+ predicate) {
218260 if (predicate (typeWithId)) {
219261 output.insert (typeWithId.id ());
220262 }
@@ -515,7 +557,7 @@ bool VeloxWriter::write(const velox::VectorPtr& vector) {
515557 auto rawSize = nimble::getRawSizeFromVector (
516558 vector, velox::common::Ranges::of (0 , size));
517559 DWIO_ENSURE_GE (rawSize, 0 , " Invalid raw size" );
518- context_->rawSize += rawSize;
560+ context_->fileRawSize += rawSize;
519561
520562 if (context_->options .writeExecutor ) {
521563 velox::dwio::common::ExecutorBarrier barrier{
@@ -580,7 +622,8 @@ void VeloxWriter::close() {
580622 *context_->schemaBuilder .getRoot (), context_->columnStats );
581623 // TODO(T228118622): Write column stats to file.
582624 flatbuffers::FlatBufferBuilder builder;
583- builder.Finish (serialization::CreateStats (builder, context_->rawSize ));
625+ builder.Finish (
626+ serialization::CreateStats (builder, context_->fileRawSize ));
584627 writer_.writeOptionalSection (
585628 std::string (kStatsSection ),
586629 {reinterpret_cast <const char *>(builder.GetBufferPointer ()),
@@ -650,45 +693,6 @@ void VeloxWriter::writeChunk(bool lastChunk) {
650693 }
651694 streams_.resize (context_->schemaBuilder .nodeCount ());
652695
653- // When writing null streams, we write the nulls as data, and the stream
654- // itself is non-nullable. This adpater class is how we expose the nulls as
655- // values.
656- class NullsAsDataStreamData : public StreamData {
657- public:
658- explicit NullsAsDataStreamData (StreamData& streamData)
659- : StreamData(streamData.descriptor()), streamData_{streamData} {
660- streamData_.materialize ();
661- }
662-
663- inline virtual std::string_view data () const override {
664- return {
665- reinterpret_cast <const char *>(streamData_.nonNulls ().data ()),
666- streamData_.nonNulls ().size ()};
667- }
668-
669- inline virtual std::span<const bool > nonNulls () const override {
670- return {};
671- }
672-
673- inline virtual bool hasNulls () const override {
674- return false ;
675- }
676-
677- inline virtual bool empty () const override {
678- return streamData_.empty ();
679- }
680- inline virtual uint64_t memoryUsed () const override {
681- return streamData_.memoryUsed ();
682- }
683-
684- inline virtual void reset () override {
685- streamData_.reset ();
686- }
687-
688- private:
689- StreamData& streamData_;
690- };
691-
692696 auto encode = [&](StreamData& streamData, uint64_t & streamSize) {
693697 const auto offset = streamData.descriptor ().offset ();
694698 auto encoded = encodeStream (*context_, *encodingBuffer_, streamData);
@@ -777,8 +781,103 @@ void VeloxWriter::writeChunk(bool lastChunk) {
777781 if (lastChunk) {
778782 root_->reset ();
779783 }
784+ }
785+
786+ // Consider getting this from flush timing.
787+ auto flushWallTimeMs =
788+ (context_->stripeFlushTiming .wallNanos - previousFlushWallTime) /
789+ 1'000'000 ;
790+ VLOG (1 ) << " writeChunk milliseconds: " << flushWallTimeMs
791+ << " , chunk bytes: " << chunkSize;
792+ }
793+
794+ bool VeloxWriter::writeChunks (bool lastChunk) {
795+ uint64_t previousFlushWallTime = context_->stripeFlushTiming .wallNanos ;
796+ std::atomic<uint64_t > chunkSize = 0 ;
797+ std::atomic<uint64_t > logicalSizeBeforeEncoding = 0 ;
798+ std::atomic<bool > wroteChunk = false ;
799+ {
800+ LoggingScope scope{*context_->logger };
801+ velox::CpuWallTimer veloxTimer{context_->stripeFlushTiming };
802+
803+ if (!encodingBuffer_) {
804+ encodingBuffer_ = std::make_unique<Buffer>(*encodingMemoryPool_);
805+ }
806+ streams_.resize (context_->schemaBuilder .nodeCount ());
807+
808+ auto processStream = [&](StreamData& streamData) {
809+ // TODO: Breakdown large streams above a threshold into smaller chunks.
810+ const auto minStreamSize =
811+ lastChunk ? 0 : context_->options .minStreamChunkRawSize ;
812+ const auto * context =
813+ streamData.descriptor ().context <WriterStreamContext>();
814+ bool isNullStream = context && context->isNullStream ;
815+ bool shouldChunkStream = false ;
816+ if (isNullStream) {
817+ // We apply the same null logic, where if all values
818+ // are non-nulls, we omit the entire stream.
819+ shouldChunkStream = streamData.hasNulls () &&
820+ streamData.nonNulls ().size () > minStreamSize;
821+ } else {
822+ shouldChunkStream = streamData.data ().size () > minStreamSize;
823+ }
824+
825+ // If we have previously written chunks for this stream, then during the
826+ // final chunk, always write any remaining data.
827+ const auto offset = streamData.descriptor ().offset ();
828+ NIMBLE_DASSERT (offset < streams_.size (), " Stream offset out of range." );
829+ auto & stream = streams_[offset];
830+ if (lastChunk && !shouldChunkStream && !stream.content .empty ()) {
831+ shouldChunkStream =
832+ !streamData.empty () || !streamData.nonNulls ().empty ();
833+ }
834+
835+ if (shouldChunkStream) {
836+ std::string_view encoded;
837+ if (isNullStream) {
838+ // For null streams we promote the null values to be written as
839+ // boolean data.
840+ encoded = encodeStream (
841+ *context_, *encodingBuffer_, NullsAsDataStreamData{streamData});
842+ } else {
843+ encoded = encodeStream (*context_, *encodingBuffer_, streamData);
844+ }
845+
846+ if (!encoded.empty ()) {
847+ auto & streamSize = context_->columnStats [offset].physicalSize ;
848+ ChunkedStreamWriter chunkWriter{*encodingBuffer_};
849+ for (auto & buffer : chunkWriter.encode (encoded)) {
850+ streamSize += buffer.size ();
851+ chunkSize += buffer.size ();
852+ stream.content .push_back (std::move (buffer));
853+ }
854+ }
855+ wroteChunk = true ;
856+ logicalSizeBeforeEncoding += streamData.memoryUsed ();
857+ streamData.reset ();
858+ }
859+ };
860+
861+ if (context_->options .encodingExecutor ) {
862+ velox::dwio::common::ExecutorBarrier barrier{
863+ context_->options .encodingExecutor };
864+ for (auto & streamData : context_->streams ()) {
865+ barrier.add ([&] { processStream (*streamData); });
866+ }
867+ barrier.waitAll ();
868+ } else {
869+ for (auto & streamData : context_->streams ()) {
870+ processStream (*streamData);
871+ }
872+ }
873+
874+ if (lastChunk) {
875+ root_->reset ();
876+ }
780877
781- context_->stripeSize += chunkSize;
878+ context_->stripeEncodedPhysicalSize += chunkSize;
879+ context_->stripeEncodedLogicalSize += logicalSizeBeforeEncoding;
880+ context_->memoryUsed -= logicalSizeBeforeEncoding;
782881 }
783882
784883 // Consider getting this from flush timing.
@@ -787,10 +886,16 @@ void VeloxWriter::writeChunk(bool lastChunk) {
787886 1'000'000 ;
788887 VLOG (1 ) << " writeChunk milliseconds: " << flushWallTimeMs
789888 << " , chunk bytes: " << chunkSize;
889+ return wroteChunk.load ();
790890}
791891
792892uint32_t VeloxWriter::writeStripe () {
793- writeChunk (true );
893+ if (context_->options .enableChunking ) {
894+ writeChunks (true );
895+
896+ } else {
897+ writeChunk (true );
898+ }
794899
795900 uint64_t previousFlushWallTime = context_->stripeFlushTiming .wallNanos ;
796901 uint64_t stripeSize = 0 ;
@@ -840,36 +945,42 @@ bool VeloxWriter::tryWriteStripe(bool force) {
840945 return false ;
841946 }
842947
948+ auto flushPolicy = context_->options .flushPolicyFactory ();
949+ NIMBLE_DASSERT (flushPolicy != nullptr , " Flush policy must not be null" );
950+
843951 auto shouldFlush = [&]() {
844- return context_-> flushPolicy ->shouldFlush (StripeProgress{
952+ return flushPolicy->shouldFlush (StripeProgress{
845953 .stripeRawSize = context_->memoryUsed ,
846- .stripeEncodedSize = context_->stripeSize });
954+ .stripeEncodedSize = context_->stripeEncodedPhysicalSize ,
955+ .stripeEncodedLogicalSize = context_->stripeEncodedLogicalSize });
847956 };
848957
849958 auto shouldChunk = [&]() {
850- return context_-> flushPolicy ->shouldChunk (StripeProgress{
959+ return flushPolicy->shouldChunk (StripeProgress{
851960 .stripeRawSize = context_->memoryUsed ,
852- .stripeEncodedSize = context_->stripeSize });
961+ .stripeEncodedSize = context_->stripeEncodedPhysicalSize ,
962+ .stripeEncodedLogicalSize = context_->stripeEncodedLogicalSize ,
963+ });
853964 };
854965
855966 try {
856967 // TODO: we can improve this by merging the last chunk write with the stripe write
857- if (context_->options .enableChunking &&
858- shouldChunk () == ChunkDecision::Chunk) {
859- writeChunk ( false );
968+ if (context_->options .enableChunking ) {
969+ while ( shouldChunk () == ChunkDecision::Chunk && writeChunks ( false ) ) {
970+ }
860971 }
861972
862973 if (!(force || shouldFlush () == FlushDecision::Stripe)) {
863974 return false ;
864975 }
865976
977+ uint32_t stripeSize = writeStripe ();
866978 StripeFlushMetrics metrics{
867- .inputSize = context_->stripeSize ,
979+ .inputSize = context_->stripeEncodedPhysicalSize ,
868980 .rowCount = context_->rowsInStripe ,
981+ .stripeSize = stripeSize,
869982 .trackedMemory = context_->memoryUsed ,
870983 };
871-
872- metrics.stripeSize = writeStripe ();
873984 context_->logger ->logStripeFlush (metrics);
874985
875986 context_->nextStripe ();
@@ -889,7 +1000,7 @@ VeloxWriter::RunStats VeloxWriter::getRunStats() const {
8891000 return RunStats{
8901001 .bytesWritten = context_->bytesWritten ,
8911002 .stripeCount = folly::to<uint32_t >(context_->getStripeIndex ()),
892- .rawSize = context_->rawSize ,
1003+ .rawSize = context_->fileRawSize ,
8931004 .rowsPerStripe = context_->rowsPerStripe ,
8941005 .flushCpuTimeUsec = context_->totalFlushTiming .cpuNanos / 1000 ,
8951006 .flushWallTimeUsec = context_->totalFlushTiming .wallNanos / 1000 ,
0 commit comments