Skip to content

Commit 4453aa7

Browse files
committed
Add shutdown implementation
1 parent a45310f commit 4453aa7

File tree

1 file changed

+49
-14
lines changed

1 file changed

+49
-14
lines changed

opentelemetry-sdk/src/logs/log_processor.rs

Lines changed: 49 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,15 @@
11
use crate::{
22
export::logs::{ExportResult, LogBatch, LogExporter},
3-
logs::LogData,
43
logs::LogRecord,
54
Resource,
65
};
76
use std::sync::mpsc::{self, SyncSender};
87
use futures_channel::oneshot;
9-
use std::borrow::Cow;
8+
use futures_util::{
9+
// future::{self, Either},
10+
{pin_mut, /*stream, StreamExt as _*/},
11+
};
12+
// use std::borrow::Cow;
1013

1114
// use futures_util::{
1215
// future::{self, Either},
@@ -153,7 +156,7 @@ impl LogProcessor for SimpleLogProcessor {
153156
/// them at a pre-configured interval.
154157
pub struct BatchLogProcessor {
155158
sender: SyncSender<BatchMessage>,
156-
handle: thread::JoinHandle<()>,
159+
handle: Mutex<Option<thread::JoinHandle<()>>>,
157160
}
158161

159162
impl Debug for BatchLogProcessor {
@@ -177,36 +180,40 @@ impl LogProcessor for BatchLogProcessor {
177180
}
178181

179182
fn force_flush(&self) -> LogResult<()> {
180-
// TODO, implement force_flush
183+
let _result = self.sender.send(BatchMessage::Flush(None));
181184
LogResult::Ok(())
182185
}
183186

184187
fn shutdown(&self) -> LogResult<()> {
188+
let (res_sender, _res_receiver) = oneshot::channel();
189+
let _result = self.sender.send(BatchMessage::Shutdown(res_sender));
185190
// TODO, implement shutdown
186191
// self.handle.join().unwrap();
187192
// let result = self.handle.join();
188193
// if let Err(err) = result {
189194
// global::handle_error(err: LogError::Other(err.into()));
190195
// }
191-
// // TODO, implement shutdown
196+
if let Some(handle) = self.handle.lock().unwrap().take() {
197+
handle.join().unwrap();
198+
}
192199
LogResult::Ok(())
193200
}
194201

195202
fn set_resource(&self, _resource: &Resource) {
196-
// TODO, implement set_resource
203+
let _result = self.sender.send(BatchMessage::SetResource(Arc::new(_resource.clone())));
197204
}
198205
}
199206

200207
impl BatchLogProcessor {
201208
pub(crate) fn new(mut exporter: Box<dyn LogExporter>, config: BatchConfig) -> Self {
202209
let (sender, receiver) = mpsc::sync_channel(config.max_queue_size);
203210
let handle = thread::spawn(move || {
204-
let mut batch: Vec<Box<LogData>> = Vec::new();
211+
let mut logs = Vec::new();
205212
match receiver.recv() {
206-
Ok(BatchMessage::ExportLog((data, instrumentation))) => {
207-
batch.push(Box::new(LogData { record: data, instrumentation }));
213+
Ok(BatchMessage::ExportLog(data)) => {
214+
logs.push(data);
208215
}
209-
Ok(BatchMessage::Flush(sender)) => {
216+
Ok(BatchMessage::Flush(_sender)) => {
210217
// let export = exporter.export(batch.into_iter().map(|data| Cow::Owned(*data)).collect());
211218
// let result = futures_executor::block_on(export);
212219
// match sender {
@@ -221,7 +228,7 @@ impl BatchLogProcessor {
221228
// }
222229
// }
223230
}
224-
Ok(BatchMessage::Shutdown(sender)) => {
231+
Ok(BatchMessage::Shutdown(_sender)) => {
225232
// let export = exporter.export(batch.into_iter().map(|data| Cow::Owned(*data)).collect());
226233
// let result = futures_executor::block_on(export);
227234
// match sender {
@@ -236,12 +243,12 @@ impl BatchLogProcessor {
236243
// }
237244
// }
238245
}
239-
Ok(BatchMessage::SetResource(resource)) => {
246+
Ok(BatchMessage::SetResource(_resource)) => {
240247
// exporter.set_resource(&resource);
241248
}
242249
Err(_) => {}
243250
}
244-
let export = exporter.export(batch.into_iter().collect());
251+
let export = export_with_timeout(config.max_export_timeout, exporter.as_mut(), logs.split_off(0));
245252
let result = futures_executor::block_on(export);
246253
match result {
247254
Ok(_) => {}
@@ -250,7 +257,7 @@ impl BatchLogProcessor {
250257
});
251258

252259
// Return batch processor with link to worker
253-
BatchLogProcessor { sender, handle }
260+
BatchLogProcessor { sender, handle: Mutex::new(Some(handle)) }
254261
}
255262

256263
/// Create a new batch processor builder
@@ -265,6 +272,34 @@ impl BatchLogProcessor {
265272
}
266273
}
267274

275+
async fn export_with_timeout<E>(
276+
_time_out: Duration,
277+
exporter: &mut E,
278+
batch: Vec<(LogRecord, InstrumentationLibrary)>,
279+
) -> ExportResult
280+
where
281+
E: LogExporter + ?Sized,
282+
{
283+
if batch.is_empty() {
284+
return Ok(());
285+
}
286+
287+
// TBD - Can we avoid this conversion as it involves heap allocation with new vector?
288+
let log_vec: Vec<(&LogRecord, &InstrumentationLibrary)> = batch
289+
.iter()
290+
.map(|log_data| (&log_data.0, &log_data.1))
291+
.collect();
292+
let _export = exporter.export(LogBatch::new(log_vec.as_slice()));
293+
// let timeout = runtime.delay(time_out);
294+
pin_mut!(_export);
295+
// pin_mut!(timeout);
296+
// match future::select(export, export).await {
297+
// Either::Left((export_res, _)) => export_res,
298+
// Either::Right((_, _)) => ExportResult::Err(LogError::ExportTimedOut(time_out)),
299+
// }
300+
ExportResult::Ok(())
301+
}
302+
268303
/// Batch log processor configuration.
269304
/// Use [`BatchConfigBuilder`] to configure your own instance of [`BatchConfig`].
270305
#[derive(Debug)]

0 commit comments

Comments
 (0)