@@ -20,13 +20,11 @@ class Demultiplexer : public Operator {
2020 }
2121
2222 // Add single data input port with type T
23- add_data_port<T>();
24- data_time_tracker_ = std::set<timestamp_t >();
23+ add_data_port<T>();
2524
2625 // Add corresponding control ports (always boolean)
2726 for (size_t i = 0 ; i < num_ports; ++i) {
28- add_control_port<BooleanData>();
29- control_time_tracker_[i] = std::map<timestamp_t , bool >();
27+ add_control_port<BooleanData>();
3028 }
3129
3230 // Add output ports (same type as input)
@@ -37,60 +35,9 @@ class Demultiplexer : public Operator {
3735
3836 std::string type_name () const override { return " Demultiplexer" ; }
3937
40- size_t get_num_ports () const { return control_time_tracker_. size (); }
38+ size_t get_num_ports () const { return num_control_ports (); }
4139
42- Bytes collect () override {
43- Bytes bytes = Operator::collect (); // First collect base state
44-
45- // Serialize data time tracker
46- StateSerializer::serialize_timestamp_set (bytes, data_time_tracker_);
47-
48- // Serialize control time tracker
49- StateSerializer::serialize_port_control_map (bytes, control_time_tracker_);
50-
51- return bytes;
52- }
53-
54- void restore (Bytes::const_iterator& it) override {
55- // First restore base state
56- Operator::restore (it);
57-
58- // Clear current state
59- data_time_tracker_.clear ();
60- control_time_tracker_.clear ();
61-
62- // Restore data time tracker
63- StateSerializer::deserialize_timestamp_set (it, data_time_tracker_);
64-
65- // Restore control time tracker
66- StateSerializer::deserialize_port_control_map (it, control_time_tracker_);
67-
68- // Validate control port count
69- StateSerializer::validate_port_count (control_time_tracker_.size (), num_control_ports (), " Control" );
70- }
71-
72- void reset () override {
73- Operator::reset ();
74- data_time_tracker_.clear ();
75- control_time_tracker_.clear ();
76- for (size_t i = 0 ; i < get_num_ports (); ++i) {
77- control_time_tracker_[i] = std::map<timestamp_t , bool >();
78- }
79- }
80-
81- timestamp_t receive_data (std::unique_ptr<BaseMessage> msg, size_t port_index) override {
82- auto time = msg->time ;
83- timestamp_t time_dequeued = Operator::receive_data (std::move (msg), port_index);
84-
85- data_time_tracker_.insert (time);
86- if (time_dequeued >= 0 ) {
87- data_time_tracker_.erase (time_dequeued);
88- }
89-
90- return time_dequeued;
91- }
92-
93- timestamp_t receive_control (std::unique_ptr<BaseMessage> msg, size_t port_index) override {
40+ void receive_control (std::unique_ptr<BaseMessage> msg, size_t port_index) override {
9441 if (port_index >= num_control_ports ()) {
9542 throw std::runtime_error (" Invalid control port index" );
9643 }
@@ -100,103 +47,62 @@ class Demultiplexer : public Operator {
10047 throw std::runtime_error (" Invalid control message type" );
10148 }
10249
103- timestamp_t time_dequeued = - 1 ;
104-
105- if ( get_control_queue (port_index). size () == max_size_per_port_) {
106- time_dequeued = get_control_queue (port_index).front ()-> time ;
50+ // Update last timestamp
51+ control_ports_[port_index]. last_timestamp = msg-> time ;
52+
53+ if ( get_control_queue (port_index).size () == max_size_per_port_) {
10754 get_control_queue (port_index).pop_front ();
108- }
109-
110- control_time_tracker_[port_index][ctrl_msg->time ] = ctrl_msg->data .value ;
111-
112- if (time_dequeued >= 0 ) {
113- auto it = control_time_tracker_.find (port_index);
114- if (it != control_time_tracker_.end ()) {
115- it->second .erase (time_dequeued);
116- }
117- }
55+ }
11856
11957 // Add message to queue
12058 get_control_queue (port_index).push_back (std::move (msg));
12159 control_ports_with_new_data_.insert (port_index);
122-
123- return time_dequeued;
12460 }
12561
126- protected:
127- void process_data () override {
128- while (true ) {
129- // Find oldest common control timestamp
130- auto common_control_time = TimestampTracker::find_oldest_common_time (control_time_tracker_);
131- if (!common_control_time) {
132- break ;
133- }
134-
135- // Clean up any old input data messages
136- auto & data_queue = get_data_queue (0 );
137- while (!data_queue.empty ()) {
138- auto * msg = dynamic_cast <const Message<T>*>(data_queue.front ().get ());
139- if (msg && msg->time < *common_control_time) {
140- data_time_tracker_.erase (msg->time );
141- data_queue.pop_front ();
142- } else {
143- break ;
144- }
145- }
62+ protected:
14663
147- // Look for matching data message
148- bool message_found = false ;
149- if (!data_queue.empty ()) {
150- auto * msg = dynamic_cast <const Message<T>*>(data_queue.front ().get ());
151- if (msg && msg->time == *common_control_time) {
152- // Get active control ports
153- std::vector<size_t > active_ports;
154- for (size_t i = 0 ; i < num_control_ports (); ++i) {
155- if (control_time_tracker_[i].at (*common_control_time)) {
156- active_ports.push_back (i);
157- }
158- }
64+ void process_data () override {
15965
160- // Route message to all active ports
161- for (size_t port : active_ports) {
162- get_output_queue (port).push_back (data_queue.front ()->clone ());
66+ while (true ) {
67+
68+ bool is_any_control_empty;
69+ bool are_controls_sync;
70+ do {
71+ is_any_control_empty = false ;
72+ are_controls_sync = sync_control_inputs ();
73+ for (int i=0 ; i < num_control_ports (); i++) {
74+ if (get_control_queue (i).empty ()) {
75+ is_any_control_empty = true ;
76+ break ;
16377 }
78+ }
79+ } while (!are_controls_sync && !is_any_control_empty );
16480
165- data_time_tracker_.erase (msg->time );
166- data_queue.pop_front ();
167- message_found = true ;
168- }
169- }
81+ if (!are_controls_sync) return ;
17082
171- clean_up_control_messages (*common_control_time);
172-
173- if (!message_found) {
174- break ;
175- }
176- }
177- }
178-
179- private:
180- void clean_up_control_messages (timestamp_t time) {
181- for (auto & [port, tracker] : control_time_tracker_) {
182- tracker.erase (time);
183- }
184-
185- for (size_t port = 0 ; port < num_control_ports (); ++port) {
186- auto & queue = get_control_queue (port);
187- while (!queue.empty ()) {
188- auto * msg = dynamic_cast <const Message<BooleanData>*>(queue.front ().get ());
189- if (msg && msg->time <= time) {
190- queue.pop_front ();
191- } else {
192- break ;
83+ auto & data_queue = get_data_queue (0 );
84+ if (data_queue.empty ()) return ;
85+ auto * msg = dynamic_cast <const Message<T>*>(data_queue.front ().get ());
86+ auto * ctrl_msg = dynamic_cast <const Message<BooleanData>*>(get_control_queue (0 ).front ().get ());
87+ if (msg && ctrl_msg && msg->time == ctrl_msg->time ) {
88+ for (int i = 0 ; i < num_control_ports (); i++) {
89+ ctrl_msg = dynamic_cast <const Message<BooleanData>*>(get_control_queue (i).front ().get ());
90+ if (ctrl_msg->data .value ) {
91+ get_output_queue (i).push_back (data_queue.front ()->clone ());
92+ }
93+ get_control_queue (i).pop_front ();
19394 }
95+ data_queue.pop_front ();
96+ } else if (msg && ctrl_msg && msg->time < ctrl_msg->time ) {
97+ data_queue.pop_front ();
98+ } else if (msg && ctrl_msg && ctrl_msg->time < msg->time ) {
99+ for (int i = 0 ; i < num_control_ports (); i++)
100+ get_control_queue (i).pop_front ();
101+
194102 }
195- }
196- }
103+ }
104+ }
197105
198- std::set<timestamp_t > data_time_tracker_;
199- std::map<size_t , std::map<timestamp_t , bool >> control_time_tracker_;
200106};
201107
202108// Factory functions for common configurations using PortType
0 commit comments