@@ -267,6 +267,40 @@ struct RChangeCompressionFunc {
267267 RPageStorage::RSealedPage &fSealedPage ;
268268 ROOT::Internal::RPageAllocator &fPageAlloc ;
269269 std::uint8_t *fBuffer ;
270+ std::size_t fBufSize ;
271+ const ROOT::RNTupleWriteOptions &fWriteOpts ;
272+
273+ void operator ()() const
274+ {
275+ assert (fSrcColElement .GetIdentifier () == fDstColElement .GetIdentifier ());
276+
277+ fSealedPage .VerifyChecksumIfEnabled ().ThrowOnError ();
278+
279+ const auto bytesPacked = fSrcColElement .GetPackedSize (fSealedPage .GetNElements ());
280+ // TODO: this buffer could be kept and reused across pages
281+ auto unzipBuf = MakeUninitArray<unsigned char >(bytesPacked);
282+ ROOT::Internal::RNTupleDecompressor::Unzip (fSealedPage .GetBuffer (), fSealedPage .GetDataSize (), bytesPacked,
283+ unzipBuf.get ());
284+
285+ const auto checksumSize = fWriteOpts .GetEnablePageChecksums () * sizeof (std::uint64_t );
286+ assert (fBufSize >= bytesPacked + checksumSize);
287+ auto nBytesZipped = ROOT::Internal::RNTupleCompressor::Zip (unzipBuf.get (), bytesPacked,
288+ fMergeOptions .fCompressionSettings .value (), fBuffer );
289+
290+ fSealedPage = {fBuffer , nBytesZipped + checksumSize, fSealedPage .GetNElements (), fSealedPage .GetHasChecksum ()};
291+ fSealedPage .ChecksumIfEnabled ();
292+ }
293+ };
294+
295+ struct RResealFunc {
296+ const RColumnElementBase &fSrcColElement ;
297+ const RColumnElementBase &fDstColElement ;
298+ const RNTupleMergeOptions &fMergeOptions ;
299+
300+ RPageStorage::RSealedPage &fSealedPage ;
301+ ROOT::Internal::RPageAllocator &fPageAlloc ;
302+ std::uint8_t *fBuffer ;
303+ std::size_t fBufSize ;
270304
271305 void operator ()() const
272306 {
@@ -277,11 +311,25 @@ struct RChangeCompressionFunc {
277311 sealConf.fBuffer = fBuffer ;
278312 sealConf.fCompressionSettings = *fMergeOptions .fCompressionSettings ;
279313 sealConf.fWriteChecksum = fSealedPage .GetHasChecksum ();
314+ assert (fBufSize >= fSealedPage .GetDataSize () + fSealedPage .GetHasChecksum () * sizeof (std::uint64_t ));
280315 auto refSealedPage = RPageSink::SealPage (sealConf);
281316 fSealedPage = refSealedPage;
282317 }
283318};
284319
// Dispatches a callable to the optional task group when parallel merging is
// enabled, or invokes it synchronously in-place otherwise. Used to run the
// per-page recompress/reseal functors (RChangeCompressionFunc / RResealFunc)
// either concurrently or serially with a single call site.
struct RTaskVisitor {
   // Reference to the merger's (possibly disengaged) task group; engaged iff
   // parallel page processing is active.
   std::optional<ROOT::Experimental::TTaskGroup> &fGroup;

   /// Run `f` via the task group if one is available, else call it directly.
   /// NOTE(review): `f` is a forwarding reference but is passed to Run() as an
   /// lvalue — presumably Run() copies the callable so it outlives this scope;
   /// confirm against TTaskGroup::Run's signature before forwarding instead.
   template <typename T>
   void operator()(T &&f)
   {
      if (fGroup)
         fGroup->Run(f);
      else
         f();
   }
};
332+
285333struct RCommonField {
286334 const ROOT::RFieldDescriptor *fSrc ;
287335 const ROOT::RFieldDescriptor *fDst ;
@@ -789,15 +837,23 @@ RNTupleMerger::MergeCommonColumns(ROOT::Internal::RClusterPool &clusterPool,
789837
790838 // Each column range potentially has a distinct compression settings
791839 const auto colRangeCompressionSettings = clusterDesc.GetColumnRange (columnId).GetCompressionSettings ().value ();
792- // If either the compression or the encoding of the source doesn't match that of the destination, we need
793- // to reseal the page. Otherwise, if both match, we can fast merge.
840+
841+ // Select "merging level". There are 3 levels, from fastest to slowest, depending on the case:
842+ // L1: compression and encoding of src and dest both match: we can simply copy the page
843+ // L2: compression of dest doesn't match the src but encoding does: we must recompress the page but can avoid
844+ // resealing it.
845+ // L3: on-disk encoding doesn't match: we need to reseal the page, which implies decompressing and recompressing
846+ // it.
847+ const bool compressionIsDifferent =
848+ colRangeCompressionSettings != mergeData.fMergeOpts .fCompressionSettings .value ();
794849 const bool needsResealing =
795- colRangeCompressionSettings != mergeData.fMergeOpts .fCompressionSettings .value () ||
796850 srcColElement->GetIdentifier ().fOnDiskType != dstColElement->GetIdentifier ().fOnDiskType ;
851+ const bool needsRecompressing = compressionIsDifferent || needsResealing;
797852
798- if (needsResealing && mergeData.fMergeOpts .fExtraVerbose ) {
853+ if (needsRecompressing && mergeData.fMergeOpts .fExtraVerbose ) {
799854 R__LOG_INFO (NTupleMergeLog ())
800- << " Resealing column " << column.fColumnName << " : { compression: " << colRangeCompressionSettings << " => "
855+ << (needsResealing ? " Resealing" : " Recompressing" ) << " column " << column.fColumnName
856+ << " : { compression: " << colRangeCompressionSettings << " => "
801857 << mergeData.fMergeOpts .fCompressionSettings .value ()
802858 << " , onDiskType: " << RColumnElementBase::GetColumnTypeName (srcColElement->GetIdentifier ().fOnDiskType )
803859 << " => " << RColumnElementBase::GetColumnTypeName (dstColElement->GetIdentifier ().fOnDiskType );
@@ -806,7 +862,7 @@ RNTupleMerger::MergeCommonColumns(ROOT::Internal::RClusterPool &clusterPool,
806862 size_t pageBufferBaseIdx = sealedPageData.fBuffers .size ();
807863 // If the column range already has the right compression we don't need to allocate any new buffer, so we don't
808864 // bother reserving memory for them.
809- if (needsResealing )
865+ if (needsRecompressing )
810866 sealedPageData.fBuffers .resize (sealedPageData.fBuffers .size () + pages.GetPageInfos ().size ());
811867
812868 // If this column is deferred, we may need to fill "holes" until its real start. We fill any missing entry
@@ -842,18 +898,37 @@ RNTupleMerger::MergeCommonColumns(ROOT::Internal::RClusterPool &clusterPool,
842898 sealedPage.VerifyChecksumIfEnabled ().ThrowOnError ();
843899 R__ASSERT (onDiskPage && (onDiskPage->GetSize () == sealedPage.GetBufferSize ()));
844900
845- if (needsResealing ) {
901+ if (needsRecompressing ) {
846902 const auto uncompressedSize = srcColElement->GetSize () * sealedPage.GetNElements ();
847903 auto &buffer = sealedPageData.fBuffers [pageBufferBaseIdx + pageIdx];
848- buffer = MakeUninitArray<std::uint8_t >(uncompressedSize + checksumSize);
849- RChangeCompressionFunc compressTask{
850- *srcColElement, *dstColElement, mergeData.fMergeOpts , sealedPage, *fPageAlloc , buffer.get (),
851- };
852-
853- if (fTaskGroup )
854- fTaskGroup ->Run (compressTask);
855- else
856- compressTask ();
904+ const auto bufSize = uncompressedSize + checksumSize;
905+ // NOTE: we currently allocate the max possible size for this buffer and don't shrink it afterward.
906+ // We might want to introduce an option that trades speed for memory usage and shrink the buffer to fit
907+ // the actual data size after recompressing.
908+ buffer = MakeUninitArray<std::uint8_t >(bufSize);
909+
910+ if (needsResealing) {
911+ RTaskVisitor{fTaskGroup }(RResealFunc{
912+ *srcColElement,
913+ *dstColElement,
914+ mergeData.fMergeOpts ,
915+ sealedPage,
916+ *fPageAlloc ,
917+ buffer.get (),
918+ bufSize,
919+ });
920+ } else {
921+ RTaskVisitor{fTaskGroup }(RChangeCompressionFunc{
922+ *srcColElement,
923+ *dstColElement,
924+ mergeData.fMergeOpts ,
925+ sealedPage,
926+ *fPageAlloc ,
927+ buffer.get (),
928+ bufSize,
929+ mergeData.fDestination .GetWriteOptions ()
930+ });
931+ }
857932 }
858933
859934 ++pageIdx;
0 commit comments