Skip to content

Commit a45310f

Browse files
committed
Commit last good log processor
1 parent 01ef3f4 commit a45310f

File tree

1 file changed

+55
-34
lines changed

1 file changed

+55
-34
lines changed

opentelemetry-sdk/src/logs/log_processor.rs

Lines changed: 55 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -152,7 +152,7 @@ impl LogProcessor for SimpleLogProcessor {
152152
/// A [`LogProcessor`] that asynchronously buffers log records and reports
153153
/// them at a pre-configured interval.
154154
pub struct BatchLogProcessor {
155-
sender: SyncSender<Box<LogData>>,
155+
sender: SyncSender<BatchMessage>,
156156
handle: thread::JoinHandle<()>,
157157
}
158158

@@ -166,14 +166,14 @@ impl Debug for BatchLogProcessor {
166166

167167
impl LogProcessor for BatchLogProcessor {
168168
fn emit(&self, record: &mut LogRecord, instrumentation: &InstrumentationLibrary) {
169-
// let result = self.sender.send(BatchMessage::ExportLog((
170-
// record.clone(),
171-
// instrumentation.clone(),
172-
// )));
169+
let result = self.sender.send(BatchMessage::ExportLog((
170+
record.clone(),
171+
instrumentation.clone(),
172+
)));
173173

174-
// if let Err(err) = result {
175-
// global::handle_error(LogError::Other(err.into()));
176-
// }
174+
if let Err(err) = result {
175+
global::handle_error(LogError::Other(err.into()));
176+
}
177177
}
178178

179179
fn force_flush(&self) -> LogResult<()> {
@@ -202,17 +202,51 @@ impl BatchLogProcessor {
202202
let (sender, receiver) = mpsc::sync_channel(config.max_queue_size);
203203
let handle = thread::spawn(move || {
204204
let mut batch: Vec<Box<LogData>> = Vec::new();
205-
match receiver.try_recv() {
206-
Ok(data) => batch.push(data),
207-
// TODO: handle error
205+
match receiver.recv() {
206+
Ok(BatchMessage::ExportLog((data, instrumentation))) => {
207+
batch.push(Box::new(LogData { record: data, instrumentation }));
208+
}
209+
Ok(BatchMessage::Flush(sender)) => {
210+
// let export = exporter.export(batch.into_iter().map(|data| Cow::Owned(*data)).collect());
211+
// let result = futures_executor::block_on(export);
212+
// match sender {
213+
// Some(sender) => {
214+
// let _ = sender.send(result);
215+
// }
216+
// None => {
217+
// match result {
218+
// Ok(_) => {}
219+
// Err(err) => global::handle_error(err),
220+
// }
221+
// }
222+
// }
223+
}
224+
Ok(BatchMessage::Shutdown(sender)) => {
225+
// let export = exporter.export(batch.into_iter().map(|data| Cow::Owned(*data)).collect());
226+
// let result = futures_executor::block_on(export);
227+
// match sender {
228+
// Some(sender) => {
229+
// let _ = sender.send(result);
230+
// }
231+
// None => {
232+
// match result {
233+
// Ok(_) => {}
234+
// Err(err) => global::handle_error(err),
235+
// }
236+
// }
237+
// }
238+
}
239+
Ok(BatchMessage::SetResource(resource)) => {
240+
// exporter.set_resource(&resource);
241+
}
208242
Err(_) => {}
209243
}
210-
// let export = exporter.export(batch.into_iter().map(|data| Cow::Owned(*data)).collect());
211-
// let result = futures_executor::block_on(export);
212-
// match result {
213-
// Ok(_) => {}
214-
// Err(err) => global::handle_error(err),
215-
// }
244+
let export = exporter.export(batch.into_iter().collect());
245+
let result = futures_executor::block_on(export);
246+
match result {
247+
Ok(_) => {}
248+
Err(err) => global::handle_error(err),
249+
}
216250
});
217251

218252
// Return batch processor with link to worker
@@ -395,19 +429,6 @@ where
395429
}
396430

397431
/// Messages sent between application thread and batch log processor's work thread.
398-
// #[allow(clippy::large_enum_variant)]
399-
// #[derive(Debug)]
400-
// enum BatchMessage {
401-
// /// Export logs, usually called when the log is emitted.
402-
// ExportLog(LogData),
403-
// /// Flush the current buffer to the backend, it can be triggered by
404-
// /// pre configured interval or a call to `force_push` function.
405-
// Flush(Option<oneshot::Sender<ExportResult>>),
406-
// /// Shut down the worker thread, push all logs in buffer to the backend.
407-
// Shutdown(oneshot::Sender<ExportResult>),
408-
// /// Set the resource for the exporter.
409-
// SetResource(Arc<Resource>),
410-
// }
411432
#[allow(clippy::large_enum_variant)]
412433
#[derive(Debug)]
413434
enum BatchMessage {
@@ -731,7 +752,7 @@ mod tests {
731752
let processor = BatchLogProcessor::new(
732753
Box::new(exporter.clone()),
733754
BatchConfig::default(),
734-
runtime::Tokio,
755+
// runtime::Tokio,
735756
);
736757

737758
//
@@ -748,7 +769,7 @@ mod tests {
748769
let processor = BatchLogProcessor::new(
749770
Box::new(exporter.clone()),
750771
BatchConfig::default(),
751-
runtime::TokioCurrentThread,
772+
// runtime::TokioCurrentThread,
752773
);
753774

754775
processor.shutdown().unwrap();
@@ -762,7 +783,7 @@ mod tests {
762783
let processor = BatchLogProcessor::new(
763784
Box::new(exporter.clone()),
764785
BatchConfig::default(),
765-
runtime::Tokio,
786+
// runtime::Tokio,
766787
);
767788

768789
processor.shutdown().unwrap();
@@ -776,7 +797,7 @@ mod tests {
776797
let processor = BatchLogProcessor::new(
777798
Box::new(exporter.clone()),
778799
BatchConfig::default(),
779-
runtime::TokioCurrentThread,
800+
// runtime::TokioCurrentThread,
780801
);
781802

782803
processor.shutdown().unwrap();

0 commit comments

Comments
 (0)