Skip to content

Commit 7b89ff2

Browse files
authored
Merge 7f354f4 into sapling-pr-archive-ktf
2 parents 25b923d + 7f354f4 commit 7b89ff2

File tree

10 files changed

+398
-150
lines changed

10 files changed

+398
-150
lines changed

Framework/Core/include/Framework/DataProcessorInfo.h

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,9 @@ struct DataProcessorInfo {
2525
/// Name of the associated DataProcessorSpec
2626
std::string name = "Unknown";
2727
/// The executable name of the program which holds the DataProcessorSpec
28-
std::string executable = "/bin/false";
28+
std::string executable = "";
29+
/// The plugin spec of the plugin which holds the DataProcessorSpec
30+
std::string plugin = "";
2931
/// The argument passed on the command line for this DataProcessorSpec
3032
std::vector<std::string> cmdLineArgs = {};
3133
/// The workflow options which are available for the associated DataProcessorSpec
@@ -34,6 +36,6 @@ struct DataProcessorInfo {
3436
std::vector<std::string> channels = {};
3537
};
3638

37-
} // namespace o2
39+
} // namespace o2::framework
3840

3941
#endif // O2_FRAMEWORK_CORE_DATAPROCESSORINFO_H_

Framework/Core/include/Framework/DeviceExecution.h

Lines changed: 7 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -8,26 +8,25 @@
88
// In applying this license CERN does not waive the privileges and immunities
99
// granted to it by virtue of its status as an Intergovernmental Organization
1010
// or submit itself to any jurisdiction.
11-
#ifndef FRAMEWORK_DEVICEEXECUTION_H
12-
#define FRAMEWORK_DEVICEEXECUTION_H
11+
#ifndef O2_FRAMEWORK_DEVICEEXECUTION_H_
12+
#define O2_FRAMEWORK_DEVICEEXECUTION_H_
1313

1414
#include <vector>
1515

16-
namespace o2
17-
{
18-
namespace framework
16+
namespace o2::framework
1917
{
2018

2119
/// This represent one single execution of a Device. It's meant to hold
2220
/// information which can change between one execution of a Device and the
2321
/// other, e.g. the executable name or the arguments it is started with.
2422
struct DeviceExecution {
23+
std::string plugin;
2524
/// The options passed to a given device
2625
std::vector<char*> args;
2726
/// The environment to be passed to a given device
2827
std::vector<char*> environ;
2928
};
3029

31-
} // namespace framework
32-
} // namespace o2
33-
#endif
30+
} // namespace o2::framework
31+
32+
#endif // O2_FRAMEWORK_DEVICEEXECUTION_H_

Framework/Core/include/Framework/Plugins.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,9 @@ enum struct DplPluginKind : int {
4444
// using the arrow dataset API
4545
RootObjectReadingImplementation,
4646

47+
// A plugin which defines a whole workflow. This will be used to separate
48+
// workflows in shared libraries and run them via a separate loader.
49+
Workflow,
4750
// A plugin which was not initialised properly.
4851
Unknown
4952
};
Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,56 @@
1+
// Copyright 2019-2025 CERN and copyright holders of ALICE O2.
2+
// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
3+
// All rights not expressly granted are reserved.
4+
//
5+
// This software is distributed under the terms of the GNU General Public
6+
// License v3 (GPL Version 3), copied verbatim in the file "COPYING".
7+
//
8+
// In applying this license CERN does not waive the privileges and immunities
9+
// granted to it by virtue of its status as an Intergovernmental Organization
10+
// or submit itself to any jurisdiction.
11+
#ifndef O2_FRAMEWORK_WORKFLOWDEFINITIONCONTEXT_H_
12+
#define O2_FRAMEWORK_WORKFLOWDEFINITIONCONTEXT_H_
13+
14+
#include "Framework/ConfigParamSpec.h"
15+
#include "Framework/CompletionPolicy.h"
16+
#include "Framework/DispatchPolicy.h"
17+
#include "Framework/ResourcePolicy.h"
18+
#include "Framework/CallbacksPolicy.h"
19+
#include "Framework/SendingPolicy.h"
20+
#include "Framework/WorkflowSpec.h"
21+
#include "Framework/ChannelConfigurationPolicy.h"
22+
#include <vector>
23+
24+
namespace o2::framework
25+
{
26+
27+
struct WorkflowDefinitionContext {
28+
std::vector<ConfigParamSpec> workflowOptions;
29+
std::vector<CompletionPolicy> completionPolicies;
30+
std::vector<DispatchPolicy> dispatchPolicies;
31+
std::vector<ResourcePolicy> resourcePolicies;
32+
std::vector<CallbacksPolicy> callbacksPolicies;
33+
std::vector<SendingPolicy> sendingPolicies;
34+
std::vector<ConfigParamSpec> extraOptions;
35+
std::vector<ChannelConfigurationPolicy> channelPolicies;
36+
std::unique_ptr<ConfigContext> configContext;
37+
38+
// For the moment, let's put them here. We should
39+
// probably move them to a different place, since these are not really part
40+
// of the workflow definition but will be there also at runtine.
41+
std::unique_ptr<ServiceRegistry> configRegistry{nullptr};
42+
std::unique_ptr<ConfigParamRegistry> workflowOptionsRegistry{nullptr};
43+
44+
o2::framework::WorkflowSpec specs;
45+
};
46+
47+
struct WorkflowDefinition {
48+
std::function<o2::framework::WorkflowDefinitionContext(int argc, char** argv)> defineWorkflow;
49+
};
50+
51+
struct WorkflowPlugin {
52+
virtual o2::framework::WorkflowDefinition* create() = 0;
53+
};
54+
55+
} // namespace o2::framework
56+
#endif // O2_FRAMEWORK_WORKFLOWDEFINITIONCONTEXT_H_

Framework/Core/include/Framework/runDataProcessing.h

Lines changed: 78 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,9 @@
2626
#include "Framework/CustomWorkflowTerminationHook.h"
2727
#include "Framework/CommonServices.h"
2828
#include "Framework/WorkflowCustomizationHelpers.h"
29+
#include "Framework/WorkflowDefinitionContext.h"
2930
#include "Framework/Logger.h"
31+
#include "Framework/Plugins.h"
3032
#include "Framework/CheckTypes.h"
3133
#include "Framework/StructToTuple.h"
3234
#include "ResourcePolicy.h"
@@ -125,16 +127,7 @@ void overrideCloning(o2::framework::ConfigContext& ctx, std::vector<o2::framewor
125127
void overrideLabels(o2::framework::ConfigContext& ctx, std::vector<o2::framework::DataProcessorSpec>& workflow);
126128

127129
// This comes from the framework itself. This way we avoid code duplication.
128-
int doMain(int argc, char** argv, o2::framework::WorkflowSpec const& specs,
129-
std::vector<o2::framework::ChannelConfigurationPolicy> const& channelPolicies,
130-
std::vector<o2::framework::CompletionPolicy> const& completionPolicies,
131-
std::vector<o2::framework::DispatchPolicy> const& dispatchPolicies,
132-
std::vector<o2::framework::ResourcePolicy> const& resourcePolicies,
133-
std::vector<o2::framework::CallbacksPolicy> const& callbacksPolicies,
134-
std::vector<o2::framework::SendingPolicy> const& sendingPolicies,
135-
std::vector<o2::framework::ConfigParamSpec> const& workflowOptions,
136-
std::vector<o2::framework::ConfigParamSpec> const& detectedOptions,
137-
o2::framework::ConfigContext& configContext);
130+
int doMain(int argc, char** argv, o2::framework::WorkflowDefinitionContext& context, o2::framework::ConfigContext& configContext);
138131

139132
void doDefaultWorkflowTerminationHook();
140133

@@ -167,60 +160,97 @@ void callWorkflowTermination(T&, char const* idstring)
167160

168161
void overrideAll(o2::framework::ConfigContext& ctx, std::vector<o2::framework::DataProcessorSpec>& workflow);
169162

170-
o2::framework::ConfigContext createConfigContext(std::unique_ptr<o2::framework::ConfigParamRegistry>& workflowOptionsRegistry,
171-
o2::framework::ServiceRegistry& configRegistry,
172-
std::vector<o2::framework::ConfigParamSpec>& workflowOptions,
173-
std::vector<o2::framework::ConfigParamSpec>& extraOptions, int argc, char** argv);
163+
std::unique_ptr<o2::framework::ConfigContext> createConfigContext(std::unique_ptr<o2::framework::ConfigParamRegistry>& workflowOptionsRegistry,
164+
o2::framework::ServiceRegistry& configRegistry,
165+
std::vector<o2::framework::ConfigParamSpec>& workflowOptions,
166+
std::vector<o2::framework::ConfigParamSpec>& extraOptions, int argc, char** argv);
174167

175168
std::unique_ptr<o2::framework::ServiceRegistry> createRegistry();
176169

177-
int mainNoCatch(int argc, char** argv)
178-
{
179-
using namespace o2::framework;
170+
char* getIdString(int argc, char** argv);
180171

181-
std::vector<o2::framework::ConfigParamSpec> workflowOptions;
182-
UserCustomizationsHelper::userDefinedCustomization(workflowOptions);
183-
auto requiredWorkflowOptions = WorkflowCustomizationHelpers::requiredWorkflowOptions();
184-
workflowOptions.insert(std::end(workflowOptions), std::begin(requiredWorkflowOptions), std::end(requiredWorkflowOptions));
185-
186-
std::vector<CompletionPolicy> completionPolicies = injectCustomizations<CompletionPolicy>();
187-
std::vector<DispatchPolicy> dispatchPolicies = injectCustomizations<DispatchPolicy>();
188-
std::vector<ResourcePolicy> resourcePolicies = injectCustomizations<ResourcePolicy>();
189-
std::vector<CallbacksPolicy> callbacksPolicies = injectCustomizations<CallbacksPolicy>();
190-
std::vector<SendingPolicy> sendingPolicies = injectCustomizations<SendingPolicy>();
191-
192-
std::unique_ptr<ServiceRegistry> configRegistry = createRegistry();
193-
std::vector<ConfigParamSpec> extraOptions;
194-
std::unique_ptr<ConfigParamRegistry> workflowOptionsRegistry{nullptr};
195-
auto configContext = createConfigContext(workflowOptionsRegistry, *configRegistry, workflowOptions, extraOptions, argc, argv);
196-
197-
o2::framework::WorkflowSpec specs = defineDataProcessing(configContext);
198-
overrideAll(configContext, specs);
199-
for (auto& spec : specs) {
200-
UserCustomizationsHelper::userDefinedCustomization(spec.requiredServices);
201-
}
202-
std::vector<ChannelConfigurationPolicy> channelPolicies;
203-
UserCustomizationsHelper::userDefinedCustomization(channelPolicies);
204-
auto defaultChannelPolicies = ChannelConfigurationPolicy::createDefaultPolicies(configContext);
205-
channelPolicies.insert(std::end(channelPolicies), std::begin(defaultChannelPolicies), std::end(defaultChannelPolicies));
206-
return doMain(argc, argv, specs,
207-
channelPolicies, completionPolicies, dispatchPolicies,
208-
resourcePolicies, callbacksPolicies, sendingPolicies, workflowOptions, extraOptions, configContext);
172+
#define STRINGIZE_NX(A) #A
173+
#define STRINGIZE(A) STRINGIZE_NX(A)
174+
175+
// This is to allow the old "executable" based behavior
176+
// Each executable will contain a plugin called InternalWorkflow
177+
// In case one wants to use the new DSO based approach, the
178+
// name of the plugin an the library name where it is located
179+
// will have to be specified at build time.
180+
#ifndef DPL_WORKFLOW_PLUGIN_NAME
181+
#define DPL_WORKFLOW_PLUGIN_NAME InternalCustomWorkflow
182+
#ifdef DPL_WORKFLOW_PLUGIN_LIBRARY
183+
#error Missing DPL_WORKFLOW_PLUGIN_NAME
184+
#endif
185+
#define DPL_WORKFLOW_PLUGIN_LIBRARY
186+
#endif
187+
188+
consteval char const* pluginName()
189+
{
190+
return STRINGIZE(DPL_WORKFLOW_PLUGIN_LIBRARY) ":" STRINGIZE(DPL_WORKFLOW_PLUGIN_NAME);
209191
}
210192

211-
int callMain(int argc, char** argv, int (*)(int, char**));
212-
char* getIdString(int argc, char** argv);
193+
// Executables behave this way
194+
int callMain(int argc, char** argv, char const* pluginName);
213195

214196
int main(int argc, char** argv)
215197
{
216198
using namespace o2::framework;
217199

218-
int result = callMain(argc, argv, mainNoCatch);
200+
int result = callMain(argc, argv, pluginName());
219201

220202
char* idstring = getIdString(argc, argv);
221203
o2::framework::OnWorkflowTerminationHook onWorkflowTerminationHook;
222204
callWorkflowTermination(onWorkflowTerminationHook, idstring);
223205

224206
return result;
225207
}
208+
209+
struct WorkflowDefinition {
210+
std::function<o2::framework::WorkflowDefinitionContext(int argc, char** argv)> defineWorkflow;
211+
};
212+
213+
struct DPL_WORKFLOW_PLUGIN_NAME : o2::framework::WorkflowPlugin {
214+
o2::framework::WorkflowDefinition* create() override
215+
{
216+
return new o2::framework::WorkflowDefinition{
217+
.defineWorkflow = [](int argc, char** argv) -> o2::framework::WorkflowDefinitionContext {
218+
using namespace o2::framework;
219+
WorkflowDefinitionContext workflowContext;
220+
221+
UserCustomizationsHelper::userDefinedCustomization(workflowContext.workflowOptions);
222+
auto requiredWorkflowOptions = WorkflowCustomizationHelpers::requiredWorkflowOptions();
223+
workflowContext.workflowOptions.insert(std::end(workflowContext.workflowOptions), std::begin(requiredWorkflowOptions), std::end(requiredWorkflowOptions));
224+
225+
workflowContext.completionPolicies = injectCustomizations<CompletionPolicy>();
226+
workflowContext.dispatchPolicies = injectCustomizations<DispatchPolicy>();
227+
workflowContext.resourcePolicies = injectCustomizations<ResourcePolicy>();
228+
workflowContext.callbacksPolicies = injectCustomizations<CallbacksPolicy>();
229+
workflowContext.sendingPolicies = injectCustomizations<SendingPolicy>();
230+
231+
workflowContext.configRegistry = createRegistry();
232+
workflowContext.configContext = createConfigContext(workflowContext.workflowOptionsRegistry, *workflowContext.configRegistry, workflowContext.workflowOptions, workflowContext.extraOptions, argc, argv);
233+
234+
workflowContext.specs = defineDataProcessing(*workflowContext.configContext);
235+
overrideAll(*workflowContext.configContext, workflowContext.specs);
236+
for (auto& spec : workflowContext.specs) {
237+
UserCustomizationsHelper::userDefinedCustomization(spec.requiredServices);
238+
}
239+
UserCustomizationsHelper::userDefinedCustomization(workflowContext.channelPolicies);
240+
auto defaultChannelPolicies = ChannelConfigurationPolicy::createDefaultPolicies(*workflowContext.configContext);
241+
workflowContext.channelPolicies.insert(std::end(workflowContext.channelPolicies), std::begin(defaultChannelPolicies), std::end(defaultChannelPolicies));
242+
return workflowContext;
243+
}};
244+
}
245+
};
246+
247+
// This is like the plugin macros, we simply do it explicitly to avoid macro inside macro expansion
248+
extern "C" {
249+
DPLPluginHandle* dpl_plugin_callback(DPLPluginHandle* previous)
250+
{
251+
previous = new DPLPluginHandle{new DPL_WORKFLOW_PLUGIN_NAME{}, strdup(STRINGIZE(DPL_WORKFLOW_PLUGIN_NAME)), o2::framework::DplPluginKind::Workflow, previous};
252+
return previous;
253+
}
254+
}
255+
226256
#endif

Framework/Core/src/DeviceSpecHelpers.cxx

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1566,6 +1566,7 @@ void DeviceSpecHelpers::prepareArguments(bool defaultQuiet, bool defaultStopped,
15661566
realOdesc.add_options()("early-forward-policy", bpo::value<std::string>());
15671567
realOdesc.add_options()("session", bpo::value<std::string>());
15681568
realOdesc.add_options()("signposts", bpo::value<std::string>());
1569+
realOdesc.add_options()("workflow-plugin", bpo::value<std::string>());
15691570
filterArgsFct(expansions.we_wordc, expansions.we_wordv, realOdesc);
15701571
wordfree(&expansions);
15711572
return;
@@ -1711,6 +1712,8 @@ void DeviceSpecHelpers::prepareArguments(bool defaultQuiet, bool defaultStopped,
17111712
}
17121713
O2_SIGNPOST_END(device_spec_helpers, poid, "prepareArguments", "The following options are being forwarded to %{public}s: %{public}s",
17131714
spec.id.c_str(), str.str().c_str());
1715+
// Copy the plugin over from the DataProcessingInfo
1716+
execution.plugin = pi->plugin;
17141717
}
17151718
}
17161719

@@ -1755,6 +1758,7 @@ boost::program_options::options_description DeviceSpecHelpers::getForwardedDevic
17551758
("network-interface", bpo::value<std::string>(), "network interface to which to bind tpc fmq ports without specified address") //
17561759
("early-forward-policy", bpo::value<EarlyForwardPolicy>()->default_value(EarlyForwardPolicy::NEVER), "when to forward early the messages: never, noraw, always") //
17571760
("configuration,cfg", bpo::value<std::string>(), "configuration connection string") //
1761+
("workflow-plugin", bpo::value<std::string>(), "workflow configuration plugin") //
17581762
("driver-client-backend", bpo::value<std::string>(), "driver connection string") //
17591763
("monitoring-backend", bpo::value<std::string>(), "monitoring connection string") //
17601764
("dpl-stats-min-online-publishing-interval", bpo::value<std::string>(), "minimum flushing interval for online metrics (in s)") //

Framework/Core/src/WorkflowSerializationHelpers.cxx

Lines changed: 18 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -91,6 +91,7 @@ struct WorkflowImporter : public rapidjson::BaseReaderHandler<rapidjson::UTF8<>,
9191
IN_DATAPROCESSOR_INFO,
9292
IN_DATAPROCESSOR_INFO_NAME,
9393
IN_DATAPROCESSOR_INFO_EXECUTABLE,
94+
IN_DATAPROCESSOR_INFO_PLUGIN,
9495
IN_DATAPROCESSOR_INFO_ARGS,
9596
IN_DATAPROCESSOR_INFO_ARG,
9697
IN_DATAPROCESSOR_INFO_CHANNELS,
@@ -263,6 +264,9 @@ struct WorkflowImporter : public rapidjson::BaseReaderHandler<rapidjson::UTF8<>,
263264
case State::IN_DATAPROCESSOR_INFO_EXECUTABLE:
264265
s << "IN_DATAPROCESSOR_INFO_EXECUTABLE";
265266
break;
267+
case State::IN_DATAPROCESSOR_INFO_PLUGIN:
268+
s << "IN_DATAPROCESSOR_INFO_PLUGIN";
269+
break;
266270
case State::IN_DATAPROCESSOR_INFO_ARGS:
267271
s << "IN_DATAPROCESSOR_INFO_ARGS";
268272
break;
@@ -706,6 +710,8 @@ struct WorkflowImporter : public rapidjson::BaseReaderHandler<rapidjson::UTF8<>,
706710
push(State::IN_DATAPROCESSOR_INFO_NAME);
707711
} else if (in(State::IN_DATAPROCESSOR_INFO) && strncmp(str, "executable", length) == 0) {
708712
push(State::IN_DATAPROCESSOR_INFO_EXECUTABLE);
713+
} else if (in(State::IN_DATAPROCESSOR_INFO) && strncmp(str, "plugin", length) == 0) {
714+
push(State::IN_DATAPROCESSOR_INFO_PLUGIN);
709715
} else if (in(State::IN_DATAPROCESSOR_INFO) && strncmp(str, "cmdLineArgs", length) == 0) {
710716
push(State::IN_DATAPROCESSOR_INFO_ARGS);
711717
} else if (in(State::IN_DATAPROCESSOR_INFO) && strncmp(str, "workflowOptions", length) == 0) {
@@ -732,6 +738,9 @@ struct WorkflowImporter : public rapidjson::BaseReaderHandler<rapidjson::UTF8<>,
732738
} else if (in(State::IN_DATAPROCESSOR_INFO_EXECUTABLE)) {
733739
assert(metadata.size());
734740
metadata.back().executable = s;
741+
} else if (in(State::IN_DATAPROCESSOR_INFO_PLUGIN)) {
742+
assert(metadata.size());
743+
metadata.back().plugin = s;
735744
} else if (in(State::IN_INPUT_BINDING)) {
736745
binding = s;
737746
} else if (in(State::IN_INPUT_ORIGIN)) {
@@ -888,7 +897,7 @@ struct WorkflowImporter : public rapidjson::BaseReaderHandler<rapidjson::UTF8<>,
888897
if (!states.empty()) {
889898
debug << " now in " << states.back();
890899
}
891-
O2_SIGNPOST_END(workflow_importer, _o2_signpost_id_t{(int64_t)states.size()+1}, "import", "POP: %s", debug.str().c_str());
900+
O2_SIGNPOST_END(workflow_importer, _o2_signpost_id_t{(int64_t)states.size() + 1}, "import", "POP: %s", debug.str().c_str());
892901
return result;
893902
}
894903
bool in(State o)
@@ -1254,8 +1263,14 @@ void WorkflowSerializationHelpers::dump(std::ostream& out,
12541263
w.StartObject();
12551264
w.Key("name");
12561265
w.String(info.name.c_str());
1257-
w.Key("executable");
1258-
w.String(info.executable.c_str());
1266+
if (!info.executable.empty()) {
1267+
w.Key("executable");
1268+
w.String(info.executable.c_str());
1269+
}
1270+
if (!info.plugin.empty()) {
1271+
w.Key("plugin");
1272+
w.String(info.plugin.c_str());
1273+
}
12591274
w.Key("cmdLineArgs");
12601275
w.StartArray();
12611276
for (auto& arg : info.cmdLineArgs) {

0 commit comments

Comments
 (0)