Skip to content

Commit 7d4205f

Browse files
Cleaned up eventhubs disconnect operations (Azure#3038)
Partial fix for Azure#2944 The errors generated come from dropping a connection without first closing the connection (and sessions, etc on the connection). This PR makes the connection close logic much more deterministic and close operations are now much more likely to succeed.
1 parent 34b37c5 commit 7d4205f

File tree

18 files changed

+475
-139
lines changed

18 files changed

+475
-139
lines changed

sdk/core/azure_core_amqp/src/fe2o3/management.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -86,6 +86,9 @@ impl AmqpManagementApis for Fe2o3AmqpManagement {
8686
.ok_or_else(Self::amqp_management_not_attached)?;
8787
let management = management.into_inner();
8888
management.close().await.map_err(AmqpError::from)?;
89+
90+
let mut session = self.session.lock().await;
91+
session.end().await.map_err(AmqpError::from)?;
8992
Ok(())
9093
}
9194

sdk/eventhubs/azure_messaging_eventhubs/CHANGELOG.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,10 +6,14 @@
66

77
### Breaking Changes
88

9+
- `EventProcessor` now consumes its `ConsumerClient` parameter rather than accepting a clone of an `Arc`.
10+
911
### Bugs Fixed
1012

1113
### Other Changes
1214

15+
- Internal refactoring to ensure that the `close()` method on various clients works as expected.
16+
1317
## 0.7.0 (2025-09-16)
1418

1519
### Features Added

sdk/eventhubs/azure_messaging_eventhubs/examples/eventhubs_processor_client.rs

Lines changed: 17 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ impl BackgroundProcessor {
3030
// Start the background task to process events
3131
let mut task = self.background_task.lock().await;
3232
// let mut task = task.as_mut();
33-
let processor = Arc::clone(&self.processor);
33+
let processor = self.processor.clone();
3434
task.replace(tokio::spawn(async move { processor.run().await }));
3535
Ok(())
3636
}
@@ -58,21 +58,19 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
5858
let eventhub_namespace = std::env::var("EVENTHUBS_HOST")?;
5959
let eventhub_name = std::env::var("EVENTHUB_NAME")?;
6060

61-
let consumer = Arc::new(
62-
ConsumerClient::builder()
63-
.open(
64-
eventhub_namespace.as_str(),
65-
eventhub_name,
66-
DeveloperToolsCredential::new(None)?.clone(),
67-
)
68-
.await?,
69-
);
61+
let consumer = ConsumerClient::builder()
62+
.open(
63+
eventhub_namespace.as_str(),
64+
eventhub_name,
65+
DeveloperToolsCredential::new(None)?.clone(),
66+
)
67+
.await?;
7068

7169
println!("Opened consumer client");
7270

7371
let checkpoint_store = Arc::new(InMemoryCheckpointStore::new());
7472
let processor = EventProcessor::builder()
75-
.build(consumer.clone(), checkpoint_store)
73+
.build(consumer, checkpoint_store)
7674
.await?;
7775
let background_processor = BackgroundProcessor::new(processor.clone()).await?;
7876

@@ -116,5 +114,13 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
116114
background_processor.stop().await?;
117115
println!("Stopped background processor");
118116

117+
// Close the processor
118+
if let Ok(processor) = Arc::try_unwrap(processor) {
119+
processor.close().await?;
120+
println!("Closed processor");
121+
} else {
122+
println!("Processor still has references, not closing.");
123+
}
124+
119125
Ok(())
120126
}

sdk/eventhubs/azure_messaging_eventhubs/src/common/authorizer.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -566,9 +566,9 @@ mod tests {
566566
let final_count = mock_credential.get_token_get_count();
567567
debug!("After sleeping, token count: {final_count}");
568568

569-
assert_eq!(
570-
final_count, 2,
571-
"Expected token get count to be 2, but got {final_count}"
569+
assert!(
570+
final_count >= 2,
571+
"Expected token get count to be greater or equal to 2, but got {final_count}"
572572
);
573573
info!("Final token get count: {final_count}");
574574
Ok(())
@@ -650,7 +650,7 @@ mod tests {
650650
debug!("After sleeping the first time, token count: {final_count}");
651651
assert!(
652652
final_count >= 3,
653-
"Expected first get token count to be 3, but got {final_count}"
653+
"Expected first get token count to be at least 3, but got {final_count}"
654654
);
655655

656656
info!("First token expiration get count: {}", final_count);

sdk/eventhubs/azure_messaging_eventhubs/src/common/recoverable/connection.rs

Lines changed: 85 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -22,8 +22,9 @@ use azure_core::{credentials::TokenCredential, http::Url, time::Duration, Result
2222
use azure_core_amqp::{
2323
error::{AmqpErrorCondition, AmqpErrorKind},
2424
AmqpClaimsBasedSecurity, AmqpConnection, AmqpConnectionApis, AmqpConnectionOptions, AmqpError,
25-
AmqpManagement, AmqpReceiver, AmqpReceiverApis, AmqpReceiverOptions, AmqpSender,
26-
AmqpSenderApis, AmqpSession, AmqpSessionApis, AmqpSessionOptions, AmqpSource, AmqpSymbol,
25+
AmqpManagement, AmqpManagementApis, AmqpReceiver, AmqpReceiverApis, AmqpReceiverOptions,
26+
AmqpSender, AmqpSenderApis, AmqpSession, AmqpSessionApis, AmqpSessionOptions, AmqpSource,
27+
AmqpSymbol,
2728
};
2829
#[cfg(test)]
2930
use std::sync::Mutex;
@@ -73,12 +74,12 @@ pub(crate) struct RecoverableConnection {
7374
pub(super) url: Url,
7475
application_id: Option<String>,
7576
custom_endpoint: Option<Url>,
76-
connections: AsyncMutex<Option<Arc<AmqpConnection>>>,
7777
mgmt_client: AsyncMutex<Option<Arc<AmqpManagement>>>,
78-
session_instances: AsyncMutex<HashMap<Url, Arc<AmqpSession>>>,
7978
receiver_instances: AsyncMutex<HashMap<Url, Arc<AmqpReceiver>>>,
8079
sender_instances: AsyncMutex<HashMap<Url, Arc<AmqpSender>>>,
80+
session_instances: AsyncMutex<HashMap<Url, Arc<AmqpSession>>>,
8181
pub(super) authorizer: Arc<Authorizer>,
82+
connections: AsyncMutex<Option<Arc<AmqpConnection>>>,
8283
connection_name: String,
8384
pub(super) retry_options: RetryOptions,
8485

@@ -161,10 +162,80 @@ impl RecoverableConnection {
161162
/// This method will close the underlying AMQP connection, if it exists. It will also cause all outstanding sends and receives
162163
/// to complete with an error.
163164
///
164-
pub(crate) async fn close_connection(&self) -> Result<()> {
165-
let connection = self.ensure_connection().await?;
165+
pub(crate) async fn close_connection(self) -> Result<()> {
166+
trace!("Closing recoverable connection for {}.", self.url);
166167

167-
connection.close().await
168+
let mut management_client = self.mgmt_client.lock().await;
169+
if let Some(management_client) = management_client.take() {
170+
trace!("Closing management client for {}.", self.url);
171+
if let Ok(management_client) = Arc::try_unwrap(management_client) {
172+
trace!("Detaching management client for {}.", self.url);
173+
management_client.detach().await?;
174+
} else {
175+
trace!(
176+
"Failed to detach management client for {}, references exist.",
177+
self.url
178+
);
179+
}
180+
}
181+
182+
let mut sender_instances = self.sender_instances.lock().await;
183+
for (path, sender) in sender_instances.drain() {
184+
trace!("Detaching sender for path {}.", path);
185+
if let Ok(sender) = Arc::try_unwrap(sender) {
186+
trace!("Detaching sender for path {}.", path);
187+
sender.detach().await?;
188+
} else {
189+
trace!(
190+
"Failed to detach sender for path {}, references exist.",
191+
path
192+
);
193+
}
194+
}
195+
196+
let mut receiver_instances = self.receiver_instances.lock().await;
197+
for (source_url, receiver) in receiver_instances.drain() {
198+
trace!("Detaching receiver for source URL {}.", source_url);
199+
if let Ok(receiver) = Arc::try_unwrap(receiver) {
200+
trace!("Detaching receiver for source URL {}.", source_url);
201+
receiver.detach().await?;
202+
} else {
203+
trace!(
204+
"Failed to detach receiver for source URL {}, references exist.",
205+
source_url
206+
);
207+
}
208+
}
209+
210+
let mut session_instances = self.session_instances.lock().await;
211+
for (session_id, session) in session_instances.drain() {
212+
trace!("Detaching session for ID {}.", session_id);
213+
if let Ok(session) = Arc::try_unwrap(session) {
214+
session.end().await?;
215+
} else {
216+
trace!(
217+
"Failed to detach session for ID {}, references exist.",
218+
session_id
219+
);
220+
}
221+
}
222+
223+
if let Some(connection) = self.connections.lock().await.take() {
224+
trace!("Closing connection for {}.", self.url);
225+
if let Ok(connection) = Arc::try_unwrap(connection) {
226+
trace!(
227+
"No references, actually closing connection for {}.",
228+
self.url
229+
);
230+
connection.close().await?;
231+
} else {
232+
trace!(
233+
"Failed to close connection for {}, references exist.",
234+
self.url
235+
);
236+
}
237+
}
238+
Ok(())
168239
}
169240

170241
/// Ensures that the connection to the Event Hubs service is established.
@@ -482,8 +553,7 @@ impl RecoverableConnection {
482553
}
483554
}
484555
AmqpErrorKind::ConnectionClosedByRemote(_)
485-
| AmqpErrorKind::ConnectionDetachedByRemote(_)
486-
| AmqpErrorKind::ConnectionDropped(_) => {
556+
| AmqpErrorKind::ConnectionDetachedByRemote(_) => {
487557
debug!("Connection dropped error: {}", amqp_error);
488558
ErrorRecoveryAction::ReconnectConnection
489559
}
@@ -535,6 +605,12 @@ impl RecoverableConnection {
535605
}
536606
}
537607

608+
impl Drop for RecoverableConnection {
609+
fn drop(&mut self) {
610+
trace!("Dropping RecoverableConnection for {}", self.url);
611+
}
612+
}
613+
538614
#[cfg(test)]
539615
mod tests {
540616
use super::*;

sdk/eventhubs/azure_messaging_eventhubs/src/common/recoverable/receiver.rs

Lines changed: 33 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,12 @@ impl RecoverableReceiver {
6565
}
6666
}
6767

68+
impl Drop for RecoverableReceiver {
69+
fn drop(&mut self) {
70+
debug!("Dropping RecoverableReceiver for {}", self.source_url);
71+
}
72+
}
73+
6874
#[cfg_attr(target_arch = "wasm32", async_trait::async_trait(?Send))]
6975
#[cfg_attr(not(target_arch = "wasm32"), async_trait::async_trait)]
7076
impl AmqpReceiverApis for RecoverableReceiver {
@@ -93,24 +99,34 @@ impl AmqpReceiverApis for RecoverableReceiver {
9399
}
94100

95101
async fn receive_delivery(&self) -> azure_core::Result<azure_core_amqp::AmqpDelivery> {
102+
let retry_options = {
103+
self.recoverable_connection
104+
.upgrade()
105+
.ok_or_else(|| EventHubsError::from(ErrorKind::MissingConnection))?
106+
.retry_options
107+
.clone()
108+
};
96109
let delivery = recover_azure_operation(
97110
|| async move {
98-
let connection = self
99-
.recoverable_connection
100-
.upgrade()
101-
.ok_or_else(|| EventHubsError::from(ErrorKind::MissingConnection))?;
102-
103-
// Check for forced error.
104-
#[cfg(test)]
105-
connection.get_forced_error()?;
106-
107-
let receiver = connection
108-
.ensure_receiver(
109-
&self.source_url,
110-
&self.message_source,
111-
&self.receiver_options,
112-
)
113-
.await?;
111+
debug!("Starting receive_delivery operation");
112+
let receiver = {
113+
let connection = self
114+
.recoverable_connection
115+
.upgrade()
116+
.ok_or_else(|| EventHubsError::from(ErrorKind::MissingConnection))?;
117+
118+
// Check for forced error.
119+
#[cfg(test)]
120+
connection.get_forced_error()?;
121+
122+
connection
123+
.ensure_receiver(
124+
&self.source_url,
125+
&self.message_source,
126+
&self.receiver_options,
127+
)
128+
.await?
129+
};
114130
if let Some(delivery_timeout) = self.timeout {
115131
select! {
116132
delivery = receiver.receive_delivery().fuse() => Ok(delivery),
@@ -124,11 +140,7 @@ impl AmqpReceiverApis for RecoverableReceiver {
124140
receiver.receive_delivery().await
125141
}
126142
},
127-
&self
128-
.recoverable_connection
129-
.upgrade()
130-
.ok_or_else(|| EventHubsError::from(ErrorKind::MissingConnection))?
131-
.retry_options,
143+
&retry_options,
132144
Self::should_retry_receive_operation,
133145
Some(move |connection: Weak<RecoverableConnection>, reason| {
134146
let connection = connection.clone();

sdk/eventhubs/azure_messaging_eventhubs/src/consumer/event_receiver.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -144,3 +144,9 @@ impl EventReceiver {
144144
self.connection.close_receiver(&self.source_url).await
145145
}
146146
}
147+
148+
impl Drop for EventReceiver {
149+
fn drop(&mut self) {
150+
trace!("Dropping EventReceiver for partition {}", self.partition_id);
151+
}
152+
}

sdk/eventhubs/azure_messaging_eventhubs/src/consumer/mod.rs

Lines changed: 25 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -148,7 +148,19 @@ impl ConsumerClient {
148148
/// ```
149149
pub async fn close(self) -> Result<()> {
150150
trace!("Closing consumer client for {}.", self.endpoint);
151-
self.recoverable_connection.close_connection().await
151+
let recoverable_connection =
152+
Arc::try_unwrap(self.recoverable_connection).map_err(|_| {
153+
Error::with_message(
154+
AzureErrorKind::Other,
155+
"Could not close consumer recoverable connection, multiple references exist",
156+
)
157+
})?;
158+
trace!(
159+
"No references to connection, closing connection for {}.",
160+
self.endpoint
161+
);
162+
recoverable_connection.close_connection().await?;
163+
Ok(())
152164
}
153165

154166
/// Forces an error on the connection.
@@ -890,6 +902,12 @@ pub(crate) mod tests {
890902
)
891903
.await?;
892904

905+
if let Ok(consumer) = Arc::try_unwrap(consumer) {
906+
consumer.close().await?;
907+
} else {
908+
panic!("Consumer client has unresolved references.");
909+
}
910+
893911
Ok(())
894912
}
895913

@@ -935,6 +953,12 @@ pub(crate) mod tests {
935953
)
936954
.await?;
937955

956+
if let Ok(consumer) = Arc::try_unwrap(consumer) {
957+
consumer.close().await?;
958+
} else {
959+
panic!("Consumer client has unresolved references.");
960+
}
961+
938962
Ok(())
939963
}
940964
#[recorded::test(live)]

0 commit comments

Comments
 (0)