From 093cf4d337623465edc6e1c4bc3dc3277f0d049d Mon Sep 17 00:00:00 2001 From: papp-pal-andras Date: Wed, 29 Oct 2025 14:12:54 +0100 Subject: [PATCH 1/4] ILP for comm scheduling --- include/osp/bsp/model/BspSchedule.hpp | 22 ++ include/osp/bsp/model/BspScheduleCS.hpp | 22 ++ .../CoptCommScheduleOptimizer.hpp | 355 ++++++++++++++++++ tests/ilp_bsp_scheduler.cpp | 46 ++- 4 files changed, 444 insertions(+), 1 deletion(-) create mode 100644 include/osp/bsp/scheduler/IlpSchedulers/CoptCommScheduleOptimizer.hpp diff --git a/include/osp/bsp/model/BspSchedule.hpp b/include/osp/bsp/model/BspSchedule.hpp index 299e1716..2a2f8cc1 100644 --- a/include/osp/bsp/model/BspSchedule.hpp +++ b/include/osp/bsp/model/BspSchedule.hpp @@ -834,6 +834,28 @@ class BspSchedule : public IBspSchedule, public IBspScheduleEval comm_phase_empty(number_of_supersteps, true); + for (const auto& node : instance->vertices()) + for (const auto &child : instance->getComputationalDag().children(node)) + if(node_to_processor_assignment[node] != node_to_processor_assignment[child]) + comm_phase_empty[node_to_superstep_assignment[child]-1] = false; + + std::vector new_step_index(number_of_supersteps); + unsigned current_index = 0; + for(unsigned step = 0; step < number_of_supersteps; ++step) + { + new_step_index[step] = current_index; + if(!comm_phase_empty[step]) + current_index++; + } + for (const auto& node : instance->vertices()) + node_to_superstep_assignment[node] = new_step_index[node_to_superstep_assignment[node]]; + + setNumberOfSupersteps(current_index); + } }; } // namespace osp \ No newline at end of file diff --git a/include/osp/bsp/model/BspScheduleCS.hpp b/include/osp/bsp/model/BspScheduleCS.hpp index 81fc2269..66ee5d6e 100644 --- a/include/osp/bsp/model/BspScheduleCS.hpp +++ b/include/osp/bsp/model/BspScheduleCS.hpp @@ -472,6 +472,28 @@ class BspScheduleCS : public BspSchedule { } } } + + virtual void shrinkSchedule() override { + + std::vector comm_phase_empty(this->number_of_supersteps, true); + for (auto const &[key, val] : commSchedule) + comm_phase_empty[val] = false; + + std::vector new_step_index(this->number_of_supersteps); + unsigned current_index = 0; + for(unsigned step = 0; step < this->number_of_supersteps; ++step) + { + new_step_index[step] = current_index; + if(!comm_phase_empty[step]) + current_index++; + } + for (const auto& node : this->instance->vertices()) + this->node_to_superstep_assignment[node] = new_step_index[this->node_to_superstep_assignment[node]]; + for (auto &[key, val] : commSchedule) + val = new_step_index[val]; + + this->setNumberOfSupersteps(current_index); + } }; } // namespace osp \ No newline at end of file diff --git a/include/osp/bsp/scheduler/IlpSchedulers/CoptCommScheduleOptimizer.hpp b/include/osp/bsp/scheduler/IlpSchedulers/CoptCommScheduleOptimizer.hpp new file mode 100644 index 00000000..96b206fe --- /dev/null +++ b/include/osp/bsp/scheduler/IlpSchedulers/CoptCommScheduleOptimizer.hpp @@ -0,0 +1,355 @@ +#pragma once + +#include +#include + +#include "osp/bsp/model/BspScheduleCS.hpp" +#include "osp/bsp/model/BspScheduleRecomp.hpp" +#include "osp/bsp/scheduler/Scheduler.hpp" + +namespace osp { + +/** + * @class CoptCommScheduleOptimizer + * @brief A class that represents a scheduler using the COPT solver for optimizing the communication schedule part of + * a BSP schedule, with the assignment of vertices to processors and supersteps fixed. 
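+ * The processor and superstep assignment of the input schedule is kept as-is; only the communication schedule entries
+ * are re-decided, and supersteps whose communication phase becomes empty in the new solution can be merged away afterwards (see setNumSuperstepsCanChange).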
+ */ + +template +class CoptCommScheduleOptimizer { + + static_assert(is_computational_dag_v, "CoptFullScheduler can only be used with computational DAGs."); + + bool num_supersteps_can_change = true; + + unsigned int timeLimitSeconds = 600; + + protected: + + VarArray superstep_used_var; + VarArray max_comm_superstep_var; + std::vector>> comm_processor_to_processor_superstep_node_var; + + void setupVariablesConstraintsObjective(const BspScheduleCS& schedule, Model& model, bool num_supersteps_can_change); + + void setInitialSolution(BspScheduleCS& schedule, Model &model); + + bool canShrinkResultingSchedule(unsigned number_of_supersteps) const; + + void updateCommSchedule(BspScheduleCS& schedule) const; + + public: + + using KeyTriple = std::tuple, unsigned int, unsigned int>; + + virtual RETURN_STATUS improveSchedule(BspScheduleCS &schedule); + + virtual std::string getScheduleName() const { return "ILPCommunication"; } + + virtual void setTimeLimitSeconds(unsigned int limit) { timeLimitSeconds = limit; } + inline unsigned int getTimeLimitSeconds() const { return timeLimitSeconds; } + virtual void setNumSuperstepsCanChange(bool can_change_) { num_supersteps_can_change = can_change_; } +}; + + +template +RETURN_STATUS CoptCommScheduleOptimizer::improveSchedule(BspScheduleCS& schedule) { + + Envr env; + Model model = env.CreateModel("bsp_schedule_cs"); + + setupVariablesConstraintsObjective(schedule, model, true); + + setInitialSolution(schedule, model); + + model.SetDblParam(COPT_DBLPARAM_TIMELIMIT, timeLimitSeconds); + model.SetIntParam(COPT_INTPARAM_THREADS, 128); + + model.Solve(); + + if (model.GetIntAttr(COPT_INTATTR_HASMIPSOL)) + { + updateCommSchedule(schedule); + if (canShrinkResultingSchedule(schedule.numberOfSupersteps())) + schedule.shrinkSchedule(); + } + + if (model.GetIntAttr(COPT_INTATTR_MIPSTATUS) == COPT_MIPSTATUS_OPTIMAL) { + return RETURN_STATUS::OSP_SUCCESS; + } else if (model.GetIntAttr(COPT_INTATTR_MIPSTATUS) == COPT_MIPSTATUS_INF_OR_UNB) { + return RETURN_STATUS::ERROR; + } else { + if (model.GetIntAttr(COPT_INTATTR_HASMIPSOL)) + return RETURN_STATUS::BEST_FOUND; + else + return RETURN_STATUS::TIMEOUT; + } +} + +template +bool CoptCommScheduleOptimizer::canShrinkResultingSchedule(unsigned number_of_supersteps) const { + + for (unsigned step = 0; step < number_of_supersteps - 1; step++) { + + if (superstep_used_var[static_cast(step)].Get(COPT_DBLINFO_VALUE) <= 0.01) + return true; + } + return false; +} + +template +void CoptCommScheduleOptimizer::updateCommSchedule(BspScheduleCS& schedule) const { + + std::map& cs = schedule.getCommunicationSchedule(); + cs.clear(); + + for (const auto &node : schedule.getInstance().vertices()) { + + for (unsigned int p_from = 0; p_from < schedule.getInstance().numberOfProcessors(); p_from++) { + for (unsigned int p_to = 0; p_to < schedule.getInstance().numberOfProcessors(); p_to++) { + if (p_from != p_to) { + for (unsigned int step = 0; step < schedule.numberOfSupersteps(); step++) { + if (comm_processor_to_processor_superstep_node_var[p_from][p_to][step][static_cast(node)].Get( + COPT_DBLINFO_VALUE) >= .99) { + cs[std::make_tuple(node, p_from, p_to)] = step; + } + } + } + } + } + } +} + +template +void CoptCommScheduleOptimizer::setInitialSolution(BspScheduleCS& schedule, Model &model){ + + const Graph_t& DAG = schedule.getInstance().getComputationalDag(); + const BspArchitecture& arch = schedule.getInstance().getArchitecture(); + const unsigned& num_processors = schedule.getInstance().numberOfProcessors(); + const unsigned& 
num_supersteps = schedule.numberOfSupersteps(); + const auto &cs = schedule.getCommunicationSchedule(); + + std::vector > first_at(DAG.num_vertices(), std::vector(num_processors, std::numeric_limits::max())); + for (const auto &node : DAG.vertices()) + first_at[node][schedule.assignedProcessor(node)] = schedule.assignedSuperstep(node); + + for (const auto &node : DAG.vertices()) { + + for (unsigned p1 = 0; p1 < num_processors; p1++) { + + for (unsigned p2 = 0; p2 < num_processors; p2++) { + + if(p1 == p2) + continue; + + for (unsigned step = 0; step < num_supersteps; step++) { + + const auto &key = std::make_tuple(node, p1, p2); + if (cs.find(key) != cs.end() && cs.at(key) == step) { + model.SetMipStart(comm_processor_to_processor_superstep_node_var[p1][p2][step][static_cast(node)], 1); + first_at[node][p2] = std::min(first_at[node][p2], step+1); + } else { + model.SetMipStart(comm_processor_to_processor_superstep_node_var[p1][p2][step][static_cast(node)], 0); + } + } + } + } + } + + for (const auto &node : DAG.vertices()) + for (unsigned proc = 0; proc < num_processors; proc++) + for (unsigned step = 0; step < num_supersteps; step++) + { + if(step >= first_at[node][proc]) + model.SetMipStart(comm_processor_to_processor_superstep_node_var[proc][proc][step] + [static_cast(node)], 1); + else + model.SetMipStart(comm_processor_to_processor_superstep_node_var[proc][proc][step] + [static_cast(node)], 0); + } + + if(num_supersteps_can_change) + { + std::vector comm_phase_used(num_supersteps, 0); + for (auto const &[key, val] : cs) + comm_phase_used[val] = 1; + for (unsigned step = 0; step < num_supersteps; step++) + model.SetMipStart(superstep_used_var[static_cast(step)], comm_phase_used[step]); + } + + std::vector>> send(num_supersteps, std::vector>(num_processors, 0)); + std::vector>> rec(num_supersteps, std::vector>(num_processors, 0)); + + for (const auto &[key, val] : cs) { + send[val][std::get<1>(key)] += DAG.vertex_comm_weight(std::get<0>(key)) * arch.sendCosts(std::get<1>(key), std::get<2>(key)); + rec[val][std::get<2>(key)] += DAG.vertex_comm_weight(std::get<0>(key)) * arch.sendCosts(std::get<1>(key), std::get<2>(key)); + } + + for (unsigned step = 0; step < num_supersteps; step++) { + + v_commw_t max_comm = 0; + for (unsigned proc = 0; proc < num_processors; proc++) { + max_comm = std::max(max_comm, send[step][proc]); + max_comm = std::max(max_comm, rec[step][proc]); + } + + model.SetMipStart(max_comm_superstep_var[static_cast(step)], max_comm); + } + + model.LoadMipStart(); + model.SetIntParam(COPT_INTPARAM_MIPSTARTMODE, 2); +} + +template +void CoptCommScheduleOptimizer::setupVariablesConstraintsObjective(const BspScheduleCS& schedule, Model& model, bool num_supersteps_can_change) { + + const unsigned &max_number_supersteps = schedule.numberOfSupersteps(); + const unsigned &num_processors = schedule.getInstance().numberOfProcessors(); + const unsigned num_vertices = static_cast(schedule.getInstance().numberOfVertices()); + + // variables indicating if superstep is used at all + if (num_supersteps_can_change) { + superstep_used_var = model.AddVars(static_cast(max_number_supersteps), COPT_BINARY, "superstep_used"); + } + + max_comm_superstep_var = model.AddVars(static_cast(max_number_supersteps), COPT_INTEGER, "max_comm_superstep"); + + // communicate node from p1 to p2 at superstep + + comm_processor_to_processor_superstep_node_var = std::vector>>(num_processors, + std::vector>(num_processors, std::vector(max_number_supersteps))); + + for (unsigned p1 = 0; p1 < num_processors; p1++) { 
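+            // one binary variable per node is added for each (p1, p2, superstep) triple;
+            // the diagonal entries (p1 == p2) are used further below to express that a node's value is present on processor p1 in that superstep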
+ + for (unsigned p2 = 0; p2 < num_processors; p2++) { + + for (unsigned step = 0; step < max_number_supersteps; step++) { + + comm_processor_to_processor_superstep_node_var[p1][p2][step] = model.AddVars(static_cast(num_vertices), + COPT_BINARY, "comm_processor_to_processor_superstep_node"); + } + } + } + + if (num_supersteps_can_change) { + unsigned M = num_processors * num_processors * num_vertices; + for (unsigned int step = 0; step < schedule.numberOfSupersteps(); step++) { + + Expr expr; + + for (unsigned p1 = 0; p1 < num_processors; p1++) { + + for (unsigned p2 = 0; p2 < num_processors; p2++) { + + if (p1 != p2) { + for (unsigned node = 0; node < num_vertices; node++) { + + expr += comm_processor_to_processor_superstep_node_var[p1][p2][step][static_cast(node)]; + } + } + } + } + + model.AddConstr(expr <= M * superstep_used_var[static_cast(step)]); + } + } + // precedence constraint: if task is computed then all of its predecessors must have been present + // and vertex is present where it was computed + for (unsigned node = 0; node < num_vertices; node++) { + + const unsigned &processor = schedule.assignedProcessor(node); + const unsigned &superstep = schedule.assignedSuperstep(node); + Expr expr; + unsigned num_com_edges = 0; + for (const auto &pred : schedule.getInstance().getComputationalDag().parents(node)) { + + if (schedule.assignedProcessor(node) != schedule.assignedProcessor(pred)) { + num_com_edges += 1; + expr += comm_processor_to_processor_superstep_node_var[processor][processor][superstep][static_cast(pred)]; + + model.AddConstr( + comm_processor_to_processor_superstep_node_var[schedule.assignedProcessor(pred)][schedule.assignedProcessor(pred)] + [schedule.assignedSuperstep(pred)][static_cast(pred)] == 1); + } + } + + if (num_com_edges > 0) + model.AddConstr(expr >= num_com_edges); + } + + // combines two constraints: node can only be communicated if it is present; and node is present if it was computed + // or communicated + for (unsigned int step = 0; step < max_number_supersteps; step++) { + for (unsigned int processor = 0; processor < num_processors; processor++) { + for (unsigned int node = 0; node < num_vertices; node++) { + + if (processor == schedule.assignedProcessor(node) && step >= schedule.assignedSuperstep(node)) + continue; + + Expr expr1, expr2; + if (step > 0) { + + for (unsigned int p_from = 0; p_from < num_processors; p_from++) { + expr1 += comm_processor_to_processor_superstep_node_var[p_from][processor][step - 1][static_cast(node)]; + } + } + + for (unsigned int p_to = 0; p_to < num_processors; p_to++) { + expr2 += comm_processor_to_processor_superstep_node_var[processor][p_to][step][static_cast(node)]; + } + + model.AddConstr(num_processors * expr1 >= expr2); + } + } + } + + for (unsigned step = 0; step < max_number_supersteps; step++) { + for (unsigned processor = 0; processor < num_processors; processor++) { + + Expr expr1, expr2; + for (unsigned node = 0; node < num_vertices; node++) { + + for (unsigned p_to = 0; p_to < num_processors; p_to++) { + if (processor != p_to) { + expr1 += schedule.getInstance().getComputationalDag().vertex_comm_weight(node) * + schedule.getInstance().sendCosts(processor, p_to) * + comm_processor_to_processor_superstep_node_var[processor][p_to][step][static_cast(node)]; + } + } + + for (unsigned int p_from = 0; p_from < num_processors; p_from++) { + if (processor != p_from) { + expr2 += schedule.getInstance().getComputationalDag().vertex_comm_weight(node) * + schedule.getInstance().sendCosts(p_from, processor) * + 
comm_processor_to_processor_superstep_node_var[p_from][processor][step][static_cast(node)]; + } + } + + } + + model.AddConstr(max_comm_superstep_var[static_cast(step)] >= expr1); + model.AddConstr(max_comm_superstep_var[static_cast(step)] >= expr2); + } + } + + /* + Objective function + */ + Expr expr; + + if (num_supersteps_can_change) { + + for (unsigned int step = 0; step < max_number_supersteps; step++) { + expr += schedule.getInstance().communicationCosts() * max_comm_superstep_var[static_cast(step)] + + schedule.getInstance().synchronisationCosts() * superstep_used_var[static_cast(step)]; + } + } else { + + for (unsigned int step = 0; step < max_number_supersteps; step++) { + expr += schedule.getInstance().communicationCosts() * max_comm_superstep_var[static_cast(step)]; + } + } + model.SetObjective(expr - schedule.getInstance().synchronisationCosts(), COPT_MINIMIZE); +} + +} // namespace osp \ No newline at end of file diff --git a/tests/ilp_bsp_scheduler.cpp b/tests/ilp_bsp_scheduler.cpp index 8d5326c1..607abbbc 100644 --- a/tests/ilp_bsp_scheduler.cpp +++ b/tests/ilp_bsp_scheduler.cpp @@ -31,6 +31,7 @@ limitations under the License. #include "osp/bsp/scheduler/IlpSchedulers/CoptFullScheduler.hpp" #include "osp/bsp/scheduler/IlpSchedulers/TotalCommunicationScheduler.hpp" +#include "osp/bsp/scheduler/IlpSchedulers/CoptCommScheduleOptimizer.hpp" using namespace osp; @@ -177,4 +178,47 @@ BOOST_AUTO_TEST_CASE(test_full) { BOOST_CHECK_EQUAL(RETURN_STATUS::OSP_SUCCESS, result); BOOST_CHECK(schedule.satisfiesPrecedenceConstraints()); -}; \ No newline at end of file +}; + +BOOST_AUTO_TEST_CASE(test_cs) { + + using graph = computational_dag_edge_idx_vector_impl_def_t; + + BspInstance instance; + instance.setNumberOfProcessors(4); + instance.setCommunicationCosts(3); + instance.setSynchronisationCosts(5); + + // Getting root git directory + std::filesystem::path cwd = std::filesystem::current_path(); + std::cout << cwd << std::endl; + while ((!cwd.empty()) && (cwd.filename() != "OneStopParallel")) { + cwd = cwd.parent_path(); + std::cout << cwd << std::endl; + } + + bool status = file_reader::readComputationalDagHyperdagFormatDB( + (cwd / "data/spaa/tiny/instance_pregel.hdag").string(), instance.getComputationalDag()); + + BOOST_CHECK(status); + + BspSchedule schedule(instance); + GreedyBspScheduler greedy; + greedy.computeSchedule(schedule); + BOOST_CHECK(schedule.satisfiesPrecedenceConstraints()); + BspScheduleCS schedule_cs(schedule); + BOOST_CHECK(schedule_cs.hasValidCommSchedule()); + + CoptCommScheduleOptimizer scheduler; + scheduler.setTimeLimitSeconds(10); + const auto before = schedule_cs.compute_cs_communication_costs(); + const auto result = scheduler.improveSchedule(schedule_cs); + BOOST_CHECK_EQUAL(RETURN_STATUS::OSP_SUCCESS, result); + const auto after = schedule_cs.compute_cs_communication_costs(); + std::cout< "<= after); +}; + From 607a1c3f032937740f0c0a591671b4a25504c881 Mon Sep 17 00:00:00 2001 From: papp-pal-andras Date: Wed, 29 Oct 2025 16:37:43 +0100 Subject: [PATCH 2/4] license fix --- .../CoptCommScheduleOptimizer.hpp | 18 ++++++++++++++++++ 1 file changed, 18 insertions(+) diff --git a/include/osp/bsp/scheduler/IlpSchedulers/CoptCommScheduleOptimizer.hpp b/include/osp/bsp/scheduler/IlpSchedulers/CoptCommScheduleOptimizer.hpp index 96b206fe..a09776f1 100644 --- a/include/osp/bsp/scheduler/IlpSchedulers/CoptCommScheduleOptimizer.hpp +++ b/include/osp/bsp/scheduler/IlpSchedulers/CoptCommScheduleOptimizer.hpp @@ -1,3 +1,21 @@ +/* +Copyright 2024 Huawei Technologies Co., 
Ltd. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. + +@author Toni Boehnlein, Benjamin Lozes, Pal Andras Papp, Raphael S. Steiner +*/ + #pragma once #include From 5da37987a284d883bcda8d598706b91fa0bd850b Mon Sep 17 00:00:00 2001 From: papp-pal-andras Date: Wed, 12 Nov 2025 23:49:49 +0100 Subject: [PATCH 3/4] partial ILP for BSP scheduling resurrected --- include/osp/bsp/model/BspScheduleCS.hpp | 110 ++- .../IlpSchedulers/CoptPartialScheduler.hpp | 717 ++++++++++++++++++ tests/ilp_bsp_scheduler.cpp | 50 ++ 3 files changed, 871 insertions(+), 6 deletions(-) create mode 100644 include/osp/bsp/scheduler/IlpSchedulers/CoptPartialScheduler.hpp diff --git a/include/osp/bsp/model/BspScheduleCS.hpp b/include/osp/bsp/model/BspScheduleCS.hpp index 66ee5d6e..db847871 100644 --- a/include/osp/bsp/model/BspScheduleCS.hpp +++ b/include/osp/bsp/model/BspScheduleCS.hpp @@ -475,24 +475,122 @@ class BspScheduleCS : public BspSchedule { virtual void shrinkSchedule() override { - std::vector comm_phase_empty(this->number_of_supersteps, true); + std::vector comm_phase_latest_dependency(this->number_of_supersteps, 0); + std::vector > first_at = getFirstPresence(); + for (auto const &[key, val] : commSchedule) - comm_phase_empty[val] = false; + if(this->assignedProcessor(std::get<0>(key)) != std::get<1>(key)) + comm_phase_latest_dependency[val] = std::max(comm_phase_latest_dependency[val], first_at[std::get<0>(key)][std::get<1>(key)]); + + + for (const auto &node : BspSchedule::instance->getComputationalDag().vertices()) + for (const auto &child : BspSchedule::instance->getComputationalDag().children(node)) + if(this->assignedProcessor(node) != this->assignedProcessor(child)) + comm_phase_latest_dependency[this->assignedSuperstep(child)] = std::max(comm_phase_latest_dependency[this->assignedSuperstep(child)], first_at[node][this->assignedProcessor(child)]); + + std::vector comm_phase_deleted(this->number_of_supersteps, false); + for(unsigned step = this->number_of_supersteps-1; step < this->number_of_supersteps; --step) + { + unsigned limit = 0; + while(step > limit) + { + limit = std::max(limit, comm_phase_latest_dependency[step]); + if(step > limit) + { + comm_phase_deleted[step] = true; + --step; + } + } + } std::vector new_step_index(this->number_of_supersteps); - unsigned current_index = 0; + unsigned current_index = std::numeric_limits::max(); for(unsigned step = 0; step < this->number_of_supersteps; ++step) { - new_step_index[step] = current_index; - if(!comm_phase_empty[step]) + if(!comm_phase_deleted[step]) current_index++; + + new_step_index[step] = current_index; } for (const auto& node : this->instance->vertices()) this->node_to_superstep_assignment[node] = new_step_index[this->node_to_superstep_assignment[node]]; for (auto &[key, val] : commSchedule) val = new_step_index[val]; - this->setNumberOfSupersteps(current_index); + this->setNumberOfSupersteps(current_index+1); + } + + std::vector > getFirstPresence() const { + + std::vector > first_at(BspSchedule::instance->numberOfVertices(), + 
std::vector(BspSchedule::instance->numberOfProcessors(), std::numeric_limits::max())); + + for (const auto &node : BspSchedule::instance->getComputationalDag().vertices()) + first_at[node][this->assignedProcessor(node)] = this->assignedSuperstep(node); + + for (auto const &[key, val] : commSchedule) + first_at[std::get<0>(key)][std::get<2>(key)] = + std::min(first_at[std::get<0>(key)][std::get<2>(key)], val + 1); // TODO: replace by staleness after merge + + return first_at; + } + + // remove unneeded comm. schedule entries - these can happen in ILPs, partial ILPs, etc. + void cleanCommSchedule(){ + + // data that is already present before it arrives + std::vector > > arrives_at(BspSchedule::instance->numberOfVertices(), + std::vector >(BspSchedule::instance->numberOfProcessors())); + for (const auto &node : BspSchedule::instance->getComputationalDag().vertices()) + arrives_at[node][this->assignedProcessor(node)].insert(this->assignedSuperstep(node)); + + for (auto const &[key, val] : commSchedule) + arrives_at[std::get<0>(key)][std::get<2>(key)].insert(val); + + std::vector toErase; + for (auto const &[key, val] : commSchedule) + { + auto itr = arrives_at[std::get<0>(key)][std::get<2>(key)].begin(); + if(*itr < val) + toErase.push_back(key); + else if(*itr == val && ++itr != arrives_at[std::get<0>(key)][std::get<2>(key)].end() && *itr == val) + { + toErase.push_back(key); + arrives_at[std::get<0>(key)][std::get<2>(key)].erase(itr); + } + } + + for(const KeyTriple& key : toErase) + commSchedule.erase(key); + + // data that is not used after being sent + std::vector > > used_at(BspSchedule::instance->numberOfVertices(), + std::vector >(BspSchedule::instance->numberOfProcessors())); + for (const auto &node : BspSchedule::instance->getComputationalDag().vertices()) + for (const auto &child : BspSchedule::instance->getComputationalDag().children(node)) + used_at[node][this->assignedProcessor(child)].insert(this->assignedSuperstep(child)); + + for (auto const &[key, val] : commSchedule) + used_at[std::get<0>(key)][std::get<1>(key)].insert(val); + + // (need to visit cs entries in reverse superstep order here) + std::vector > entries(this->number_of_supersteps); + for (auto const &[key, val] : commSchedule) + entries[val].push_back(key); + + toErase.clear(); + for(unsigned step = this->number_of_supersteps-1; step < this->number_of_supersteps; --step) + for(const KeyTriple& key : entries[step]) + if(used_at[std::get<0>(key)][std::get<2>(key)].empty() || + *used_at[std::get<0>(key)][std::get<2>(key)].rbegin() <= step) + { + toErase.push_back(key); + auto itr = used_at[std::get<0>(key)][std::get<1>(key)].find(step); + used_at[std::get<0>(key)][std::get<1>(key)].erase(itr); + } + + for(const KeyTriple& key : toErase) + commSchedule.erase(key); } }; diff --git a/include/osp/bsp/scheduler/IlpSchedulers/CoptPartialScheduler.hpp b/include/osp/bsp/scheduler/IlpSchedulers/CoptPartialScheduler.hpp new file mode 100644 index 00000000..a97855d4 --- /dev/null +++ b/include/osp/bsp/scheduler/IlpSchedulers/CoptPartialScheduler.hpp @@ -0,0 +1,717 @@ +/* +Copyright 2024 Huawei Technologies Co., Ltd. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. 
+You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. + +@author Toni Boehnlein, Benjamin Lozes, Pal Andras Papp, Raphael S. Steiner +*/ + +#pragma once + +#include +#include + +#include "osp/bsp/model/BspScheduleCS.hpp" +#include "osp/bsp/scheduler/Scheduler.hpp" + +namespace osp { + +/** + * @class CoptPartialScheduler + * @brief A class that represents a scheduler using the COPT solver for optimizing a specific segment of + * a BSP schedule, from a starting superstep to and ending superstep. + */ + +template +class CoptPartialScheduler { + + static_assert(is_computational_dag_v, "CoptPartialScheduler can only be used with computational DAGs."); + + using KeyTriple = std::tuple, unsigned int, unsigned int>; + + unsigned int timeLimitSeconds = 600; + + protected: + + unsigned start_superstep = 1, end_superstep = 3; + + std::vector> node_global_ID; + std::unordered_map, vertex_idx_t> node_local_ID; + + std::vector> source_global_ID; + std::unordered_map, vertex_idx_t> source_local_ID; + + std::vector > node_needed_after_on_proc, source_needed_after_on_proc; + std::vector, unsigned, unsigned, unsigned> > fixed_comm_steps; + std::set > source_present_before; + + unsigned max_number_supersteps; + + VarArray superstep_used_var; + VarArray keep_fixed_comm_step; + + std::vector> node_to_processor_superstep_var; + std::vector>> comm_processor_to_processor_superstep_node_var; + std::vector> comm_to_processor_superstep_source_var; + + void setupVariablesConstraintsObjective(const BspScheduleCS& schedule, Model& model); + + void setInitialSolution(const BspScheduleCS& schedule, Model &model); + + void updateSchedule(BspScheduleCS& schedule) const; + + void setupVertexMaps(const BspScheduleCS& schedule); + + public: + + virtual RETURN_STATUS improveSchedule(BspScheduleCS &schedule); + + virtual std::string getScheduleName() const { return "ILPPartial"; } + + virtual void setTimeLimitSeconds(unsigned int limit) { timeLimitSeconds = limit; } + inline unsigned int getTimeLimitSeconds() const { return timeLimitSeconds; } + virtual void setStartAndEndSuperstep(unsigned start_, unsigned end_) { start_superstep = start_; end_superstep = end_; } +}; + +template +RETURN_STATUS CoptPartialScheduler::improveSchedule(BspScheduleCS& schedule) { + + Envr env; + Model model = env.CreateModel("bsp_schedule_partial"); + + setupVertexMaps(schedule); + + setupVariablesConstraintsObjective(schedule, model); + + setInitialSolution(schedule, model); + + model.SetDblParam(COPT_DBLPARAM_TIMELIMIT, timeLimitSeconds); + model.SetIntParam(COPT_INTPARAM_THREADS, 128); + + model.Solve(); + + if (model.GetIntAttr(COPT_INTATTR_HASMIPSOL)) + updateSchedule(schedule); + + if (model.GetIntAttr(COPT_INTATTR_MIPSTATUS) == COPT_MIPSTATUS_OPTIMAL) { + return RETURN_STATUS::OSP_SUCCESS; + } else if (model.GetIntAttr(COPT_INTATTR_MIPSTATUS) == COPT_MIPSTATUS_INF_OR_UNB) { + return RETURN_STATUS::ERROR; + } else { + if (model.GetIntAttr(COPT_INTATTR_HASMIPSOL)) + return RETURN_STATUS::BEST_FOUND; + else + return RETURN_STATUS::TIMEOUT; + } +} + +template +void CoptPartialScheduler::setInitialSolution(const BspScheduleCS& schedule, Model &model){ + + const Graph_t& DAG = 
schedule.getInstance().getComputationalDag(); + const unsigned& num_processors = schedule.getInstance().numberOfProcessors(); + const auto &cs = schedule.getCommunicationSchedule(); + + for (const vertex_idx_t &node : DAG.vertices()) + { + if(node_local_ID.find(node) == node_local_ID.end()) + continue; + for (unsigned proc = 0; proc < num_processors; proc++) + for(unsigned step = 0; step < max_number_supersteps; ++step) + { + if (schedule.assignedProcessor(node) == proc && schedule.assignedSuperstep(node) == start_superstep + step) + model.SetMipStart(node_to_processor_superstep_var[node_local_ID[node]][proc][static_cast(step)], 1); + else + model.SetMipStart(node_to_processor_superstep_var[node_local_ID[node]][proc][static_cast(step)], 0); + } + } + + for (unsigned index = 0; index < fixed_comm_steps.size(); ++index) + model.SetMipStart(keep_fixed_comm_step[static_cast(index)], 1); + + for (const auto &node : DAG.vertices()) { + + if(node_local_ID.find(node) == node_local_ID.end()) + continue; + + for (unsigned p1 = 0; p1 < num_processors; p1++) { + + for (unsigned p2 = 0; p2 < num_processors; p2++) { + + if(p1 == p2) + continue; + + for (unsigned step = 0; step < max_number_supersteps && step <= end_superstep - start_superstep; step++) { + + const auto &key = std::make_tuple(node, p1, p2); + if (cs.find(key) != cs.end() && cs.at(key) == start_superstep + step) + model.SetMipStart(comm_processor_to_processor_superstep_node_var[p1][p2][step][static_cast(node_local_ID[node])], 1); + else + model.SetMipStart(comm_processor_to_processor_superstep_node_var[p1][p2][step][static_cast(node_local_ID[node])], 0); + } + } + } + } + + for (const auto &source : DAG.vertices()) { + + if(source_local_ID.find(source) == source_local_ID.end()) + continue; + + for (unsigned proc = 0; proc < num_processors; proc++) + { + if(proc == schedule.assignedProcessor(source)) + continue; + + for (unsigned step = 0; step < max_number_supersteps + 1 && step <= end_superstep - start_superstep + 1; step++) { + + const auto &key = std::make_tuple(source, schedule.assignedProcessor(source), proc); + if (cs.find(key) != cs.end() && cs.at(key) == start_superstep + step - 1) + model.SetMipStart(comm_to_processor_superstep_source_var[proc][step][static_cast(source_local_ID[source])], 1); + else if(step > 0) + model.SetMipStart(comm_to_processor_superstep_source_var[proc][step][static_cast(source_local_ID[source])], 0); + } + } + } + + model.LoadMipStart(); + model.SetIntParam(COPT_INTPARAM_MIPSTARTMODE, 2); +} + +template +void CoptPartialScheduler::updateSchedule(BspScheduleCS& schedule) const { + + unsigned number_of_supersteps = 0; + + while (number_of_supersteps < max_number_supersteps && + superstep_used_var[static_cast(number_of_supersteps)].Get(COPT_DBLINFO_VALUE) >= .99) { + number_of_supersteps++; + } + + const int offset = static_cast(number_of_supersteps) - static_cast(end_superstep - start_superstep + 1); + + for (vertex_idx_t node = 0; node < schedule.getInstance().numberOfVertices(); node++) + if(schedule.assignedSuperstep(node) > end_superstep) + schedule.setAssignedSuperstep(node, static_cast(static_cast(schedule.assignedSuperstep(node)) + offset)); + + for (vertex_idx_t node = 0; node < schedule.getInstance().numberOfVertices(); node++) { + + if(node_local_ID.find(node) == node_local_ID.end()) + continue; + + for (unsigned processor = 0; processor < schedule.getInstance().numberOfProcessors(); processor++) { + + for (unsigned step = 0; step < max_number_supersteps; step++) { + + if 
(node_to_processor_superstep_var[node_local_ID.at(node)][processor][static_cast(step)].Get(COPT_DBLINFO_VALUE) >= .99) + { + schedule.setAssignedSuperstep(node, start_superstep + step); + schedule.setAssignedProcessor(node, processor); + } + } + } + } + + std::map& commSchedule = schedule.getCommunicationSchedule(); + + std::vector toErase; + for (const auto &[key, val] : schedule.getCommunicationSchedule()) + { + if (val > end_superstep) + commSchedule[key] = static_cast(static_cast(val) + offset); + else if (static_cast(val) >= static_cast(start_superstep) - 1) + toErase.push_back(key); + } + for(const KeyTriple& key : toErase) + commSchedule.erase(key); + + for (unsigned index = 0; index < fixed_comm_steps.size(); ++index) + { + const auto& entry = fixed_comm_steps[index]; + if (keep_fixed_comm_step[static_cast(index)].Get(COPT_DBLINFO_VALUE) >= .99 && + std::get<3>(entry) < start_superstep + number_of_supersteps) + commSchedule[std::make_tuple(std::get<0>(entry), std::get<1>(entry), std::get<2>(entry))] = std::get<3>(entry); + else + commSchedule[std::make_tuple(std::get<0>(entry), std::get<1>(entry), std::get<2>(entry))] = start_superstep-1; + } + + for (vertex_idx_t node = 0; node < node_global_ID.size(); node++) { + + for (unsigned int p_from = 0; p_from < schedule.getInstance().numberOfProcessors(); p_from++) { + for (unsigned int p_to = 0; p_to < schedule.getInstance().numberOfProcessors(); p_to++) { + if (p_from != p_to) { + for (unsigned int step = 0; step < max_number_supersteps; step++) { + if (comm_processor_to_processor_superstep_node_var[p_from][p_to][step][static_cast(node)].Get(COPT_DBLINFO_VALUE) >= .99) { + commSchedule[std::make_tuple(node_global_ID[node], p_from, p_to)] = start_superstep + step; + break; + } + } + } + } + } + } + + for (vertex_idx_t source = 0; source < source_global_ID.size(); source++) { + + for (unsigned int p_to = 0; p_to < schedule.getInstance().numberOfProcessors(); p_to++) { + if (source_present_before.find(std::make_pair(source, p_to)) == source_present_before.end()) { + for (unsigned int step = 0; step < max_number_supersteps + 1; step++) { + if (comm_to_processor_superstep_source_var[p_to][step][static_cast(source)].Get(COPT_DBLINFO_VALUE) >= .99) { + commSchedule[std::make_tuple(source_global_ID[source], schedule.assignedProcessor(source_global_ID[source]), p_to)] = + start_superstep - 1 + step; + break; + } + } + } + } + } + + schedule.cleanCommSchedule(); + schedule.shrinkSchedule(); + +}; + + +template +void CoptPartialScheduler::setupVariablesConstraintsObjective(const BspScheduleCS& schedule, Model& model) { + + const vertex_idx_t num_vertices = static_cast>(node_global_ID.size()); + const vertex_idx_t num_sources = static_cast>(source_global_ID.size()); + const unsigned num_processors = schedule.getInstance().numberOfProcessors(); + + /* + Variables + */ + // variables indicating if superstep is used at all + superstep_used_var = model.AddVars(static_cast(max_number_supersteps), COPT_BINARY, "superstep_used"); + + // variables for assigments of nodes to processor and superstep + node_to_processor_superstep_var = std::vector>(num_vertices, std::vector(num_processors)); + + for (unsigned int node = 0; node < num_vertices; node++) { + + for (unsigned int processor = 0; processor < num_processors; processor++) { + + node_to_processor_superstep_var[node][processor] = + model.AddVars(static_cast(max_number_supersteps), COPT_BINARY, "node_to_processor_superstep"); + } + } + + // communicate node from p1 to p2 at superstep + + 
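+    // indexed as [p_from][p_to][superstep][local node ID]; the diagonal entries (p_from == p_to) model that a node's value is present on that processor in the given superstep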
comm_processor_to_processor_superstep_node_var = std::vector>>(num_processors, + std::vector>(num_processors, std::vector(max_number_supersteps))); + + for (unsigned int p1 = 0; p1 < num_processors; p1++) { + for (unsigned int p2 = 0; p2 < num_processors; p2++) { + for (unsigned int step = 0; step < max_number_supersteps; step++) { + + comm_processor_to_processor_superstep_node_var[p1][p2][step] = + model.AddVars(static_cast(num_vertices), COPT_BINARY, "comm_processor_to_processor_superstep_node"); + } + } + } + + // communicate nodes in supersteps smaller than start_superstep + comm_to_processor_superstep_source_var = std::vector>(num_processors, std::vector(max_number_supersteps + 1)); + std::vector> present_on_processor_superstep_source_var = std::vector>(num_processors, std::vector(max_number_supersteps)); + + for (unsigned int proc = 0; proc < num_processors; proc++) { + for (unsigned int step = 0; step < max_number_supersteps + 1; step++) { + + comm_to_processor_superstep_source_var[proc][step] = + model.AddVars(static_cast(num_sources), COPT_BINARY, "comm_to_processor_superstep_source"); + + if(step < max_number_supersteps) + present_on_processor_superstep_source_var[proc][step] = + model.AddVars(static_cast(num_sources), COPT_BINARY, "present_on_processor_superstep_source"); + } + } + + VarArray max_comm_superstep_var = model.AddVars(static_cast(max_number_supersteps + 1), COPT_INTEGER, "max_comm_superstep"); + + VarArray max_work_superstep_var = model.AddVars(static_cast(max_number_supersteps), COPT_INTEGER, "max_work_superstep"); + + keep_fixed_comm_step = model.AddVars(static_cast(fixed_comm_steps.size()), COPT_BINARY, "keep_fixed_comm_step"); + + /* + Constraints + */ + + // use consecutive supersteps starting from 0 + model.AddConstr(superstep_used_var[0] == 1); + + for (unsigned int step = 0; step < max_number_supersteps - 1; step++) { + model.AddConstr(superstep_used_var[static_cast(step)] >= superstep_used_var[static_cast(step + 1)]); + } + + // superstep is used at all + unsigned large_constant = static_cast(num_vertices+num_sources) * num_processors * num_processors * 2; + for (unsigned int step = 0; step < max_number_supersteps; step++) { + + Expr expr; + for (vertex_idx_t node = 0; node < num_vertices; node++) { + + for (unsigned int processor = 0; processor < num_processors; processor++) { + expr += node_to_processor_superstep_var[node][processor][static_cast(step)]; + + for (unsigned int p_other = 0; p_other < num_processors; p_other++) + if(processor != p_other) + expr += comm_processor_to_processor_superstep_node_var[processor][p_other][step][static_cast(node)]; + } + } + for (vertex_idx_t source = 0; source < num_sources; source++) + for (unsigned int processor = 0; processor < num_processors; processor++) + if(source_present_before.find(std::make_pair(source, processor)) == source_present_before.end()) + expr += comm_to_processor_superstep_source_var[processor][step+1][static_cast(source)]; + + model.AddConstr(expr <= large_constant * superstep_used_var[static_cast(step)]); + } + + // nodes are assigend + for (vertex_idx_t node = 0; node < num_vertices; node++) { + + Expr expr; + for (unsigned int processor = 0; processor < num_processors; processor++) { + + for (unsigned int step = 0; step < max_number_supersteps; step++) { + expr += node_to_processor_superstep_var[node][processor][static_cast(step)]; + } + } + + model.AddConstr(expr == 1); + } + + // precedence constraint: if task is computed then all of its predecessors must have been present + for 
(vertex_idx_t node = 0; node < num_vertices; node++) { + for (unsigned int step = 0; step < max_number_supersteps; step++) { + for (unsigned int processor = 0; processor < num_processors; processor++) { + + Expr expr; + unsigned num_terms = 0; + for (const auto &pred : schedule.getInstance().getComputationalDag().parents(node_global_ID[node])) + { + if(node_local_ID.find(pred) != node_local_ID.end()) + { + ++num_terms; + expr += comm_processor_to_processor_superstep_node_var[processor][processor][step][static_cast(node_local_ID[pred])]; + } + else if(source_local_ID.find(pred) != source_local_ID.end() && + source_present_before.find(std::make_pair(source_local_ID[pred], processor)) == source_present_before.end()) + { + ++num_terms; + expr += present_on_processor_superstep_source_var[processor][step][static_cast(source_local_ID[pred])]; + } + } + + if(num_terms > 0) + model.AddConstr(expr >= num_terms * node_to_processor_superstep_var[node][processor][static_cast(step)]); + } + } + } + + // combines two constraints: node can only be communicated if it is present; and node is present if it was computed + // or communicated + for (unsigned int step = 0; step < max_number_supersteps; step++) { + for (unsigned int processor = 0; processor < num_processors; processor++) { + for (vertex_idx_t node = 0; node < num_vertices; node++) { + + Expr expr1, expr2; + if (step > 0) { + + for (unsigned int p_from = 0; p_from < num_processors; p_from++) { + expr1 += comm_processor_to_processor_superstep_node_var[p_from][processor][step - 1][static_cast(node)]; + } + } + + expr1 += node_to_processor_superstep_var[node][processor][static_cast(step)]; + + for (unsigned int p_to = 0; p_to < num_processors; p_to++) { + expr2 += comm_processor_to_processor_superstep_node_var[processor][p_to][step][static_cast(node)]; + } + + model.AddConstr(num_processors * (expr1) >= expr2); + } + } + } + + // combines two constraints: node can only be communicated if it is present; and node is present if it was computed + // or communicated + for (unsigned int step = 0; step < max_number_supersteps; step++) { + for (unsigned int processor = 0; processor < num_processors; processor++) { + for (vertex_idx_t source_node = 0; source_node < num_sources; source_node++) { + + if(source_present_before.find(std::make_pair(source_node, processor)) != source_present_before.end()) + continue; + + Expr expr1 = comm_to_processor_superstep_source_var[processor][step][static_cast(source_node)]; + if (step > 0) + expr1 += present_on_processor_superstep_source_var[processor][step-1][static_cast(source_node)]; + + Expr expr2 = present_on_processor_superstep_source_var[processor][step][static_cast(source_node)]; + + model.AddConstr(expr1 >= expr2); + } + } + } + + // boundary conditions at the end + for(const std::pair, unsigned>& node_and_proc : node_needed_after_on_proc) + { + Expr expr; + for (unsigned int p_from = 0; p_from < num_processors; p_from++) + expr += comm_processor_to_processor_superstep_node_var[p_from][node_and_proc.second][max_number_supersteps - 1][static_cast(node_and_proc.first)]; + + model.AddConstr(expr >= 1); + } + + for(const std::pair, unsigned>& source_and_proc : source_needed_after_on_proc) + { + Expr expr = present_on_processor_superstep_source_var[source_and_proc.second][max_number_supersteps - 1][static_cast(source_and_proc.first)]; + expr += comm_to_processor_superstep_source_var[source_and_proc.second][max_number_supersteps][static_cast(source_and_proc.first)]; + model.AddConstr(expr >= 1); + } + + // cost 
calculation - work + for (unsigned int step = 0; step < max_number_supersteps; step++) { + for (unsigned int processor = 0; processor < num_processors; processor++) { + + Expr expr; + for (unsigned int node = 0; node < num_vertices; node++) { + expr += schedule.getInstance().getComputationalDag().vertex_work_weight(node_global_ID[node]) * + node_to_processor_superstep_var[node][processor][static_cast(step)]; + } + + model.AddConstr(max_work_superstep_var[static_cast(step)] >= expr); + } + } + + // cost calculation - comm + for (unsigned int step = 0; step < max_number_supersteps; step++) { + for (unsigned int processor = 0; processor < num_processors; processor++) { + + Expr expr1, expr2; + for (vertex_idx_t node = 0; node < num_vertices; node++) { + for (unsigned int p_other = 0; p_other < num_processors; p_other++) { + if (processor != p_other) { + expr1 += schedule.getInstance().getComputationalDag().vertex_comm_weight(node_global_ID[node]) * + schedule.getInstance().sendCosts(processor, p_other) * + comm_processor_to_processor_superstep_node_var[processor][p_other][step][static_cast(node)]; + expr2 += schedule.getInstance().getComputationalDag().vertex_comm_weight(node_global_ID[node]) * + schedule.getInstance().sendCosts(p_other, processor) * + comm_processor_to_processor_superstep_node_var[p_other][processor][step][static_cast(node)]; + } + } + } + + for (vertex_idx_t source = 0; source < num_sources; source++) { + const unsigned origin_proc = schedule.assignedProcessor(source_global_ID[source]); + if(origin_proc == processor) + { + for (unsigned int p_other = 0; p_other < num_processors; p_other++) + { + expr1 += schedule.getInstance().getComputationalDag().vertex_comm_weight(source_global_ID[source]) * + schedule.getInstance().sendCosts(processor, p_other) * + comm_to_processor_superstep_source_var[p_other][step + 1][static_cast(source)]; + } + } + expr2 += + schedule.getInstance().getComputationalDag().vertex_comm_weight(source_global_ID[source]) * + schedule.getInstance().sendCosts(origin_proc, processor) * + comm_to_processor_superstep_source_var[processor][step + 1][static_cast(source)]; + } + + for (unsigned index = 0; index < fixed_comm_steps.size(); ++index) + { + const auto& entry = fixed_comm_steps[index]; + if(std::get<3>(entry) != start_superstep + step) + continue; + if(std::get<1>(entry) == processor) + expr1 += schedule.getInstance().getComputationalDag().vertex_comm_weight(std::get<0>(entry)) * + schedule.getInstance().sendCosts(processor, std::get<2>(entry)) * + keep_fixed_comm_step[static_cast(index)]; + if(std::get<2>(entry) == processor) + expr2 += schedule.getInstance().getComputationalDag().vertex_comm_weight(std::get<0>(entry)) * + schedule.getInstance().sendCosts(std::get<1>(entry), processor) * + keep_fixed_comm_step[static_cast(index)]; + } + + model.AddConstr(max_comm_superstep_var[static_cast(step + 1)] >= expr1); + model.AddConstr(max_comm_superstep_var[static_cast(step + 1)] >= expr2); + } + } + + // cost calculation - first comm phase handled separately + for (unsigned int processor = 0; processor < num_processors; processor++) { + + Expr expr1, expr2; + for (vertex_idx_t source = 0; source < num_sources; source++) { + const unsigned origin_proc = schedule.assignedProcessor(source_global_ID[source]); + if(origin_proc == processor) + { + for (unsigned int p_other = 0; p_other < num_processors; p_other++) + { + expr1 += schedule.getInstance().getComputationalDag().vertex_comm_weight(source_global_ID[source]) * + 
schedule.getInstance().sendCosts(processor, p_other) * + comm_to_processor_superstep_source_var[p_other][0][static_cast(source)]; + } + } + expr2 += + schedule.getInstance().getComputationalDag().vertex_comm_weight(source_global_ID[source]) * + schedule.getInstance().sendCosts(origin_proc, processor) * + comm_to_processor_superstep_source_var[processor][0][static_cast(source)]; + } + + for (unsigned index = 0; index < fixed_comm_steps.size(); ++index) + { + const auto& entry = fixed_comm_steps[index]; + if(std::get<1>(entry) == processor) + expr1 += schedule.getInstance().getComputationalDag().vertex_comm_weight(std::get<0>(entry)) * + schedule.getInstance().sendCosts(processor, std::get<2>(entry)) * + (1-keep_fixed_comm_step[static_cast(index)]); + if(std::get<2>(entry) == processor) + expr2 += schedule.getInstance().getComputationalDag().vertex_comm_weight(std::get<0>(entry)) * + schedule.getInstance().sendCosts(std::get<1>(entry), processor) * + (1-keep_fixed_comm_step[static_cast(index)]); + } + + model.AddConstr(max_comm_superstep_var[0] >= expr1); + model.AddConstr(max_comm_superstep_var[0] >= expr2); + } + + /* + Objective function + */ + Expr expr; + + for (unsigned int step = 0; step < max_number_supersteps; step++) { + expr += max_work_superstep_var[static_cast(step)] + schedule.getInstance().communicationCosts() * max_comm_superstep_var[static_cast(step + 1)] + + schedule.getInstance().synchronisationCosts() * superstep_used_var[static_cast(step)]; + } + + expr += schedule.getInstance().communicationCosts() * max_comm_superstep_var[0]; + + model.SetObjective(expr, COPT_MINIMIZE); +}; + +template +void CoptPartialScheduler::setupVertexMaps(const BspScheduleCS& schedule) { + + node_local_ID.clear(); + node_global_ID.clear(); + source_local_ID.clear(); + source_global_ID.clear(); + + node_needed_after_on_proc.clear(); + source_needed_after_on_proc.clear(); + fixed_comm_steps.clear(); + source_present_before.clear(); + + std::vector > first_at = schedule.getFirstPresence(); + + max_number_supersteps = end_superstep - start_superstep + 3; + + for (unsigned node = 0; node < schedule.getInstance().numberOfVertices(); node++) { + + if (schedule.assignedSuperstep(node) >= start_superstep && schedule.assignedSuperstep(node) <= end_superstep) { + + node_local_ID[node] = static_cast>(node_global_ID.size()); + node_global_ID.push_back(node); + + for (const auto &pred : schedule.getInstance().getComputationalDag().parents(node)) { + + if (schedule.assignedSuperstep(pred) < start_superstep) { + + if (source_local_ID.find(pred) == source_local_ID.end()) { + source_local_ID[pred] = static_cast>(source_global_ID.size()); + source_global_ID.push_back(pred); + } + + } else if (schedule.assignedSuperstep(pred) > end_superstep) { + + throw std::invalid_argument("Initial Schedule might be invalid?!"); + } + } + } + } + + // find where the sources are already present before the segment + for(const auto& source_and_ID : source_local_ID) + { + vertex_idx_t source = source_and_ID.first; + for(unsigned proc = 0; proc < schedule.getInstance().numberOfProcessors(); ++proc) + if(first_at[source][proc] < start_superstep) + source_present_before.emplace(std::make_pair(source_and_ID.second, proc)); + } + + // collect values that are needed by the end of the segment + for(const auto& source_and_ID : source_local_ID) + { + vertex_idx_t source = source_and_ID.first; + + std::set procs_needing_this; + for (const auto &succ : schedule.getInstance().getComputationalDag().children(source)) + 
if(schedule.assignedProcessor(succ) != schedule.assignedProcessor(source) && + schedule.assignedSuperstep(succ) > end_superstep) + procs_needing_this.insert(schedule.assignedProcessor(succ)); + + for(unsigned proc1 = 0; proc1 < schedule.getInstance().numberOfProcessors(); ++proc1) + for(unsigned proc2 = 0; proc2 < schedule.getInstance().numberOfProcessors(); ++proc2) + { + if(proc1 == proc2) + continue; + auto itr = schedule.getCommunicationSchedule().find(std::make_tuple(source, proc1, proc2)); + if (itr != schedule.getCommunicationSchedule().end() && itr->second > end_superstep) + procs_needing_this.insert(schedule.assignedProcessor(proc1)); + } + + for(unsigned proc : procs_needing_this) + if(first_at[source][proc] >= start_superstep && first_at[source][proc] <= end_superstep + 1) + source_needed_after_on_proc.emplace_back(source_and_ID.second, proc); + } + for(const auto& node_and_ID : node_local_ID) + { + vertex_idx_t node = node_and_ID.first; + + std::set procs_needing_this; + for (const auto &succ : schedule.getInstance().getComputationalDag().children(node)) + if(schedule.assignedSuperstep(succ) > end_superstep) + procs_needing_this.insert(schedule.assignedProcessor(succ)); + + for(unsigned proc1 = 0; proc1 < schedule.getInstance().numberOfProcessors(); ++proc1) + for(unsigned proc2 = 0; proc2 < schedule.getInstance().numberOfProcessors(); ++proc2) + { + auto itr = schedule.getCommunicationSchedule().find(std::make_tuple(node, proc1, proc2)); + if (itr != schedule.getCommunicationSchedule().end() && proc1 != proc2 && itr->second > end_superstep) + procs_needing_this.insert(schedule.assignedProcessor(proc1)); + } + + for(unsigned proc : procs_needing_this) + if(first_at[node][proc] <= end_superstep + 1) + node_needed_after_on_proc.emplace_back(node_and_ID.second, proc); + } + + + // comm steps that just happen to be in this interval, but not connected to the nodes within + for (const auto &[key, val] : schedule.getCommunicationSchedule()) + { + vertex_idx_t source = std::get<0>(key); + if(source_local_ID.find(source) == source_local_ID.end() && + schedule.assignedSuperstep(source) < start_superstep && + val >= start_superstep - 1 && val <= end_superstep) + fixed_comm_steps.emplace_back(std::get<0>(key), std::get<1>(key), std::get<2>(key), val); + } + +}; + +} // namespace osp \ No newline at end of file diff --git a/tests/ilp_bsp_scheduler.cpp b/tests/ilp_bsp_scheduler.cpp index 607abbbc..e5f58f74 100644 --- a/tests/ilp_bsp_scheduler.cpp +++ b/tests/ilp_bsp_scheduler.cpp @@ -32,6 +32,7 @@ limitations under the License. 
#include "osp/bsp/scheduler/IlpSchedulers/CoptFullScheduler.hpp" #include "osp/bsp/scheduler/IlpSchedulers/TotalCommunicationScheduler.hpp" #include "osp/bsp/scheduler/IlpSchedulers/CoptCommScheduleOptimizer.hpp" +#include "osp/bsp/scheduler/IlpSchedulers/CoptPartialScheduler.hpp" using namespace osp; @@ -222,3 +223,52 @@ BOOST_AUTO_TEST_CASE(test_cs) { BOOST_CHECK(before >= after); }; +BOOST_AUTO_TEST_CASE(test_partial) { + + using graph = computational_dag_edge_idx_vector_impl_def_t; + + BspInstance instance; + instance.setNumberOfProcessors(3); + instance.setCommunicationCosts(3); + instance.setSynchronisationCosts(5); + + // Getting root git directory + std::filesystem::path cwd = std::filesystem::current_path(); + std::cout << cwd << std::endl; + while ((!cwd.empty()) && (cwd.filename() != "OneStopParallel")) { + cwd = cwd.parent_path(); + std::cout << cwd << std::endl; + } + + bool status = file_reader::readComputationalDagHyperdagFormatDB( + (cwd / "data/spaa/tiny/instance_pregel.hdag").string(), instance.getComputationalDag()); + + BOOST_CHECK(status); + + BspSchedule schedule_init(instance); + GreedyBspScheduler greedy; + greedy.computeSchedule(schedule_init); + BOOST_CHECK(schedule_init.satisfiesPrecedenceConstraints()); + BspScheduleCS schedule(schedule_init); + BOOST_CHECK(schedule.hasValidCommSchedule()); + + CoptPartialScheduler scheduler; + scheduler.setTimeLimitSeconds(10); + scheduler.setStartAndEndSuperstep(0, 2); + auto cost_before = schedule.computeCosts(); + auto result = scheduler.improveSchedule(schedule); + BOOST_CHECK(result == RETURN_STATUS::OSP_SUCCESS || result == RETURN_STATUS::BEST_FOUND); + BOOST_CHECK(schedule.satisfiesPrecedenceConstraints()); + BOOST_CHECK(schedule.hasValidCommSchedule()); + auto cost_mid = schedule.computeCosts(); + BOOST_CHECK(cost_mid <= cost_before); + scheduler.setStartAndEndSuperstep(2, 5); + result = scheduler.improveSchedule(schedule); + BOOST_CHECK(result == RETURN_STATUS::OSP_SUCCESS || result == RETURN_STATUS::BEST_FOUND); + BOOST_CHECK(schedule.satisfiesPrecedenceConstraints()); + BOOST_CHECK(schedule.hasValidCommSchedule()); + auto cost_after = schedule.computeCosts(); + BOOST_CHECK(cost_after <= cost_mid); + +}; + From 3d65e95ae20a63c6289b2dc93f96d7de1386e261 Mon Sep 17 00:00:00 2001 From: ppapp Date: Wed, 19 Nov 2025 14:56:05 +0100 Subject: [PATCH 4/4] renaming and merge fixes --- include/osp/bsp/model/BspSchedule.hpp | 2 +- include/osp/bsp/model/BspScheduleCS.hpp | 3 ++- .../IlpSchedulers/CoptCommScheduleOptimizer.hpp | 9 +++++---- .../IlpSchedulers/CoptPartialScheduler.hpp | 8 +++++--- .../TotalCommunicationScheduler.hpp | 16 ++++++++-------- .../LocalSearch/KernighanLin/kl_base.hpp | 6 +++--- .../pebblers/pebblingILP/PebblingPartialILP.hpp | 9 +-------- .../pebblingILP/partialILP/AcyclicDagDivider.hpp | 2 -- .../partialILP/SubproblemMultiScheduling.hpp | 2 +- 9 files changed, 26 insertions(+), 31 deletions(-) diff --git a/include/osp/bsp/model/BspSchedule.hpp b/include/osp/bsp/model/BspSchedule.hpp index 2dd85a28..b05b9693 100644 --- a/include/osp/bsp/model/BspSchedule.hpp +++ b/include/osp/bsp/model/BspSchedule.hpp @@ -830,7 +830,7 @@ class BspSchedule : public IBspSchedule, public IBspScheduleEval comm_phase_empty(number_of_supersteps, true); for (const auto& node : instance->vertices()) diff --git a/include/osp/bsp/model/BspScheduleCS.hpp b/include/osp/bsp/model/BspScheduleCS.hpp index eac3ff9f..9aebdee9 100644 --- a/include/osp/bsp/model/BspScheduleCS.hpp +++ b/include/osp/bsp/model/BspScheduleCS.hpp @@ -468,7 +468,7 
@@ class BspScheduleCS : public BspSchedule { } } - virtual void shrinkSchedule() override { + virtual void shrinkByMergingSupersteps() override { std::vector comm_phase_latest_dependency(this->number_of_supersteps, 0); std::vector > first_at = getFirstPresence(); @@ -515,6 +515,7 @@ class BspScheduleCS : public BspSchedule { this->setNumberOfSupersteps(current_index+1); } + // for each vertex v and processor p, find the first superstep where v is present on p by the end of the compute phase std::vector > getFirstPresence() const { std::vector > first_at(BspSchedule::instance->numberOfVertices(), diff --git a/include/osp/bsp/scheduler/IlpSchedulers/CoptCommScheduleOptimizer.hpp b/include/osp/bsp/scheduler/IlpSchedulers/CoptCommScheduleOptimizer.hpp index a09776f1..f3a66b70 100644 --- a/include/osp/bsp/scheduler/IlpSchedulers/CoptCommScheduleOptimizer.hpp +++ b/include/osp/bsp/scheduler/IlpSchedulers/CoptCommScheduleOptimizer.hpp @@ -48,7 +48,7 @@ class CoptCommScheduleOptimizer { VarArray max_comm_superstep_var; std::vector>> comm_processor_to_processor_superstep_node_var; - void setupVariablesConstraintsObjective(const BspScheduleCS& schedule, Model& model, bool num_supersteps_can_change); + void setupVariablesConstraintsObjective(const BspScheduleCS& schedule, Model& model); void setInitialSolution(BspScheduleCS& schedule, Model &model); @@ -59,6 +59,7 @@ class CoptCommScheduleOptimizer { public: using KeyTriple = std::tuple, unsigned int, unsigned int>; + virtual ~CoptCommScheduleOptimizer() = default; virtual RETURN_STATUS improveSchedule(BspScheduleCS &schedule); @@ -76,7 +77,7 @@ RETURN_STATUS CoptCommScheduleOptimizer::improveSchedule(BspScheduleCS< Envr env; Model model = env.CreateModel("bsp_schedule_cs"); - setupVariablesConstraintsObjective(schedule, model, true); + setupVariablesConstraintsObjective(schedule, model); setInitialSolution(schedule, model); @@ -89,7 +90,7 @@ RETURN_STATUS CoptCommScheduleOptimizer::improveSchedule(BspScheduleCS< { updateCommSchedule(schedule); if (canShrinkResultingSchedule(schedule.numberOfSupersteps())) - schedule.shrinkSchedule(); + schedule.shrinkByMergingSupersteps(); } if (model.GetIntAttr(COPT_INTATTR_MIPSTATUS) == COPT_MIPSTATUS_OPTIMAL) { @@ -219,7 +220,7 @@ void CoptCommScheduleOptimizer::setInitialSolution(BspScheduleCS -void CoptCommScheduleOptimizer::setupVariablesConstraintsObjective(const BspScheduleCS& schedule, Model& model, bool num_supersteps_can_change) { +void CoptCommScheduleOptimizer::setupVariablesConstraintsObjective(const BspScheduleCS& schedule, Model& model) { const unsigned &max_number_supersteps = schedule.numberOfSupersteps(); const unsigned &num_processors = schedule.getInstance().numberOfProcessors(); diff --git a/include/osp/bsp/scheduler/IlpSchedulers/CoptPartialScheduler.hpp b/include/osp/bsp/scheduler/IlpSchedulers/CoptPartialScheduler.hpp index a97855d4..10fa2243 100644 --- a/include/osp/bsp/scheduler/IlpSchedulers/CoptPartialScheduler.hpp +++ b/include/osp/bsp/scheduler/IlpSchedulers/CoptPartialScheduler.hpp @@ -81,6 +81,8 @@ class CoptPartialScheduler { virtual void setTimeLimitSeconds(unsigned int limit) { timeLimitSeconds = limit; } inline unsigned int getTimeLimitSeconds() const { return timeLimitSeconds; } virtual void setStartAndEndSuperstep(unsigned start_, unsigned end_) { start_superstep = start_; end_superstep = end_; } + + virtual ~CoptPartialScheduler() = default; }; template @@ -277,7 +279,7 @@ void CoptPartialScheduler::updateSchedule(BspScheduleCS& sched } schedule.cleanCommSchedule(); - 
-    schedule.shrinkSchedule();
+    schedule.shrinkByMergingSupersteps();
 };
@@ -466,7 +468,7 @@ void CoptPartialScheduler::setupVariablesConstraintsObjective(const Bsp
     }
 
     // boundary conditions at the end
-    for(const std::pair, unsigned>& node_and_proc : node_needed_after_on_proc)
+    for(const std::pair, unsigned> node_and_proc : node_needed_after_on_proc)
     {
         Expr expr;
         for (unsigned int p_from = 0; p_from < num_processors; p_from++)
@@ -475,7 +477,7 @@ void CoptPartialScheduler::setupVariablesConstraintsObjective(const Bsp
         model.AddConstr(expr >= 1);
     }
 
-    for(const std::pair, unsigned>& source_and_proc : source_needed_after_on_proc)
+    for(const std::pair, unsigned> source_and_proc : source_needed_after_on_proc)
     {
         Expr expr = present_on_processor_superstep_source_var[source_and_proc.second][max_number_supersteps - 1][static_cast(source_and_proc.first)];
         expr += comm_to_processor_superstep_source_var[source_and_proc.second][max_number_supersteps][static_cast(source_and_proc.first)];
diff --git a/include/osp/bsp/scheduler/IlpSchedulers/TotalCommunicationScheduler.hpp b/include/osp/bsp/scheduler/IlpSchedulers/TotalCommunicationScheduler.hpp
index 4f588da2..d152d259 100644
--- a/include/osp/bsp/scheduler/IlpSchedulers/TotalCommunicationScheduler.hpp
+++ b/include/osp/bsp/scheduler/IlpSchedulers/TotalCommunicationScheduler.hpp
@@ -95,7 +95,7 @@ class TotalCommunicationScheduler : public Scheduler {
             for (unsigned processor = 0; processor < instance_ptr->numberOfProcessors(); processor++) {
 
-                for (unsigned step = 0; step < (unsigned)(*node_to_processor_superstep_var_ptr)[0][0].Size();
+                for (unsigned step = 0; step < static_cast((*node_to_processor_superstep_var_ptr)[0][0].Size());
                      step++) {
 
                     assert(size < std::numeric_limits::max());
@@ -170,7 +170,7 @@ class TotalCommunicationScheduler : public Scheduler {
             for (unsigned processor = 0; processor < instance_ptr->numberOfProcessors(); processor++) {
 
-                for (unsigned step = 0; step < (unsigned)(*node_to_processor_superstep_var_ptr)[0][0].Size();
+                for (unsigned step = 0; step < static_cast((*node_to_processor_superstep_var_ptr)[0][0].Size());
                      step++) {
                     assert(step <= std::numeric_limits::max());
                     if (GetSolution(
@@ -203,7 +203,7 @@ class TotalCommunicationScheduler : public Scheduler {
             for (unsigned processor = 0; processor < instance_ptr->numberOfProcessors(); processor++) {
 
-                for (unsigned step = 0; step < (unsigned)(*node_to_processor_superstep_var_ptr)[0][0].Size();
+                for (unsigned step = 0; step < static_cast((*node_to_processor_superstep_var_ptr)[0][0].Size());
                      step++) {
 
                     if (schedule.assignedProcessor(node) == processor && schedule.assignedSuperstep(node) == step) {
@@ -438,7 +438,7 @@ class TotalCommunicationScheduler : public Scheduler {
                     expr += node_to_processor_superstep_var[node][processor][static_cast(step)];
                 }
             }
-            model.AddConstr(expr <= (double)(instance.numberOfVertices() * instance.numberOfProcessors()) *
+            model.AddConstr(expr <= static_cast(instance.numberOfVertices() * instance.numberOfProcessors()) *
                                         superstep_used_var.GetVar(static_cast(step)));
         }
 
@@ -576,13 +576,13 @@ class TotalCommunicationScheduler : public Scheduler {
             assert(step <= std::numeric_limits::max());
             for (unsigned int processor = 0; processor < instance.numberOfProcessors(); processor++) {
 
-                Expr expr;
+                Expr expr_work;
                 for (const auto &node : instance.vertices()) {
-                    expr += instance.getComputationalDag().vertex_work_weight(node) *
+                    expr_work += instance.getComputationalDag().vertex_work_weight(node) *
                             node_to_processor_superstep_var[node][processor][static_cast(step)];
                 }
-                model.AddConstr(max_work_superstep_var[static_cast(step)] >= expr);
+                model.AddConstr(max_work_superstep_var[static_cast(step)] >= expr_work);
             }
         }
@@ -597,7 +597,7 @@ class TotalCommunicationScheduler : public Scheduler {
             Objective function
         */
 
-        double comm_cost = (double)instance.communicationCosts() / instance.numberOfProcessors();
+        double comm_cost = static_cast(instance.communicationCosts()) / instance.numberOfProcessors();
 
         model.SetObjective(comm_cost * total_edges_cut + expr - instance.synchronisationCosts(), COPT_MINIMIZE);
     }
diff --git a/include/osp/bsp/scheduler/LocalSearch/KernighanLin/kl_base.hpp b/include/osp/bsp/scheduler/LocalSearch/KernighanLin/kl_base.hpp
index e49a92cb..7d378d1b 100644
--- a/include/osp/bsp/scheduler/LocalSearch/KernighanLin/kl_base.hpp
+++ b/include/osp/bsp/scheduler/LocalSearch/KernighanLin/kl_base.hpp
@@ -455,7 +455,7 @@ class kl_base : public ImprovementScheduler, public Ikl_cost_function {
             (*node_heap_handles[node]).to_step = node_best_step;
             (*node_heap_handles[node]).change_in_cost = node_change_in_cost;
 
-            if ((*node_heap_handles[node]).gain != node_max_gain) {
+            if ((*node_heap_handles[node]).gain >= node_max_gain) {
                 (*node_heap_handles[node]).gain = node_max_gain;
 
                 max_gain_heap.update(node_heap_handles[node]);
@@ -642,7 +642,7 @@ class kl_base : public ImprovementScheduler, public Ikl_cost_function {
         unsigned count = 0;
         for (auto iter = max_gain_heap.ordered_begin(); iter != max_gain_heap.ordered_end(); ++iter) {
-            if (iter->gain == max_gain_heap.top().gain && count < local_max) {
+            if (iter->gain >= max_gain_heap.top().gain && count < local_max) {
                 max_nodes[count] = (iter->node);
                 count++;
@@ -1146,7 +1146,7 @@ class kl_base : public ImprovementScheduler, public Ikl_cost_function {
             compute_node_gain(node);
             moves.push_back(best_move_change_superstep(node));
-            if (moves.back().gain == std::numeric_limits::lowest()) {
+            if (moves.back().gain <= std::numeric_limits::lowest()) {
                 abort = true;
                 break;
             }
diff --git a/include/osp/pebbling/pebblers/pebblingILP/PebblingPartialILP.hpp b/include/osp/pebbling/pebblers/pebblingILP/PebblingPartialILP.hpp
index e51470dc..1ac8561b 100644
--- a/include/osp/pebbling/pebblers/pebblingILP/PebblingPartialILP.hpp
+++ b/include/osp/pebbling/pebblers/pebblingILP/PebblingPartialILP.hpp
@@ -113,7 +113,7 @@ RETURN_STATUS PebblingPartialILP::computePebbling(PebblingSchedule
-    std::vector > nodes_in_part(nr_parts), extra_sources(nr_parts), needs_blue_at_end(nr_parts);
+    std::vector > nodes_in_part(nr_parts), extra_sources(nr_parts);
     std::vector > original_node_id(nr_parts);
     std::vector > original_proc_id(nr_parts);
     for(vertex_idx node = 0; node < instance.numberOfVertices(); ++node)
@@ -125,13 +125,6 @@ RETURN_STATUS PebblingPartialILP::computePebbling(PebblingSchedule
 subDags;
diff --git a/include/osp/pebbling/pebblers/pebblingILP/partialILP/AcyclicDagDivider.hpp b/include/osp/pebbling/pebblers/pebblingILP/partialILP/AcyclicDagDivider.hpp
index b65fad58..0fb97201 100644
--- a/include/osp/pebbling/pebblers/pebblingILP/partialILP/AcyclicDagDivider.hpp
+++ b/include/osp/pebbling/pebblers/pebblingILP/partialILP/AcyclicDagDivider.hpp
@@ -34,8 +34,6 @@ class AcyclicDagDivider {
   protected:
     using vertex_idx = vertex_idx_t;
 
-    std::vector node_to_part;
-
     unsigned minPartitionSize = 40, maxPartitionSize = 80;
     bool ignore_sources_in_size = true;
diff --git a/include/osp/pebbling/pebblers/pebblingILP/partialILP/SubproblemMultiScheduling.hpp b/include/osp/pebbling/pebblers/pebblingILP/partialILP/SubproblemMultiScheduling.hpp
index 652e834a..6c277470 100644
--- a/include/osp/pebbling/pebblers/pebblingILP/partialILP/SubproblemMultiScheduling.hpp
+++ b/include/osp/pebbling/pebblers/pebblingILP/partialILP/SubproblemMultiScheduling.hpp
@@ -183,7 +183,7 @@ RETURN_STATUS SubproblemMultiScheduling::computeMultiSchedule(const Bsp
             ++itr_latest;
         }
 
-        std::vector > new_assingments = makeAssignment(instance, possible_nodes, free_procs);
+        new_assingments = makeAssignment(instance, possible_nodes, free_procs);
 
         for(auto entry : new_assingments)
         {
             vertex_idx node = entry.first;