Skip to content

Commit ed1bba5

Browse files
committed
Handle RNTuple optimizations
- properly pass optimizations to RNTupleWriter - can specify per Field optimizations
1 parent 11cfc9f commit ed1bba5

File tree

5 files changed

+175
-59
lines changed

5 files changed

+175
-59
lines changed

FWIO/RNTupleTempOutput/interface/RNTupleTempOutputModule.h

Lines changed: 21 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -84,7 +84,7 @@ namespace edm::rntuple_temp {
8484
AuxItemArray const& auxItems() const { return auxItems_; }
8585

8686
struct OutputItem {
87-
explicit OutputItem(ProductDescription const* bd, EDGetToken const& token, int splitLevel, int basketSize);
87+
explicit OutputItem(ProductDescription const* bd, EDGetToken const& token, bool streamerProduct);
8888

8989
BranchID branchID() const { return productDescription_->branchID(); }
9090
std::string const& branchName() const { return productDescription_->branchName(); }
@@ -97,31 +97,18 @@ namespace edm::rntuple_temp {
9797
void const*& product() { return product_; }
9898
void const** productPtr() { return &product_; }
9999
void setProduct(void const* iProduct) { product_ = iProduct; }
100-
int splitLevel() const { return splitLevel_; }
101-
int basketSize() const { return basketSize_; }
100+
101+
bool streamerProduct() const { return streamerProduct_; }
102102

103103
private:
104104
ProductDescription const* productDescription_;
105105
EDGetToken token_;
106106
void const* product_;
107-
int splitLevel_;
108-
int basketSize_;
107+
bool streamerProduct_;
109108
};
110109

111110
using OutputItemList = std::vector<OutputItem>;
112111

113-
struct SpecialSplitLevelForBranch {
114-
SpecialSplitLevelForBranch(std::string const& iBranchName, int iSplitLevel)
115-
: branch_(convert(iBranchName)),
116-
splitLevel_(iSplitLevel < 1 ? 1 : iSplitLevel) //minimum is 1
117-
{}
118-
bool match(std::string const& iBranchName) const;
119-
std::regex convert(std::string const& iGlobBranchExpression) const;
120-
121-
std::regex branch_;
122-
int splitLevel_;
123-
};
124-
125112
struct AliasForBranch {
126113
AliasForBranch(std::string const& iBranchName, std::string const& iAlias)
127114
: branch_{convert(iBranchName)}, alias_{iAlias} {}
@@ -133,6 +120,16 @@ namespace edm::rntuple_temp {
133120
std::string alias_;
134121
};
135122

123+
/// Associates a glob-style data product name pattern with an explicit
/// choice of whether matching products are stored with a streamer.
struct SetStreamerForDataProduct {
  /// @param iName        glob expression for the product name; convert() turns it into the stored regex.
  /// @param iUseStreamer value handed back for products whose name matches the pattern.
  SetStreamerForDataProduct(std::string const& iName, bool iUseStreamer)
      : branch_{convert(iName)}, useStreamer_{iUseStreamer} {}

  /// @return true when iName matches the stored pattern.
  bool match(std::string const& iName) const;

  /// @return the regular expression built from the glob expression.
  std::regex convert(std::string const& iGlobBranchExpression) const;

  std::regex branch_;  ///< compiled form of the pattern passed to the constructor
  bool useStreamer_;   ///< streamer choice associated with the pattern
};
132+
136133
std::vector<OutputItemList> const& selectedOutputItemList() const { return selectedOutputItemList_; }
137134

138135
std::vector<OutputItemList>& selectedOutputItemList() { return selectedOutputItemList_; }
@@ -141,6 +138,9 @@ namespace edm::rntuple_temp {
141138

142139
std::vector<AliasForBranch> const& aliasForBranches() const { return aliasForBranches_; }
143140

141+
/// Sub-field names read from the "noSplitSubFields" entry of the "fieldLevelOptimizations" parameter set.
std::vector<std::string> const& noSplitSubFields() const { return noSplitSubFields_; }
142+
/// Value of the "useStreamer" entry of the "fieldLevelOptimizations" parameter set.
bool allProductsUseStreamer() const { return allProductsUseStreamer_; }
143+
144144
protected:
145145
///allow inheriting classes to override but still be able to call this method in the overridden version
146146
bool shouldWeCloseFile() const override;
@@ -186,7 +186,6 @@ namespace edm::rntuple_temp {
186186
RootServiceChecker rootServiceChecker_;
187187
AuxItemArray auxItems_;
188188
std::vector<OutputItemList> selectedOutputItemList_;
189-
std::vector<SpecialSplitLevelForBranch> specialSplitLevelForBranches_;
190189
std::vector<AliasForBranch> aliasForBranches_;
191190
std::unique_ptr<edm::ProductRegistry const> reg_;
192191
std::string const fileName_;
@@ -210,6 +209,10 @@ namespace edm::rntuple_temp {
210209
std::string statusFileName_;
211210
std::string overrideGUID_;
212211
std::vector<std::string> processesWithSelectedMergeableRunProducts_;
212+
213+
std::vector<std::string> noSplitSubFields_;
214+
std::vector<SetStreamerForDataProduct> overrideStreamer_;
215+
bool allProductsUseStreamer_;
213216
};
214217
} // namespace edm::rntuple_temp
215218

FWIO/RNTupleTempOutput/src/RNTupleTempOutputModule.cc

Lines changed: 81 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -29,19 +29,56 @@
2929
#include <sstream>
3030
#include "boost/algorithm/string.hpp"
3131

32-
namespace {
33-
edm::rntuple_temp::RNTupleTempOutputModule::Optimizations fromConfig(edm::ParameterSet const& iConfig) {
34-
edm::rntuple_temp::RNTupleTempOutputModule::Optimizations opts;
35-
opts.approxZippedClusterSize = iConfig.getUntrackedParameter<unsigned long long>("approxZippedClusterSize");
36-
opts.maxUnzippedClusterSize = iConfig.getUntrackedParameter<unsigned long long>("maxUnzippedClusterSize");
37-
opts.initialUnzippedPageSize = iConfig.getUntrackedParameter<unsigned long long>("initialUnzippedPageSize");
38-
opts.maxUnzippedPageSize = iConfig.getUntrackedParameter<unsigned long long>("maxUnzippedPageSize");
39-
opts.pageBufferBudget = iConfig.getUntrackedParameter<unsigned long long>("pageBufferBudget");
40-
opts.useBufferedWrite = iConfig.getUntrackedParameter<bool>("useBufferedWrite");
41-
opts.useDirectIO = iConfig.getUntrackedParameter<bool>("useDirectIO");
42-
return opts;
32+
namespace edm::rntuple_temp {
33+
inline bool RNTupleTempOutputModule::SetStreamerForDataProduct::match(std::string const& iBranchName) const {
34+
return std::regex_match(iBranchName, branch_);
35+
}
36+
37+
std::regex RNTupleTempOutputModule::SetStreamerForDataProduct::convert(
38+
std::string const& iGlobBranchExpression) const {
39+
std::string tmp(iGlobBranchExpression);
40+
boost::replace_all(tmp, "*", ".*");
41+
boost::replace_all(tmp, "?", ".");
42+
return std::regex(tmp);
4343
}
44-
} // namespace
44+
45+
namespace {
46+
std::vector<RNTupleTempOutputModule::SetStreamerForDataProduct> fromConfig(
47+
std::vector<edm::ParameterSet> const& iConfig) {
48+
std::vector<RNTupleTempOutputModule::SetStreamerForDataProduct> returnValue;
49+
returnValue.reserve(iConfig.size());
50+
51+
for (auto const& prod : iConfig) {
52+
returnValue.emplace_back(prod.getUntrackedParameter<std::string>("product"),
53+
prod.getUntrackedParameter<bool>("useStreamer"));
54+
}
55+
return returnValue;
56+
}
57+
58+
std::optional<bool> useStreamer(std::string const& iName,
59+
std::vector<RNTupleTempOutputModule::SetStreamerForDataProduct> const& iSpecial) {
60+
auto nameNoDot = iName.substr(0, iName.size() - 1);
61+
for (auto const& prod : iSpecial) {
62+
if (prod.match(nameNoDot)) {
63+
return prod.useStreamer_;
64+
}
65+
}
66+
return {};
67+
}
68+
69+
/// Copies the RNTuple write options out of a parameter set into an
/// Optimizations value. Every option is read as an untracked parameter whose
/// label equals the name of the member it fills.
edm::rntuple_temp::RNTupleTempOutputModule::Optimizations fromConfig(edm::ParameterSet const& iConfig) {
  // All size-like options share the same parameter type.
  auto const sizeParam = [&iConfig](char const* iLabel) {
    return iConfig.getUntrackedParameter<unsigned long long>(iLabel);
  };
  auto const flagParam = [&iConfig](char const* iLabel) { return iConfig.getUntrackedParameter<bool>(iLabel); };

  edm::rntuple_temp::RNTupleTempOutputModule::Optimizations result;
  result.approxZippedClusterSize = sizeParam("approxZippedClusterSize");
  result.maxUnzippedClusterSize = sizeParam("maxUnzippedClusterSize");
  result.initialUnzippedPageSize = sizeParam("initialUnzippedPageSize");
  result.maxUnzippedPageSize = sizeParam("maxUnzippedPageSize");
  result.pageBufferBudget = sizeParam("pageBufferBudget");
  result.useBufferedWrite = flagParam("useBufferedWrite");
  result.useDirectIO = flagParam("useDirectIO");
  return result;
}
80+
} // namespace
81+
} // namespace edm::rntuple_temp
4582
namespace edm::rntuple_temp {
4683
RNTupleTempOutputModule::RNTupleTempOutputModule(ParameterSet const& pset)
4784
: edm::one::OutputModuleBase::OutputModuleBase(pset),
@@ -65,7 +102,14 @@ namespace edm::rntuple_temp {
65102
productDependencies_(),
66103
rootOutputFile_(),
67104
statusFileName_(),
68-
overrideGUID_(pset.getUntrackedParameter<std::string>("overrideGUID")) {
105+
overrideGUID_(pset.getUntrackedParameter<std::string>("overrideGUID")),
106+
noSplitSubFields_(pset.getUntrackedParameterSet("fieldLevelOptimizations")
107+
.getUntrackedParameter<std::vector<std::string>>("noSplitSubFields")),
108+
overrideStreamer_(
109+
fromConfig(pset.getUntrackedParameterSet("fieldLevelOptimizations")
110+
.getUntrackedParameter<std::vector<edm::ParameterSet>>("overrideDataProductStreamer"))),
111+
allProductsUseStreamer_(
112+
pset.getUntrackedParameterSet("fieldLevelOptimizations").getUntrackedParameter<bool>("useStreamer")) {
69113
if (pset.getUntrackedParameter<bool>("writeStatusFile")) {
70114
std::ostringstream statusfilename;
71115
statusfilename << moduleLabel_ << '_' << getpid();
@@ -107,9 +151,8 @@ namespace edm::rntuple_temp {
107151

108152
RNTupleTempOutputModule::OutputItem::OutputItem(ProductDescription const* bd,
109153
EDGetToken const& token,
110-
int splitLevel,
111-
int basketSize)
112-
: productDescription_(bd), token_(token), product_(nullptr), splitLevel_(splitLevel), basketSize_(basketSize) {}
154+
bool streamerProduct)
155+
: productDescription_(bd), token_(token), product_(nullptr), streamerProduct_(streamerProduct) {}
113156

114157
namespace {
115158
std::regex convertBranchExpression(std::string const& iGlobBranchExpression) {
@@ -120,15 +163,6 @@ namespace edm::rntuple_temp {
120163
}
121164
} // namespace
122165

123-
inline bool RNTupleTempOutputModule::SpecialSplitLevelForBranch::match(std::string const& iBranchName) const {
124-
return std::regex_match(iBranchName, branch_);
125-
}
126-
127-
std::regex RNTupleTempOutputModule::SpecialSplitLevelForBranch::convert(
128-
std::string const& iGlobBranchExpression) const {
129-
return convertBranchExpression(iGlobBranchExpression);
130-
}
131-
132166
bool RNTupleTempOutputModule::AliasForBranch::match(std::string const& iBranchName) const {
133167
return std::regex_match(iBranchName, branch_);
134168
}
@@ -144,14 +178,12 @@ namespace edm::rntuple_temp {
144178

145179
// Fill outputItemList with an entry for each branch.
146180
for (auto const& kept : keptVector) {
147-
int splitLevel = ProductDescription::invalidSplitLevel;
148-
int basketSize = ProductDescription::invalidBasketSize;
149-
150181
ProductDescription const& prod = *kept.first;
151182
if (branchType == InProcess && processName != prod.processName()) {
152183
continue;
153184
}
154-
outputItemList.emplace_back(&prod, kept.second, splitLevel, basketSize);
185+
// Take the explicit per-product setting when one matches; otherwise fall back to the
// module-wide default. useStreamer() returns std::optional<bool>, and using that optional
// directly in a boolean expression would only test whether a match exists, so an entry
// with useStreamer=false would wrongly turn the streamer on.
bool streamerProduct = useStreamer(prod.branchName(), overrideStreamer_).value_or(allProductsUseStreamer_);
186+
outputItemList.emplace_back(&prod, kept.second, streamerProduct);
155187
}
156188
}
157189

@@ -463,6 +495,26 @@ namespace edm::rntuple_temp {
463495
desc.addUntracked("rntupleWriteOptions", optimizations)
464496
->setComment("Options to control RNTuple specific output features.");
465497
}
498+
{
499+
ParameterSetDescription fieldLevel;
500+
fieldLevel.addUntracked<std::vector<std::string>>("noSplitSubFields", {})
501+
->setComment(
502+
"fully qualified subfield names for fields which should not be split. A single value of 'all' means all "
503+
"possible subfields will be unsplit");
504+
fieldLevel.addUntracked<bool>("useStreamer", false)
505+
->setComment("Use streamer storage for top level fields when storing data products");
506+
507+
{
508+
ParameterSetDescription specialStreamer;
509+
specialStreamer.addUntracked<std::string>("product")->setComment(
510+
"Name of data product needing a special split setting. The name can contain wildcards '*' and '?'");
511+
specialStreamer.addUntracked<bool>("useStreamer", true)
512+
->setComment("Explicitly set if should or should not use streamer (default is to use streamer)");
513+
fieldLevel.addVPSetUntracked("overrideDataProductStreamer", specialStreamer, {});
514+
}
515+
desc.addUntracked("fieldLevelOptimizations", fieldLevel)
516+
->setComment("Options to control specializing how Fields are stored.");
517+
}
466518
OutputModule::fillDescription(desc);
467519
}
468520

FWIO/RNTupleTempOutput/src/RootOutputFile.cc

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -180,9 +180,8 @@ namespace edm::rntuple_temp {
180180
theRNTuple->addField(fixName(desc.branchName()),
181181
desc.wrappedName(),
182182
item.productPtr(),
183-
item.splitLevel(),
184-
item.basketSize(),
185-
item.productDescription()->produced());
183+
item.streamerProduct() or om_->allProductsUseStreamer(),
184+
om_->noSplitSubFields());
186185
//make sure we always store product registry info for all branches we create
187186
branchesWithStoredHistory_.insert(item.branchID());
188187
}

FWIO/RNTupleTempOutput/src/RootOutputRNTuple.cc

Lines changed: 69 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -129,17 +129,80 @@ namespace edm {
129129
options.SetPageBufferBudget(config.pageBufferBudget);
130130
options.SetUseBufferedWrite(config.useBufferedWrite);
131131
options.SetUseDirectIO(config.useDirectIO);
132-
writer_ = ROOT::RNTupleWriter::Append(std::move(model_), name_, *filePtr_, ROOT::RNTupleWriteOptions());
132+
writer_ = ROOT::RNTupleWriter::Append(std::move(model_), name_, *filePtr_, options);
133133
}
134134

135+
namespace {
136+
/* By default RNTuple will take a multi-byte intrinsic data type and break
137+
it into multiple output fields to separate the high-bytes from the low-bytes (or mantissa from exponent).
138+
This typically allows for better compression. Empirically we have found that some important
139+
member data of some classes actually take more space on disk when this is done.
140+
This function allows one to override the default RNTuple behavior and instead store
141+
all bytes of a data type in one field. To do that one must find the storage type (typeName) and
142+
explicitly pass the corresponding column type to `SetColumnRepresentatives`.
143+
*/
144+
/// Requests a single, unsplit column for iField when its type name is one of
/// the fixed-width integer or floating point types listed below. Fields of
/// any other type are left untouched.
void noSplitField(ROOT::RFieldBase& iField) {
  struct Unsplit {
    char const* typeName;
    ROOT::ENTupleColumnType column;
  };
  // Type name as reported by the field -> column type that keeps all bytes together.
  static constexpr Unsplit kUnsplit[] = {
      {"std::uint16_t", ROOT::ENTupleColumnType::kUInt16},
      {"std::uint32_t", ROOT::ENTupleColumnType::kUInt32},
      {"std::uint64_t", ROOT::ENTupleColumnType::kUInt64},
      {"std::int16_t", ROOT::ENTupleColumnType::kInt16},
      {"std::int32_t", ROOT::ENTupleColumnType::kInt32},
      {"std::int64_t", ROOT::ENTupleColumnType::kInt64},
      {"float", ROOT::ENTupleColumnType::kReal32},
      {"double", ROOT::ENTupleColumnType::kReal64},
  };

  auto const& fieldType = iField.GetTypeName();
  for (auto const& candidate : kUnsplit) {
    if (fieldType == candidate.typeName) {
      iField.SetColumnRepresentatives({{candidate.column}});
      return;
    }
  }
}
164+
165+
/// Applies noSplitField() to each sub-field of iField that is named in iNoSplitFields.
///
/// @param iField         top level field whose sub-fields are searched.
/// @param iNoSplitFields qualified sub-field names; entries that belong to other
///                       top level fields are ignored.
/// @throws edm::Exception (edm::errors::Configuration) when an entry addresses
///         iField but no sub-field has that qualified name.
void findSubFieldsForNoSplitThenApply(ROOT::RFieldBase& iField, std::vector<std::string> const& iNoSplitFields) {
  auto const& fieldName = iField.GetFieldName();
  for (auto const& name : iNoSplitFields) {
    if (not name.starts_with(fieldName)) {
      continue;
    }
    // A bare prefix test would also accept entries meant for a different field whose
    // name merely begins with this field's name (e.g. "ab.x" while processing "a") and
    // then fail with a misleading error. Require the prefix to end at a '.' separator
    // (or at the end of the entry) so only entries addressed to this field are used.
    if (name.size() > fieldName.size() and name[fieldName.size()] != '.') {
      continue;
    }
    bool found = false;
    for (auto& subfield : iField) {
      if (subfield.GetQualifiedFieldName() == name) {
        found = true;
        noSplitField(subfield);
        break;
      }
    }
    if (not found) {
      throw edm::Exception(edm::errors::Configuration)
          << "The data product was found but the requested subfield '" << name << "' is not part of the class";
    }
  }
}
183+
} // namespace
184+
135185
void RootOutputRNTuple::addField(std::string const& branchName,
136186
std::string const& className,
137187
void const** pProd,
138-
int splitLevel,
139-
int basketSize,
140-
bool produced) {
141-
auto field = ROOT::RFieldBase::Create(branchName, className).Unwrap();
142-
model_->AddField(std::move(field));
188+
bool useStreamer,
189+
std::vector<std::string> const& iNoSplitFields) {
190+
const bool noSplitSubFields = (iNoSplitFields.size() == 1 and iNoSplitFields[0] == "all") ? true : false;
191+
if (useStreamer) {
192+
auto field = std::make_unique<ROOT::RStreamerField>(branchName, className);
193+
model_->AddField(std::move(field));
194+
} else {
195+
auto field = ROOT::RFieldBase::Create(branchName, className).Unwrap();
196+
if (noSplitSubFields) {
197+
//use the 'conventional' way to store fields
198+
for (auto& subfield : *field) {
199+
noSplitField(subfield);
200+
}
201+
} else if (not iNoSplitFields.empty()) {
202+
findSubFieldsForNoSplitThenApply(*field, iNoSplitFields);
203+
}
204+
model_->AddField(std::move(field));
205+
}
143206
producedBranches_.push_back(model_->GetToken(branchName));
144207
producedBranchPointers_.push_back(const_cast<void**>(pProd));
145208
}

FWIO/RNTupleTempOutput/src/RootOutputRNTuple.h

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -62,9 +62,8 @@ namespace edm {
6262
void addField(std::string const& branchName,
6363
std::string const& className,
6464
void const** pProd,
65-
int splitLevel,
66-
int basketSize,
67-
bool produced);
65+
bool useStreamer,
66+
std::vector<std::string> const& iNoSplitFields);
6867

6968
void fill();
7069

0 commit comments

Comments
 (0)