diff --git a/include/osp/bsp/scheduler/GreedySchedulers/GreedyVarianceSspScheduler.hpp b/include/osp/bsp/scheduler/GreedySchedulers/GreedyVarianceSspScheduler.hpp new file mode 100644 index 00000000..7a6c454d --- /dev/null +++ b/include/osp/bsp/scheduler/GreedySchedulers/GreedyVarianceSspScheduler.hpp @@ -0,0 +1,649 @@ +/* +Copyright 2024 Huawei Technologies Co., Ltd. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. + +@author Toni Boehnlein, Christos Matzoros, Benjamin Lozes, Pal Andras Papp, Raphael S. Steiner +*/ + +#pragma once + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include "MemoryConstraintModules.hpp" +#include "osp/auxiliary/misc.hpp" +#include "osp/bsp/scheduler/MaxBspScheduler.hpp" +#include "osp/graph_algorithms/directed_graph_top_sort.hpp" + +namespace osp { + +/** + * @brief The GreedyVarianceSspScheduler class represents a scheduler that uses a greedy algorithm + * with stale synchronous parallel (SSP) execution model. + * + * It computes schedules for BspInstance using variance-based priorities. + */ +template +class GreedyVarianceSspScheduler : public MaxBspScheduler { + + static_assert(is_computational_dag_v, "GreedyVarianceSspScheduler can only be used with computational DAGs."); + + private: + using VertexType = vertex_idx_t; + + constexpr static bool use_memory_constraint = + is_memory_constraint_v or is_memory_constraint_schedule_v; + + static_assert(not use_memory_constraint or std::is_same_v, + "Graph_t must be the same as MemoryConstraint_t::Graph_impl_t."); + + MemoryConstraint_t memory_constraint; + double max_percent_idle_processors; + bool increase_parallelism_in_new_superstep; + + std::vector compute_work_variance(const Graph_t &graph) const { + std::vector work_variance(graph.num_vertices(), 0.0); + const std::vector top_order = GetTopOrder(graph); + + for (auto r_iter = top_order.rbegin(); r_iter != top_order.crend(); r_iter++) { + double temp = 0; + double max_priority = 0; + for (const auto &child : graph.children(*r_iter)) { + max_priority = std::max(work_variance[child], max_priority); + } + for (const auto &child : graph.children(*r_iter)) { + temp += std::exp(2 * (work_variance[child] - max_priority)); + } + temp = std::log(temp) / 2 + max_priority; + + double node_weight = std::log( + static_cast( + std::max( + graph.vertex_work_weight(*r_iter), + static_cast>(1) + ) + ) + ); + double larger_val = node_weight > temp ? 
node_weight : temp; + + work_variance[*r_iter] = + std::log(std::exp(node_weight - larger_val) + std::exp(temp - larger_val)) + larger_val; + } + + return work_variance; + } + + std::vector>> + procTypesCompatibleWithNodeType_omit_procType(const BspInstance &instance) const { + + const std::vector> procTypesCompatibleWithNodeType = + instance.getProcTypesCompatibleWithNodeType(); + + std::vector>> procTypesCompatibleWithNodeType_skip( + instance.getArchitecture().getNumberOfProcessorTypes(), + std::vector>(instance.getComputationalDag().num_vertex_types())); + for (unsigned procType = 0; procType < instance.getArchitecture().getNumberOfProcessorTypes(); procType++) { + for (unsigned nodeType = 0; nodeType < instance.getComputationalDag().num_vertex_types(); nodeType++) { + for (unsigned otherProcType : procTypesCompatibleWithNodeType[nodeType]) { + if (procType == otherProcType) + continue; + procTypesCompatibleWithNodeType_skip[procType][nodeType].emplace_back(otherProcType); + } + } + } + + return procTypesCompatibleWithNodeType_skip; + } + + struct VarianceCompare { + bool operator()(const std::pair &lhs, const std::pair &rhs) const { + return ((lhs.second > rhs.second) || ((lhs.second >= rhs.second) && (lhs.first < rhs.first))); + } + }; + + bool CanChooseNode(const BspInstance &instance, + const std::vector, VarianceCompare>> &allReady, + const std::vector, VarianceCompare>> &procReady, + const std::vector &procFree) const { + for (unsigned i = 0; i < instance.numberOfProcessors(); ++i) + if (procFree[i] && !procReady[i].empty()) + return true; + + for (unsigned i = 0; i < instance.numberOfProcessors(); ++i) + if (procFree[i] && !allReady[instance.getArchitecture().processorType(i)].empty()) + return true; + + return false; + } + + void Choose(const BspInstance &instance, + const std::vector &work_variance, + std::vector, VarianceCompare>> &allReady, + std::vector, VarianceCompare>> &procReady, + const std::vector &procFree, + VertexType &node, unsigned &p, + const bool endSupStep, + const v_workw_t remaining_time, + const std::vector>> &procTypesCompatibleWithNodeType_skip_proctype) const + { + double maxScore = -1; + bool found_allocation = false; + + for (unsigned i = 0; i < instance.numberOfProcessors(); ++i) { + if (!procFree[i] || procReady[i].empty()) + continue; + + auto it = procReady[i].begin(); + while (it != procReady[i].end()) { + if (endSupStep && + (remaining_time < instance.getComputationalDag().vertex_work_weight(it->first))) { + it = procReady[i].erase(it); + continue; + } + + const double &score = it->second; + + if (score > maxScore) { + const unsigned procType = instance.getArchitecture().processorType(i); + + if constexpr (use_memory_constraint) { + if (memory_constraint.can_add(it->first, i)) { + node = it->first; + p = i; + found_allocation = true; + + if (procType < procTypesCompatibleWithNodeType_skip_proctype.size()) { + const auto &compatibleTypes = + procTypesCompatibleWithNodeType_skip_proctype[procType] + [instance.getComputationalDag().vertex_type(node)]; + + for (unsigned otherType : compatibleTypes) { + for (unsigned j = 0; j < instance.numberOfProcessors(); ++j) { + if (j != i && + instance.getArchitecture().processorType(j) == otherType && + j < procReady.size()) { + procReady[j].erase(std::make_pair(node, work_variance[node])); + } + } + } + } + + return; + } + } else { + node = it->first; + p = i; + found_allocation = true; + + if (procType < procTypesCompatibleWithNodeType_skip_proctype.size()) { + const auto &compatibleTypes = + 
procTypesCompatibleWithNodeType_skip_proctype[procType] + [instance.getComputationalDag().vertex_type(node)]; + + for (unsigned otherType : compatibleTypes) { + for (unsigned j = 0; j < instance.numberOfProcessors(); ++j) { + if (j != i && + instance.getArchitecture().processorType(j) == otherType && + j < procReady.size()) { + procReady[j].erase(std::make_pair(node, work_variance[node])); + } + } + } + } + + return; + } + } + + ++it; + } + } + + if (found_allocation) + return; + + for (unsigned i = 0; i < instance.numberOfProcessors(); ++i) { + const unsigned procType = instance.getArchitecture().processorType(i); + if (!procFree[i] || procType >= allReady.size() || allReady[procType].empty()) + continue; + + auto &readyList = allReady[procType]; + auto it = readyList.begin(); + + while (it != readyList.end()) { + if (endSupStep && + (remaining_time < instance.getComputationalDag().vertex_work_weight(it->first))) { + it = readyList.erase(it); + continue; + } + + const double &score = it->second; + + if (score > maxScore) { + if constexpr (use_memory_constraint) { + if (memory_constraint.can_add(it->first, i)) { + node = it->first; + p = i; + + const auto &compatibleTypes = + procTypesCompatibleWithNodeType_skip_proctype[procType] + [instance.getComputationalDag().vertex_type(node)]; + + for (unsigned otherType : compatibleTypes) { + if (otherType < allReady.size()) + allReady[otherType].erase(std::make_pair(node, work_variance[node])); + } + + return; + } + } else { + node = it->first; + p = i; + + const auto &compatibleTypes = + procTypesCompatibleWithNodeType_skip_proctype[procType] + [instance.getComputationalDag().vertex_type(node)]; + + for (unsigned otherType : compatibleTypes) { + if (otherType < allReady.size()) + allReady[otherType].erase(std::make_pair(node, work_variance[node])); + } + + return; + } + } + ++it; + } + } + }; + + + bool check_mem_feasibility( + const BspInstance &instance, + const std::vector, VarianceCompare>> &allReady, + const std::vector, VarianceCompare>> &procReady) const + { + if constexpr (use_memory_constraint) { + if (instance.getArchitecture().getMemoryConstraintType() == MEMORY_CONSTRAINT_TYPE::PERSISTENT_AND_TRANSIENT) + { + for (unsigned i = 0; i < instance.numberOfProcessors(); ++i) { + if (!procReady[i].empty()) { + + const std::pair &node_pair = *procReady[i].begin(); + VertexType top_node = node_pair.first; + + if (memory_constraint.can_add(top_node, i)) { + return true; + } + } + } + + for (unsigned i = 0; i < instance.numberOfProcessors(); ++i) { + + if (allReady[instance.getArchitecture().processorType(i)].empty()) + continue; + + const std::pair &node_pair = + *allReady[instance.getArchitecture().processorType(i)].begin(); + VertexType top_node = node_pair.first; + + if (memory_constraint.can_add(top_node, i)) { + return true; + } + } + + return false; + } + } + + return true; + } + + unsigned get_nr_parallelizable_nodes( + const BspInstance &instance, + const unsigned &stale, + const std::vector &nr_old_ready_nodes_per_type, + const std::vector &nr_ready_nodes_per_type, + const std::vector, VarianceCompare>> &procReady, + const std::vector &nr_procs_per_type) const + { + unsigned nr_nodes = 0; + unsigned num_proc_types = instance.getArchitecture().getNumberOfProcessorTypes(); + + std::vector procs_per_type = nr_procs_per_type; + + if (stale > 1) { + for (unsigned proc = 0; proc < instance.numberOfProcessors(); proc++) { + if (!procReady[proc].empty()) { + procs_per_type[instance.getArchitecture().processorType(proc)]--; + nr_nodes++; + } + 
} + } + + std::vector ready_nodes_per_type = nr_ready_nodes_per_type; + for (unsigned node_type = 0; node_type < ready_nodes_per_type.size(); node_type++) { + ready_nodes_per_type[node_type] += nr_old_ready_nodes_per_type[node_type]; + } + + for (unsigned proc_type = 0; proc_type < num_proc_types; ++proc_type) { + for (unsigned node_type = 0; node_type < instance.getComputationalDag().num_vertex_types(); ++node_type) { + if (instance.isCompatibleType(node_type, proc_type)) { + unsigned matched = std::min(ready_nodes_per_type[node_type], + procs_per_type[proc_type]); + nr_nodes += matched; + ready_nodes_per_type[node_type] -= matched; + procs_per_type[proc_type] -= matched; + } + } + } + + return nr_nodes; + } + + public: + + /** + * @brief Default constructor for GreedyVarianceSspScheduler. + */ + GreedyVarianceSspScheduler(float max_percent_idle_processors_ = 0.2f, bool increase_parallelism_in_new_superstep_ = true) + : max_percent_idle_processors(max_percent_idle_processors_), + increase_parallelism_in_new_superstep(increase_parallelism_in_new_superstep_) {} + + /** + * @brief Default destructor for GreedyVarianceSspScheduler. + */ + virtual ~GreedyVarianceSspScheduler() = default; + + RETURN_STATUS computeSspSchedule(BspSchedule &schedule, unsigned stale) { + const auto &instance = schedule.getInstance(); + const auto &G = instance.getComputationalDag(); + const VertexType &N = instance.numberOfVertices(); + const unsigned &P = instance.numberOfProcessors(); + + unsigned supstepIdx = 0; + + if constexpr (is_memory_constraint_v) { + memory_constraint.initialize(instance); + } else if constexpr (is_memory_constraint_schedule_v) { + memory_constraint.initialize(schedule, supstepIdx); + } + + const std::vector work_variances = compute_work_variance(G); + + std::set, VarianceCompare> old_ready; + std::vector, VarianceCompare>> ready(stale); + std::vector, VarianceCompare>>> procReady( + stale, std::vector, VarianceCompare>>(P)); + std::vector, VarianceCompare>> allReady( + instance.getArchitecture().getNumberOfProcessorTypes()); + + const auto procTypesCompatibleWithNodeType = instance.getProcTypesCompatibleWithNodeType(); + const std::vector>> procTypesCompatibleWithNodeType_skip_proctype = + procTypesCompatibleWithNodeType_omit_procType(instance); + + std::vector nr_old_ready_nodes_per_type(G.num_vertex_types(), 0); + std::vector> nr_ready_stale_nodes_per_type( + stale, std::vector(G.num_vertex_types(), 0)); + std::vector nr_procs_per_type(instance.getArchitecture().getNumberOfProcessorTypes(), 0); + for (auto proc = 0u; proc < P; ++proc) { + ++nr_procs_per_type[instance.getArchitecture().processorType(proc)]; + } + + std::vector nrPredecRemain(N); + + for (VertexType node = 0; node < N; ++node) { + const auto num_parents = G.in_degree(node); + + nrPredecRemain[node] = num_parents; + + if (num_parents == 0) { + ready[0].insert(std::make_pair(node, work_variances[node])); + nr_ready_stale_nodes_per_type[0][G.vertex_type(node)]++; + } + } + + std::vector procFree(P, true); + unsigned free = P; + + std::set, VertexType>> finishTimes; + finishTimes.emplace(0, std::numeric_limits::max()); + + std::vector number_of_allocated_allReady_tasks_in_superstep(instance.getArchitecture().getNumberOfProcessorTypes(), 0); + std::vector limit_of_number_of_allocated_allReady_tasks_in_superstep(instance.getArchitecture().getNumberOfProcessorTypes(), 0); + + bool endSupStep = true; + bool begin_outer_while = true; + bool able_to_schedule_in_step = false; + unsigned successive_empty_supersteps = 0u; + + auto 
nonempty_ready = [&]() { + return std::any_of(ready.cbegin(), ready.cend(), + [](const std::set, VarianceCompare>& ready_set) { return !ready_set.empty(); }); + }; + + while (!old_ready.empty() || nonempty_ready() || !finishTimes.empty()) { + if (finishTimes.empty() && endSupStep) { + able_to_schedule_in_step = false; + number_of_allocated_allReady_tasks_in_superstep = std::vector(instance.getArchitecture().getNumberOfProcessorTypes(), 0); + + for (unsigned i = 0; i < P; ++i) + procReady[supstepIdx % stale][i].clear(); + + if (!begin_outer_while) { + supstepIdx++; + } else { + begin_outer_while = false; + } + + for (unsigned procType = 0; procType < instance.getArchitecture().getNumberOfProcessorTypes(); ++procType) + allReady[procType].clear(); + + old_ready.insert(ready[supstepIdx % stale].begin(), ready[supstepIdx % stale].end()); + ready[supstepIdx % stale].clear(); + for (unsigned node_type = 0; node_type < G.num_vertex_types(); ++node_type) { + nr_old_ready_nodes_per_type[node_type] += nr_ready_stale_nodes_per_type[supstepIdx % stale][node_type]; + nr_ready_stale_nodes_per_type[supstepIdx % stale][node_type] = 0; + } + + for (const auto &nodeAndValuePair : old_ready) { + VertexType node = nodeAndValuePair.first; + for (unsigned procType : procTypesCompatibleWithNodeType[G.vertex_type(node)]) { + allReady[procType].insert(allReady[procType].end(), nodeAndValuePair); + } + } + + if constexpr (use_memory_constraint) { + if (instance.getArchitecture().getMemoryConstraintType() == MEMORY_CONSTRAINT_TYPE::LOCAL) { + for (unsigned proc = 0; proc < P; proc++) + memory_constraint.reset(proc); + } + } + + for (unsigned procType = 0; procType < instance.getArchitecture().getNumberOfProcessorTypes(); procType++) { + unsigned equal_split = (static_cast(allReady[procType].size()) + stale - 1) / stale; + unsigned at_least_for_long_step = 3 * nr_procs_per_type[procType]; + limit_of_number_of_allocated_allReady_tasks_in_superstep[procType] = std::max(at_least_for_long_step, equal_split); + } + + endSupStep = false; + finishTimes.emplace(0, std::numeric_limits::max()); + } + + const v_workw_t time = finishTimes.begin()->first; + const v_workw_t max_finish_time = finishTimes.rbegin()->first; + + // Find new ready jobs + while (!finishTimes.empty() && finishTimes.begin()->first == time) { + const VertexType node = finishTimes.begin()->second; + finishTimes.erase(finishTimes.begin()); + + if (node != std::numeric_limits::max()) { + const unsigned proc_of_node = schedule.assignedProcessor(node); + + for (const auto& succ : G.children(node)) { + nrPredecRemain[succ]--; + if (nrPredecRemain[succ] == 0) { + ready[supstepIdx % stale].emplace(succ, work_variances[succ]); + nr_ready_stale_nodes_per_type[supstepIdx % stale][G.vertex_type(succ)]++; + + unsigned earliest_add = supstepIdx; + for (const auto& pred : G.parents(succ)) { + if (schedule.assignedProcessor(pred) != proc_of_node) { + earliest_add = std::max(earliest_add, stale + schedule.assignedSuperstep(pred)); + } + } + + if (instance.isCompatible(succ, proc_of_node)) { + bool memory_ok = true; + + if constexpr (use_memory_constraint) { + if (earliest_add == supstepIdx) { + memory_ok = memory_constraint.can_add(succ, proc_of_node); + } + } + for (unsigned step_to_add = earliest_add; + step_to_add < supstepIdx + stale; ++step_to_add) { + if ((step_to_add == supstepIdx) && !memory_ok) { + continue; + } + procReady[step_to_add % stale][proc_of_node].emplace( + succ, work_variances[succ]); + } + } + } + } + + procFree[proc_of_node] = true; + ++free; + } + } 
+ + // Assign new jobs + if (!CanChooseNode(instance, allReady, procReady[supstepIdx % stale], procFree)) { + endSupStep = true; + } + + while (CanChooseNode(instance, allReady, procReady[supstepIdx % stale], procFree)) { + VertexType nextNode = std::numeric_limits::max(); + unsigned nextProc = P; + + Choose( instance, work_variances, allReady, + procReady[supstepIdx % stale], procFree, + nextNode, nextProc, endSupStep, max_finish_time - time, procTypesCompatibleWithNodeType_skip_proctype); + + if (nextNode == std::numeric_limits::max() || nextProc == P) { + endSupStep = true; + break; + } + + if (procReady[supstepIdx % stale][nextProc].find(std::make_pair(nextNode, work_variances[nextNode])) != + procReady[supstepIdx % stale][nextProc].end()) { + for (size_t i = 0; i < stale; i++) { + procReady[i][nextProc].erase(std::make_pair(nextNode, work_variances[nextNode])); + } + } else { + for(unsigned procType : procTypesCompatibleWithNodeType[G.vertex_type(nextNode)]) { + allReady[procType].erase(std::make_pair(nextNode, work_variances[nextNode])); + } + nr_old_ready_nodes_per_type[G.vertex_type(nextNode)]--; + const unsigned nextProcType = instance.getArchitecture().processorType(nextProc); + number_of_allocated_allReady_tasks_in_superstep[nextProcType]++; + + if (number_of_allocated_allReady_tasks_in_superstep[nextProcType] >= limit_of_number_of_allocated_allReady_tasks_in_superstep[nextProcType]) { + allReady[nextProcType].clear(); + } + } + + for (size_t i = 0; i < stale; i++) { + ready[i].erase(std::make_pair(nextNode, work_variances[nextNode])); + } + + old_ready.erase(std::make_pair(nextNode, work_variances[nextNode])); + + schedule.setAssignedProcessor(nextNode, nextProc); + schedule.setAssignedSuperstep(nextNode, supstepIdx); + able_to_schedule_in_step = true; + + if constexpr (use_memory_constraint) { + memory_constraint.add(nextNode, nextProc); + + std::vector> toErase; + for (const auto &node_pair : procReady[supstepIdx % stale][nextProc]) { + if (!memory_constraint.can_add(node_pair.first, nextProc)) { + toErase.push_back(node_pair); + } + } + for (const auto &n : toErase) { + procReady[supstepIdx % stale][nextProc].erase(n); + } + } + + finishTimes.emplace(time + G.vertex_work_weight(nextNode), nextNode); + procFree[nextProc] = false; + --free; + } + + if (able_to_schedule_in_step) + successive_empty_supersteps = 0; + else if (++successive_empty_supersteps > 100 + stale) + return RETURN_STATUS::ERROR; + + if (free > (P * max_percent_idle_processors) && + ((!increase_parallelism_in_new_superstep) || + get_nr_parallelizable_nodes( + instance, stale, nr_old_ready_nodes_per_type, + nr_ready_stale_nodes_per_type[(supstepIdx + 1) % stale], + procReady[(supstepIdx + 1) % stale], + nr_procs_per_type) >= std::min( + std::min(P, static_cast(1.2 * (P - free))), + P - free + static_cast(0.5 * free)))) + { + endSupStep = true; + } + } + + assert(schedule.satisfiesPrecedenceConstraints()); + //schedule.setAutoCommunicationSchedule(); + + return RETURN_STATUS::OSP_SUCCESS; + } + + RETURN_STATUS computeSchedule(BspSchedule &schedule) override { + return computeSspSchedule(schedule, 1U); + } + + RETURN_STATUS computeSchedule(MaxBspSchedule &schedule) override { + return computeSspSchedule(schedule, 2U); + } + + std::string getScheduleName() const override { + if constexpr (use_memory_constraint) { + return "GreedyVarianceSspMemory"; + } else { + return "GreedyVarianceSsp"; + } + } + +}; + +} // namespace osp diff --git a/include/osp/bsp/scheduler/MaxBspScheduler.hpp 
b/include/osp/bsp/scheduler/MaxBspScheduler.hpp new file mode 100644 index 00000000..c6accf25 --- /dev/null +++ b/include/osp/bsp/scheduler/MaxBspScheduler.hpp @@ -0,0 +1,88 @@ +/* +Copyright 2024 Huawei Technologies Co., Ltd. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. + +@author Toni Boehnlein, Christos Matzoros, Benjamin Lozes, Pal Andras Papp, Raphael S. Steiner +*/ + +#pragma once + +#include "osp/bsp/model/MaxBspSchedule.hpp" +#include "osp/bsp/model/MaxBspScheduleCS.hpp" +#include "osp/bsp/scheduler/Scheduler.hpp" + +namespace osp { + +/** + * @class Scheduler + * @brief Abstract base class for scheduling scheduler. + * + * The Scheduler class provides a common interface for scheduling scheduler in the BSP scheduling system. + * It defines methods for setting and getting the time limit, as well as computing schedules. + */ +template +class MaxBspScheduler : public Scheduler { + public: + + static_assert(is_computational_dag_v, "BspSchedule can only be used with computational DAGs."); + + /** + * @brief Get the name of the scheduling algorithm. + * @return The name of the scheduling algorithm. + */ + virtual std::string getScheduleName() const override = 0; + + /** + * @brief Compute a BSP schedule for the given BSP instance. + * @param instance The BSP instance for which to compute the schedule. + * @return A pair containing the return status and the computed schedule. + */ + virtual RETURN_STATUS computeSchedule(BspSchedule &schedule) override { + MaxBspSchedule tmpSched(schedule.getInstance()); + RETURN_STATUS status = computeSchedule(tmpSched); + schedule = tmpSched; + return status; + } + + virtual RETURN_STATUS computeScheduleCS(BspScheduleCS &schedule) override { + MaxBspScheduleCS tmpSchedule(schedule.getInstance()); + auto result = computeScheduleCS(tmpSchedule); + if (result == RETURN_STATUS::OSP_SUCCESS || result == RETURN_STATUS::BEST_FOUND) { + schedule = tmpSchedule; + schedule.setAutoCommunicationSchedule(); + return result; + } else { + return RETURN_STATUS::ERROR; + } + } + + /** + * @brief Compute a BSP schedule for the given BSP instance. + * @param instance The BSP instance for which to compute the schedule. + * @return A pair containing the return status and the computed schedule. 
+ */ + virtual RETURN_STATUS computeSchedule(MaxBspSchedule &schedule) = 0; + + virtual RETURN_STATUS computeScheduleCS(MaxBspScheduleCS &schedule) { + auto result = computeSchedule(schedule); + if (result == RETURN_STATUS::OSP_SUCCESS || result == RETURN_STATUS::BEST_FOUND) { + // schedule.setAutoCommunicationSchedule(); + return result; + } else { + return RETURN_STATUS::ERROR; + } + }; +}; + +} // namespace osp \ No newline at end of file diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt index 17975dde..8738643f 100644 --- a/tests/CMakeLists.txt +++ b/tests/CMakeLists.txt @@ -122,6 +122,8 @@ endif() _add_test( bsp_schedulers ) +_add_test( max_bsp_schedulers ) + _add_test( bsp_schedulers_mem_const ) _add_test( bsp_improvementschedulers ) diff --git a/tests/max_bsp_schedulers.cpp b/tests/max_bsp_schedulers.cpp new file mode 100644 index 00000000..a3a9aeda --- /dev/null +++ b/tests/max_bsp_schedulers.cpp @@ -0,0 +1,149 @@ +/* +Copyright 2024 Huawei Technologies Co., Ltd. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. + +@author Toni Boehnlein, Christos Matzoros, Pal Andras Papp, Raphael S. Steiner +*/ + +#define BOOST_TEST_MODULE BSP_SCHEDULERS +#include + +#include +#include +#include + + +#include "osp/bsp/scheduler/GreedySchedulers/GreedyBspScheduler.hpp" +#include "osp/bsp/scheduler/GreedySchedulers/GreedyVarianceSspScheduler.hpp" +#include "osp/bsp/scheduler/MaxBspScheduler.hpp" +#include "osp/graph_implementations/adj_list_impl/computational_dag_edge_idx_vector_impl.hpp" +#include "osp/graph_implementations/adj_list_impl/computational_dag_vector_impl.hpp" +#include "osp/auxiliary/io/arch_file_reader.hpp" +#include "osp/auxiliary/io/hdag_graph_file_reader.hpp" +#include "osp/auxiliary/io/general_file_reader.hpp" +#include "test_graphs.hpp" + +using namespace osp; + +std::vector test_architectures() { return {"data/machine_params/p3.arch"}; } + +template +void run_test(Scheduler *test_scheduler) { + // static_assert(std::is_base_of::value, "Class is not a scheduler!"); + std::vector filenames_graph = tiny_spaa_graphs(); + std::vector filenames_architectures = test_architectures(); + + // Getting root git directory + std::filesystem::path cwd = std::filesystem::current_path(); + std::cout << cwd << std::endl; + while ((!cwd.empty()) && (cwd.filename() != "OneStopParallel")) { + cwd = cwd.parent_path(); + std::cout << cwd << std::endl; + } + + for (auto &filename_graph : filenames_graph) { + for (auto &filename_machine : filenames_architectures) { + std::string name_graph = filename_graph.substr(filename_graph.find_last_of("/\\") + 1); + name_graph = name_graph.substr(0, name_graph.find_last_of(".")); + std::string name_machine = filename_machine.substr(filename_machine.find_last_of("/\\") + 1); + name_machine = name_machine.substr(0, name_machine.rfind(".")); + + std::cout << std::endl << "Scheduler: " << test_scheduler->getScheduleName() << std::endl; + std::cout << "Graph: " << name_graph << std::endl; + std::cout << "Architecture: " << name_machine << std::endl; + + BspInstance instance; + + bool 
status_graph = file_reader::readGraph((cwd / filename_graph).string(), + instance.getComputationalDag()); + bool status_architecture = file_reader::readBspArchitecture((cwd / "data/machine_params/p3.arch").string(), + instance.getArchitecture()); + + if (!status_graph || !status_architecture) { + + std::cout << "Reading files failed." << std::endl; + BOOST_CHECK(false); + } + + BspSchedule schedule(instance); + const auto result = test_scheduler->computeSchedule(schedule); + + BOOST_CHECK_EQUAL(RETURN_STATUS::OSP_SUCCESS, result); + BOOST_CHECK(schedule.satisfiesPrecedenceConstraints()); + } + } +}; + +template +void run_test_max_bsp(MaxBspScheduler* test_scheduler) { + std::vector filenames_graph = tiny_spaa_graphs(); + std::vector filenames_architectures = test_architectures(); + + // Locate project root + std::filesystem::path cwd = std::filesystem::current_path(); + while ((!cwd.empty()) && (cwd.filename() != "OneStopParallel")) { + cwd = cwd.parent_path(); + } + + for (auto& filename_graph : filenames_graph) { + for (auto& filename_machine : filenames_architectures) { + std::string name_graph = filename_graph.substr(filename_graph.find_last_of("/\\") + 1); + name_graph = name_graph.substr(0, name_graph.find_last_of(".")); + std::string name_machine = filename_machine.substr(filename_machine.find_last_of("/\\") + 1); + name_machine = name_machine.substr(0, name_machine.rfind(".")); + + std::cout << std::endl + << "Scheduler (MaxBsp): " << test_scheduler->getScheduleName() << std::endl + << "Graph: " << name_graph << std::endl + << "Architecture: " << name_machine << std::endl; + + computational_dag_edge_idx_vector_impl_def_int_t graph; + BspArchitecture arch; + + bool status_graph = file_reader::readGraph((cwd / filename_graph).string(), graph); + bool status_architecture = + file_reader::readBspArchitecture((cwd / filename_machine).string(), arch); + + BOOST_REQUIRE_MESSAGE(status_graph, "Failed to read graph: " << filename_graph); + BOOST_REQUIRE_MESSAGE(status_architecture, "Failed to read architecture: " << filename_machine); + + BspInstance instance(graph, arch); + + MaxBspSchedule schedule(instance); + + const auto result = test_scheduler->computeSchedule(schedule); + + BOOST_CHECK_EQUAL(result, RETURN_STATUS::OSP_SUCCESS); + BOOST_CHECK(schedule.satisfiesPrecedenceConstraints()); + } + } +} + +// Tests computeSchedule(BspSchedule&) → staleness = 1 +BOOST_AUTO_TEST_CASE(GreedyVarianceSspScheduler_test_vector_impl) { + GreedyVarianceSspScheduler test; + run_test(&test); +} + +// Tests computeSchedule(BspSchedule&) → staleness = 1 (different graph impl) +BOOST_AUTO_TEST_CASE(GreedyVarianceSspScheduler_test_edge_idx_impl) { + GreedyVarianceSspScheduler test; + run_test(&test); +} + +// Tests computeSchedule(MaxBspSchedule&) → staleness = 2 +BOOST_AUTO_TEST_CASE(GreedyVarianceSspScheduler_MaxBspSchedule_large_test) { + GreedyVarianceSspScheduler test; + run_test_max_bsp(&test); +}
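
Since the patch only exercises the new scheduler through the Boost test fixtures, a minimal stand-alone usage sketch may help reviewers see how the pieces fit together. It mirrors tests/max_bsp_schedulers.cpp from this diff. The template argument lists (a single graph type for the scheduler, instance, architecture, and schedule) are assumptions, as are the placeholder input paths my_graph.hdag and my_machine.arch; adjust both to the project's actual types and data files.

    // Hypothetical usage sketch (not part of the patch), assuming the types below
    // are instantiated with just the graph type, as the new tests appear to do.
    #include <iostream>

    #include "osp/bsp/scheduler/GreedySchedulers/GreedyVarianceSspScheduler.hpp"
    #include "osp/graph_implementations/adj_list_impl/computational_dag_edge_idx_vector_impl.hpp"
    #include "osp/auxiliary/io/arch_file_reader.hpp"
    #include "osp/auxiliary/io/hdag_graph_file_reader.hpp"
    #include "osp/auxiliary/io/general_file_reader.hpp"

    using namespace osp;

    int main() {
        using graph_t = computational_dag_edge_idx_vector_impl_def_int_t;

        graph_t graph;
        BspArchitecture<graph_t> arch;   // template argument assumed

        // Read the DAG and machine model, as the new test file does.
        // "my_graph.hdag" and "my_machine.arch" are placeholder paths.
        if (!file_reader::readGraph("my_graph.hdag", graph) ||
            !file_reader::readBspArchitecture("my_machine.arch", arch)) {
            std::cerr << "Reading input files failed." << std::endl;
            return 1;
        }

        BspInstance<graph_t> instance(graph, arch);

        // Default-equivalent parameters: max_percent_idle_processors = 0.2,
        // increase_parallelism_in_new_superstep = true.
        GreedyVarianceSspScheduler<graph_t> scheduler(0.2f, true);

        // The MaxBspSchedule overload runs the SSP greedy with staleness 2.
        MaxBspSchedule<graph_t> schedule(instance);
        const RETURN_STATUS status = scheduler.computeSchedule(schedule);

        if (status == RETURN_STATUS::OSP_SUCCESS && schedule.satisfiesPrecedenceConstraints()) {
            std::cout << scheduler.getScheduleName() << " produced a valid schedule." << std::endl;
        }
        return 0;
    }

As in the patch, computeSchedule(BspSchedule &) runs the same greedy with staleness 1, computeSchedule(MaxBspSchedule &) with staleness 2, and computeSspSchedule(schedule, stale) exposes the staleness parameter directly.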