Skip to content

Commit b40bd10

Browse files
First draft of logs enrichment example
1 parent df412fe commit b40bd10

File tree

3 files changed

+170
-0
lines changed

3 files changed

+170
-0
lines changed
Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
[package]
2+
name = "logs-enrichment"
3+
version = "0.1.0"
4+
edition = "2021"
5+
license = "Apache-2.0"
6+
rust-version = "1.75.0"
7+
publish = false
8+
autobenches = false
9+
10+
[[bin]]
11+
name = "logs-enrichment"
12+
path = "src/main.rs"
13+
bench = false
14+
required-features = ["spec_unstable_logs_enabled"]
15+
16+
[features]
17+
spec_unstable_logs_enabled = ["opentelemetry/spec_unstable_logs_enabled", "opentelemetry_sdk/spec_unstable_logs_enabled"]
18+
19+
[dependencies]
20+
opentelemetry_sdk = { workspace = true, features = ["logs", "spec_unstable_logs_enabled"] }
21+
opentelemetry-stdout = { workspace = true, features = ["logs"] }
22+
opentelemetry-appender-tracing = { workspace = true }
23+
tracing = { workspace = true, features = ["std"]}
24+
tracing-subscriber = { workspace = true, features = ["env-filter","registry", "std", "fmt"] }
25+
opentelemetry = { version = "0.31.0", features = ["spec_unstable_logs_enabled"] }

examples/logs-enrichment/README.md

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
# OpenTelemetry Log Appender for tracing - Example
2+
3+
This example shows how to use the opentelemetry-appender-tracing crate, which is a
4+
[logging
5+
appender](https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/glossary.md#log-appender--bridge)
6+
that bridges logs from the [tracing crate](https://tracing.rs/tracing/#events) to
7+
OpenTelemetry. The example setups a LoggerProvider with stdout exporter, so logs
8+
are emitted to stdout.
9+
10+
## Usage
11+
12+
Run the following, and Logs emitted using [tracing](https://docs.rs/tracing/latest/tracing/)
13+
will be written out to stdout.
14+
15+
```shell
16+
cargo run
17+
```
Lines changed: 128 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,128 @@
1+
use std::thread::sleep;
2+
use std::time::Duration;
3+
use opentelemetry_appender_tracing::layer;
4+
use opentelemetry_sdk::logs::{LogProcessor, SdkLogRecord, SdkLoggerProvider, SimpleLogProcessor};
5+
use opentelemetry_sdk::Resource;
6+
use tracing::{info, error};
7+
use tracing_subscriber::{prelude::*, EnvFilter};
8+
use opentelemetry::InstrumentationScope;
9+
use opentelemetry::logs::Severity;
10+
use opentelemetry_sdk::error::OTelSdkResult;
11+
12+
fn main() {
13+
let exporter = opentelemetry_stdout::LogExporter::default();
14+
let filtering_processor = FilteringLogProcessor::new(
15+
SimpleLogProcessor::new(exporter),
16+
Severity::Error,
17+
);
18+
let provider: SdkLoggerProvider = SdkLoggerProvider::builder()
19+
.with_resource(
20+
Resource::builder()
21+
.with_service_name("log-appender-tracing-example")
22+
.build(),
23+
)
24+
.with_log_processor(SlowEnrichmentLogProcessor::new(filtering_processor))
25+
.build();
26+
27+
// To prevent a telemetry-induced-telemetry loop, OpenTelemetry's own internal
28+
// logging is properly suppressed. However, logs emitted by external components
29+
// (such as reqwest, tonic, etc.) are not suppressed as they do not propagate
30+
// OpenTelemetry context. Until this issue is addressed
31+
// (https://github.com/open-telemetry/opentelemetry-rust/issues/2877),
32+
// filtering like this is the best way to suppress such logs.
33+
//
34+
// The filter levels are set as follows:
35+
// - Allow `info` level and above by default.
36+
// - Completely restrict logs from `hyper`, `tonic`, `h2`, and `reqwest`.
37+
//
38+
// Note: This filtering will also drop logs from these components even when
39+
// they are used outside of the OTLP Exporter.
40+
let filter_otel = EnvFilter::new("info")
41+
.add_directive("hyper=off".parse().unwrap())
42+
.add_directive("tonic=off".parse().unwrap())
43+
.add_directive("h2=off".parse().unwrap())
44+
.add_directive("reqwest=off".parse().unwrap());
45+
let otel_layer = layer::OpenTelemetryTracingBridge::new(&provider).with_filter(filter_otel);
46+
47+
// Create a new tracing::Fmt layer to print the logs to stdout. It has a
48+
// default filter of `info` level and above, and `debug` and above for logs
49+
// from OpenTelemetry crates. The filter levels can be customized as needed.
50+
let filter_fmt = EnvFilter::new("info").add_directive("opentelemetry=debug".parse().unwrap());
51+
let fmt_layer = tracing_subscriber::fmt::layer()
52+
.with_thread_names(true)
53+
.with_filter(filter_fmt);
54+
55+
tracing_subscriber::registry()
56+
.with(otel_layer)
57+
.with(fmt_layer)
58+
.init();
59+
60+
for i in 0..10 {
61+
info!(name: "my-event-name", target: "my-system", event_id = 20 + i, user_name = "otel", user_email = "[email protected]", message = "This is an example message");
62+
}
63+
error!(name: "my-event-name", target: "my-system", event_id = 50, user_name = "otel", user_email = "[email protected]", message = "This is an example message");
64+
let _ = provider.shutdown();
65+
}
66+
67+
#[derive(Debug)]
68+
pub struct FilteringLogProcessor<P: LogProcessor> {
69+
delegate: P,
70+
min_severity: Severity,
71+
}
72+
73+
impl<P: LogProcessor> FilteringLogProcessor<P> {
74+
pub fn new(delegate: P, min_severity: Severity) -> FilteringLogProcessor<P> {
75+
FilteringLogProcessor {
76+
delegate: delegate,
77+
min_severity: min_severity,
78+
}
79+
}
80+
}
81+
82+
impl<P: LogProcessor> LogProcessor for FilteringLogProcessor<P> {
83+
fn emit(&self, data: &mut SdkLogRecord, instrumentation: &InstrumentationScope) {
84+
self.delegate.emit(data, instrumentation)
85+
}
86+
87+
fn force_flush(&self) -> OTelSdkResult {
88+
self.delegate.force_flush()
89+
}
90+
91+
#[cfg(feature = "spec_unstable_logs_enabled")]
92+
fn event_enabled(&self, level: Severity, target: &str, name: Option<&str>) -> bool {
93+
println!("filtering: severity is {level:?}");
94+
self.delegate.event_enabled(level, target, name) && level >= self.min_severity
95+
}
96+
}
97+
98+
#[derive(Debug)]
99+
pub struct SlowEnrichmentLogProcessor<P: LogProcessor> {
100+
/// The wrapped processor that will receive enriched log records
101+
delegate: P,
102+
}
103+
104+
impl<P: LogProcessor> SlowEnrichmentLogProcessor<P> {
105+
pub fn new(delegate: P) -> SlowEnrichmentLogProcessor<P> {
106+
SlowEnrichmentLogProcessor {
107+
delegate: delegate,
108+
}
109+
}
110+
}
111+
112+
impl<P: LogProcessor> LogProcessor for SlowEnrichmentLogProcessor<P> {
113+
fn emit(&self, data: &mut SdkLogRecord, instrumentation: &InstrumentationScope) {
114+
println!("slow enrichment log processor is waiting 1 s");
115+
sleep(Duration::from_secs(1));
116+
self.delegate.emit(data, instrumentation);
117+
}
118+
119+
fn force_flush(&self) -> OTelSdkResult {
120+
self.delegate.force_flush()
121+
}
122+
123+
#[cfg(feature = "spec_unstable_logs_enabled")]
124+
fn event_enabled(&self, level: Severity, target: &str, name: Option<&str>) -> bool {
125+
println!("slow enrichment: severity is {level:?}");
126+
self.delegate.event_enabled(level, target, name)
127+
}
128+
}

0 commit comments

Comments
 (0)