Skip to content

Commit 8e3bce3

Browse files
committed
Implement forceflush and shutdown on batch processor with background thread
1 parent daa694d commit 8e3bce3

File tree

2 files changed

+105
-34
lines changed

2 files changed

+105
-34
lines changed

opentelemetry-sdk/src/logs/log_emitter.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
use super::{BatchLogProcessor, LogProcessor, LogRecord, SimpleLogProcessor, TraceContext};
2-
use crate::{export::logs::LogExporter, runtime::RuntimeChannel, Resource};
2+
use crate::{export::logs::LogExporter, Resource};
33
use opentelemetry::{
44
global,
55
logs::{LogError, LogResult},
@@ -162,7 +162,7 @@ impl Builder {
162162
}
163163

164164
/// The `LogExporter` setup using a default `BatchLogProcessor` that this provider should use.
165-
pub fn with_batch_exporter<T: LogExporter + 'static, R: RuntimeChannel>(
165+
pub fn with_batch_exporter<T: LogExporter + 'static>(
166166
self,
167167
exporter: T,
168168
) -> Self {

opentelemetry-sdk/src/logs/log_processor.rs

Lines changed: 103 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,14 @@
11
use crate::{
2-
export::logs::{ExportResult, LogBatch, LogExporter},
2+
export::{logs::{ExportResult, LogBatch, LogExporter}},
33
logs::LogRecord,
44
Resource,
55
};
6-
use std::sync::mpsc::{self, SyncSender};
7-
use futures_channel::oneshot;
6+
use std::sync::mpsc::{self, SyncSender, RecvTimeoutError};
7+
// use futures_channel::oneshot;
88
use futures_util::{
99
// future::{self, Either},
1010
{pin_mut, /*stream, StreamExt as _*/},
1111
};
12-
// use std::borrow::Cow;
13-
1412
// use futures_util::{
1513
// future::{self, Either},
1614
// {pin_mut, stream, StreamExt as _},
@@ -29,6 +27,7 @@ use std::{
2927
str::FromStr,
3028
sync::Arc,
3129
time::Duration,
30+
time::Instant,
3231
};
3332
use std::thread;
3433

@@ -49,6 +48,14 @@ const OTEL_BLRP_MAX_EXPORT_BATCH_SIZE: &str = "OTEL_BLRP_MAX_EXPORT_BATCH_SIZE";
4948
/// Default maximum batch size.
5049
const OTEL_BLRP_MAX_EXPORT_BATCH_SIZE_DEFAULT: usize = 512;
5150

51+
/// Default timeout for forceflush and shutdown.
52+
const OTEL_LOGS_DEFAULT_FORCEFLUSH_TIMEOUT: Duration = Duration::from_secs(1);
53+
const OTEL_LOGS_DEFAULT_SHUTDOWN_TIMEOUT: Duration = Duration::from_secs(1);
54+
55+
/// environment variable name for forceflush and shutdown timeout.
56+
const OTEL_LOGS_FORCEFLUSH_TIMEOUT_NAME: &str = "OTEL_METRIC_EXPORT_INTERVAL";
57+
const OTEL_LOGS_SHUTDOWN_TIMEOUT_NAME: &str = "OTEL_METRIC_EXPORT_TIMEOUT";
58+
5259
/// The interface for plugging into a [`Logger`].
5360
///
5461
/// [`Logger`]: crate::logs::Logger
@@ -157,6 +164,8 @@ impl LogProcessor for SimpleLogProcessor {
157164
pub struct BatchLogProcessor {
158165
sender: SyncSender<BatchMessage>,
159166
handle: Mutex<Option<thread::JoinHandle<()>>>,
167+
forceflush_timeout: Duration,
168+
shutdown_timeout: Duration,
160169
}
161170

162171
impl Debug for BatchLogProcessor {
@@ -180,19 +189,32 @@ impl LogProcessor for BatchLogProcessor {
180189
}
181190

182191
fn force_flush(&self) -> LogResult<()> {
183-
let _result = self.sender.send(BatchMessage::Flush(None));
184-
LogResult::Ok(())
192+
let (sender, receiver) = mpsc::sync_channel(1);
193+
self.sender.try_send(BatchMessage::ForceFlush(sender))
194+
.map_err(|err| LogError::Other(err.into()))?;
195+
196+
receiver.recv_timeout(self.forceflush_timeout).map_err(|err| {
197+
if err == RecvTimeoutError::Timeout {
198+
LogError::ExportTimedOut(self.forceflush_timeout)
199+
} else {
200+
LogError::Other(err.into())
201+
}
202+
})?
185203
}
186204

187205
fn shutdown(&self) -> LogResult<()> {
188-
let (res_sender, _res_receiver) = oneshot::channel();
189-
let _result = self.sender.send(BatchMessage::Shutdown(res_sender));
190-
// TODO, implement shutdown
191-
// self.handle.join().unwrap();
192-
// let result = self.handle.join();
193-
// if let Err(err) = result {
194-
// global::handle_error(err: LogError::Other(err.into()));
195-
// }
206+
let (sender, receiver) = mpsc::sync_channel(1);
207+
self.sender.try_send(BatchMessage::Shutdown(sender))
208+
.map_err(|err| LogError::Other(err.into()))?;
209+
210+
receiver.recv_timeout(self.shutdown_timeout).map_err(|err| {
211+
if err == RecvTimeoutError::Timeout {
212+
LogError::ExportTimedOut(self.shutdown_timeout)
213+
} else {
214+
LogError::Other(err.into())
215+
}
216+
})??;
217+
196218
if let Some(handle) = self.handle.lock().unwrap().take() {
197219
handle.join().unwrap();
198220
}
@@ -208,40 +230,66 @@ impl BatchLogProcessor {
208230
pub(crate) fn new(mut exporter: Box<dyn LogExporter>, config: BatchConfig) -> Self {
209231
let (sender, receiver) = mpsc::sync_channel(config.max_queue_size);
210232
let handle = thread::spawn(move || {
233+
let mut last_export_time = Instant::now();
234+
211235
let mut logs = Vec::new();
212236
logs.reserve(config.max_export_batch_size);
237+
213238
loop {
214-
match receiver.recv() {
239+
let remaining_time_option = config.scheduled_delay.checked_sub(last_export_time.elapsed());
240+
let remaining_time = match remaining_time_option {
241+
Some(remaining_time) => remaining_time,
242+
None => config.scheduled_delay,
243+
};
244+
245+
match receiver.recv_timeout(remaining_time) {
215246
Ok(BatchMessage::ExportLog(data)) => {
216247
logs.push(data);
217248

218-
if logs.len() == config.max_export_batch_size {
219-
let export = export_with_timeout(config.max_export_timeout, exporter.as_mut(), logs.split_off(0));
220-
let result = futures_executor::block_on(export);
221-
match result {
222-
Ok(_) => {}
223-
Err(err) => global::handle_error(err),
224-
}
225-
logs.clear();
249+
if logs.len() == config.max_export_batch_size || last_export_time.elapsed() >= config.scheduled_delay {
250+
last_export_time = Instant::now();
251+
export_with_timeout_sync(remaining_time, exporter.as_mut(), logs.split_off(0), &mut last_export_time);
226252
}
227253
}
228-
Ok(BatchMessage::Flush(_sender)) => {
229-
// TODO: Implement flush
254+
Ok(BatchMessage::ForceFlush(_sender)) => {
255+
export_with_timeout_sync(remaining_time, exporter.as_mut(), logs.split_off(0), &mut last_export_time);
256+
257+
match _sender.send(Ok(())) {
258+
Ok(_) => {}
259+
Err(err) => global::handle_error(LogError::Other(err.into())),
260+
}
230261
}
231262
Ok(BatchMessage::Shutdown(_sender)) => {
263+
export_with_timeout_sync(remaining_time, exporter.as_mut(), logs.split_off(0), &mut last_export_time);
232264
exporter.shutdown();
265+
match _sender.send(Ok(())) {
266+
Ok(_) => {}
267+
Err(err) => global::handle_error(LogError::Other(err.into())),
268+
}
233269
break;
234270
}
235271
Ok(BatchMessage::SetResource(resource)) => {
236272
exporter.set_resource(&resource);
237273
}
238-
Err(_) => {}
274+
Err(RecvTimeoutError::Timeout) => {
275+
export_with_timeout_sync(config.max_export_timeout, exporter.as_mut(), logs.split_off(0), &mut last_export_time);
276+
}
277+
Err(err) => global::handle_error(LogError::Other(err.into())),
239278
}
240279
}
241280
});
242281

282+
let forceflush_timeout = env::var(OTEL_LOGS_FORCEFLUSH_TIMEOUT_NAME)
283+
.ok()
284+
.and_then(|v| v.parse().map(Duration::from_millis).ok())
285+
.unwrap_or(OTEL_LOGS_DEFAULT_FORCEFLUSH_TIMEOUT);
286+
let shutdown_timeout = env::var(OTEL_LOGS_SHUTDOWN_TIMEOUT_NAME)
287+
.ok()
288+
.and_then(|v| v.parse().map(Duration::from_millis).ok())
289+
.unwrap_or(OTEL_LOGS_DEFAULT_SHUTDOWN_TIMEOUT);
290+
243291
// Return batch processor with link to worker
244-
BatchLogProcessor { sender, handle: Mutex::new(Some(handle)) }
292+
BatchLogProcessor { sender, handle: Mutex::new(Some(handle)), forceflush_timeout, shutdown_timeout }
245293
}
246294

247295
/// Create a new batch processor builder
@@ -256,6 +304,30 @@ impl BatchLogProcessor {
256304
}
257305
}
258306

307+
fn export_with_timeout_sync<E>(
308+
timeout: Duration,
309+
exporter: &mut E,
310+
batch: Vec<(LogRecord, InstrumentationLibrary)>,
311+
last_export_time: &mut Instant,
312+
)
313+
where
314+
E: LogExporter + ?Sized,
315+
{
316+
*last_export_time = Instant::now();
317+
318+
if batch.is_empty() {
319+
return ();
320+
}
321+
322+
let export = export_with_timeout(timeout, exporter, batch);
323+
let result = futures_executor::block_on(export);
324+
// batch.clear();
325+
match result {
326+
Ok(_) => {}
327+
Err(err) => global::handle_error(err),
328+
};
329+
}
330+
259331
async fn export_with_timeout<E>(
260332
_time_out: Duration,
261333
exporter: &mut E,
@@ -453,11 +525,10 @@ where
453525
enum BatchMessage {
454526
/// Export logs, usually called when the log is emitted.
455527
ExportLog((LogRecord, InstrumentationLibrary)),
456-
/// Flush the current buffer to the backend, it can be triggered by
457-
/// pre configured interval or a call to `force_push` function.
458-
Flush(Option<oneshot::Sender<ExportResult>>),
528+
/// ForceFlush flush the current buffer to the backend
529+
ForceFlush(mpsc::SyncSender<ExportResult>),
459530
/// Shut down the worker thread, push all logs in buffer to the backend.
460-
Shutdown(oneshot::Sender<ExportResult>),
531+
Shutdown(mpsc::SyncSender<ExportResult>),
461532
/// Set the resource for the exporter.
462533
SetResource(Arc<Resource>),
463534
}

0 commit comments

Comments
 (0)