@@ -876,7 +876,11 @@ inline void Fallbacks::replaceImpl() {
876876 impl = new FallbacksPrivatePropagator (std::move (*impl));
877877 break ;
878878 case CONNECT:
879- throw std::runtime_error (" Not yet implemented" );
879+ // For now, we only support Connecting children
880+ for (const auto & child : impl->children ())
881+ if (!dynamic_cast <Connecting*>(child.get ()))
882+ throw std::runtime_error (" CONNECT-like interface is only supported for Connecting children" );
883+ impl = new FallbacksPrivateConnect (std::move (*impl));
880884 break ;
881885 }
882886 delete pimpl_;
@@ -1017,6 +1021,65 @@ bool FallbacksPrivatePropagator::nextJob() {
10171021}
10181022
10191023
1024+ FallbacksPrivateConnect::FallbacksPrivateConnect (FallbacksPrivate&& old)
1025+ : FallbacksPrivate(std::move(old)) {
1026+ starts_ = std::make_shared<Interface>(
1027+ std::bind (&FallbacksPrivateConnect::propagateStateUpdate<Interface::FORWARD>, this , std::placeholders::_1, std::placeholders::_2));
1028+ ends_ = std::make_shared<Interface>(
1029+ std::bind (&FallbacksPrivateConnect::propagateStateUpdate<Interface::BACKWARD>, this , std::placeholders::_1, std::placeholders::_2));
1030+
1031+ FallbacksPrivateConnect::reset ();
1032+ }
1033+
1034+ void FallbacksPrivateConnect::reset () {
1035+ active_ = children ().end ();
1036+ }
1037+
1038+ template <Interface::Direction dir>
1039+ void FallbacksPrivateConnect::propagateStateUpdate (Interface::iterator external, bool updated) {
1040+ copyState<dir>(external, children ().front ()->pimpl ()->pullInterface (dir), updated);
1041+ // TODO: propagate updates to other children as well
1042+ }
1043+
1044+ bool FallbacksPrivateConnect::canCompute () const {
1045+ for (auto it=children ().begin (), end=children ().end (); it!=end; ++it)
1046+ if ((*it)->pimpl ()->canCompute ()) {
1047+ active_ = it;
1048+ return true ;
1049+ }
1050+ active_ = children ().end ();
1051+ return false ;
1052+ }
1053+
1054+ void FallbacksPrivateConnect::compute () {
1055+ // Alternatively, we could also compute() all children that canCompute()
1056+ assert (active_ != children ().end ());
1057+ (*active_)->pimpl ()->runCompute ();
1058+ }
1059+
1060+ void FallbacksPrivateConnect::onNewFailure (const Stage& child, const InterfaceState* from, const InterfaceState* to) {
1061+ // expect failure to be reported from active child
1062+ assert (active_ != children ().end () && active_->get () == &child);
1063+ // ... thus we can use std::next(active_) to find the next child
1064+ auto next = std::next (active_);
1065+
1066+ auto findIteratorFor = [](const InterfaceState* state, const Interface& interface) {
1067+ auto it = std::find (interface.begin (), interface.end (), state);
1068+ assert (it != interface.end ());
1069+ return it;
1070+ };
1071+
1072+ if (next != children ().end ()) { // pass job to next child
1073+ auto next_con = static_cast <ConnectingPrivate*>(const_cast <StagePrivate*>((*next)->pimpl ()));
1074+ auto first_con = static_cast <const ConnectingPrivate*>(children ().front ()->pimpl ());
1075+ auto fromIt = findIteratorFor (from, *first_con->starts ());
1076+ auto toIt = findIteratorFor (to, *first_con->ends ());
1077+ next_con->pending .insert (std::make_pair (fromIt, toIt));
1078+ } else // or report failure to parent
1079+ parent ()->pimpl ()->onNewFailure (*me (), from, to);
1080+ }
1081+
1082+
10201083MergerPrivate::MergerPrivate (Merger* me, const std::string& name) : ParallelContainerBasePrivate(me, name) {}
10211084
10221085void MergerPrivate::resolveInterface (InterfaceFlags expected) {
0 commit comments