@@ -149,7 +149,7 @@ impl PeriodicReader {
149149 let exporter_arc = Arc :: new ( exporter) ;
150150 let reader = PeriodicReader {
151151 inner : Arc :: new ( PeriodicReaderInner {
152- message_sender : Arc :: new ( message_sender ) ,
152+ message_sender,
153153 producer : Mutex :: new ( None ) ,
154154 exporter : exporter_arc. clone ( ) ,
155155 } ) ,
@@ -295,7 +295,7 @@ impl fmt::Debug for PeriodicReader {
295295
296296struct PeriodicReaderInner {
297297 exporter : Arc < dyn PushMetricExporter > ,
298- message_sender : Arc < mpsc:: Sender < Message > > ,
298+ message_sender : mpsc:: Sender < Message > ,
299299 producer : Mutex < Option < Weak < dyn SdkProducer > > > ,
300300}
301301
@@ -321,15 +321,23 @@ impl PeriodicReaderInner {
321321 }
322322 }
323323
324- fn collect_and_export ( & self , _timeout : Duration ) -> MetricResult < ( ) > {
324+ fn collect_and_export ( & self , timeout : Duration ) -> MetricResult < ( ) > {
325325 // TODO: Reuse the internal vectors. Or refactor to avoid needing any
326326 // owned data structures to be passed to exporters.
327327 let mut rm = ResourceMetrics {
328328 resource : Resource :: empty ( ) ,
329329 scope_metrics : Vec :: new ( ) ,
330330 } ;
331331
332+ // Measure time taken for collect, and subtract it from the timeout.
333+ let current_time = Instant :: now ( ) ;
332334 let collect_result = self . collect ( & mut rm) ;
335+ let time_taken_for_collect = current_time. elapsed ( ) ;
336+ let _timeout = if time_taken_for_collect > timeout {
337+ Duration :: from_secs ( 0 )
338+ } else {
339+ timeout - time_taken_for_collect
340+ } ;
333341 #[ allow( clippy:: question_mark) ]
334342 if let Err ( e) = collect_result {
335343 otel_warn ! (
@@ -347,15 +355,10 @@ impl PeriodicReaderInner {
347355 let metrics_count = rm. scope_metrics . iter ( ) . fold ( 0 , |count, scope_metrics| {
348356 count + scope_metrics. metrics . len ( )
349357 } ) ;
350- otel_debug ! ( name: "PeriodicReaderMetricsCollected" , count = metrics_count) ;
358+ otel_debug ! ( name: "PeriodicReaderMetricsCollected" , count = metrics_count, time_taken_in_millis = time_taken_for_collect . as_millis ( ) ) ;
351359
352- // TODO: subtract the time taken for collect from the timeout. collect
353- // involves observable callbacks too, which are user defined and can
354- // take arbitrary time.
355- //
356360 // Relying on futures executor to execute async call.
357- // TODO: Add timeout and pass it to exporter or consider alternative
358- // design to enforce timeout here.
361+ // TODO: Pass timeout to exporter
359362 let exporter_result = futures_executor:: block_on ( self . exporter . export ( & mut rm) ) ;
360363 #[ allow( clippy:: question_mark) ]
361364 if let Err ( e) = exporter_result {
0 commit comments