Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion opentelemetry-otlp/examples/basic-otlp-http/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ experimental_metrics_periodicreader_with_async_runtime = ["opentelemetry_sdk/exp
[dependencies]
once_cell = { workspace = true }
opentelemetry = { path = "../../../opentelemetry" }
opentelemetry_sdk = { path = "../../../opentelemetry-sdk", features = ["rt-tokio", "metrics", "logs", "experimental_metrics_periodicreader_with_async_runtime"]}
opentelemetry_sdk = { path = "../../../opentelemetry-sdk", features = ["rt-tokio", "experimental_metrics_periodicreader_with_async_runtime"]}
opentelemetry-http = { path = "../../../opentelemetry-http", optional = true, default-features = false}
opentelemetry-otlp = { path = "../..", features = ["http-proto", "http-json", "logs"] , default-features = false}
opentelemetry-appender-tracing = { path = "../../../opentelemetry-appender-tracing", default-features = false}
Expand Down
12 changes: 6 additions & 6 deletions opentelemetry-otlp/examples/basic-otlp-http/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,12 +82,6 @@ fn init_metrics() -> Result<opentelemetry_sdk::metrics::SdkMeterProvider, Metric

#[tokio::main]
async fn main() -> Result<(), Box<dyn Error + Send + Sync + 'static>> {
let tracer_provider = init_traces()?;
global::set_tracer_provider(tracer_provider.clone());

let meter_provider = init_metrics()?;
global::set_meter_provider(meter_provider.clone());

let logger_provider = init_logs()?;

// Create a new OpenTelemetryTracingBridge using the above LoggerProvider.
Expand Down Expand Up @@ -126,6 +120,12 @@ async fn main() -> Result<(), Box<dyn Error + Send + Sync + 'static>> {
.with(fmt_layer)
.init();

let tracer_provider = init_traces()?;
global::set_tracer_provider(tracer_provider.clone());

let meter_provider = init_metrics()?;
global::set_meter_provider(meter_provider.clone());

let common_scope_attributes = vec![KeyValue::new("scope-key", "scope-value")];
let scope = InstrumentationScope::builder("basic")
.with_version("1.0")
Expand Down
6 changes: 3 additions & 3 deletions opentelemetry-otlp/examples/basic-otlp/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,9 @@ publish = false

[dependencies]
once_cell = { workspace = true }
opentelemetry = { path = "../../../opentelemetry", features = ["metrics", "logs"] }
opentelemetry_sdk = { path = "../../../opentelemetry-sdk", features = ["rt-tokio", "logs"] }
opentelemetry-otlp = { path = "../../../opentelemetry-otlp", features = ["tonic", "metrics", "logs"] }
opentelemetry = { path = "../../../opentelemetry" }
opentelemetry_sdk = { path = "../../../opentelemetry-sdk", features = ["rt-tokio"] }
opentelemetry-otlp = { path = "../../../opentelemetry-otlp" }
opentelemetry-semantic-conventions = { path = "../../../opentelemetry-semantic-conventions" }
tokio = { version = "1.0", features = ["full"] }
opentelemetry-appender-tracing = { path = "../../../opentelemetry-appender-tracing", default-features = false}
Expand Down
12 changes: 10 additions & 2 deletions opentelemetry-otlp/examples/basic-otlp/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,16 @@ The example employs a `BatchExporter` for logs and traces, which is the
recommended approach when using OTLP exporters. While it can be modified to use
a `SimpleExporter`, this requires the main method to be a `tokio::main` function
since the `tonic` client requires a Tokio runtime. If you prefer not to use
`tokio::main`, then the `init_logs`, `init_traces`, and `init_metrics` functions
must be executed within a Tokio runtime. Below is an example:
`tokio::main`, then the `init_logs` and `init_traces` functions must be executed
within a Tokio runtime.

This examples uses the default `PeriodicReader` for metrics, which uses own
thread for background processing/exporting. Since the `tonic` client requires a
Tokio runtime, the main method must be a `tokio::main` function. If you prefer not
to use `tokio::main`, then the `init_metrics` function must be executed within a
Tokio runtime.

Below is an example on how to use non `tokio::main`:

```rust
fn main() -> Result<(), Box<dyn Error + Send + Sync + 'static>> {
Expand Down
12 changes: 6 additions & 6 deletions opentelemetry-otlp/examples/basic-otlp/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,12 +56,6 @@ fn init_logs() -> Result<opentelemetry_sdk::logs::LoggerProvider, LogError> {

#[tokio::main]
async fn main() -> Result<(), Box<dyn Error + Send + Sync + 'static>> {
let tracer_provider = init_traces()?;
global::set_tracer_provider(tracer_provider.clone());

let meter_provider = init_metrics()?;
global::set_meter_provider(meter_provider.clone());

let logger_provider = init_logs()?;

// Create a new OpenTelemetryTracingBridge using the above LoggerProvider.
Expand Down Expand Up @@ -100,6 +94,12 @@ async fn main() -> Result<(), Box<dyn Error + Send + Sync + 'static>> {
.with(fmt_layer)
.init();

let tracer_provider = init_traces()?;
global::set_tracer_provider(tracer_provider.clone());

let meter_provider = init_metrics()?;
global::set_meter_provider(meter_provider.clone());

let common_scope_attributes = vec![KeyValue::new("scope-key", "scope-value")];
let scope = InstrumentationScope::builder("basic")
.with_version("1.0")
Expand Down
27 changes: 10 additions & 17 deletions opentelemetry-sdk/src/metrics/periodic_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -175,12 +175,12 @@
let mut remaining_interval = interval;
otel_info!(
name: "PeriodReaderThreadStarted",
interval = interval.as_secs(),
timeout = timeout.as_secs()
interval_in_secs = interval.as_secs(),
timeout_in_secs = timeout.as_secs()

Check warning on line 179 in opentelemetry-sdk/src/metrics/periodic_reader.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-sdk/src/metrics/periodic_reader.rs#L178-L179

Added lines #L178 - L179 were not covered by tests
);
loop {
otel_debug!(
name: "PeriodReaderThreadLoopAlive", message = "Next export will happen after interval, unless flush or shutdown is triggered.", interval = remaining_interval.as_millis()
name: "PeriodReaderThreadLoopAlive", message = "Next export will happen after interval, unless flush or shutdown is triggered.", interval_in_millisecs = remaining_interval.as_millis()

Check warning on line 183 in opentelemetry-sdk/src/metrics/periodic_reader.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-sdk/src/metrics/periodic_reader.rs#L183

Added line #L183 was not covered by tests
);
match message_receiver.recv_timeout(remaining_interval) {
Ok(Message::Flush(response_sender)) => {
Expand Down Expand Up @@ -228,22 +228,19 @@
Err(mpsc::RecvTimeoutError::Timeout) => {
let export_start = Instant::now();
otel_debug!(
name: "PeriodReaderThreadExportingDueToTimer",
event_name = "PeriodReaderThreadExportingDueToTimer"
name: "PeriodReaderThreadExportingDueToTimer"
);

if let Err(_e) = cloned_reader.collect_and_export(timeout) {
otel_debug!(
name: "PeriodReaderThreadExportingDueToTimerFailed",
event_name = "PeriodReaderThreadExportingDueToTimerFailed"
name: "PeriodReaderThreadExportingDueToTimerFailed"
);
}

let time_taken_for_export = export_start.elapsed();
if time_taken_for_export > interval {
otel_debug!(
name: "PeriodReaderThreadExportTookLongerThanInterval",
event_name = "PeriodReaderThreadExportTookLongerThanInterval"
name: "PeriodReaderThreadExportTookLongerThanInterval"

Check warning on line 243 in opentelemetry-sdk/src/metrics/periodic_reader.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-sdk/src/metrics/periodic_reader.rs#L243

Added line #L243 was not covered by tests
);
// if export took longer than interval, do the
// next export immediately.
Expand All @@ -265,8 +262,7 @@
}
}
otel_info!(
name: "PeriodReaderThreadStopped",
event_name = "PeriodReaderThreadStopped"
name: "PeriodReaderThreadStopped"
);
});

Expand All @@ -275,7 +271,6 @@
if let Err(e) = result_thread_creation {
otel_error!(
name: "PeriodReaderThreadStartError",
event_name = "PeriodReaderThreadStartError",
error = format!("{:?}", e)
);
}
Expand Down Expand Up @@ -339,7 +334,6 @@
if let Err(e) = collect_result {
otel_warn!(
name: "PeriodReaderCollectError",
event_name = "PeriodReaderCollectError",
error = format!("{:?}", e)
);
return Err(e);
Expand All @@ -350,6 +344,8 @@
return Ok(());
}

otel_debug!(name: "PeriodicReaderMetricsCollected", count = rm.scope_metrics.len());

// TODO: substract the time taken for collect from the timeout. collect
// involves observable callbacks too, which are user defined and can
// take arbitrary time.
Expand All @@ -362,7 +358,6 @@
if let Err(e) = exporter_result {
otel_warn!(
name: "PeriodReaderExportError",
event_name = "PeriodReaderExportError",
error = format!("{:?}", e)
);
return Err(e);
Expand All @@ -385,7 +380,6 @@
Err(e) => {
otel_debug!(
name: "PeriodReaderForceFlushError",
event_name = "PeriodReaderForceFlushError",
error = format!("{:?}", e)
);
return Err(MetricError::Other(e.to_string()));
Expand Down Expand Up @@ -420,7 +414,6 @@
Err(e) => {
otel_debug!(
name: "PeriodReaderShutdownError",
event_name = "PeriodReaderShutdownError",
error = format!("{:?}", e)
);
return Err(MetricError::Other(e.to_string()));
Expand Down Expand Up @@ -504,7 +497,7 @@
};

// use below command to run all tests
// cargo test metrics::periodic_reader_with_own_thread::tests --features=testing,experimental_metrics_periodic_reader_no_runtime -- --nocapture
// cargo test metrics::periodic_reader::tests --features=testing,spec_unstable_metrics_views -- --nocapture

#[derive(Debug, Clone)]
struct MetricExporterThatFailsOnlyOnFirst {
Expand Down
Loading