Skip to content

Commit 586591c

Browse files
committed
Flush AsyncPoolSink on the current thread if tearing down
1 parent 281f1aa commit 586591c

File tree

4 files changed

+173
-13
lines changed

4 files changed

+173
-13
lines changed

spdlog/Cargo.toml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -80,6 +80,11 @@ tracing-appender = "=0.2.2"
8080
[build-dependencies]
8181
rustc_version = "0.4.0"
8282

83+
[[test]]
84+
name = "global_async_pool_sink"
85+
harness = false
86+
required-features = ["multi-thread"]
87+
8388
[[bench]]
8489
name = "compare_with_cpp_spdlog"
8590
harness = false

spdlog/src/lib.rs

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -647,13 +647,16 @@ pub fn log_crate_proxy() -> &'static LogCrateProxy {
647647
&PROXY
648648
}
649649

650+
static IS_TEARING_DOWN: AtomicBool = AtomicBool::new(false);
651+
650652
fn flush_default_logger_at_exit() {
651653
// Rust never calls `drop` for static variables.
652654
//
653655
// Setting up an exit handler gives us a chance to flush the default logger
654656
// once at the program exit, thus we don't lose the last logs.
655657

656658
extern "C" fn handler() {
659+
IS_TEARING_DOWN.store(true, Ordering::SeqCst);
657660
if let Some(default_logger) = DEFAULT_LOGGER.get() {
658661
default_logger.load().flush()
659662
}
@@ -685,6 +688,12 @@ fn flush_default_logger_at_exit() {
685688
}
686689

687690
fn default_error_handler(from: impl AsRef<str>, error: Error) {
691+
if let Error::Multiple(errs) = error {
692+
errs.into_iter()
693+
.for_each(|err| default_error_handler(from.as_ref(), err));
694+
return;
695+
}
696+
688697
let date = chrono::Local::now()
689698
.format("%Y-%m-%d %H:%M:%S.%3f")
690699
.to_string();

spdlog/src/sink/async_sink/async_pool_sink.rs

Lines changed: 26 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -78,9 +78,18 @@ impl Sink for AsyncPoolSink {
7878
}
7979

8080
fn flush(&self) -> Result<()> {
81-
self.assign_task(Task::Flush {
82-
backend: self.clone_backend(),
83-
})
81+
if crate::IS_TEARING_DOWN.load(Ordering::SeqCst) {
82+
// https://github.com/SpriteOvO/spdlog-rs/issues/64
83+
//
84+
// `crossbeam` uses thread-local internally, which is not supported in `atexit`
85+
// callback. Let's directly flush the sinks on the current thread if the program
86+
// is tearing down.
87+
self.backend.flush()
88+
} else {
89+
self.assign_task(Task::Flush {
90+
backend: self.clone_backend(),
91+
})
92+
}
8493
}
8594

8695
/// For [`AsyncPoolSink`], the function performs the same call to all
@@ -181,20 +190,20 @@ pub(crate) struct Backend {
181190
}
182191

183192
impl Backend {
184-
fn log(&self, record: &Record) {
193+
fn log(&self, record: &Record) -> Result<()> {
194+
let mut result = Ok(());
185195
for sink in &self.sinks {
186-
if let Err(err) = sink.log(record) {
187-
self.handle_error(err);
188-
}
196+
result = Error::push_result(result, sink.log(record));
189197
}
198+
result
190199
}
191200

192-
fn flush(&self) {
201+
fn flush(&self) -> Result<()> {
202+
let mut result = Ok(());
193203
for sink in &self.sinks {
194-
if let Err(err) = sink.flush() {
195-
self.handle_error(err);
196-
}
204+
result = Error::push_result(result, sink.flush());
197205
}
206+
result
198207
}
199208

200209
fn handle_error(&self, err: Error) {
@@ -219,10 +228,14 @@ impl Task {
219228
pub(crate) fn exec(self) {
220229
match self {
221230
Task::Log { backend, record } => {
222-
backend.log(&record.as_ref());
231+
if let Err(err) = backend.log(&record.as_ref()) {
232+
backend.handle_error(err)
233+
}
223234
}
224235
Task::Flush { backend } => {
225-
backend.flush();
236+
if let Err(err) = backend.flush() {
237+
backend.handle_error(err)
238+
}
226239
}
227240
}
228241
}
Lines changed: 133 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,133 @@
1+
use std::{
2+
env,
3+
fmt::Write,
4+
os::raw::c_int,
5+
process::{self, Stdio},
6+
sync::{
7+
atomic::{AtomicBool, Ordering},
8+
Arc,
9+
},
10+
};
11+
12+
use spdlog::{
13+
formatter::Formatter,
14+
prelude::*,
15+
sink::{AsyncPoolSink, Sink},
16+
ErrorHandler,
17+
};
18+
19+
static IS_FLUSHED: AtomicBool = AtomicBool::new(false);
20+
21+
struct SetFlagSink;
22+
23+
impl Sink for SetFlagSink {
24+
fn log(&self, _record: &spdlog::Record) -> error::Result<()> {
25+
Ok(())
26+
}
27+
28+
fn flush(&self) -> error::Result<()> {
29+
IS_FLUSHED.store(true, Ordering::SeqCst);
30+
Ok(())
31+
}
32+
33+
fn level_filter(&self) -> LevelFilter {
34+
LevelFilter::All
35+
}
36+
37+
fn set_level_filter(&self, _level_filter: LevelFilter) {
38+
unimplemented!()
39+
}
40+
41+
fn set_formatter(&self, _formatter: Box<dyn Formatter>) {
42+
unimplemented!()
43+
}
44+
45+
fn set_error_handler(&self, _handler: Option<ErrorHandler>) {
46+
unimplemented!()
47+
}
48+
}
49+
50+
fn run_test() {
51+
{
52+
extern "C" fn check() {
53+
// Assert that `AsyncPoolSink` in the default logger will be flushed correctly
54+
// and will not panic.
55+
assert!(IS_FLUSHED.load(Ordering::SeqCst));
56+
}
57+
// Setup `atexit` to check the flag at the end of the program
58+
extern "C" {
59+
fn atexit(cb: extern "C" fn()) -> c_int;
60+
}
61+
assert_eq!(unsafe { atexit(check) }, 0);
62+
63+
let async_pool_sink = Arc::new(
64+
AsyncPoolSink::builder()
65+
.sink(Arc::new(SetFlagSink))
66+
.build()
67+
.unwrap(),
68+
);
69+
let logger = Arc::new(
70+
Logger::builder()
71+
.sink(async_pool_sink)
72+
.level_filter(LevelFilter::All)
73+
.flush_level_filter(LevelFilter::Off)
74+
.build()
75+
.unwrap(),
76+
);
77+
spdlog::set_default_logger(logger);
78+
}
79+
80+
info!("hello async_pool_sink");
81+
}
82+
83+
fn main() {
84+
// https://github.com/SpriteOvO/spdlog-rs/issues/64
85+
86+
// This is a flaky test, it only has a certain probability of failing, so we run
87+
// it multiple times to make sure it's really working properly.
88+
{
89+
let mut captured_output = String::new();
90+
let args = env::args().collect::<Vec<_>>();
91+
// If this is the parent process (no additional arguments)
92+
if args.len() == 1 {
93+
for i in 0..1000 {
94+
let output = process::Command::new(&args[0])
95+
.arg("child")
96+
.stderr(Stdio::piped())
97+
.output()
98+
.unwrap();
99+
let success = output.status.success();
100+
101+
writeln!(
102+
captured_output,
103+
"Attempt #{i} = {}",
104+
if success { "ok" } else { "failed!" }
105+
)
106+
.unwrap();
107+
108+
if !success {
109+
eprintln!("{captured_output}");
110+
111+
let stderr = String::from_utf8_lossy(&output.stderr).lines().fold(
112+
String::new(),
113+
|mut contents, line| {
114+
writeln!(&mut contents, "> {line}").unwrap();
115+
contents
116+
},
117+
);
118+
eprintln!("stderr of the failed attempt:\n{stderr}");
119+
120+
panic!("Test failed");
121+
}
122+
}
123+
return;
124+
} else {
125+
assert_eq!(args[1], "child");
126+
}
127+
128+
// Run the test after leaving the scope, so the main function ends
129+
// without dropping additional variables, thus exiting faster. This
130+
// should increase the probability of reproducing the error.
131+
}
132+
run_test();
133+
}

0 commit comments

Comments
 (0)