@@ -467,6 +467,127 @@ class BspScheduleCS : public BspSchedule<Graph_t> {
467467 }
468468 }
469469 }
470+
471+ virtual void shrinkByMergingSupersteps () override {
472+
473+ std::vector<unsigned > comm_phase_latest_dependency (this ->number_of_supersteps , 0 );
474+ std::vector<std::vector<unsigned > > first_at = getFirstPresence ();
475+
476+ for (auto const &[key, val] : commSchedule)
477+ if (this ->assignedProcessor (std::get<0 >(key)) != std::get<1 >(key))
478+ comm_phase_latest_dependency[val] = std::max (comm_phase_latest_dependency[val], first_at[std::get<0 >(key)][std::get<1 >(key)]);
479+
480+
481+ for (const auto &node : BspSchedule<Graph_t>::instance->getComputationalDag ().vertices ())
482+ for (const auto &child : BspSchedule<Graph_t>::instance->getComputationalDag ().children (node))
483+ if (this ->assignedProcessor (node) != this ->assignedProcessor (child))
484+ comm_phase_latest_dependency[this ->assignedSuperstep (child)] = std::max (comm_phase_latest_dependency[this ->assignedSuperstep (child)], first_at[node][this ->assignedProcessor (child)]);
485+
486+ std::vector<bool > comm_phase_deleted (this ->number_of_supersteps , false );
487+ for (unsigned step = this ->number_of_supersteps -1 ; step < this ->number_of_supersteps ; --step)
488+ {
489+ unsigned limit = 0 ;
490+ while (step > limit)
491+ {
492+ limit = std::max (limit, comm_phase_latest_dependency[step]);
493+ if (step > limit)
494+ {
495+ comm_phase_deleted[step] = true ;
496+ --step;
497+ }
498+ }
499+ }
500+
501+ std::vector<unsigned > new_step_index (this ->number_of_supersteps );
502+ unsigned current_index = std::numeric_limits<unsigned >::max ();
503+ for (unsigned step = 0 ; step < this ->number_of_supersteps ; ++step)
504+ {
505+ if (!comm_phase_deleted[step])
506+ current_index++;
507+
508+ new_step_index[step] = current_index;
509+ }
510+ for (const auto & node : this ->instance ->vertices ())
511+ this ->node_to_superstep_assignment [node] = new_step_index[this ->node_to_superstep_assignment [node]];
512+ for (auto &[key, val] : commSchedule)
513+ val = new_step_index[val];
514+
515+ this ->setNumberOfSupersteps (current_index+1 );
516+ }
517+
518+ // for each vertex v and processor p, find the first superstep where v is present on p by the end of the compute phase
519+ std::vector<std::vector<unsigned > > getFirstPresence () const {
520+
521+ std::vector<std::vector<unsigned > > first_at (BspSchedule<Graph_t>::instance->numberOfVertices (),
522+ std::vector<unsigned >(BspSchedule<Graph_t>::instance->numberOfProcessors (), std::numeric_limits<unsigned >::max ()));
523+
524+ for (const auto &node : BspSchedule<Graph_t>::instance->getComputationalDag ().vertices ())
525+ first_at[node][this ->assignedProcessor (node)] = this ->assignedSuperstep (node);
526+
527+ for (auto const &[key, val] : commSchedule)
528+ first_at[std::get<0 >(key)][std::get<2 >(key)] =
529+ std::min (first_at[std::get<0 >(key)][std::get<2 >(key)], val + 1 ); // TODO: replace by staleness after merge
530+
531+ return first_at;
532+ }
533+
534+ // remove unneeded comm. schedule entries - these can happen in ILPs, partial ILPs, etc.
535+ void cleanCommSchedule (){
536+
537+ // data that is already present before it arrives
538+ std::vector<std::vector<std::multiset<unsigned > > > arrives_at (BspSchedule<Graph_t>::instance->numberOfVertices (),
539+ std::vector<std::multiset<unsigned > >(BspSchedule<Graph_t>::instance->numberOfProcessors ()));
540+ for (const auto &node : BspSchedule<Graph_t>::instance->getComputationalDag ().vertices ())
541+ arrives_at[node][this ->assignedProcessor (node)].insert (this ->assignedSuperstep (node));
542+
543+ for (auto const &[key, val] : commSchedule)
544+ arrives_at[std::get<0 >(key)][std::get<2 >(key)].insert (val);
545+
546+ std::vector<KeyTriple> toErase;
547+ for (auto const &[key, val] : commSchedule)
548+ {
549+ auto itr = arrives_at[std::get<0 >(key)][std::get<2 >(key)].begin ();
550+ if (*itr < val)
551+ toErase.push_back (key);
552+ else if (*itr == val && ++itr != arrives_at[std::get<0 >(key)][std::get<2 >(key)].end () && *itr == val)
553+ {
554+ toErase.push_back (key);
555+ arrives_at[std::get<0 >(key)][std::get<2 >(key)].erase (itr);
556+ }
557+ }
558+
559+ for (const KeyTriple& key : toErase)
560+ commSchedule.erase (key);
561+
562+ // data that is not used after being sent
563+ std::vector<std::vector<std::multiset<unsigned > > > used_at (BspSchedule<Graph_t>::instance->numberOfVertices (),
564+ std::vector<std::multiset<unsigned > >(BspSchedule<Graph_t>::instance->numberOfProcessors ()));
565+ for (const auto &node : BspSchedule<Graph_t>::instance->getComputationalDag ().vertices ())
566+ for (const auto &child : BspSchedule<Graph_t>::instance->getComputationalDag ().children (node))
567+ used_at[node][this ->assignedProcessor (child)].insert (this ->assignedSuperstep (child));
568+
569+ for (auto const &[key, val] : commSchedule)
570+ used_at[std::get<0 >(key)][std::get<1 >(key)].insert (val);
571+
572+ // (need to visit cs entries in reverse superstep order here)
573+ std::vector<std::vector<KeyTriple> > entries (this ->number_of_supersteps );
574+ for (auto const &[key, val] : commSchedule)
575+ entries[val].push_back (key);
576+
577+ toErase.clear ();
578+ for (unsigned step = this ->number_of_supersteps -1 ; step < this ->number_of_supersteps ; --step)
579+ for (const KeyTriple& key : entries[step])
580+ if (used_at[std::get<0 >(key)][std::get<2 >(key)].empty () ||
581+ *used_at[std::get<0 >(key)][std::get<2 >(key)].rbegin () <= step)
582+ {
583+ toErase.push_back (key);
584+ auto itr = used_at[std::get<0 >(key)][std::get<1 >(key)].find (step);
585+ used_at[std::get<0 >(key)][std::get<1 >(key)].erase (itr);
586+ }
587+
588+ for (const KeyTriple& key : toErase)
589+ commSchedule.erase (key);
590+ }
470591};
471592
472593} // namespace osp
0 commit comments