Skip to content

Commit 4863a0e

Browse files
authored
Merge branch 'main' into ban/separate-example-names
2 parents 9d4514c + e0159ad commit 4863a0e

File tree

11 files changed

+191
-64
lines changed

11 files changed

+191
-64
lines changed

.github/workflows/ci.yml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ jobs:
2727
- rust: stable
2828
os: actuated-arm64-4cpu-16gb
2929
runs-on: ${{ matrix.os }}
30+
continue-on-error: ${{ matrix.rust == 'beta' }}
3031
steps:
3132
- name: Free disk space
3233
if: ${{ matrix.os == 'ubuntu-latest'}}

examples/metrics-basic/Cargo.toml

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -6,12 +6,9 @@ license = "Apache-2.0"
66
publish = false
77

88
[dependencies]
9-
opentelemetry = { path = "../../opentelemetry", features = ["metrics", "otel_unstable"] }
9+
opentelemetry = { path = "../../opentelemetry", features = ["metrics"] }
1010
opentelemetry_sdk = { path = "../../opentelemetry-sdk", features = ["metrics", "rt-tokio"] }
1111
opentelemetry-stdout = { path = "../../opentelemetry-stdout", features = ["metrics"]}
1212
tokio = { workspace = true, features = ["full"] }
1313
serde_json = { workspace = true }
1414

15-
[features]
16-
default = ["otel_unstable"]
17-
otel_unstable = ["opentelemetry/otel_unstable"]

examples/self-diagnostics/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,4 +12,4 @@ opentelemetry-stdout = { path = "../../opentelemetry-stdout"}
1212
tokio = { workspace = true, features = ["full"] }
1313
tracing = { workspace = true, features = ["std"]}
1414
tracing-core = { workspace = true }
15-
tracing-subscriber = { version = "0.3.18", features = ["env-filter","registry", "std"]}
15+
tracing-subscriber = { workspace = true, features = ["env-filter","registry", "std", "fmt"]}

opentelemetry-sdk/CHANGELOG.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,10 @@
22

33
## vNext
44

5+
## 0.27.1
6+
7+
Released 2024-Nov-27
8+
59
- **DEPRECATED**:
610
- `trace::Config` methods are moving onto `TracerProvider` Builder to be consistent with other signals. See https://github.com/open-telemetry/opentelemetry-rust/pull/2303 for migration guide.
711
`trace::Config` is scheduled to be removed from public API in `v0.28.0`.

opentelemetry-sdk/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
[package]
22
name = "opentelemetry_sdk"
3-
version = "0.27.0"
3+
version = "0.27.1"
44
description = "The SDK for the OpenTelemetry metrics collection and distributed tracing framework"
55
homepage = "https://github.com/open-telemetry/opentelemetry-rust"
66
repository = "https://github.com/open-telemetry/opentelemetry-rust"

opentelemetry-sdk/src/logs/log_emitter.rs

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,11 +59,19 @@ impl opentelemetry::logs::LoggerProvider for LoggerProvider {
5959
fn logger_with_scope(&self, scope: InstrumentationScope) -> Self::Logger {
6060
// If the provider is shutdown, new logger will refer a no-op logger provider.
6161
if self.inner.is_shutdown.load(Ordering::Relaxed) {
62+
otel_debug!(
63+
name: "LoggerProvider.NoOpLoggerReturned",
64+
logger_name = scope.name(),
65+
);
6266
return Logger::new(scope, noop_logger_provider().clone());
6367
}
6468
if scope.name().is_empty() {
6569
otel_info!(name: "LoggerNameEmpty", message = "Logger name is empty; consider providing a meaningful name. Logger will function normally and the provided name will be used as-is.");
6670
};
71+
otel_debug!(
72+
name: "LoggerProvider.NewLoggerReturned",
73+
logger_name = scope.name(),
74+
);
6775
Logger::new(scope, self.clone())
6876
}
6977
}
@@ -92,6 +100,9 @@ impl LoggerProvider {
92100

93101
/// Shuts down this `LoggerProvider`
94102
pub fn shutdown(&self) -> LogResult<()> {
103+
otel_debug!(
104+
name: "LoggerProvider.ShutdownInvokedByUser",
105+
);
95106
if self
96107
.inner
97108
.is_shutdown
@@ -224,6 +235,10 @@ impl Builder {
224235
for processor in logger_provider.log_processors() {
225236
processor.set_resource(logger_provider.resource());
226237
}
238+
239+
otel_debug!(
240+
name: "LoggerProvider.Built",
241+
);
227242
logger_provider
228243
}
229244
}

opentelemetry-sdk/src/logs/log_processor.rs

Lines changed: 32 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ use futures_util::{
1313
use opentelemetry::logs::Severity;
1414
use opentelemetry::{otel_debug, otel_error, otel_warn, InstrumentationScope};
1515

16-
use std::sync::atomic::AtomicBool;
16+
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
1717
use std::{cmp::min, env, sync::Mutex};
1818
use std::{
1919
fmt::{self, Debug, Formatter},
@@ -154,6 +154,12 @@ impl LogProcessor for SimpleLogProcessor {
154154
/// them at a pre-configured interval.
155155
pub struct BatchLogProcessor<R: RuntimeChannel> {
156156
message_sender: R::Sender<BatchMessage>,
157+
158+
// Track dropped logs - we'll log this at shutdown
159+
dropped_logs_count: AtomicUsize,
160+
161+
// Track the maximum queue size that was configured for this processor
162+
max_queue_size: usize,
157163
}
158164

159165
impl<R: RuntimeChannel> Debug for BatchLogProcessor<R> {
@@ -172,11 +178,13 @@ impl<R: RuntimeChannel> LogProcessor for BatchLogProcessor<R> {
172178
)));
173179

174180
// TODO - Implement throttling to prevent error flooding when the queue is full or closed.
175-
if let Err(err) = result {
176-
otel_error!(
177-
name: "BatchLogProcessor.Export.Error",
178-
error = format!("{}", err)
179-
);
181+
if result.is_err() {
182+
// Increment dropped logs count. The first time we have to drop a log,
183+
// emit a warning.
184+
if self.dropped_logs_count.fetch_add(1, Ordering::Relaxed) == 0 {
185+
otel_warn!(name: "BatchLogProcessor.LogDroppingStarted",
186+
message = "BatchLogProcessor dropped a LogRecord due to queue full/internal errors. No further log will be emitted for further drops until Shutdown. During Shutdown time, a log will be emitted with exact count of total logs dropped.");
187+
}
180188
}
181189
}
182190

@@ -192,6 +200,17 @@ impl<R: RuntimeChannel> LogProcessor for BatchLogProcessor<R> {
192200
}
193201

194202
fn shutdown(&self) -> LogResult<()> {
203+
let dropped_logs = self.dropped_logs_count.load(Ordering::Relaxed);
204+
let max_queue_size = self.max_queue_size;
205+
if dropped_logs > 0 {
206+
otel_warn!(
207+
name: "BatchLogProcessor.LogsDropped",
208+
dropped_logs_count = dropped_logs,
209+
max_queue_size = max_queue_size,
210+
message = "Logs were dropped due to a queue being full or other error. The count represents the total count of log records dropped in the lifetime of this BatchLogProcessor. Consider increasing the queue size and/or decrease delay between intervals."
211+
);
212+
}
213+
195214
let (res_sender, res_receiver) = oneshot::channel();
196215
self.message_sender
197216
.try_send(BatchMessage::Shutdown(res_sender))
@@ -215,6 +234,7 @@ impl<R: RuntimeChannel> BatchLogProcessor<R> {
215234
let (message_sender, message_receiver) =
216235
runtime.batch_message_channel(config.max_queue_size);
217236
let inner_runtime = runtime.clone();
237+
let max_queue_size = config.max_queue_size;
218238

219239
// Spawn worker process via user-defined spawn function.
220240
runtime.spawn(Box::pin(async move {
@@ -296,8 +316,13 @@ impl<R: RuntimeChannel> BatchLogProcessor<R> {
296316
}
297317
}
298318
}));
319+
299320
// Return batch processor with link to worker
300-
BatchLogProcessor { message_sender }
321+
BatchLogProcessor {
322+
message_sender,
323+
dropped_logs_count: AtomicUsize::new(0),
324+
max_queue_size,
325+
}
301326
}
302327

303328
/// Create a new batch processor builder

opentelemetry-sdk/src/trace/span_processor.rs

Lines changed: 34 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -45,12 +45,13 @@ use futures_util::{
4545
stream::{self, FusedStream, FuturesUnordered},
4646
StreamExt as _,
4747
};
48-
use opentelemetry::{otel_debug, otel_error};
48+
use opentelemetry::{otel_debug, otel_error, otel_warn};
4949
use opentelemetry::{
5050
trace::{TraceError, TraceResult},
5151
Context,
5252
};
5353
use std::cmp::min;
54+
use std::sync::atomic::{AtomicUsize, Ordering};
5455
use std::sync::{Arc, Mutex};
5556
use std::{env, fmt, str::FromStr, time::Duration};
5657

@@ -227,6 +228,12 @@ impl SpanProcessor for SimpleSpanProcessor {
227228
/// [`async-std`]: https://async.rs
228229
pub struct BatchSpanProcessor<R: RuntimeChannel> {
229230
message_sender: R::Sender<BatchMessage>,
231+
232+
// Track dropped spans
233+
dropped_spans_count: AtomicUsize,
234+
235+
// Track the maximum queue size that was configured for this processor
236+
max_queue_size: usize,
230237
}
231238

232239
impl<R: RuntimeChannel> fmt::Debug for BatchSpanProcessor<R> {
@@ -249,11 +256,14 @@ impl<R: RuntimeChannel> SpanProcessor for BatchSpanProcessor<R> {
249256

250257
let result = self.message_sender.try_send(BatchMessage::ExportSpan(span));
251258

252-
if let Err(err) = result {
253-
otel_debug!(
254-
name: "BatchSpanProcessor.OnEnd.ExportQueueingFailed",
255-
reason = format!("{:?}", TraceError::Other(err.into()))
256-
);
259+
// If the queue is full, and we can't buffer a span
260+
if result.is_err() {
261+
// Increment the number of dropped spans. If this is the first time we've had to drop,
262+
// emit a warning.
263+
if self.dropped_spans_count.fetch_add(1, Ordering::Relaxed) == 0 {
264+
otel_warn!(name: "BatchSpanProcessor.SpanDroppingStarted",
265+
message = "Beginning to drop span messages due to full/internal errors. No further log will be emitted for further drops until Shutdown. During Shutdown time, a log will be emitted with exact count of total spans dropped.");
266+
}
257267
}
258268
}
259269

@@ -269,6 +279,17 @@ impl<R: RuntimeChannel> SpanProcessor for BatchSpanProcessor<R> {
269279
}
270280

271281
fn shutdown(&self) -> TraceResult<()> {
282+
let dropped_spans = self.dropped_spans_count.load(Ordering::Relaxed);
283+
let max_queue_size = self.max_queue_size;
284+
if dropped_spans > 0 {
285+
otel_warn!(
286+
name: "BatchSpanProcessor.Shutdown",
287+
dropped_spans = dropped_spans,
288+
max_queue_size = max_queue_size,
289+
message = "Spans were dropped due to a full or closed queue. The count represents the total count of span records dropped in the lifetime of the BatchLogProcessor. Consider increasing the queue size and/or decrease delay between intervals."
290+
);
291+
}
292+
272293
let (res_sender, res_receiver) = oneshot::channel();
273294
self.message_sender
274295
.try_send(BatchMessage::Shutdown(res_sender))
@@ -469,6 +490,8 @@ impl<R: RuntimeChannel> BatchSpanProcessor<R> {
469490
let (message_sender, message_receiver) =
470491
runtime.batch_message_channel(config.max_queue_size);
471492

493+
let max_queue_size = config.max_queue_size;
494+
472495
let inner_runtime = runtime.clone();
473496
// Spawn worker process via user-defined spawn function.
474497
runtime.spawn(Box::pin(async move {
@@ -493,7 +516,11 @@ impl<R: RuntimeChannel> BatchSpanProcessor<R> {
493516
}));
494517

495518
// Return batch processor with link to worker
496-
BatchSpanProcessor { message_sender }
519+
BatchSpanProcessor {
520+
message_sender,
521+
dropped_spans_count: AtomicUsize::new(0),
522+
max_queue_size,
523+
}
497524
}
498525

499526
/// Create a new batch processor builder

opentelemetry/CHANGELOG.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,10 @@
22

33
## vNext
44

5+
## 0.27.1
6+
7+
Released 2024-Nov-27
8+
59
## 0.27.0
610

711
Released 2024-Nov-11

opentelemetry/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
[package]
22
name = "opentelemetry"
3-
version = "0.27.0"
3+
version = "0.27.1"
44
description = "OpenTelemetry API for Rust"
55
homepage = "https://github.com/open-telemetry/opentelemetry-rust"
66
repository = "https://github.com/open-telemetry/opentelemetry-rust"

0 commit comments

Comments
 (0)