Skip to content

Commit 4a9a4ab

Browse files
Implement Recoverable connection for AMQP operations. (Azure#2657)
This PR does three different things: It renames the ConnectionManager type to RecoverableConnection to better reflect what is going on. It splits out the authorization functionality from ConnectionManager into a new struct named Authorizer to simplify the logic in connection_manager.rs It consolidates all the AMQP active protocol interactions to RecoverableConnection. This was done to (a) simplify the AMQP interactions for each of the specific protocol elements and (b) lay the foundation for connection, session, and link recovery. See the rather large comment on the RecoverableConnection for more details on how this all works and fits together. There are a few bonus changes in this PR as well: Adds a send_ref function to the AmqpSender struct which allows sending a reference to a message (saving a message clone). AmqpOrderedMap::iter() passes references to the inner elements to be consistent with the standard collection iter() method.
1 parent 84e3653 commit 4a9a4ab

37 files changed

+1677
-1232
lines changed

sdk/core/azure_core_amqp/CHANGELOG.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,10 @@
66

77
### Breaking Changes
88

9+
- `AmqpClaimsBasedSecurity` now takes ownership of the associated session rather than simply referencing the associated session. This means that all CBS authentication operations should be performed on dedicated AmqpSession objects.
10+
11+
- `AmqpOrderedMap::iter` now iterates over references to key and value, not clones of the key and value, thus eliminating unnecessary clones.
12+
913
### Bugs Fixed
1014

1115
### Other Changes

sdk/core/azure_core_amqp/src/cbs.rs

Lines changed: 14 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1,18 +1,17 @@
11
// Copyright (c) Microsoft Corporation. All Rights reserved
22
// Licensed under the MIT license.
33

4-
use async_trait::async_trait;
5-
use azure_core::error::Result;
6-
74
use super::session::AmqpSession;
5+
use azure_core::{credentials::Secret, error::Result};
86

97
#[cfg(all(feature = "fe2o3_amqp", not(target_arch = "wasm32")))]
10-
type CbsImplementation<'a> = super::fe2o3::cbs::Fe2o3ClaimsBasedSecurity<'a>;
8+
type CbsImplementation = super::fe2o3::cbs::Fe2o3ClaimsBasedSecurity;
119

1210
#[cfg(any(not(any(feature = "fe2o3_amqp")), target_arch = "wasm32"))]
13-
type CbsImplementation<'a> = super::noop::NoopAmqpClaimsBasedSecurity<'a>;
11+
type CbsImplementation = super::noop::NoopAmqpClaimsBasedSecurity;
1412

15-
#[async_trait]
13+
#[cfg_attr(target_arch = "wasm32", async_trait::async_trait(?Send))]
14+
#[cfg_attr(not(target_arch = "wasm32"), async_trait::async_trait)]
1615
pub trait AmqpClaimsBasedSecurityApis {
1716
/// Asynchronously attaches the Claims-Based Security (CBS) node to the AMQP session.
1817
///
@@ -52,30 +51,31 @@ pub trait AmqpClaimsBasedSecurityApis {
5251
&self,
5352
path: String,
5453
token_type: Option<String>,
55-
secret: String,
54+
secret: &Secret,
5655
expires_on: time::OffsetDateTime,
5756
) -> Result<()>;
5857
}
5958

60-
pub struct AmqpClaimsBasedSecurity<'a> {
61-
implementation: CbsImplementation<'a>,
59+
pub struct AmqpClaimsBasedSecurity {
60+
implementation: CbsImplementation,
6261
}
6362

64-
impl<'a> AmqpClaimsBasedSecurity<'a> {
65-
pub fn new(session: &'a AmqpSession) -> Result<Self> {
63+
impl AmqpClaimsBasedSecurity {
64+
pub fn new(session: AmqpSession) -> Result<Self> {
6665
Ok(Self {
6766
implementation: CbsImplementation::new(session)?,
6867
})
6968
}
7069
}
7170

72-
#[async_trait]
73-
impl AmqpClaimsBasedSecurityApis for AmqpClaimsBasedSecurity<'_> {
71+
#[cfg_attr(target_arch = "wasm32", async_trait::async_trait(?Send))]
72+
#[cfg_attr(not(target_arch = "wasm32"), async_trait::async_trait)]
73+
impl AmqpClaimsBasedSecurityApis for AmqpClaimsBasedSecurity {
7474
async fn authorize_path(
7575
&self,
7676
path: String,
7777
token_type: Option<String>,
78-
secret: String,
78+
secret: &Secret,
7979
expires_on: time::OffsetDateTime,
8080
) -> Result<()> {
8181
self.implementation

sdk/core/azure_core_amqp/src/connection.rs

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@
22
// Licensed under the MIT license.
33

44
use super::value::{AmqpOrderedMap, AmqpSymbol, AmqpValue};
5-
use async_trait::async_trait;
65
use azure_core::{error::Result, http::Url};
76
use std::fmt::Debug;
87
use time::Duration;
@@ -40,7 +39,8 @@ pub struct AmqpConnectionOptions {
4039

4140
impl AmqpConnectionOptions {}
4241

43-
#[async_trait]
42+
#[cfg_attr(target_arch = "wasm32", async_trait::async_trait(?Send))]
43+
#[cfg_attr(not(target_arch = "wasm32"), async_trait::async_trait)]
4444
pub trait AmqpConnectionApis {
4545
async fn open(
4646
&self,
@@ -62,7 +62,8 @@ pub struct AmqpConnection {
6262
pub(crate) implementation: ConnectionImplementation,
6363
}
6464

65-
#[async_trait]
65+
#[cfg_attr(target_arch = "wasm32", async_trait::async_trait(?Send))]
66+
#[cfg_attr(not(target_arch = "wasm32"), async_trait::async_trait)]
6667
impl AmqpConnectionApis for AmqpConnection {
6768
async fn open(
6869
&self,

sdk/core/azure_core_amqp/src/fe2o3/cbs.rs

Lines changed: 12 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -3,22 +3,21 @@
33
// cspell:: words amqp servicebus sastoken
44

55
use crate::{cbs::AmqpClaimsBasedSecurityApis, session::AmqpSession, AmqpError};
6-
use async_trait::async_trait;
7-
use azure_core::error::Result;
6+
use azure_core::{credentials::Secret, error::Result};
87
use fe2o3_amqp_cbs::token::CbsToken;
98
use fe2o3_amqp_types::primitives::Timestamp;
109
use std::borrow::BorrowMut;
1110
use std::sync::OnceLock;
1211
use tokio::sync::Mutex;
1312
use tracing::{debug, trace};
1413

15-
pub(crate) struct Fe2o3ClaimsBasedSecurity<'a> {
14+
pub(crate) struct Fe2o3ClaimsBasedSecurity {
1615
cbs: OnceLock<Mutex<fe2o3_amqp_cbs::client::CbsClient>>,
17-
session: &'a AmqpSession,
16+
session: AmqpSession,
1817
}
1918

20-
impl<'a> Fe2o3ClaimsBasedSecurity<'a> {
21-
pub fn new(session: &'a AmqpSession) -> Result<Self> {
19+
impl Fe2o3ClaimsBasedSecurity {
20+
pub fn new(session: AmqpSession) -> Result<Self> {
2221
Ok(Self {
2322
cbs: OnceLock::new(),
2423
session,
@@ -45,16 +44,17 @@ impl<'a> Fe2o3ClaimsBasedSecurity<'a> {
4544
}
4645
}
4746

48-
impl Fe2o3ClaimsBasedSecurity<'_> {}
47+
impl Fe2o3ClaimsBasedSecurity {}
4948

50-
impl Drop for Fe2o3ClaimsBasedSecurity<'_> {
49+
impl Drop for Fe2o3ClaimsBasedSecurity {
5150
fn drop(&mut self) {
5251
debug!("Dropping Fe2o3ClaimsBasedSecurity.");
5352
}
5453
}
5554

56-
#[async_trait]
57-
impl AmqpClaimsBasedSecurityApis for Fe2o3ClaimsBasedSecurity<'_> {
55+
#[cfg_attr(target_arch = "wasm32", async_trait::async_trait(?Send))]
56+
#[cfg_attr(not(target_arch = "wasm32"), async_trait::async_trait)]
57+
impl AmqpClaimsBasedSecurityApis for Fe2o3ClaimsBasedSecurity {
5858
async fn attach(&self) -> Result<()> {
5959
let session = self.session.implementation.get()?;
6060
let mut session = session.lock().await;
@@ -80,7 +80,7 @@ impl AmqpClaimsBasedSecurityApis for Fe2o3ClaimsBasedSecurity<'_> {
8080
&self,
8181
path: String,
8282
token_type: Option<String>,
83-
secret: String,
83+
secret: &Secret,
8484
expires_at: time::OffsetDateTime,
8585
) -> Result<()> {
8686
trace!(
@@ -89,7 +89,7 @@ impl AmqpClaimsBasedSecurityApis for Fe2o3ClaimsBasedSecurity<'_> {
8989
expires_at
9090
);
9191
let cbs_token = CbsToken::new(
92-
secret,
92+
secret.secret(),
9393
token_type.unwrap_or("jwt".to_string()),
9494
Some(Timestamp::from(
9595
expires_at

sdk/core/azure_core_amqp/src/fe2o3/connection.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,6 @@ use super::error::{Fe2o3ConnectionError, Fe2o3ConnectionOpenError, Fe2o3Transpor
55
use crate::connection::{AmqpConnectionApis, AmqpConnectionOptions};
66
use crate::error::AmqpErrorKind;
77
use crate::value::{AmqpOrderedMap, AmqpSymbol, AmqpValue};
8-
use async_trait::async_trait;
98
use azure_core::{http::Url, Result};
109
use fe2o3_amqp::connection::ConnectionHandle;
1110
use std::{borrow::BorrowMut, sync::OnceLock};
@@ -45,7 +44,8 @@ impl Drop for Fe2o3AmqpConnection {
4544
}
4645
}
4746

48-
#[async_trait]
47+
#[cfg_attr(target_arch = "wasm32", async_trait::async_trait(?Send))]
48+
#[cfg_attr(not(target_arch = "wasm32"), async_trait::async_trait)]
4949
impl AmqpConnectionApis for Fe2o3AmqpConnection {
5050
async fn open(
5151
&self,

sdk/core/azure_core_amqp/src/fe2o3/management.rs

Lines changed: 14 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -11,10 +11,8 @@ use crate::{
1111
value::{AmqpOrderedMap, AmqpValue},
1212
AmqpError,
1313
};
14-
use async_trait::async_trait;
1514
use azure_core::{credentials::AccessToken, Result};
1615
use fe2o3_amqp_management::operations::ReadResponse;
17-
use fe2o3_amqp_types::{messaging::ApplicationProperties, primitives::SimpleValue};
1816
use std::sync::{Arc, OnceLock};
1917
use tokio::sync::Mutex;
2018
use tracing::debug;
@@ -64,7 +62,8 @@ impl Fe2o3AmqpManagement {
6462
}
6563
}
6664

67-
#[async_trait]
65+
#[cfg_attr(target_arch = "wasm32", async_trait::async_trait(?Send))]
66+
#[cfg_attr(not(target_arch = "wasm32"), async_trait::async_trait)]
6867
impl AmqpManagementApis for Fe2o3AmqpManagement {
6968
async fn attach(&self) -> Result<()> {
7069
let management = fe2o3_amqp_management::client::MgmtClient::builder()
@@ -185,21 +184,24 @@ impl fe2o3_amqp_management::Request for WithApplicationPropertiesRequest<'_> {
185184
fn encode_application_properties(
186185
&mut self,
187186
) -> Option<fe2o3_amqp_types::messaging::ApplicationProperties> {
188-
let builder = ApplicationProperties::builder();
189-
let builder = self
187+
let mut fe2o3_application_properties = self
190188
.application_properties
191189
.iter()
192-
.fold(builder, |builder, (key, value)| {
193-
builder.insert(
190+
.map(|(key, value)| {
191+
(
194192
key.clone(),
195193
Into::<fe2o3_amqp_types::primitives::SimpleValue>::into(value),
196194
)
197195
})
198-
.insert(
199-
"security_token",
200-
Into::<SimpleValue>::into(self.access_token.token.secret()),
201-
);
202-
Some(builder.build())
196+
.collect::<fe2o3_amqp_types::primitives::OrderedMap<_, _>>();
197+
fe2o3_application_properties.insert(
198+
"security_token".to_string(),
199+
fe2o3_amqp_types::primitives::SimpleValue::from(self.access_token.token.secret()),
200+
);
201+
202+
Some(fe2o3_amqp_types::messaging::ApplicationProperties(
203+
fe2o3_application_properties,
204+
))
203205
}
204206
fn encode_body(self) -> Self::Body {}
205207
}

sdk/core/azure_core_amqp/src/fe2o3/messaging/message_fields.rs

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -253,6 +253,19 @@ impl From<AmqpAnnotationKey> for fe2o3_amqp_types::messaging::annotations::Owned
253253
}
254254
}
255255

256+
impl From<&AmqpAnnotationKey> for fe2o3_amqp_types::messaging::annotations::OwnedKey {
257+
fn from(key: &AmqpAnnotationKey) -> Self {
258+
match key {
259+
AmqpAnnotationKey::Ulong(key) => {
260+
fe2o3_amqp_types::messaging::annotations::OwnedKey::Ulong(*key)
261+
}
262+
AmqpAnnotationKey::Symbol(key) => {
263+
fe2o3_amqp_types::messaging::annotations::OwnedKey::Symbol(key.into())
264+
}
265+
}
266+
}
267+
}
268+
256269
impl From<&fe2o3_amqp_types::messaging::annotations::OwnedKey>
257270
for crate::messaging::AmqpAnnotationKey
258271
{

sdk/core/azure_core_amqp/src/fe2o3/messaging/message_source.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ impl From<AmqpSource> for fe2o3_amqp_types::messaging::Source {
3232
builder = builder.dynamic_node_properties(
3333
dynamic_node_properties
3434
.iter()
35-
.map(|(k, v)| (k.0.into(), v.into()))
35+
.map(|(k, v)| (k.into(), v.into()))
3636
.collect::<fe2o3_amqp_types::definitions::Fields>(),
3737
);
3838
}
@@ -43,7 +43,7 @@ impl From<AmqpSource> for fe2o3_amqp_types::messaging::Source {
4343
builder = builder.filter(
4444
filter
4545
.iter()
46-
.map(|(k, v)| (k.0.into(), v.into()))
46+
.map(|(k, v)| (k.into(), v.into()))
4747
.collect::<fe2o3_amqp_types::messaging::FilterSet>(),
4848
);
4949
}

sdk/core/azure_core_amqp/src/fe2o3/messaging/messaging_types.rs

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,15 +3,19 @@
33

44
use std::sync::OnceLock;
55

6+
use typespec_macros::SafeDebug;
7+
68
use crate::messaging::{
79
AmqpDelivery, AmqpDeliveryApis, AmqpMessage, AmqpOutcome, DeliveryNumber, DeliveryTag,
810
DistributionMode, TerminusDurability, TerminusExpiryPolicy,
911
};
10-
12+
#[derive(SafeDebug)]
13+
#[safe(true)]
1114
pub(crate) struct Fe2o3AmqpDelivery {
1215
pub(crate) delivery: fe2o3_amqp::link::delivery::Delivery<
1316
fe2o3_amqp_types::messaging::Body<fe2o3_amqp_types::primitives::Value>,
1417
>,
18+
#[safe(false)]
1519
message: OnceLock<AmqpMessage>,
1620
delivery_tag: OnceLock<DeliveryTag>,
1721
}

sdk/core/azure_core_amqp/src/fe2o3/receiver.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,6 @@ use crate::{
88
session::AmqpSession,
99
AmqpError,
1010
};
11-
use async_trait::async_trait;
1211
use azure_core::error::Result;
1312
use std::borrow::BorrowMut;
1413
use std::sync::OnceLock;
@@ -44,7 +43,8 @@ impl From<&fe2o3_amqp::link::receiver::CreditMode> for ReceiverCreditMode {
4443
}
4544
}
4645

47-
#[async_trait]
46+
#[cfg_attr(target_arch = "wasm32", async_trait::async_trait(?Send))]
47+
#[cfg_attr(not(target_arch = "wasm32"), async_trait::async_trait)]
4848
impl AmqpReceiverApis for Fe2o3AmqpReceiver {
4949
async fn attach(
5050
&self,

0 commit comments

Comments
 (0)