Skip to content

Commit 5b9a212

Browse files
authored
Merge pull request #48088 from Dr15Jones/ttreeFill
Allow use of TTree::GetEntry in PoolSource
2 parents 6406cd8 + 7618354 commit 5b9a212

29 files changed

+819
-520
lines changed

DataFormats/Common/interface/EDProductGetter.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -97,6 +97,8 @@ namespace edm {
9797

9898
///These can only be used internally by the framework
9999
static EDProductGetter const* switchProductGetter(EDProductGetter const*);
100+
static void setMultiThreadProductGetter(EDProductGetter const*);
101+
static void unsetMultiThreadProductGetter();
100102
static void assignEDProductGetter(EDProductGetter const*&);
101103

102104
private:

DataFormats/Common/interface/RefCoreStreamer.h

Lines changed: 24 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,29 @@ namespace edm {
3333
};
3434

3535
void setRefCoreStreamerInTClass();
36-
void setRefCoreStreamer(bool resetAll = false);
37-
EDProductGetter const* setRefCoreStreamer(EDProductGetter const* ep);
36+
37+
class RefCoreStreamerGuard {
38+
public:
39+
RefCoreStreamerGuard(EDProductGetter const* ep) { setRefCoreStreamer(ep); }
40+
~RefCoreStreamerGuard() { unsetRefCoreStreamer(); }
41+
RefCoreStreamerGuard(RefCoreStreamerGuard const&) = delete;
42+
RefCoreStreamerGuard& operator=(RefCoreStreamerGuard const&) = delete;
43+
RefCoreStreamerGuard(RefCoreStreamerGuard&&) = delete;
44+
RefCoreStreamerGuard& operator=(RefCoreStreamerGuard&&) = delete;
45+
46+
private:
47+
static void unsetRefCoreStreamer();
48+
static EDProductGetter const* setRefCoreStreamer(EDProductGetter const* ep);
49+
};
50+
class MultiThreadRefCoreStreamerGuard {
51+
public:
52+
MultiThreadRefCoreStreamerGuard(EDProductGetter const* ep) { setRefCoreStreamer(ep); }
53+
~MultiThreadRefCoreStreamerGuard() { unsetRefCoreStreamer(); }
54+
55+
private:
56+
static void setRefCoreStreamer(EDProductGetter const* ep);
57+
static void unsetRefCoreStreamer();
58+
};
59+
3860
} // namespace edm
3961
#endif

DataFormats/Common/src/EDProductGetter.cc

Lines changed: 20 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,11 +11,13 @@
1111
//
1212

1313
// system include files
14+
#include <atomic>
1415

1516
// user include files
1617
#include "DataFormats/Common/interface/EDProductGetter.h"
1718
#include "DataFormats/Provenance/interface/ProductID.h"
1819
#include "FWCore/Utilities/interface/EDMException.h"
20+
#include "FWCore/Utilities/interface/Likely.h"
1921

2022
namespace edm {
2123
//
@@ -75,16 +77,32 @@ namespace edm {
7577
}
7678

7779
thread_local EDProductGetter const* s_productGetter = nullptr;
80+
static std::atomic<EDProductGetter const*> s_multiThreadProductGetter{nullptr};
7881
EDProductGetter const* EDProductGetter::switchProductGetter(EDProductGetter const* iNew) {
7982
//std::cout <<"switch from "<<s_productGetter<<" to "<<iNew<<std::endl;
8083
EDProductGetter const* old = s_productGetter;
8184
s_productGetter = iNew;
8285
return old;
8386
}
87+
88+
void EDProductGetter::setMultiThreadProductGetter(EDProductGetter const* prodGetter) {
89+
EDProductGetter const* expected = nullptr;
90+
while (not s_multiThreadProductGetter.compare_exchange_strong(expected, prodGetter, std::memory_order_acq_rel)) {
91+
expected = nullptr;
92+
};
93+
}
94+
95+
void EDProductGetter::unsetMultiThreadProductGetter() {
96+
s_multiThreadProductGetter.store(nullptr, std::memory_order_release);
97+
}
98+
8499
void EDProductGetter::assignEDProductGetter(EDProductGetter const*& iGetter) {
85100
//std::cout <<"assign "<<s_productGetter<<std::endl;
86-
87-
iGetter = s_productGetter;
101+
if LIKELY (s_productGetter != nullptr) {
102+
iGetter = s_productGetter;
103+
return;
104+
}
105+
iGetter = s_multiThreadProductGetter.load(std::memory_order_acquire);
88106
}
89107

90108
} // namespace edm

DataFormats/Common/src/RefCoreStreamer.cc

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -80,13 +80,22 @@ namespace edm {
8080
}
8181
}
8282

83-
void setRefCoreStreamer(bool) { EDProductGetter::switchProductGetter(nullptr); }
83+
void RefCoreStreamerGuard::unsetRefCoreStreamer() { EDProductGetter::switchProductGetter(nullptr); }
8484

85-
EDProductGetter const* setRefCoreStreamer(EDProductGetter const* ep) {
85+
EDProductGetter const* RefCoreStreamerGuard::setRefCoreStreamer(EDProductGetter const* ep) {
8686
EDProductGetter const* returnValue = nullptr;
8787
if (ep != nullptr) {
8888
returnValue = edm::EDProductGetter::switchProductGetter(ep);
8989
}
9090
return returnValue;
9191
}
92+
93+
void MultiThreadRefCoreStreamerGuard::setRefCoreStreamer(EDProductGetter const* ep) {
94+
edm::EDProductGetter::setMultiThreadProductGetter(ep);
95+
}
96+
97+
void MultiThreadRefCoreStreamerGuard::unsetRefCoreStreamer() {
98+
edm::EDProductGetter::unsetMultiThreadProductGetter();
99+
}
100+
92101
} // namespace edm

DataFormats/FWLite/src/DataGetterHelper.cc

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -130,7 +130,7 @@ namespace fwlite {
130130
////END OF WORK AROUND
131131

132132
if (tcUse_) {
133-
TTreeCache* tcache = dynamic_cast<TTreeCache*>(branchMap_->getFile()->GetCacheRead());
133+
TTreeCache* tcache = dynamic_cast<TTreeCache*>(branchMap_->getFile()->GetCacheRead(nullptr));
134134

135135
if (nullptr != tcache) {
136136
if (!tcTrained_) {

FWCore/Framework/interface/ProductProvenanceRetriever.h

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -32,13 +32,15 @@ namespace edm {
3232
ModuleCallingContext const* moduleCallingContext,
3333
unsigned int transitionIndex,
3434
std::atomic<const std::set<ProductProvenance>*>& writeTo) const noexcept = 0;
35+
36+
virtual void unsafe_fillProvenance(unsigned int transitionIndex) const;
3537
};
3638

3739
class ProductProvenanceRetriever : public ProductProvenanceLookup {
3840
public:
3941
explicit ProductProvenanceRetriever(unsigned int iTransitionIndex);
4042
ProductProvenanceRetriever(unsigned int iTransitionIndex, edm::ProductRegistry const&);
41-
explicit ProductProvenanceRetriever(std::unique_ptr<ProvenanceReaderBase> reader);
43+
explicit ProductProvenanceRetriever(unsigned int iTransitionIndex, std::unique_ptr<ProvenanceReaderBase> reader);
4244

4345
ProductProvenanceRetriever& operator=(ProductProvenanceRetriever const&) = delete;
4446

@@ -52,10 +54,13 @@ namespace edm {
5254

5355
void readProvenanceAsync(WaitingTaskHolder task, ModuleCallingContext const* moduleCallingContext) const noexcept;
5456

57+
// Used in prompt reading mode to fill the branch at the same time
58+
// when all event data is read
59+
void unsafe_fillProvenance();
60+
5561
private:
5662
std::unique_ptr<const std::set<ProductProvenance>> readProvenance() const final;
5763
const ProductProvenanceLookup* nextRetriever() const final { return nextRetriever_.get(); }
58-
void setTransitionIndex(unsigned int transitionIndex) { transitionIndex_ = transitionIndex; }
5964

6065
edm::propagate_const<std::shared_ptr<ProductProvenanceRetriever>> nextRetriever_;
6166
std::shared_ptr<const ProvenanceReaderBase> provenanceReader_;

FWCore/Framework/src/ProductProvenanceRetriever.cc

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,18 +10,21 @@
1010
*/
1111

1212
namespace edm {
13+
void ProvenanceReaderBase::unsafe_fillProvenance(unsigned int transitionIndex) const {}
14+
1315
ProductProvenanceRetriever::ProductProvenanceRetriever(unsigned int iTransitionIndex)
1416
: ProductProvenanceLookup(), nextRetriever_(), provenanceReader_(), transitionIndex_(iTransitionIndex) {}
1517

1618
ProductProvenanceRetriever::ProductProvenanceRetriever(unsigned int iTransitionIndex,
1719
edm::ProductRegistry const& iReg)
1820
: ProductProvenanceLookup(iReg), nextRetriever_(), provenanceReader_(), transitionIndex_(iTransitionIndex) {}
1921

20-
ProductProvenanceRetriever::ProductProvenanceRetriever(std::unique_ptr<ProvenanceReaderBase> reader)
22+
ProductProvenanceRetriever::ProductProvenanceRetriever(unsigned int iTransitionIndex,
23+
std::unique_ptr<ProvenanceReaderBase> reader)
2124
: ProductProvenanceLookup(),
2225
nextRetriever_(),
2326
provenanceReader_(reader.release()),
24-
transitionIndex_(std::numeric_limits<unsigned int>::max()) {
27+
transitionIndex_(iTransitionIndex) {
2528
assert(provenanceReader_);
2629
}
2730

@@ -86,5 +89,9 @@ namespace edm {
8689
parentProcessRetriever_ = &provRetriever;
8790
}
8891

92+
void ProductProvenanceRetriever::unsafe_fillProvenance() {
93+
provenanceReader_->unsafe_fillProvenance(transitionIndex_);
94+
}
95+
8996
ProvenanceReaderBase::~ProvenanceReaderBase() {}
9097
} // namespace edm

FWCore/Integration/test/ref_merge_test_cfg.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,13 +5,14 @@
55
parser = argparse.ArgumentParser(prog=sys.argv[0], description='Test Refs after merge.')
66

77
parser.add_argument("--fileName", help="file to read")
8+
parser.add_argument("--promptRead", action="store_true", default=False, help="prompt read the event products")
89

910
args = parser.parse_args()
1011

1112
process = cms.Process("TEST")
1213

1314
from IOPool.Input.modules import PoolSource
14-
process.source = PoolSource(fileNames = [f"file:{args.fileName}"])
15+
process.source = PoolSource(fileNames = [f"file:{args.fileName}"], delayReadingEventProducts = not args.promptRead)
1516

1617
from FWCore.Integration.modules import OtherThingAnalyzer
1718
process.tester = OtherThingAnalyzer(other = ("d","testUserTag"))

FWCore/Integration/test/run_RefMerge.sh

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,8 +14,9 @@ LOCAL_TEST_DIR=${SCRAM_TEST_PATH}
1414
echo ${test}MERGE------------------------------------------------------------
1515
cmsRun ${LOCAL_TEST_DIR}/${test}cfg.py --inFile1 'ref_merge_prod1.root' --inFile2 'ref_merge_prod2.root' --outFile 'ref_merge.root' || die "cmsRun ${test}cfg.py" $?
1616

17-
echo ${test}test------------------------------------------------------------
18-
cmsRun ${LOCAL_TEST_DIR}/${test}test_cfg.py --fileName 'ref_merge.root' || die "cmsRun ${test}test_cfg.py" $?
17+
18+
echo ${test}MERGE promptRead------------------------------------------------------------
19+
cmsRun ${LOCAL_TEST_DIR}/${test}test_cfg.py --fileName 'ref_merge.root' --promptRead || die "cmsRun ${test}test_cfg.py" $?
1920

2021
echo ${test}keepAllProd ------------------------------------------------------------
2122
cmsRun ${LOCAL_TEST_DIR}/${test}prod_cfg.py --extraProducers --keepAllProducts --fileName 'ref_merge_prod_all.root' || die "cmsRun ${test}prod_cfg.py --keepAllProducts" $?

IOPool/Input/src/InputFile.h

Lines changed: 29 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,8 @@ Holder for an input TFile.
1010
#include "FWCore/Utilities/interface/propagate_const.h"
1111

1212
#include "TFile.h"
13+
#include "TTree.h"
14+
#include "TTreeCache.h"
1315

1416
#include <map>
1517
#include <string>
@@ -46,8 +48,33 @@ namespace edm {
4648
static void reportReadBranch(InputType inputType, std::string const& branchname);
4749

4850
TObject* Get(char const* name) { return file_->Get(name); }
49-
TFileCacheRead* GetCacheRead() const { return file_->GetCacheRead(); }
50-
void SetCacheRead(TFileCacheRead* tfcr) { file_->SetCacheRead(tfcr, nullptr, TFile::kDoNotDisconnect); }
51+
std::unique_ptr<TTreeCache> createCacheWithSize(TTree& iTree, unsigned int cacheSize) {
52+
iTree.SetCacheSize(static_cast<Long64_t>(cacheSize));
53+
std::unique_ptr<TTreeCache> newCache(dynamic_cast<TTreeCache*>(file_->GetCacheRead(&iTree)));
54+
file_->SetCacheRead(nullptr, &iTree, TFile::kDoNotDisconnect);
55+
return newCache;
56+
}
57+
58+
class CacheGuard {
59+
public:
60+
CacheGuard(TFile* file, TObject* tree, TFileCacheRead* tfcr) : file_(file), tree_(tree) {
61+
file_->SetCacheRead(tfcr, tree_, TFile::kDoNotDisconnect);
62+
}
63+
CacheGuard() = delete;
64+
CacheGuard(CacheGuard const&) = delete;
65+
CacheGuard& operator=(CacheGuard const&) = delete;
66+
CacheGuard(CacheGuard&&) = delete;
67+
CacheGuard& operator=(CacheGuard&&) = delete;
68+
~CacheGuard() { file_->SetCacheRead(nullptr, tree_, TFile::kDoNotDisconnect); }
69+
70+
private:
71+
TFile* file_;
72+
TObject* tree_;
73+
};
74+
[[nodiscard]] CacheGuard setCacheReadTemporarily(TFileCacheRead* tfcr, TObject* iTree) {
75+
return CacheGuard(file_.get(), iTree, tfcr);
76+
}
77+
void clearCacheRead(TObject* iTree) { file_->SetCacheRead(nullptr, iTree, TFile::kDoNotDisconnect); }
5178
void logFileAction(char const* msg, char const* fileName) const;
5279

5380
private:

0 commit comments

Comments
 (0)