Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion opentelemetry-otlp/examples/basic-otlp/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use opentelemetry_sdk::logs::LogError;
use opentelemetry_sdk::logs::LoggerProvider;
use opentelemetry_sdk::metrics::MetricError;
use opentelemetry_sdk::metrics::{PeriodicReader, SdkMeterProvider};
use opentelemetry_sdk::{runtime, trace as sdktrace, Resource};
use opentelemetry_sdk::{trace as sdktrace, Resource};
use std::error::Error;
use tracing::info;
use tracing_subscriber::prelude::*;
Expand Down
1 change: 1 addition & 0 deletions opentelemetry-otlp/src/logs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
//!
//! Defines a [LogExporter] to send logs via the OpenTelemetry Protocol (OTLP)
#[cfg(feature = "grpc-tonic")]
use opentelemetry::otel_debug;
use std::fmt::Debug;

Expand Down
15 changes: 11 additions & 4 deletions opentelemetry-sdk/src/logs/log_processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@

#[cfg(feature = "spec_unstable_logs_enabled")]
use opentelemetry::logs::Severity;
use opentelemetry::{otel_debug, otel_error, otel_warn, InstrumentationScope};
use opentelemetry::{otel_debug, otel_error, otel_info, otel_warn, InstrumentationScope};

use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::{cmp::min, env, sync::Mutex};
Expand Down Expand Up @@ -207,7 +207,6 @@
instrumentation.clone(),
))));

// TODO - Implement throttling to prevent error flooding when the queue is full or closed.
if result.is_err() {
// Increment dropped logs count. The first time we have to drop a log,
// emit a warning.
Expand Down Expand Up @@ -317,9 +316,14 @@
let handle = thread::Builder::new()
.name("OpenTelemetry.Logs.BatchProcessor".to_string())
.spawn(move || {
otel_info!(
name: "BatchLogProcessor.ThreadStarted",
interval_in_millisecs = config.scheduled_delay.as_millis(),

Check warning on line 321 in opentelemetry-sdk/src/logs/log_processor.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-sdk/src/logs/log_processor.rs#L321

Added line #L321 was not covered by tests
max_export_batch_size = config.max_export_batch_size,
max_queue_size = max_queue_size,
);
let mut last_export_time = Instant::now();
let mut logs = Vec::new();
logs.reserve(config.max_export_batch_size);
let mut logs = Vec::with_capacity(config.max_export_batch_size);

loop {
let remaining_time_option = config
Expand Down Expand Up @@ -387,6 +391,9 @@
}
}
}
otel_info!(
name: "BatchLogProcessor.ThreadStopped"
);
})
.expect("Thread spawn failed."); //TODO: Handle thread spawn failure

Expand Down
18 changes: 13 additions & 5 deletions opentelemetry-sdk/src/trace/span_processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,8 @@
use crate::export::trace::{SpanData, SpanExporter};
use crate::resource::Resource;
use crate::trace::Span;
use opentelemetry::otel_error;
use opentelemetry::{otel_debug, otel_warn};
use opentelemetry::{otel_error, otel_info};
use opentelemetry::{
trace::{TraceError, TraceResult},
Context,
Expand Down Expand Up @@ -258,8 +258,14 @@
let (message_sender, message_receiver) = sync_channel(config.max_queue_size);

let handle = thread::Builder::new()
.name("BatchSpanProcessorThread".to_string())
.name("OpenTelemetry.Traces.BatchProcessor".to_string())
.spawn(move || {
otel_info!(
name: "BatchSpanProcessor.ThreadStarted",
interval_in_millisecs = config.scheduled_delay.as_millis(),

Check warning on line 265 in opentelemetry-sdk/src/trace/span_processor.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-sdk/src/trace/span_processor.rs#L265

Added line #L265 was not covered by tests
max_export_batch_size = config.max_export_batch_size,
max_queue_size = config.max_queue_size,
);
let mut spans = Vec::with_capacity(config.max_export_batch_size);
let mut last_export_time = Instant::now();

Expand Down Expand Up @@ -321,6 +327,9 @@
}
}
}
otel_info!(
name: "BatchSpanProcessor.ThreadStopped"
);
})
.expect("Failed to spawn thread"); //TODO: Handle thread spawn failure

Expand Down Expand Up @@ -363,13 +372,12 @@
}
let result = self.message_sender.try_send(BatchMessage::ExportSpan(span));

// TODO - Implement throttling to prevent error flooding when the queue is full or closed.
if result.is_err() {
// Increment dropped span count. The first time we have to drop a span,
// emit a warning.
if self.dropped_span_count.fetch_add(1, Ordering::Relaxed) == 0 {
otel_warn!(name: "BatchSpanProcessorDedicatedThread.SpanDroppingStarted",
message = "BatchSpanProcessorDedicatedThread dropped a Span due to queue full/internal errors. No further span will be emitted for further drops until Shutdown. During Shutdown time, a log will be emitted with exact count of total logs dropped.");
otel_warn!(name: "BatchSpanProcessor.SpanDroppingStarted",
message = "BatchSpanProcessor dropped a Span due to queue full/internal errors. No further internal log will be emitted for further drops until Shutdown. During Shutdown time, a log will be emitted with exact count of total Spans dropped.");
}
}
}
Expand Down
8 changes: 4 additions & 4 deletions scripts/integration_tests.sh
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,13 @@ if [ -d "$TEST_DIR" ]; then
# Run tests with the reqwest-client feature
echo
echo ####
echo Integration Tests: Reqwest Client
echo "Integration Tests: Reqwest Client (Disabled now)"
echo ####
echo
# TODO: reqwest client is not supported with thread based processor and reader. Enable this test once it is supported.
#cargo test --no-default-features --features "reqwest-client","internal-logs"

# Run tests with the reqwest-client feature
# Run tests with the reqwest-blocking-client feature
echo
echo ####
echo Integration Tests: Reqwest Blocking Client
Expand All @@ -33,10 +33,10 @@ if [ -d "$TEST_DIR" ]; then
# Run tests with the hyper-client feature
echo
echo ####
echo Integration Tests: Hyper Client
echo "Integration Tests: Hyper Client (Disabled now)"
echo ####
echo
# TODO: reqwest client is not supported with thread based processor and reader. Enable this test once it is supported.
# TODO: hyper client is not supported with thread based processor and reader. Enable this test once it is supported.
#cargo test --no-default-features --features "hyper-client","internal-logs"
else
echo "Directory $TEST_DIR does not exist. Skipping tests."
Expand Down
Loading