Skip to content

Commit 96c08c2

Browse files
committed
configured the shutdown timeout
1 parent f6827ca commit 96c08c2

File tree

1 file changed

+136
-21
lines changed

1 file changed

+136
-21
lines changed

tracing-appender/src/non_blocking.rs

Lines changed: 136 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@
1919
//! tracing_appender::non_blocking(std::io::stdout())
2020
//! # }
2121
//! ```
22-
//! [builder]: NonBlockingBuilder::default
22+
//! [builder]: NonBlockingBuilder::default()
2323
//!
2424
//! <br/> This function returns a tuple of `NonBlocking` and `WorkerGuard`.
2525
//! `NonBlocking` implements [`MakeWriter`] which integrates with `tracing_subscriber`.
@@ -40,8 +40,8 @@
4040
//! ``` rust
4141
//! # fn docs() {
4242
//! let (non_blocking, _guard) = tracing_appender::non_blocking(std::io::stdout());
43-
//! let subscriber = tracing_subscriber::fmt().with_writer(non_blocking);
44-
//! tracing::subscriber::with_default(subscriber.finish(), || {
43+
//! let collector = tracing_subscriber::fmt().with_writer(non_blocking);
44+
//! tracing::collect::with_default(collector.finish(), || {
4545
//! tracing::event!(tracing::Level::INFO, "Hello");
4646
//! });
4747
//! # }
@@ -91,8 +91,8 @@ pub const DEFAULT_BUFFERED_LINES_LIMIT: usize = 128_000;
9191
/// fn main () {
9292
/// # fn doc() {
9393
/// let (non_blocking, _guard) = tracing_appender::non_blocking(std::io::stdout());
94-
/// let subscriber = tracing_subscriber::fmt().with_writer(non_blocking);
95-
/// tracing::subscriber::with_default(subscriber.finish(), || {
94+
/// let collector = tracing_subscriber::fmt().with_writer(non_blocking);
95+
/// tracing::collect::with_default(collector.finish(), || {
9696
/// // Emit some tracing events within context of the non_blocking `_guard` and tracing subscriber
9797
/// tracing::event!(tracing::Level::INFO, "Hello");
9898
/// });
@@ -103,22 +103,23 @@ pub const DEFAULT_BUFFERED_LINES_LIMIT: usize = 128_000;
103103
#[must_use]
104104
#[derive(Debug)]
105105
pub struct WorkerGuard {
106-
_guard: Option<JoinHandle<()>>,
106+
handle: Option<JoinHandle<()>>,
107107
sender: Sender<Msg>,
108108
shutdown: Sender<()>,
109+
shutdown_timeout: Duration,
109110
}
110111

111112
/// A non-blocking writer.
112113
///
113114
/// While the line between "blocking" and "non-blocking" IO is fuzzy, writing to a file is typically
114-
/// considered to be a _blocking_ operation. For an application whose `Subscriber` writes spans and events
115+
/// considered to be a _blocking_ operation. For an application whose `Collector` writes spans and events
115116
/// as they are emitted, an application might find the latency profile to be unacceptable.
116117
/// `NonBlocking` moves the writing out of an application's data path by sending spans and events
117118
/// to a dedicated logging thread.
118119
///
119120
/// This struct implements [`MakeWriter`][make_writer] from the `tracing-subscriber`
120121
/// crate. Therefore, it can be used with the [`tracing_subscriber::fmt`][fmt] module
121-
/// or with any other subscriber/layer implementation that uses the `MakeWriter` trait.
122+
/// or with any other collector/subscriber implementation that uses the `MakeWriter` trait.
122123
///
123124
/// [make_writer]: tracing_subscriber::fmt::MakeWriter
124125
/// [fmt]: mod@tracing_subscriber::fmt
@@ -144,7 +145,7 @@ impl NonBlocking {
144145
/// The returned `NonBlocking` writer will have the [default configuration][default] values.
145146
/// Other configurations can be specified using the [builder] interface.
146147
///
147-
/// [default]: NonBlockingBuilder::default
148+
/// [default]: NonBlockingBuilder::default()
148149
/// [builder]: NonBlockingBuilder
149150
pub fn new<T: Write + Send + 'static>(writer: T) -> (NonBlocking, WorkerGuard) {
150151
NonBlockingBuilder::default().finish(writer)
@@ -155,6 +156,7 @@ impl NonBlocking {
155156
buffered_lines_limit: usize,
156157
is_lossy: bool,
157158
thread_name: String,
159+
shutdown_timeout: Duration,
158160
) -> (NonBlocking, WorkerGuard) {
159161
let (sender, receiver) = bounded(buffered_lines_limit);
160162

@@ -165,6 +167,7 @@ impl NonBlocking {
165167
worker.worker_thread(thread_name),
166168
sender.clone(),
167169
shutdown_sender,
170+
shutdown_timeout,
168171
);
169172

170173
(
@@ -192,6 +195,7 @@ pub struct NonBlockingBuilder {
192195
buffered_lines_limit: usize,
193196
is_lossy: bool,
194197
thread_name: String,
198+
shutdown_timeout: Duration,
195199
}
196200

197201
impl NonBlockingBuilder {
@@ -227,8 +231,20 @@ impl NonBlockingBuilder {
227231
self.buffered_lines_limit,
228232
self.is_lossy,
229233
self.thread_name,
234+
self.shutdown_timeout,
230235
)
231236
}
237+
238+
/// Sets the timeout for shutdown of the worker thread.
239+
///
240+
/// This is the maximum amount of time the main thread will wait
241+
/// for the worker thread to finish proccessing pending logs during shutdown
242+
///
243+
/// The default timeout is 1 second.
244+
pub fn shutdown_timeout(mut self, timeout: Duration) -> NonBlockingBuilder {
245+
self.shutdown_timeout = timeout;
246+
self
247+
}
232248
}
233249

234250
impl Default for NonBlockingBuilder {
@@ -237,6 +253,7 @@ impl Default for NonBlockingBuilder {
237253
buffered_lines_limit: DEFAULT_BUFFERED_LINES_LIMIT,
238254
is_lossy: true,
239255
thread_name: "tracing-appender".to_string(),
256+
shutdown_timeout: Duration::from_secs(1),
240257
}
241258
}
242259
}
@@ -276,32 +293,51 @@ impl<'a> MakeWriter<'a> for NonBlocking {
276293
}
277294

278295
impl WorkerGuard {
279-
fn new(handle: JoinHandle<()>, sender: Sender<Msg>, shutdown: Sender<()>) -> Self {
296+
fn new(
297+
handle: JoinHandle<()>,
298+
sender: Sender<Msg>,
299+
shutdown: Sender<()>,
300+
shutdown_timeout: Duration
301+
) -> Self {
280302
WorkerGuard {
281-
_guard: Some(handle),
303+
handle: Some(handle),
282304
sender,
283305
shutdown,
306+
shutdown_timeout,
284307
}
285308
}
286309
}
287310

288311
impl Drop for WorkerGuard {
289312
fn drop(&mut self) {
290-
match self
291-
.sender
292-
.send_timeout(Msg::Shutdown, Duration::from_millis(100))
293-
{
313+
let timeout = Duration::from_millis(100);
314+
match self.sender.send_timeout(Msg::Shutdown, timeout) {
294315
Ok(_) => {
295316
// Attempt to wait for `Worker` to flush all messages before dropping. This happens
296317
// when the `Worker` calls `recv()` on a zero-capacity channel. Use `send_timeout`
297318
// so that drop is not blocked indefinitely.
298319
// TODO: Make timeout configurable.
299-
let _ = self.shutdown.send_timeout((), Duration::from_millis(1000));
320+
match self.shutdown.send_timeout((), self.shutdown_timeout) {
321+
Err(SendTimeoutError::Timeout(_)) => {
322+
eprintln!(
323+
"Shutting down logging worker timed out after {:?}.",
324+
self.shutdown_timeout
325+
);
326+
}
327+
_ => {
328+
// At this point it is safe to wait for `Worker` destruction without blocking
329+
if let Some(handle) = self.handle.take() {
330+
if handle.join().is_err() {
331+
eprintln!("Logging worker thread panicked");
332+
}
333+
};
334+
}
335+
}
300336
}
301337
Err(SendTimeoutError::Disconnected(_)) => (),
302-
Err(SendTimeoutError::Timeout(e)) => println!(
303-
"Failed to send shutdown signal to logging worker. Error: {:?}",
304-
e
338+
Err(SendTimeoutError::Timeout(_)) => eprintln!(
339+
"Sending shutdown signal to logging worker timed out after {:?}",
340+
timeout
305341
),
306342
}
307343
}
@@ -472,8 +508,8 @@ mod test {
472508
for _ in 0..10 {
473509
let cloned_non_blocking = non_blocking.clone();
474510
join_handles.push(thread::spawn(move || {
475-
let subscriber = tracing_subscriber::fmt().with_writer(cloned_non_blocking);
476-
tracing::subscriber::with_default(subscriber.finish(), || {
511+
let collector = tracing_subscriber::fmt().with_writer(cloned_non_blocking);
512+
tracing::collect::with_default(collector.finish(), || {
477513
tracing::event!(tracing::Level::INFO, "Hello");
478514
});
479515
}));
@@ -493,4 +529,83 @@ mod test {
493529
assert_eq!(10, hello_count);
494530
assert_eq!(0, error_count.dropped_lines());
495531
}
532+
533+
#[test]
534+
fn shutdown_completes_after_timeout() {
535+
// Modify the MockWriter with a delay on every write operation
536+
struct DelayedMockWriter {
537+
tx: mpsc::SyncSender<String>,
538+
delay: Duration,
539+
}
540+
541+
impl DelayedMockWriter {
542+
fn new(capacity: usize, delay: Duration) -> (Self, mpsc::Receiver<String>) {
543+
let (tx, rx) = mpsc::sync_channel(capacity);
544+
(Self {tx, delay}, rx)
545+
}
546+
}
547+
548+
impl std::io::Write for DelayedMockWriter {
549+
fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
550+
thread::sleep(self.delay);
551+
552+
let buf_len = buf.len();
553+
let _ = self.tx.send(String::from_utf8_lossy(buf).to_string());
554+
Ok(buf_len)
555+
}
556+
557+
fn flush(&mut self) -> std::io::Result<()> {
558+
Ok(())
559+
}
560+
}
561+
562+
let delay = Duration::from_millis(100);
563+
let (mock_writer, rx) = DelayedMockWriter::new(5, delay);
564+
565+
// configure the non-blocking writer to wait for at most 200ms during shutdown
566+
let timeout = Duration::from_millis(200);
567+
568+
let (mut non_blocking, guard) = NonBlockingBuilder::default()
569+
.shutdown_timeout(timeout)
570+
.finish(mock_writer);
571+
572+
// write 10 messages each takes ~1 second so exceeding the timeout
573+
for i in 0..10 {
574+
non_blocking.write_all(format!("Message {}\n", i).as_bytes()).unwrap();
575+
}
576+
577+
let start = std::time::Instant::now();
578+
drop(guard);
579+
let elapsed = start.elapsed();
580+
581+
assert!(elapsed >= timeout);
582+
assert!(elapsed < timeout + Duration::from_millis(100));
583+
584+
while rx.recv_timeout(Duration::from_millis(10)).is_ok() {}
585+
}
586+
587+
#[test]
588+
fn shutdown_completes_before_timeout() {
589+
let (mock_writer, rx) = MockWriter::new(10);
590+
let short_timeout = Duration::from_millis(100);
591+
592+
let (mut non_blocking, guard) = NonBlockingBuilder::default()
593+
.shutdown_timeout(short_timeout)
594+
.finish(mock_writer);
595+
596+
for i in 0..3 {
597+
non_blocking.write_all(format!("Message {}\n", i).as_bytes()).unwrap();
598+
}
599+
600+
for _ in 0..3 {
601+
rx.recv_timeout(Duration::from_millis(50)).unwrap();
602+
}
603+
604+
let start = std::time::Instant::now();
605+
drop(guard);
606+
let elapsed = start.elapsed();
607+
608+
assert!(elapsed <= short_timeout);
609+
assert!(elapsed < Duration::from_millis(50));
610+
}
496611
}

0 commit comments

Comments
 (0)