@@ -40,7 +40,7 @@ void Connection_register_downstream(Connection *self, Port *port) {
4040/**
4141 * @brief Recursively walks down connection graph and copies value into Input ports and triggers reactions.
4242 */
43- void LogicalConnection_trigger_downstreams (Connection * self , const void * value , size_t value_size ) {
43+ void LogicalConnection_trigger_downstreams (Connection * self , tag_t intended_tag , const void * value , size_t value_size ) {
4444 LF_DEBUG (CONN , "Triggering downstreams of %p with value %p" , self , value );
4545 for (size_t i = 0 ; i < self -> downstreams_registered ; i ++ ) {
4646 Port * down = self -> downstreams [i ];
@@ -54,19 +54,20 @@ void LogicalConnection_trigger_downstreams(Connection *self, const void *value,
5454 // "last write wins"
5555 if (!down -> super .is_present ) {
5656 down -> super .prepare (& down -> super , NULL );
57+ down -> intended_tag = intended_tag ;
5758 }
5859 }
5960
6061 for (size_t i = 0 ; i < down -> conns_out_registered ; i ++ ) {
6162 LF_DEBUG (CONN , "Found further downstream connection %p to recurse down" , down -> conns_out [i ]);
62- down -> conns_out [i ]-> trigger_downstreams (down -> conns_out [i ], value , value_size );
63+ down -> conns_out [i ]-> trigger_downstreams (down -> conns_out [i ], intended_tag , value , value_size );
6364 }
6465 }
6566}
6667
6768void Connection_ctor (Connection * self , TriggerType type , Reactor * parent , Port * * downstreams , size_t num_downstreams ,
6869 EventPayloadPool * payload_pool , void (* prepare )(Trigger * , Event * ), void (* cleanup )(Trigger * ),
69- void (* trigger_downstreams )(Connection * , const void * , size_t )) {
70+ void (* trigger_downstreams )(Connection * , tag_t , const void * , size_t )) {
7071
7172 self -> upstream = NULL ;
7273 self -> downstreams_size = num_downstreams ;
@@ -100,7 +101,7 @@ void DelayedConnection_prepare(Trigger *trigger, Event *event) {
100101 trigger -> is_present = true;
101102 sched -> register_for_cleanup (sched , trigger );
102103
103- LogicalConnection_trigger_downstreams (& self -> super , event -> super .payload , pool -> payload_size );
104+ LogicalConnection_trigger_downstreams (& self -> super , event -> intended_tag , event -> super .payload , pool -> payload_size );
104105 validate (pool -> free (pool , event -> super .payload ) == LF_OK );
105106}
106107
@@ -123,7 +124,7 @@ void DelayedConnection_cleanup(Trigger *trigger) {
123124 if (self -> type == PHYSICAL_CONNECTION ) {
124125 base_tag .time = env -> get_physical_time (env );
125126 } else {
126- base_tag = sched -> current_tag ( sched ) ;
127+ base_tag = self -> intended_tag ;
127128 }
128129 tag_t tag = lf_delay_tag (base_tag , self -> delay );
129130 Event event = EVENT_INIT (tag , & self -> super .super , self -> staged_payload_ptr );
@@ -132,7 +133,8 @@ void DelayedConnection_cleanup(Trigger *trigger) {
132133 }
133134}
134135
135- void DelayedConnection_trigger_downstreams (Connection * _self , const void * value , size_t value_size ) {
136+ void DelayedConnection_trigger_downstreams (Connection * _self , tag_t intended_tag , const void * value ,
137+ size_t value_size ) {
136138 DelayedConnection * self = (DelayedConnection * )_self ;
137139 assert (value );
138140 assert (value_size > 0 );
@@ -149,6 +151,7 @@ void DelayedConnection_trigger_downstreams(Connection *_self, const void *value,
149151 return ;
150152 }
151153 }
154+ self -> intended_tag = intended_tag ;
152155 memcpy (self -> staged_payload_ptr , value , value_size );
153156 sched -> register_for_cleanup (sched , & _self -> super );
154157}
0 commit comments