@@ -20,13 +20,11 @@ class Demultiplexer : public Operator {
2020 }
2121
2222 // Add single data input port with type T
23- add_data_port<T>();
24- data_time_tracker_ = std::set<timestamp_t >();
23+ add_data_port<T>();
2524
2625 // Add corresponding control ports (always boolean)
2726 for (size_t i = 0 ; i < num_ports; ++i) {
28- add_control_port<BooleanData>();
29- control_time_tracker_[i] = std::map<timestamp_t , bool >();
27+ add_control_port<BooleanData>();
3028 }
3129
3230 // Add output ports (same type as input)
@@ -37,143 +35,62 @@ class Demultiplexer : public Operator {
3735
3836 std::string type_name () const override { return " Demultiplexer" ; }
3937
40- size_t get_num_ports () const { return control_time_tracker_. size (); }
38+ size_t get_num_ports () const { return num_control_ports (); }
4139
42- Bytes collect () override {
43- Bytes bytes = Operator::collect (); // First collect base state
44-
45- // Serialize data time tracker
46- StateSerializer::serialize_timestamp_set (bytes, data_time_tracker_);
47-
48- // Serialize control time tracker
49- StateSerializer::serialize_port_control_map (bytes, control_time_tracker_);
50-
51- return bytes;
40+ bool equals (const Demultiplexer& other) const {
41+ return Operator::equals (other);
5242 }
53-
54- void restore (Bytes::const_iterator& it) override {
55- // First restore base state
56- Operator::restore (it);
57-
58- // Clear current state
59- data_time_tracker_.clear ();
60- control_time_tracker_.clear ();
61-
62- // Restore data time tracker
63- StateSerializer::deserialize_timestamp_set (it, data_time_tracker_);
64-
65- // Restore control time tracker
66- StateSerializer::deserialize_port_control_map (it, control_time_tracker_);
67-
68- // Validate control port count
69- StateSerializer::validate_port_count (control_time_tracker_.size (), num_control_ports (), " Control" );
43+
44+ bool operator ==(const Demultiplexer& other) const {
45+ return equals (other);
7046 }
7147
72- void reset () override {
73- Operator::reset ();
74- data_time_tracker_.clear ();
75- control_time_tracker_.clear ();
48+ bool operator !=(const Demultiplexer& other) const {
49+ return !(*this == other);
7650 }
7751
78- void receive_data (std::unique_ptr<BaseMessage> msg, size_t port_index) override {
79- auto time = msg->time ;
80- Operator::receive_data (std::move (msg), port_index);
81-
82- data_time_tracker_.insert (time);
83- }
52+ protected:
8453
85- void receive_control (std::unique_ptr<BaseMessage> msg, size_t port_index) override {
86- if (port_index >= num_control_ports ()) {
87- throw std::runtime_error (" Invalid control port index" );
88- }
89-
90- auto * ctrl_msg = dynamic_cast <const Message<BooleanData>*>(msg.get ());
91- if (!ctrl_msg) {
92- throw std::runtime_error (" Invalid control message type" );
93- }
94-
95- // Update control tracker
96- control_time_tracker_[port_index][ctrl_msg->time ] = ctrl_msg->data .value ;
97-
98- // Add message to queue
99- get_control_queue (port_index).push_back (std::move (msg));
100- control_ports_with_new_data_.insert (port_index);
101- }
102-
103- protected:
10454 void process_data () override {
105- while (true ) {
106- // Find oldest common control timestamp
107- auto common_control_time = TimestampTracker::find_oldest_common_time (control_time_tracker_);
108- if (!common_control_time) {
109- break ;
110- }
111-
112- // Clean up any old input data messages
113- auto & data_queue = get_data_queue (0 );
114- while (!data_queue.empty ()) {
115- auto * msg = dynamic_cast <const Message<T>*>(data_queue.front ().get ());
116- if (msg && msg->time < *common_control_time) {
117- data_time_tracker_.erase (msg->time );
118- data_queue.pop_front ();
119- } else {
120- break ;
121- }
122- }
123-
124- // Look for matching data message
125- bool message_found = false ;
126- if (!data_queue.empty ()) {
127- auto * msg = dynamic_cast <const Message<T>*>(data_queue.front ().get ());
128- if (msg && msg->time == *common_control_time) {
129- // Get active control ports
130- std::vector<size_t > active_ports;
131- for (size_t i = 0 ; i < num_control_ports (); ++i) {
132- if (control_time_tracker_[i].at (*common_control_time)) {
133- active_ports.push_back (i);
134- }
55+ while (true ) {
56+
57+ bool is_any_control_empty;
58+ bool are_controls_sync;
59+ do {
60+ is_any_control_empty = false ;
61+ are_controls_sync = sync_control_inputs ();
62+ for (int i=0 ; i < num_control_ports (); i++) {
63+ if (get_control_queue (i).empty ()) {
64+ is_any_control_empty = true ;
65+ break ;
13566 }
67+ }
68+ } while (!are_controls_sync && !is_any_control_empty );
13669
137- // Route message to all active ports
138- for (size_t port : active_ports) {
139- get_output_queue (port).push_back (data_queue.front ()->clone ());
140- }
141-
142- data_time_tracker_.erase (msg->time );
143- data_queue.pop_front ();
144- message_found = true ;
145- }
146- }
147-
148- clean_up_control_messages (*common_control_time);
70+ if (!are_controls_sync) return ;
14971
150- if (!message_found) {
151- break ;
152- }
153- }
154- }
155-
156- private:
157- void clean_up_control_messages (timestamp_t time) {
158- for (auto & [port, tracker] : control_time_tracker_) {
159- tracker.erase (time);
160- }
161-
162- for (size_t port = 0 ; port < num_control_ports (); ++port) {
163- auto & queue = get_control_queue (port);
164- while (!queue.empty ()) {
165- auto * msg = dynamic_cast <const Message<BooleanData>*>(queue.front ().get ());
166- if (msg && msg->time <= time) {
167- queue.pop_front ();
168- } else {
169- break ;
72+ auto & data_queue = get_data_queue (0 );
73+ if (data_queue.empty ()) return ;
74+ auto * msg = dynamic_cast <const Message<T>*>(data_queue.front ().get ());
75+ auto * ctrl_msg = dynamic_cast <const Message<BooleanData>*>(get_control_queue (0 ).front ().get ());
76+ if (msg && ctrl_msg && msg->time == ctrl_msg->time ) {
77+ for (int i = 0 ; i < num_control_ports (); i++) {
78+ ctrl_msg = dynamic_cast <const Message<BooleanData>*>(get_control_queue (i).front ().get ());
79+ if (ctrl_msg->data .value ) {
80+ get_output_queue (i).push_back (data_queue.front ()->clone ());
81+ }
82+ get_control_queue (i).pop_front ();
17083 }
84+ data_queue.pop_front ();
85+ } else if (msg && ctrl_msg && msg->time < ctrl_msg->time ) {
86+ data_queue.pop_front ();
87+ } else if (msg && ctrl_msg && ctrl_msg->time < msg->time ) {
88+ for (int i = 0 ; i < num_control_ports (); i++)
89+ get_control_queue (i).pop_front ();
90+
17191 }
17292 }
17393 }
174-
175- std::set<timestamp_t > data_time_tracker_;
176- std::map<size_t , std::map<timestamp_t , bool >> control_time_tracker_;
17794};
17895
17996// Factory functions for common configurations using PortType
0 commit comments