Skip to content

Commit b914499

Browse files
committed
Update Principal held ProductRegistry just before it is needed
Reading a new file can cause the ProductRegistry to be modified which may require the Principal to update its internal data structure. Now check if the ProductRegistry changed just before the Principal will be used for data processing.
1 parent 5b531f0 commit b914499

File tree

8 files changed

+42
-49
lines changed

8 files changed

+42
-49
lines changed

DataFormats/Provenance/interface/ProductRegistry.h

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -93,6 +93,11 @@ namespace edm {
9393

9494
ProductList::size_type size() const { return productList_.size(); }
9595

96+
//If the value differs between two versions of the main registry then
97+
// one must update any related meta data
98+
using CacheID = ProductList::size_type;
99+
CacheID cacheIdentifier() const { return size(); }
100+
96101
void print(std::ostream& os) const;
97102

98103
bool anyProducts(BranchType const brType) const;

FWCore/Framework/interface/PrincipalCache.h

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -51,10 +51,6 @@ namespace edm {
5151
void insert(std::unique_ptr<LuminosityBlockPrincipal>);
5252
void insert(std::shared_ptr<EventPrincipal>);
5353

54-
void adjustEventsToNewProductRegistry(std::shared_ptr<ProductRegistry const>);
55-
56-
void adjustIndexesAfterProductRegistryAddition(std::shared_ptr<ProductRegistry const>);
57-
5854
private:
5955
std::unique_ptr<ProcessBlockPrincipal> processBlockPrincipal_;
6056
std::unique_ptr<ProcessBlockPrincipal> inputProcessBlockPrincipal_;

FWCore/Framework/src/EventProcessor.cc

Lines changed: 12 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1037,24 +1037,14 @@ namespace edm {
10371037
streamRunStatus_[0]->runPrincipal()->preReadFile();
10381038
}
10391039

1040-
auto sizeBefore = input_->productRegistry().size();
1040+
auto oldCacheID = input_->productRegistry().cacheIdentifier();
10411041
fb_ = input_->readFile();
10421042
//incase the input's registry changed
1043-
if (input_->productRegistry().size() != sizeBefore) {
1043+
if (input_->productRegistry().cacheIdentifier() != oldCacheID) {
10441044
auto temp = std::make_shared<edm::ProductRegistry>(*preg_);
10451045
temp->merge(input_->productRegistry(), fb_ ? fb_->fileName() : std::string());
10461046
preg_ = std::move(temp);
1047-
//This handles are presently unused Run/Lumis
1048-
principalCache_.adjustIndexesAfterProductRegistryAddition(edm::get_underlying_safe(preg_));
1049-
if (streamLumiActive_ > 0) {
1050-
//Can update the active ones now, even before an `end` transition is called because no OutputModule
1051-
// supports storing ProductDescriptions for Run/LuminosityBlock products which were dropped. Since only
1052-
// dropped products can change the ProductRegistry, only changes in Event can cause that.
1053-
streamRunStatus_[0]->runPrincipal()->possiblyUpdateAfterAddition(edm::get_underlying_safe(preg_));
1054-
streamLumiStatus_[0]->lumiPrincipal()->possiblyUpdateAfterAddition(edm::get_underlying_safe(preg_));
1055-
}
10561047
}
1057-
principalCache_.adjustEventsToNewProductRegistry(preg());
10581048
if (preallocations_.numberOfStreams() > 1 and preallocations_.numberOfThreads() > 1) {
10591049
fb_->setNotFastClonable(FileBlock::ParallelProcesses);
10601050
}
@@ -2038,6 +2028,7 @@ namespace edm {
20382028

20392029
std::shared_ptr<RunPrincipal> EventProcessor::readRun() {
20402030
auto rp = principalCache_.getAvailableRunPrincipalPtr();
2031+
//a new file may have been opened since the last use of this Run
20412032
rp->possiblyUpdateAfterAddition(preg());
20422033
assert(rp);
20432034
rp->setAux(*input_->runAuxiliary());
@@ -2052,6 +2043,9 @@ namespace edm {
20522043

20532044
void EventProcessor::readAndMergeRun(RunProcessingStatus& iStatus) {
20542045
RunPrincipal& runPrincipal = *iStatus.runPrincipal();
2046+
//If a file open happened and we are continuing the Run we may need
2047+
// to do the update
2048+
runPrincipal.possiblyUpdateAfterAddition(preg());
20552049

20562050
runPrincipal.mergeAuxiliary(*input_->runAuxiliary());
20572051
{
@@ -2063,6 +2057,7 @@ namespace edm {
20632057

20642058
std::shared_ptr<LuminosityBlockPrincipal> EventProcessor::readLuminosityBlock(std::shared_ptr<RunPrincipal> rp) {
20652059
auto lbp = principalCache_.getAvailableLumiPrincipalPtr();
2060+
//A new file may have been opened since the last use of the LuminosityBlock
20662061
lbp->possiblyUpdateAfterAddition(preg());
20672062
assert(lbp);
20682063
lbp->setAux(*input_->luminosityBlockAuxiliary());
@@ -2081,6 +2076,9 @@ namespace edm {
20812076
input_->processHistoryRegistry().reducedProcessHistoryID(lumiPrincipal.aux().processHistoryID()) ==
20822077
input_->processHistoryRegistry().reducedProcessHistoryID(
20832078
input_->luminosityBlockAuxiliary()->processHistoryID()));
2079+
//If a file was opened and the LuminosityBlock is continuing
2080+
// we may need to do the update
2081+
lumiPrincipal.possiblyUpdateAfterAddition(preg());
20842082
lumiPrincipal.mergeAuxiliary(*input_->luminosityBlockAuxiliary());
20852083
{
20862084
SendSourceTerminationSignalIfException sentry(actReg_.get());
@@ -2386,6 +2384,8 @@ namespace edm {
23862384
//TODO this will have to become per stream
23872385
auto& event = principalCache_.eventPrincipal(iStreamIndex);
23882386
StreamContext streamContext(event.streamID(), &processContext_);
2387+
// a new file may have been read since the last time this event was used
2388+
event.possiblyUpdateAfterAddition(preg());
23892389

23902390
SendSourceTerminationSignalIfException sentry(actReg_.get());
23912391
input_->readEvent(event, streamContext);

FWCore/Framework/src/Principal.cc

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -149,7 +149,7 @@ namespace edm {
149149
}
150150

151151
void Principal::possiblyUpdateAfterAddition(std::shared_ptr<ProductRegistry const> iProd) {
152-
if (iProd.get() != preg_.get()) {
152+
if (iProd.get() != preg_.get() || iProd->cacheIdentifier() != preg_->cacheIdentifier()) {
153153
preg_ = iProd;
154154
adjustIndexesAfterProductRegistryAddition();
155155
}

FWCore/Framework/src/PrincipalCache.cc

Lines changed: 0 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -39,28 +39,4 @@ namespace edm {
3939
assert(iStreamIndex < eventPrincipals_.size());
4040
eventPrincipals_[iStreamIndex] = ep;
4141
}
42-
43-
void PrincipalCache::adjustEventsToNewProductRegistry(std::shared_ptr<ProductRegistry const> reg) {
44-
for (auto& eventPrincipal : eventPrincipals_) {
45-
if (eventPrincipal) {
46-
eventPrincipal->possiblyUpdateAfterAddition(reg);
47-
}
48-
}
49-
}
50-
51-
void PrincipalCache::adjustIndexesAfterProductRegistryAddition(std::shared_ptr<ProductRegistry const> iReg) {
52-
//Need to temporarily hold all the runs to clear out the runHolder_
53-
std::vector<std::shared_ptr<RunPrincipal>> tempRunPrincipals;
54-
while (auto p = runHolder_.tryToGet()) {
55-
p->possiblyUpdateAfterAddition(iReg);
56-
tempRunPrincipals.emplace_back(std::move(p));
57-
}
58-
//Need to temporarily hold all the lumis to clear out the lumiHolder_
59-
std::vector<std::shared_ptr<LuminosityBlockPrincipal>> tempLumiPrincipals;
60-
while (auto p = lumiHolder_.tryToGet()) {
61-
p->possiblyUpdateAfterAddition(iReg);
62-
tempLumiPrincipals.emplace_back(std::move(p));
63-
}
64-
}
65-
6642
} // namespace edm

FWCore/Integration/test/provenance_prod_cfg.py

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,8 @@
33

44
parser = ArgumentParser(description='Write streamer output file for provenance read test')
55
parser.add_argument("--consumeProd2", help="add an extra producer to the job and drop on output", action="store_true")
6+
parser.add_argument("--diffRun", help="use a different run number", action="store_true")
7+
parser.add_argument("--diffLumi", help="use a different LuminosityBlock number", action="store_true")
68
args = parser.parse_args()
79

810

@@ -11,11 +13,16 @@
1113
from FWCore.Modules.modules import EmptySource
1214

1315
runNumber = 1
16+
lumiNumber = 1
17+
if args.diffRun:
18+
runNumber = 2
19+
if args.diffLumi:
20+
lumiNumber=2
1421
eventNumber = 1
1522
if args.consumeProd2:
1623
eventNumber = 2
1724

18-
process.source = EmptySource(firstRun = runNumber, firstEvent = eventNumber )
25+
process.source = EmptySource(firstRun = runNumber, firstEvent = eventNumber, firstLuminosityBlock = lumiNumber )
1926

2027
from FWCore.Framework.modules import AddIntsProducer, IntProducer
2128

FWCore/Integration/test/provenance_test.sh

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,4 +9,13 @@ function die { echo $1: status $2 ; exit $2; }
99
# The check makes sure the provenance in the ProductRegistry is properly updated when the new file is read
1010
cmsRun ${SCRAM_TEST_PATH}/provenance_prod_cfg.py || die 'Failed in provenance_prod_cfg.py' $?
1111
cmsRun ${SCRAM_TEST_PATH}/provenance_prod_cfg.py --consumeProd2 || die 'Failed in provenance_prod_cfg.py --consumeProd2' $?
12-
cmsRun ${SCRAM_TEST_PATH}/provenance_check_cfg.py || die 'Failed test of provenance' $?
12+
cmsRun ${SCRAM_TEST_PATH}/provenance_check_cfg.py || die 'Failed test of provenance' $?
13+
14+
cmsRun ${SCRAM_TEST_PATH}/provenance_prod_cfg.py --consumeProd2 --diffRun || die 'Failed in provenance_prod_cfg.py --consumeProd2 --diffRun' $?
15+
cmsRun ${SCRAM_TEST_PATH}/provenance_check_cfg.py || die 'Failed test of provenance with diffRun' $?
16+
17+
cmsRun ${SCRAM_TEST_PATH}/provenance_prod_cfg.py --consumeProd2 --diffLumi || die 'Failed in provenance_prod_cfg.py --consumeProd2 --diffLumi' $?
18+
cmsRun ${SCRAM_TEST_PATH}/provenance_check_cfg.py || die 'Failed test of provenance with diffLumi' $?
19+
20+
cmsRun ${SCRAM_TEST_PATH}/provenance_prod_cfg.py --consumeProd2 --diffLumi --diffRun|| die 'Failed in provenance_prod_cfg.py --consumeProd2 --diffLumi --diffRun' $?
21+
cmsRun ${SCRAM_TEST_PATH}/provenance_check_cfg.py || die 'Failed test of provenance with diffLumi & diffRun' $?

FWCore/TestProcessor/src/TestSourceProcessor.cc

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -224,15 +224,12 @@ namespace edm::test {
224224
//make the services available
225225
ServiceRegistry::Operate operate(serviceToken_);
226226

227+
auto oldCacheID = source_->productRegistry().cacheIdentifier();
227228
fb_ = source_->readFile();
228229
//incase the input's registry changed
229-
const size_t size = preg_->size();
230-
preg_->merge(source_->productRegistry(), fb_ ? fb_->fileName() : std::string());
231-
if (size < preg_->size()) {
232-
principalCache_.adjustIndexesAfterProductRegistryAddition(preg_);
230+
if (oldCacheID != source_->productRegistry().cacheIdentifier()) {
231+
preg_->merge(source_->productRegistry(), fb_ ? fb_->fileName() : std::string());
233232
}
234-
principalCache_.adjustEventsToNewProductRegistry(preg_);
235-
236233
source_->fillProcessBlockHelper();
237234
ProcessBlockPrincipal& processBlockPrincipal = principalCache_.inputProcessBlockPrincipal();
238235
while (source_->nextProcessBlock(processBlockPrincipal)) {
@@ -263,6 +260,7 @@ namespace edm::test {
263260

264261
//NOTE: should probably handle merging as well
265262
runPrincipal_ = principalCache_.getAvailableRunPrincipalPtr();
263+
runPrincipal_->possiblyUpdateAfterAddition(preg_);
266264
runPrincipal_->setAux(*source_->runAuxiliary());
267265
source_->readRun(*runPrincipal_, *historyAppender_);
268266

@@ -280,6 +278,7 @@ namespace edm::test {
280278

281279
lumiPrincipal_ = principalCache_.getAvailableLumiPrincipalPtr();
282280
assert(lumiPrincipal_);
281+
lumiPrincipal_->possiblyUpdateAfterAddition(preg_);
283282
lumiPrincipal_->setAux(*source_->luminosityBlockAuxiliary());
284283
source_->readLuminosityBlock(*lumiPrincipal_, *historyAppender_);
285284

@@ -295,6 +294,7 @@ namespace edm::test {
295294
ServiceRegistry::Operate operate(serviceToken_);
296295

297296
auto& event = principalCache_.eventPrincipal(0);
297+
event.possiblyUpdateAfterAddition(preg_);
298298
StreamContext streamContext(event.streamID(), &processContext_);
299299

300300
source_->readEvent(event, streamContext);

0 commit comments

Comments
 (0)