@@ -148,7 +148,7 @@ impl PeriodicReader {
148148 let exporter_arc = Arc :: new ( exporter) ;
149149 let reader = PeriodicReader {
150150 inner : Arc :: new ( PeriodicReaderInner {
151- message_sender : Arc :: new ( message_sender ) ,
151+ message_sender,
152152 producer : Mutex :: new ( None ) ,
153153 exporter : exporter_arc. clone ( ) ,
154154 } ) ,
@@ -294,7 +294,7 @@ impl fmt::Debug for PeriodicReader {
294294
295295struct PeriodicReaderInner {
296296 exporter : Arc < dyn PushMetricExporter > ,
297- message_sender : Arc < mpsc:: Sender < Message > > ,
297+ message_sender : mpsc:: Sender < Message > ,
298298 producer : Mutex < Option < Weak < dyn SdkProducer > > > ,
299299}
300300
@@ -320,15 +320,23 @@ impl PeriodicReaderInner {
320320 }
321321 }
322322
323- fn collect_and_export ( & self , _timeout : Duration ) -> MetricResult < ( ) > {
323+ fn collect_and_export ( & self , timeout : Duration ) -> MetricResult < ( ) > {
324324 // TODO: Reuse the internal vectors. Or refactor to avoid needing any
325325 // owned data structures to be passed to exporters.
326326 let mut rm = ResourceMetrics {
327327 resource : Resource :: empty ( ) ,
328328 scope_metrics : Vec :: new ( ) ,
329329 } ;
330330
331+ // Measure time taken for collect, and subtract it from the timeout.
332+ let current_time = Instant :: now ( ) ;
331333 let collect_result = self . collect ( & mut rm) ;
334+ let time_taken_for_collect = current_time. elapsed ( ) ;
335+ let _timeout = if time_taken_for_collect > timeout {
336+ Duration :: from_secs ( 0 )
337+ } else {
338+ timeout - time_taken_for_collect
339+ } ;
332340 #[ allow( clippy:: question_mark) ]
333341 if let Err ( e) = collect_result {
334342 otel_warn ! (
@@ -346,15 +354,10 @@ impl PeriodicReaderInner {
346354 let metrics_count = rm. scope_metrics . iter ( ) . fold ( 0 , |count, scope_metrics| {
347355 count + scope_metrics. metrics . len ( )
348356 } ) ;
349- otel_debug ! ( name: "PeriodicReaderMetricsCollected" , count = metrics_count) ;
357+ otel_debug ! ( name: "PeriodicReaderMetricsCollected" , count = metrics_count, time_taken_in_millis = time_taken_for_collect . as_millis ( ) ) ;
350358
351- // TODO: subtract the time taken for collect from the timeout. collect
352- // involves observable callbacks too, which are user defined and can
353- // take arbitrary time.
354- //
355359 // Relying on futures executor to execute async call.
356- // TODO: Add timeout and pass it to exporter or consider alternative
357- // design to enforce timeout here.
360+ // TODO: Pass timeout to exporter
358361 let exporter_result = futures_executor:: block_on ( self . exporter . export ( & mut rm) ) ;
359362 #[ allow( clippy:: question_mark) ]
360363 if let Err ( e) = exporter_result {
0 commit comments