Skip to content

Commit 0a29a27

Browse files
Refactored RecoverableConnection (Azure#2670)
* Convert jitter from seconds to duration * Refactored recoverable connection into individual types, one per category
1 parent 5d74ed4 commit 0a29a27

File tree

13 files changed

+1176
-1039
lines changed

13 files changed

+1176
-1039
lines changed

sdk/eventhubs/azure_messaging_eventhubs/src/common/authorizer.rs

Lines changed: 15 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
// Copyright (c) Microsoft Corporation. All Rights reserved
22
// Licensed under the MIT license.
33

4-
use super::recoverable_connection::RecoverableConnection;
4+
use super::recoverable::RecoverableConnection;
55
use crate::error::{ErrorKind, EventHubsError};
66
use async_lock::Mutex as AsyncMutex;
77
use azure_core::{
@@ -16,20 +16,20 @@ use rand::{thread_rng, Rng};
1616
use std::collections::HashMap;
1717
use std::sync::{Arc, Mutex as SyncMutex, OnceLock, Weak};
1818
use time::{Duration, OffsetDateTime};
19-
use tracing::{debug, error, trace};
19+
use tracing::{debug, trace, warn};
2020

2121
// The number of seconds before token expiration that we wake up to refresh the token.
2222
const TOKEN_REFRESH_BIAS: Duration = Duration::minutes(6); // By default, we refresh tokens 6 minutes before they expire.
23-
const TOKEN_REFRESH_JITTER_MIN: i64 = -5; // Minimum jitter in seconds
24-
const TOKEN_REFRESH_JITTER_MAX: i64 = 5; // Maximum jitter in seconds
23+
const TOKEN_REFRESH_JITTER_MIN: Duration = Duration::seconds(-5); // Minimum jitter in seconds
24+
const TOKEN_REFRESH_JITTER_MAX: Duration = Duration::seconds(5); // Maximum jitter in seconds
2525

2626
const EVENTHUBS_AUTHORIZATION_SCOPE: &str = "https://eventhubs.azure.net/.default";
2727

2828
#[derive(Debug)]
2929
struct TokenRefreshTimes {
3030
before_expiration_refresh_time: Duration,
31-
jitter_min: i64,
32-
jitter_max: i64,
31+
jitter_min: Duration,
32+
jitter_max: Duration,
3333
}
3434

3535
impl Default for TokenRefreshTimes {
@@ -174,7 +174,7 @@ impl Authorizer {
174174
async fn refresh_tokens_task(self: Arc<Self>) {
175175
let result = self.refresh_tokens().await;
176176
if let Err(e) = result {
177-
error!("Error refreshing tokens: {e}");
177+
warn!(err=?e, "Error refreshing tokens: {e}");
178178
}
179179
debug!("Token refresher task completed.");
180180
}
@@ -250,10 +250,10 @@ impl Authorizer {
250250

251251
debug!("Token refresh times: {token_refresh_times:?}");
252252

253-
let expiration_jitter = Duration::seconds(
254-
thread_rng()
255-
.gen_range(token_refresh_times.jitter_min..token_refresh_times.jitter_max),
256-
);
253+
let jitter_min = token_refresh_times.jitter_min.whole_milliseconds() as i64;
254+
let jitter_max = token_refresh_times.jitter_max.whole_milliseconds() as i64;
255+
let expiration_jitter =
256+
Duration::milliseconds(thread_rng().gen_range(jitter_min..jitter_max));
257257
debug!("Expiration jitter: {expiration_jitter}");
258258

259259
token_refresh_bias = token_refresh_times
@@ -543,8 +543,8 @@ mod tests {
543543
authorizer
544544
.set_token_refresh_times(TokenRefreshTimes {
545545
before_expiration_refresh_time: Duration::seconds(10),
546-
jitter_min: -2,
547-
jitter_max: 2,
546+
jitter_min: Duration::seconds(-2),
547+
jitter_max: Duration::seconds(2),
548548
})
549549
.unwrap();
550550

@@ -611,8 +611,8 @@ mod tests {
611611
authorizer
612612
.set_token_refresh_times(TokenRefreshTimes {
613613
before_expiration_refresh_time: Duration::seconds(5),
614-
jitter_min: -1,
615-
jitter_max: 1,
614+
jitter_min: Duration::milliseconds(-500),
615+
jitter_max: Duration::milliseconds(500),
616616
})
617617
.unwrap();
618618

sdk/eventhubs/azure_messaging_eventhubs/src/common/management.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
// Copyright (c) Microsoft Corporation. All Rights reserved
22
// Licensed under the MIT license.
33

4-
use super::recoverable_connection::RecoverableConnection;
4+
use super::recoverable::RecoverableConnection;
55
use crate::{
66
error::{ErrorKind, EventHubsError},
77
models::{EventHubPartitionProperties, EventHubProperties},

sdk/eventhubs/azure_messaging_eventhubs/src/common/mod.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,10 +3,10 @@
33

44
pub(crate) mod authorizer;
55
pub(crate) mod management;
6-
pub(crate) mod recoverable_connection;
6+
pub(crate) mod recoverable;
77
pub mod retry;
88
pub(crate) mod user_agent;
99

1010
// Public API
1111
pub(crate) use management::ManagementInstance;
12-
pub(crate) use retry::{retry_azure_operation, RetryOptions};
12+
pub(crate) use retry::retry_azure_operation;
Lines changed: 123 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,123 @@
1+
// Copyright (c) Microsoft Corporation. All Rights reserved
2+
// Licensed under the MIT license.
3+
4+
use super::RecoverableConnection;
5+
use crate::{common::retry_azure_operation, RetryOptions};
6+
use azure_core::{credentials::Secret, error::ErrorKind as AzureErrorKind, error::Result};
7+
use azure_core_amqp::{
8+
AmqpClaimsBasedSecurity, AmqpClaimsBasedSecurityApis, AmqpConnection, AmqpError, AmqpSession,
9+
AmqpSessionApis,
10+
};
11+
use std::error::Error;
12+
use std::sync::Arc;
13+
use tracing::{debug, warn};
14+
15+
/// Thin wrapper around the [`AmqpClaimsBasedSecurityApis`] trait that implements the retry functionality.
16+
///
17+
/// A RecoverableClaimsBasedSecurity is a thin wrapper around the [`AmqpClaimsBasedSecurityApis`] trait which implements
18+
/// the retry functionality. That allows implementations which call into the authorize_path API to not have
19+
/// to worry about retrying the operation themselves.
20+
pub(crate) struct RecoverableClaimsBasedSecurity {
21+
recoverable_connection: Arc<RecoverableConnection>,
22+
}
23+
24+
impl RecoverableClaimsBasedSecurity {
25+
/// Creates a new RecoverableClaimsBasedSecurity.
26+
///
27+
/// # Arguments
28+
///
29+
/// * `recoverable_connection` - The recoverable connection to use for authorization.
30+
pub(super) fn new(recoverable_connection: Arc<RecoverableConnection>) -> Self {
31+
Self {
32+
recoverable_connection,
33+
}
34+
}
35+
36+
pub(super) async fn create_claims_based_security(
37+
connection: Arc<AmqpConnection>,
38+
retry_options: &RetryOptions,
39+
) -> Result<Arc<AmqpClaimsBasedSecurity>> {
40+
retry_azure_operation(
41+
|| async {
42+
let session = AmqpSession::new();
43+
session.begin(connection.as_ref(), None).await?;
44+
45+
let claims_based_security = Arc::new(AmqpClaimsBasedSecurity::new(session)?);
46+
47+
// Attach the claims_based_security client to the session.
48+
claims_based_security.attach().await?;
49+
Ok(claims_based_security)
50+
},
51+
retry_options,
52+
Some(Self::should_retry_claims_based_security_response),
53+
)
54+
.await
55+
}
56+
57+
fn should_retry_claims_based_security_response(e: &azure_core::Error) -> bool {
58+
match e.kind() {
59+
AzureErrorKind::Amqp => {
60+
warn!(err=?e, "Amqp operation failed: {:?}", e.source());
61+
if let Some(e) = e.source() {
62+
debug!(err=?e, "Error: {e}");
63+
64+
if let Some(amqp_error) = e.downcast_ref::<Box<AmqpError>>() {
65+
RecoverableConnection::should_retry_amqp_error(amqp_error)
66+
} else if let Some(amqp_error) = e.downcast_ref::<AmqpError>() {
67+
RecoverableConnection::should_retry_amqp_error(amqp_error)
68+
} else {
69+
debug!(err=?e, "Non AMQP error: {e}");
70+
false
71+
}
72+
} else {
73+
debug!("No source error found");
74+
false
75+
}
76+
}
77+
_ => {
78+
debug!(err=?e, "Non AMQP error: {e}");
79+
false
80+
}
81+
}
82+
}
83+
}
84+
85+
#[cfg_attr(target_arch = "wasm32", async_trait::async_trait(?Send))]
86+
#[cfg_attr(not(target_arch = "wasm32"), async_trait::async_trait)]
87+
impl AmqpClaimsBasedSecurityApis for RecoverableClaimsBasedSecurity {
88+
async fn authorize_path(
89+
&self,
90+
path: String,
91+
token_type: Option<String>,
92+
secret: &Secret,
93+
expires_on: time::OffsetDateTime,
94+
) -> Result<()> {
95+
let result = retry_azure_operation(
96+
|| {
97+
let path = path.clone();
98+
let token_type = token_type.clone();
99+
let secret = secret.clone();
100+
101+
async move {
102+
let claims_based_security_client =
103+
self.recoverable_connection.ensure_amqp_cbs().await?;
104+
claims_based_security_client
105+
.authorize_path(path, token_type, &secret, expires_on)
106+
.await
107+
}
108+
},
109+
&self.recoverable_connection.retry_options,
110+
Some(Self::should_retry_claims_based_security_response),
111+
)
112+
.await?;
113+
Ok(result)
114+
}
115+
116+
async fn attach(&self) -> azure_core::Result<()> {
117+
unimplemented!("AmqpClaimsBasedSecurityClient does not support attach operation");
118+
}
119+
120+
async fn detach(self) -> azure_core::Result<()> {
121+
unimplemented!("AmqpClaimsBasedSecurityClient does not support detach operation");
122+
}
123+
}

0 commit comments

Comments
 (0)