Skip to content

Commit 34910cb

Browse files
LarryOstermanheathsCopilot
authored
Added Reconnect support for Event Hubs (#2826)
* Refactoring to enable reconnection support * Refactored AMQP errors to distinguish between connection, link, and session disconnect; send_batch consumes the batch --------- Co-authored-by: Heath Stewart <[email protected]> Co-authored-by: Copilot <[email protected]>
1 parent 142dc2b commit 34910cb

33 files changed

+1234
-344
lines changed

Cargo.lock

Lines changed: 0 additions & 2 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -84,9 +84,9 @@ cargo_metadata = "0.18.1"
8484
clap = { version = "4.4.16", features = ["derive"] }
8585
criterion = { version = "0.5", features = ["async_tokio"] }
8686
dyn-clone = "1.0"
87-
fe2o3-amqp = { version = "0.14", features = ["native-tls", "tracing", "uuid"] }
87+
fe2o3-amqp = { version = "0.14", features = ["native-tls", "uuid"] }
8888
fe2o3-amqp-ext = { version = "0.14" }
89-
fe2o3-amqp-management = { version = "0.14", features = ["tracing"] }
89+
fe2o3-amqp-management = { version = "0.14" }
9090
fe2o3-amqp-cbs = { version = "0.14" }
9191
fe2o3-amqp-types = { version = "0.14" }
9292
flate2 = "1.1.0"

sdk/core/azure_core_amqp/CHANGELOG.md

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,15 @@
11
# Release History
22

3+
## 0.7.0 (Unreleased)
4+
5+
### Features Added
6+
7+
### Breaking Changes
8+
9+
- Distinguish remote disconnect and remote closed errors by origin
10+
11+
### Bugs Fixed
12+
313
## 0.6.0 (2025-08-01)
414

515
### Other Changes

sdk/core/azure_core_amqp/src/error.rs

Lines changed: 36 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -10,11 +10,21 @@ use crate::{AmqpOrderedMap, AmqpSymbol, AmqpValue};
1010
/// Type of AMQP error.
1111
pub enum AmqpErrorKind {
1212
AmqpDescribedError(AmqpDescribedError),
13+
1314
/// Remote peer closed the link
14-
ClosedByRemote(Box<dyn std::error::Error + Send + Sync>),
15+
LinkClosedByRemote(Box<dyn std::error::Error + Send + Sync>),
16+
/// Remote peer closed the session
17+
SessionClosedByRemote(Box<dyn std::error::Error + Send + Sync>),
18+
/// Remote peer closed the connection
19+
ConnectionClosedByRemote(Box<dyn std::error::Error + Send + Sync>),
20+
21+
/// Remote peer detached the link
22+
LinkDetachedByRemote(Box<dyn std::error::Error + Send + Sync>),
23+
/// Remote peer detached the session
24+
SessionDetachedByRemote(Box<dyn std::error::Error + Send + Sync>),
1525

16-
/// Remote peer detached
17-
DetachedByRemote(Box<dyn std::error::Error + Send + Sync>),
26+
/// Remote peer detached the connection
27+
ConnectionDetachedByRemote(Box<dyn std::error::Error + Send + Sync>),
1828

1929
/// The send request was rejected by the remote peer.
2030
NonTerminalDeliveryState,
@@ -83,6 +93,7 @@ create_extensible_enum!(
8393
SessionCannotBeLocked,
8494
"com.microsoft:session-cannot-be-locked"
8595
),
96+
(EntityUpdated, "com.microsoft:entity-updated"),
8697
(MessageNotFound, "com.microsoft:message-not-found"),
8798
(SessionNotFound, "com.microsoft:session-not-found"),
8899
(EntityAlreadyExists, "com.microsoft:entity-already-exists"),
@@ -182,8 +193,12 @@ impl std::error::Error for AmqpError {
182193
match &self.kind {
183194
AmqpErrorKind::TransportImplementationError(s)
184195
| AmqpErrorKind::DetachError(s)
185-
| AmqpErrorKind::ClosedByRemote(s)
186-
| AmqpErrorKind::DetachedByRemote(s)
196+
| AmqpErrorKind::LinkClosedByRemote(s)
197+
| AmqpErrorKind::LinkDetachedByRemote(s)
198+
| AmqpErrorKind::SessionClosedByRemote(s)
199+
| AmqpErrorKind::SessionDetachedByRemote(s)
200+
| AmqpErrorKind::ConnectionClosedByRemote(s)
201+
| AmqpErrorKind::ConnectionDetachedByRemote(s)
187202
| AmqpErrorKind::LinkStateError(s)
188203
| AmqpErrorKind::ConnectionDropped(s) => Some(s.as_ref()),
189204
AmqpErrorKind::ManagementStatusCode(_, _) => None,
@@ -211,11 +226,23 @@ impl std::fmt::Display for AmqpError {
211226
write!(f, "Management API returned status code: {}", status_code,)
212227
}
213228
}
214-
AmqpErrorKind::DetachedByRemote(err) => {
215-
write!(f, "Remote detached with error: {}", err)
229+
AmqpErrorKind::ConnectionDetachedByRemote(err) => {
230+
write!(f, "Remote connection detached with error: {}", err)
231+
}
232+
AmqpErrorKind::LinkDetachedByRemote(err) => {
233+
write!(f, "Remote link detached with error: {}", err)
234+
}
235+
AmqpErrorKind::SessionDetachedByRemote(err) => {
236+
write!(f, "Remote session detached with error: {}", err)
237+
}
238+
AmqpErrorKind::LinkClosedByRemote(err) => {
239+
write!(f, "Remote link closed with error: {}", err)
240+
}
241+
AmqpErrorKind::SessionClosedByRemote(err) => {
242+
write!(f, "Remote session closed with error: {}", err)
216243
}
217-
AmqpErrorKind::ClosedByRemote(err) => {
218-
write!(f, "Remote closed with error: {}", err)
244+
AmqpErrorKind::ConnectionClosedByRemote(err) => {
245+
write!(f, "Remote connection closed with error: {}", err)
219246
}
220247
AmqpErrorKind::DetachError(err) => {
221248
write!(f, "AMQP Detach Error: {} ", err)

sdk/core/azure_core_amqp/src/fe2o3/connection.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -187,7 +187,7 @@ impl From<Fe2o3ConnectionOpenError> for azure_core::Error {
187187
azure_core::Error::from(parse_error)
188188
}
189189
fe2o3_amqp::connection::OpenError::RemoteClosed => {
190-
AmqpErrorKind::ClosedByRemote(Box::new(e.0)).into()
190+
AmqpErrorKind::ConnectionClosedByRemote(Box::new(e.0)).into()
191191
}
192192
fe2o3_amqp::connection::OpenError::RemoteClosedWithError(error) => {
193193
AmqpErrorKind::AmqpDescribedError(error.into()).into()
@@ -207,7 +207,7 @@ impl From<Fe2o3ConnectionError> for azure_core::Error {
207207
azure_core::Error::from(Fe2o3TransportError(error))
208208
}
209209
fe2o3_amqp::connection::Error::RemoteClosed => {
210-
AmqpErrorKind::ClosedByRemote(Box::new(e.0)).into()
210+
AmqpErrorKind::ConnectionClosedByRemote(Box::new(e.0)).into()
211211
}
212212
fe2o3_amqp::connection::Error::RemoteClosedWithError(error) => {
213213
AmqpErrorKind::AmqpDescribedError(error.into()).into()

sdk/core/azure_core_amqp/src/fe2o3/error.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -115,13 +115,13 @@ impl From<fe2o3_amqp::link::DetachError> for AmqpError {
115115
fn from(e: fe2o3_amqp::link::DetachError) -> Self {
116116
match e {
117117
fe2o3_amqp::link::DetachError::DetachedByRemote => {
118-
Self::from(AmqpErrorKind::DetachedByRemote(Box::new(e)))
118+
Self::from(AmqpErrorKind::LinkDetachedByRemote(Box::new(e)))
119119
}
120120
fe2o3_amqp::link::DetachError::RemoteDetachedWithError(error) => {
121121
Self::from(AmqpErrorKind::AmqpDescribedError(error.into()))
122122
}
123123
fe2o3_amqp::link::DetachError::ClosedByRemote => {
124-
Self::from(AmqpErrorKind::ClosedByRemote(Box::new(e)))
124+
Self::from(AmqpErrorKind::LinkClosedByRemote(Box::new(e)))
125125
}
126126
fe2o3_amqp::link::DetachError::RemoteClosedWithError(error) => {
127127
Self::from(AmqpErrorKind::AmqpDescribedError(error.into()))
@@ -141,10 +141,10 @@ impl From<fe2o3_amqp::link::LinkStateError> for AmqpError {
141141
AmqpErrorKind::AmqpDescribedError(e.into()).into()
142142
}
143143
fe2o3_amqp::link::LinkStateError::RemoteClosed => {
144-
AmqpErrorKind::ClosedByRemote(Box::new(e)).into()
144+
AmqpErrorKind::LinkClosedByRemote(Box::new(e)).into()
145145
}
146146
fe2o3_amqp::link::LinkStateError::RemoteDetached => {
147-
AmqpErrorKind::DetachedByRemote(Box::new(e)).into()
147+
AmqpErrorKind::LinkDetachedByRemote(Box::new(e)).into()
148148
}
149149
_ => AmqpErrorKind::LinkStateError(e.into()).into(),
150150
}

sdk/core/azure_core_amqp/src/fe2o3/receiver.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -88,7 +88,9 @@ impl AmqpReceiverApis for Fe2o3AmqpReceiver {
8888
match res {
8989
Ok(_) => Ok(()),
9090
Err(e) => match e.kind() {
91-
AmqpErrorKind::ClosedByRemote(_) => {
91+
AmqpErrorKind::LinkClosedByRemote(_)
92+
| AmqpErrorKind::SessionClosedByRemote(_)
93+
| AmqpErrorKind::ConnectionClosedByRemote(_) => {
9294
info!("Error detaching receiver: {:?} - ignored", e);
9395
Ok(())
9496
}

sdk/core/azure_core_amqp/src/fe2o3/sender.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -107,7 +107,9 @@ impl AmqpSenderApis for Fe2o3AmqpSender {
107107
match res {
108108
Ok(_) => Ok(()),
109109
Err(e) => match e.kind() {
110-
AmqpErrorKind::ClosedByRemote(_) => {
110+
AmqpErrorKind::LinkClosedByRemote(_)
111+
| AmqpErrorKind::SessionClosedByRemote(_)
112+
| AmqpErrorKind::ConnectionClosedByRemote(_) => {
111113
info!("Error detaching sender: {:?}", e);
112114
Ok(())
113115
}

sdk/core/azure_core_amqp/src/fe2o3/session.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -149,7 +149,7 @@ impl From<fe2o3_amqp::session::BeginError> for AmqpError {
149149
AmqpErrorKind::ConnectionDropped(Box::new(e)).into()
150150
}
151151
fe2o3_amqp::session::BeginError::RemoteEnded => {
152-
AmqpErrorKind::ClosedByRemote(Box::new(e)).into()
152+
AmqpErrorKind::SessionClosedByRemote(Box::new(e)).into()
153153
}
154154
fe2o3_amqp::session::BeginError::RemoteEndedWithError(error) => {
155155
AmqpErrorKind::AmqpDescribedError(error.into()).into()
@@ -173,7 +173,7 @@ impl From<fe2o3_amqp::session::Error> for AmqpError {
173173
AmqpErrorKind::TransportImplementationError(Box::new(e)).into()
174174
}
175175
fe2o3_amqp::session::Error::RemoteEnded => {
176-
AmqpErrorKind::ClosedByRemote(Box::new(e)).into()
176+
AmqpErrorKind::SessionClosedByRemote(Box::new(e)).into()
177177
}
178178
fe2o3_amqp::session::Error::RemoteEndedWithError(error) => {
179179
AmqpErrorKind::AmqpDescribedError(error.into()).into()

sdk/eventhubs/azure_messaging_eventhubs/CHANGELOG.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,8 +4,13 @@
44

55
### Features Added
66

7+
- Reconnect support for EventHubs operations.
8+
79
### Breaking Changes
810

11+
- `ProducerClient::send_batch` now consumes its `batch` argument.
12+
- `RetryOptions::max_retries` is a `u32` not a `usize`.
13+
914
### Bugs Fixed
1015

1116
### Other Changes

0 commit comments

Comments
 (0)