@@ -91,25 +91,50 @@ where
9191 }
9292}
9393
94- /// A [MetricReader] that continuously collects and exports metrics at a set
95- /// interval.
94+ /// A `MetricReader` that periodically collects and exports metrics at a configurable interval.
9695///
97- /// By default, `PeriodicReader` will collect and export metrics every 60
98- /// seconds. The export time is not counted towards the interval between
99- /// attempts. `PeriodicReader` itself does not enforce a timeout. Instead, the
100- /// timeout is passed on to the configured exporter for each export attempt.
96+ /// By default, [`PeriodicReader`] collects and exports metrics every **60 seconds**.
97+ /// The time taken for export is **not** included in the interval. Use [`PeriodicReaderBuilder`]
98+ /// to customize the interval.
10199///
102- /// `PeriodicReader` spawns a background thread to handle the periodic
103- /// collection and export of metrics. The background thread will continue to run
104- /// until `shutdown()` is called.
100+ /// [`PeriodicReader`] spawns a background thread to handle metric collection and export.
101+ /// This thread remains active until [`shutdown()`] is called.
105102///
106- /// When using this reader with the OTLP Exporter, the following exporter
107- /// features are supported:
108- /// - `grpc-tonic`: This requires `MeterProvider` to be created within a tokio
109- /// runtime.
110- /// - `reqwest-blocking-client`: Works with a regular `main` or `tokio::main`.
103+ /// ## Collection Process
104+ /// "Collection" refers to gathering aggregated metrics from the SDK's internal storage.
105+ /// During this phase, callbacks from observable instruments are also triggered.
111106///
112- /// In other words, other clients like `reqwest` and `hyper` are not supported.
107+ /// [`PeriodicReader`] does **not** enforce a timeout for collection. If an
108+ /// observable callback takes too long, it may delay the next collection cycle.
109+ /// If a callback never returns, it **will stall** all metric collection (and exports)
110+ /// indefinitely.
111+ ///
112+ /// ## Exporter Compatibility
113+ /// When used with the [`OTLP Exporter`](https://docs.rs/opentelemetry-otlp), the following
114+ /// transport options are supported:
115+ ///
116+ /// - **`grpc-tonic`**: Requires [`MeterProvider`] to be initialized within a `tokio` runtime.
117+ /// - **`reqwest-blocking-client`**: Works with both a standard (`main`) function and `tokio::main`.
118+ ///
119+ /// [`PeriodicReader`] does **not** enforce a timeout for exports either. Instead,
120+ /// the configured exporter is responsible for enforcing timeouts. If an export operation
121+ /// never returns, [`PeriodicReader`] will **stop exporting new metrics**, stalling
122+ /// metric collection.
123+ ///
124+ /// ## Manual Export & Shutdown
125+ /// Users can manually trigger an export via [`force_flush()`]. Calling [`shutdown()`]
126+ /// exports any remaining metrics and should be done before application exit to ensure
127+ /// all data is sent.
128+ ///
129+ /// **Warning**: If using **tokio’s current-thread runtime**, calling [`shutdown()`]
130+ /// from the main thread may cause a deadlock. To prevent this, call [`shutdown()`]
131+ /// from a separate thread or use tokio's `spawn_blocking`.
132+ ///
133+ /// [`PeriodicReader`]: crate::metrics::PeriodicReader
134+ /// [`PeriodicReaderBuilder`]: crate::metrics::PeriodicReaderBuilder
135+ /// [`MeterProvider`]: crate::metrics::SdkMeterProvider
136+ /// [`shutdown()`]: crate::metrics::SdkMeterProvider::shutdown
137+ /// [`force_flush()`]: crate::metrics::SdkMeterProvider::force_flush
113138///
114139/// # Example
115140///
@@ -175,10 +200,36 @@ impl PeriodicReader {
175200 otel_debug ! (
176201 name: "PeriodReaderThreadExportingDueToFlush"
177202 ) ;
178- if let Err ( _e) = cloned_reader. collect_and_export ( timeout) {
179- response_sender. send ( false ) . unwrap ( ) ;
180- } else {
181- response_sender. send ( true ) . unwrap ( ) ;
203+
204+ let export_result = cloned_reader. collect_and_export ( timeout) ;
205+ otel_debug ! (
206+ name: "PeriodReaderInvokedExport" ,
207+ export_result = format!( "{:?}" , export_result)
208+ ) ;
209+
210+ // If response_sender is disconnected, we can't send
211+ // the result back. This occurs when the thread that
212+ // initiated flush gave up due to timeout.
213+ // Gracefully handle that with internal logs. The
214+ // internal errors are of Info level, as this is
215+ // useful for user to know whether the flush was
216+ // successful or not, when flush() itself merely
217+ // tells that it timed out.
218+
219+ if export_result. is_err ( ) {
220+ if response_sender. send ( false ) . is_err ( ) {
221+ otel_info ! (
222+ name: "PeriodReader.Flush.ResponseSendError" ,
223+ message = "PeriodicReader's flush has failed, but unable to send this info back to caller.
224+ This occurs when the caller has timed out waiting for the response. If you see this occuring frequently, consider increasing the flush timeout."
225+ ) ;
226+ }
227+ } else if response_sender. send ( true ) . is_err ( ) {
228+ otel_info ! (
229+ name: "PeriodReader.Flush.ResponseSendError" ,
230+ message = "PeriodicReader's flush has completed successfully, but unable to send this info back to caller.
231+ This occurs when the caller has timed out waiting for the response. If you see this occuring frequently, consider increasing the flush timeout."
232+ ) ;
182233 }
183234
184235 // Adjust the remaining interval after the flush
@@ -207,15 +258,39 @@ impl PeriodicReader {
207258 // Perform final export and break out of loop and exit the thread
208259 otel_debug ! ( name: "PeriodReaderThreadExportingDueToShutdown" ) ;
209260 let export_result = cloned_reader. collect_and_export ( timeout) ;
261+ otel_debug ! (
262+ name: "PeriodReaderInvokedExport" ,
263+ export_result = format!( "{:?}" , export_result)
264+ ) ;
210265 let shutdown_result = exporter_arc. shutdown ( ) ;
211266 otel_debug ! (
212267 name: "PeriodReaderInvokedExporterShutdown" ,
213268 shutdown_result = format!( "{:?}" , shutdown_result)
214269 ) ;
270+
271+ // If response_sender is disconnected, we can't send
272+ // the result back. This occurs when the thread that
273+ // initiated shutdown gave up due to timeout.
274+ // Gracefully handle that with internal logs and
275+ // continue with shutdown (i.e exit thread) The
276+ // internal errors are of Info level, as this is
277+ // useful for user to know whether the shutdown was
278+ // successful or not, when shutdown() itself merely
279+ // tells that it timed out.
215280 if export_result. is_err ( ) || shutdown_result. is_err ( ) {
216- response_sender. send ( false ) . unwrap ( ) ;
217- } else {
218- response_sender. send ( true ) . unwrap ( ) ;
281+ if response_sender. send ( false ) . is_err ( ) {
282+ otel_info ! (
283+ name: "PeriodReaderThreadShutdown.ResponseSendError" ,
284+ message = "PeriodicReader's shutdown has failed, but unable to send this info back to caller.
285+ This occurs when the caller has timed out waiting for the response. If you see this occuring frequently, consider increasing the shutdown timeout."
286+ ) ;
287+ }
288+ } else if response_sender. send ( true ) . is_err ( ) {
289+ otel_info ! (
290+ name: "PeriodReaderThreadShutdown.ResponseSendError" ,
291+ message = "PeriodicReader completed its shutdown, but unable to send this info back to caller.
292+ This occurs when the caller has timed out waiting for the response. If you see this occuring frequently, consider increasing the shutdown timeout."
293+ ) ;
219294 }
220295
221296 otel_debug ! (
@@ -230,11 +305,11 @@ impl PeriodicReader {
230305 name: "PeriodReaderThreadExportingDueToTimer"
231306 ) ;
232307
233- if let Err ( _e ) = cloned_reader. collect_and_export ( timeout) {
234- otel_debug ! (
235- name: "PeriodReaderThreadExportingDueToTimerFailed"
236- ) ;
237- }
308+ let export_result = cloned_reader. collect_and_export ( timeout) ;
309+ otel_debug ! (
310+ name: "PeriodReaderInvokedExport" ,
311+ export_result = format! ( "{:?}" , export_result )
312+ ) ;
238313
239314 let time_taken_for_export = export_start. elapsed ( ) ;
240315 if time_taken_for_export > interval {
@@ -365,17 +440,7 @@ impl PeriodicReaderInner {
365440
366441 // Relying on futures executor to execute async call.
367442 // TODO: Pass timeout to exporter
368- let exporter_result = futures_executor:: block_on ( self . exporter . export ( & mut rm) ) ;
369- #[ allow( clippy:: question_mark) ]
370- if let Err ( e) = exporter_result {
371- otel_warn ! (
372- name: "PeriodReaderExportError" ,
373- error = format!( "{:?}" , e)
374- ) ;
375- return Err ( e) ;
376- }
377-
378- Ok ( ( ) )
443+ futures_executor:: block_on ( self . exporter . export ( & mut rm) )
379444 }
380445
381446 fn force_flush ( & self ) -> MetricResult < ( ) > {
0 commit comments