Skip to content

Commit 0ea817a

Browse files
committed
Add changelog and format
1 parent 752ea93 commit 0ea817a

File tree

3 files changed

+149
-84
lines changed

3 files changed

+149
-84
lines changed

opentelemetry-sdk/src/logs/log_emitter.rs

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -194,10 +194,7 @@ impl Builder {
194194
}
195195

196196
/// The `LogExporter` setup using a default `BatchLogProcessor` that this provider should use.
197-
pub fn with_batch_exporter<T: LogExporter + 'static>(
198-
self,
199-
exporter: T,
200-
) -> Self {
197+
pub fn with_batch_exporter<T: LogExporter + 'static>(self, exporter: T) -> Self {
201198
let batch = BatchLogProcessor::builder(exporter).build();
202199
self.with_log_processor(batch)
203200
}

opentelemetry-sdk/src/logs/log_processor.rs

Lines changed: 96 additions & 80 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ use crate::{
33
logs::{LogError, LogRecord, LogResult},
44
Resource,
55
};
6-
use std::sync::mpsc::{self, SyncSender, RecvTimeoutError};
6+
use std::sync::mpsc::{self, RecvTimeoutError, SyncSender};
77

88
#[cfg(feature = "spec_unstable_logs_enabled")]
99
use opentelemetry::logs::Severity;
@@ -210,8 +210,12 @@ impl LogProcessor for BatchLogProcessor {
210210
return;
211211
}
212212

213-
let result = self.message_sender.
214-
try_send(BatchMessage::ExportLog(Box::new((record.clone(), instrumentation.clone()))));
213+
let result = self
214+
.message_sender
215+
.try_send(BatchMessage::ExportLog(Box::new((
216+
record.clone(),
217+
instrumentation.clone(),
218+
))));
215219

216220
// TODO - Implement throttling to prevent error flooding when the queue is full or closed.
217221
if result.is_err() {
@@ -226,33 +230,39 @@ impl LogProcessor for BatchLogProcessor {
226230

227231
fn force_flush(&self) -> LogResult<()> {
228232
if self.is_shutdown.load(std::sync::atomic::Ordering::Relaxed) {
229-
otel_warn!(
230-
name: "BatchLogProcessor.ForceFlush.ProcessorShutdown",
231-
message = "BatchLogProcessor has been shutdown. No further logs will be emitted."
232-
);
233-
return LogResult::Err(LogError::Other("BatchLogProcessor is already shutdown".into()));
233+
return LogResult::Err(LogError::Other(
234+
"BatchLogProcessor is already shutdown".into(),
235+
));
234236
}
235237
let (sender, receiver) = mpsc::sync_channel(1);
236-
self.message_sender.try_send(BatchMessage::ForceFlush(sender))
238+
self.message_sender
239+
.try_send(BatchMessage::ForceFlush(sender))
237240
.map_err(|err| LogError::Other(err.into()))?;
238241

239-
receiver.recv_timeout(self.forceflush_timeout).map_err(|err| {
240-
if err == RecvTimeoutError::Timeout {
241-
LogError::ExportTimedOut(self.forceflush_timeout)
242-
} else {
243-
LogError::Other(err.into())
244-
}
245-
})?
242+
receiver
243+
.recv_timeout(self.forceflush_timeout)
244+
.map_err(|err| {
245+
if err == RecvTimeoutError::Timeout {
246+
LogError::ExportTimedOut(self.forceflush_timeout)
247+
} else {
248+
LogError::Other(err.into())
249+
}
250+
})?
246251
}
247252

248253
fn shutdown(&self) -> LogResult<()> {
249254
// test and set is_shutdown flag if it is not set
250-
if self.is_shutdown.swap(true, std::sync::atomic::Ordering::Relaxed) {
255+
if self
256+
.is_shutdown
257+
.swap(true, std::sync::atomic::Ordering::Relaxed)
258+
{
251259
otel_warn!(
252260
name: "BatchLogProcessor.Shutdown.ProcessorShutdown",
253261
message = "BatchLogProcessor has been shutdown. No further logs will be emitted."
254262
);
255-
return LogResult::Err(LogError::Other("BatchLogProcessor is already shutdown".into()));
263+
return LogResult::Err(LogError::AlreadyShutdown(
264+
"BatchLogProcessor is already shutdown".into(),
265+
));
256266
}
257267

258268
let dropped_logs = self.dropped_logs_count.load(Ordering::Relaxed);
@@ -267,33 +277,33 @@ impl LogProcessor for BatchLogProcessor {
267277
}
268278

269279
let (sender, receiver) = mpsc::sync_channel(1);
270-
self.message_sender.try_send(BatchMessage::Shutdown(sender))
280+
self.message_sender
281+
.try_send(BatchMessage::Shutdown(sender))
271282
.map_err(|err| LogError::Other(err.into()))?;
272283

273-
receiver.recv_timeout(self.shutdown_timeout)
284+
receiver
285+
.recv_timeout(self.shutdown_timeout)
274286
.map(|_| {
275287
// join the background thread after receiving back the shutdown signal
276288
if let Some(handle) = self.handle.lock().unwrap().take() {
277289
handle.join().unwrap();
278290
}
279291
LogResult::Ok(())
280292
})
281-
.map_err(|err| {
282-
match err {
283-
RecvTimeoutError::Timeout => {
284-
otel_error!(
285-
name: "BatchLogProcessor.Shutdown.Timeout",
286-
message = "BatchLogProcessor shutdown timing out."
287-
);
288-
LogError::ExportTimedOut(self.shutdown_timeout)
289-
}
290-
_ => {
291-
otel_error!(
292-
name: "BatchLogProcessor.Shutdown.Error",
293-
error = format!("{}", err)
294-
);
295-
LogError::Other(err.into())
296-
}
293+
.map_err(|err| match err {
294+
RecvTimeoutError::Timeout => {
295+
otel_error!(
296+
name: "BatchLogProcessor.Shutdown.Timeout",
297+
message = "BatchLogProcessor shutdown timing out."
298+
);
299+
LogError::ExportTimedOut(self.shutdown_timeout)
300+
}
301+
_ => {
302+
otel_error!(
303+
name: "BatchLogProcessor.Shutdown.Error",
304+
error = format!("{}", err)
305+
);
306+
LogError::Other(err.into())
297307
}
298308
})?
299309
}
@@ -308,8 +318,7 @@ impl LogProcessor for BatchLogProcessor {
308318

309319
impl BatchLogProcessor {
310320
pub(crate) fn new(mut exporter: Box<dyn LogExporter>, config: BatchConfig) -> Self {
311-
let (message_sender, message_receiver) =
312-
mpsc::sync_channel(config.max_queue_size);
321+
let (message_sender, message_receiver) = mpsc::sync_channel(config.max_queue_size);
313322
let max_queue_size = config.max_queue_size;
314323

315324
let handle = thread::spawn(move || {
@@ -318,7 +327,9 @@ impl BatchLogProcessor {
318327
logs.reserve(config.max_export_batch_size);
319328

320329
loop {
321-
let remaining_time_option = config.scheduled_delay.checked_sub(last_export_time.elapsed());
330+
let remaining_time_option = config
331+
.scheduled_delay
332+
.checked_sub(last_export_time.elapsed());
322333
let remaining_time = match remaining_time_option {
323334
Some(remaining_time) => remaining_time,
324335
None => config.scheduled_delay,
@@ -327,16 +338,33 @@ impl BatchLogProcessor {
327338
match message_receiver.recv_timeout(remaining_time) {
328339
Ok(BatchMessage::ExportLog(log)) => {
329340
logs.push(log);
330-
if logs.len() == config.max_export_batch_size || last_export_time.elapsed() >= config.scheduled_delay {
331-
let _ = export_with_timeout_sync(remaining_time, exporter.as_mut(), logs.split_off(0), &mut last_export_time);
341+
if logs.len() == config.max_export_batch_size
342+
|| last_export_time.elapsed() >= config.scheduled_delay
343+
{
344+
let _ = export_with_timeout_sync(
345+
remaining_time,
346+
exporter.as_mut(),
347+
logs.split_off(0),
348+
&mut last_export_time,
349+
);
332350
}
333351
}
334352
Ok(BatchMessage::ForceFlush(sender)) => {
335-
let result = export_with_timeout_sync(remaining_time, exporter.as_mut(), logs.split_off(0), &mut last_export_time);
353+
let result = export_with_timeout_sync(
354+
remaining_time,
355+
exporter.as_mut(),
356+
logs.split_off(0),
357+
&mut last_export_time,
358+
);
336359
let _ = sender.send(result);
337360
}
338361
Ok(BatchMessage::Shutdown(sender)) => {
339-
let result = export_with_timeout_sync(remaining_time, exporter.as_mut(), logs.split_off(0), &mut last_export_time);
362+
let result = export_with_timeout_sync(
363+
remaining_time,
364+
exporter.as_mut(),
365+
logs.split_off(0),
366+
&mut last_export_time,
367+
);
340368
let _ = sender.send(result);
341369

342370
//
@@ -348,14 +376,19 @@ impl BatchLogProcessor {
348376
exporter.set_resource(&resource);
349377
}
350378
Err(RecvTimeoutError::Timeout) => {
351-
let _ = export_with_timeout_sync(remaining_time, exporter.as_mut(), logs.split_off(0), &mut last_export_time);
379+
let _ = export_with_timeout_sync(
380+
remaining_time,
381+
exporter.as_mut(),
382+
logs.split_off(0),
383+
&mut last_export_time,
384+
);
352385
}
353386
Err(err) => {
354387
otel_error!(
355388
name: "BatchLogProcessor.ReceiveError",
356389
error = format!("{}", err)
357390
);
358-
},
391+
}
359392
}
360393
}
361394
});
@@ -482,10 +515,12 @@ impl<R: RuntimeChannel> Debug for BatchLogProcessorWithAsyncRuntime<R> {
482515
#[cfg(feature = "experimental_logs_batch_log_processor_with_async_runtime")]
483516
impl<R: RuntimeChannel> LogProcessor for BatchLogProcessorWithAsyncRuntime<R> {
484517
fn emit(&self, record: &mut LogRecord, instrumentation: &InstrumentationScope) {
485-
let result = self.message_sender.try_send(BatchMessageWithAsyncRuntime::ExportLog((
486-
record.clone(),
487-
instrumentation.clone(),
488-
)));
518+
let result = self
519+
.message_sender
520+
.try_send(BatchMessageWithAsyncRuntime::ExportLog((
521+
record.clone(),
522+
instrumentation.clone(),
523+
)));
489524

490525
// TODO - Implement throttling to prevent error flooding when the queue is full or closed.
491526
if result.is_err() {
@@ -1015,8 +1050,7 @@ mod tests {
10151050
(OTEL_BLRP_EXPORT_TIMEOUT, Some("2046")),
10161051
];
10171052
temp_env::with_vars(env_vars.clone(), || {
1018-
let builder =
1019-
BatchLogProcessor::builder(InMemoryLogExporter::default());
1053+
let builder = BatchLogProcessor::builder(InMemoryLogExporter::default());
10201054

10211055
assert_eq!(builder.config.max_export_batch_size, 500);
10221056
assert_eq!(
@@ -1036,8 +1070,7 @@ mod tests {
10361070
env_vars.push((OTEL_BLRP_MAX_QUEUE_SIZE, Some("120")));
10371071

10381072
temp_env::with_vars(env_vars, || {
1039-
let builder =
1040-
BatchLogProcessor::builder(InMemoryLogExporter::default());
1073+
let builder = BatchLogProcessor::builder(InMemoryLogExporter::default());
10411074
assert_eq!(builder.config.max_export_batch_size, 120);
10421075
assert_eq!(builder.config.max_queue_size, 120);
10431076
});
@@ -1052,8 +1085,8 @@ mod tests {
10521085
.with_max_queue_size(4)
10531086
.build();
10541087

1055-
let builder = BatchLogProcessor::builder(InMemoryLogExporter::default())
1056-
.with_batch_config(expected);
1088+
let builder =
1089+
BatchLogProcessor::builder(InMemoryLogExporter::default()).with_batch_config(expected);
10571090

10581091
let actual = &builder.config;
10591092
assert_eq!(actual.max_export_batch_size, 1);
@@ -1090,10 +1123,7 @@ mod tests {
10901123
let exporter = MockLogExporter {
10911124
resource: Arc::new(Mutex::new(None)),
10921125
};
1093-
let processor = BatchLogProcessor::new(
1094-
Box::new(exporter.clone()),
1095-
BatchConfig::default(),
1096-
);
1126+
let processor = BatchLogProcessor::new(Box::new(exporter.clone()), BatchConfig::default());
10971127
let provider = LoggerProvider::builder()
10981128
.with_log_processor(processor)
10991129
.with_resource(
@@ -1119,10 +1149,7 @@ mod tests {
11191149
let exporter = InMemoryLogExporterBuilder::default()
11201150
.keep_records_on_shutdown()
11211151
.build();
1122-
let processor = BatchLogProcessor::new(
1123-
Box::new(exporter.clone()),
1124-
BatchConfig::default(),
1125-
);
1152+
let processor = BatchLogProcessor::new(Box::new(exporter.clone()), BatchConfig::default());
11261153

11271154
let mut record = LogRecord::default();
11281155
let instrumentation = InstrumentationScope::default();
@@ -1162,10 +1189,7 @@ mod tests {
11621189
#[tokio::test(flavor = "current_thread")]
11631190
async fn test_batch_log_processor_shutdown_under_async_runtime_current_flavor_multi_thread() {
11641191
let exporter = InMemoryLogExporterBuilder::default().build();
1165-
let processor = BatchLogProcessor::new(
1166-
Box::new(exporter.clone()),
1167-
BatchConfig::default(),
1168-
);
1192+
let processor = BatchLogProcessor::new(Box::new(exporter.clone()), BatchConfig::default());
11691193

11701194
//
11711195
// deadloack happens in shutdown with tokio current_thread runtime
@@ -1176,7 +1200,8 @@ mod tests {
11761200
#[cfg(feature = "experimental_logs_batch_log_processor_with_async_runtime")]
11771201
#[tokio::test(flavor = "current_thread")]
11781202
#[ignore = "See issue https://github.com/open-telemetry/opentelemetry-rust/issues/1968"]
1179-
async fn test_batch_log_processor_with_async_runtime_shutdown_under_async_runtime_current_flavor_multi_thread() {
1203+
async fn test_batch_log_processor_with_async_runtime_shutdown_under_async_runtime_current_flavor_multi_thread(
1204+
) {
11801205
let exporter = InMemoryLogExporterBuilder::default().build();
11811206
let processor = BatchLogProcessorWithAsyncRuntime::new(
11821207
Box::new(exporter.clone()),
@@ -1193,32 +1218,23 @@ mod tests {
11931218
#[tokio::test(flavor = "current_thread")]
11941219
async fn test_batch_log_processor_shutdown_with_async_runtime_current_flavor_current_thread() {
11951220
let exporter = InMemoryLogExporterBuilder::default().build();
1196-
let processor = BatchLogProcessor::new(
1197-
Box::new(exporter.clone()),
1198-
BatchConfig::default(),
1199-
);
1221+
let processor = BatchLogProcessor::new(Box::new(exporter.clone()), BatchConfig::default());
12001222

12011223
processor.shutdown().unwrap();
12021224
}
12031225

12041226
#[tokio::test(flavor = "multi_thread")]
12051227
async fn test_batch_log_processor_shutdown_with_async_runtime_multi_flavor_multi_thread() {
12061228
let exporter = InMemoryLogExporterBuilder::default().build();
1207-
let processor = BatchLogProcessor::new(
1208-
Box::new(exporter.clone()),
1209-
BatchConfig::default(),
1210-
);
1229+
let processor = BatchLogProcessor::new(Box::new(exporter.clone()), BatchConfig::default());
12111230

12121231
processor.shutdown().unwrap();
12131232
}
12141233

12151234
#[tokio::test(flavor = "multi_thread")]
12161235
async fn test_batch_log_processor_shutdown_with_async_runtime_multi_flavor_current_thread() {
12171236
let exporter = InMemoryLogExporterBuilder::default().build();
1218-
let processor = BatchLogProcessor::new(
1219-
Box::new(exporter.clone()),
1220-
BatchConfig::default(),
1221-
);
1237+
let processor = BatchLogProcessor::new(Box::new(exporter.clone()), BatchConfig::default());
12221238

12231239
processor.shutdown().unwrap();
12241240
}

0 commit comments

Comments
 (0)