@@ -834,8 +834,9 @@ impl<'a> Builder<'a> {
834834 for event in events. iter_events_mut ( ) {
835835 update_runtime_schema_definition ( event, & output_id, & schema_definition_map) ;
836836 }
837- latency_recorder. on_send ( & mut events) ;
838- ( events, Instant :: now ( ) )
837+ let now = Instant :: now ( ) ;
838+ latency_recorder. on_send ( & mut events, now) ;
839+ ( events, now)
839840 } )
840841 . inspect ( move |( events, _) : & ( EventArray , Instant ) | {
841842 events_sent. emit ( CountByteSize (
@@ -881,16 +882,17 @@ async fn run_source_output_pump(
881882 send_reference,
882883 } ) = rx. next ( ) . await
883884 {
885+ // Even though we have a `send_reference` timestamp above, that reference time is when
886+ // the events were enqueued in the `SourceSender`, not when they were pulled out of the
887+ // `rx` stream on this end. Since those times can be quite different (due to blocking
888+ // inherent to the fanout send operation), we set the `last_transform_timestamp` to the
889+ // current time instead to get an accurate reference for when the events started waiting
890+ // for the first transform.
891+ let now = Instant :: now ( ) ;
884892 array. for_each_metadata_mut ( |metadata| {
885893 metadata. set_source_id ( Arc :: clone ( & source) ) ;
886894 metadata. set_source_type ( source_type) ;
887- // Even though we have a `send_reference` timestamp here, that reference time is when
888- // the events were enqueued in the `SourceSender`, not when they were pulled out of the
889- // `rx` stream on this end. Since those times can be quite different (due to blocking
890- // inherent to the fanout send operation), we set the `last_transform_timestamp` to the
891- // current time instead to get an accurate reference for when the events started waiting
892- // for the first transform.
893- metadata. set_last_transform_timestamp ( Instant :: now ( ) ) ;
895+ metadata. set_last_transform_timestamp ( now) ;
894896 } ) ;
895897 fanout
896898 . send ( array, Some ( send_reference) )
@@ -1154,7 +1156,8 @@ impl Runner {
11541156
11551157 async fn send_outputs ( & mut self , outputs_buf : & mut TransformOutputsBuf ) -> crate :: Result < ( ) > {
11561158 self . timer_tx . try_send_start_wait ( ) ;
1157- outputs_buf. for_each_array_mut ( |array| self . latency_recorder . on_send ( array) ) ;
1159+ let now = Instant :: now ( ) ;
1160+ outputs_buf. for_each_array_mut ( |array| self . latency_recorder . on_send ( array, now) ) ;
11581161 self . outputs . send ( outputs_buf) . await
11591162 }
11601163
0 commit comments