99// granted to it by virtue of its status as an Intergovernmental Organization
1010// or submit itself to any jurisdiction.
1111
12+ #include " Framework/ConcreteDataMatcher.h"
1213#include " Framework/ConfigParamSpec.h"
14+ #include " Framework/CompletionPolicy.h"
15+ #include " Framework/CompletionPolicyHelpers.h"
16+ #include " Framework/InputRecordWalker.h"
17+ #include " Framework/Logger.h"
1318
1419#include < chrono>
1520#include < thread>
@@ -29,36 +34,41 @@ void customize(std::vector<ConfigParamSpec>& workflowOptions)
2934 ConfigParamSpec{" 3-layer-pipelining" , VariantType::Int, 1 , {timeHelp}});
3035}
3136
37+ void customize (std::vector<CompletionPolicy>& policies)
38+ {
39+ policies = {
40+ CompletionPolicyHelpers::consumeWhenPastOldestPossibleTimeframe (" merger-policy" , [](auto const &) -> bool { return true ; })};
41+ }
42+
3243#include " Framework/runDataProcessing.h"
3344#include " Framework/DataProcessorSpec.h"
3445#include " Framework/DataSpecUtils.h"
3546#include " Framework/ParallelContext.h"
36- #include " Framework/ControlService.h"
37-
38- #include " Framework/Logger.h"
3947
4048#include < vector>
4149
4250using DataHeader = o2::header::DataHeader;
4351
4452DataProcessorSpec templateProcessor ()
4553{
46- return DataProcessorSpec{" some-processor" , {
47- InputSpec{" x" , " TST" , " A" , 0 , Lifetime::Timeframe},
48- },
49- {
54+ return DataProcessorSpec{.name = " some-processor" ,
55+ .inputs = {
56+ InputSpec{" x" , " TST" , " A" , 0 , Lifetime::Timeframe},
57+ },
58+ .outputs = {
5059 OutputSpec{" TST" , " P" , 0 , Lifetime::Timeframe},
5160 },
5261 // The producer is stateful, we use a static for the state in this
5362 // particular case, but a Singleton or a captured new object would
5463 // work as well.
55- AlgorithmSpec{[](InitContext& setup) {
64+ . algorithm = AlgorithmSpec{[](InitContext& setup) {
5665 srand (setup.services ().get <ParallelContext>().index1D ());
5766 return [](ProcessingContext& ctx) {
5867 // Create a single output.
5968 size_t index = ctx.services ().get <ParallelContext>().index1D ();
60- auto & aData = ctx.outputs ().make <int >(
69+ auto & i = ctx.outputs ().make <int >(
6170 Output{" TST" , " P" , static_cast <o2::header::DataHeader::SubSpecificationType>(index)}, 1 );
71+ i[0 ] = index;
6272 std::this_thread::sleep_for (std::chrono::seconds (rand () % 5 ));
6373 };
6474 }}};
@@ -86,34 +96,43 @@ WorkflowSpec defineDataProcessing(ConfigContext const& config)
8696 outputSpecs.emplace_back (" TST" , " A" , ssi);
8797 }
8898
89- workflow.push_back (DataProcessorSpec{" reader" , {}, outputSpecs, AlgorithmSpec{[jobs](InitContext& initCtx) {
90- return [jobs](ProcessingContext& ctx) {
91- for (size_t ji = 0 ; ji < jobs; ++ji) {
92- ctx.outputs ().make <int >(Output{" TST" , " A" , static_cast <o2::header::DataHeader::SubSpecificationType>(ji)},
93- 1 );
94- }
95- };
96- }}});
99+ workflow.push_back (DataProcessorSpec{
100+ .name = " reader" ,
101+ .outputs = outputSpecs,
102+ .algorithm = AlgorithmSpec{[jobs](InitContext& initCtx) {
103+ return [jobs](ProcessingContext& ctx) {
104+ static int count = 0 ;
105+ for (size_t ji = 0 ; ji < jobs; ++ji) {
106+ int & i = ctx.outputs ().make <int >(Output{" TST" , " A" , static_cast <o2::header::DataHeader::SubSpecificationType>(ji)});
107+ i = count * 100 + ji;
108+ }
109+ count++;
110+ };
111+ }}});
97112 workflow.push_back (timePipeline (DataProcessorSpec{
98- " merger" ,
99- mergeInputs (InputSpec{" x" , " TST" , " P" },
100- jobs,
101- [](InputSpec& input, size_t index) {
102- DataSpecUtils::updateMatchingSubspec (input, index);
103- }),
104- {OutputSpec{{" out" }, " TST" , " M" }},
105- AlgorithmSpec{[](InitContext& setup) {
113+ .name = " merger" ,
114+ .inputs = {InputSpec{" all" , ConcreteDataTypeMatcher{" TST" , " P" }}},
115+ .outputs = {OutputSpec{{" out" }, " TST" , " M" }},
116+ .algorithm = AlgorithmSpec{[](InitContext& setup) {
106117 return [](ProcessingContext& ctx) {
118+ LOGP (info, " Run" );
119+ for (const auto & input : o2::framework::InputRecordWalker (ctx.inputs ())) {
120+ if (input.header == nullptr ) {
121+ LOGP (error, " Missing header" );
122+ continue ;
123+ }
124+ int record = *(int *)input.payload ;
125+ LOGP (info, " Record {}" , record);
126+ }
107127 ctx.outputs ().make <int >(OutputRef (" out" , 0 ), 1 );
108128 };
109129 }}},
110130 stages));
111131
112132 workflow.push_back (DataProcessorSpec{
113- " writer" ,
114- {InputSpec{" x" , " TST" , " M" }},
115- {},
116- AlgorithmSpec{[](InitContext& setup) {
133+ .name = " writer" ,
134+ .inputs = {InputSpec{" x" , " TST" , " M" }},
135+ .algorithm = AlgorithmSpec{[](InitContext& setup) {
117136 return [](ProcessingContext& ctx) {
118137 };
119138 }}});
0 commit comments