Skip to content

Commit 5102d16

Browse files
authored
Merge branch 'main' into log-attributes-update
2 parents 7c08092 + ad88615 commit 5102d16

File tree

17 files changed

+274
-48
lines changed

17 files changed

+274
-48
lines changed

.github/ISSUE_TEMPLATE/BUG-REPORT.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ body:
2828
- type: textarea
2929
id: sdk-version
3030
attributes:
31-
label: label: OpenTelemetry SDK Version (i.e version of `opentelemetry_sdk` crate)
31+
label: OpenTelemetry SDK Version (i.e version of `opentelemetry_sdk` crate)
3232
description: What version of the `opentelemetry_sdk` crate are you using?
3333
placeholder: 0.x, 1.x, etc.
3434
validations:

opentelemetry-otlp/tests/integration_test/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ reqwest-client = ["opentelemetry-otlp/reqwest-client", "opentelemetry-otlp/http-
2929
reqwest-blocking-client = ["opentelemetry-otlp/reqwest-blocking-client", "opentelemetry-otlp/http-proto", "opentelemetry-otlp/trace","opentelemetry-otlp/logs", "opentelemetry-otlp/metrics", "internal-logs"]
3030
tonic-client = ["opentelemetry-otlp/grpc-tonic", "opentelemetry-otlp/trace", "opentelemetry-otlp/logs", "opentelemetry-otlp/metrics", "internal-logs"]
3131
internal-logs = ["opentelemetry-otlp/internal-logs"]
32+
experimental_metadata_attributes = ["opentelemetry-appender-tracing/experimental_metadata_attributes"]
3233

3334
# Keep tonic as the default client
3435
default = ["tonic-client", "internal-logs"]

opentelemetry-otlp/tests/integration_test/src/logs_asserter.rs

Lines changed: 19 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,8 @@
11
use anyhow::Result;
2-
use opentelemetry_proto::tonic::logs::v1::{LogRecord, LogsData, ResourceLogs};
2+
use opentelemetry_proto::tonic::{
3+
common::v1::KeyValue,
4+
logs::v1::{LogRecord, LogsData, ResourceLogs},
5+
};
36
use std::fs::File;
47

58
// Given two ResourceLogs, assert that they are equal except for the timestamps
@@ -75,8 +78,22 @@ impl PartialEq for LogRecordWrapper {
7578
a.severity_text, b.severity_text,
7679
"severity_text does not match"
7780
);
81+
let a_attrs = a.attributes.clone();
82+
#[cfg(feature = "experimental_metadata_attributes")]
83+
let a_attrs: Vec<_> = a_attrs
84+
.into_iter()
85+
.filter(|KeyValue { key, .. }| !key.as_str().starts_with("code."))
86+
.collect();
87+
88+
let b_attrs = b.attributes.clone();
89+
#[cfg(feature = "experimental_metadata_attributes")]
90+
let b_attrs: Vec<_> = b_attrs
91+
.into_iter()
92+
.filter(|KeyValue { key, .. }| !key.as_str().starts_with("code."))
93+
.collect();
94+
7895
assert_eq!(a.body, b.body, "body does not match");
79-
assert_eq!(a.attributes, b.attributes, "attributes do not match");
96+
assert_eq!(a_attrs, b_attrs, "attributes do not match");
8097
assert_eq!(
8198
a.dropped_attributes_count, b.dropped_attributes_count,
8299
"dropped_attributes_count does not match"

opentelemetry-otlp/tests/integration_test/tests/logs.rs

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -255,6 +255,7 @@ mod logtests {
255255
// current thread
256256
#[test]
257257
#[cfg(feature = "reqwest-blocking-client")]
258+
#[cfg(not(feature = "tonic-client"))]
258259
pub fn logs_batch_non_tokio_main_with_init_logs_outside_rt() -> Result<()> {
259260
logs_non_tokio_helper(false, false)
260261
}
@@ -295,7 +296,8 @@ mod logtests {
295296
// Client - Reqwest-blocking
296297
#[test]
297298
#[cfg(feature = "reqwest-blocking-client")]
298-
pub fn logs_simple_non_tokio_main_with_init_logs_outsie_rt_blocking() -> Result<()> {
299+
#[cfg(not(feature = "tonic-client"))]
300+
pub fn logs_simple_non_tokio_main_with_init_logs_outside_rt_blocking() -> Result<()> {
299301
logs_non_tokio_helper(true, false)
300302
}
301303

@@ -309,7 +311,7 @@ mod logtests {
309311
feature = "tonic-client",
310312
feature = "reqwest-client"
311313
))]
312-
pub fn logs_simple_non_tokio_main_with_init_logs_outsie_rt() -> Result<()> {
314+
pub fn logs_simple_non_tokio_main_with_init_logs_outside_rt() -> Result<()> {
313315
logs_non_tokio_helper(true, false)
314316
}
315317

opentelemetry-otlp/tests/integration_test/tests/logs_serialize_deserialize.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ pub async fn test_logs() -> Result<()> {
3737
info!(target: "my-target", "hello from {}. My price is {}.", "banana", 2.99);
3838
}
3939

40-
let _ = logger_provider.shutdown();
40+
logger_provider.shutdown().unwrap();
4141
tokio::time::sleep(std::time::Duration::from_secs(5)).await;
4242
assert_logs_results(test_utils::LOGS_FILE, "expected/logs.json")?;
4343
Ok(())

opentelemetry-sdk/Cargo.toml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,7 @@ internal-logs = ["tracing"]
5656
experimental_metrics_periodicreader_with_async_runtime = ["metrics"]
5757
spec_unstable_metrics_views = ["metrics"]
5858
experimental_logs_batch_log_processor_with_async_runtime = ["logs"]
59+
experimental_logs_concurrent_log_processor = ["logs"]
5960
experimental_trace_batch_span_processor_with_async_runtime = ["trace"]
6061
experimental_metrics_disable_name_validation = ["metrics"]
6162

@@ -88,6 +89,11 @@ harness = false
8889
name = "log_processor"
8990
harness = false
9091

92+
[[bench]]
93+
name = "log_enabled"
94+
harness = false
95+
required-features = ["spec_unstable_logs_enabled", "experimental_logs_concurrent_log_processor"]
96+
9197
[[bench]]
9298
name = "tracer_creation"
9399
harness = false
Lines changed: 81 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,81 @@
1+
/*
2+
The benchmark results:
3+
criterion = "0.5.1"
4+
Hardware: Apple M4 Pro
5+
Total Number of Cores:   14 (10 performance and 4 efficiency)
6+
| Test | Average time|
7+
|---------------------------------------------|-------------|
8+
| exporter_disabled_concurrent_processor | 1.9 ns |
9+
| exporter_disabled_simple_processor | 5.0 ns |
10+
*/
11+
12+
use criterion::{criterion_group, criterion_main, Criterion};
13+
use opentelemetry::logs::{Logger, LoggerProvider};
14+
use opentelemetry_sdk::error::OTelSdkResult;
15+
use opentelemetry_sdk::logs::concurrent_log_processor::SimpleConcurrentLogProcessor;
16+
use opentelemetry_sdk::logs::{
17+
LogBatch, LogExporter, LogProcessor, SdkLoggerProvider, SimpleLogProcessor,
18+
};
19+
use opentelemetry_sdk::Resource;
20+
#[cfg(not(target_os = "windows"))]
21+
use pprof::criterion::{Output, PProfProfiler};
22+
23+
#[derive(Debug)]
24+
struct NoopExporter;
25+
impl LogExporter for NoopExporter {
26+
async fn export(&self, _: LogBatch<'_>) -> OTelSdkResult {
27+
Ok(())
28+
}
29+
30+
fn shutdown(&self) -> OTelSdkResult {
31+
Ok(())
32+
}
33+
34+
fn event_enabled(
35+
&self,
36+
_level: opentelemetry::logs::Severity,
37+
_target: &str,
38+
_name: Option<&str>,
39+
) -> bool {
40+
false
41+
}
42+
43+
fn set_resource(&mut self, _: &Resource) {}
44+
}
45+
46+
fn benchmark_exporter_enabled_false<T>(c: &mut Criterion, name: &str, processor: T)
47+
where
48+
T: LogProcessor + Send + Sync + 'static,
49+
{
50+
let provider = SdkLoggerProvider::builder()
51+
.with_log_processor(processor)
52+
.build();
53+
let logger = provider.logger("test_logger");
54+
55+
c.bench_function(name, |b| {
56+
b.iter(|| {
57+
logger.event_enabled(opentelemetry::logs::Severity::Debug, "target", Some("name"));
58+
});
59+
});
60+
}
61+
62+
fn criterion_benchmark(c: &mut Criterion) {
63+
let processor = SimpleConcurrentLogProcessor::new(NoopExporter);
64+
benchmark_exporter_enabled_false(c, "exporter_disabled_concurrent_processor", processor);
65+
let simple = SimpleLogProcessor::new(NoopExporter);
66+
benchmark_exporter_enabled_false(c, "exporter_disabled_simple_processor", simple);
67+
}
68+
69+
#[cfg(not(target_os = "windows"))]
70+
criterion_group! {
71+
name = benches;
72+
config = Criterion::default().with_profiler(PProfProfiler::new(100, Output::Flamegraph(None)));
73+
targets = criterion_benchmark
74+
}
75+
#[cfg(target_os = "windows")]
76+
criterion_group! {
77+
name = benches;
78+
config = Criterion::default();
79+
targets = criterion_benchmark
80+
}
81+
criterion_main!(benches);

opentelemetry-sdk/src/logs/batch_log_processor.rs

Lines changed: 40 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -614,6 +614,8 @@ impl Default for BatchConfigBuilder {
614614
/// * `OTEL_BLRP_SCHEDULE_DELAY`
615615
/// * `OTEL_BLRP_MAX_EXPORT_BATCH_SIZE`
616616
/// * `OTEL_BLRP_EXPORT_TIMEOUT`
617+
///
618+
/// Note: Programmatic configuration overrides any value set via the environment variable.
617619
fn default() -> Self {
618620
BatchConfigBuilder {
619621
max_queue_size: OTEL_BLRP_MAX_QUEUE_SIZE_DEFAULT,
@@ -630,7 +632,11 @@ impl BatchConfigBuilder {
630632
/// Set max_queue_size for [`BatchConfigBuilder`].
631633
/// It's the maximum queue size to buffer logs for delayed processing.
632634
/// If the queue gets full it will drop the logs.
633-
/// The default value of is 2048.
635+
/// The default value is 2048.
636+
///
637+
/// Corresponding environment variable: `OTEL_BLRP_MAX_QUEUE_SIZE`.
638+
///
639+
/// Note: Programmatically setting this will override any value set via the environment variable.
634640
pub fn with_max_queue_size(mut self, max_queue_size: usize) -> Self {
635641
self.max_queue_size = max_queue_size;
636642
self
@@ -639,6 +645,10 @@ impl BatchConfigBuilder {
639645
/// Set scheduled_delay for [`BatchConfigBuilder`].
640646
/// It's the delay interval in milliseconds between two consecutive processing of batches.
641647
/// The default value is 1000 milliseconds.
648+
///
649+
/// Corresponding environment variable: `OTEL_BLRP_SCHEDULE_DELAY`.
650+
///
651+
/// Note: Programmatically setting this will override any value set via the environment variable.
642652
pub fn with_scheduled_delay(mut self, scheduled_delay: Duration) -> Self {
643653
self.scheduled_delay = scheduled_delay;
644654
self
@@ -647,6 +657,10 @@ impl BatchConfigBuilder {
647657
/// Set max_export_timeout for [`BatchConfigBuilder`].
648658
/// It's the maximum duration to export a batch of data.
649659
/// The default value is 30000 milliseconds.
660+
///
661+
/// Corresponding environment variable: `OTEL_BLRP_EXPORT_TIMEOUT`.
662+
///
663+
/// Note: Programmatically setting this will override any value set via the environment variable.
650664
#[cfg(feature = "experimental_logs_batch_log_processor_with_async_runtime")]
651665
pub fn with_max_export_timeout(mut self, max_export_timeout: Duration) -> Self {
652666
self.max_export_timeout = max_export_timeout;
@@ -658,6 +672,10 @@ impl BatchConfigBuilder {
658672
/// more than one batch worth of logs then it processes multiple batches
659673
/// of logs one batch after the other without any delay.
660674
/// The default value is 512.
675+
///
676+
/// Corresponding environment variable: `OTEL_BLRP_MAX_EXPORT_BATCH_SIZE`.
677+
///
678+
/// Note: Programmatically setting this will override any value set via the environment variable.
661679
pub fn with_max_export_batch_size(mut self, max_export_batch_size: usize) -> Self {
662680
self.max_export_batch_size = max_export_batch_size;
663681
self
@@ -774,6 +792,27 @@ mod tests {
774792
);
775793
}
776794

795+
#[test]
796+
fn test_code_based_config_overrides_env_vars() {
797+
let env_vars = vec![
798+
(OTEL_BLRP_SCHEDULE_DELAY, Some("2000")),
799+
(OTEL_BLRP_MAX_QUEUE_SIZE, Some("4096")),
800+
(OTEL_BLRP_MAX_EXPORT_BATCH_SIZE, Some("1024")),
801+
];
802+
803+
temp_env::with_vars(env_vars, || {
804+
let config = BatchConfigBuilder::default()
805+
.with_max_queue_size(2048)
806+
.with_scheduled_delay(Duration::from_millis(1000))
807+
.with_max_export_batch_size(512)
808+
.build();
809+
810+
assert_eq!(config.scheduled_delay, Duration::from_millis(1000));
811+
assert_eq!(config.max_queue_size, 2048);
812+
assert_eq!(config.max_export_batch_size, 512);
813+
});
814+
}
815+
777816
#[test]
778817
fn test_batch_config_configurable_by_env_vars() {
779818
let env_vars = vec![
Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
1+
use opentelemetry::{otel_info, InstrumentationScope};
2+
3+
use crate::error::OTelSdkResult;
4+
5+
use super::{LogBatch, LogExporter, LogProcessor, SdkLogRecord};
6+
7+
/// A concurrent log processor calls exporter's export method on each emit. This
8+
/// processor does not buffer logs. Note: This invokes exporter's export method
9+
/// on the current thread without synchronization. i.e multiple export() calls
10+
/// can happen simultaneously from different threads. This is not a problem if
11+
/// the exporter is designed to handle that. As of now, exporters in the
12+
/// opentelemetry-rust project (stdout/otlp) are not thread-safe.
13+
/// This is intended to be used when exporting to operating system
14+
/// tracing facilities like Windows ETW, Linux TracePoints etc.
15+
#[derive(Debug)]
16+
pub struct SimpleConcurrentLogProcessor<T: LogExporter> {
17+
exporter: T,
18+
}
19+
20+
impl<T: LogExporter> SimpleConcurrentLogProcessor<T> {
21+
/// Creates a new `ConcurrentExportProcessor` with the given exporter.
22+
pub fn new(exporter: T) -> Self {
23+
Self { exporter }
24+
}
25+
}
26+
27+
impl<T: LogExporter> LogProcessor for SimpleConcurrentLogProcessor<T> {
28+
fn emit(&self, record: &mut SdkLogRecord, instrumentation: &InstrumentationScope) {
29+
let log_tuple = &[(record as &SdkLogRecord, instrumentation)];
30+
let result = futures_executor::block_on(self.exporter.export(LogBatch::new(log_tuple)));
31+
if let Err(err) = result {
32+
otel_info!(
33+
name: "SimpleConcurrentLogProcessor.Emit.ExportError",
34+
error = format!("{}",err)
35+
);
36+
}
37+
}
38+
39+
fn force_flush(&self) -> OTelSdkResult {
40+
// TODO: invoke flush on exporter
41+
// once https://github.com/open-telemetry/opentelemetry-rust/issues/2261
42+
// is resolved
43+
Ok(())
44+
}
45+
46+
fn shutdown(&self) -> OTelSdkResult {
47+
self.exporter.shutdown()
48+
}
49+
50+
#[cfg(feature = "spec_unstable_logs_enabled")]
51+
fn event_enabled(
52+
&self,
53+
level: opentelemetry::logs::Severity,
54+
target: &str,
55+
name: Option<&str>,
56+
) -> bool {
57+
self.exporter.event_enabled(level, target, name)
58+
}
59+
}

opentelemetry-sdk/src/logs/logger_provider.rs

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -200,15 +200,22 @@ impl LoggerProviderBuilder {
200200
LoggerProviderBuilder { processors, ..self }
201201
}
202202

203-
/// Adds a [BatchLogProcessor] with the configured exporter to the pipeline.
203+
/// Adds a [BatchLogProcessor] with the configured exporter to the pipeline,
204+
/// using the default [super::BatchConfig].
205+
///
206+
/// The following environment variables can be used to configure the batching configuration:
207+
///
208+
/// * `OTEL_BLRP_SCHEDULE_DELAY` - Corresponds to `with_scheduled_delay`.
209+
/// * `OTEL_BLRP_MAX_QUEUE_SIZE` - Corresponds to `with_max_queue_size`.
210+
/// * `OTEL_BLRP_MAX_EXPORT_BATCH_SIZE` - Corresponds to `with_max_export_batch_size`.
204211
///
205212
/// # Arguments
206213
///
207-
/// * `exporter` - The exporter to be used by the BatchLogProcessor.
214+
/// * `exporter` - The exporter to be used by the `BatchLogProcessor`.
208215
///
209216
/// # Returns
210217
///
211-
/// A new `Builder` instance with the BatchLogProcessor added to the pipeline.
218+
/// A new `LoggerProviderBuilder` instance with the `BatchLogProcessor` added to the pipeline.
212219
///
213220
/// Processors are invoked in the order they are added.
214221
pub fn with_batch_exporter<T: LogExporter + 'static>(self, exporter: T) -> Self {

0 commit comments

Comments
 (0)