Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion opentelemetry-otlp/examples/basic-otlp/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use opentelemetry_sdk::logs::LogError;
use opentelemetry_sdk::logs::LoggerProvider;
use opentelemetry_sdk::metrics::MetricError;
use opentelemetry_sdk::metrics::{PeriodicReader, SdkMeterProvider};
use opentelemetry_sdk::{runtime, trace as sdktrace, Resource};
use opentelemetry_sdk::{trace as sdktrace, Resource};
use std::error::Error;
use tracing::info;
use tracing_subscriber::prelude::*;
Expand Down
1 change: 1 addition & 0 deletions opentelemetry-otlp/src/logs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
//!
//! Defines a [LogExporter] to send logs via the OpenTelemetry Protocol (OTLP)

#[cfg(feature = "grpc-tonic")]
use opentelemetry::otel_debug;
use std::fmt::Debug;

Expand Down
12 changes: 10 additions & 2 deletions opentelemetry-sdk/src/logs/log_processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@

#[cfg(feature = "spec_unstable_logs_enabled")]
use opentelemetry::logs::Severity;
use opentelemetry::{otel_debug, otel_error, otel_warn, InstrumentationScope};
use opentelemetry::{otel_debug, otel_error, otel_info, otel_warn, InstrumentationScope};

use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::{cmp::min, env, sync::Mutex};
Expand Down Expand Up @@ -207,7 +207,6 @@
instrumentation.clone(),
))));

// TODO - Implement throttling to prevent error flooding when the queue is full or closed.
if result.is_err() {
// Increment dropped logs count. The first time we have to drop a log,
// emit a warning.
Expand Down Expand Up @@ -317,6 +316,12 @@
let handle = thread::Builder::new()
.name("OpenTelemetry.Logs.BatchProcessor".to_string())
.spawn(move || {
otel_info!(
name: "BatchLogProcessor.ThreadStarted",
interval_in_millisecs = config.scheduled_delay.as_millis(),

Check warning on line 321 in opentelemetry-sdk/src/logs/log_processor.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-sdk/src/logs/log_processor.rs#L321

Added line #L321 was not covered by tests
max_export_batch_size = config.max_export_batch_size,
max_queue_size = max_queue_size,
);
let mut last_export_time = Instant::now();
let mut logs = Vec::new();
logs.reserve(config.max_export_batch_size);
Expand Down Expand Up @@ -387,6 +392,9 @@
}
}
}
otel_info!(
name: "BatchLogProcessor.ThreadStopped"
);
})
.expect("Thread spawn failed."); //TODO: Handle thread spawn failure

Expand Down
18 changes: 13 additions & 5 deletions opentelemetry-sdk/src/trace/span_processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,8 @@
use crate::export::trace::{SpanData, SpanExporter};
use crate::resource::Resource;
use crate::trace::Span;
use opentelemetry::otel_error;
use opentelemetry::{otel_debug, otel_warn};
use opentelemetry::{otel_error, otel_info};
use opentelemetry::{
trace::{TraceError, TraceResult},
Context,
Expand Down Expand Up @@ -258,8 +258,14 @@
let (message_sender, message_receiver) = sync_channel(config.max_queue_size);

let handle = thread::Builder::new()
.name("BatchSpanProcessorThread".to_string())
.name("OpenTelemetry.Traces.BatchProcessor".to_string())
.spawn(move || {
otel_info!(
name: "BatchSpanProcessor.ThreadStarted",
interval_in_millisecs = config.scheduled_delay.as_millis(),

Check warning on line 265 in opentelemetry-sdk/src/trace/span_processor.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-sdk/src/trace/span_processor.rs#L265

Added line #L265 was not covered by tests
max_export_batch_size = config.max_export_batch_size,
max_queue_size = config.max_queue_size,
);
let mut spans = Vec::with_capacity(config.max_export_batch_size);
let mut last_export_time = Instant::now();

Expand Down Expand Up @@ -321,6 +327,9 @@
}
}
}
otel_info!(
name: "BatchSpanProcessor.ThreadStopped"
);
})
.expect("Failed to spawn thread"); //TODO: Handle thread spawn failure

Expand Down Expand Up @@ -363,13 +372,12 @@
}
let result = self.message_sender.try_send(BatchMessage::ExportSpan(span));

// TODO - Implement throttling to prevent error flooding when the queue is full or closed.
if result.is_err() {
// Increment dropped span count. The first time we have to drop a span,
// emit a warning.
if self.dropped_span_count.fetch_add(1, Ordering::Relaxed) == 0 {
otel_warn!(name: "BatchSpanProcessorDedicatedThread.SpanDroppingStarted",
message = "BatchSpanProcessorDedicatedThread dropped a Span due to queue full/internal errors. No further span will be emitted for further drops until Shutdown. During Shutdown time, a log will be emitted with exact count of total logs dropped.");
otel_warn!(name: "BatchSpanProcessor.SpanDroppingStarted",
message = "BatchSpanProcessor dropped a Span due to queue full/internal errors. No further internal log will be emitted for further drops until Shutdown. During Shutdown time, a log will be emitted with exact count of total Spans dropped.");
}
}
}
Expand Down
8 changes: 4 additions & 4 deletions scripts/integration_tests.sh
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,13 @@ if [ -d "$TEST_DIR" ]; then
# Run tests with the reqwest-client feature
echo
echo ####
echo Integration Tests: Reqwest Client
echo Integration Tests: Reqwest Client (Disabled now)
echo ####
echo
# TODO: reqwest client is not supported with thread based processor and reader. Enable this test once it is supported.
#cargo test --no-default-features --features "reqwest-client","internal-logs"

# Run tests with the reqwest-client feature
# Run tests with the reqwest-blocking-client feature
echo
echo ####
echo Integration Tests: Reqwest Blocking Client
Expand All @@ -33,10 +33,10 @@ if [ -d "$TEST_DIR" ]; then
# Run tests with the hyper-client feature
echo
echo ####
echo Integration Tests: Hyper Client
echo Integration Tests: Hyper Client (Disabled now)
echo ####
echo
# TODO: reqwest client is not supported with thread based processor and reader. Enable this test once it is supported.
# TODO: hyper client is not supported with thread based processor and reader. Enable this test once it is supported.
#cargo test --no-default-features --features "hyper-client","internal-logs"
else
echo "Directory $TEST_DIR does not exist. Skipping tests."
Expand Down
Loading