Skip to content

Commit ba88af5

Browse files
committed
DPL: move topological sort in a separate file
Simplifies testing.
1 parent 1071e77 commit ba88af5

File tree

4 files changed

+116
-67
lines changed

4 files changed

+116
-67
lines changed

Framework/Core/CMakeLists.txt

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -133,6 +133,7 @@ o2_add_library(Framework
133133
src/TableConsumer.cxx
134134
src/TableTreeHelpers.cxx
135135
src/TopologyPolicy.cxx
136+
src/TopologyPolicyHelpers.cxx
136137
src/TextDriverClient.cxx
137138
src/TimesliceIndex.cxx
138139
src/TimingHelpers.cxx
@@ -248,6 +249,7 @@ add_executable(o2-test-framework-core
248249
test/test_TimeParallelPipelining.cxx
249250
test/test_TimesliceIndex.cxx
250251
test/test_TypeTraits.cxx
252+
test/test_TopologyPolicies.cxx
251253
test/test_Variants.cxx
252254
test/test_WorkflowHelpers.cxx
253255
test/test_WorkflowSerialization.cxx
Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
// Copyright 2019-2025 CERN and copyright holders of ALICE O2.
2+
// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
3+
// All rights not expressly granted are reserved.
4+
//
5+
// This software is distributed under the terms of the GNU General Public
6+
// License v3 (GPL Version 3), copied verbatim in the file "COPYING".
7+
//
8+
// In applying this license CERN does not waive the privileges and immunities
9+
// granted to it by virtue of its status as an Intergovernmental Organization
10+
// or submit itself to any jurisdiction.
11+
12+
#ifndef O2_FRAMEWORK_TOPOLOGYPOLICYHELPERS_H_
13+
#define O2_FRAMEWORK_TOPOLOGYPOLICYHELPERS_H_
14+
#include "Framework/WorkflowSpec.h"
15+
#include <vector>
16+
17+
namespace o2::framework
18+
{
19+
struct TopologyPolicyHelpers {
20+
static auto buildEdges(WorkflowSpec& physicalWorkflow) -> std::vector<std::pair<int, int>>;
21+
};
22+
} // namespace o2::framework
23+
#endif // O2_FRAMEWORK_TOPOLOGYPOLICYHELPERS_H_
Lines changed: 89 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,89 @@
1+
// Copyright 2019-2025 CERN and copyright holders of ALICE O2.
2+
// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
3+
// All rights not expressly granted are reserved.
4+
//
5+
// This software is distributed under the terms of the GNU General Public
6+
// License v3 (GPL Version 3), copied verbatim in the file "COPYING".
7+
//
8+
// In applying this license CERN does not waive the privileges and immunities
9+
// granted to it by virtue of its status as an Intergovernmental Organization
10+
// or submit itself to any jurisdiction.
11+
12+
#include "Framework/TopologyPolicyHelpers.h"
13+
#include "Framework/TopologyPolicy.h"
14+
15+
namespace o2::framework {
16+
namespace {
17+
void describeDataProcessorSpec(std::ostream& stream, DataProcessorSpec const& spec)
18+
{
19+
stream << spec.name;
20+
if (!spec.labels.empty()) {
21+
stream << "(";
22+
bool first = false;
23+
for (auto& label : spec.labels) {
24+
stream << (first ? "" : ",") << label.value;
25+
first = true;
26+
}
27+
stream << ")";
28+
}
29+
}
30+
}
31+
32+
auto TopologyPolicyHelpers::buildEdges(WorkflowSpec& physicalWorkflow) -> std::vector < std::pair<int, int>> {
33+
std::vector<TopologyPolicy> topologyPolicies = TopologyPolicy::createDefaultPolicies();
34+
std::vector<TopologyPolicy::DependencyChecker> dependencyCheckers;
35+
dependencyCheckers.reserve(physicalWorkflow.size());
36+
37+
for (auto& spec : physicalWorkflow) {
38+
for (auto& policy : topologyPolicies) {
39+
if (policy.matcher(spec)) {
40+
dependencyCheckers.push_back(policy.checkDependency);
41+
break;
42+
}
43+
}
44+
}
45+
assert(dependencyCheckers.size() == physicalWorkflow.size());
46+
// check if DataProcessorSpec at i depends on j
47+
auto checkDependencies = [&workflow = physicalWorkflow,
48+
&dependencyCheckers](int i, int j) {
49+
TopologyPolicy::DependencyChecker& checker = dependencyCheckers[i];
50+
return checker(workflow[i], workflow[j]);
51+
};
52+
std::vector<std::pair<int, int>> edges;
53+
for (size_t i = 0; i < physicalWorkflow.size() - 1; ++i) {
54+
for (size_t j = i; j < physicalWorkflow.size(); ++j) {
55+
if (i == j && checkDependencies(i, j)) {
56+
throw std::runtime_error(physicalWorkflow[i].name + " depends on itself");
57+
}
58+
bool both = false;
59+
if (checkDependencies(i, j)) {
60+
edges.emplace_back(j, i);
61+
both = true;
62+
}
63+
if (checkDependencies(j, i)) {
64+
edges.emplace_back(i, j);
65+
if (both) {
66+
std::ostringstream str;
67+
describeDataProcessorSpec(str, physicalWorkflow[i]);
68+
str << " has circular dependency with ";
69+
describeDataProcessorSpec(str, physicalWorkflow[j]);
70+
str << ":\n";
71+
for (auto x : {i, j}) {
72+
str << physicalWorkflow[x].name << ":\n";
73+
str << "inputs:\n";
74+
for (auto& input : physicalWorkflow[x].inputs) {
75+
str << "- " << input << " " << (int)input.lifetime << "\n";
76+
}
77+
str << "outputs:\n";
78+
for (auto& output : physicalWorkflow[x].outputs) {
79+
str << "- " << output << " " << (int)output.lifetime << "\n";
80+
}
81+
}
82+
throw std::runtime_error(str.str());
83+
}
84+
}
85+
}
86+
}
87+
return edges;
88+
};
89+
} // namespace o2::framework

Framework/Core/src/runDataProcessing.cxx

Lines changed: 2 additions & 67 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
// granted to it by virtue of its status as an Intergovernmental Organization
1010
// or submit itself to any jurisdiction.
1111
#include <memory>
12+
#include "Framework/TopologyPolicyHelpers.h"
1213
#define BOOST_BIND_GLOBAL_PLACEHOLDERS
1314
#include <stdexcept>
1415
#include "Framework/BoostOptionsRetriever.h"
@@ -2835,19 +2836,6 @@ std::unique_ptr<o2::framework::ServiceRegistry> createRegistry()
28352836
return std::make_unique<o2::framework::ServiceRegistry>();
28362837
}
28372838

2838-
void describeDataProcessorSpec(std::ostream& stream, DataProcessorSpec const& spec)
2839-
{
2840-
stream << spec.name;
2841-
if (!spec.labels.empty()) {
2842-
stream << "(";
2843-
bool first = false;
2844-
for (auto& label : spec.labels) {
2845-
stream << (first ? "" : ",") << label.value;
2846-
first = true;
2847-
}
2848-
stream << ")";
2849-
}
2850-
}
28512839

28522840
// This is a toy executor for the workflow spec
28532841
// What it needs to do is:
@@ -3034,65 +3022,12 @@ int doMain(int argc, char** argv, o2::framework::WorkflowSpec const& workflow,
30343022
[](OutputSpec const& a, OutputSpec const& b) { return DataSpecUtils::describe(a) < DataSpecUtils::describe(b); });
30353023
}
30363024

3037-
std::vector<TopologyPolicy> topologyPolicies = TopologyPolicy::createDefaultPolicies();
3038-
std::vector<TopologyPolicy::DependencyChecker> dependencyCheckers;
3039-
dependencyCheckers.reserve(physicalWorkflow.size());
3040-
3041-
for (auto& spec : physicalWorkflow) {
3042-
for (auto& policy : topologyPolicies) {
3043-
if (policy.matcher(spec)) {
3044-
dependencyCheckers.push_back(policy.checkDependency);
3045-
break;
3046-
}
3047-
}
3048-
}
3049-
assert(dependencyCheckers.size() == physicalWorkflow.size());
3050-
// check if DataProcessorSpec at i depends on j
3051-
auto checkDependencies = [&workflow = physicalWorkflow,
3052-
&dependencyCheckers](int i, int j) {
3053-
TopologyPolicy::DependencyChecker& checker = dependencyCheckers[i];
3054-
return checker(workflow[i], workflow[j]);
3055-
};
3056-
30573025
// Create a list of all the edges, so that we can do a topological sort
30583026
// before we create the graph.
30593027
std::vector<std::pair<int, int>> edges;
30603028

30613029
if (physicalWorkflow.size() > 1) {
3062-
for (size_t i = 0; i < physicalWorkflow.size() - 1; ++i) {
3063-
for (size_t j = i; j < physicalWorkflow.size(); ++j) {
3064-
if (i == j && checkDependencies(i, j)) {
3065-
throw std::runtime_error(physicalWorkflow[i].name + " depends on itself");
3066-
}
3067-
bool both = false;
3068-
if (checkDependencies(i, j)) {
3069-
edges.emplace_back(j, i);
3070-
both = true;
3071-
}
3072-
if (checkDependencies(j, i)) {
3073-
edges.emplace_back(i, j);
3074-
if (both) {
3075-
std::ostringstream str;
3076-
describeDataProcessorSpec(str, physicalWorkflow[i]);
3077-
str << " has circular dependency with ";
3078-
describeDataProcessorSpec(str, physicalWorkflow[j]);
3079-
str << ":\n";
3080-
for (auto x : {i, j}) {
3081-
str << physicalWorkflow[x].name << ":\n";
3082-
str << "inputs:\n";
3083-
for (auto& input : physicalWorkflow[x].inputs) {
3084-
str << "- " << input << " " << (int)input.lifetime << "\n";
3085-
}
3086-
str << "outputs:\n";
3087-
for (auto& output : physicalWorkflow[x].outputs) {
3088-
str << "- " << output << " " << (int)output.lifetime << "\n";
3089-
}
3090-
}
3091-
throw std::runtime_error(str.str());
3092-
}
3093-
}
3094-
}
3095-
}
3030+
edges = TopologyPolicyHelpers::buildEdges(physicalWorkflow);
30963031

30973032
auto topoInfos = WorkflowHelpers::topologicalSort(physicalWorkflow.size(), &edges[0].first, &edges[0].second, sizeof(std::pair<int, int>), edges.size());
30983033
if (topoInfos.size() != physicalWorkflow.size()) {

0 commit comments

Comments
 (0)