Skip to content

Commit 8b9b9fa

Browse files
committed
Fixed OutputModule issues in TestProcessor
Testing with PoolOutputModule showed issues with meta-data and missing framework calls.
1 parent 87efee3 commit 8b9b9fa

File tree

2 files changed

+126
-11
lines changed

2 files changed

+126
-11
lines changed

FWCore/TestProcessor/interface/TestProcessor.h

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,8 @@
3636
#include "FWCore/Framework/interface/Schedule.h"
3737
#include "FWCore/Framework/interface/EventSetupRecordKey.h"
3838
#include "FWCore/Framework/interface/DataKey.h"
39+
#include "FWCore/Framework/interface/MergeableRunProductProcesses.h"
40+
3941
#include "FWCore/ServiceRegistry/interface/ActivityRegistry.h"
4042
#include "FWCore/ServiceRegistry/interface/ProcessContext.h"
4143
#include "FWCore/ServiceRegistry/interface/ServiceLegacy.h"
@@ -317,13 +319,17 @@ This simulates a problem happening early in the job which causes processing not
317319
void teardownProcessing();
318320

319321
void beginJob();
322+
void respondToOpenInputFile();
323+
void openOutputFiles();
320324
void beginProcessBlock();
321325
void beginRun();
322326
void beginLuminosityBlock();
323327
void event();
324328
std::shared_ptr<LuminosityBlockPrincipal> endLuminosityBlock();
325329
std::shared_ptr<RunPrincipal> endRun();
330+
void respondToCloseInputFile();
326331
ProcessBlockPrincipal const* endProcessBlock();
332+
void closeOutputFiles();
327333
void endJob();
328334

329335
// ---------- member data --------------------------------
@@ -346,7 +352,10 @@ This simulates a problem happening early in the job which causes processing not
346352
std::shared_ptr<ProcessConfiguration const> processConfiguration_;
347353
ProcessContext processContext_;
348354

355+
MergeableRunProductProcesses mergeableRunProductProcesses_;
356+
349357
ProcessHistoryRegistry processHistoryRegistry_;
358+
ProcessHistory processHistory_;
350359
std::unique_ptr<HistoryAppender> historyAppender_;
351360

352361
PrincipalCache principalCache_;
@@ -363,9 +372,11 @@ This simulates a problem happening early in the job which causes processing not
363372
LuminosityBlockNumber_t lumiNumber_ = 1;
364373
EventNumber_t eventNumber_ = 1;
365374
bool beginJobCalled_ = false;
375+
bool respondToOpenInputFileCalled_ = false;
366376
bool beginProcessBlockCalled_ = false;
367377
bool beginRunCalled_ = false;
368378
bool beginLumiCalled_ = false;
379+
bool openOutputFilesCalled_ = false;
369380
};
370381
} // namespace test
371382
} // namespace edm

FWCore/TestProcessor/src/TestProcessor.cc

Lines changed: 115 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,8 @@
3434
#include "FWCore/Framework/interface/DelayedReader.h"
3535
#include "FWCore/Framework/interface/ensureAvailableAccelerators.h"
3636
#include "FWCore/Framework/interface/makeModuleTypeResolverMaker.h"
37+
#include "FWCore/Framework/interface/FileBlock.h"
38+
#include "FWCore/Framework/interface/MergeableRunProductMetadata.h"
3739

3840
#include "FWCore/ServiceRegistry/interface/ServiceRegistry.h"
3941
#include "FWCore/ServiceRegistry/interface/SystemBounds.h"
@@ -114,10 +116,9 @@ namespace edm {
114116
emptyPSet.registerIt();
115117
auto psetid = emptyPSet.id();
116118

117-
ProcessHistory oldHistory;
118119
for (auto const& p : iConfig.extraProcesses()) {
119-
oldHistory.emplace_back(p, psetid, xstr(PROJECT_VERSION), "0");
120-
processHistoryRegistry_.registerProcessHistory(oldHistory);
120+
processHistory_.emplace_back(p, psetid, xstr(PROJECT_VERSION), "0");
121+
processHistoryRegistry_.registerProcessHistory(processHistory_);
121122
}
122123

123124
//setup the products we will be adding to the event
@@ -157,6 +158,8 @@ namespace edm {
157158
principalCache_.setNumberOfConcurrentPrincipals(preallocations_);
158159

159160
preg_->setFrozen();
161+
mergeableRunProductProcesses_.setProcessesWithMergeableRunProducts(*preg_);
162+
160163
for (unsigned int index = 0; index < preallocations_.numberOfStreams(); ++index) {
161164
// Reusable event principal
162165
auto ep = std::make_shared<EventPrincipal>(preg_,
@@ -168,7 +171,8 @@ namespace edm {
168171
principalCache_.insert(std::move(ep));
169172
}
170173
for (unsigned int index = 0; index < preallocations_.numberOfRuns(); ++index) {
171-
auto rp = std::make_unique<RunPrincipal>(preg_, *processConfiguration_, historyAppender_.get(), index);
174+
auto rp = std::make_unique<RunPrincipal>(
175+
preg_, *processConfiguration_, historyAppender_.get(), index, true, &mergeableRunProductProcesses_);
172176
principalCache_.insert(std::move(rp));
173177
}
174178
for (unsigned int index = 0; index < preallocations_.numberOfLuminosityBlocks(); ++index) {
@@ -217,9 +221,16 @@ namespace edm {
217221
if (not beginJobCalled_) {
218222
beginJob();
219223
}
224+
if (not respondToOpenInputFileCalled_) {
225+
respondToOpenInputFile();
226+
}
220227
if (not beginProcessBlockCalled_) {
221228
beginProcessBlock();
222229
}
230+
if (not openOutputFilesCalled_) {
231+
openOutputFiles();
232+
}
233+
223234
if (not beginRunCalled_) {
224235
beginRun();
225236
}
@@ -235,7 +246,6 @@ namespace edm {
235246
//We want each test to have its own ES data products
236247
esHelper_->resetAllProxies();
237248
}
238-
239249
return edm::test::LuminosityBlock(lumiPrincipal_, labelOfTestModule_, processConfiguration_->processName());
240250
}
241251

@@ -247,9 +257,15 @@ namespace edm {
247257
if (not beginJobCalled_) {
248258
beginJob();
249259
}
260+
if (not respondToOpenInputFileCalled_) {
261+
respondToOpenInputFile();
262+
}
250263
if (not beginProcessBlockCalled_) {
251264
beginProcessBlock();
252265
}
266+
if (not openOutputFilesCalled_) {
267+
openOutputFiles();
268+
}
253269
if (not beginRunCalled_) {
254270
beginRun();
255271
}
@@ -271,9 +287,15 @@ namespace edm {
271287
if (not beginJobCalled_) {
272288
beginJob();
273289
}
290+
if (not respondToOpenInputFileCalled_) {
291+
respondToOpenInputFile();
292+
}
274293
if (not beginProcessBlockCalled_) {
275294
beginProcessBlock();
276295
}
296+
if (not openOutputFilesCalled_) {
297+
openOutputFiles();
298+
}
277299
if (beginRunCalled_) {
278300
assert(runNumber_ != iNum);
279301
endRun();
@@ -285,7 +307,6 @@ namespace edm {
285307
//We want each test to have its own ES data products
286308
esHelper_->resetAllProxies();
287309
}
288-
289310
return edm::test::Run(runPrincipal_, labelOfTestModule_, processConfiguration_->processName());
290311
}
291312
edm::test::Run TestProcessor::testEndRunImpl() {
@@ -296,9 +317,15 @@ namespace edm {
296317
if (not beginJobCalled_) {
297318
beginJob();
298319
}
320+
if (not respondToOpenInputFileCalled_) {
321+
respondToOpenInputFile();
322+
}
299323
if (not beginProcessBlockCalled_) {
300324
beginProcessBlock();
301325
}
326+
if (not openOutputFilesCalled_) {
327+
openOutputFiles();
328+
}
302329
if (not beginRunCalled_) {
303330
beginRun();
304331
}
@@ -339,9 +366,15 @@ namespace edm {
339366
if (not beginJobCalled_) {
340367
beginJob();
341368
}
369+
if (not respondToOpenInputFileCalled_) {
370+
respondToOpenInputFile();
371+
}
342372
if (not beginProcessBlockCalled_) {
343373
beginProcessBlock();
344374
}
375+
if (not openOutputFilesCalled_) {
376+
openOutputFiles();
377+
}
345378
if (not beginRunCalled_) {
346379
beginRun();
347380
}
@@ -360,10 +393,17 @@ namespace edm {
360393
endRun();
361394
beginRunCalled_ = false;
362395
}
396+
if (respondToOpenInputFileCalled_) {
397+
respondToCloseInputFile();
398+
}
363399
if (beginProcessBlockCalled_) {
364400
endProcessBlock();
365401
beginProcessBlockCalled_ = false;
366402
}
403+
if (openOutputFilesCalled_) {
404+
closeOutputFiles();
405+
openOutputFilesCalled_ = false;
406+
}
367407
if (beginJobCalled_) {
368408
endJob();
369409
}
@@ -419,11 +459,53 @@ namespace edm {
419459
beginProcessBlockCalled_ = true;
420460
}
421461

462+
void TestProcessor::openOutputFiles() {
463+
//make the services available
464+
ServiceRegistry::Operate operate(serviceToken_);
465+
466+
edm::FileBlock fb;
467+
schedule_->openOutputFiles(fb);
468+
openOutputFilesCalled_ = true;
469+
}
470+
471+
void TestProcessor::closeOutputFiles() {
472+
if (openOutputFilesCalled_) {
473+
//make the services available
474+
ServiceRegistry::Operate operate(serviceToken_);
475+
schedule_->closeOutputFiles();
476+
477+
openOutputFilesCalled_ = false;
478+
}
479+
}
480+
481+
void TestProcessor::respondToOpenInputFile() {
482+
respondToOpenInputFileCalled_ = true;
483+
edm::FileBlock fb;
484+
//make the services available
485+
ServiceRegistry::Operate operate(serviceToken_);
486+
schedule_->respondToOpenInputFile(fb);
487+
}
488+
489+
void TestProcessor::respondToCloseInputFile() {
490+
if (respondToOpenInputFileCalled_) {
491+
edm::FileBlock fb;
492+
//make the services available
493+
ServiceRegistry::Operate operate(serviceToken_);
494+
495+
schedule_->respondToCloseInputFile(fb);
496+
respondToOpenInputFileCalled_ = false;
497+
}
498+
}
499+
422500
void TestProcessor::beginRun() {
423501
runPrincipal_ = principalCache_.getAvailableRunPrincipalPtr();
424502
runPrincipal_->clearPrincipal();
425503
assert(runPrincipal_);
426-
runPrincipal_->setAux(edm::RunAuxiliary(runNumber_, Timestamp(), Timestamp()));
504+
edm::RunAuxiliary aux(runNumber_, Timestamp(), Timestamp());
505+
aux.setProcessHistoryID(processHistory_.id());
506+
runPrincipal_->setAux(aux);
507+
508+
runPrincipal_->fillRunPrincipal(processHistoryRegistry_);
427509

428510
IOVSyncValue ts(EventID(runPrincipal_->run(), 0, 0), runPrincipal_->beginTime());
429511
eventsetup::synchronousEventSetupForInstance(ts, taskGroup_, *espController_);
@@ -458,12 +540,14 @@ namespace edm {
458540

459541
void TestProcessor::beginLuminosityBlock() {
460542
LuminosityBlockAuxiliary aux(runNumber_, lumiNumber_, Timestamp(), Timestamp());
543+
aux.setProcessHistoryID(processHistory_.id());
461544
lumiPrincipal_ = principalCache_.getAvailableLumiPrincipalPtr();
462545
lumiPrincipal_->clearPrincipal();
463546
assert(lumiPrincipal_);
464547
lumiPrincipal_->setAux(aux);
465548

466549
lumiPrincipal_->setRunPrincipal(runPrincipal_);
550+
lumiPrincipal_->fillLuminosityBlockPrincipal(&processHistory_);
467551

468552
IOVSyncValue ts(EventID(runNumber_, lumiNumber_, eventNumber_), lumiPrincipal_->beginTime());
469553
eventsetup::synchronousEventSetupForInstance(ts, taskGroup_, *espController_);
@@ -503,10 +587,9 @@ namespace edm {
503587

504588
//this resets the EventPrincipal (if it had been used before)
505589
pep->clearEventPrincipal();
506-
pep->fillEventPrincipal(
507-
edm::EventAuxiliary(EventID(runNumber_, lumiNumber_, eventNumber_), "", Timestamp(), false),
508-
nullptr,
509-
nullptr);
590+
edm::EventAuxiliary aux(EventID(runNumber_, lumiNumber_, eventNumber_), "", Timestamp(), false);
591+
aux.setProcessHistoryID(processHistory_.id());
592+
pep->fillEventPrincipal(aux, nullptr, nullptr);
510593
assert(lumiPrincipal_.get() != nullptr);
511594
pep->setLuminosityBlockPrincipal(lumiPrincipal_.get());
512595

@@ -535,6 +618,9 @@ namespace edm {
535618
std::shared_ptr<LuminosityBlockPrincipal> TestProcessor::endLuminosityBlock() {
536619
auto lumiPrincipal = lumiPrincipal_;
537620
if (beginLumiCalled_) {
621+
//make the services available
622+
ServiceRegistry::Operate operate(serviceToken_);
623+
538624
beginLumiCalled_ = false;
539625
lumiPrincipal_.reset();
540626

@@ -575,6 +661,12 @@ namespace edm {
575661
false);
576662
globalWaitTask.wait();
577663
}
664+
{
665+
FinalWaitingTask globalWaitTask{taskGroup_};
666+
schedule_->writeLumiAsync(
667+
WaitingTaskHolder(taskGroup_, &globalWaitTask), *lumiPrincipal, &processContext_, actReg_.get());
668+
globalWaitTask.wait();
669+
}
578670
}
579671
lumiPrincipal->setRunPrincipal(std::shared_ptr<RunPrincipal>());
580672
return lumiPrincipal;
@@ -586,6 +678,9 @@ namespace edm {
586678
if (beginRunCalled_) {
587679
beginRunCalled_ = false;
588680

681+
//make the services available
682+
ServiceRegistry::Operate operate(serviceToken_);
683+
589684
IOVSyncValue ts(
590685
EventID(runPrincipal->run(), LuminosityBlockID::maxLuminosityBlockNumber(), EventID::maxEventNumber()),
591686
runPrincipal->endTime());
@@ -625,6 +720,15 @@ namespace edm {
625720
false);
626721
globalWaitTask.wait();
627722
}
723+
{
724+
FinalWaitingTask globalWaitTask{taskGroup_};
725+
schedule_->writeRunAsync(WaitingTaskHolder(taskGroup_, &globalWaitTask),
726+
*runPrincipal,
727+
&processContext_,
728+
actReg_.get(),
729+
runPrincipal->mergeableRunProductMetadata());
730+
globalWaitTask.wait();
731+
}
628732
}
629733
return runPrincipal;
630734
}

0 commit comments

Comments
 (0)