-
Notifications
You must be signed in to change notification settings - Fork 599
Add force_flush to LogExporter #2276
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 14 commits
905ffbc
7db1bd9
9cc40c1
29f7ae6
d67c975
4ed3b73
125af29
3aa6f8d
9984bec
6974ced
9290828
f1e861e
b258265
71d4b40
2987161
9b661f4
1bf3d45
55dfd62
eae55fe
bacb628
3d02d35
b67e2b0
a154ce8
54bb9fa
86d7058
6e7e418
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -90,6 +90,25 @@ pub trait LogExporter: Send + Sync + Debug { | |
| // By default, all logs are enabled | ||
| true | ||
| } | ||
|
|
||
| /// This is a hint to ensure that the export of any Logs the exporter | ||
| /// has received prior to the call to this function SHOULD be completed | ||
| /// as soon as possible, preferably before returning from this method. | ||
| /// | ||
| /// This function SHOULD provide a way to let the caller know | ||
| /// whether it succeeded, failed or timed out. | ||
| /// | ||
| /// This function SHOULD only be called in cases where it is absolutely necessary, | ||
| /// such as when using some FaaS providers that may suspend the process after | ||
| /// an invocation, but before the exporter exports the completed logs. | ||
| /// | ||
| /// This function SHOULD complete or abort within some timeout. This function can be | ||
| /// implemented as a blocking API or an asynchronous API which notifies the caller via | ||
| /// a callback or an event. OpenTelemetry client authors can decide if they want to | ||
| /// make the flush timeout configurable. | ||
| fn force_flush(&mut self) -> ExportResult { | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. self is sufficient, no need to take mut. |
||
| Ok(()) | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think we should not have a default implementation for this method. Otherwise, we could run into a situation where an exporter user goes through the documentation and calls It looks like we have a default implementation for a few other methods as well such as
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Otherwise, we could run into a situation where an exporter user goes through the documentation and calls force_flush with certain expectations that haven't been agreed upon by the exporter author. If the exporter author does not provide an implementation for flush, then they can document that right? Or the doc for Provider can be updated to merely state that it'll invoke flush on the processor, and the same for processor can state they will invoke the flush for exporter. Default implementation seems reasonable to me, as not every exporter need to do something for flush. |
||
| } | ||
| /// Set the resource for the exporter. | ||
| fn set_resource(&mut self, _resource: &Resource) {} | ||
| } | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -129,7 +129,11 @@ | |
| } | ||
|
|
||
| fn force_flush(&self) -> LogResult<()> { | ||
| Ok(()) | ||
| if let Ok(mut exporter) = self.exporter.lock() { | ||
| exporter.force_flush() | ||
| } else { | ||
| Err(LogError::MutexPoisoned("SimpleLogProcessor".into())) | ||
| } | ||
| } | ||
|
|
||
| fn shutdown(&self) -> LogResult<()> { | ||
|
|
@@ -278,7 +282,8 @@ | |
| &timeout_runtime, | ||
| logs.split_off(0), | ||
| ) | ||
| .await; | ||
| .await | ||
| .and(exporter.as_mut().force_flush()); | ||
|
||
|
|
||
| if let Some(channel) = res_channel { | ||
| if let Err(send_error) = channel.send(result) { | ||
|
|
@@ -803,6 +808,25 @@ | |
| let _ = provider.shutdown(); | ||
| } | ||
|
|
||
| #[tokio::test(flavor = "multi_thread")] | ||
| async fn test_batch_forceflush() { | ||
| let exporter = InMemoryLogExporterBuilder::default().build(); | ||
| // TODO: Verify exporter.force_flush() is called | ||
|
|
||
| let processor = BatchLogProcessor::new( | ||
| Box::new(exporter.clone()), | ||
| BatchConfig::default(), | ||
| runtime::Tokio, | ||
| ); | ||
|
|
||
| let mut record = LogRecord::default(); | ||
| let instrumentation = InstrumentationScope::default(); | ||
|
|
||
| processor.emit(&mut record, &instrumentation); | ||
| processor.force_flush().unwrap(); | ||
| assert_eq!(1, exporter.get_emitted_logs().unwrap().len()); | ||
| } | ||
|
|
||
| #[tokio::test(flavor = "multi_thread")] | ||
| async fn test_batch_shutdown() { | ||
| // assert we will receive an error | ||
|
|
@@ -820,7 +844,6 @@ | |
| let instrumentation = InstrumentationScope::default(); | ||
|
|
||
| processor.emit(&mut record, &instrumentation); | ||
| processor.force_flush().unwrap(); | ||
| processor.shutdown().unwrap(); | ||
| // todo: expect to see errors here. How should we assert this? | ||
| processor.emit(&mut record, &instrumentation); | ||
|
|
||
Uh oh!
There was an error while loading. Please reload this page.