diff --git a/include/osp/dag_divider/isomorphism_divider/EftSubgraphScheduler.hpp b/include/osp/dag_divider/isomorphism_divider/EftSubgraphScheduler.hpp index 46fa8241..30b878c1 100644 --- a/include/osp/dag_divider/isomorphism_divider/EftSubgraphScheduler.hpp +++ b/include/osp/dag_divider/isomorphism_divider/EftSubgraphScheduler.hpp @@ -49,12 +49,18 @@ class EftSubgraphScheduler { return execute_schedule(instance); } + void setMinWorkPerProcessor(const v_workw_t min_work_per_processor) { + min_work_per_processor_ = min_work_per_processor; + } + private: static constexpr bool verbose = false; using job_id_t = vertex_idx_t; + v_workw_t min_work_per_processor_ = 2000; + enum class JobStatus { WAITING, READY, @@ -117,11 +123,11 @@ class EftSubgraphScheduler { } else { job.status = JobStatus::WAITING; } - job.multiplicity = multiplicities[idx]; - job.max_num_procs = max_num_procs[idx]; + job.total_work = graph.vertex_work_weight(idx); + job.max_num_procs = std::min(max_num_procs[idx], static_cast((job.total_work + min_work_per_processor_ - 1) / min_work_per_processor_)); + job.multiplicity = std::min(multiplicities[idx], job.max_num_procs); job.required_proc_types = required_proc_types[idx]; job.assigned_workers.resize(num_worker_types, 0); - job.total_work = graph.vertex_work_weight(idx); job.start_time = -1.0; job.finish_time = -1.0; @@ -328,7 +334,7 @@ class EftSubgraphScheduler { std::cout << "Final Makespan: " << current_time << std::endl; std::cout << "Job Summary:" << std::endl; for(const auto& job : jobs_) { - std::cout << " - Job " << job.id << ": Multiplicity=" << job.multiplicity << ", Max Procs=" << job.max_num_procs << ", Start=" << job.start_time << ", Finish=" << job.finish_time << ", Workers=["; + std::cout << " - Job " << job.id << ": Multiplicity=" << job.multiplicity << ", Max Procs=" << job.max_num_procs << ", Work=" << job.total_work << ", Start=" << job.start_time << ", Finish=" << job.finish_time << ", Workers=["; for(size_t i=0; i::max(), it means no processors - // were assigned to any type (all counts were 0). In this case, min_non_zero_procs > 1 will be false. + bool use_trimmed_scheduler = sub_sched.was_trimmed[group_idx] && min_non_zero_procs > 1 && allow_use_trimmed_scheduler; Scheduler* scheduler_for_group_ptr; @@ -474,12 +475,20 @@ class IsomorphicSubgraphScheduler { writer.write_colored_graph(timestamp + "iso_group_rep_" + std::to_string(group_idx) + ".dot", rep_dag, colors); } + + const bool max_bsp = use_max_bsp && (representative_instance.getComputationalDag().num_edges() == 0) && (representative_instance.getComputationalDag().vertex_type(0) == 0); + // Build data structures for applying the pattern --- // Map (superstep, processor) -> relative partition ID std::map, vertex_idx_t> sp_proc_to_relative_partition; vertex_idx_t num_partitions_per_subgraph = 0; for (vertex_idx_t j = 0; j < static_cast>(rep_subgraph_vertices_sorted.size()); ++j) { - const auto sp_pair = std::make_pair(bsp_schedule.assignedSuperstep(j), bsp_schedule.assignedProcessor(j)); + auto sp_pair = std::make_pair(bsp_schedule.assignedSuperstep(j), bsp_schedule.assignedProcessor(j)); + + if (max_bsp) + sp_pair = std::make_pair(j, 0); + + if (sp_proc_to_relative_partition.find(sp_pair) == sp_proc_to_relative_partition.end()) { sp_proc_to_relative_partition[sp_pair] = num_partitions_per_subgraph++; } @@ -516,7 +525,11 @@ class IsomorphicSubgraphScheduler { // Apply the partition pattern for (const auto& current_vertex : current_subgraph_vertices_sorted) { const auto rep_local_idx = current_vertex_to_rep_local_idx.at(current_vertex); - const auto sp_pair = std::make_pair(bsp_schedule.assignedSuperstep(rep_local_idx), bsp_schedule.assignedProcessor(rep_local_idx)); + auto sp_pair = std::make_pair(bsp_schedule.assignedSuperstep(rep_local_idx), bsp_schedule.assignedProcessor(rep_local_idx)); + + if (max_bsp) + sp_pair = std::make_pair(rep_local_idx, 0); + partition[current_vertex] = current_partition_idx + sp_proc_to_relative_partition.at(sp_pair); } current_partition_idx += num_partitions_per_subgraph; diff --git a/include/osp/dag_divider/isomorphism_divider/OrbitGraphProcessor.hpp b/include/osp/dag_divider/isomorphism_divider/OrbitGraphProcessor.hpp index 847913f3..5966383a 100644 --- a/include/osp/dag_divider/isomorphism_divider/OrbitGraphProcessor.hpp +++ b/include/osp/dag_divider/isomorphism_divider/OrbitGraphProcessor.hpp @@ -29,6 +29,8 @@ limitations under the License. #include "osp/graph_algorithms/transitive_reduction.hpp" #include #include +#include +#include namespace osp { @@ -43,6 +45,25 @@ namespace osp { template class OrbitGraphProcessor { public: + + /** + * @brief Heuristics for selecting which symmetry levels to test during coarsening. + */ + enum class SymmetryLevelHeuristic { + /** + * @brief Original logic: Select levels where cumulative work passes an increasing threshold. + */ + CURRENT_DEFAULT, + /** + * @brief Select levels that correspond to fixed work-load percentiles. + */ + PERCENTILE_BASED, + /** + * @brief Select levels based on the orbit size or count distribution. + */ + NATURAL_BREAKS + }; + static_assert(is_computational_dag_v, "Graph must be a computational DAG"); static_assert(is_computational_dag_v, "Constr_Graph_t must be a computational DAG"); static_assert(is_constructable_cdag_v, @@ -60,7 +81,6 @@ class OrbitGraphProcessor { std::vector> subgraphs; inline size_t size() const { return subgraphs.size(); } - // v_workw_t work_weight_per_subgraph = 0; }; private: @@ -79,14 +99,17 @@ class OrbitGraphProcessor { v_workw_t work_threshold_ = 0; v_workw_t critical_path_threshold_ = 0; bool merge_different_node_types_ = true; - double lock_orbit_ratio = 0.2; + double lock_orbit_ratio = 0.5; + + SymmetryLevelHeuristic symmetry_level_heuristic_ = SymmetryLevelHeuristic::NATURAL_BREAKS; + std::vector work_percentiles_ = {0.50, 0.75}; + double natural_breaks_count_percentage_ = 0.2; struct PairHasher { template std::size_t operator()(const std::pair &p) const { auto h1 = std::hash{}(p.first); auto h2 = std::hash{}(p.second); - // A common way to combine two hashes. return h1 ^ (h2 << 1); } }; @@ -94,18 +117,8 @@ class OrbitGraphProcessor { std::unordered_set, PairHasher> non_viable_edges_cache_; std::unordered_set, PairHasher> non_viable_crit_path_edges_cache_; - std::unordered_set locked_orbits; - /** * @brief Simulates the merge of node v into u and returns the resulting temporary graph. - * - * This function does not modify the current state. It creates a temporary contraction map - * and uses it to build a potential new coarse graph for inspection. - * - * @param u The target node for the merge. - * @param v The node to be merged into u. - * @param current_coarse_graph The current coarse graph. - * @return A pair containing the simulated coarse graph and the contraction map used to create it. */ std::pair> simulate_merge(VertexType u, VertexType v, const Constr_Graph_t ¤t_coarse_graph) const { @@ -116,7 +129,6 @@ class OrbitGraphProcessor { temp_contraction_map[i] = new_idx++; } } - // Assign 'v' the same new index as 'u'. temp_contraction_map[v] = temp_contraction_map[u]; Constr_Graph_t temp_coarse_graph; @@ -127,9 +139,6 @@ class OrbitGraphProcessor { /** * @brief Commits a merge operation by updating the graph state. - * - * This function takes the results of a successful merge simulation and applies them, - * updating the coarse graph, groups, and main contraction map. */ void commit_merge(VertexType u, VertexType v, Constr_Graph_t &&next_coarse_graph, const std::vector &group_remap, @@ -138,7 +147,7 @@ class OrbitGraphProcessor { current_coarse_graph = std::move(next_coarse_graph); - // When we commit the merge, the vertex indices change. We must update our cache. + // Update caches for new vertex indices std::unordered_set, PairHasher> next_non_viable_edges; for (const auto &non_viable_edge : non_viable_edges_cache_) { const VertexType old_u = non_viable_edge.first; @@ -148,7 +157,7 @@ class OrbitGraphProcessor { if (old_u != v && old_v != v && new_u != new_v) { next_non_viable_edges.insert({new_u, new_v}); - } + } } non_viable_edges_cache_ = std::move(next_non_viable_edges); @@ -166,14 +175,7 @@ class OrbitGraphProcessor { } non_viable_crit_path_edges_cache_ = std::move(next_non_viable_crit_path_edges); - - std::unordered_set next_locked_orbits; - for (const auto &locked_orbit : locked_orbits) { - next_locked_orbits.insert(group_remap[locked_orbit]); - } - - locked_orbits = std::move(next_locked_orbits); - + // Update groups std::vector next_groups(current_coarse_graph.num_vertices()); for (VertexType i = 0; i < static_cast(current_groups.size()); ++i) { if (i != u && i != v) { @@ -183,11 +185,15 @@ class OrbitGraphProcessor { next_groups[group_remap[u]].subgraphs = std::move(new_subgraphs); current_groups = std::move(next_groups); + // Update main contraction map for (VertexType &node_map : current_contraction_map) { node_map = group_remap[node_map]; } } + /** + * @brief Merges small orbits based on work threshold (final cleanup pass). + */ void merge_small_orbits(const Graph_t &original_dag, Constr_Graph_t& current_coarse_graph, std::vector& current_groups, @@ -204,28 +210,19 @@ class OrbitGraphProcessor { changed = false; for (const auto u : current_coarse_graph.vertices()) { - for (const auto v : current_coarse_graph.children(u)) { - + for (const auto v : current_coarse_graph.children(u)) { if constexpr (has_typed_vertices_v) { if (not merge_different_node_types_) { if (current_coarse_graph.vertex_type(u) != current_coarse_graph.vertex_type(v)) { if constexpr (verbose) { std::cout << " - Merge of " << u << " and " << v << " not viable (different node types)\n"; - } + } continue; } } } - if (locked_orbits.count(u) || locked_orbits.count(v)) { - if constexpr (verbose) { - std::cout << " - Merge of " << u << " and " << v << " locked. Skipping.\n"; - } - continue; - } - - // Check memoization cache first if (non_viable_edges_cache_.count({u, v}) || non_viable_crit_path_edges_cache_.count({u, v})) { if constexpr (verbose) { std::cout << " - Merge of " << u << " and " << v << " already checked. Skipping.\n"; @@ -233,7 +230,6 @@ class OrbitGraphProcessor { continue; } - const v_workw_t u_work_weight = current_coarse_graph.vertex_work_weight(u); const v_workw_t v_work_weight = current_coarse_graph.vertex_work_weight(v); const v_workw_t v_threshold = work_threshold * static_cast>(current_groups[v].size()); @@ -246,7 +242,6 @@ class OrbitGraphProcessor { continue; } - if ((vertexPoset[u] + 1 != vertexPoset[v]) && (vertexBotPoset[u] != 1 + vertexBotPoset[v])) { if constexpr (verbose) { std::cout << " - Merge of " << u << " and " << v @@ -257,18 +252,9 @@ class OrbitGraphProcessor { } std::vector> new_subgraphs; - - const VertexType small_weight_vertex = u_work_weight < v_work_weight ? u : v; - const VertexType large_weight_vertex = u_work_weight < v_work_weight ? v : u; - - // --- Check Constraints --- - // Symmetry Threshold - bool error = false; - const bool merge_viable = is_merge_viable(original_dag, current_groups[u], current_groups[v], new_subgraphs, error); - const bool both_below_symmetry_threshold = (current_groups[u].size() < current_symmetry) && (current_groups[v].size() < current_symmetry); - const bool merge_small_weight_orbit = (current_groups[small_weight_vertex].size() >= current_symmetry) && (current_groups[large_weight_vertex].size() < current_symmetry); - - if (error) { + const bool merge_is_valid = is_merge_viable(original_dag, current_groups[u], current_groups[v], new_subgraphs); + + if (!merge_is_valid) { if constexpr (verbose) { std::cout << " - Merge of " << u << " and " << v << " and " << v << " not viable (error in is_merge_viable)\n"; @@ -277,38 +263,27 @@ class OrbitGraphProcessor { continue; } - if (!merge_viable && !both_below_symmetry_threshold && !merge_small_weight_orbit) { - if constexpr (verbose) { - std::cout << " - Merge of " << u << " and " << v << " not viable (symmetry threshold)\n"; - } - non_viable_edges_cache_.insert({u, v}); - continue; - } - - // Simulate the merge to get the potential new graph. auto [temp_coarse_graph, temp_contraction_map] = simulate_merge(u, v, current_coarse_graph); if (critical_path_weight(temp_coarse_graph) > (path_threshold * static_cast>(new_subgraphs.size()) + critical_path_weight(current_coarse_graph))) { - //if (critical_path_weight(temp_coarse_graph) > critical_path_weight(current_coarse_graph)) { if constexpr (verbose) { std::cout << " - Merge of " << u << " and " << v << " increases critical path. Old cirtical path: " << critical_path_weight(current_coarse_graph) - << " new critical path: " << critical_path_weight(temp_coarse_graph) << " + " << path_threshold * static_cast>(new_subgraphs.size()) << "\n"; + << " new critical path: " << critical_path_weight(temp_coarse_graph) << " + " << path_threshold * static_cast>(new_subgraphs.size()) << "\n"; } non_viable_crit_path_edges_cache_.insert({u, v}); continue; } - // If all checks pass, commit the merge. if constexpr (verbose) { std::cout << " - Merging " << v << " into " << u << ". New coarse graph has " - << temp_coarse_graph.num_vertices() << " nodes.\n"; + << temp_coarse_graph.num_vertices() << " nodes.\n"; } commit_merge(u, v, std::move(temp_coarse_graph), temp_contraction_map, std::move(new_subgraphs), current_coarse_graph, current_groups, current_contraction_map); changed = true; - break; // Restart scan on the new, smaller graph + break; } if (changed) { break; @@ -317,6 +292,9 @@ class OrbitGraphProcessor { } } + /** + * @brief Deprecated non-adaptive merge function. + */ void contract_edges(const Graph_t &original_dag, Constr_Graph_t& current_coarse_graph, std::vector& current_groups, std::vector& current_contraction_map, const bool merge_symmetry_narrowing, const bool merge_different_node_types, const v_workw_t path_threshold = 0) { bool changed = true; @@ -331,114 +309,69 @@ class OrbitGraphProcessor { VertexType u = source(edge, current_coarse_graph); VertexType v = target(edge, current_coarse_graph); - // Check memoization cache first if (non_viable_edges_cache_.count({u, v}) || non_viable_crit_path_edges_cache_.count({u, v})) { - if constexpr (verbose) { - std::cout << " - Merge of " << u << " and " << v << " already checked. Skipping.\n"; - } continue; } - if constexpr (has_typed_vertices_v) { if (not merge_different_node_types) { if (current_coarse_graph.vertex_type(u) != current_coarse_graph.vertex_type(v)) { - if constexpr (verbose) { - std::cout << " - Merge of " << u << " and " << v << " not viable (different node types)\n"; - } continue; } } } - if ((vertexPoset[u] + 1 != vertexPoset[v]) && (vertexBotPoset[u] != 1 + vertexBotPoset[v])) { - if constexpr (verbose) { - std::cout << " - Merge of " << u << " and " << v - << " not viable poset. poste v: " << vertexBotPoset[v] - << " poste u: " << vertexBotPoset[u] << "\n"; - } continue; } std::vector> new_subgraphs; - - // --- Check Constraints --- - // Symmetry Threshold - const std::size_t u_size = current_groups[u].size(); const std::size_t v_size = current_groups[v].size(); - bool error = false; - const bool merge_viable = is_merge_viable(original_dag, current_groups[u], current_groups[v], new_subgraphs, error); - const bool both_below_symmetry_threshold = - (u_size < current_symmetry) && - (v_size < current_symmetry);// && - // (not ((u_size == 1 && v_size > 1) || (u_size > 1 && v_size == 1))); - - if (error) { - if constexpr (verbose) { - std::cout << " - Merge of " << u << " and " << v << " and " << v - << " not viable (error in is_merge_viable)\n"; - } + const bool merge_is_valid = is_merge_viable(original_dag, current_groups[u], current_groups[v], new_subgraphs); + const std::size_t new_size = new_subgraphs.size(); + + const bool merge_viable = (new_size >= current_symmetry); + const bool both_below_symmetry_threshold = (u_size < current_symmetry) && (v_size < current_symmetry); + + if (!merge_is_valid) { non_viable_edges_cache_.insert({u, v}); continue; } - if (!merge_viable && !both_below_symmetry_threshold) { - if constexpr (verbose) { - std::cout << " - Merge of " << u << " and " << v << " not viable (symmetry threshold)\n"; - } non_viable_edges_cache_.insert({u, v}); continue; } - if (not merge_symmetry_narrowing) { - const std::size_t min_size = std::min(u_size, v_size); - const std::size_t new_size = new_subgraphs.size(); - - if (new_size < min_size) { - if constexpr (verbose) { std::cout << " - Merge of " << u << " and " << v - << " not viable (symmetry narrowing: " << u_size << "x" << v_size << " -> " - << new_size << " subgraphs)\n"; - } + if (new_size < std::min(u_size, v_size)) { continue; } } - // Simulate the merge to get the potential new graph. auto [temp_coarse_graph, temp_contraction_map] = simulate_merge(u, v, current_coarse_graph); if (critical_path_weight(temp_coarse_graph) > (path_threshold * static_cast>(new_subgraphs.size()) + critical_path_weight(current_coarse_graph))) { - //if (critical_path_weight(temp_coarse_graph) > critical_path_weight(current_coarse_graph)) { - if constexpr (verbose) { - std::cout << " - Merge of " << u << " and " << v << " increases critical path. Old cirtical path: " << critical_path_weight(current_coarse_graph) - << " new critical path: " << critical_path_weight(temp_coarse_graph) << " + " << path_threshold * static_cast>(new_subgraphs.size()) << "\n"; - } non_viable_crit_path_edges_cache_.insert({u, v}); continue; } - // If all checks pass, commit the merge. - if constexpr (verbose) { - std::cout << " - Merging " << v << " into " << u << ". New coarse graph has " - << temp_coarse_graph.num_vertices() << " nodes.\n"; - } - commit_merge(u, v, std::move(temp_coarse_graph), temp_contraction_map, std::move(new_subgraphs), current_coarse_graph, current_groups, current_contraction_map); - changed = true; - break; // Restart scan on the new, smaller graph + break; } } } + /** + * @brief Core adaptive merging function. + */ void contract_edges_adpative_sym(const Graph_t &original_dag, Constr_Graph_t& current_coarse_graph, std::vector& current_groups, std::vector& current_contraction_map, - /* const bool merge_symmetry_narrowing, */ const bool merge_different_node_types, - const bool check_below_threshold, + const bool merge_below_threshold, + const std::vector>& lock_threshold_per_type, const v_workw_t path_threshold = 0) { bool changed = true; @@ -453,14 +386,6 @@ class OrbitGraphProcessor { VertexType u = source(edge, current_coarse_graph); VertexType v = target(edge, current_coarse_graph); - if (locked_orbits.count(u) || locked_orbits.count(v)) { - if constexpr (verbose) { - std::cout << " - Merge of " << u << " and " << v << " locked. Skipping.\n"; - } - continue; - } - - // Check memoization cache first if (non_viable_edges_cache_.count({u, v}) || non_viable_crit_path_edges_cache_.count({u, v})) { if constexpr (verbose) { std::cout << " - Merge of " << u << " and " << v << " already checked. Skipping.\n"; @@ -473,7 +398,7 @@ class OrbitGraphProcessor { if (current_coarse_graph.vertex_type(u) != current_coarse_graph.vertex_type(v)) { if constexpr (verbose) { std::cout << " - Merge of " << u << " and " << v << " not viable (different node types)\n"; - } + } continue; } } @@ -489,20 +414,13 @@ class OrbitGraphProcessor { } std::vector> new_subgraphs; - - // --- Check Constraints --- - // Symmetry Threshold - const std::size_t u_size = current_groups[u].size(); const std::size_t v_size = current_groups[v].size(); - bool error = false; - const bool merge_viable = is_merge_viable(original_dag, current_groups[u], current_groups[v], new_subgraphs, error); - const bool both_below_symmetry_threshold = check_below_threshold && - (u_size < current_symmetry) && - (v_size < current_symmetry);// && - // (not ((u_size == 1 && v_size > 1) || (u_size > 1 && v_size == 1))); - - if (error) { + + const bool merge_is_valid = is_merge_viable(original_dag, current_groups[u], current_groups[v], new_subgraphs); + const std::size_t new_size = new_subgraphs.size(); + + if (!merge_is_valid) { if constexpr (verbose) { std::cout << " - Merge of " << u << " and " << v << " and " << v << " not viable (error in is_merge_viable)\n"; @@ -511,22 +429,65 @@ class OrbitGraphProcessor { continue; } - if (!merge_viable && !both_below_symmetry_threshold) { + const bool merge_viable = (new_size >= current_symmetry); + const bool both_below_minimal_threshold = merge_below_threshold && (u_size < min_symmetry_) && (v_size < min_symmetry_); + + if (!merge_viable && !both_below_minimal_threshold) { if constexpr (verbose) { - std::cout << " - Merge of " << u << " and " << v << " not viable (symmetry threshold)\n"; + std::cout << " - Merge of " << u << " and " << v << " not viable (Symmetry Threshold)\n"; + std::cout << " - u_sym: " << u_size << ", v_sym: " << v_size << " -> new_sym: " << new_size + << " (current_threshold: " << current_symmetry + << ", global_min_threshold: " << min_symmetry_ << ")\n"; } non_viable_edges_cache_.insert({u, v}); continue; } + v_type_t u_type = 0; + v_type_t v_type = 0; + if (not merge_different_node_types && has_typed_vertices_v ) { + u_type = current_coarse_graph.vertex_type(u); + v_type = current_coarse_graph.vertex_type(v); + } - + const bool u_is_significant = (u_size >= min_symmetry_) && + (current_coarse_graph.vertex_work_weight(u) > lock_threshold_per_type[u_type]); + const bool v_is_significant = (v_size >= min_symmetry_) && + (current_coarse_graph.vertex_work_weight(v) > lock_threshold_per_type[v_type]); - // Simulate the merge to get the potential new graph. + if (u_is_significant && v_is_significant) + { + // Both are significant --- + if (new_size < std::min(u_size, v_size)) { + if constexpr (verbose) { + std::cout << " - Merge of " << u << " and " << v << " not viable (Symmetry Narrowing below min of two significant nodes)\n"; + std::cout << " - u_sym: " << u_size << ", v_sym: " << v_size << " -> new_sym: " << new_size << "\n"; + } + non_viable_edges_cache_.insert({u, v}); + continue; + } + } + else if (u_is_significant || v_is_significant) + { + // Exactly one is significant --- + const std::size_t significant_node_size = u_is_significant ? u_size : v_size; + + if (new_size < significant_node_size) { + if constexpr (verbose) { + std::cout << " - Merge of " << u << " and " << v << " not viable (Symmetry Narrowing of a single significant node)\n"; + std::cout << " - u_sym: " << u_size << " (sig: " << u_is_significant << ")" + << ", v_sym: " << v_size << " (sig: " << v_is_significant << ")" + << " -> new_sym: " << new_size << "\n"; + } + non_viable_edges_cache_.insert({u, v}); + continue; + } + } + + // Critical Path Check auto [temp_coarse_graph, temp_contraction_map] = simulate_merge(u, v, current_coarse_graph); if (critical_path_weight(temp_coarse_graph) > (path_threshold * static_cast>(new_subgraphs.size()) + critical_path_weight(current_coarse_graph))) { - //if (critical_path_weight(temp_coarse_graph) > critical_path_weight(current_coarse_graph)) { if constexpr (verbose) { std::cout << " - Merge of " << u << " and " << v << " increases critical path. Old cirtical path: " << critical_path_weight(current_coarse_graph) << " new critical path: " << critical_path_weight(temp_coarse_graph) << " + " << path_threshold * static_cast>(new_subgraphs.size()) << "\n"; @@ -535,7 +496,7 @@ class OrbitGraphProcessor { continue; } - // If all checks pass, commit the merge. + // Commit Merge if constexpr (verbose) { std::cout << " - Merging " << v << " into " << u << ". New coarse graph has " << temp_coarse_graph.num_vertices() << " nodes.\n"; @@ -545,29 +506,33 @@ class OrbitGraphProcessor { current_coarse_graph, current_groups, current_contraction_map); changed = true; - break; // Restart scan on the new, smaller graph + break; } } } public: + explicit OrbitGraphProcessor(size_t symmetry_threshold = 2) : symmetry_threshold_(symmetry_threshold) {} - /** - * @brief Sets the minimum number of isomorphic subgraphs a merged group must have. - * @param threshold The symmetry threshold. - */ void set_symmetry_threshold(size_t threshold) { symmetry_threshold_ = threshold; } void setMergeDifferentNodeTypes(bool flag) { merge_different_node_types_ = flag; } void set_work_threshold(v_workw_t work_threshold) { work_threshold_ = work_threshold; } void setCriticalPathThreshold(v_workw_t critical_path_threshold) { critical_path_threshold_ = critical_path_threshold; } void setLockRatio(double lock_ratio) { lock_orbit_ratio = lock_ratio; } void setMinSymmetry(size_t min_symmetry) { min_symmetry_ = min_symmetry; } + void setSymmetryLevelHeuristic(SymmetryLevelHeuristic heuristic) { symmetry_level_heuristic_ = heuristic; } + void setWorkPercentiles(const std::vector& percentiles) { + work_percentiles_ = percentiles; + std::sort(work_percentiles_.begin(), work_percentiles_.end()); + } + + void setNaturalBreaksCountPercentage(double percentage) { natural_breaks_count_percentage_ = percentage; } + /** * @brief Discovers isomorphic groups (orbits) and constructs a coarse graph. - * @param dag The input computational DAG. */ void discover_isomorphic_groups(const Graph_t &dag, const HashComputer &hasher) { coarse_graph_ = Constr_Graph_t(); @@ -577,7 +542,6 @@ class OrbitGraphProcessor { final_groups_.clear(); non_viable_edges_cache_.clear(); non_viable_crit_path_edges_cache_.clear(); - current_symmetry = symmetry_threshold_; if (dag.num_vertices() == 0) { return; @@ -594,14 +558,206 @@ class OrbitGraphProcessor { } coarse_node_idx++; } + + std::vector> work_per_vertex_type; + work_per_vertex_type.resize(merge_different_node_types_ ? 1U : dag.num_vertex_types(), 0); + + std::map orbit_size_counts; + std::map> work_per_orbit_size; + v_workw_t total_work = 0; + for (const auto &[hash, vertices] : orbits) { + const size_t orbit_size = vertices.size(); + orbit_size_counts[orbit_size]++; + + v_workw_t orbit_work = 0; + for (const auto v : vertices) { + orbit_work += dag.vertex_work_weight(v); + } + + if (not merge_different_node_types_ && has_typed_vertices_v) { + work_per_vertex_type[dag.vertex_type(vertices[0])] += orbit_work; + } else { + work_per_vertex_type[0] += orbit_work; + } + + work_per_orbit_size[orbit_size] += orbit_work; + total_work += orbit_work; + } + + std::vector> lock_threshold_per_type(work_per_vertex_type.size()); + for (size_t i = 0; i < work_per_vertex_type.size(); ++i) { + lock_threshold_per_type[i] = static_cast>(lock_orbit_ratio * work_per_vertex_type[i]); + } + + std::vector rel_acc_work_per_orbit_size; + std::vector symmetry_levels_to_test = compute_symmetry_levels(rel_acc_work_per_orbit_size, work_per_orbit_size, total_work, orbit_size_counts); + + if constexpr (verbose) { + std::cout << "\n--- Orbit Analysis ---\n"; + for (auto const& [size, count] : orbit_size_counts) { + if (total_work > 0) + std::cout << " - Orbits of size " << size << ": " << count << " groups, weight: " << 100.0 * static_cast(work_per_orbit_size[size]) / static_cast(total_work) << "%\n"; + else + std::cout << " - Orbits of size " << size << ": " << count << " groups, weight: 0.0%\n"; + } + std::cout << " Cumulative work distribution by orbit size (largest to smallest):\n"; + size_t i = 0; + for (auto it = orbit_size_counts.rbegin(); it != orbit_size_counts.rend() && i < rel_acc_work_per_orbit_size.size(); ++it, ++i) { + std::cout << " - Orbits with size >= " << it->first << ": " + << std::fixed << std::setprecision(2) << rel_acc_work_per_orbit_size[i] * 100 << "%\n"; + } + std::cout << " Work distribution by vertex type:\n"; + for (size_t j = 0; j < work_per_vertex_type.size(); ++j) { + if (total_work > 0) + std::cout << " - Vertex type " << j << ": " << 100.0 * static_cast(work_per_vertex_type[j]) / static_cast(total_work) << "%\n"; + else + std::cout << " - Vertex type " << j << ": 0.0%\n"; + } + + std::cout << "--------------------------------\n"; + std::cout << " Symmetry levels to test: " << "\n"; + for (const auto level : symmetry_levels_to_test) { + std::cout << " - " << level << "\n"; + } + std::cout << "--------------------------------\n"; + } coarser_util::construct_coarse_dag(dag, coarse_graph_, contraction_map_); - perform_coarsening_adaptive_symmetry(dag, coarse_graph_); + perform_coarsening_adaptive_symmetry(dag, coarse_graph_, lock_threshold_per_type, symmetry_levels_to_test); } private: + + std::vector compute_symmetry_levels(std::vector & rel_acc_work_per_orbit_size, const std::map> work_per_orbit_size, const v_workw_t total_work, const std::map orbit_size_counts) { + + std::vector symmetry_levels_to_test; + min_symmetry_ = 2; + + switch (symmetry_level_heuristic_) { + case SymmetryLevelHeuristic::PERCENTILE_BASED: + { + if constexpr (verbose) { std::cout << "Using PERCENTILE_BASED heuristic for symmetry levels.\n"; } + size_t percentile_idx = 0; + v_workw_t cumulative_work = 0; + for (auto it = work_per_orbit_size.rbegin(); + it != work_per_orbit_size.rend(); + ++it) + { + cumulative_work += it->second; + if (total_work == 0) continue; // Avoid division by zero + double current_work_ratio = static_cast(cumulative_work) / static_cast(total_work); + rel_acc_work_per_orbit_size.push_back(current_work_ratio); // For printing + + if (percentile_idx < work_percentiles_.size() && current_work_ratio >= work_percentiles_[percentile_idx]) { + if (it->first > min_symmetry_) { + symmetry_levels_to_test.push_back(it->first); + } + while (percentile_idx < work_percentiles_.size() && + current_work_ratio >= work_percentiles_[percentile_idx]) { + percentile_idx++; + } + } + } + break; + } + + case SymmetryLevelHeuristic::NATURAL_BREAKS: + { + if constexpr (verbose) { std::cout << "Using NATURAL_BREAKS heuristic for symmetry levels.\n"; } + + size_t total_orbit_groups = 0; + for (const auto& [size, count] : orbit_size_counts) { + total_orbit_groups += count; + } + size_t count_threshold = static_cast(static_cast(total_orbit_groups) * natural_breaks_count_percentage_); + if (count_threshold == 0 && total_orbit_groups > 0) { + count_threshold = 1; // Ensure threshold is at least 1 if possible + } + if constexpr (verbose) { std::cout << " - Total orbit groups: " << total_orbit_groups << ", count threshold: " << count_threshold << "\n"; } + + std::vector sorted_sizes; + sorted_sizes.reserve(orbit_size_counts.size()); + for (const auto& [size, count] : orbit_size_counts) { + sorted_sizes.push_back(size); + } + std::sort(sorted_sizes.rbegin(), sorted_sizes.rend()); // Sort descending + + if (!sorted_sizes.empty()) { + for (size_t i = 0; i < sorted_sizes.size(); ++i) { + const size_t current_size = sorted_sizes[i]; + if (current_size < min_symmetry_) continue; + + // Add if this size's count is significant + const size_t current_count = orbit_size_counts.at(current_size); + bool count_significant = (current_count >= count_threshold); + + if (count_significant) { + symmetry_levels_to_test.push_back(current_size); + continue; + } + } + } + + if (symmetry_levels_to_test.empty()) { + size_t max_count = 0; + size_t size_with_max_count = 0; + for (const auto& [size, count] : orbit_size_counts) { + if (count > max_count) { + max_count = count; + size_with_max_count = size; + } + } + if (size_with_max_count > 0) { + symmetry_levels_to_test.push_back(size_with_max_count); + } + } + + // Verbose print data + v_workw_t cumulative_work = 0; + for (auto it = work_per_orbit_size.rbegin(); it != work_per_orbit_size.rend(); ++it) { + cumulative_work += it->second; + if (total_work > 0) + rel_acc_work_per_orbit_size.push_back(static_cast(cumulative_work) / static_cast(total_work)); + } + break; + } + + case SymmetryLevelHeuristic::CURRENT_DEFAULT: + default: + { + if constexpr (verbose) { std::cout << "Using CURRENT_DEFAULT heuristic for symmetry levels.\n"; } + double threshold = lock_orbit_ratio; + v_workw_t cumulative_work = 0; + for (auto it = work_per_orbit_size.rbegin(); it != work_per_orbit_size.rend(); ++it) { + cumulative_work += it->second; + const double rel_work = (total_work == 0) ? 0 : static_cast(cumulative_work) / static_cast(total_work); + rel_acc_work_per_orbit_size.push_back(rel_work); // For printing + + if (rel_work >= threshold && it->first > min_symmetry_) { + symmetry_levels_to_test.push_back(it->first); + threshold += lock_orbit_ratio * 0.5; + } + } + break; + } + } + + if (symmetry_levels_to_test.empty()) + symmetry_levels_to_test.push_back(2); + + min_symmetry_ = symmetry_levels_to_test.back(); + + // De-duplicate and sort descending + std::sort(symmetry_levels_to_test.rbegin(), symmetry_levels_to_test.rend()); + auto last = std::unique(symmetry_levels_to_test.begin(), symmetry_levels_to_test.end()); + symmetry_levels_to_test.erase(last, symmetry_levels_to_test.end()); + + return symmetry_levels_to_test; + } + + /** - * @brief Greedily merges nodes in the orbit graph based on structural and symmetry constraints. + * @brief Non-adaptive coarsening (deprecated). */ void perform_coarsening(const Graph_t &original_dag, const Constr_Graph_t &initial_coarse_graph) { final_coarse_graph_ = Constr_Graph_t(); @@ -648,7 +804,6 @@ class OrbitGraphProcessor { contract_edges(original_dag, current_coarse_graph, current_groups, current_contraction_map, true, merge_different_node_types_, work_threshold_); - // --- Finalize --- final_coarse_graph_ = std::move(current_coarse_graph); final_contraction_map_ = std::move(current_contraction_map); final_groups_ = std::move(current_groups); @@ -658,7 +813,7 @@ class OrbitGraphProcessor { } } - void perform_coarsening_adaptive_symmetry(const Graph_t &original_dag, const Constr_Graph_t &initial_coarse_graph) { + void perform_coarsening_adaptive_symmetry(const Graph_t &original_dag, const Constr_Graph_t &initial_coarse_graph, const std::vector>& lock_threshold_per_type, const std::vector& symmetry_levels_to_test) { final_coarse_graph_ = Constr_Graph_t(); final_contraction_map_.clear(); @@ -670,54 +825,40 @@ class OrbitGraphProcessor { std::vector current_groups(initial_coarse_graph.num_vertices()); std::vector current_contraction_map = contraction_map_; - // Initialize groups: each group corresponds to an orbit. for (VertexType i = 0; i < original_dag.num_vertices(); ++i) { const VertexType coarse_node = contraction_map_[i]; current_groups[coarse_node].subgraphs.push_back({i}); } - - v_workw_t total_work_weight = sumOfVerticesWorkWeights(initial_coarse_graph); - v_workw_t lock_threshold = static_cast>(lock_orbit_ratio * total_work_weight); - + if constexpr (verbose) { - std::cout << " Starting adaptive symmetry coarsening with lock threshold: " << lock_threshold << ", critical_path_threshold: " << critical_path_threshold_ << "\n"; + std::cout << " Starting adaptive symmetry coarsening with critical_path_threshold: " << critical_path_threshold_ << "\n"; } - while (current_symmetry >= min_symmetry_) { - + for (const auto sym : symmetry_levels_to_test) { + current_symmetry = sym; + const bool is_last_loop = (sym == symmetry_levels_to_test.back()); if constexpr (verbose) { std::cout << " Current symmetry threshold: " << current_symmetry << "\n"; } non_viable_edges_cache_.clear(); - const bool is_last_loop = (current_symmetry / 2) < min_symmetry_; - contract_edges_adpative_sym(original_dag, current_coarse_graph, current_groups, current_contraction_map, false, is_last_loop); + contract_edges_adpative_sym(original_dag, current_coarse_graph, current_groups, current_contraction_map, false, is_last_loop, lock_threshold_per_type); if (merge_different_node_types_) - contract_edges_adpative_sym(original_dag, current_coarse_graph, current_groups, current_contraction_map, merge_different_node_types_, is_last_loop); + contract_edges_adpative_sym(original_dag, current_coarse_graph, current_groups, current_contraction_map, merge_different_node_types_, is_last_loop, lock_threshold_per_type); non_viable_crit_path_edges_cache_.clear(); - contract_edges_adpative_sym(original_dag, current_coarse_graph, current_groups, current_contraction_map, merge_different_node_types_, is_last_loop, critical_path_threshold_); + contract_edges_adpative_sym(original_dag, current_coarse_graph, current_groups, current_contraction_map, merge_different_node_types_, is_last_loop, lock_threshold_per_type, critical_path_threshold_); - for (const auto& v : current_coarse_graph.vertices()) { - if (current_coarse_graph.vertex_work_weight(v) > lock_threshold) { - if constexpr (verbose) { - std::cout << " Locking orbit " << v << "\n"; - } - locked_orbits.insert(v); - } - } - current_symmetry = current_symmetry / 2; } - + if constexpr (verbose) { std::cout << " Merging small orbits with work threshold: " << work_threshold_ << "\n"; } non_viable_edges_cache_.clear(); merge_small_orbits(original_dag, current_coarse_graph, current_groups, current_contraction_map, work_threshold_); - // --- Finalize --- final_coarse_graph_ = std::move(current_coarse_graph); final_contraction_map_ = std::move(current_contraction_map); final_groups_ = std::move(current_groups); @@ -741,15 +882,14 @@ class OrbitGraphProcessor { } /** - * @brief Checks if merging two groups is viable based on the resulting number of isomorphic subgraphs. - * This is analogous to WavefrontOrbitProcessor::is_viable_continuation. - * If viable, it populates the `out_new_subgraphs` with the structure of the merged group. + * @brief Checks if merging two groups is structurally viable. */ bool is_merge_viable(const Graph_t &original_dag, const Group &group_u, const Group &group_v, - std::vector> &out_new_subgraphs, bool &error) const { + std::vector> &out_new_subgraphs) const { std::vector all_nodes; - all_nodes.reserve(group_u.subgraphs.size() + group_v.subgraphs.size()); + all_nodes.reserve(group_u.subgraphs.size() * (group_u.subgraphs.empty() ? 0 : group_u.subgraphs[0].size()) + + group_v.subgraphs.size() * (group_v.subgraphs.empty() ? 0 : group_v.subgraphs[0].size())); for (const auto &sg : group_u.subgraphs) { all_nodes.insert(all_nodes.end(), sg.begin(), sg.end()); } @@ -771,6 +911,11 @@ class OrbitGraphProcessor { std::vector components; // local -> component_id size_t num_components = compute_weakly_connected_components(induced_subgraph, components); out_new_subgraphs.assign(num_components, std::vector()); + + if (all_nodes.empty()) { // Handle empty graph case + return true; + } + for (const auto &node : all_nodes) { out_new_subgraphs[components[map[node]]].push_back(node); } @@ -782,20 +927,17 @@ class OrbitGraphProcessor { for (size_t i = 1; i < num_components; ++i) { if (out_new_subgraphs[i].size() != first_sg_size) { - error = true; return false; } Constr_Graph_t current_sg; create_induced_subgraph(original_dag, current_sg, out_new_subgraphs[i]); if (!are_isomorphic_by_merkle_hash(rep_sg, current_sg)) { - error = true; return false; } } } - - return num_components >= current_symmetry; + return true; } public: diff --git a/tests/debug_merkle_divider.cpp b/tests/debug_merkle_divider.cpp index 32fa6b97..bf3bd1b5 100644 --- a/tests/debug_merkle_divider.cpp +++ b/tests/debug_merkle_divider.cpp @@ -78,9 +78,9 @@ int main(int argc, char* argv[]) { // Set up architecture - instance.getArchitecture().setProcessorsWithTypes({0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 1, 1, 1, 1, 1, 1, 1 , 1, 1, 1, 1, 1, 1, 1, 1 , 1, 1, 1, 1, 1, 1, 1, 1 , 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1}); + instance.getArchitecture().set_processors_consequ_types({24,48},{100,100}); instance.setDiagonalCompatibilityMatrix(2); - instance.setSynchronisationCosts(1000); + instance.setSynchronisationCosts(2000); instance.setCommunicationCosts(1); @@ -90,26 +90,24 @@ int main(int argc, char* argv[]) { BspLocking locking; GreedyChildren children; kl_total_lambda_comm_improver kl(42); - kl.setSuperstepRemoveStrengthParameter(2.0); - kl.setTimeQualityParameter(5.0); + kl.setSuperstepRemoveStrengthParameter(1.0); + kl.setTimeQualityParameter(1.0); ComboScheduler growlocal_kl(growlocal, kl); ComboScheduler locking_kl(locking, kl); ComboScheduler children_kl(children, kl); GreedyMetaScheduler scheduler; - // scheduler.addScheduler(growlocal_kl); + //scheduler.addScheduler(growlocal_kl); scheduler.addScheduler(locking_kl); scheduler.addScheduler(children_kl); scheduler.addSerialScheduler(); IsomorphicSubgraphScheduler iso_scheduler(scheduler); - iso_scheduler.set_symmetry(8); - iso_scheduler.setMergeDifferentTypes(true); - iso_scheduler.setWorkThreshold(600); - iso_scheduler.setCriticalPathThreshold(1200); - iso_scheduler.setOrbitLockRatio(0.2); - iso_scheduler.setAllowTrimmedScheduler(true); - //iso_scheduler.enable_use_max_group_size(16); + iso_scheduler.setMergeDifferentTypes(false); + iso_scheduler.setWorkThreshold(100); + iso_scheduler.setCriticalPathThreshold(500); + iso_scheduler.setOrbitLockRatio(0.5); + iso_scheduler.setAllowTrimmedScheduler(false); iso_scheduler.set_plot_dot_graphs(true); // Enable plotting for debug std::cout << "Starting partition computation..." << std::endl; diff --git a/tests/eft_subgraph_scheduler.cpp b/tests/eft_subgraph_scheduler.cpp index 08e1a37c..e8dec670 100644 --- a/tests/eft_subgraph_scheduler.cpp +++ b/tests/eft_subgraph_scheduler.cpp @@ -58,6 +58,7 @@ BOOST_AUTO_TEST_CASE(EftSubgraphScheduler_SimpleChain) // 3. Run Scheduler EftSubgraphScheduler scheduler; + scheduler.setMinWorkPerProcessor(1); SubgraphSchedule schedule = scheduler.run(instance, multiplicities, required_proc_types, max_procs); // 4. Assertions @@ -109,6 +110,7 @@ BOOST_AUTO_TEST_CASE(EftSubgraphScheduler_ForkJoin) // 3. Run Scheduler EftSubgraphScheduler scheduler; + scheduler.setMinWorkPerProcessor(1); SubgraphSchedule schedule = scheduler.run(instance, multiplicities, required_proc_types, max_procs); // 4. Assertions @@ -161,6 +163,7 @@ BOOST_AUTO_TEST_CASE(EftSubgraphScheduler_Deadlock) // 3. Run Scheduler EftSubgraphScheduler scheduler; + scheduler.setMinWorkPerProcessor(1); SubgraphSchedule schedule = scheduler.run(instance, multiplicities, required_proc_types, max_procs); // 4. Assertions @@ -207,6 +210,7 @@ BOOST_AUTO_TEST_CASE(EftSubgraphScheduler_ComplexDAG) // 3. Run Scheduler EftSubgraphScheduler scheduler; + scheduler.setMinWorkPerProcessor(1); SubgraphSchedule schedule = scheduler.run(instance, multiplicities, required_proc_types, max_procs); BOOST_CHECK_CLOSE(schedule.makespan, 105.0, 1e-9); @@ -258,6 +262,7 @@ BOOST_AUTO_TEST_CASE(EftSubgraphScheduler_ResourceContention) // 3. Run Scheduler EftSubgraphScheduler scheduler; + scheduler.setMinWorkPerProcessor(1); SubgraphSchedule schedule = scheduler.run(instance, multiplicities, required_proc_types, max_procs); // 4. Assertions @@ -310,6 +315,7 @@ BOOST_AUTO_TEST_CASE(EftSubgraphScheduler_ProportionalAllocation) // 3. Run Scheduler EftSubgraphScheduler scheduler; + scheduler.setMinWorkPerProcessor(1); SubgraphSchedule schedule = scheduler.run(instance, multiplicities, required_proc_types, max_procs); // 4. Assertions diff --git a/tests/orbit_graph_processor.cpp b/tests/orbit_graph_processor.cpp index 16ba0b7d..240d76f2 100644 --- a/tests/orbit_graph_processor.cpp +++ b/tests/orbit_graph_processor.cpp @@ -284,27 +284,22 @@ BOOST_AUTO_TEST_CASE(OrbitGraphProcessor_BinaryTreeNoMerge) { const auto& final_coarse_graph = processor.get_final_coarse_graph(); - // The chain of 5 coarse nodes will be merged down to 2 nodes. - BOOST_CHECK_EQUAL(final_coarse_graph.num_vertices(), 2); + BOOST_CHECK_EQUAL(final_coarse_graph.num_vertices(), 3); check_partitioning(dag, processor); } BOOST_AUTO_TEST_CASE(OrbitGraphProcessor_ButterflyMerge) { - // A butterfly graph with 3 stages (8 inputs). - // All nodes in a stage are in the same orbit. Coarse graph is a chain: 0->1->2->3 (4 nodes). - // With the new logic, since all groups are below the threshold (or the merge is viable), - // the entire chain of coarse nodes will be merged into a single node. const auto dag = construct_butterfly_dag(3); BOOST_REQUIRE_EQUAL(dag.num_vertices(), (3 + 1) * 8); - OrbitGraphProcessor processor(16); // Threshold is larger than any group size + OrbitGraphProcessor processor(16); processor.setMinSymmetry(16); MerkleHashComputer, true> hasher(dag, dag); processor.discover_isomorphic_groups(dag, hasher); const auto& final_coarse_graph = processor.get_final_coarse_graph(); - BOOST_CHECK_EQUAL(final_coarse_graph.num_vertices(), 1); + BOOST_CHECK_EQUAL(final_coarse_graph.num_vertices(), 4); check_partitioning(dag, processor); } \ No newline at end of file