@@ -267,6 +267,41 @@ struct RChangeCompressionFunc {
    RPageStorage::RSealedPage &fSealedPage;
    ROOT::Internal::RPageAllocator &fPageAlloc;
    std::uint8_t *fBuffer;
+   std::size_t fBufSize;
+   const ROOT::RNTupleWriteOptions &fWriteOpts;
+
+   void operator()() const
+   {
+      assert(fSrcColElement.GetIdentifier() == fDstColElement.GetIdentifier());
+
+      fSealedPage.VerifyChecksumIfEnabled().ThrowOnError();
+
+      const auto bytesPacked = fSrcColElement.GetPackedSize(fSealedPage.GetNElements());
+      // TODO: this buffer could be kept and reused across pages
+      auto unzipBuf = MakeUninitArray<unsigned char>(bytesPacked);
+      ROOT::Internal::RNTupleDecompressor::Unzip(fSealedPage.GetBuffer(), fSealedPage.GetDataSize(), bytesPacked,
+                                                 unzipBuf.get());
+
+      const auto checksumSize = fWriteOpts.GetEnablePageChecksums() * sizeof(std::uint64_t);
+      assert(fBufSize >= bytesPacked + checksumSize);
+      auto nBytesZipped = ROOT::Internal::RNTupleCompressor::Zip(unzipBuf.get(), bytesPacked,
+                                                                 fMergeOptions.fCompressionSettings.value(), fBuffer);
+
+      fSealedPage = {fBuffer, nBytesZipped + checksumSize, fSealedPage.GetNElements(), fSealedPage.GetHasChecksum()};
+      fSealedPage.ChecksumIfEnabled();
+   }
+};
+
+struct RResealFunc {
+   const RColumnElementBase &fSrcColElement;
+   const RColumnElementBase &fDstColElement;
+   const RNTupleMergeOptions &fMergeOptions;
+
+   RPageStorage::RSealedPage &fSealedPage;
+   ROOT::Internal::RPageAllocator &fPageAlloc;
+   std::uint8_t *fBuffer;
+   std::size_t fBufSize;
+   const ROOT::RNTupleWriteOptions &fWriteOpts;
 
    void operator()() const
    {
@@ -276,12 +311,26 @@ struct RChangeCompressionFunc {
       sealConf.fPage = &page;
       sealConf.fBuffer = fBuffer;
       sealConf.fCompressionSettings = *fMergeOptions.fCompressionSettings;
-      sealConf.fWriteChecksum = fSealedPage.GetHasChecksum();
+      sealConf.fWriteChecksum = fWriteOpts.GetEnablePageChecksums();
+      assert(fBufSize >= fSealedPage.GetDataSize() + fSealedPage.GetHasChecksum() * sizeof(std::uint64_t));
       auto refSealedPage = RPageSink::SealPage(sealConf);
       fSealedPage = refSealedPage;
    }
 };
 
+struct RTaskVisitor {
+   std::optional<ROOT::Experimental::TTaskGroup> &fGroup;
+
+   template <typename T>
+   void operator()(T &&f)
+   {
+      if (fGroup)
+         fGroup->Run(f);
+      else
+         f();
+   }
+};
+
 struct RCommonField {
    const ROOT::RFieldDescriptor *fSrc;
    const ROOT::RFieldDescriptor *fDst;
@@ -778,15 +827,23 @@ RNTupleMerger::MergeCommonColumns(ROOT::Internal::RClusterPool &clusterPool,
 
       // Each column range potentially has a distinct compression settings
       const auto colRangeCompressionSettings = clusterDesc.GetColumnRange(columnId).GetCompressionSettings().value();
-      // If either the compression or the encoding of the source doesn't match that of the destination, we need
-      // to reseal the page. Otherwise, if both match, we can fast merge.
+
+      // Select "merging level". There are 3 levels, from fastest to slowest, depending on the case:
+      // L1: compression and encoding of src and dest both match: we can simply copy the page
+      // L2: compression of dest doesn't match the src but encoding does: we must recompress the page but can avoid
+      //     resealing it.
+      // L3: on-disk encoding doesn't match: we need to reseal the page, which implies decompressing and recompressing
+      //     it.
+      const bool compressionIsDifferent =
+         colRangeCompressionSettings != mergeData.fMergeOpts.fCompressionSettings.value();
       const bool needsResealing =
-         colRangeCompressionSettings != mergeData.fMergeOpts.fCompressionSettings.value() ||
          srcColElement->GetIdentifier().fOnDiskType != dstColElement->GetIdentifier().fOnDiskType;
+      const bool needsRecompressing = compressionIsDifferent || needsResealing;
 
-      if (needsResealing && mergeData.fMergeOpts.fExtraVerbose) {
+      if (needsRecompressing && mergeData.fMergeOpts.fExtraVerbose) {
          R__LOG_INFO(NTupleMergeLog())
-            << "Resealing column " << column.fColumnName << ": { compression: " << colRangeCompressionSettings << " => "
+            << (needsResealing ? "Resealing" : "Recompressing") << " column " << column.fColumnName
+            << ": { compression: " << colRangeCompressionSettings << " => "
             << mergeData.fMergeOpts.fCompressionSettings.value()
             << ", onDiskType: " << RColumnElementBase::GetColumnTypeName(srcColElement->GetIdentifier().fOnDiskType)
             << " => " << RColumnElementBase::GetColumnTypeName(dstColElement->GetIdentifier().fOnDiskType);
@@ -795,7 +852,7 @@ RNTupleMerger::MergeCommonColumns(ROOT::Internal::RClusterPool &clusterPool,
       size_t pageBufferBaseIdx = sealedPageData.fBuffers.size();
       // If the column range already has the right compression we don't need to allocate any new buffer, so we don't
       // bother reserving memory for them.
-      if (needsResealing)
+      if (needsRecompressing)
          sealedPageData.fBuffers.resize(sealedPageData.fBuffers.size() + pages.GetPageInfos().size());
 
       // If this column is deferred, we may need to fill "holes" until its real start. We fill any missing entry
@@ -831,18 +888,38 @@ RNTupleMerger::MergeCommonColumns(ROOT::Internal::RClusterPool &clusterPool,
          sealedPage.VerifyChecksumIfEnabled().ThrowOnError();
          R__ASSERT(onDiskPage && (onDiskPage->GetSize() == sealedPage.GetBufferSize()));
 
-         if (needsResealing) {
+         if (needsRecompressing) {
             const auto uncompressedSize = srcColElement->GetSize() * sealedPage.GetNElements();
             auto &buffer = sealedPageData.fBuffers[pageBufferBaseIdx + pageIdx];
-            buffer = MakeUninitArray<std::uint8_t>(uncompressedSize + checksumSize);
-            RChangeCompressionFunc compressTask{
-               *srcColElement, *dstColElement, mergeData.fMergeOpts, sealedPage, *fPageAlloc, buffer.get(),
-            };
-
-            if (fTaskGroup)
-               fTaskGroup->Run(compressTask);
-            else
-               compressTask();
+            const auto bufSize = uncompressedSize + checksumSize;
+            // NOTE: we currently allocate the max possible size for this buffer and don't shrink it afterward.
+            // We might want to introduce an option that trades speed for memory usage and shrink the buffer to fit
+            // the actual data size after recompressing.
+            buffer = MakeUninitArray<std::uint8_t>(bufSize);
+
+            if (needsResealing) {
+               RTaskVisitor{fTaskGroup}(RResealFunc{
+                  *srcColElement,
+                  *dstColElement,
+                  mergeData.fMergeOpts,
+                  sealedPage,
+                  *fPageAlloc,
+                  buffer.get(),
+                  bufSize,
+                  mergeData.fDestination.GetWriteOptions()
+               });
+            } else {
+               RTaskVisitor{fTaskGroup}(RChangeCompressionFunc{
+                  *srcColElement,
+                  *dstColElement,
+                  mergeData.fMergeOpts,
+                  sealedPage,
+                  *fPageAlloc,
+                  buffer.get(),
+                  bufSize,
+                  mergeData.fDestination.GetWriteOptions()
+               });
+            }
          }
 
          ++pageIdx;
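
To summarize the control flow introduced by this diff, here is a minimal sketch (not code from the patch): the "merging level" selected in MergeCommonColumns decides whether a page is copied as-is, recompressed by RChangeCompressionFunc, or fully resealed by RResealFunc. The enum and helper function below are hypothetical illustrations of that decision, with assumed names.

// Illustrative sketch only: names below (EMergeLevel, SelectMergeLevel) are hypothetical
// and simply mirror the decision logic in the hunks above.
enum class EMergeLevel {
   kFastCopy,   // L1: compression and encoding both match; sealed page bytes are copied verbatim
   kRecompress, // L2: only compression differs; RChangeCompressionFunc unzips and rezips the payload
   kReseal      // L3: on-disk encoding differs; RResealFunc goes through a full unseal + reseal
};

inline EMergeLevel SelectMergeLevel(bool compressionIsDifferent, bool encodingIsDifferent)
{
   if (encodingIsDifferent)
      return EMergeLevel::kReseal;
   if (compressionIsDifferent)
      return EMergeLevel::kRecompress;
   return EMergeLevel::kFastCopy;
}

In either non-trivial case the chosen functor is dispatched through RTaskVisitor{fTaskGroup}(functor), which runs the per-page work on the TTaskGroup when implicit multi-threading is enabled and falls back to a plain synchronous call otherwise.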