Skip to content

Commit 6e1dff4

Browse files
committed
[ntuple] Parallelize page processing in RNTupleMerger
1 parent 52c6942 commit 6e1dff4

File tree

3 files changed

+84
-52
lines changed

3 files changed

+84
-52
lines changed

tree/ntuple/v7/inc/ROOT/RNTupleMerger.hxx

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,8 @@ struct RNTupleMergeOptions {
4242
/**
4343
* \class ROOT::Experimental::Internal::RNTupleMerger
4444
* \ingroup NTuple
45-
* \brief Given a set of RPageSources merge them into an RPageSink
45+
* \brief Given a set of RPageSources merge them into an RPageSink, optionally changing their compression.
46+
* This can also be used to change the compression of a single RNTuple by just passing a single source.
4647
*/
4748
// clang-format on
4849
class RNTupleMerger {

tree/ntuple/v7/inc/ROOT/RPageStorage.hxx

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -134,6 +134,7 @@ public:
134134
SealedPageSequence_t::const_iterator fFirst;
135135
SealedPageSequence_t::const_iterator fLast;
136136

137+
RSealedPageGroup() = default;
137138
RSealedPageGroup(DescriptorId_t d, SealedPageSequence_t::const_iterator b, SealedPageSequence_t::const_iterator e)
138139
: fPhysicalColumnId(d), fFirst(b), fLast(e)
139140
{

tree/ntuple/v7/src/RNTupleMerger.cxx

Lines changed: 81 additions & 51 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
* For the list of contributors see $ROOTSYS/README/CREDITS. *
1414
*************************************************************************/
1515

16+
#include "TROOT.h"
1617
#include <ROOT/RError.hxx>
1718
#include <ROOT/RNTuple.hxx>
1819
#include <ROOT/RNTupleDescriptor.hxx>
@@ -21,11 +22,12 @@
2122
#include <ROOT/RNTupleUtil.hxx>
2223
#include <ROOT/RPageStorageFile.hxx>
2324
#include <ROOT/RClusterPool.hxx>
25+
#include <ROOT/RNTupleSerialize.hxx>
26+
#include <ROOT/RNTupleZip.hxx>
27+
#include <ROOT/TTaskGroup.hxx>
2428
#include <TError.h>
2529
#include <TFile.h>
2630
#include <TKey.h>
27-
#include "ROOT/RNTupleSerialize.hxx"
28-
#include "ROOT/RNTupleZip.hxx"
2931

3032
#include <deque>
3133

@@ -92,6 +94,7 @@ try {
9294
// Now merge
9395
Internal::RNTupleMerger merger;
9496
Internal::RNTupleMergeOptions options;
97+
// TODO: set MergeOptions depending on function input
9598
merger.Merge(sourcePtrs, *destination, options);
9699

97100
// Provide the caller with a merged anchor object (even though we've already
@@ -179,8 +182,11 @@ void ROOT::Experimental::Internal::RNTupleMerger::Merge(std::span<RPageSource *>
179182
}
180183

181184
std::unique_ptr<RNTupleModel> model; // used to initialize the schema of the output RNTuple
182-
RNTupleDecompressor decompressor;
183-
std::vector<unsigned char> zipBuffer;
185+
std::optional<TTaskGroup> taskGroup;
186+
#ifdef R__USE_IMT
187+
if (ROOT::IsImplicitMTEnabled())
188+
taskGroup = TTaskGroup();
189+
#endif
184190

185191
// Append the sources to the destination one-by-one
186192
for (const auto &source : sources) {
@@ -228,6 +234,7 @@ void ROOT::Experimental::Internal::RNTupleMerger::Merge(std::span<RPageSource *>
228234
// invalidated.
229235
std::deque<RPageStorage::SealedPageSequence_t> sealedPagesV;
230236
std::vector<RPageStorage::RSealedPageGroup> sealedPageGroups;
237+
std::vector<std::unique_ptr<unsigned char[]>> sealedPageBuffers;
231238

232239
for (const auto &column : columns) {
233240

@@ -239,76 +246,99 @@ void ROOT::Experimental::Internal::RNTupleMerger::Merge(std::span<RPageSource *>
239246
}
240247

241248
const auto &columnDesc = descriptor->GetColumnDescriptor(columnId);
242-
auto colElement = RColumnElementBase::Generate(columnDesc.GetModel().GetType());
249+
const auto colElement = RColumnElementBase::Generate(columnDesc.GetModel().GetType());
243250

244251
// Now get the pages for this column in this cluster
245252
const auto &pages = clusterDesc.GetPageRange(columnId);
246253

247254
RPageStorage::SealedPageSequence_t sealedPages;
255+
sealedPages.resize(pages.fPageInfos.size());
248256

249-
const auto colRangeCompressionSettings = cluster.GetColumnRange(columnId).fCompressionSettings;
257+
const auto colRangeCompressionSettings = clusterDesc.GetColumnRange(columnId).fCompressionSettings;
250258
const bool needsCompressionChange =
251259
options.fCompressionSettings != -1 && colRangeCompressionSettings != options.fCompressionSettings;
252260

253-
std::vector<std::unique_ptr<unsigned char[]>> sealedPageBuffers;
254261
// If the column range is already uncompressed we don't need to allocate any new buffer, so we don't
255262
// bother reserving memory for them.
263+
size_t pageBufferBaseIdx = sealedPageBuffers.size();
256264
if (colRangeCompressionSettings != 0)
257-
sealedPageBuffers.reserve(pages.fPageInfos.size());
265+
sealedPageBuffers.resize(sealedPageBuffers.size() + pages.fPageInfos.size());
258266

259-
std::uint64_t pageNo = 0;
260267
sealedPageGroups.reserve(sealedPageGroups.size() + pages.fPageInfos.size());
268+
269+
std::uint64_t pageNo = 0;
270+
261271
// Loop over the pages
262272
for (const auto &pageInfo : pages.fPageInfos) {
263-
ROnDiskPage::Key key{columnId, pageNo};
264-
auto onDiskPage = cluster->GetOnDiskPage(key);
265-
RPageStorage::RSealedPage sealedPage;
266-
sealedPage.SetNElements(pageInfo.fNElements);
267-
sealedPage.SetHasChecksum(pageInfo.fHasChecksum);
268-
sealedPage.SetBufferSize(pageInfo.fLocator.fBytesOnStorage +
269-
pageInfo.fHasChecksum * RPageStorage::kNBytesPageChecksum);
270-
sealedPage.SetBuffer(onDiskPage->GetAddress());
271-
sealedPage.VerifyChecksumIfEnabled().ThrowOnError();
272-
R__ASSERT(onDiskPage && (onDiskPage->GetSize() == sealedPage.GetBufferSize()));
273-
274-
// Change compression if needed
275-
if (needsCompressionChange) {
276-
// Step 1: prepare the source data.
277-
// Unzip the source buffer into the zip staging buffer. This is a memcpy if the source was already
278-
// uncompressed.
279-
const auto uncompressedSize = colElement->GetSize() * sealedPage.GetNElements();
280-
zipBuffer.resize(std::max<size_t>(uncompressedSize, zipBuffer.size()));
281-
RNTupleDecompressor::Unzip(sealedPage.GetBuffer(), sealedPage.GetBufferSize(), uncompressedSize,
282-
zipBuffer.data());
283-
284-
// Step 2: prepare the destination buffer.
285-
if (uncompressedSize != sealedPage.GetBufferSize()) {
286-
// source column range is compressed
287-
R__ASSERT(colRangeCompressionSettings != 0);
288-
289-
// We need to reallocate sealedPage's buffer because we are going to recompress the data
290-
// with a different algorithm/level. Since we don't know a priori how big that'll be, the
291-
// only safe bet is to allocate a buffer big enough to hold as many bytes as the uncompressed data.
292-
R__ASSERT(sealedPage.GetBufferSize() < uncompressedSize);
293-
const auto &newBuf = sealedPageBuffers.emplace_back(new unsigned char[uncompressedSize]);
294-
sealedPage.SetBuffer(newBuf.get());
295-
} else {
296-
// source column range is uncompressed. We can reuse the sealedPage's buffer since it's big enough.
297-
R__ASSERT(colRangeCompressionSettings == 0);
273+
// Per-page task: fills sealedPages[pageNo] from the on-disk page `pageNo` of column `columnId` and,
// when needsCompressionChange is set, recompresses it with options.fCompressionSettings.
// It is either handed to taskGroup->Run() or called inline, so it must only write to slots that are
// unique to this page: sealedPages[pageNo] and sealedPageBuffers[pageBufferBaseIdx + pageNo].
auto taskFunc = [ // values in
274+
columnId, pageNo, cluster, needsCompressionChange, colRangeCompressionSettings,
275+
pageBufferBaseIdx,
276+
// const refs in
277+
&colElement, &pageInfo, &options,
278+
// refs out
279+
&sealedPages, &sealedPageBuffers]() {
280+
assert(pageNo < sealedPages.size());
281+
// sealedPageBuffers is shared by all iterations of the column loop and is only grown for column ranges
// that are compressed, so its size cannot be compared with sealedPages.size(). Check this page's slot.
assert(colRangeCompressionSettings == 0 || pageBufferBaseIdx + pageNo < sealedPageBuffers.size());
282+
283+
ROnDiskPage::Key key{columnId, pageNo};
284+
auto onDiskPage = cluster->GetOnDiskPage(key);
285+
// Check for a missing page before it is dereferenced below.
R__ASSERT(onDiskPage);
286+
RPageStorage::RSealedPage &sealedPage = sealedPages[pageNo];
287+
sealedPage.SetNElements(pageInfo.fNElements);
288+
sealedPage.SetHasChecksum(pageInfo.fHasChecksum);
289+
sealedPage.SetBufferSize(pageInfo.fLocator.fBytesOnStorage +
290+
pageInfo.fHasChecksum * RPageStorage::kNBytesPageChecksum);
291+
sealedPage.SetBuffer(onDiskPage->GetAddress());
292+
sealedPage.VerifyChecksumIfEnabled().ThrowOnError();
293+
R__ASSERT(onDiskPage && (onDiskPage->GetSize() == sealedPage.GetBufferSize()));
294+
295+
// Change compression if needed
296+
if (needsCompressionChange) {
297+
// Step 1: prepare the source data.
298+
// Unzip the source buffer into the zip staging buffer. This is a memcpy if the source was
299+
// already uncompressed.
300+
const auto uncompressedSize = colElement->GetSize() * sealedPage.GetNElements();
301+
// The staging buffer is local to the task so that concurrently running tasks do not share it.
auto zipBuffer = std::make_unique<unsigned char[]>(uncompressedSize);
302+
RNTupleDecompressor::Unzip(sealedPage.GetBuffer(), sealedPage.GetBufferSize(), uncompressedSize,
303+
zipBuffer.get());
304+
305+
// Step 2: prepare the destination buffer.
306+
if (uncompressedSize != sealedPage.GetBufferSize()) {
307+
// source column range is compressed
308+
R__ASSERT(colRangeCompressionSettings != 0);
309+
310+
// We need to reallocate sealedPage's buffer because we are going to recompress the data
311+
// with a different algorithm/level. Since we don't know a priori how big that'll be, the
312+
// only safe bet is to allocate a buffer big enough to hold as many bytes as the uncompressed
313+
// data.
314+
R__ASSERT(sealedPage.GetBufferSize() < uncompressedSize);
315+
sealedPageBuffers[pageBufferBaseIdx + pageNo] =
316+
std::make_unique<unsigned char[]>(uncompressedSize);
317+
// Use the same per-column offset as the allocation above; indexing with pageNo alone would pick a
// slot that belongs to a previously processed column.
sealedPage.SetBuffer(sealedPageBuffers[pageBufferBaseIdx + pageNo].get());
318+
} else {
319+
// source column range is uncompressed. We can reuse the sealedPage's buffer since it's big
320+
// enough.
321+
R__ASSERT(colRangeCompressionSettings == 0);
322+
}
323+
324+
const auto newNBytes =
325+
RNTupleCompressor::Zip(zipBuffer.get(), uncompressedSize, options.fCompressionSettings,
326+
const_cast<void *>(sealedPage.GetBuffer()));
327+
sealedPage.SetBufferSize(newNBytes);
298328
}
329+
};
299330

300-
const auto newNBytes =
301-
RNTupleCompressor::Zip(zipBuffer.data(), uncompressedSize, options.fCompressionSettings,
302-
const_cast<void *>(sealedPage.GetBuffer()));
303-
sealedPage.SetBufferSize(newNBytes);
304-
}
305-
306-
sealedPages.push_back(std::move(sealedPage));
331+
if (taskGroup)
332+
taskGroup->Run(taskFunc);
333+
else
334+
taskFunc();
307335

308336
++pageNo;
309337

310338
} // end of loop over pages
311339

340+
if (taskGroup)
341+
taskGroup->Wait();
312342
sealedPagesV.push_back(std::move(sealedPages));
313343
sealedPageGroups.emplace_back(column.fColumnOutputId, sealedPagesV.back().cbegin(),
314344
sealedPagesV.back().cend());

0 commit comments

Comments
 (0)