diff --git a/apps/test_suite_runner/StringToScheduler/run_bsp_recomp_scheduler.hpp b/apps/test_suite_runner/StringToScheduler/run_bsp_recomp_scheduler.hpp index 4d8fa110..c9f1cf06 100644 --- a/apps/test_suite_runner/StringToScheduler/run_bsp_recomp_scheduler.hpp +++ b/apps/test_suite_runner/StringToScheduler/run_bsp_recomp_scheduler.hpp @@ -58,7 +58,7 @@ ReturnStatus RunBspRecompScheduler(const ConfigParser &parser, GreedyRecomputer scheduler; - return scheduler.ComputeRecompSchedule(initialSchedule, schedule); + return scheduler.ComputeRecompScheduleBasic(initialSchedule, schedule); } else { throw std::invalid_argument("Parameter error: Unknown algorithm.\n"); diff --git a/include/osp/bsp/model/BspScheduleRecomp.hpp b/include/osp/bsp/model/BspScheduleRecomp.hpp index 14477a1f..7dc7b6db 100644 --- a/include/osp/bsp/model/BspScheduleRecomp.hpp +++ b/include/osp/bsp/model/BspScheduleRecomp.hpp @@ -126,6 +126,7 @@ class BspScheduleRecomp : public IBspScheduleEval { VertexIdx GetTotalAssignments() const; void MergeSupersteps(); + void CleanSchedule(); }; template @@ -324,4 +325,103 @@ void BspScheduleRecomp::MergeSupersteps() { numberOfSupersteps_ = currentStepIdx; } +// remove unneeded comm. schedule entries - these can happen in several algorithms +template +void BspScheduleRecomp::CleanSchedule() +{ + // I. 
Data that is already present before it arrives + std::vector>> arrivesAt(instance_->NumberOfVertices(), + std::vector>(instance_->NumberOfProcessors())); + for (const auto &node : instance_->GetComputationalDag().Vertices()) { + for (const auto &procAndStep : nodeToProcessorAndSupertepAssignment_[node]) { + arrivesAt[node][procAndStep.first].insert(procAndStep.second); + } + } + + for (auto const &[key, val] : commSchedule_) { + arrivesAt[std::get<0>(key)][std::get<2>(key)].insert(val); + } + + // - computation steps + for (const auto &node : instance_->GetComputationalDag().Vertices()) { + for (unsigned index = 0; index < nodeToProcessorAndSupertepAssignment_[node].size(); ) { + const auto &procAndStep = nodeToProcessorAndSupertepAssignment_[node][index]; + if(*arrivesAt[node][procAndStep.first].begin() < procAndStep.second) { + nodeToProcessorAndSupertepAssignment_[node][index] = nodeToProcessorAndSupertepAssignment_[node].back(); + nodeToProcessorAndSupertepAssignment_[node].pop_back(); + } else { + ++index; + } + } + } + + // - communication steps + std::vector toErase; + for (auto const &[key, val] : commSchedule_) { + auto itr = arrivesAt[std::get<0>(key)][std::get<2>(key)].begin(); + if (*itr < val) { + toErase.push_back(key); + } else if (*itr == val && ++itr != arrivesAt[std::get<0>(key)][std::get<2>(key)].end() && *itr == val) { + toErase.push_back(key); + arrivesAt[std::get<0>(key)][std::get<2>(key)].erase(itr); + } + } + + for (const KeyTriple &key : toErase) { + commSchedule_.erase(key); + } + + // II. 
Data that is not used after being computed/sent + std::vector>> usedAt(instance_->NumberOfVertices(), + std::vector>(instance_->NumberOfProcessors())); + for (const auto &node : instance_->GetComputationalDag().Vertices()) { + for (const auto &child : instance_->GetComputationalDag().Children(node)) { + for (const auto &procAndStep : nodeToProcessorAndSupertepAssignment_[child]) { + usedAt[node][procAndStep.first].insert(procAndStep.second); + } + } + } + + for (auto const &[key, val] : commSchedule_) { + usedAt[std::get<0>(key)][std::get<1>(key)].insert(val); + } + + // - computation steps + for (const auto &node : instance_->GetComputationalDag().Vertices()) { + for (unsigned index = 0; index < nodeToProcessorAndSupertepAssignment_[node].size(); ) { + const auto &procAndStep = nodeToProcessorAndSupertepAssignment_[node][index]; + if ((usedAt[node][procAndStep.first].empty() || *usedAt[node][procAndStep.first].rbegin() < procAndStep.second) + && index > 0) + { + nodeToProcessorAndSupertepAssignment_[node][index] = nodeToProcessorAndSupertepAssignment_[node].back(); + nodeToProcessorAndSupertepAssignment_[node].pop_back(); + } else { + ++index; + } + } + } + + // - communication steps (need to visit cs entries in reverse superstep order here) + std::vector> entries(numberOfSupersteps_); + for (auto const &[key, val] : commSchedule_) { + entries[val].push_back(key); + } + + toErase.clear(); + for (unsigned step = numberOfSupersteps_ - 1; step < numberOfSupersteps_; --step) { + for (const KeyTriple &key : entries[step]) { + if (usedAt[std::get<0>(key)][std::get<2>(key)].empty() + || *usedAt[std::get<0>(key)][std::get<2>(key)].rbegin() <= step) { + toErase.push_back(key); + auto itr = usedAt[std::get<0>(key)][std::get<1>(key)].find(step); + usedAt[std::get<0>(key)][std::get<1>(key)].erase(itr); + } + } + } + + for (const KeyTriple &key : toErase) { + commSchedule_.erase(key); + } +} + } // namespace osp diff --git 
a/include/osp/bsp/scheduler/GreedySchedulers/GreedyRecomputer.hpp b/include/osp/bsp/scheduler/GreedySchedulers/GreedyRecomputer.hpp index 20e3fdc3..1feeecf7 100644 --- a/include/osp/bsp/scheduler/GreedySchedulers/GreedyRecomputer.hpp +++ b/include/osp/bsp/scheduler/GreedySchedulers/GreedyRecomputer.hpp @@ -23,7 +23,7 @@ limitations under the License. namespace osp { /** - * @brief The GreedyReccomputer class applies a greedy algorithm to remove some of the communciation steps in + * @brief The GreedyRecomputer class applies a greedy algorithm to replace some of the communication steps in * a BspSchedule by recomputation steps if this decreases the cost. */ template @@ -38,185 +38,999 @@ class GreedyRecomputer { static_assert(std::is_same_v, VCommwT>, "GreedyRecomputer requires work and comm. weights to have the same type."); + // auxiliary data to handle schedule efficiently + std::vector> workCost_, sendCost_, recCost_; + std::vector> firstPresent_; + std::vector > > neededOnProc_; + std::vector > > nodesPerProcAndStep_; + std::vector maxWork_, maxComm_; + std::vector > commSteps_; + + void RefreshAuxData(const BspScheduleRecomp &schedule); + + // elementary operations to edit schedule - add/remove step, and update data structures + void AddCommStep(const BspScheduleRecomp &schedule, const KeyTriple &newComm, const unsigned step); + void RemoveCommStep(const BspScheduleRecomp &schedule, const KeyTriple &removedComm, const unsigned step); + void AddRecomputeStep(BspScheduleRecomp &schedule, const VertexIdx node, const unsigned proc, const unsigned step); + + // DIFFERENT TECHNIQUES TO IMPROVE SCHEDULE BY INTRODUCING RECOMPUTATION + // (return values show whether there were any successful improvement steps) + + // Replace single comm. 
steps by recomp, if it is better + bool GreedyImprove(BspScheduleRecomp &schedule); + + // Merge consecutive supersteps using recomp, if it is better + bool MergeEntireSupersteps(BspScheduleRecomp &schedule); + + // Copy all the (necessary) nodes from one processor to another in a superstep, if it is better + bool RecomputeEntireSupersteps(BspScheduleRecomp &schedule); + + // Remove multiple comm steps from the same superstep at once, attempting to escape local minima + bool BatchRemoveSteps(BspScheduleRecomp &schedule); + public: /** * @brief Default destructor for GreedyRecomputer. */ virtual ~GreedyRecomputer() = default; - ReturnStatus ComputeRecompSchedule(BspScheduleCS &initialSchedule, BspScheduleRecomp &outSchedule) const; + ReturnStatus ComputeRecompScheduleBasic(BspScheduleCS &initialSchedule, BspScheduleRecomp &recompSchedule); + + ReturnStatus ComputeRecompScheduleAdvanced(BspScheduleCS &initialSchedule, BspScheduleRecomp &recompSchedule); }; template -ReturnStatus GreedyRecomputer::ComputeRecompSchedule(BspScheduleCS &initialSchedule, - BspScheduleRecomp &outSchedule) const { - const VertexIdx &n = initialSchedule.GetInstance().NumberOfVertices(); - const unsigned &p = initialSchedule.GetInstance().NumberOfProcessors(); - const unsigned &s = initialSchedule.NumberOfSupersteps(); - const GraphT &g = initialSchedule.GetInstance().GetComputationalDag(); +ReturnStatus GreedyRecomputer::ComputeRecompScheduleBasic(BspScheduleCS &initialSchedule, BspScheduleRecomp &recompSchedule) +{ + recompSchedule = BspScheduleRecomp(initialSchedule); + GreedyImprove(recompSchedule); + recompSchedule.MergeSupersteps(); + return ReturnStatus::OSP_SUCCESS; +} + +template +ReturnStatus GreedyRecomputer::ComputeRecompScheduleAdvanced(BspScheduleCS &initialSchedule, BspScheduleRecomp &recompSchedule) +{ + recompSchedule = BspScheduleRecomp(initialSchedule); + bool keepsImproving = true; + while (keepsImproving) + { + keepsImproving = BatchRemoveSteps(recompSchedule); // no need 
for greedyImprove if we use this more general one + recompSchedule.MergeSupersteps(); - outSchedule = BspScheduleRecomp(initialSchedule.GetInstance()); - outSchedule.SetNumberOfSupersteps(initialSchedule.NumberOfSupersteps()); + keepsImproving = MergeEntireSupersteps(recompSchedule) || keepsImproving; + recompSchedule.CleanSchedule(); + recompSchedule.MergeSupersteps(); + + keepsImproving = RecomputeEntireSupersteps(recompSchedule) || keepsImproving; + recompSchedule.MergeSupersteps(); + + // add further methods, if desired + } + + return ReturnStatus::OSP_SUCCESS; +} + +template +bool GreedyRecomputer::GreedyImprove(BspScheduleRecomp &schedule) +{ + const VertexIdx N = schedule.GetInstance().NumberOfVertices(); + const unsigned P = schedule.GetInstance().NumberOfProcessors(); + const unsigned S = schedule.NumberOfSupersteps(); + const GraphT &G = schedule.GetInstance().GetComputationalDag(); + + bool improved = false; // Initialize required data structures - std::vector> workCost(p, std::vector(s, 0)), sendCost(p, std::vector(s, 0)), - recCost(p, std::vector(s, 0)); + RefreshAuxData(schedule); + + std::vector> firstComputable(N, std::vector(P, 0U)); + for (VertexIdx node = 0; node < N; ++node) { + for (const VertexIdx &pred : G.Parents(node)) { + for (unsigned proc = 0; proc < P; ++proc) { + firstComputable[node][proc] = std::max(firstComputable[node][proc], firstPresent_[pred][proc]); + } + } + } + + // Find improvement steps + bool stillImproved = true; + while (stillImproved) { + stillImproved = false; - std::vector> firstComputable(n, std::vector(p, 0U)), - firstPresent(n, std::vector(p, std::numeric_limits::max())); + for (unsigned step = 0; step < S; ++step) { + std::vector toErase; + for (const KeyTriple &entry : commSteps_[step]) { + const VertexIdx &node = std::get<0>(entry); + const unsigned &fromProc = std::get<1>(entry); + const unsigned &toProc = std::get<2>(entry); - std::vector>> neededOnProc(n, std::vector>(p, {s})); + // check how much comm cost 
we save by removing comm schedule entry + CostType commInduced = G.VertexCommWeight(node) + * schedule.GetInstance().GetArchitecture().CommunicationCosts(fromProc, toProc); - std::vector maxWork(s, 0), maxComm(s, 0); + CostType newMaxComm = 0; + for (unsigned proc = 0; proc < P; ++proc) { + if (proc == fromProc) { + newMaxComm = std::max(newMaxComm, sendCost_[proc][step] - commInduced); + } else { + newMaxComm = std::max(newMaxComm, sendCost_[proc][step]); + } + if (proc == toProc) { + newMaxComm = std::max(newMaxComm, recCost_[proc][step] - commInduced); + } else { + newMaxComm = std::max(newMaxComm, recCost_[proc][step]); + } + } + if (newMaxComm == maxComm_[step]) { + continue; + } - std::vector> commSteps(s); + if (!schedule.GetInstance().IsCompatible(node, toProc)) { + continue; + } + + CostType decrease = maxComm_[step] - newMaxComm; + if (maxComm_[step] > 0 && newMaxComm == 0) { + decrease += schedule.GetInstance().GetArchitecture().SynchronisationCosts(); + } + + // check how much it would increase the work cost instead + unsigned bestStep = S; + CostType smallestIncrease = std::numeric_limits::max(); + for (unsigned compStep = firstComputable[node][toProc]; compStep <= *neededOnProc_[node][toProc].begin(); ++compStep) { + CostType increase = workCost_[toProc][compStep] + G.VertexWorkWeight(node) > maxWork_[compStep] + ? 
workCost_[toProc][compStep] + G.VertexWorkWeight(node) - maxWork_[compStep] + : 0; + + if (increase < smallestIncrease) { + bestStep = compStep; + smallestIncrease = increase; + } + } - for (VertexIdx node = 0; node < n; ++node) { - const unsigned &proc = initialSchedule.AssignedProcessor(node); - const unsigned &step = initialSchedule.AssignedSuperstep(node); + // check if this modification is beneficial + if (bestStep == S || smallestIncrease > decrease) { + continue; + } + + // execute the modification + toErase.emplace_back(entry); + AddRecomputeStep(schedule, node, toProc, bestStep); + improved = true; + + sendCost_[fromProc][step] -= commInduced; + recCost_[toProc][step] -= commInduced; + maxComm_[step] = newMaxComm; + + maxWork_[bestStep] += smallestIncrease; + + // update movability bounds + neededOnProc_[node][fromProc].erase(neededOnProc_[node][fromProc].lower_bound(step)); + + firstPresent_[node][toProc] = bestStep; + for (const VertexIdx &succ : G.Children(node)) { + firstComputable[succ][toProc] = 0U; + for (const VertexIdx &pred : G.Parents(succ)) { + firstComputable[succ][toProc] = std::max(firstComputable[succ][toProc], firstPresent_[pred][toProc]); + } + } - workCost[proc][step] += g.VertexWorkWeight(node); - firstPresent[node][proc] = std::min(firstPresent[node][proc], step); - for (VertexIdx pred : g.Parents(node)) { - neededOnProc[pred][proc].insert(step); + stillImproved = true; } + for (const KeyTriple &entry : toErase) { + commSteps_[step].erase(entry); + } + } + } - outSchedule.Assignments(node).emplace_back(proc, step); + schedule.GetCommunicationSchedule().clear(); + for (unsigned step = 0; step < S; ++step) { + for (const KeyTriple &entry : commSteps_[step]) { + schedule.AddCommunicationScheduleEntry(entry, step); + } } - for (const std::pair item : initialSchedule.GetCommunicationSchedule()) { - const VertexIdx &node = std::get<0>(item.first); - const unsigned &fromProc = std::get<1>(item.first); - const unsigned &toProc = 
std::get<2>(item.first); - const unsigned &step = item.second; - sendCost[fromProc][step] - += g.VertexCommWeight(node) * initialSchedule.GetInstance().GetArchitecture().CommunicationCosts(fromProc, toProc); - recCost[toProc][step] - += g.VertexCommWeight(node) * initialSchedule.GetInstance().GetArchitecture().CommunicationCosts(fromProc, toProc); - commSteps[step].emplace(item.first); - neededOnProc[node][fromProc].insert(step); - firstPresent[node][toProc] = std::min(firstPresent[node][toProc], step + 1); + return improved; +} + +template +bool GreedyRecomputer::MergeEntireSupersteps(BspScheduleRecomp &schedule) +{ + bool improved = false; + RefreshAuxData(schedule); + std::vector stepRemoved(schedule.NumberOfSupersteps(), false); + + const GraphT &G = schedule.GetInstance().GetComputationalDag(); + + unsigned previousStep = 0; + for (unsigned step = 0; step < schedule.NumberOfSupersteps() - 1; ++step) { + if (stepRemoved[step]) { + continue; } - for (unsigned step = 0; step < s; ++step) { - for (unsigned proc = 0; proc < p; ++proc) { - maxWork[step] = std::max(maxWork[step], workCost[proc][step]); - maxComm[step] = std::max(maxComm[step], sendCost[proc][step]); - maxComm[step] = std::max(maxComm[step], recCost[proc][step]); + + for (unsigned nextStep = step + 1; nextStep < schedule.NumberOfSupersteps(); ++nextStep) { + + // TRY TO MERGE step AND nextStep + std::set newCommStepsBefore, newCommStepsAfter; + std::set > newWorkSteps; + + std::vector > mustReplicate(schedule.GetInstance().NumberOfProcessors()); + + for (const KeyTriple &entry : commSteps_[step]) { + const VertexIdx &node = std::get<0>(entry); + const unsigned &fromProc = std::get<1>(entry); + const unsigned &toProc = std::get<2>(entry); + + bool used = false; + if (neededOnProc_[node][toProc].empty() || *neededOnProc_[node][toProc].begin() > nextStep) { + newCommStepsAfter.insert(entry); + continue; + } + + if (step > 0 && firstPresent_[node][fromProc] <= previousStep) { + 
newCommStepsBefore.insert(entry); + } else { + mustReplicate[toProc].insert(node); + } + } + for (unsigned proc = 0; proc < schedule.GetInstance().NumberOfProcessors(); ++proc) { + for (const VertexIdx node : nodesPerProcAndStep_[proc][nextStep]) { + newWorkSteps.emplace(node, proc); + } + + while (!mustReplicate[proc].empty()) { + const VertexIdx node = *mustReplicate[proc].begin(); + mustReplicate[proc].erase(mustReplicate[proc].begin()); + if (newWorkSteps.find(std::make_pair(node, proc)) != newWorkSteps.end()) { + continue; + } + newWorkSteps.emplace(node, proc); + for (const VertexIdx &pred : G.Parents(node)) { + if (firstPresent_[pred][proc] <= step) { + continue; + } + + unsigned sendFromProcBefore = std::numeric_limits::max(); + for (unsigned procOffset = 0; procOffset < schedule.GetInstance().NumberOfProcessors(); ++procOffset) { + unsigned fromProc = (proc + procOffset) % schedule.GetInstance().NumberOfProcessors(); + if (step > 0 && firstPresent_[pred][fromProc] <= previousStep) { + sendFromProcBefore = fromProc; + break; + } + } + if (sendFromProcBefore < std::numeric_limits::max()) { + newCommStepsBefore.emplace(pred, sendFromProcBefore, proc); + } else { + mustReplicate[proc].insert(pred); + } + } + } + } + + // now that newWorkSteps is finalized, check types + bool typesIncompatible = false; + for (const std::pair &nodeAndProc : newWorkSteps) { + if (!schedule.GetInstance().IsCompatible(nodeAndProc.first, nodeAndProc.second)) { + typesIncompatible = true; + break; + } + } + if (typesIncompatible) { + break; + } + + // EVALUATE COST + int costChange = 0; + + // work cost in merged step + std::vector newWorkCost(schedule.GetInstance().NumberOfProcessors()); + for (unsigned proc = 0; proc < schedule.GetInstance().NumberOfProcessors(); ++proc) { + newWorkCost[proc] = workCost_[proc][step]; + } + + for (const std::pair &newCompute : newWorkSteps) { + newWorkCost[newCompute.second] += G.VertexWorkWeight(newCompute.first); + } + + CostType newMax = 0; + for 
(unsigned proc = 0; proc < schedule.GetInstance().NumberOfProcessors(); ++proc) { + newMax = std::max(newMax, newWorkCost[proc]); + } + + costChange += static_cast(newMax) - static_cast(maxWork_[step] + maxWork_[nextStep]); + + // comm cost before merged step + std::vector newSendCost(schedule.GetInstance().NumberOfProcessors()), newRecCost(schedule.GetInstance().NumberOfProcessors()); + if (step > 0) { + for (unsigned proc = 0; proc < schedule.GetInstance().NumberOfProcessors(); ++proc) { + newSendCost[proc] = sendCost_[proc][previousStep]; + newRecCost[proc] = recCost_[proc][previousStep]; + } + for (const KeyTriple &newComm : newCommStepsBefore) { + CostType commCost = G.VertexCommWeight(std::get<0>(newComm)) * + schedule.GetInstance().GetArchitecture().CommunicationCosts(std::get<1>(newComm), std::get<2>(newComm)); + newSendCost[std::get<1>(newComm)] += commCost; + newRecCost[std::get<2>(newComm)] += commCost; + } + + newMax = 0; + for (unsigned proc = 0; proc < schedule.GetInstance().NumberOfProcessors(); ++proc) { + newMax = std::max(newMax, newSendCost[proc]); + newMax = std::max(newMax, newRecCost[proc]); + } + costChange += static_cast(newMax) - static_cast(maxComm_[previousStep]); + + CostType oldSync = (maxComm_[previousStep] > 0) ? schedule.GetInstance().GetArchitecture().SynchronisationCosts() : 0; + CostType newSync = (newMax > 0) ? 
schedule.GetInstance().GetArchitecture().SynchronisationCosts() : 0; + + costChange += static_cast(newSync) - static_cast(oldSync); + } + + // comm cost after merged step + for (unsigned proc = 0; proc < schedule.GetInstance().NumberOfProcessors(); ++proc) { + newSendCost[proc] = sendCost_[proc][nextStep]; + newRecCost[proc] = recCost_[proc][nextStep]; + } + for (const KeyTriple &newComm : newCommStepsAfter) { + CostType commCost = G.VertexCommWeight(std::get<0>(newComm)) + * schedule.GetInstance().GetArchitecture().CommunicationCosts(std::get<1>(newComm), std::get<2>(newComm)); + newSendCost[std::get<1>(newComm)] += commCost; + newRecCost[std::get<2>(newComm)] += commCost; + } + + newMax = 0; + for (unsigned proc = 0; proc < schedule.GetInstance().NumberOfProcessors(); ++proc) { + newMax = std::max(newMax, newSendCost[proc]); + newMax = std::max(newMax, newRecCost[proc]); + } + costChange += static_cast(newMax) - static_cast(maxComm_[step] + maxComm_[nextStep]); + + CostType oldSync = ((maxComm_[step] > 0) ? schedule.GetInstance().GetArchitecture().SynchronisationCosts() : 0) + + ((maxComm_[nextStep] > 0) ? schedule.GetInstance().GetArchitecture().SynchronisationCosts() : 0); + CostType newSync = (newMax > 0) ? 
schedule.GetInstance().GetArchitecture().SynchronisationCosts() : 0; + + costChange += static_cast(newSync) - static_cast(oldSync); + + if (costChange < 0) + { + // MERGE STEPS - change schedule and update data structures + + // update assignments and compute data + for (const std::pair &nodeAndProc : newWorkSteps) { + AddRecomputeStep(schedule, nodeAndProc.first, nodeAndProc.second, step); + } + maxWork_[step] = 0; + maxWork_[nextStep] = 0; + for (unsigned proc = 0; proc < schedule.GetInstance().NumberOfProcessors(); ++proc) { + maxWork_[step] = std::max(maxWork_[step], workCost_[proc][step]); + workCost_[proc][nextStep] = 0; + for (const VertexIdx node : nodesPerProcAndStep_[proc][nextStep]) { + auto &assignments = schedule.Assignments(node); + for (auto itr = assignments.begin(); itr != assignments.end(); ++itr) { + if (*itr == std::make_pair(proc, nextStep)) { + assignments.erase(itr); + break; + } + } + for (const VertexIdx &pred : G.Parents(node)) { + neededOnProc_[pred][proc].erase(neededOnProc_[pred][proc].lower_bound(nextStep)); + } + } + nodesPerProcAndStep_[proc][nextStep].clear(); + } + + // update comm and its data in step (imported mostly from nextStep) + for (const KeyTriple &entry : commSteps_[step]) { + neededOnProc_[std::get<0>(entry)][std::get<1>(entry)].erase(neededOnProc_[std::get<0>(entry)][std::get<1>(entry)].lower_bound(step)); } + + for (const KeyTriple &entry : commSteps_[nextStep]) { + neededOnProc_[std::get<0>(entry)][std::get<1>(entry)].erase(neededOnProc_[std::get<0>(entry)][std::get<1>(entry)].lower_bound(nextStep)); + } + + commSteps_[step].clear(); + for (unsigned proc = 0; proc < schedule.GetInstance().NumberOfProcessors(); ++proc) { + sendCost_[proc][step] = 0; + recCost_[proc][step] = 0; + sendCost_[proc][nextStep] = 0; + recCost_[proc][nextStep] = 0; + } + std::set commNextSteps = commSteps_[nextStep]; + commSteps_[nextStep].clear(); + for (const KeyTriple &newComm : commNextSteps) { + AddCommStep(schedule, newComm, step); + } + 
+ for (const KeyTriple &newComm : newCommStepsAfter) { + AddCommStep(schedule, newComm, step); + } + + maxComm_[nextStep] = 0; + + maxComm_[step] = 0; + for (unsigned proc = 0; proc < schedule.GetInstance().NumberOfProcessors(); ++proc) { + maxComm_[step] = std::max(maxComm_[step], sendCost_[proc][step]); + maxComm_[step] = std::max(maxComm_[step], recCost_[proc][step]); + } + + // update comm and its data in step-1 + if (step > 0) { + for (const KeyTriple &newComm : newCommStepsBefore) { + AddCommStep(schedule, newComm, previousStep); + } + + for (unsigned proc = 0; proc < schedule.GetInstance().NumberOfProcessors(); ++proc) { + maxComm_[previousStep] = std::max(maxComm_[previousStep], sendCost_[proc][previousStep]); + maxComm_[previousStep] = std::max(maxComm_[previousStep], recCost_[proc][previousStep]); + } + } + + stepRemoved[nextStep] = true; + improved = true; + } else { + break; + } + } + previousStep = step; + } + + schedule.GetCommunicationSchedule().clear(); + for (unsigned step = 0; step < schedule.NumberOfSupersteps(); ++step) { + for (const KeyTriple &entry : commSteps_[step]) { + schedule.AddCommunicationScheduleEntry(entry, step); + } + } + + return improved; +} + +template +bool GreedyRecomputer::RecomputeEntireSupersteps(BspScheduleRecomp &schedule) +{ + bool improved = false; + RefreshAuxData(schedule); + + const GraphT &G = schedule.GetInstance().GetComputationalDag(); + + std::map, std::vector>> commStepPerNodeAndReceiver; + for (unsigned step = 0; step < schedule.NumberOfSupersteps(); ++step) { + for (const KeyTriple &entry : commSteps_[step]) { + commStepPerNodeAndReceiver[std::make_pair(std::get<0>(entry), std::get<2>(entry))].emplace_back(std::get<1>(entry), step); } + } + + for (unsigned step = 0; step < schedule.NumberOfSupersteps(); ++step) { + for (unsigned fromProc = 0; fromProc < schedule.GetInstance().NumberOfProcessors(); ++fromProc) { + for (unsigned toProc = 0; toProc < schedule.GetInstance().NumberOfProcessors(); ++toProc) { + if 
(fromProc == toProc) { + continue; + } - for (VertexIdx node = 0; node < n; ++node) { - for (const VertexIdx &pred : g.Parents(node)) { - for (unsigned proc = 0; proc < p; ++proc) { - firstComputable[node][proc] = std::max(firstComputable[node][proc], firstPresent[pred][proc]); + // ATTEMPT TO REPLICATE all the necessary nodes of (fromProc, step) on (toProc, step) + + // collect the nodes that would be useful to replicate (not present before, not unnecessary) + std::set newCommStepsBefore, removedCommStepsAfter; + std::set mustReplicate; + + for (const VertexIdx node : nodesPerProcAndStep_[fromProc][step]) + { + if (firstPresent_[node][toProc] <= step) { + continue; + } + mustReplicate.insert(node); + } + + std::map internalOutDegree; + for (const VertexIdx node : mustReplicate) { + internalOutDegree[node] = 0; + } + for (const VertexIdx node : mustReplicate) { + for (const VertexIdx &pred : G.Parents(node)) { + if (mustReplicate.find(pred) == mustReplicate.end()) { + continue; } + internalOutDegree[pred] += 1; + } + } + + std::set checkIfDisposable; + for (const VertexIdx node : mustReplicate) { + if (internalOutDegree.at(node) == 0) { + checkIfDisposable.insert(node); + } } + + while (!checkIfDisposable.empty()) { + const VertexIdx node = *checkIfDisposable.begin(); + checkIfDisposable.erase(checkIfDisposable.begin()); + if (neededOnProc_[node][toProc].empty()) { + mustReplicate.erase(node); + for (const VertexIdx &pred : G.Parents(node)) { + if (mustReplicate.find(pred) == mustReplicate.end()) { + continue; + } + if ((--internalOutDegree[pred]) == 0) { + checkIfDisposable.insert(pred); + } + } + } + } + + // now that mustReplicate is finalized, check types + bool typesIncompatible = false; + for (const VertexIdx node : mustReplicate) { + if (!schedule.GetInstance().IsCompatible(node, toProc)) { + typesIncompatible = true; + break; + } + } + if (typesIncompatible) { + continue; + } + + // collect new comm steps - before + for (const VertexIdx node : 
mustReplicate) { + for (const VertexIdx &pred : G.Parents(node)) { + if (firstPresent_[pred][toProc] <= step || mustReplicate.find(pred) != mustReplicate.end()) { + continue; + } + + unsigned sendFromProcBefore = fromProc; + for (unsigned procOffset = 0; procOffset < schedule.GetInstance().NumberOfProcessors(); ++procOffset) { + unsigned sendFromCandidate = (fromProc + procOffset) % schedule.GetInstance().NumberOfProcessors(); + if (step > 0 && firstPresent_[pred][sendFromCandidate] <= step - 1) { + sendFromProcBefore = sendFromCandidate; + break; + } + } + if (sendFromProcBefore < std::numeric_limits::max()) { + newCommStepsBefore.emplace(pred, sendFromProcBefore, toProc); + } else { + std::cout<<"ERROR: parent of replicated node not present anywhere."< &entry : commStepPerNodeAndReceiver[std::make_pair(node, toProc)]) { + removedCommStepsAfter.emplace(node, entry.first, entry.second); + } + } + + // EVALUATE COST + + int costChange = 0; + + // work cost + CostType newWorkCost = workCost_[toProc][step]; + for (const VertexIdx node : mustReplicate) { + newWorkCost += G.VertexWorkWeight(node); + } + CostType newMax = std::max(maxWork_[step], newWorkCost); + + costChange += static_cast(newMax) - static_cast(maxWork_[step]); + + // comm cost before merged step + if (step > 0) { + std::vector newSendCost(schedule.GetInstance().NumberOfProcessors()); + CostType newRecCost = recCost_[toProc][step-1]; + for (unsigned proc = 0; proc < schedule.GetInstance().NumberOfProcessors(); ++proc) { + newSendCost[proc] = sendCost_[proc][step-1]; + } + for (const KeyTriple &newComm : newCommStepsBefore) + { + CostType commCost = G.VertexCommWeight(std::get<0>(newComm)) + * schedule.GetInstance().GetArchitecture().CommunicationCosts(std::get<1>(newComm), std::get<2>(newComm)); + newSendCost[std::get<1>(newComm)] += commCost; + newRecCost += commCost; + } + + newMax = std::max(maxComm_[step - 1], newRecCost); + for (unsigned proc = 0; proc < schedule.GetInstance().NumberOfProcessors(); 
++proc) { + newMax = std::max(newMax, newSendCost[proc]); + } + costChange += static_cast(newMax) - static_cast(maxComm_[step - 1]); + + CostType oldSync = (maxComm_[step - 1] > 0) ? schedule.GetInstance().GetArchitecture().SynchronisationCosts() : 0; + CostType newSync = (newMax > 0) ? schedule.GetInstance().GetArchitecture().SynchronisationCosts() : 0; + + costChange += static_cast(newSync) - static_cast(oldSync); + } + + // comm cost after merged step + std::map> changedStepsSent; + std::map changedStepsRec; + for (const KeyTriple &newComm : removedCommStepsAfter) { + CostType commCost = G.VertexCommWeight(std::get<0>(newComm)) + * schedule.GetInstance().GetArchitecture().CommunicationCosts(std::get<1>(newComm), toProc); + if (changedStepsSent[std::get<2>(newComm)].find(std::get<1>(newComm)) == changedStepsSent[std::get<2>(newComm)].end()) { + changedStepsSent[std::get<2>(newComm)][std::get<1>(newComm)] = commCost; + } else { + changedStepsSent[std::get<2>(newComm)][std::get<1>(newComm)] += commCost; + } + if (changedStepsRec.find(std::get<2>(newComm)) == changedStepsRec.end()) { + changedStepsRec[std::get<2>(newComm)] = commCost; + } else { + changedStepsRec[std::get<2>(newComm)] += commCost; + } + } + for (const auto &changingStep : changedStepsRec) { + unsigned stepChanged = changingStep.first; + + std::vector newSendCost(schedule.GetInstance().NumberOfProcessors()); + CostType newRecCost = recCost_[toProc][stepChanged] - changingStep.second; + for (unsigned proc = 0; proc < schedule.GetInstance().NumberOfProcessors(); ++proc) { + newSendCost[proc] = sendCost_[proc][stepChanged]; + } + for (const auto &procAndChange : changedStepsSent[stepChanged]) { + newSendCost[procAndChange.first] -= procAndChange.second; + } + + newMax = 0; + for (unsigned proc = 0; proc < schedule.GetInstance().NumberOfProcessors(); ++proc) { + newMax = std::max(newMax, newSendCost[proc]); + } + for (unsigned proc = 0; proc < schedule.GetInstance().NumberOfProcessors(); ++proc) { + if 
(proc == toProc) { + newMax = std::max(newMax, newRecCost); + } else { + newMax = std::max(newMax, recCost_[proc][stepChanged]); + } + } + costChange += static_cast(newMax) - static_cast(maxComm_[stepChanged]); + + CostType oldSync = (maxComm_[stepChanged] > 0) ? schedule.GetInstance().GetArchitecture().SynchronisationCosts() : 0; + CostType newSync = (newMax > 0) ? schedule.GetInstance().GetArchitecture().SynchronisationCosts() : 0; + + costChange += static_cast(newSync) - static_cast(oldSync); + } + + if (costChange < 0) { + // REPLICATE STEP IF BENEFICIAL - change schedule and update data structures + + // update assignments and compute data + for (const VertexIdx node : mustReplicate) { + AddRecomputeStep(schedule, node, toProc, step); + auto itr = commStepPerNodeAndReceiver.find(std::make_pair(node, toProc)); + if (itr != commStepPerNodeAndReceiver.end()) { + commStepPerNodeAndReceiver.erase(itr); + } + } + maxWork_[step] = std::max(maxWork_[step], workCost_[toProc][step]); + + // update comm and its data in step-1 + if (step > 0) { + for (const KeyTriple &newComm : newCommStepsBefore) { + AddCommStep(schedule, newComm, step - 1); + } + + for (unsigned proc = 0; proc < schedule.GetInstance().NumberOfProcessors(); ++proc) { + maxComm_[step - 1] = std::max(maxComm_[step - 1], sendCost_[proc][step - 1]); + maxComm_[step - 1] = std::max(maxComm_[step - 1], recCost_[proc][step - 1]); + } + } + + // update comm and its data in later steps + for (const KeyTriple &newComm : removedCommStepsAfter) { + unsigned changingStep = std::get<2>(newComm); + RemoveCommStep(schedule, KeyTriple(std::get<0>(newComm), std::get<1>(newComm), toProc), changingStep); + } + for (const auto &stepAndChange : changedStepsRec) { + unsigned changingStep = stepAndChange.first; + maxComm_[changingStep] = 0; + for (unsigned proc = 0; proc < schedule.GetInstance().NumberOfProcessors(); ++proc) { + maxComm_[changingStep] = std::max(maxComm_[changingStep], sendCost_[proc][changingStep]); + 
maxComm_[changingStep] = std::max(maxComm_[changingStep], recCost_[proc][changingStep]); + } + } + + improved = true; + } + } } + } - // Find improvement steps - bool stillImproved = true; - while (stillImproved) { - stillImproved = false; - - for (unsigned step = 0; step < s; ++step) { - std::vector toErase; - for (const KeyTriple &entry : commSteps[step]) { - const VertexIdx &node = std::get<0>(entry); - const unsigned &fromProc = std::get<1>(entry); - const unsigned &toProc = std::get<2>(entry); - - // check how much comm cost we save by removing comm schedule entry - CostType commInduced = g.VertexCommWeight(node) - * initialSchedule.GetInstance().GetArchitecture().CommunicationCosts(fromProc, toProc); - - CostType newMaxComm = 0; - for (unsigned proc = 0; proc < p; ++proc) { - if (proc == fromProc) { - newMaxComm = std::max(newMaxComm, sendCost[proc][step] - commInduced); - } else { - newMaxComm = std::max(newMaxComm, sendCost[proc][step]); - } - if (proc == toProc) { - newMaxComm = std::max(newMaxComm, recCost[proc][step] - commInduced); - } else { - newMaxComm = std::max(newMaxComm, recCost[proc][step]); - } - } - if (newMaxComm == maxComm[step]) { - continue; - } - - if (!initialSchedule.GetInstance().IsCompatible(node, toProc)) { - continue; - } - - CostType decrease = maxComm[step] - newMaxComm; - if (maxComm[step] > 0 && newMaxComm == 0) { - decrease += initialSchedule.GetInstance().GetArchitecture().SynchronisationCosts(); - } - - // check how much it would increase the work cost instead - unsigned bestStep = s; - CostType smallestIncrease = std::numeric_limits::max(); - for (unsigned compStep = firstComputable[node][toProc]; compStep <= *neededOnProc[node][toProc].begin(); - ++compStep) { - CostType increase = workCost[toProc][compStep] + g.VertexWorkWeight(node) > maxWork[compStep] - ? 
workCost[toProc][compStep] + g.VertexWorkWeight(node) - maxWork[compStep] - : 0; - - if (increase < smallestIncrease) { - bestStep = compStep; - smallestIncrease = increase; - } - } - - // check if this modification is beneficial - if (bestStep == s || smallestIncrease > decrease) { - continue; - } - - // execute the modification - toErase.emplace_back(entry); - outSchedule.Assignments(node).emplace_back(toProc, bestStep); - - sendCost[fromProc][step] -= commInduced; - recCost[toProc][step] -= commInduced; - maxComm[step] = newMaxComm; - - workCost[toProc][bestStep] += g.VertexWorkWeight(node); - maxWork[bestStep] += smallestIncrease; - - // update movability bounds - for (const VertexIdx &pred : g.Parents(node)) { - neededOnProc[pred][toProc].insert(bestStep); - } - - neededOnProc[node][fromProc].erase(neededOnProc[node][fromProc].lower_bound(step)); - - firstPresent[node][toProc] = bestStep; - for (const VertexIdx &succ : g.Children(node)) { - for (const VertexIdx &pred : g.Parents(node)) { - firstComputable[succ][toProc] = std::max(firstComputable[succ][toProc], firstPresent[pred][toProc]); - } - } - - stillImproved = true; - } - for (const KeyTriple &entry : toErase) { - commSteps[step].erase(entry); - } - } - } - - for (unsigned step = 0; step < s; ++step) { - for (const KeyTriple &entry : commSteps[step]) { - outSchedule.GetCommunicationSchedule().emplace(entry, step); - } - } - - outSchedule.MergeSupersteps(); + schedule.GetCommunicationSchedule().clear(); + for (unsigned step = 0; step < schedule.NumberOfSupersteps(); ++step) { + for (const KeyTriple &entry : commSteps_[step]) { + schedule.AddCommunicationScheduleEntry(entry, step); + } + } - return ReturnStatus::OSP_SUCCESS; + return improved; +} + +template +bool GreedyRecomputer::BatchRemoveSteps(BspScheduleRecomp &schedule) +{ + bool improved = false; + const GraphT &G = schedule.GetInstance().GetComputationalDag(); + + // Initialize required data structures + RefreshAuxData(schedule); + + std::vector> 
firstComputable(schedule.GetInstance().NumberOfVertices(), std::vector(schedule.GetInstance().NumberOfProcessors(), 0U)); + for (VertexIdx node = 0; node < schedule.GetInstance().NumberOfVertices(); ++node) { + for (const VertexIdx &pred : G.Parents(node)) { + for (unsigned proc = 0; proc < schedule.GetInstance().NumberOfProcessors(); ++proc) { + firstComputable[node][proc] = std::max(firstComputable[node][proc], firstPresent_[pred][proc]); + } + } + } + + for (unsigned step = 0; step < schedule.NumberOfSupersteps(); ++step) { + + bool canReduce = (maxComm_[step] > 0); + while (canReduce) { + + // find processors where send/rec costs equals the maximum (so we want to remove comm steps) + canReduce = false; + std::vector sendSaturated(schedule.GetInstance().NumberOfProcessors(), false), recSaturated(schedule.GetInstance().NumberOfProcessors(), false); + for (unsigned proc = 0; proc < schedule.GetInstance().NumberOfProcessors(); ++proc) { + if (sendCost_[proc][step] == maxComm_[step]) { + sendSaturated[proc] = true; + } + if (recCost_[proc][step] == maxComm_[step]) { + recSaturated[proc] = true; + } + } + + // initialize required variables + std::map, CostType> workIncreased; + std::set removedCommSteps, addedComputeSteps; + std::vector > sendCommSteps(schedule.GetInstance().NumberOfProcessors()), + recCommSteps(schedule.GetInstance().NumberOfProcessors()); + for (const KeyTriple &commStep : commSteps_[step]) { + sendCommSteps[std::get<1>(commStep)].insert(commStep); + recCommSteps[std::get<2>(commStep)].insert(commStep); + } + bool skipStep = false; + CostType workIncrease = 0; + CostType commDecrease = std::numeric_limits::max(); + + for (unsigned proc = 0; proc < schedule.GetInstance().NumberOfProcessors(); ++proc) { + for (unsigned sendOrRec = 0; sendOrRec < 2; ++sendOrRec) { + + std::set *currentCommSteps; + if (sendOrRec == 0) { + if (!sendSaturated[proc]) { + continue; + } + currentCommSteps = &sendCommSteps[proc]; + } else { + if (!recSaturated[proc]) { + 
continue; + } + currentCommSteps = &recCommSteps[proc]; + } + + KeyTriple bestCommStep; + unsigned bestStepTarget = std::numeric_limits::max(); + CostType smallestIncrease = std::numeric_limits::max(); + for (const KeyTriple &commStep : *currentCommSteps) { + const VertexIdx node = std::get<0>(commStep); + const unsigned fromProc = std::get<1>(commStep); + const unsigned toProc = std::get<2>(commStep); + if (G.VertexCommWeight(node) == 0) { + continue; + } + if (!schedule.GetInstance().IsCompatible(node, toProc)) { + continue; + } + + for (unsigned compStep = firstComputable[node][toProc]; compStep <= *neededOnProc_[node][toProc].begin(); ++compStep) { + auto itr = workIncreased.find(std::make_pair(toProc, compStep)); + CostType assignedExtra = (itr != workIncreased.end()) ? itr->second : 0; + CostType increase = 0; + if (workCost_[toProc][compStep] + assignedExtra + G.VertexWorkWeight(node) > maxWork_[compStep]) { + increase = workCost_[toProc][compStep] + assignedExtra + G.VertexWorkWeight(node) - maxWork_[compStep]; + } + if (increase < smallestIncrease) { + smallestIncrease = increase; + bestStepTarget = compStep; + bestCommStep = commStep; + } + } + } + + // save this if this is the cheapest way to move away a comm step + if (smallestIncrease < std::numeric_limits::max()) { + const VertexIdx node = std::get<0>(bestCommStep); + const unsigned fromProc = std::get<1>(bestCommStep); + const unsigned toProc = std::get<2>(bestCommStep); + addedComputeSteps.emplace(node, toProc, bestStepTarget); + auto itr = workIncreased.find(std::make_pair(toProc, bestStepTarget)); + if (itr == workIncreased.end()) { + workIncreased[std::make_pair(toProc, bestStepTarget)] = G.VertexWorkWeight(node); + } else { + itr->second += G.VertexWorkWeight(node); + } + + sendSaturated[fromProc] = false; + recSaturated[toProc] = false; + + removedCommSteps.insert(bestCommStep); + workIncrease += smallestIncrease; + CostType commCost = 
schedule.GetInstance().GetComputationalDag().VertexCommWeight(node) + * schedule.GetInstance().GetArchitecture().CommunicationCosts(fromProc, toProc); + commDecrease = std::min(commDecrease, commCost); + + } else { + skipStep = true; + } + } + if (skipStep) { + // weird edge case if all comm steps have weight 0 (can be removed?) + break; + } + } + if (skipStep) { + continue; + } + + if (maxComm_[step] > 0 && commSteps_[step].size() == removedCommSteps.size()) { + commDecrease += schedule.GetInstance().GetArchitecture().SynchronisationCosts(); + } + // execute step if total work cost increase < total comm cost decrease + if (commDecrease > workIncrease) { + for (const KeyTriple &newComp : addedComputeSteps) { + const VertexIdx node = std::get<0>(newComp); + const unsigned proc = std::get<1>(newComp); + const unsigned newStep = std::get<2>(newComp); + AddRecomputeStep(schedule, node, proc, newStep); + firstPresent_[node][proc] = newStep; + maxWork_[newStep] = std::max(maxWork_[newStep], workCost_[proc][newStep]); + + for (const VertexIdx &succ : G.Children(node)) { + firstComputable[succ][proc] = 0U; + for (const VertexIdx &pred : G.Parents(succ)) { + firstComputable[succ][proc] = std::max(firstComputable[succ][proc], firstPresent_[pred][proc]); + } + } + } + for (const KeyTriple &removedComm : removedCommSteps) { + RemoveCommStep(schedule, removedComm, step); + } + maxComm_[step] = 0; + for (unsigned proc = 0; proc < schedule.GetInstance().NumberOfProcessors(); ++proc) { + maxComm_[step] = std::max(maxComm_[step], sendCost_[proc][step]); + maxComm_[step] = std::max(maxComm_[step], recCost_[proc][step]); + } + + canReduce = true; + improved = true; + } + } + } + + schedule.GetCommunicationSchedule().clear(); + for (unsigned step = 0; step < schedule.NumberOfSupersteps(); ++step) { + for (const KeyTriple &entry : commSteps_[step]) { + schedule.AddCommunicationScheduleEntry(entry, step); + } + } + + return improved; +} + +template +void 
GreedyRecomputer::RefreshAuxData(const BspScheduleRecomp &schedule) +{ + const VertexIdx N = schedule.GetInstance().NumberOfVertices(); + const unsigned P = schedule.GetInstance().NumberOfProcessors(); + const unsigned S = schedule.NumberOfSupersteps(); + const GraphT &G = schedule.GetInstance().GetComputationalDag(); + + workCost_.clear(); + sendCost_.clear(); + recCost_.clear(); + + workCost_.resize(P, std::vector(S, 0)); + sendCost_.resize(P, std::vector(S, 0)), + recCost_.resize(P, std::vector(S, 0)); + + firstPresent_.clear(); + firstPresent_.resize(N, std::vector(P, std::numeric_limits::max())); + + nodesPerProcAndStep_.clear(); + nodesPerProcAndStep_.resize(P, std::vector >(S)); + + neededOnProc_.clear(); + neededOnProc_.resize(N, std::vector >(P, {S})); + + maxWork_.clear(); + maxComm_.clear(); + maxWork_.resize(S, 0); + maxComm_.resize(S, 0); + + commSteps_.clear(); + commSteps_.resize(S); + + for (VertexIdx node = 0; node < N; ++node) { + for (const std::pair &procAndStep : schedule.Assignments(node)) { + const unsigned &proc = procAndStep.first; + const unsigned &step = procAndStep.second; + nodesPerProcAndStep_[proc][step].push_back(node); + workCost_[proc][step] += G.VertexWorkWeight(node); + firstPresent_[node][proc] = std::min(firstPresent_[node][proc], step); + for (VertexIdx pred : G.Parents(node)) { + neededOnProc_[pred][proc].insert(step); + } + } + } + for (const std::pair item : schedule.GetCommunicationSchedule()) { + const VertexIdx &node = std::get<0>(item.first); + const unsigned &fromProc = std::get<1>(item.first); + const unsigned &toProc = std::get<2>(item.first); + const unsigned &step = item.second; + sendCost_[fromProc][step] += G.VertexCommWeight(node) + * schedule.GetInstance().GetArchitecture().CommunicationCosts(fromProc, toProc); + recCost_[toProc][step] += G.VertexCommWeight(node) + * schedule.GetInstance().GetArchitecture().CommunicationCosts(fromProc, toProc); + + commSteps_[step].emplace(item.first); + 
neededOnProc_[node][fromProc].insert(step); + firstPresent_[node][toProc] = std::min(firstPresent_[node][toProc], step + 1); + } + for (unsigned step = 0; step < S; ++step) { + for (unsigned proc = 0; proc < P; ++proc) { + maxWork_[step] = std::max(maxWork_[step], workCost_[proc][step]); + maxComm_[step] = std::max(maxComm_[step], sendCost_[proc][step]); + maxComm_[step] = std::max(maxComm_[step], recCost_[proc][step]); + } + } +} + +template +void GreedyRecomputer::AddRecomputeStep(BspScheduleRecomp &schedule, const VertexIdx node, const unsigned proc, const unsigned step) +{ + schedule.Assignments(node).emplace_back(proc, step); + nodesPerProcAndStep_[proc][step].push_back(node); + workCost_[proc][step] += schedule.GetInstance().GetComputationalDag().VertexWorkWeight(node); + firstPresent_[node][proc] = std::min(firstPresent_[node][proc], step); + for (const VertexIdx &pred : schedule.GetInstance().GetComputationalDag().Parents(node)) { + neededOnProc_[pred][proc].insert(step); + } +} + +template +void GreedyRecomputer::AddCommStep(const BspScheduleRecomp &schedule, const KeyTriple &newComm, const unsigned step) +{ + commSteps_[step].insert(newComm); + CostType commCost = schedule.GetInstance().GetComputationalDag().VertexCommWeight(std::get<0>(newComm)) + * schedule.GetInstance().GetArchitecture().CommunicationCosts(std::get<1>(newComm), std::get<2>(newComm)); + sendCost_[std::get<1>(newComm)][step] += commCost; + recCost_[std::get<2>(newComm)][step] += commCost; + neededOnProc_[std::get<0>(newComm)][std::get<1>(newComm)].insert(step); + unsigned &firstPres = firstPresent_[std::get<0>(newComm)][std::get<2>(newComm)]; + if (firstPres > step + 1 && firstPres <= schedule.NumberOfSupersteps()) { + auto itr = commSteps_[firstPres - 1].find(newComm); + if (itr != commSteps_[firstPres - 1].end()) { + commSteps_[firstPres - 1].erase(itr); + sendCost_[std::get<1>(newComm)][firstPres - 1] -= commCost; + recCost_[std::get<2>(newComm)][firstPres - 1] -= commCost; + 
neededOnProc_[std::get<0>(newComm)][std::get<1>(newComm)].erase(neededOnProc_[std::get<0>(newComm)][std::get<1>(newComm)].lower_bound(firstPres - 1)); + } + } + firstPres = std::min(firstPres, step + 1); +} + +template +void GreedyRecomputer::RemoveCommStep(const BspScheduleRecomp &schedule, const KeyTriple &removedComm, const unsigned step) +{ + neededOnProc_[std::get<0>(removedComm)][std::get<1>(removedComm)].erase(neededOnProc_[std::get<0>(removedComm)][std::get<1>(removedComm)].lower_bound(step)); + + CostType commCost = schedule.GetInstance().GetComputationalDag().VertexCommWeight(std::get<0>(removedComm)) + * schedule.GetInstance().GetArchitecture().CommunicationCosts(std::get<1>(removedComm), std::get<2>(removedComm)); + + auto itr = commSteps_[step].find(removedComm); + if (itr != commSteps_[step].end()) { + commSteps_[step].erase(itr); + } + sendCost_[std::get<1>(removedComm)][step] -= commCost; + recCost_[std::get<2>(removedComm)][step] -= commCost; } -} // namespace osp +} // namespace osp \ No newline at end of file diff --git a/tests/bsp_greedy_recomputer.cpp b/tests/bsp_greedy_recomputer.cpp index 95bcee5d..4bda2684 100644 --- a/tests/bsp_greedy_recomputer.cpp +++ b/tests/bsp_greedy_recomputer.cpp @@ -55,13 +55,13 @@ BOOST_AUTO_TEST_CASE(TestRecomputer) { BspScheduleRecomp schedule(instance1); GreedyRecomputer scheduler; - scheduler.ComputeRecompSchedule(scheduleInitCs1, schedule); + scheduler.ComputeRecompScheduleBasic(scheduleInitCs1, schedule); BOOST_CHECK(schedule.SatisfiesConstraints()); BOOST_CHECK(schedule.ComputeCosts() < scheduleInitCs1.ComputeCosts()); std::cout << "Cost decrease by greedy recomp: " << scheduleInitCs1.ComputeCosts() << " -> " << schedule.ComputeCosts() << std::endl; - // non-toy instance + // non-toy instances BspInstance instance2; instance2.SetNumberOfProcessors(4); @@ -75,22 +75,35 @@ BOOST_AUTO_TEST_CASE(TestRecomputer) { cwd = cwd.parent_path(); std::cout << cwd << std::endl; } + std::vector 
larger_files{"data/spaa/large/instance_CG_N24_K22_nzP0d2.hdag", + "data/spaa/large/instance_exp_N50_K12_nzP0d15.hdag", + "data/spaa/large/instance_kNN_N45_K15_nzP0d16.hdag", + "data/spaa/large/instance_spmv_N120_nzP0d18.hdag"}; + + for (std::string filename : larger_files) { + bool status = file_reader::ReadComputationalDagHyperdagFormatDB((cwd / filename).string(), instance2.GetComputationalDag()); + + BOOST_CHECK(status); + + BspSchedule scheduleInit2(instance2); + BspLocking greedy; + greedy.ComputeSchedule(scheduleInit2); + BOOST_CHECK(scheduleInit2.SatisfiesPrecedenceConstraints()); + BspScheduleCS scheduleInitCs2(scheduleInit2); + BOOST_CHECK(scheduleInitCs2.HasValidCommSchedule()); + BspScheduleCS scheduleInitCs3 = scheduleInitCs2; + + scheduler.ComputeRecompScheduleBasic(scheduleInitCs2, schedule); + BOOST_CHECK(schedule.SatisfiesConstraints()); + BOOST_CHECK(schedule.ComputeCosts() <= scheduleInitCs2.ComputeCosts()); + std::cout << "Cost decrease by greedy recomp (basic): " << scheduleInitCs2.ComputeCosts() << " -> " << schedule.ComputeCosts() + << std::endl; + + scheduler.ComputeRecompScheduleAdvanced(scheduleInitCs3, schedule); + BOOST_CHECK(schedule.SatisfiesConstraints()); + BOOST_CHECK(schedule.ComputeCosts() < scheduleInitCs3.ComputeCosts()); + std::cout << "Cost decrease by greedy recomp (advanced): " << scheduleInitCs3.ComputeCosts() << " -> " << schedule.ComputeCosts() + << std::endl; + } - bool status = file_reader::ReadComputationalDagHyperdagFormatDB((cwd / "data/spaa/tiny/instance_bicgstab.hdag").string(), - instance2.GetComputationalDag()); - - BOOST_CHECK(status); - - BspSchedule scheduleInit2(instance2); - BspLocking greedy; - greedy.ComputeSchedule(scheduleInit2); - BOOST_CHECK(scheduleInit2.SatisfiesPrecedenceConstraints()); - BspScheduleCS scheduleInitCs2(scheduleInit2); - BOOST_CHECK(scheduleInitCs2.HasValidCommSchedule()); - - scheduler.ComputeRecompSchedule(scheduleInitCs2, schedule); - BOOST_CHECK(schedule.SatisfiesConstraints()); - 
BOOST_CHECK(schedule.ComputeCosts() < scheduleInitCs2.ComputeCosts()); - std::cout << "Cost decrease by greedy recomp: " << scheduleInitCs2.ComputeCosts() << " -> " << schedule.ComputeCosts() - << std::endl; }