Skip to content

Commit 33b7572

Browse files
authored
Merge branch 'main' into log-async-trait-impl
2 parents 85f653c + c617be7 commit 33b7572

File tree

9 files changed

+236
-19
lines changed

9 files changed

+236
-19
lines changed

examples/tracing-grpc/src/client.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ fn init_tracer() -> sdktrace::TracerProvider {
2222

2323
struct MetadataMap<'a>(&'a mut tonic::metadata::MetadataMap);
2424

25-
impl<'a> Injector for MetadataMap<'a> {
25+
impl Injector for MetadataMap<'_> {
2626
/// Set a key and value in the MetadataMap. Does nothing if the key or value are not valid inputs
2727
fn set(&mut self, key: &str, value: String) {
2828
if let Ok(key) = tonic::metadata::MetadataKey::from_bytes(key.as_bytes()) {

examples/tracing-grpc/src/server.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ pub mod hello_world {
2929

3030
struct MetadataMap<'a>(&'a tonic::metadata::MetadataMap);
3131

32-
impl<'a> Extractor for MetadataMap<'a> {
32+
impl Extractor for MetadataMap<'_> {
3333
/// Get a value for a key from the MetadataMap. If the value can't be converted to &str, returns None
3434
fn get(&self, key: &str) -> Option<&str> {
3535
self.0.get(key).and_then(|metadata| metadata.to_str().ok())

opentelemetry-appender-log/src/lib.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -239,7 +239,7 @@ mod any_value {
239239
pub(crate) fn serialize(value: log::kv::Value) -> Option<AnyValue> {
240240
struct ValueVisitor(Option<AnyValue>);
241241

242-
impl<'kvs> log::kv::VisitValue<'kvs> for ValueVisitor {
242+
impl log::kv::VisitValue<'_> for ValueVisitor {
243243
fn visit_any(&mut self, value: log::kv::Value) -> Result<(), log::kv::Error> {
244244
self.0 = Some(AnyValue::String(StringValue::from(value.to_string())));
245245

opentelemetry-otlp/tests/integration_test/src/logs_asserter.rs

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
use anyhow::Result;
12
use opentelemetry_proto::tonic::logs::v1::{LogRecord, LogsData, ResourceLogs};
23
use std::fs::File;
34

@@ -96,9 +97,9 @@ impl std::fmt::Debug for LogRecordWrapper {
9697
}
9798

9899
// read a file contains ResourceSpans in json format
99-
pub fn read_logs_from_json(file: File) -> Vec<ResourceLogs> {
100+
pub fn read_logs_from_json(file: File) -> Result<Vec<ResourceLogs>> {
100101
let reader = std::io::BufReader::new(file);
101102

102-
let log_data: LogsData = serde_json::from_reader(reader).unwrap();
103-
log_data.resource_logs
103+
let log_data: LogsData = serde_json::from_reader(reader)?;
104+
Ok(log_data.resource_logs)
104105
}

opentelemetry-otlp/tests/integration_test/tests/logs.rs

Lines changed: 51 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -38,19 +38,33 @@ fn init_logs() -> Result<sdklogs::LoggerProvider> {
3838
mod logtests {
3939
use super::*;
4040
use integration_test_runner::logs_asserter::{read_logs_from_json, LogsAsserter};
41+
use opentelemetry_appender_tracing::layer::OpenTelemetryTracingBridge;
4142
use std::{fs::File, time::Duration};
43+
use tracing::info;
44+
use tracing_subscriber::layer::SubscriberExt;
45+
4246
#[test]
4347
#[should_panic(expected = "assertion `left == right` failed: body does not match")]
4448
pub fn test_assert_logs_eq_failure() {
45-
let left = read_logs_from_json(File::open("./expected/logs.json").unwrap());
46-
let right = read_logs_from_json(File::open("./expected/failed_logs.json").unwrap());
49+
let left = read_logs_from_json(
50+
File::open("./expected/logs.json").expect("failed to open expected file"),
51+
)
52+
.expect("Failed to read logs from expected file");
53+
54+
let right = read_logs_from_json(
55+
File::open("./expected/failed_logs.json")
56+
.expect("failed to open expected failed log file"),
57+
)
58+
.expect("Failed to read logs from expected failed log file");
4759
LogsAsserter::new(right, left).assert();
4860
}
4961

5062
#[test]
51-
pub fn test_assert_logs_eq() {
52-
let logs = read_logs_from_json(File::open("./expected/logs.json").unwrap());
63+
pub fn test_assert_logs_eq() -> Result<()> {
64+
let logs = read_logs_from_json(File::open("./expected/logs.json")?)?;
5365
LogsAsserter::new(logs.clone(), logs).assert();
66+
67+
Ok(())
5468
}
5569

5670
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
@@ -84,15 +98,44 @@ mod logtests {
8498

8599
Ok(())
86100
}
101+
102+
#[test]
103+
#[cfg(any(feature = "tonic-client", feature = "reqwest-blocking-client"))]
104+
pub fn logs_batch_non_tokio_main() -> Result<()> {
105+
// Initialize the logger provider inside a tokio runtime
106+
// as this allows tonic client to capture the runtime,
107+
// but actual export occurs from the dedicated std::thread
108+
// created by BatchLogProcessor.
109+
let rt = tokio::runtime::Runtime::new()?;
110+
let logger_provider = rt.block_on(async {
111+
// While we're here setup our collector container too, as this needs tokio to run
112+
test_utils::start_collector_container().await?;
113+
init_logs()
114+
})?;
115+
116+
info!("LoggerProvider created");
117+
let layer = OpenTelemetryTracingBridge::new(&logger_provider);
118+
let subscriber = tracing_subscriber::registry().with(layer);
119+
{
120+
let _guard = tracing::subscriber::set_default(subscriber);
121+
info!(target: "my-target", "hello from {}. My price is {}.", "banana", 2.99);
122+
}
123+
let _ = logger_provider.shutdown();
124+
// tokio::time::sleep(Duration::from_secs(10)).await;
125+
assert_logs_results(test_utils::LOGS_FILE, "expected/logs.json");
126+
127+
Ok(())
128+
}
87129
}
88130

89-
pub fn assert_logs_results(result: &str, expected: &str) {
90-
let left = read_logs_from_json(File::open(expected).unwrap());
91-
let right = read_logs_from_json(File::open(result).unwrap());
131+
pub fn assert_logs_results(result: &str, expected: &str) -> Result<()> {
132+
let left = read_logs_from_json(File::open(expected)?)?;
133+
let right = read_logs_from_json(File::open(result)?)?;
92134

93135
LogsAsserter::new(left, right).assert();
94136

95-
assert!(File::open(result).unwrap().metadata().unwrap().size() > 0)
137+
assert!(File::open(result).unwrap().metadata().unwrap().size() > 0);
138+
Ok(())
96139
}
97140

98141
///

opentelemetry-sdk/src/logs/log_processor.rs

Lines changed: 172 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1592,4 +1592,176 @@ mod tests {
15921592

15931593
assert_eq!(exporter.len(), 1);
15941594
}
1595+
1596+
#[test]
1597+
#[cfg(feature = "experimental_logs_batch_log_processor_with_async_runtime")]
1598+
fn test_build_batch_log_processor_builder_rt() {
1599+
let mut env_vars = vec![
1600+
(OTEL_BLRP_MAX_EXPORT_BATCH_SIZE, Some("500")),
1601+
(OTEL_BLRP_SCHEDULE_DELAY, Some("I am not number")),
1602+
(OTEL_BLRP_EXPORT_TIMEOUT, Some("2046")),
1603+
];
1604+
temp_env::with_vars(env_vars.clone(), || {
1605+
let builder = BatchLogProcessorWithAsyncRuntime::builder(
1606+
InMemoryLogExporter::default(),
1607+
runtime::Tokio,
1608+
);
1609+
1610+
assert_eq!(builder.config.max_export_batch_size, 500);
1611+
assert_eq!(
1612+
builder.config.scheduled_delay,
1613+
Duration::from_millis(OTEL_BLRP_SCHEDULE_DELAY_DEFAULT)
1614+
);
1615+
assert_eq!(
1616+
builder.config.max_queue_size,
1617+
OTEL_BLRP_MAX_QUEUE_SIZE_DEFAULT
1618+
);
1619+
assert_eq!(
1620+
builder.config.max_export_timeout,
1621+
Duration::from_millis(2046)
1622+
);
1623+
});
1624+
1625+
env_vars.push((OTEL_BLRP_MAX_QUEUE_SIZE, Some("120")));
1626+
1627+
temp_env::with_vars(env_vars, || {
1628+
let builder = BatchLogProcessorWithAsyncRuntime::builder(
1629+
InMemoryLogExporter::default(),
1630+
runtime::Tokio,
1631+
);
1632+
assert_eq!(builder.config.max_export_batch_size, 120);
1633+
assert_eq!(builder.config.max_queue_size, 120);
1634+
});
1635+
}
1636+
1637+
#[test]
1638+
#[cfg(feature = "experimental_logs_batch_log_processor_with_async_runtime")]
1639+
fn test_build_batch_log_processor_builder_rt_with_custom_config() {
1640+
let expected = BatchConfigBuilder::default()
1641+
.with_max_export_batch_size(1)
1642+
.with_scheduled_delay(Duration::from_millis(2))
1643+
.with_max_export_timeout(Duration::from_millis(3))
1644+
.with_max_queue_size(4)
1645+
.build();
1646+
1647+
let builder = BatchLogProcessorWithAsyncRuntime::builder(
1648+
InMemoryLogExporter::default(),
1649+
runtime::Tokio,
1650+
)
1651+
.with_batch_config(expected);
1652+
1653+
let actual = &builder.config;
1654+
assert_eq!(actual.max_export_batch_size, 1);
1655+
assert_eq!(actual.scheduled_delay, Duration::from_millis(2));
1656+
assert_eq!(actual.max_export_timeout, Duration::from_millis(3));
1657+
assert_eq!(actual.max_queue_size, 4);
1658+
}
1659+
1660+
#[cfg(feature = "experimental_logs_batch_log_processor_with_async_runtime")]
1661+
#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
1662+
async fn test_set_resource_batch_processor_rt() {
1663+
let exporter = MockLogExporter {
1664+
resource: Arc::new(Mutex::new(None)),
1665+
};
1666+
let processor = BatchLogProcessorWithAsyncRuntime::new(
1667+
Box::new(exporter.clone()),
1668+
BatchConfig::default(),
1669+
runtime::Tokio,
1670+
);
1671+
let provider = LoggerProvider::builder()
1672+
.with_log_processor(processor)
1673+
.with_resource(Resource::new(vec![
1674+
KeyValue::new("k1", "v1"),
1675+
KeyValue::new("k2", "v3"),
1676+
KeyValue::new("k3", "v3"),
1677+
KeyValue::new("k4", "v4"),
1678+
KeyValue::new("k5", "v5"),
1679+
]))
1680+
.build();
1681+
tokio::time::sleep(Duration::from_secs(2)).await; // set resource in batch span processor is not blocking. Should we make it blocking?
1682+
assert_eq!(exporter.get_resource().unwrap().into_iter().count(), 5);
1683+
let _ = provider.shutdown();
1684+
}
1685+
1686+
#[cfg(feature = "experimental_logs_batch_log_processor_with_async_runtime")]
1687+
#[tokio::test(flavor = "multi_thread")]
1688+
async fn test_batch_shutdown_rt() {
1689+
// assert we will receive an error
1690+
// setup
1691+
let exporter = InMemoryLogExporterBuilder::default()
1692+
.keep_records_on_shutdown()
1693+
.build();
1694+
let processor = BatchLogProcessorWithAsyncRuntime::new(
1695+
Box::new(exporter.clone()),
1696+
BatchConfig::default(),
1697+
runtime::Tokio,
1698+
);
1699+
1700+
let mut record = LogRecord::default();
1701+
let instrumentation = InstrumentationScope::default();
1702+
1703+
processor.emit(&mut record, &instrumentation);
1704+
processor.force_flush().unwrap();
1705+
processor.shutdown().unwrap();
1706+
// todo: expect to see errors here. How should we assert this?
1707+
processor.emit(&mut record, &instrumentation);
1708+
assert_eq!(1, exporter.get_emitted_logs().unwrap().len())
1709+
}
1710+
1711+
#[cfg(feature = "experimental_logs_batch_log_processor_with_async_runtime")]
1712+
#[tokio::test(flavor = "current_thread")]
1713+
#[ignore = "See issue https://github.com/open-telemetry/opentelemetry-rust/issues/1968"]
1714+
async fn test_batch_log_processor_rt_shutdown_with_async_runtime_current_flavor_multi_thread() {
1715+
let exporter = InMemoryLogExporterBuilder::default().build();
1716+
let processor = BatchLogProcessorWithAsyncRuntime::new(
1717+
Box::new(exporter.clone()),
1718+
BatchConfig::default(),
1719+
runtime::Tokio,
1720+
);
1721+
1722+
//
1723+
// deadloack happens in shutdown with tokio current_thread runtime
1724+
//
1725+
processor.shutdown().unwrap();
1726+
}
1727+
1728+
#[cfg(feature = "experimental_logs_batch_log_processor_with_async_runtime")]
1729+
#[tokio::test(flavor = "current_thread")]
1730+
async fn test_batch_log_processor_rt_shutdown_with_async_runtime_current_flavor_current_thread()
1731+
{
1732+
let exporter = InMemoryLogExporterBuilder::default().build();
1733+
let processor = BatchLogProcessorWithAsyncRuntime::new(
1734+
Box::new(exporter.clone()),
1735+
BatchConfig::default(),
1736+
runtime::TokioCurrentThread,
1737+
);
1738+
1739+
processor.shutdown().unwrap();
1740+
}
1741+
1742+
#[cfg(feature = "experimental_logs_batch_log_processor_with_async_runtime")]
1743+
#[tokio::test(flavor = "multi_thread")]
1744+
async fn test_batch_log_processor_rt_shutdown_with_async_runtime_multi_flavor_multi_thread() {
1745+
let exporter = InMemoryLogExporterBuilder::default().build();
1746+
let processor = BatchLogProcessorWithAsyncRuntime::new(
1747+
Box::new(exporter.clone()),
1748+
BatchConfig::default(),
1749+
runtime::Tokio,
1750+
);
1751+
1752+
processor.shutdown().unwrap();
1753+
}
1754+
1755+
#[cfg(feature = "experimental_logs_batch_log_processor_with_async_runtime")]
1756+
#[tokio::test(flavor = "multi_thread")]
1757+
async fn test_batch_log_processor_rt_shutdown_with_async_runtime_multi_flavor_current_thread() {
1758+
let exporter = InMemoryLogExporterBuilder::default().build();
1759+
let processor = BatchLogProcessorWithAsyncRuntime::new(
1760+
Box::new(exporter.clone()),
1761+
BatchConfig::default(),
1762+
runtime::TokioCurrentThread,
1763+
);
1764+
1765+
processor.shutdown().unwrap();
1766+
}
15951767
}

opentelemetry-sdk/src/testing/logs/in_memory_exporter.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ use std::sync::{Arc, Mutex};
2424
/// let exporter: InMemoryLogExporter = InMemoryLogExporter::default();
2525
/// //Create a LoggerProvider and register the exporter
2626
/// let logger_provider = LoggerProvider::builder()
27-
/// .with_log_processor(BatchLogProcessor::builder(exporter.clone(), runtime::Tokio).build())
27+
/// .with_log_processor(BatchLogProcessor::builder(exporter.clone()).build())
2828
/// .build();
2929
/// // Setup Log Appenders and emit logs. (Not shown here)
3030
/// logger_provider.force_flush();
@@ -83,7 +83,7 @@ pub struct LogDataWithResource {
8383
/// let exporter: InMemoryLogExporter = InMemoryLogExporterBuilder::default().build();
8484
/// //Create a LoggerProvider and register the exporter
8585
/// let logger_provider = LoggerProvider::builder()
86-
/// .with_log_processor(BatchLogProcessor::builder(exporter.clone(), runtime::Tokio).build())
86+
/// .with_log_processor(BatchLogProcessor::builder(exporter.clone()).build())
8787
/// .build();
8888
/// // Setup Log Appenders and emit logs. (Not shown here)
8989
/// logger_provider.force_flush();

opentelemetry-zipkin/src/exporter/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,7 @@ pub struct ZipkinPipelineBuilder {
5454
impl Default for ZipkinPipelineBuilder {
5555
fn default() -> Self {
5656
let timeout = env::get_timeout();
57+
5758
ZipkinPipelineBuilder {
5859
#[cfg(feature = "reqwest-blocking-client")]
5960
client: Some(Arc::new(

stress/src/throughput.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -140,8 +140,8 @@ struct UnsafeSlice<'a> {
140140
slice: &'a [UnsafeCell<WorkerStats>],
141141
}
142142

143-
unsafe impl<'a> Send for UnsafeSlice<'a> {}
144-
unsafe impl<'a> Sync for UnsafeSlice<'a> {}
143+
unsafe impl Send for UnsafeSlice<'_> {}
144+
unsafe impl Sync for UnsafeSlice<'_> {}
145145

146146
impl<'a> UnsafeSlice<'a> {
147147
fn new(slice: &'a mut [WorkerStats]) -> Self {
@@ -155,7 +155,7 @@ impl<'a> UnsafeSlice<'a> {
155155
#[inline(always)]
156156
unsafe fn increment(&self, i: usize) {
157157
let value = self.slice[i].get();
158-
(*value).count = (*value).count + 1;
158+
(*value).count += 1;
159159
}
160160

161161
#[inline(always)]

0 commit comments

Comments
 (0)