Skip to content

Commit 52c6942

Browse files
committed
[ntuple] add logic to change compression in RNTupleMerger
1 parent 9b50b56 commit 52c6942

File tree

3 files changed

+67
-7
lines changed

3 files changed

+67
-7
lines changed

tree/ntuple/v7/inc/ROOT/RNTupleMerger.hxx

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
#ifndef ROOT7_RNTupleMerger
1717
#define ROOT7_RNTupleMerger
1818

19+
#include "Compression.h"
1920
#include <ROOT/RError.hxx>
2021
#include <ROOT/RNTupleDescriptor.hxx>
2122
#include <ROOT/RNTupleUtil.hxx>
@@ -30,6 +31,13 @@ namespace ROOT {
3031
namespace Experimental {
3132
namespace Internal {
3233

34+
struct RNTupleMergeOptions {
35+
/// If `fCompressionSettings == -1` (the default), the merger will not change the compression
36+
/// of any of its sources (fast merging). Otherwise, all sources will be converted to the specified
37+
/// compression algorithm and level.
38+
int fCompressionSettings = -1;
39+
};
40+
3341
// clang-format off
3442
/**
3543
* \class ROOT::Experimental::Internal::RNTupleMerger
@@ -74,7 +82,8 @@ private:
7482

7583
public:
7684
/// Merge a given set of sources into the destination
77-
void Merge(std::span<RPageSource *> sources, RPageSink &destination);
85+
void Merge(std::span<RPageSource *> sources, RPageSink &destination,
86+
const RNTupleMergeOptions &options = RNTupleMergeOptions());
7887

7988
}; // end of class RNTupleMerger
8089

tree/ntuple/v7/inc/ROOT/RPageStorage.hxx

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -648,11 +648,11 @@ public:
648648
/// Another version of PopulatePage that allows to specify cluster-relative indexes
649649
virtual RPage PopulatePage(ColumnHandle_t columnHandle, RClusterIndex clusterIndex) = 0;
650650

651-
/// Read the packed and compressed bytes of a page into the memory buffer provided by selaedPage. The sealed page
652-
/// can be used subsequently in a call to RPageSink::CommitSealedPage.
653-
/// The fSize and fNElements member of the sealedPage parameters are always set. If sealedPage.fBuffer is nullptr,
651+
/// Read the packed and compressed bytes of a page into the memory buffer provided by `sealedPage`. The sealed page
652+
/// can be used subsequently in a call to `RPageSink::CommitSealedPage`.
653+
/// The `fSize` and `fNElements` member of the sealedPage parameters are always set. If `sealedPage.fBuffer` is `nullptr`,
654654
/// no data will be copied but the returned size information can be used by the caller to allocate a large enough
655-
/// buffer and call LoadSealedPage again.
655+
/// buffer and call `LoadSealedPage` again.
656656
virtual void
657657
LoadSealedPage(DescriptorId_t physicalColumnId, RClusterIndex clusterIndex, RSealedPage &sealedPage) = 0;
658658

tree/ntuple/v7/src/RNTupleMerger.cxx

Lines changed: 53 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,8 @@
2424
#include <TError.h>
2525
#include <TFile.h>
2626
#include <TKey.h>
27+
#include "ROOT/RNTupleSerialize.hxx"
28+
#include "ROOT/RNTupleZip.hxx"
2729

2830
#include <deque>
2931

@@ -89,7 +91,8 @@ try {
8991

9092
// Now merge
9193
Internal::RNTupleMerger merger;
92-
merger.Merge(sourcePtrs, *destination);
94+
Internal::RNTupleMergeOptions options;
95+
merger.Merge(sourcePtrs, *destination, options);
9396

9497
// Provide the caller with a merged anchor object (even though we've already
9598
// written it).
@@ -165,7 +168,8 @@ void ROOT::Experimental::Internal::RNTupleMerger::AddColumnsFromField(std::vecto
165168
}
166169

167170
////////////////////////////////////////////////////////////////////////////////
168-
void ROOT::Experimental::Internal::RNTupleMerger::Merge(std::span<RPageSource *> sources, RPageSink &destination)
171+
void ROOT::Experimental::Internal::RNTupleMerger::Merge(std::span<RPageSource *> sources, RPageSink &destination,
172+
const RNTupleMergeOptions &options)
169173
{
170174
std::vector<RColumnInfo> columns;
171175
RCluster::ColumnSet_t columnSet;
@@ -175,6 +179,8 @@ void ROOT::Experimental::Internal::RNTupleMerger::Merge(std::span<RPageSource *>
175179
}
176180

177181
std::unique_ptr<RNTupleModel> model; // used to initialize the schema of the output RNTuple
182+
RNTupleDecompressor decompressor;
183+
std::vector<unsigned char> zipBuffer;
178184

179185
// Append the sources to the destination one-by-one
180186
for (const auto &source : sources) {
@@ -232,11 +238,24 @@ void ROOT::Experimental::Internal::RNTupleMerger::Merge(std::span<RPageSource *>
232238
continue;
233239
}
234240

241+
const auto &columnDesc = descriptor->GetColumnDescriptor(columnId);
242+
auto colElement = RColumnElementBase::Generate(columnDesc.GetModel().GetType());
243+
235244
// Now get the pages for this column in this cluster
236245
const auto &pages = clusterDesc.GetPageRange(columnId);
237246

238247
RPageStorage::SealedPageSequence_t sealedPages;
239248

249+
const auto colRangeCompressionSettings = cluster.GetColumnRange(columnId).fCompressionSettings;
250+
const bool needsCompressionChange =
251+
options.fCompressionSettings != -1 && colRangeCompressionSettings != options.fCompressionSettings;
252+
253+
std::vector<std::unique_ptr<unsigned char[]>> sealedPageBuffers;
254+
// If the column range is already uncompressed we don't need to allocate any new buffer, so we don't
255+
// bother reserving memory for them.
256+
if (colRangeCompressionSettings != 0)
257+
sealedPageBuffers.reserve(pages.fPageInfos.size());
258+
240259
std::uint64_t pageNo = 0;
241260
sealedPageGroups.reserve(sealedPageGroups.size() + pages.fPageInfos.size());
242261
// Loop over the pages
@@ -252,6 +271,38 @@ void ROOT::Experimental::Internal::RNTupleMerger::Merge(std::span<RPageSource *>
252271
sealedPage.VerifyChecksumIfEnabled().ThrowOnError();
253272
R__ASSERT(onDiskPage && (onDiskPage->GetSize() == sealedPage.GetBufferSize()));
254273

274+
// Change compression if needed
275+
if (needsCompressionChange) {
276+
// Step 1: prepare the source data.
277+
// Unzip the source buffer into the zip staging buffer. This is a memcpy if the source was already
278+
// uncompressed.
279+
const auto uncompressedSize = colElement->GetSize() * sealedPage.GetNElements();
280+
zipBuffer.resize(std::max<size_t>(uncompressedSize, zipBuffer.size()));
281+
RNTupleDecompressor::Unzip(sealedPage.GetBuffer(), sealedPage.GetBufferSize(), uncompressedSize,
282+
zipBuffer.data());
283+
284+
// Step 2: prepare the destination buffer.
285+
if (uncompressedSize != sealedPage.GetBufferSize()) {
286+
// source column range is compressed
287+
R__ASSERT(colRangeCompressionSettings != 0);
288+
289+
// We need to reallocate sealedPage's buffer because we are going to recompress the data
290+
// with a different algorithm/level. Since we don't know a priori how big that'll be, the
291+
// only safe bet is to allocate a buffer big enough to hold as many bytes as the uncompressed data.
292+
R__ASSERT(sealedPage.GetBufferSize() < uncompressedSize);
293+
const auto &newBuf = sealedPageBuffers.emplace_back(new unsigned char[uncompressedSize]);
294+
sealedPage.SetBuffer(newBuf.get());
295+
} else {
296+
// source column range is uncompressed. We can reuse the sealedPage's buffer since it's big enough.
297+
R__ASSERT(colRangeCompressionSettings == 0);
298+
}
299+
300+
const auto newNBytes =
301+
RNTupleCompressor::Zip(zipBuffer.data(), uncompressedSize, options.fCompressionSettings,
302+
const_cast<void *>(sealedPage.GetBuffer()));
303+
sealedPage.SetBufferSize(newNBytes);
304+
}
305+
255306
sealedPages.push_back(std::move(sealedPage));
256307

257308
++pageNo;

0 commit comments

Comments
 (0)