Skip to content

Commit 0d2ca8e

Browse files
authored
Merge c381e69 into sapling-pr-archive-ktf
2 parents fd3028c + c381e69 commit 0d2ca8e

File tree

4 files changed

+123
-86
lines changed

4 files changed

+123
-86
lines changed

Framework/Core/include/Framework/PluginManager.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -87,7 +87,7 @@ struct PluginManager {
8787
#else
8888
auto libraryName = fmt::format("lib{}.so", loadablePlugin.library);
8989
#endif
90-
auto ret = uv_dlopen(libraryName.c_str(), &handle);
90+
auto ret = uv_dlopen(loadablePlugin.library.empty() ? nullptr : libraryName.c_str(), &handle);
9191
if (ret != 0) {
9292
LOGP(error, "Could not load library {}", loadablePlugin.library);
9393
LOG(error) << uv_dlerror(&handle);

Framework/Core/include/Framework/Plugins.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,9 @@ enum struct DplPluginKind : int {
4444
// using the arrow dataset API
4545
RootObjectReadingImplementation,
4646

47+
// A plugin which defines a whole workflow. This will be used to separate
48+
// workflows in shared libraries and run them via a separate loader.
49+
Workflow,
4750
// A plugin which was not initialised properly.
4851
Unknown
4952
};

Framework/Core/include/Framework/runDataProcessing.h

Lines changed: 78 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,9 @@
2626
#include "Framework/CustomWorkflowTerminationHook.h"
2727
#include "Framework/CommonServices.h"
2828
#include "Framework/WorkflowCustomizationHelpers.h"
29+
#include "Framework/WorkflowDefinitionContext.h"
2930
#include "Framework/Logger.h"
31+
#include "Framework/Plugins.h"
3032
#include "Framework/CheckTypes.h"
3133
#include "Framework/StructToTuple.h"
3234
#include "ResourcePolicy.h"
@@ -125,16 +127,7 @@ void overrideCloning(o2::framework::ConfigContext& ctx, std::vector<o2::framewor
125127
void overrideLabels(o2::framework::ConfigContext& ctx, std::vector<o2::framework::DataProcessorSpec>& workflow);
126128

127129
// This comes from the framework itself. This way we avoid code duplication.
128-
int doMain(int argc, char** argv, o2::framework::WorkflowSpec const& specs,
129-
std::vector<o2::framework::ChannelConfigurationPolicy> const& channelPolicies,
130-
std::vector<o2::framework::CompletionPolicy> const& completionPolicies,
131-
std::vector<o2::framework::DispatchPolicy> const& dispatchPolicies,
132-
std::vector<o2::framework::ResourcePolicy> const& resourcePolicies,
133-
std::vector<o2::framework::CallbacksPolicy> const& callbacksPolicies,
134-
std::vector<o2::framework::SendingPolicy> const& sendingPolicies,
135-
std::vector<o2::framework::ConfigParamSpec> const& workflowOptions,
136-
std::vector<o2::framework::ConfigParamSpec> const& detectedOptions,
137-
o2::framework::ConfigContext& configContext);
130+
int doMain(int argc, char** argv, o2::framework::WorkflowDefinitionContext& context, o2::framework::ConfigContext& configContext);
138131

139132
void doDefaultWorkflowTerminationHook();
140133

@@ -167,60 +160,97 @@ void callWorkflowTermination(T&, char const* idstring)
167160

168161
void overrideAll(o2::framework::ConfigContext& ctx, std::vector<o2::framework::DataProcessorSpec>& workflow);
169162

170-
o2::framework::ConfigContext createConfigContext(std::unique_ptr<o2::framework::ConfigParamRegistry>& workflowOptionsRegistry,
171-
o2::framework::ServiceRegistry& configRegistry,
172-
std::vector<o2::framework::ConfigParamSpec>& workflowOptions,
173-
std::vector<o2::framework::ConfigParamSpec>& extraOptions, int argc, char** argv);
163+
std::unique_ptr<o2::framework::ConfigContext> createConfigContext(std::unique_ptr<o2::framework::ConfigParamRegistry>& workflowOptionsRegistry,
164+
o2::framework::ServiceRegistry& configRegistry,
165+
std::vector<o2::framework::ConfigParamSpec>& workflowOptions,
166+
std::vector<o2::framework::ConfigParamSpec>& extraOptions, int argc, char** argv);
174167

175168
std::unique_ptr<o2::framework::ServiceRegistry> createRegistry();
176169

177-
int mainNoCatch(int argc, char** argv)
178-
{
179-
using namespace o2::framework;
170+
char* getIdString(int argc, char** argv);
180171

181-
std::vector<o2::framework::ConfigParamSpec> workflowOptions;
182-
UserCustomizationsHelper::userDefinedCustomization(workflowOptions);
183-
auto requiredWorkflowOptions = WorkflowCustomizationHelpers::requiredWorkflowOptions();
184-
workflowOptions.insert(std::end(workflowOptions), std::begin(requiredWorkflowOptions), std::end(requiredWorkflowOptions));
185-
186-
std::vector<CompletionPolicy> completionPolicies = injectCustomizations<CompletionPolicy>();
187-
std::vector<DispatchPolicy> dispatchPolicies = injectCustomizations<DispatchPolicy>();
188-
std::vector<ResourcePolicy> resourcePolicies = injectCustomizations<ResourcePolicy>();
189-
std::vector<CallbacksPolicy> callbacksPolicies = injectCustomizations<CallbacksPolicy>();
190-
std::vector<SendingPolicy> sendingPolicies = injectCustomizations<SendingPolicy>();
191-
192-
std::unique_ptr<ServiceRegistry> configRegistry = createRegistry();
193-
std::vector<ConfigParamSpec> extraOptions;
194-
std::unique_ptr<ConfigParamRegistry> workflowOptionsRegistry{nullptr};
195-
auto configContext = createConfigContext(workflowOptionsRegistry, *configRegistry, workflowOptions, extraOptions, argc, argv);
196-
197-
o2::framework::WorkflowSpec specs = defineDataProcessing(configContext);
198-
overrideAll(configContext, specs);
199-
for (auto& spec : specs) {
200-
UserCustomizationsHelper::userDefinedCustomization(spec.requiredServices);
201-
}
202-
std::vector<ChannelConfigurationPolicy> channelPolicies;
203-
UserCustomizationsHelper::userDefinedCustomization(channelPolicies);
204-
auto defaultChannelPolicies = ChannelConfigurationPolicy::createDefaultPolicies(configContext);
205-
channelPolicies.insert(std::end(channelPolicies), std::begin(defaultChannelPolicies), std::end(defaultChannelPolicies));
206-
return doMain(argc, argv, specs,
207-
channelPolicies, completionPolicies, dispatchPolicies,
208-
resourcePolicies, callbacksPolicies, sendingPolicies, workflowOptions, extraOptions, configContext);
172+
#define STRINGIZE_NX(A) #A
173+
#define STRINGIZE(A) STRINGIZE_NX(A)
174+
175+
// This is to allow the old "executable" based behavior
176+
// Each executable will contain a plugin called InternalWorkflow
177+
// In case one wants to use the new DSO based approach, the
178+
// name of the plugin an the library name where it is located
179+
// will have to be specified at build time.
180+
#ifndef DPL_WORKFLOW_PLUGIN_NAME
181+
#define DPL_WORKFLOW_PLUGIN_NAME InternalCustomWorkflow
182+
#ifdef DPL_WORKFLOW_PLUGIN_LIBRARY
183+
#error Missing DPL_WORKFLOW_PLUGIN_NAME
184+
#endif
185+
#define DPL_WORKFLOW_PLUGIN_LIBRARY ""
186+
#endif
187+
188+
consteval char const* pluginName()
189+
{
190+
return DPL_WORKFLOW_PLUGIN_LIBRARY ":" STRINGIZE(DPL_WORKFLOW_PLUGIN_NAME);
209191
}
210192

211-
int callMain(int argc, char** argv, int (*)(int, char**));
212-
char* getIdString(int argc, char** argv);
193+
// Executables behave this way
194+
int callMain(int argc, char** argv, char const* pluginName);
213195

214196
int main(int argc, char** argv)
215197
{
216198
using namespace o2::framework;
217199

218-
int result = callMain(argc, argv, mainNoCatch);
200+
int result = callMain(argc, argv, pluginName());
219201

220202
char* idstring = getIdString(argc, argv);
221203
o2::framework::OnWorkflowTerminationHook onWorkflowTerminationHook;
222204
callWorkflowTermination(onWorkflowTerminationHook, idstring);
223205

224206
return result;
225207
}
208+
209+
struct WorkflowDefinition {
210+
std::function<o2::framework::WorkflowDefinitionContext(int argc, char** argv)> defineWorkflow;
211+
};
212+
213+
struct DPL_WORKFLOW_PLUGIN_NAME : o2::framework::WorkflowPlugin {
214+
o2::framework::WorkflowDefinition* create() override
215+
{
216+
return new o2::framework::WorkflowDefinition{
217+
.defineWorkflow = [](int argc, char** argv) -> o2::framework::WorkflowDefinitionContext {
218+
using namespace o2::framework;
219+
WorkflowDefinitionContext workflowContext;
220+
221+
UserCustomizationsHelper::userDefinedCustomization(workflowContext.workflowOptions);
222+
auto requiredWorkflowOptions = WorkflowCustomizationHelpers::requiredWorkflowOptions();
223+
workflowContext.workflowOptions.insert(std::end(workflowContext.workflowOptions), std::begin(requiredWorkflowOptions), std::end(requiredWorkflowOptions));
224+
225+
workflowContext.completionPolicies = injectCustomizations<CompletionPolicy>();
226+
workflowContext.dispatchPolicies = injectCustomizations<DispatchPolicy>();
227+
workflowContext.resourcePolicies = injectCustomizations<ResourcePolicy>();
228+
workflowContext.callbacksPolicies = injectCustomizations<CallbacksPolicy>();
229+
workflowContext.sendingPolicies = injectCustomizations<SendingPolicy>();
230+
231+
workflowContext.configRegistry = createRegistry();
232+
workflowContext.configContext = createConfigContext(workflowContext.workflowOptionsRegistry, *workflowContext.configRegistry, workflowContext.workflowOptions, workflowContext.extraOptions, argc, argv);
233+
234+
workflowContext.specs = defineDataProcessing(*workflowContext.configContext);
235+
overrideAll(*workflowContext.configContext, workflowContext.specs);
236+
for (auto& spec : workflowContext.specs) {
237+
UserCustomizationsHelper::userDefinedCustomization(spec.requiredServices);
238+
}
239+
UserCustomizationsHelper::userDefinedCustomization(workflowContext.channelPolicies);
240+
auto defaultChannelPolicies = ChannelConfigurationPolicy::createDefaultPolicies(*workflowContext.configContext);
241+
workflowContext.channelPolicies.insert(std::end(workflowContext.channelPolicies), std::begin(defaultChannelPolicies), std::end(defaultChannelPolicies));
242+
return workflowContext;
243+
}};
244+
}
245+
};
246+
247+
// This is like the plugin macros, we simply do it explicitly to avoid macro inside macro expansion
248+
extern "C" {
249+
DPLPluginHandle* dpl_plugin_callback(DPLPluginHandle* previous)
250+
{
251+
previous = new DPLPluginHandle{new DPL_WORKFLOW_PLUGIN_NAME{}, strdup(STRINGIZE(DPL_WORKFLOW_PLUGIN_NAME)), o2::framework::DplPluginKind::Workflow, previous};
252+
return previous;
253+
}
254+
}
255+
226256
#endif

0 commit comments

Comments
 (0)