Skip to content

Commit 92bc173

Browse files
Added EventHubs sample which logs tracing_subscriber messages to the EventHubs service. (Azure#2546)
* Tracing example
1 parent a31eaf2 commit 92bc173

File tree

7 files changed

+296
-26
lines changed

7 files changed

+296
-26
lines changed

sdk/core/azure_core_amqp/src/error.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -99,8 +99,8 @@ create_extensible_enum!(
9999
impl From<AmqpSymbol> for AmqpErrorCondition {
100100
fn from(condition: AmqpSymbol) -> Self {
101101
// Note that the `from_str` implementation from `create_extensible_enum` will
102-
// never return an error. So the `expect` is there to silence the compiler.
103-
AmqpErrorCondition::from_str(condition.0.as_str()).expect("Invalid AMQP error condition")
102+
// never return an error. So the `unwrap` is there to silence the compiler.
103+
AmqpErrorCondition::from_str(condition.0.as_str()).unwrap()
104104
}
105105
}
106106

sdk/core/azure_core_amqp/src/fe2o3/cbs.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -116,9 +116,10 @@ impl AmqpClaimsBasedSecurityApis for Fe2o3ClaimsBasedSecurity<'_> {
116116
Ok(amqp_error) => amqp_error.into(),
117117
Err(e) => {
118118
debug!("Failed to convert management error to azure error: {:?}", e);
119-
azure_core::Error::message(
119+
azure_core::Error::full(
120120
azure_core::error::ErrorKind::Amqp,
121-
format!("Failed to convert management error to azure error: {}", e),
121+
e,
122+
"Failed to convert management error to azure error.",
122123
)
123124
}
124125
})?;

sdk/core/azure_core_amqp/src/fe2o3/value.rs

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -545,10 +545,9 @@ impl From<Fe2o3SerializationError> for azure_core::Error {
545545
| serde_amqp::Error::SequenceLengthMismatch
546546
| serde_amqp::Error::InvalidLength
547547
| serde_amqp::Error::InvalidValue
548-
| serde_amqp::Error::IsDescribedType => azure_core::Error::new(
549-
ErrorKind::Amqp,
550-
AmqpError::from(AmqpErrorKind::TransportImplementationError(Box::new(err.0))),
551-
),
548+
| serde_amqp::Error::IsDescribedType => {
549+
AmqpError::from(AmqpErrorKind::TransportImplementationError(Box::new(err.0))).into()
550+
}
552551
}
553552
}
554553
}

sdk/core/azure_core_amqp/src/value.rs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -405,6 +405,14 @@ macro_rules! conversions_for_amqp_types {
405405
}
406406
}
407407
}
408+
impl From<&AmqpValue> for $t {
409+
fn from(v: &AmqpValue) -> Self {
410+
match v {
411+
AmqpValue::$field(v) => v.clone(),
412+
_ => panic!("Expected a {}", stringify!($t)),
413+
}
414+
}
415+
}
408416

409417
impl PartialEq<$t> for AmqpValue {
410418
fn eq(&self, other: &$t) -> bool {
Lines changed: 271 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,271 @@
1+
// Copyright (c) Microsoft Corporation. All Rights reserved
2+
// Licensed under the MIT license.
3+
// cspell: ignore callsite
4+
5+
//! This sample shows using the `tracing` crate to log messages to Azure Event Hubs
6+
//! using a custom `Layer`. It demonstrates how to create a custom layer that formats log messages
7+
//! and sends them to Event Hubs asynchronously.
8+
9+
use azure_identity::DefaultAzureCredential;
10+
use azure_messaging_eventhubs::{models::EventData, ProducerClient};
11+
use std::error::Error;
12+
use std::fmt::Debug;
13+
use std::sync::Arc;
14+
use std::{env, time::SystemTime};
15+
use tokio::sync::mpsc;
16+
use tracing::{
17+
field::Visit,
18+
span, {Event, Level, Subscriber},
19+
};
20+
use tracing_subscriber::{
21+
fmt,
22+
fmt::format::FmtSpan,
23+
layer::{Context, Layer},
24+
prelude::*,
25+
registry::{LookupSpan, Registry},
26+
};
27+
28+
/// A custom tracing layer that sends log messages to Azure Event Hubs.
29+
struct EventHubsLayer {
30+
sender: mpsc::Sender<EventData>,
31+
}
32+
33+
impl EventHubsLayer {
34+
async fn new(
35+
fully_qualified_namespace: &str,
36+
eventhub_name: &str,
37+
) -> Result<Self, Box<dyn Error>> {
38+
let producer = ProducerClient::builder()
39+
.open(
40+
fully_qualified_namespace,
41+
eventhub_name,
42+
DefaultAzureCredential::new()?,
43+
)
44+
.await?;
45+
46+
let (sender, mut receiver) = mpsc::channel::<EventData>(100);
47+
let producer_arc = Arc::new(producer);
48+
let producer_clone = producer_arc.clone();
49+
50+
// Spawn a task that receives log messages and sends them to EventHubs
51+
tokio::spawn(async move {
52+
while let Some(event) = receiver.recv().await {
53+
if let Err(err) = producer_clone.send_event(event, None).await {
54+
eprintln!("Failed to send event to EventHubs: {err}");
55+
}
56+
}
57+
});
58+
59+
Ok(Self { sender })
60+
}
61+
}
62+
63+
/// Constants for property names in the EventData.
64+
/// These constants are used to set properties in the EventData object.
65+
const LOG_LEVEL_PROPERTY: &str = "log_level";
66+
const NAME_PROPERTY: &str = "name";
67+
const TARGET_PROPERTY: &str = "target";
68+
const MODULE_PATH_PROPERTY: &str = "module_path";
69+
const FILE_PROPERTY: &str = "file";
70+
const LINE_PROPERTY: &str = "line";
71+
const FIELDS_PROPERTY: &str = "fields";
72+
const IS_SPAN_PROPERTY: &str = "is_span";
73+
const IS_EVENT_PROPERTY: &str = "is_event";
74+
const TIMESTAMP_PROPERTY: &str = "timestamp";
75+
const EVENT_TYPE_PROPERTY: &str = "event_type";
76+
const SPAN_ID_PROPERTY: &str = "span_id";
77+
78+
const EVENT_TYPE: &str = "event";
79+
const SPAN_OPEN: &str = "span_open";
80+
const SPAN_CLOSE: &str = "span_close";
81+
82+
impl<S> Layer<S> for EventHubsLayer
83+
where
84+
S: Subscriber + for<'a> LookupSpan<'a>,
85+
{
86+
fn on_event(&self, event: &Event<'_>, _ctx: Context<'_, S>) {
87+
// Format the event into a string
88+
let mut visitor = EventVisitor {
89+
buffer: String::new(),
90+
};
91+
92+
event.record(&mut visitor);
93+
94+
// Add timestamp, level, and target
95+
let metadata = event.metadata();
96+
97+
// Build an EventData object containing both the event body and properties reflecting the event's metadata.
98+
let mut event_data_builder = EventData::builder()
99+
.with_body(visitor.buffer)
100+
.add_property(EVENT_TYPE_PROPERTY.into(), EVENT_TYPE)
101+
.add_property(LOG_LEVEL_PROPERTY.into(), metadata.level().as_str())
102+
.add_property(
103+
TIMESTAMP_PROPERTY.to_string(),
104+
SystemTime::now()
105+
.duration_since(SystemTime::UNIX_EPOCH)
106+
.unwrap()
107+
.as_secs(),
108+
)
109+
.add_property(TARGET_PROPERTY.to_string(), metadata.target())
110+
.add_property(NAME_PROPERTY.to_string(), metadata.name())
111+
.add_property(FIELDS_PROPERTY.to_string(), metadata.fields().to_string())
112+
.add_property(IS_EVENT_PROPERTY.to_string(), metadata.is_event())
113+
.add_property(IS_SPAN_PROPERTY.to_string(), metadata.is_span());
114+
115+
// Handle optional metadata fields.
116+
if let Some(module_path) = metadata.module_path() {
117+
event_data_builder =
118+
event_data_builder.add_property(MODULE_PATH_PROPERTY.to_string(), module_path);
119+
}
120+
if let Some(file) = metadata.file() {
121+
event_data_builder = event_data_builder.add_property(FILE_PROPERTY.to_string(), file);
122+
}
123+
if let Some(line) = metadata.line() {
124+
event_data_builder = event_data_builder.add_property(LINE_PROPERTY.to_string(), line);
125+
}
126+
127+
let event_data = event_data_builder.build();
128+
129+
// Send the event to Event Hubs asynchronously
130+
self.sender.try_send(event_data).unwrap_or_else(|_| {
131+
eprintln!("Failed to send event to EventHubs");
132+
});
133+
}
134+
135+
fn on_new_span(&self, attrs: &span::Attributes<'_>, id: &span::Id, _ctx: Context<'_, S>) {
136+
let event_data = EventData::builder()
137+
.add_property(EVENT_TYPE_PROPERTY.into(), SPAN_OPEN)
138+
.add_property(LOG_LEVEL_PROPERTY.into(), attrs.metadata().level().as_str())
139+
.add_property(SPAN_ID_PROPERTY.into(), id.into_u64())
140+
.add_property(
141+
TIMESTAMP_PROPERTY.to_string(),
142+
SystemTime::now()
143+
.duration_since(SystemTime::UNIX_EPOCH)
144+
.unwrap()
145+
.as_secs(),
146+
)
147+
.add_property(NAME_PROPERTY.to_string(), attrs.metadata().name())
148+
.build();
149+
self.sender.try_send(event_data).unwrap_or_else(|_| {
150+
eprintln!("Failed to send span open event to EventHubs");
151+
});
152+
}
153+
fn on_close(&self, id: span::Id, _ctx: Context<'_, S>) {
154+
// This method is called when a span is closed.
155+
let event_data = EventData::builder()
156+
.add_property(EVENT_TYPE_PROPERTY.into(), SPAN_CLOSE)
157+
.add_property(NAME_PROPERTY.into(), id.into_u64())
158+
.add_property(
159+
TIMESTAMP_PROPERTY.to_string(),
160+
SystemTime::now()
161+
.duration_since(SystemTime::UNIX_EPOCH)
162+
.unwrap()
163+
.as_secs(),
164+
)
165+
.build();
166+
self.sender.try_send(event_data).unwrap_or_else(|_| {
167+
eprintln!("Failed to send span close event to EventHubs");
168+
});
169+
}
170+
}
171+
172+
/// A visitor that formats log messages and collects them into a buffer.
173+
struct EventVisitor {
174+
buffer: String,
175+
}
176+
177+
impl Visit for EventVisitor {
178+
fn record_str(&mut self, field: &tracing::field::Field, value: &str) {
179+
// Append the field name and value to the buffer
180+
self.buffer
181+
.push_str(&format!("{}: {}", field.name(), value));
182+
}
183+
184+
fn record_debug(&mut self, field: &tracing::field::Field, value: &dyn std::fmt::Debug) {
185+
self.buffer
186+
.push_str(&format!("{}: {:?}", field.name(), value));
187+
}
188+
}
189+
190+
/// Test struct to demonstrate structured data logging.
191+
#[derive(Debug)]
192+
struct StructuredData {
193+
body: String,
194+
properties: std::collections::HashMap<String, String>,
195+
}
196+
197+
impl std::fmt::Display for StructuredData {
198+
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
199+
write!(f, "StructuredData: {}", self.body)
200+
}
201+
}
202+
203+
/// Enable a tracing subscriber that sends logs to the Azure Event Hubs service.
204+
async fn enable_eventhubs_logging() -> Result<(), Box<dyn Error>> {
205+
// Get EventHubs fully qualified domain name and hub name from environment variables
206+
let fully_qualified_domain_name =
207+
env::var("EVENTHUBS_HOST").expect("EVENTHUBS_HOST must be set");
208+
let eventhub_name = env::var("EVENTHUB_NAME").expect("EVENTHUB_NAME must be set");
209+
210+
// Create our custom EventHubsLayer
211+
let eventhubs_layer = EventHubsLayer::new(&fully_qualified_domain_name, &eventhub_name).await?;
212+
213+
// Set up tracing subscriber with both console and EventHubs outputs. Note that the components of the EventHubs service (which also uses tracing)
214+
// need to be disabled.
215+
let subscriber = Registry::default()
216+
.with(fmt::layer().with_target(true).with_span_events(
217+
FmtSpan::NEW | FmtSpan::ENTER | FmtSpan::EXIT | FmtSpan::CLOSE | FmtSpan::FULL,
218+
))
219+
// Add filters to exclude the AMQP and EventHubs logs to avoid recursive logging.
220+
.with(
221+
tracing_subscriber::EnvFilter::from_default_env()
222+
.add_directive("azure_messaging_eventhubs=off".parse()?)
223+
.add_directive("azure_core_amqp=off".parse()?)
224+
.add_directive("fe2o3_amqp=off".parse()?),
225+
)
226+
.with(eventhubs_layer);
227+
228+
// Initialize the global tracing subscriber
229+
tracing::subscriber::set_global_default(subscriber)?;
230+
Ok(())
231+
}
232+
233+
#[tokio::main]
234+
async fn main() -> Result<(), Box<dyn Error>> {
235+
enable_eventhubs_logging().await?;
236+
237+
// Application code
238+
tracing::info!("Application started");
239+
// ... your application code here ...
240+
tracing::info!("Processing some data");
241+
tracing::warn!("This is a warning message");
242+
tracing::error!("This is an error message");
243+
244+
let mut data = StructuredData {
245+
body: "This is a structured log message".to_string(),
246+
properties: std::collections::HashMap::new(),
247+
};
248+
249+
data.properties
250+
.insert("key1".to_string(), "value1".to_string());
251+
data.properties
252+
.insert("key2".to_string(), "value2".to_string());
253+
254+
tracing::info!("Sending structured data to EventHubs");
255+
tracing::info!("Structured data: {:?}", data);
256+
tracing::event!(Level::TRACE, "Structured data2: {}", data);
257+
258+
tracing::info!(field1 = 1, field2 = "string", "logged a couple fields");
259+
260+
tracing::span!(Level::TRACE, "example_span").in_scope(|| {
261+
tracing::info!("Inside a span");
262+
tracing::debug!("Debug message inside a span");
263+
});
264+
tracing::info!("Exiting the span");
265+
266+
// Sleep to allow logs to be sent
267+
tokio::time::sleep(std::time::Duration::from_secs(5)).await;
268+
269+
tracing::info!("Application shutting down");
270+
Ok(())
271+
}

sdk/eventhubs/azure_messaging_eventhubs/src/common/connection_manager.rs

Lines changed: 8 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -423,16 +423,15 @@ impl ConnectionManager {
423423
match amqp_error.kind() {
424424
AmqpErrorKind::ManagementStatusCode(code, _) => {
425425
debug!("Management operation error: {}", code);
426-
match code {
427-
// Retry on 408 (Request Timeout) and 429 (Too Many Requests)
426+
matches!(
427+
code,
428428
azure_core::http::StatusCode::RequestTimeout
429-
| azure_core::http::StatusCode::TooManyRequests
430-
| azure_core::http::StatusCode::InternalServerError
431-
| azure_core::http::StatusCode::BadGateway
432-
| azure_core::http::StatusCode::ServiceUnavailable
433-
| azure_core::http::StatusCode::GatewayTimeout => true,
434-
_ => false,
435-
}
429+
| azure_core::http::StatusCode::TooManyRequests
430+
| azure_core::http::StatusCode::InternalServerError
431+
| azure_core::http::StatusCode::BadGateway
432+
| azure_core::http::StatusCode::ServiceUnavailable
433+
| azure_core::http::StatusCode::GatewayTimeout
434+
)
436435
}
437436
AmqpErrorKind::AmqpDescribedError(described_error) => {
438437
debug!("AMQP described error: {:?}", described_error);

sdk/eventhubs/azure_messaging_eventhubs/src/common/management.rs

Lines changed: 1 addition & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -127,7 +127,6 @@ impl ManagementInstance {
127127
let name: String = response
128128
.get(EVENTHUB_PROPERTY_NAME)
129129
.ok_or_else(|| EventHubsError::from(ErrorKind::InvalidManagementResponse))?
130-
.clone()
131130
.into();
132131
let created_at: Option<SystemTime> = Into::<AmqpTimestamp>::into(
133132
response
@@ -192,40 +191,33 @@ impl ManagementInstance {
192191
beginning_sequence_number: response
193192
.get(EVENTHUB_PARTITION_PROPERTIES_BEGIN_SEQUENCE_NUMBER)
194193
.ok_or_else(|| EventHubsError::from(ErrorKind::InvalidManagementResponse))?
195-
.clone()
196194
.into(),
197195
id: response
198196
.get(EVENTHUB_PROPERTY_PARTITION)
199197
.ok_or_else(|| EventHubsError::from(ErrorKind::InvalidManagementResponse))?
200-
.clone()
201198
.into(),
202199
eventhub: response
203200
.get(EVENTHUB_PROPERTY_NAME)
204201
.ok_or_else(|| EventHubsError::from(ErrorKind::InvalidManagementResponse))?
205-
.clone()
206202
.into(),
207203

208204
last_enqueued_sequence_number: response
209205
.get(EVENTHUB_PARTITION_PROPERTIES_LAST_ENQUEUED_SEQUENCE_NUMBER)
210206
.ok_or_else(|| EventHubsError::from(ErrorKind::InvalidManagementResponse))?
211-
.clone()
212207
.into(),
213208
last_enqueued_offset: response
214209
.get(EVENTHUB_PARTITION_PROPERTIES_LAST_ENQUEUED_OFFSET)
215210
.ok_or_else(|| EventHubsError::from(ErrorKind::InvalidManagementResponse))?
216-
.clone()
217211
.into(),
218212
last_enqueued_time_utc: Into::<AmqpTimestamp>::into(
219213
response
220214
.get(EVENTHUB_PARTITION_PROPERTIES_LAST_ENQUEUED_TIME_UTC)
221-
.ok_or_else(|| EventHubsError::from(ErrorKind::InvalidManagementResponse))?
222-
.clone(),
215+
.ok_or_else(|| EventHubsError::from(ErrorKind::InvalidManagementResponse))?,
223216
)
224217
.0,
225218
is_empty: response
226219
.get(EVENTHUB_PARTITION_PROPERTIES_IS_EMPTY)
227220
.ok_or_else(|| EventHubsError::from(ErrorKind::InvalidManagementResponse))?
228-
.clone()
229221
.into(),
230222
})
231223
}

0 commit comments

Comments
 (0)