@@ -267,6 +267,40 @@ struct RChangeCompressionFunc {
267267 RPageStorage::RSealedPage &fSealedPage ;
268268 ROOT::Internal::RPageAllocator &fPageAlloc ;
269269 std::uint8_t *fBuffer ;
270+ std::size_t fBufSize ;
271+ const ROOT::RNTupleWriteOptions &fWriteOpts ;
272+
273+ void operator ()() const
274+ {
275+ assert (fSrcColElement .GetIdentifier () == fDstColElement .GetIdentifier ());
276+
277+ fSealedPage .VerifyChecksumIfEnabled ().ThrowOnError ();
278+
279+ const auto bytesPacked = fSrcColElement .GetPackedSize (fSealedPage .GetNElements ());
280+ // TODO: this buffer could be kept and reused across pages
281+ auto unzipBuf = MakeUninitArray<unsigned char >(bytesPacked);
282+ ROOT::Internal::RNTupleDecompressor::Unzip (fSealedPage .GetBuffer (), fSealedPage .GetDataSize (), bytesPacked,
283+ unzipBuf.get ());
284+
285+ const auto checksumSize = fWriteOpts .GetEnablePageChecksums () * sizeof (std::uint64_t );
286+ assert (fBufSize >= bytesPacked + checksumSize);
287+ auto nBytesZipped = ROOT::Internal::RNTupleCompressor::Zip (unzipBuf.get (), bytesPacked,
288+ fMergeOptions .fCompressionSettings .value (), fBuffer );
289+
290+ fSealedPage = {fBuffer , nBytesZipped + checksumSize, fSealedPage .GetNElements (), fSealedPage .GetHasChecksum ()};
291+ fSealedPage .ChecksumIfEnabled ();
292+ }
293+ };
294+
295+ struct RResealFunc {
296+ const RColumnElementBase &fSrcColElement ;
297+ const RColumnElementBase &fDstColElement ;
298+ const RNTupleMergeOptions &fMergeOptions ;
299+
300+ RPageStorage::RSealedPage &fSealedPage ;
301+ ROOT::Internal::RPageAllocator &fPageAlloc ;
302+ std::uint8_t *fBuffer ;
303+ std::size_t fBufSize ;
270304
271305 void operator ()() const
272306 {
@@ -277,11 +311,25 @@ struct RChangeCompressionFunc {
277311 sealConf.fBuffer = fBuffer ;
278312 sealConf.fCompressionSettings = *fMergeOptions .fCompressionSettings ;
279313 sealConf.fWriteChecksum = fSealedPage .GetHasChecksum ();
314+ assert (fBufSize >= fSealedPage .GetDataSize () + fSealedPage .GetHasChecksum () * sizeof (std::uint64_t ));
280315 auto refSealedPage = RPageSink::SealPage (sealConf);
281316 fSealedPage = refSealedPage;
282317 }
283318};
284319
320+ struct RTaskVisitor {
321+ std::optional<ROOT::Experimental::TTaskGroup> &fGroup ;
322+
323+ template <typename T>
324+ void operator ()(T &&f)
325+ {
326+ if (fGroup )
327+ fGroup ->Run (f);
328+ else
329+ f ();
330+ }
331+ };
332+
285333struct RCommonField {
286334 const ROOT::RFieldDescriptor *fSrc ;
287335 const ROOT::RFieldDescriptor *fDst ;
@@ -778,15 +826,23 @@ RNTupleMerger::MergeCommonColumns(ROOT::Internal::RClusterPool &clusterPool,
778826
779827 // Each column range potentially has a distinct compression settings
780828 const auto colRangeCompressionSettings = clusterDesc.GetColumnRange (columnId).GetCompressionSettings ().value ();
781- // If either the compression or the encoding of the source doesn't match that of the destination, we need
782- // to reseal the page. Otherwise, if both match, we can fast merge.
829+
830+ // Select "merging level". There are 3 levels, from fastest to slowest, depending on the case:
831+ // L1: compression and encoding of src and dest both match: we can simply copy the page
832+ // L2: compression of dest doesn't match the src but encoding does: we must recompress the page but can avoid
833+ // resealing it.
834+ // L3: on-disk encoding doesn't match: we need to reseal the page, which implies decompressing and recompressing
835+ // it.
836+ const bool compressionIsDifferent =
837+ colRangeCompressionSettings != mergeData.fMergeOpts .fCompressionSettings .value ();
783838 const bool needsResealing =
784- colRangeCompressionSettings != mergeData.fMergeOpts .fCompressionSettings .value () ||
785839 srcColElement->GetIdentifier ().fOnDiskType != dstColElement->GetIdentifier ().fOnDiskType ;
840+ const bool needsRecompressing = compressionIsDifferent || needsResealing;
786841
787- if (needsResealing && mergeData.fMergeOpts .fExtraVerbose ) {
842+ if (needsRecompressing && mergeData.fMergeOpts .fExtraVerbose ) {
788843 R__LOG_INFO (NTupleMergeLog ())
789- << " Resealing column " << column.fColumnName << " : { compression: " << colRangeCompressionSettings << " => "
844+ << (needsResealing ? " Resealing" : " Recompressing" ) << " column " << column.fColumnName
845+ << " : { compression: " << colRangeCompressionSettings << " => "
790846 << mergeData.fMergeOpts .fCompressionSettings .value ()
791847 << " , onDiskType: " << RColumnElementBase::GetColumnTypeName (srcColElement->GetIdentifier ().fOnDiskType )
792848 << " => " << RColumnElementBase::GetColumnTypeName (dstColElement->GetIdentifier ().fOnDiskType );
@@ -795,7 +851,7 @@ RNTupleMerger::MergeCommonColumns(ROOT::Internal::RClusterPool &clusterPool,
795851 size_t pageBufferBaseIdx = sealedPageData.fBuffers .size ();
796852 // If the column range already has the right compression we don't need to allocate any new buffer, so we don't
797853 // bother reserving memory for them.
798- if (needsResealing )
854+ if (needsRecompressing )
799855 sealedPageData.fBuffers .resize (sealedPageData.fBuffers .size () + pages.GetPageInfos ().size ());
800856
801857 // If this column is deferred, we may need to fill "holes" until its real start. We fill any missing entry
@@ -831,18 +887,37 @@ RNTupleMerger::MergeCommonColumns(ROOT::Internal::RClusterPool &clusterPool,
831887 sealedPage.VerifyChecksumIfEnabled ().ThrowOnError ();
832888 R__ASSERT (onDiskPage && (onDiskPage->GetSize () == sealedPage.GetBufferSize ()));
833889
834- if (needsResealing ) {
890+ if (needsRecompressing ) {
835891 const auto uncompressedSize = srcColElement->GetSize () * sealedPage.GetNElements ();
836892 auto &buffer = sealedPageData.fBuffers [pageBufferBaseIdx + pageIdx];
837- buffer = MakeUninitArray<std::uint8_t >(uncompressedSize + checksumSize);
838- RChangeCompressionFunc compressTask{
839- *srcColElement, *dstColElement, mergeData.fMergeOpts , sealedPage, *fPageAlloc , buffer.get (),
840- };
841-
842- if (fTaskGroup )
843- fTaskGroup ->Run (compressTask);
844- else
845- compressTask ();
893+ const auto bufSize = uncompressedSize + checksumSize;
894+ // NOTE: we currently allocate the max possible size for this buffer and don't shrink it afterward.
895+ // We might want to introduce an option that trades speed for memory usage and shrink the buffer to fit
896+ // the actual data size after recompressing.
897+ buffer = MakeUninitArray<std::uint8_t >(bufSize);
898+
899+ if (needsResealing) {
900+ RTaskVisitor{fTaskGroup }(RResealFunc{
901+ *srcColElement,
902+ *dstColElement,
903+ mergeData.fMergeOpts ,
904+ sealedPage,
905+ *fPageAlloc ,
906+ buffer.get (),
907+ bufSize,
908+ });
909+ } else {
910+ RTaskVisitor{fTaskGroup }(RChangeCompressionFunc{
911+ *srcColElement,
912+ *dstColElement,
913+ mergeData.fMergeOpts ,
914+ sealedPage,
915+ *fPageAlloc ,
916+ buffer.get (),
917+ bufSize,
918+ mergeData.fDestination .GetWriteOptions ()
919+ });
920+ }
846921 }
847922
848923 ++pageIdx;
0 commit comments