Skip to content

Commit 27cacb8

Browse files
authored
Merge pull request #48712 from Dr15Jones/prefetchRandomNumberGenerator
prefetch data for RandomNumberGenerator
2 parents 20df235 + a90e47e commit 27cacb8

File tree

1 file changed

+124
-67
lines changed

1 file changed

+124
-67
lines changed

FWCore/Framework/src/EventProcessor.cc

Lines changed: 124 additions & 67 deletions
Original file line numberDiff line numberDiff line change
@@ -1600,6 +1600,28 @@ namespace edm {
16001600
}
16011601
}
16021602

1603+
namespace {
1604+
void prefetchForRandomNumberGeneratorAsync(edm::WaitingTaskHolder const& iTask,
1605+
edm::Principal const& iPrincipal,
1606+
edm::ServiceToken const& iServiceToken) {
1607+
edm::ServiceRegistry::Operate operate(iServiceToken);
1608+
Service<edm::RandomNumberGenerator> rng;
1609+
if (rng.isAvailable()) {
1610+
auto consumer = rng->consumer();
1611+
if (consumer) {
1612+
// Prefetch products the module declares it consumes
1613+
std::vector<ProductResolverIndexAndSkipBit> const& items = consumer->itemsToGetFrom(iPrincipal.branchType());
1614+
1615+
for (auto const& item : items) {
1616+
ProductResolverIndex productResolverIndex = item.productResolverIndex();
1617+
bool skipCurrentProcess = item.skipCurrentProcess();
1618+
iPrincipal.prefetchAsync(iTask, productResolverIndex, skipCurrentProcess, iServiceToken, nullptr);
1619+
}
1620+
}
1621+
}
1622+
}
1623+
} // namespace
1624+
16031625
void EventProcessor::beginLumiAsync(IOVSyncValue const& iSync,
16041626
std::shared_ptr<RunProcessingStatus> iRunStatus,
16051627
edm::WaitingTaskHolder iHolder) {
@@ -1661,78 +1683,100 @@ namespace edm {
16611683
}
16621684
LuminosityBlockPrincipal& lumiPrincipal = *status->lumiPrincipal();
16631685
Service<RandomNumberGenerator> rng;
1664-
if (rng.isAvailable()) {
1665-
LuminosityBlock lb(lumiPrincipal, ModuleDescription(), nullptr, false);
1666-
lb.setConsumer(rng->consumer());
1667-
rng->preBeginLumi(lb);
1668-
}
16691686

16701687
EventSetupImpl const& es = status->eventSetupImpl();
16711688

16721689
using namespace edm::waiting_task::chain;
1673-
chain::first([this, status](auto nextTask) mutable {
1674-
if (lastTransitionType().itemPosition() != InputSource::ItemPosition::LastItemToBeMerged) {
1675-
readAndMergeLumiEntriesAsync(std::move(status), std::move(nextTask));
1676-
} else {
1677-
setNeedToCallNext(true);
1678-
}
1679-
}) | then([this, status, &es, &lumiPrincipal](auto nextTask) {
1680-
LumiTransitionInfo transitionInfo(lumiPrincipal, es);
1681-
using Traits = OccurrenceTraits<LuminosityBlockPrincipal, BranchActionGlobalBegin>;
1682-
schedule_->processOneGlobalAsync<Traits>(nextTask, transitionInfo, serviceToken_);
1683-
}) | ifThen(looper_, [this, status, &es](auto nextTask) {
1684-
looper_->prefetchAsync(
1685-
nextTask, serviceToken_, Transition::BeginLuminosityBlock, *(status->lumiPrincipal()), es);
1686-
}) | ifThen(looper_, [this, status, &es](auto nextTask) {
1687-
ServiceRegistry::Operate operateLooper(serviceToken_);
1688-
looper_->doBeginLuminosityBlock(*(status->lumiPrincipal()), es, &processContext_);
1689-
}) | then([this, status, iRunStatus](std::exception_ptr const* iException, auto holder) mutable {
1690-
status->setGlobalEndRunHolder(iRunStatus->globalEndRunHolder());
1691-
1692-
if (iException) {
1693-
WaitingTaskHolder copyHolder(holder);
1694-
copyHolder.doneWaiting(*iException);
1695-
globalEndLumiAsync(holder, status);
1696-
endRunAsync(iRunStatus, holder);
1697-
} else {
1698-
status->globalBeginDidSucceed();
1699-
1700-
EventSetupImpl const& es = status->eventSetupImpl();
1701-
using Traits = OccurrenceTraits<LuminosityBlockPrincipal, BranchActionStreamBegin>;
1702-
1703-
streamQueuesInserter_.push(*holder.group(), [this, status, holder, &es]() mutable {
1704-
for (unsigned int i = 0; i < preallocations_.numberOfStreams(); ++i) {
1705-
streamQueues_[i].push(*holder.group(), [this, i, status, holder, &es]() mutable {
1706-
if (!status->shouldStreamStartLumi()) {
1707-
return;
1708-
}
1709-
streamQueues_[i].pause();
1710-
1711-
auto& event = principalCache_.eventPrincipal(i);
1712-
auto lp = status->lumiPrincipal().get();
1713-
streamLumiStatus_[i] = std::move(status);
1714-
++streamLumiActive_;
1715-
event.setLuminosityBlockPrincipal(lp);
1716-
LumiTransitionInfo transitionInfo(*lp, es);
1717-
using namespace edm::waiting_task::chain;
1718-
chain::first([this, i, &transitionInfo](auto nextTask) {
1719-
schedule_->processOneStreamAsync<Traits>(
1720-
std::move(nextTask), i, transitionInfo, serviceToken_);
1721-
}) |
1722-
then([this, i](std::exception_ptr const* exceptionFromBeginStreamLumi,
1723-
auto nextTask) {
1724-
if (exceptionFromBeginStreamLumi) {
1725-
WaitingTaskHolder copyHolder(nextTask);
1726-
copyHolder.doneWaiting(*exceptionFromBeginStreamLumi);
1727-
}
1728-
handleNextEventForStreamAsync(std::move(nextTask), i);
1690+
chain::firstIf(rng.isAvailable() and rng->consumer() != nullptr,
1691+
[this, &lumiPrincipal](auto nextTask) {
1692+
//handle RandomNumberGenerator prefetching and preBeginLumi
1693+
chain::first([this, &lumiPrincipal](auto nextTask) {
1694+
prefetchForRandomNumberGeneratorAsync(
1695+
std::move(nextTask), lumiPrincipal, serviceToken_);
1696+
}) | then([this, &lumiPrincipal](auto nextTask) {
1697+
ServiceRegistry::Operate operate(serviceToken_);
1698+
Service<RandomNumberGenerator> rng;
1699+
if (rng.isAvailable()) {
1700+
LuminosityBlock lb(lumiPrincipal, ModuleDescription(), nullptr, false);
1701+
lb.setConsumer(rng->consumer());
1702+
rng->preBeginLumi(lb);
1703+
}
1704+
}) | runLast(nextTask);
1705+
}) |
1706+
then([this, status](auto nextTask) mutable {
1707+
if (lastTransitionType().itemPosition() !=
1708+
InputSource::ItemPosition::LastItemToBeMerged) {
1709+
readAndMergeLumiEntriesAsync(std::move(status), std::move(nextTask));
1710+
} else {
1711+
setNeedToCallNext(true);
1712+
}
1713+
}) |
1714+
then([this, status, &es, &lumiPrincipal](auto nextTask) {
1715+
LumiTransitionInfo transitionInfo(lumiPrincipal, es);
1716+
using Traits = OccurrenceTraits<LuminosityBlockPrincipal, BranchActionGlobalBegin>;
1717+
schedule_->processOneGlobalAsync<Traits>(nextTask, transitionInfo, serviceToken_);
1718+
}) |
1719+
ifThen(looper_,
1720+
[this, status, &es](auto nextTask) {
1721+
looper_->prefetchAsync(nextTask,
1722+
serviceToken_,
1723+
Transition::BeginLuminosityBlock,
1724+
*(status->lumiPrincipal()),
1725+
es);
1726+
}) |
1727+
ifThen(looper_,
1728+
[this, status, &es](auto nextTask) {
1729+
ServiceRegistry::Operate operateLooper(serviceToken_);
1730+
looper_->doBeginLuminosityBlock(*(status->lumiPrincipal()), es, &processContext_);
1731+
}) |
1732+
then([this, status, iRunStatus](std::exception_ptr const* iException, auto holder) mutable {
1733+
status->setGlobalEndRunHolder(iRunStatus->globalEndRunHolder());
1734+
1735+
if (iException) {
1736+
WaitingTaskHolder copyHolder(holder);
1737+
copyHolder.doneWaiting(*iException);
1738+
globalEndLumiAsync(holder, status);
1739+
endRunAsync(iRunStatus, holder);
1740+
} else {
1741+
status->globalBeginDidSucceed();
1742+
1743+
EventSetupImpl const& es = status->eventSetupImpl();
1744+
using Traits = OccurrenceTraits<LuminosityBlockPrincipal, BranchActionStreamBegin>;
1745+
1746+
streamQueuesInserter_.push(*holder.group(), [this, status, holder, &es]() mutable {
1747+
for (unsigned int i = 0; i < preallocations_.numberOfStreams(); ++i) {
1748+
streamQueues_[i].push(*holder.group(), [this, i, status, holder, &es]() mutable {
1749+
if (!status->shouldStreamStartLumi()) {
1750+
return;
1751+
}
1752+
streamQueues_[i].pause();
1753+
1754+
auto& event = principalCache_.eventPrincipal(i);
1755+
auto lp = status->lumiPrincipal().get();
1756+
streamLumiStatus_[i] = std::move(status);
1757+
++streamLumiActive_;
1758+
event.setLuminosityBlockPrincipal(lp);
1759+
LumiTransitionInfo transitionInfo(*lp, es);
1760+
using namespace edm::waiting_task::chain;
1761+
chain::first([this, i, &transitionInfo](auto nextTask) {
1762+
schedule_->processOneStreamAsync<Traits>(
1763+
std::move(nextTask), i, transitionInfo, serviceToken_);
17291764
}) |
1730-
runLast(std::move(holder));
1765+
then([this, i](std::exception_ptr const* exceptionFromBeginStreamLumi,
1766+
auto nextTask) {
1767+
if (exceptionFromBeginStreamLumi) {
1768+
WaitingTaskHolder copyHolder(nextTask);
1769+
copyHolder.doneWaiting(*exceptionFromBeginStreamLumi);
1770+
}
1771+
handleNextEventForStreamAsync(std::move(nextTask), i);
1772+
}) |
1773+
runLast(std::move(holder));
1774+
});
1775+
} // end for loop over streams
17311776
});
1732-
} // end for loop over streams
1733-
});
1734-
}
1735-
}) | runLast(postSourceTask);
1777+
}
1778+
}) |
1779+
runLast(postSourceTask);
17361780
} catch (...) {
17371781
status->resetResources();
17381782
queueWhichWaitsForIOVsToFinish_.resume();
@@ -2295,7 +2339,20 @@ namespace edm {
22952339

22962340
EventSetupImpl const& es = streamLumiStatus_[iStreamIndex]->eventSetupImpl();
22972341
using namespace edm::waiting_task::chain;
2298-
chain::first([this, &es, pep, iStreamIndex](auto nextTask) {
2342+
chain::firstIf(rng.isAvailable() and rng->consumer() != nullptr, [this, pep](auto nextTask) {
2343+
//handle RandomNumberGenerator prefetching and postEventRead
2344+
chain::first([this, pep](auto nextTask) {
2345+
prefetchForRandomNumberGeneratorAsync(std::move(nextTask), *pep, serviceToken_);
2346+
}) | then([this, pep](auto nextTask) {
2347+
ServiceRegistry::Operate operate(serviceToken_);
2348+
Service<RandomNumberGenerator> rng;
2349+
if (rng.isAvailable()) {
2350+
Event ev(*pep, ModuleDescription(), nullptr);
2351+
ev.setConsumer(rng->consumer());
2352+
rng->postEventRead(ev);
2353+
}
2354+
}) | runLast(nextTask);
2355+
}) | then([this, &es, pep, iStreamIndex](auto nextTask) {
22992356
EventTransitionInfo info(*pep, es);
23002357
schedule_->processOneEventAsync(std::move(nextTask), iStreamIndex, info, serviceToken_);
23012358
}) | ifThen(looper_, [this, iStreamIndex, pep](auto nextTask) {

0 commit comments

Comments
 (0)