Skip to content

Commit b5cf409

Browse files
authored
Merge pull request #47036 from makortel/transformerStream
Add StreamID parameter to Transformer callback functions
2 parents 7227919 + 0e28aeb commit b5cf409

17 files changed

+123
-69
lines changed

FWCore/Framework/interface/TransformerBase.h

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010

1111
#include "FWCore/Utilities/interface/EDPutToken.h"
1212
#include "FWCore/Utilities/interface/SoATuple.h"
13+
#include "FWCore/Utilities/interface/StreamID.h"
1314
#include "FWCore/Utilities/interface/TypeID.h"
1415
#include "FWCore/Utilities/interface/ProductResolverIndex.h"
1516

@@ -38,8 +39,9 @@ namespace edm {
3839
protected:
3940
//The function takes the WrapperBase corresponding to the data product from the EDPutToken
4041
// and returns the WrapperBase associated to the id and instanceName
41-
using TransformFunction = std::function<std::unique_ptr<edm::WrapperBase>(std::any)>;
42-
using PreTransformFunction = std::function<std::any(edm::WrapperBase const&, edm::WaitingTaskWithArenaHolder)>;
42+
using TransformFunction = std::function<std::unique_ptr<edm::WrapperBase>(edm::StreamID, std::any)>;
43+
using PreTransformFunction =
44+
std::function<std::any(edm::StreamID, edm::WrapperBase const&, edm::WaitingTaskWithArenaHolder)>;
4345

4446
void registerTransformImp(ProducerBase&, EDPutToken, const TypeID& id, std::string instanceName, TransformFunction);
4547
void registerTransformAsyncImp(

FWCore/Framework/interface/global/implementors.h

Lines changed: 11 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -474,17 +474,17 @@ namespace edm {
474474

475475
template <typename G, typename F>
476476
void registerTransform(edm::EDPutTokenT<G> iToken, F iF, std::string productInstance = std::string()) {
477-
using ReturnTypeT = decltype(iF(std::declval<G>()));
477+
using ReturnTypeT = decltype(iF(std::declval<edm::StreamID>(), std::declval<G>()));
478478
TypeID returnType(typeid(ReturnTypeT));
479479
TransformerBase::registerTransformImp(
480480
*this,
481481
EDPutToken(iToken),
482482
returnType,
483483
std::move(productInstance),
484-
[f = std::move(iF)](std::any const& iGotProduct) {
484+
[f = std::move(iF)](edm::StreamID id, std::any const& iGotProduct) {
485485
auto pGotProduct = std::any_cast<edm::WrapperBase const*>(iGotProduct);
486486
return std::make_unique<edm::Wrapper<ReturnTypeT>>(
487-
WrapperBase::Emplace{}, f(*static_cast<edm::Wrapper<G> const*>(pGotProduct)->product()));
487+
WrapperBase::Emplace{}, f(id, *static_cast<edm::Wrapper<G> const*>(pGotProduct)->product()));
488488
});
489489
}
490490

@@ -493,20 +493,22 @@ namespace edm {
493493
P iPre,
494494
F iF,
495495
std::string productInstance = std::string()) {
496-
using CacheTypeT = decltype(iPre(std::declval<G>(), WaitingTaskWithArenaHolder()));
497-
using ReturnTypeT = decltype(iF(std::declval<CacheTypeT>()));
496+
using CacheTypeT =
497+
decltype(iPre(std::declval<edm::StreamID>(), std::declval<G>(), WaitingTaskWithArenaHolder()));
498+
using ReturnTypeT = decltype(iF(std::declval<edm::StreamID>(), std::declval<CacheTypeT>()));
498499
TypeID returnType(typeid(ReturnTypeT));
499500
TransformerBase::registerTransformAsyncImp(
500501
*this,
501502
EDPutToken(iToken),
502503
returnType,
503504
std::move(productInstance),
504-
[p = std::move(iPre)](edm::WrapperBase const& iGotProduct, WaitingTaskWithArenaHolder iHolder) {
505-
return std::any(p(*static_cast<edm::Wrapper<G> const&>(iGotProduct).product(), std::move(iHolder)));
505+
[p = std::move(iPre)](
506+
edm::StreamID id, edm::WrapperBase const& iGotProduct, WaitingTaskWithArenaHolder iHolder) {
507+
return std::any(p(id, *static_cast<edm::Wrapper<G> const&>(iGotProduct).product(), std::move(iHolder)));
506508
},
507-
[f = std::move(iF)](std::any const& iCache) {
509+
[f = std::move(iF)](edm::StreamID id, std::any const& iCache) {
508510
return std::make_unique<edm::Wrapper<ReturnTypeT>>(WrapperBase::Emplace{},
509-
f(std::any_cast<CacheTypeT>(iCache)));
511+
f(id, std::any_cast<CacheTypeT>(iCache)));
510512
});
511513
}
512514

FWCore/Framework/interface/limited/implementors.h

Lines changed: 10 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -462,17 +462,17 @@ namespace edm {
462462

463463
template <typename G, typename F>
464464
void registerTransform(edm::EDPutTokenT<G> iToken, F iF, std::string productInstance = std::string()) {
465-
using ReturnTypeT = decltype(iF(std::declval<G>()));
465+
using ReturnTypeT = decltype(iF(std::declval<edm::StreamID>(), std::declval<G>()));
466466
TypeID returnType(typeid(ReturnTypeT));
467467
TransformerBase::registerTransformImp(
468468
*this,
469469
EDPutToken(iToken),
470470
returnType,
471471
std::move(productInstance),
472-
[f = std::move(iF)](std::any const& iGotProduct) {
472+
[f = std::move(iF)](edm::StreamID id, std::any const& iGotProduct) {
473473
auto pGotProduct = std::any_cast<edm::WrapperBase const*>(iGotProduct);
474474
return std::make_unique<edm::Wrapper<ReturnTypeT>>(
475-
WrapperBase::Emplace{}, f(*static_cast<edm::Wrapper<G> const*>(pGotProduct)->product()));
475+
WrapperBase::Emplace{}, f(id, *static_cast<edm::Wrapper<G> const*>(pGotProduct)->product()));
476476
});
477477
}
478478

@@ -481,20 +481,21 @@ namespace edm {
481481
P iPre,
482482
F iF,
483483
std::string productInstance = std::string()) {
484-
using CacheTypeT = decltype(iPre(std::declval<G>(), WaitingTaskWithArenaHolder()));
485-
using ReturnTypeT = decltype(iF(std::declval<CacheTypeT>()));
484+
using CacheTypeT = decltype(iPre(std::declval<StreamID>(), std::declval<G>(), WaitingTaskWithArenaHolder()));
485+
using ReturnTypeT = decltype(iF(std::declval<StreamID>(), std::declval<CacheTypeT>()));
486486
TypeID returnType(typeid(ReturnTypeT));
487487
TransformerBase::registerTransformAsyncImp(
488488
*this,
489489
EDPutToken(iToken),
490490
returnType,
491491
std::move(productInstance),
492-
[p = std::move(iPre)](edm::WrapperBase const& iGotProduct, WaitingTaskWithArenaHolder iHolder) {
493-
return std::any(p(*static_cast<edm::Wrapper<G> const&>(iGotProduct).product(), std::move(iHolder)));
492+
[p = std::move(iPre)](
493+
edm::StreamID id, edm::WrapperBase const& iGotProduct, WaitingTaskWithArenaHolder iHolder) {
494+
return std::any(p(id, *static_cast<edm::Wrapper<G> const&>(iGotProduct).product(), std::move(iHolder)));
494495
},
495-
[f = std::move(iF)](std::any const& iCache) {
496+
[f = std::move(iF)](edm::StreamID id, std::any const& iCache) {
496497
auto cache = std::any_cast<CacheTypeT>(iCache);
497-
return std::make_unique<edm::Wrapper<ReturnTypeT>>(WrapperBase::Emplace{}, f(cache));
498+
return std::make_unique<edm::Wrapper<ReturnTypeT>>(WrapperBase::Emplace{}, f(id, cache));
498499
});
499500
}
500501

FWCore/Framework/interface/one/implementors.h

Lines changed: 19 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -354,38 +354,41 @@ namespace edm {
354354

355355
template <typename G, typename F>
356356
void registerTransform(edm::EDPutTokenT<G> iToken, F iF, std::string productInstance = std::string()) {
357-
using ReturnTypeT = decltype(iF(std::declval<G>()));
357+
using ReturnTypeT = decltype(iF(std::declval<edm::StreamID>(), std::declval<G>()));
358358
TypeID returnType(typeid(ReturnTypeT));
359-
TransformerBase::registerTransformImp(*this,
360-
EDPutToken(iToken),
361-
returnType,
362-
std::move(productInstance),
363-
[f = std::move(iF)](edm::WrapperBase const& iGotProduct) {
364-
return std::make_unique<edm::Wrapper<ReturnTypeT>>(
365-
WrapperBase::Emplace{},
366-
f(*static_cast<edm::Wrapper<G> const&>(iGotProduct).product()));
367-
});
359+
TransformerBase::registerTransformImp(
360+
*this,
361+
EDPutToken(iToken),
362+
returnType,
363+
std::move(productInstance),
364+
[f = std::move(iF)](edm::StreamID id, std::any const& iGotProduct) {
365+
auto pGotProduct = std::any_cast<edm::WrapperBase const*>(iGotProduct);
366+
return std::make_unique<edm::Wrapper<ReturnTypeT>>(
367+
WrapperBase::Emplace{}, f(id, *static_cast<edm::Wrapper<G> const*>(pGotProduct)->product()));
368+
});
368369
}
369370

370371
template <typename G, typename P, typename F>
371372
void registerTransformAsync(edm::EDPutTokenT<G> iToken,
372373
P iPre,
373374
F iF,
374375
std::string productInstance = std::string()) {
375-
using CacheTypeT = decltype(iPre(std::declval<G>(), WaitingTaskWithArenaHolder()));
376-
using ReturnTypeT = decltype(iF(std::declval<CacheTypeT>()));
376+
using CacheTypeT =
377+
decltype(iPre(std::declval<edm::StreamID>(), std::declval<G>(), WaitingTaskWithArenaHolder()));
378+
using ReturnTypeT = decltype(iF(std::declval<edm::StreamID>(), std::declval<CacheTypeT>()));
377379
TypeID returnType(typeid(ReturnTypeT));
378380
TransformerBase::registerTransformAsyncImp(
379381
*this,
380382
EDPutToken(iToken),
381383
returnType,
382384
std::move(productInstance),
383-
[p = std::move(iPre)](edm::WrapperBase const& iGotProduct, WaitingTaskWithArenaHolder iHolder) {
384-
return std::any(p(*static_cast<edm::Wrapper<G> const&>(iGotProduct).product(), std::move(iHolder)));
385+
[p = std::move(iPre)](
386+
edm::StreamID id, edm::WrapperBase const& iGotProduct, WaitingTaskWithArenaHolder iHolder) {
387+
return std::any(p(id, *static_cast<edm::Wrapper<G> const&>(iGotProduct).product(), std::move(iHolder)));
385388
},
386-
[f = std::move(iF)](std::any const& iCache) {
389+
[f = std::move(iF)](edm::StreamID id, std::any const& iCache) {
387390
auto cache = std::any_cast<CacheTypeT>(iCache);
388-
return std::make_unique<edm::Wrapper<ReturnTypeT>>(WrapperBase::Emplace{}, f(cache));
391+
return std::make_unique<edm::Wrapper<ReturnTypeT>>(WrapperBase::Emplace{}, f(id, cache));
389392
});
390393
}
391394

FWCore/Framework/interface/stream/implementors.h

Lines changed: 11 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -328,17 +328,17 @@ namespace edm {
328328

329329
template <typename G, typename F>
330330
void registerTransform(edm::EDPutTokenT<G> iToken, F iF, std::string productInstance = std::string()) {
331-
using ReturnTypeT = decltype(iF(std::declval<G>()));
331+
using ReturnTypeT = decltype(iF(std::declval<edm::StreamID>(), std::declval<G>()));
332332
TypeID returnType(typeid(ReturnTypeT));
333333
TransformerBase::registerTransformImp(
334334
*this,
335335
EDPutToken(iToken),
336336
returnType,
337337
std::move(productInstance),
338-
[f = std::move(iF)](std::any const& iGotProduct) {
338+
[f = std::move(iF)](edm::StreamID id, std::any const& iGotProduct) {
339339
auto pGotProduct = std::any_cast<edm::WrapperBase const*>(iGotProduct);
340340
return std::make_unique<edm::Wrapper<ReturnTypeT>>(
341-
WrapperBase::Emplace{}, f(*static_cast<edm::Wrapper<G> const*>(pGotProduct)->product()));
341+
WrapperBase::Emplace{}, f(id, *static_cast<edm::Wrapper<G> const*>(pGotProduct)->product()));
342342
});
343343
}
344344

@@ -347,20 +347,22 @@ namespace edm {
347347
P iPre,
348348
F iF,
349349
std::string productInstance = std::string()) {
350-
using CacheTypeT = decltype(iPre(std::declval<G>(), WaitingTaskWithArenaHolder()));
351-
using ReturnTypeT = decltype(iF(std::declval<CacheTypeT>()));
350+
using CacheTypeT =
351+
decltype(iPre(std::declval<edm::StreamID>(), std::declval<G>(), WaitingTaskWithArenaHolder()));
352+
using ReturnTypeT = decltype(iF(std::declval<edm::StreamID>(), std::declval<CacheTypeT>()));
352353
TypeID returnType(typeid(ReturnTypeT));
353354
TransformerBase::registerTransformAsyncImp(
354355
*this,
355356
EDPutToken(iToken),
356357
returnType,
357358
std::move(productInstance),
358-
[p = std::move(iPre)](edm::WrapperBase const& iGotProduct, WaitingTaskWithArenaHolder iHolder) {
359-
return std::any(p(*static_cast<edm::Wrapper<G> const&>(iGotProduct).product(), std::move(iHolder)));
359+
[p = std::move(iPre)](
360+
edm::StreamID id, edm::WrapperBase const& iGotProduct, WaitingTaskWithArenaHolder iHolder) {
361+
return std::any(p(id, *static_cast<edm::Wrapper<G> const&>(iGotProduct).product(), std::move(iHolder)));
360362
},
361-
[f = std::move(iF)](std::any const& iCache) {
363+
[f = std::move(iF)](edm::StreamID id, std::any const& iCache) {
362364
auto cache = std::any_cast<CacheTypeT>(iCache);
363-
return std::make_unique<edm::Wrapper<ReturnTypeT>>(WrapperBase::Emplace{}, f(cache));
365+
return std::make_unique<edm::Wrapper<ReturnTypeT>>(WrapperBase::Emplace{}, f(id, cache));
364366
});
365367
}
366368

FWCore/Framework/src/TransformerBase.cc

Lines changed: 11 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,8 @@
99

1010
#include "FWCore/ServiceRegistry/interface/ActivityRegistry.h"
1111
#include "FWCore/ServiceRegistry/interface/ModuleCallingContext.h"
12+
#include "FWCore/ServiceRegistry/interface/StreamContext.h"
13+
1214
#include <optional>
1315

1416
namespace {
@@ -116,7 +118,8 @@ namespace edm {
116118
std::optional<decltype(iEvent.get(transformInfo_.get<kType>(iIndex), transformInfo_.get<kResolverIndex>(iIndex)))>
117119
handle;
118120
//transform acquiring signal
119-
TransformAcquiringSignalSentry sentry(iAct, *mcc.getStreamContext(), mcc);
121+
auto const& streamContext = *mcc.getStreamContext();
122+
TransformAcquiringSignalSentry sentry(iAct, streamContext, mcc);
120123
CMS_SA_ALLOW try {
121124
handle = iEvent.get(transformInfo_.get<kType>(iIndex), transformInfo_.get<kResolverIndex>(iIndex));
122125
} catch (...) {
@@ -133,15 +136,16 @@ namespace edm {
133136
} else {
134137
//transform signal
135138
auto mcc = iEvent.moduleCallingContext();
136-
TransformSignalSentry sentry(iAct, *mcc.getStreamContext(), mcc);
139+
auto const& streamContext = *mcc.getStreamContext();
140+
TransformSignalSentry sentry(iAct, streamContext, mcc);
137141
iEvent.put(iBase.putTokenIndexToProductResolverIndex()[transformInfo_.get<kToken>(iIndex).index()],
138-
transformInfo_.get<kTransform>(iIndex)(std::move(*cache)),
142+
transformInfo_.get<kTransform>(iIndex)(streamContext.streamID(), std::move(*cache)),
139143
handle);
140144
}
141145
});
142146
WaitingTaskWithArenaHolder wta(*iHolder.group(), nextTask);
143147
CMS_SA_ALLOW try {
144-
*cache = transformInfo_.get<kPreTransform>(iIndex)(*(handle->wrapper()), wta);
148+
*cache = transformInfo_.get<kPreTransform>(iIndex)(streamContext.streamID(), *(handle->wrapper()), wta);
145149
} catch (...) {
146150
wta.doneWaiting(std::current_exception());
147151
}
@@ -153,9 +157,10 @@ namespace edm {
153157
if (handle.wrapper()) {
154158
std::any v = handle.wrapper();
155159
//transform signal
156-
TransformSignalSentry sentry(iAct, *mcc.getStreamContext(), mcc);
160+
auto const& streamContext = *mcc.getStreamContext();
161+
TransformSignalSentry sentry(iAct, streamContext, mcc);
157162
iEvent.put(iBase.putTokenIndexToProductResolverIndex()[transformInfo_.get<kToken>(iIndex).index()],
158-
transformInfo_.get<kTransform>(iIndex)(std::move(v)),
163+
transformInfo_.get<kTransform>(iIndex)(streamContext.streamID(), std::move(v)),
159164
handle);
160165
}
161166
} catch (...) {

FWCore/Framework/test/global_filter_t.cppunit.cc

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -357,7 +357,7 @@ class testGlobalFilter : public CppUnit::TestFixture {
357357
public:
358358
TransformProd(edm::ParameterSet const&) {
359359
token_ = produces<float>();
360-
registerTransform(token_, [](float iV) { return int(iV); });
360+
registerTransform(token_, [](edm::StreamID, float iV) { return int(iV); });
361361
}
362362

363363
bool filter(edm::StreamID, edm::Event& iEvent, edm::EventSetup const&) const {
@@ -380,8 +380,8 @@ class testGlobalFilter : public CppUnit::TestFixture {
380380
token_ = produces<float>();
381381
registerTransformAsync(
382382
token_,
383-
[](float iV, edm::WaitingTaskWithArenaHolder iHolder) { return IntHolder(iV); },
384-
[](IntHolder iWaitValue) { return iWaitValue.value_; });
383+
[](edm::StreamID, float iV, edm::WaitingTaskWithArenaHolder iHolder) { return IntHolder(iV); },
384+
[](edm::StreamID, IntHolder iWaitValue) { return iWaitValue.value_; });
385385
}
386386

387387
bool filter(edm::StreamID, edm::Event& iEvent, edm::EventSetup const&) const {

FWCore/Framework/test/global_producer_t.cppunit.cc

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -325,7 +325,7 @@ class testGlobalProducer : public CppUnit::TestFixture {
325325
public:
326326
TransformProd(edm::ParameterSet const&) {
327327
token_ = produces<float>();
328-
registerTransform(token_, [](float iV) { return int(iV); });
328+
registerTransform(token_, [](edm::StreamID, float iV) { return int(iV); });
329329
}
330330

331331
void produce(edm::StreamID, edm::Event& iEvent, edm::EventSetup const&) const {
@@ -347,8 +347,8 @@ class testGlobalProducer : public CppUnit::TestFixture {
347347
token_ = produces<float>();
348348
registerTransformAsync(
349349
token_,
350-
[](float iV, edm::WaitingTaskWithArenaHolder iHolder) { return IntHolder(iV); },
351-
[](IntHolder iWaitValue) { return iWaitValue.value_; });
350+
[](edm::StreamID, float iV, edm::WaitingTaskWithArenaHolder iHolder) { return IntHolder(iV); },
351+
[](edm::StreamID, IntHolder iWaitValue) { return iWaitValue.value_; });
352352
}
353353

354354
void produce(edm::StreamID, edm::Event& iEvent, edm::EventSetup const&) const {

FWCore/Framework/test/limited_filter_t.cppunit.cc

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -391,7 +391,7 @@ class testLimitedFilter : public CppUnit::TestFixture {
391391
TransformProd(edm::ParameterSet const&)
392392
: edm::limited::EDFilterBase(s_pset), edm::limited::EDFilter<edm::Transformer>(s_pset) {
393393
token_ = produces<float>();
394-
registerTransform(token_, [](float iV) { return int(iV); });
394+
registerTransform(token_, [](edm::StreamID, float iV) { return int(iV); });
395395
}
396396

397397
bool filter(edm::StreamID, edm::Event& iEvent, edm::EventSetup const&) const {

FWCore/Framework/test/limited_producer_t.cppunit.cc

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -357,7 +357,7 @@ class testLimitedProducer : public CppUnit::TestFixture {
357357
TransformProd(edm::ParameterSet const&)
358358
: edm::limited::EDProducerBase(s_pset), edm::limited::EDProducer<edm::Transformer>(s_pset) {
359359
token_ = produces<float>();
360-
registerTransform(token_, [](float iV) { return int(iV); });
360+
registerTransform(token_, [](edm::StreamID, float iV) { return int(iV); });
361361
}
362362

363363
void produce(edm::StreamID, edm::Event& iEvent, edm::EventSetup const&) const {

0 commit comments

Comments
 (0)