@@ -60,14 +60,17 @@ class Pipeline : public Operator {
6060 // API for configuring the pipeline
6161 void register_operator (std::shared_ptr<Operator> op) { operators_[op->id ()] = std::move (op); }
6262
63- void set_entry (const std::string& op_id, size_t port = 0 ) {
63+ void set_entry (const std::string& op_id) {
6464 auto it = operators_.find (op_id);
6565 if (it == operators_.end ()) {
6666 throw std::runtime_error (" Entry operator not found: " + op_id);
6767 }
68- entry_operator_ = it->second ;
69- entry_port_ = port;
70- RTBOT_LOG_DEBUG (" Setting entry operator: " , op_id, " -> " , port);
68+ if (it->second ->num_data_ports () >= num_data_ports ()) {
69+ entry_operator_ = it->second ;
70+ RTBOT_LOG_DEBUG (" Setting entry operator: " , op_id);
71+ } else {
72+ throw std::runtime_error (" Entry operator has less data ports that the pipeline: " + op_id);
73+ }
7174 }
7275
7376 void add_output_mapping (const std::string& op_id, size_t op_port, size_t pipeline_port) {
@@ -117,8 +120,11 @@ class Pipeline : public Operator {
117120 if (input_port_types_ != other.input_port_types_ ) return false ;
118121 if (output_port_types_!= other.output_port_types_ ) return false ;
119122 if (output_mappings_ != other.output_mappings_ ) return false ;
120- if (entry_operator_ != other.entry_operator_ ) return false ;
121- if (entry_port_ != other.entry_port_ ) return false ;
123+ if ((bool )entry_operator_ != (bool )other.entry_operator_ ) return false ;
124+ if (entry_operator_ && other.entry_operator_ ) {
125+ if (*entry_operator_ != *other.entry_operator_ )
126+ return false ;
127+ }
122128 if (operators_.size () != other.operators_ .size ()) return false ;
123129
124130 for (const auto & [key, op1] : operators_) {
@@ -141,7 +147,7 @@ class Pipeline : public Operator {
141147 }
142148
143149 protected:
144- void process_data () override {
150+ void process_data (bool debug= false ) override {
145151 // Check if we have an entry point configured
146152 if (!entry_operator_) {
147153 throw std::runtime_error (" Pipeline entry point not configured" );
@@ -153,7 +159,7 @@ class Pipeline : public Operator {
153159 while (!input_queue.empty ()) {
154160 auto & msg = input_queue.front ();
155161 entry_operator_->receive_data (msg->clone (), i);
156- entry_operator_->execute ();
162+ entry_operator_->execute (debug );
157163 input_queue.pop_front ();
158164 // Process output mappings
159165 bool was_reset = false ;
@@ -195,7 +201,6 @@ class Pipeline : public Operator {
195201 std::vector<PipelineConnection> pipeline_connections_;
196202 std::map<std::string, std::shared_ptr<Operator>> operators_;
197203 std::shared_ptr<Operator> entry_operator_;
198- size_t entry_port_;
199204 std::map<std::string, std::vector<std::pair<size_t , size_t >>> output_mappings_;
200205};
201206
0 commit comments