Skip to content

Commit daa694d

Browse files
committed
Batch log data
1 parent 475ab15 commit daa694d

File tree

1 file changed

+26
-48
lines changed

1 file changed

+26
-48
lines changed

opentelemetry-sdk/src/logs/log_processor.rs

Lines changed: 26 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -211,56 +211,34 @@ impl BatchLogProcessor {
211211
let mut logs = Vec::new();
212212
logs.reserve(config.max_export_batch_size);
213213
loop {
214-
logs.clear();
215-
match receiver.recv() {
216-
Ok(BatchMessage::ExportLog(data)) => {
217-
logs.push(data);
214+
match receiver.recv() {
215+
Ok(BatchMessage::ExportLog(data)) => {
216+
logs.push(data);
217+
218+
if logs.len() == config.max_export_batch_size {
219+
let export = export_with_timeout(config.max_export_timeout, exporter.as_mut(), logs.split_off(0));
220+
let result = futures_executor::block_on(export);
221+
match result {
222+
Ok(_) => {}
223+
Err(err) => global::handle_error(err),
224+
}
225+
logs.clear();
226+
}
227+
}
228+
Ok(BatchMessage::Flush(_sender)) => {
229+
// TODO: Implement flush
230+
}
231+
Ok(BatchMessage::Shutdown(_sender)) => {
232+
exporter.shutdown();
233+
break;
234+
}
235+
Ok(BatchMessage::SetResource(resource)) => {
236+
exporter.set_resource(&resource);
237+
}
238+
Err(_) => {}
218239
}
219-
Ok(BatchMessage::Flush(_sender)) => {
220-
// let export = exporter.export(batch.into_iter().map(|data| Cow::Owned(*data)).collect());
221-
// let result = futures_executor::block_on(export);
222-
// match sender {
223-
// Some(sender) => {
224-
// let _ = sender.send(result);
225-
// }
226-
// None => {
227-
// match result {
228-
// Ok(_) => {}
229-
// Err(err) => global::handle_error(err),
230-
// }
231-
// }
232-
// }
233-
}
234-
Ok(BatchMessage::Shutdown(_sender)) => {
235-
exporter.shutdown();
236-
break;
237-
// let export = exporter.export(batch.into_iter().map(|data| Cow::Owned(*data)).collect());
238-
// let result = futures_executor::block_on(export);
239-
// match sender {
240-
// Some(sender) => {
241-
// let _ = sender.send(result);
242-
// }
243-
// None => {
244-
// match result {
245-
// Ok(_) => {}
246-
// Err(err) => global::handle_error(err),
247-
// }
248-
// }
249-
// }
250-
}
251-
Ok(BatchMessage::SetResource(_resource)) => {
252-
// exporter.set_resource(&resource);
253-
}
254-
Err(_) => {}
255240
}
256-
257-
let export = export_with_timeout(config.max_export_timeout, exporter.as_mut(), logs.split_off(0));
258-
let result = futures_executor::block_on(export);
259-
match result {
260-
Ok(_) => {}
261-
Err(err) => global::handle_error(err),
262-
}
263-
}});
241+
});
264242

265243
// Return batch processor with link to worker
266244
BatchLogProcessor { sender, handle: Mutex::new(Some(handle)) }

0 commit comments

Comments
 (0)