@@ -126,6 +126,7 @@ class BspScheduleRecomp : public IBspScheduleEval<GraphT> {
126126 VertexIdx GetTotalAssignments () const ;
127127
128128 void MergeSupersteps ();
129+ void CleanSchedule ();
129130};
130131
131132template <typename GraphT>
@@ -324,4 +325,103 @@ void BspScheduleRecomp<GraphT>::MergeSupersteps() {
324325 numberOfSupersteps_ = currentStepIdx;
325326}
326327
328+ // remove unneeded comm. schedule entries - these can happen in several algorithms
329+ template <typename Graph_t>
330+ void BspScheduleRecomp<Graph_t>::CleanSchedule()
331+ {
332+ // I. Data that is already present before it arrives
333+ std::vector<std::vector<std::multiset<unsigned >>> arrivesAt (instance_->NumberOfVertices (),
334+ std::vector<std::multiset<unsigned >>(instance_->NumberOfProcessors ()));
335+ for (const auto &node : instance_->GetComputationalDag ().Vertices ()) {
336+ for (const auto &procAndStep : nodeToProcessorAndSupertepAssignment_[node]) {
337+ arrivesAt[node][procAndStep.first ].insert (procAndStep.second );
338+ }
339+ }
340+
341+ for (auto const &[key, val] : commSchedule_) {
342+ arrivesAt[std::get<0 >(key)][std::get<2 >(key)].insert (val);
343+ }
344+
345+ // - computation steps
346+ for (const auto &node : instance_->GetComputationalDag ().Vertices ()) {
347+ for (unsigned index = 0 ; index < nodeToProcessorAndSupertepAssignment_[node].size (); ) {
348+ const auto &procAndStep = nodeToProcessorAndSupertepAssignment_[node][index];
349+ if (*arrivesAt[node][procAndStep.first ].begin () < procAndStep.second ) {
350+ nodeToProcessorAndSupertepAssignment_[node][index] = nodeToProcessorAndSupertepAssignment_[node].back ();
351+ nodeToProcessorAndSupertepAssignment_[node].pop_back ();
352+ } else {
353+ ++index;
354+ }
355+ }
356+ }
357+
358+ // - communication steps
359+ std::vector<KeyTriple> toErase;
360+ for (auto const &[key, val] : commSchedule_) {
361+ auto itr = arrivesAt[std::get<0 >(key)][std::get<2 >(key)].begin ();
362+ if (*itr < val) {
363+ toErase.push_back (key);
364+ } else if (*itr == val && ++itr != arrivesAt[std::get<0 >(key)][std::get<2 >(key)].end () && *itr == val) {
365+ toErase.push_back (key);
366+ arrivesAt[std::get<0 >(key)][std::get<2 >(key)].erase (itr);
367+ }
368+ }
369+
370+ for (const KeyTriple &key : toErase) {
371+ commSchedule_.erase (key);
372+ }
373+
374+ // II. Data that is not used after being computed/sent
375+ std::vector<std::vector<std::multiset<unsigned >>> usedAt (instance_->NumberOfVertices (),
376+ std::vector<std::multiset<unsigned >>(instance_->NumberOfProcessors ()));
377+ for (const auto &node : instance_->GetComputationalDag ().Vertices ()) {
378+ for (const auto &child : instance_->GetComputationalDag ().Children (node)) {
379+ for (const auto &procAndStep : nodeToProcessorAndSupertepAssignment_[child]) {
380+ usedAt[node][procAndStep.first ].insert (procAndStep.second );
381+ }
382+ }
383+ }
384+
385+ for (auto const &[key, val] : commSchedule_) {
386+ usedAt[std::get<0 >(key)][std::get<1 >(key)].insert (val);
387+ }
388+
389+ // - computation steps
390+ for (const auto &node : instance_->GetComputationalDag ().Vertices ()) {
391+ for (unsigned index = 0 ; index < nodeToProcessorAndSupertepAssignment_[node].size (); ) {
392+ const auto &procAndStep = nodeToProcessorAndSupertepAssignment_[node][index];
393+ if ((usedAt[node][procAndStep.first ].empty () || *usedAt[node][procAndStep.first ].rbegin () < procAndStep.second )
394+ && index > 0 )
395+ {
396+ nodeToProcessorAndSupertepAssignment_[node][index] = nodeToProcessorAndSupertepAssignment_[node].back ();
397+ nodeToProcessorAndSupertepAssignment_[node].pop_back ();
398+ } else {
399+ ++index;
400+ }
401+ }
402+ }
403+
404+ // - communication steps (need to visit cs entries in reverse superstep order here)
405+ std::vector<std::vector<KeyTriple>> entries (numberOfSupersteps_);
406+ for (auto const &[key, val] : commSchedule_) {
407+ entries[val].push_back (key);
408+ }
409+
410+ toErase.clear ();
411+ for (unsigned step = numberOfSupersteps_ - 1 ; step < numberOfSupersteps_; --step) {
412+ for (const KeyTriple &key : entries[step]) {
413+ if (usedAt[std::get<0 >(key)][std::get<2 >(key)].empty ()
414+ || *usedAt[std::get<0 >(key)][std::get<2 >(key)].rbegin () <= step) {
415+ toErase.push_back (key);
416+ auto itr = usedAt[std::get<0 >(key)][std::get<1 >(key)].find (step);
417+ usedAt[std::get<0 >(key)][std::get<1 >(key)].erase (itr);
418+ }
419+ }
420+ }
421+
422+ for (const KeyTriple &key : toErase) {
423+ commSchedule_.erase (key);
424+ }
425+ }
426+
327427} // namespace osp
0 commit comments