Skip to content

Commit 69301e0

Browse files
First attempt at "rationalizing" AMQP error codes. (Azure#2226)
* Started work to rationalize AMQP errors
1 parent 0d804d9 commit 69301e0

File tree

29 files changed

+1039
-538
lines changed

29 files changed

+1039
-538
lines changed

Cargo.lock

Lines changed: 3 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -76,8 +76,8 @@ cargo_metadata = "0.18.1"
7676
clap = { version = "4.4.16", features = ["derive"] }
7777
dyn-clone = "1.0"
7878
fe2o3-amqp = { version = "0.12", features = ["native-tls", "tracing", "uuid"] }
79-
fe2o3-amqp-ext = { version = "0.12", features = [] }
80-
fe2o3-amqp-management = { version = "0.12" }
79+
fe2o3-amqp-ext = { version = "0.12" }
80+
fe2o3-amqp-management = { version = "0.12", features = ["tracing"] }
8181
fe2o3-amqp-cbs = { version = "0.12" }
8282
fe2o3-amqp-types = { version = "0.12" }
8383
futures = "0.3"

sdk/core/azure_core_amqp/.dict.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
mgmt
22
sasl
3+
amqps
34
sastoken
45
smallulong
56
smalluint

sdk/core/azure_core_amqp/Cargo.toml

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44
[package]
55
name = "azure_core_amqp"
66
version = "0.1.0"
7-
description = "Rust client Azure SDK"
7+
description = "Rust client library for the AMQP protocol"
88
readme = "README.md"
99
authors = ["Microsoft Corp."]
1010
license = "MIT"
@@ -26,10 +26,12 @@ fe2o3-amqp-management = { workspace = true, optional = true }
2626
fe2o3-amqp-types = { workspace = true, optional = true }
2727
serde_amqp = { workspace = true, optional = true }
2828
serde_bytes = { workspace = true, optional = true }
29+
typespec = { workspace = true, features = ["amqp"] }
2930
time.workspace = true
3031
tokio.workspace = true
3132
tracing.workspace = true
3233
uuid = { workspace = true }
34+
url = { workspace = true }
3335

3436
[dev-dependencies]
3537
tracing-subscriber = { workspace = true, features = ["env-filter"] }

sdk/core/azure_core_amqp/src/connection.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -292,6 +292,7 @@ mod tests {
292292

293293
#[tokio::test]
294294
async fn amqp_connection_close_with_error() {
295+
tracing_subscriber::fmt::init();
295296
let address = std::env::var("TEST_BROKER_ADDRESS");
296297
if address.is_ok() {
297298
let connection = AmqpConnection::new();
@@ -310,6 +311,7 @@ mod tests {
310311
match res {
311312
Ok(_) => {}
312313
Err(err) => {
314+
println!("Error: {:?}", err);
313315
assert!(err.to_string().contains("Internal error."));
314316
}
315317
}
Lines changed: 132 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -1,62 +1,166 @@
11
// Copyright (c) Microsoft Corporation. All Rights reserved
22
// Licensed under the MIT license.
33

4-
pub enum ErrorKind {
5-
AmqpReceiverAlreadyAttached,
6-
TransportImplementationError {
7-
source: Box<dyn std::error::Error + Send + Sync>,
8-
},
4+
pub use crate::sender::error::AmqpSenderError;
5+
use crate::{AmqpOrderedMap, AmqpSymbol, AmqpValue};
6+
7+
/// Type of AMQP error.
8+
pub enum AmqpErrorKind {
9+
/// Remote peer closed the link
10+
ClosedByRemote(Option<AmqpDescribedError>),
11+
12+
/// Remote peer detached
13+
DetachedByRemote(Option<AmqpDescribedError>),
14+
15+
/// The connection was dropped.
16+
ConnectionDropped(Box<dyn std::error::Error + Send + Sync>),
17+
18+
/// Link State error.
19+
LinkStateError(Box<dyn std::error::Error + Send + Sync>),
20+
21+
FramingError(Box<dyn std::error::Error + Send + Sync>),
22+
IdleTimeoutElapsed(Box<dyn std::error::Error + Send + Sync>),
23+
24+
/// Transfer Limit Exceeded
25+
TransferLimitExceeded(Box<dyn std::error::Error + Send + Sync>),
26+
27+
/// Management Status code
28+
ManagementStatusCode(azure_core::StatusCode, Option<String>),
29+
30+
DetachError(Box<dyn std::error::Error + Send + Sync>),
31+
SenderError(AmqpSenderError),
32+
TransportImplementationError(Box<dyn std::error::Error + Send + Sync>),
33+
}
34+
35+
#[derive(Debug, Clone)]
36+
pub struct AmqpDescribedError {
37+
condition: AmqpSymbol,
38+
description: Option<String>,
39+
info: AmqpOrderedMap<AmqpSymbol, AmqpValue>,
40+
}
41+
42+
impl AmqpDescribedError {
43+
pub fn new(
44+
condition: AmqpSymbol,
45+
description: Option<String>,
46+
info: AmqpOrderedMap<AmqpSymbol, AmqpValue>,
47+
) -> Self {
48+
Self {
49+
condition,
50+
description,
51+
info,
52+
}
53+
}
54+
55+
pub fn condition(&self) -> &AmqpSymbol {
56+
&self.condition
57+
}
58+
pub fn description(&self) -> Option<&String> {
59+
self.description.as_ref()
60+
}
61+
pub fn info(&self) -> &AmqpOrderedMap<AmqpSymbol, AmqpValue> {
62+
&self.info
63+
}
64+
}
65+
66+
/// An AMQP error from the AMQP stack.
67+
pub struct AmqpError {
68+
/// Type of error.
69+
kind: AmqpErrorKind,
970
}
1071

11-
pub struct Error {
12-
kind: ErrorKind,
72+
impl AmqpError {
73+
pub fn kind(&self) -> &AmqpErrorKind {
74+
&self.kind
75+
}
1376
}
1477

15-
impl Error {
16-
pub fn new(kind: ErrorKind) -> Self {
78+
impl From<AmqpErrorKind> for AmqpError {
79+
fn from(kind: AmqpErrorKind) -> Self {
1780
Self { kind }
1881
}
1982
}
2083

21-
impl std::error::Error for Error {
84+
impl std::error::Error for AmqpError {
2285
fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
2386
match &self.kind {
24-
ErrorKind::TransportImplementationError { source } => source.source(),
25-
_ => None,
87+
AmqpErrorKind::TransportImplementationError(s)
88+
| AmqpErrorKind::DetachError(s)
89+
| AmqpErrorKind::LinkStateError(s)
90+
| AmqpErrorKind::ConnectionDropped(s) => Some(s.as_ref()),
91+
AmqpErrorKind::SenderError(e) => e.source(),
92+
AmqpErrorKind::ManagementStatusCode(_, _) => None,
93+
AmqpErrorKind::ClosedByRemote(_) | AmqpErrorKind::DetachedByRemote(_) => None,
94+
AmqpErrorKind::TransferLimitExceeded(e) => Some(e.as_ref()),
95+
AmqpErrorKind::FramingError(e) => Some(e.as_ref()),
96+
AmqpErrorKind::IdleTimeoutElapsed(e) => Some(e.as_ref()),
2697
}
2798
}
2899
}
29100

30-
impl std::fmt::Display for Error {
101+
impl std::fmt::Display for AmqpError {
31102
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
32103
match &self.kind {
33-
ErrorKind::AmqpReceiverAlreadyAttached => {
34-
f.write_str("AMQP Receiver is already attached")
104+
AmqpErrorKind::ManagementStatusCode(status_code, d) => {
105+
if let Some(d) = d {
106+
write!(
107+
f,
108+
"Management API returned status code: {} ({})",
109+
status_code, d
110+
)
111+
} else {
112+
write!(f, "Management API returned status code: {}", status_code,)
113+
}
114+
}
115+
AmqpErrorKind::DetachedByRemote(err) => {
116+
write!(f, "Remote detached with error: {:?}", err)
117+
}
118+
AmqpErrorKind::ClosedByRemote(err) => {
119+
write!(f, "Remote closed with error: {:?}", err)
120+
}
121+
AmqpErrorKind::DetachError(err) => {
122+
write!(f, "AMQP Detach Error: {} ", err)
123+
}
124+
AmqpErrorKind::TransportImplementationError(s) => {
125+
write!(f, "Transport Implementation Error: {}", s)
126+
}
127+
AmqpErrorKind::ConnectionDropped(s) => {
128+
write!(f, "Connection dropped: {}", s)
129+
}
130+
AmqpErrorKind::FramingError(s) => {
131+
write!(f, "Connection Framing error: {}", s)
132+
}
133+
AmqpErrorKind::IdleTimeoutElapsed(s) => {
134+
write!(f, "Connection Idle Timeout elapsed: {}", s)
135+
}
136+
AmqpErrorKind::SenderError(err) => {
137+
write!(f, "AMQP Sender Error: {} ", err)
138+
}
139+
AmqpErrorKind::LinkStateError(err) => {
140+
write!(f, "AMQP Link State Error: {} ", err)
35141
}
36-
ErrorKind::TransportImplementationError { source } => {
37-
write!(f, "Transport Implementation Error: {:?}", source)
142+
AmqpErrorKind::TransferLimitExceeded(e) => {
143+
write!(f, "AMQP Transfer Limit Exceeded: {e}")
38144
}
39145
}
40146
}
41147
}
42148

43-
impl std::fmt::Debug for Error {
149+
impl std::fmt::Debug for AmqpError {
44150
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
45-
write!(f, "AMQP Transport Error: {}", self)
151+
write!(f, "AmqpError: {}", self)?;
152+
Ok(())
46153
}
47154
}
48155

49-
impl From<Error> for azure_core::Error {
50-
fn from(e: Error) -> Self {
51-
Self::new(azure_core::error::ErrorKind::Other, Box::new(e))
156+
impl From<AmqpError> for azure_core::Error {
157+
fn from(e: AmqpError) -> Self {
158+
Self::new(azure_core::error::ErrorKind::Amqp, Box::new(e))
52159
}
53160
}
54161

55-
impl From<ErrorKind> for azure_core::Error {
56-
fn from(e: ErrorKind) -> Self {
57-
Self::new(
58-
azure_core::error::ErrorKind::Other,
59-
Box::new(Error { kind: e }),
60-
)
162+
impl From<AmqpErrorKind> for azure_core::Error {
163+
fn from(e: AmqpErrorKind) -> Self {
164+
AmqpError::from(e).into()
61165
}
62166
}

sdk/core/azure_core_amqp/src/fe2o3/cbs.rs

Lines changed: 39 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,10 @@
22
// Licensed under the MIT license.
33
// cspell:: words amqp servicebus sastoken
44

5-
use super::error::{AmqpLinkDetach, AmqpManagement, AmqpManagementAttach};
6-
use crate::{cbs::AmqpClaimsBasedSecurityApis, session::AmqpSession};
5+
use crate::{
6+
cbs::AmqpClaimsBasedSecurityApis, fe2o3::error::Fe2o3ManagementError, session::AmqpSession,
7+
AmqpError,
8+
};
79
use azure_core::error::Result;
810
use fe2o3_amqp_cbs::token::CbsToken;
911
use fe2o3_amqp_types::primitives::Timestamp;
@@ -24,6 +26,25 @@ impl<'a> Fe2o3ClaimsBasedSecurity<'a> {
2426
session,
2527
})
2628
}
29+
30+
fn cbs_already_attached() -> azure_core::Error {
31+
azure_core::Error::message(
32+
azure_core::error::ErrorKind::Amqp,
33+
"Claims Based Security is already attached",
34+
)
35+
}
36+
fn cbs_not_set() -> azure_core::Error {
37+
azure_core::Error::message(
38+
azure_core::error::ErrorKind::Amqp,
39+
"Claims Based Security is not set",
40+
)
41+
}
42+
fn cbs_not_attached() -> azure_core::Error {
43+
azure_core::Error::message(
44+
azure_core::error::ErrorKind::Amqp,
45+
"Claims Based Security is not attached",
46+
)
47+
}
2748
}
2849

2950
impl Fe2o3ClaimsBasedSecurity<'_> {}
@@ -42,25 +63,17 @@ impl AmqpClaimsBasedSecurityApis for Fe2o3ClaimsBasedSecurity<'_> {
4263
.client_node_addr("rust_amqp_cbs")
4364
.attach(session.borrow_mut())
4465
.await
45-
.map_err(AmqpManagementAttach::from)?;
46-
self.cbs.set(Mutex::new(cbs_client)).map_err(|_| {
47-
azure_core::Error::message(
48-
azure_core::error::ErrorKind::Other,
49-
"Claims Based Security is already set.",
50-
)
51-
})?;
66+
.map_err(|e| azure_core::Error::from(AmqpError::from(e)))?;
67+
self.cbs
68+
.set(Mutex::new(cbs_client))
69+
.map_err(|_| Self::cbs_already_attached())?;
5270
Ok(())
5371
}
5472

5573
async fn detach(mut self) -> Result<()> {
56-
let cbs = self.cbs.take().ok_or_else(|| {
57-
azure_core::Error::message(
58-
azure_core::error::ErrorKind::Other,
59-
"Claims Based Security was not set.",
60-
)
61-
})?;
74+
let cbs = self.cbs.take().ok_or(Self::cbs_not_set())?;
6275
let cbs = cbs.into_inner();
63-
cbs.close().await.map_err(AmqpLinkDetach::from)?;
76+
cbs.close().await.map_err(AmqpError::from)?;
6477
Ok(())
6578
}
6679

@@ -86,26 +99,28 @@ impl AmqpClaimsBasedSecurityApis for Fe2o3ClaimsBasedSecurity<'_> {
8699
.checked_mul(1_000)
87100
.ok_or_else(|| {
88101
azure_core::Error::message(
89-
azure_core::error::ErrorKind::Other,
102+
azure_core::error::ErrorKind::Amqp,
90103
"Unable to convert time to unix timestamp.",
91104
)
92105
})?,
93106
)),
94107
);
95108
self.cbs
96109
.get()
97-
.ok_or_else(|| {
98-
azure_core::Error::message(
99-
azure_core::error::ErrorKind::Other,
100-
"Claims Based Security was not set.",
101-
)
102-
})?
110+
.ok_or::<azure_core::Error>(Self::cbs_not_attached())?
103111
.lock()
104112
.await
105113
.borrow_mut()
106114
.put_token(path, cbs_token)
107115
.await
108-
.map_err(AmqpManagement::from)?;
116+
.map_err(|e| {
117+
let me = azure_core::Error::try_from(Fe2o3ManagementError(e));
118+
if let Err(e) = me {
119+
debug!("Failed to convert management error to azure error: {:?}", e);
120+
return e;
121+
}
122+
me.unwrap()
123+
})?;
109124
Ok(())
110125
}
111126
}

0 commit comments

Comments
 (0)