Skip to content

Commit 11ae4ec

Browse files
Enable Custom Endpoints for EventHubs (#2245)
* Added ability to use AMQP proxy server; Refactored Eventhubs to consolidate common infrastructure * Removed uuid and url direct dependencies; fixed up AMQP cargo.toml file to properly declare the cplusplus feature
1 parent a03d206 commit 11ae4ec

File tree

28 files changed

+492
-421
lines changed

28 files changed

+492
-421
lines changed

.vscode/cspell.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717
".vscode/tasks.json",
1818
"NOTICE.txt",
1919
"eng/",
20-
"*.dict.txt",
20+
"**/.dict.txt",
2121
"rust-toolchain.toml"
2222
],
2323
"words": [

Cargo.lock

Lines changed: 0 additions & 4 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

sdk/core/azure_core_amqp/Cargo.toml

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -26,18 +26,17 @@ fe2o3-amqp-management = { workspace = true, optional = true }
2626
fe2o3-amqp-types = { workspace = true, optional = true }
2727
serde_amqp = { workspace = true, optional = true }
2828
serde_bytes = { workspace = true, optional = true }
29-
typespec = { workspace = true, features = ["amqp"] }
3029
time.workspace = true
3130
tokio.workspace = true
3231
tracing.workspace = true
33-
uuid = { workspace = true }
34-
url = { workspace = true }
32+
typespec = { workspace = true, features = ["amqp"] }
3533

3634
[dev-dependencies]
3735
tracing-subscriber = { workspace = true, features = ["env-filter"] }
3836

3937
[features]
4038
default = ["fe2o3-amqp"]
39+
cplusplus = []
4140
fe2o3-amqp = [
4241
"dep:fe2o3-amqp",
4342
"fe2o3-amqp-types",
@@ -47,7 +46,6 @@ fe2o3-amqp = [
4746
"serde_amqp",
4847
"serde_bytes",
4948
]
50-
cplusplus = []
5149

5250
[package.metadata.docs.rs]
5351
features = ["fe2o3-amqp"]

sdk/core/azure_core_amqp/src/connection.rs

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,17 +12,29 @@ type ConnectionImplementation = super::fe2o3::connection::Fe2o3AmqpConnection;
1212
#[cfg(any(not(feature = "fe2o3-amqp"), target_arch = "wasm32"))]
1313
type ConnectionImplementation = super::noop::NoopAmqpConnection;
1414

15+
/// Options for configuring an AMQP connection.
1516
#[derive(Debug, Default, Clone)]
1617
pub struct AmqpConnectionOptions {
18+
/// Maximum frame size for the connection in bytes.
1719
pub max_frame_size: Option<u32>,
20+
/// Maximum number of channels for the connection.
1821
pub channel_max: Option<u16>,
22+
/// Idle timeout for the connection.
1923
pub idle_timeout: Option<Duration>,
24+
/// List of outgoing locales for the connection.
2025
pub outgoing_locales: Option<Vec<String>>,
26+
/// List of incoming locales for the connection.
2127
pub incoming_locales: Option<Vec<String>>,
28+
/// List of offered capabilities for the connection.
2229
pub offered_capabilities: Option<Vec<AmqpSymbol>>,
30+
/// List of desired capabilities for the connection.
2331
pub desired_capabilities: Option<Vec<AmqpSymbol>>,
32+
/// Properties for the connection.
2433
pub properties: Option<AmqpOrderedMap<AmqpSymbol, AmqpValue>>,
34+
/// Buffer size for the connection.
2535
pub buffer_size: Option<usize>,
36+
/// Custom endpoint for the connection. Used to connect to a local AMQP proxy server.
37+
pub custom_endpoint: Option<Url>,
2638
}
2739

2840
impl AmqpConnectionOptions {}
@@ -205,6 +217,7 @@ mod tests {
205217
incoming_locales: Some(vec!["en-US".to_string()]),
206218
offered_capabilities: Some(vec!["capability".into()]),
207219
desired_capabilities: Some(vec!["capability".into()]),
220+
custom_endpoint: Some(Url::parse("http://localhost:8080").unwrap()),
208221
properties: Some(
209222
vec![("key", "value")]
210223
.into_iter()
@@ -242,6 +255,10 @@ mod tests {
242255
vec![("key".into(), "value".into())].into_iter().collect() // convert to AmqpOrderedMap
243256
)
244257
);
258+
assert_eq!(
259+
connection_options.custom_endpoint,
260+
Some(Url::parse("http://localhost:8080").unwrap())
261+
);
245262
}
246263

247264
#[tokio::test]

sdk/core/azure_core_amqp/src/fe2o3/cbs.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -71,7 +71,7 @@ impl AmqpClaimsBasedSecurityApis for Fe2o3ClaimsBasedSecurity<'_> {
7171
}
7272

7373
async fn detach(mut self) -> Result<()> {
74-
let cbs = self.cbs.take().ok_or(Self::cbs_not_set())?;
74+
let cbs = self.cbs.take().ok_or_else(Self::cbs_not_set)?;
7575
let cbs = cbs.into_inner();
7676
cbs.close().await.map_err(AmqpError::from)?;
7777
Ok(())

sdk/core/azure_core_amqp/src/fe2o3/connection.rs

Lines changed: 51 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -53,64 +53,71 @@ impl AmqpConnectionApis for Fe2o3AmqpConnection {
5353
options: Option<AmqpConnectionOptions>,
5454
) -> Result<()> {
5555
{
56+
let options = options.unwrap_or_default();
57+
let mut endpoint = url.clone();
58+
5659
// All AMQP clients have a similar set of options.
5760
let mut builder = fe2o3_amqp::Connection::builder()
5861
.sasl_profile(fe2o3_amqp::sasl_profile::SaslProfile::Anonymous)
5962
.alt_tls_establishment(true)
6063
.container_id(id)
6164
.max_frame_size(65536);
6265

63-
if let Some(options) = options {
64-
if let Some(frame_size) = options.max_frame_size {
65-
builder = builder.max_frame_size(frame_size);
66-
}
66+
if let Some(frame_size) = options.max_frame_size {
67+
builder = builder.max_frame_size(frame_size);
68+
}
6769

68-
if let Some(channel_max) = options.channel_max {
69-
builder = builder.channel_max(channel_max);
70-
}
71-
if let Some(idle_timeout) = options.idle_timeout {
72-
builder = builder.idle_time_out(idle_timeout.whole_milliseconds() as u32);
73-
}
74-
if let Some(outgoing_locales) = options.outgoing_locales.as_ref() {
75-
for locale in outgoing_locales {
76-
builder = builder.add_outgoing_locales(locale.as_str());
77-
}
78-
}
79-
if let Some(incoming_locales) = options.incoming_locales {
80-
for locale in incoming_locales {
81-
builder = builder.add_incoming_locales(locale.as_str());
82-
}
70+
if let Some(channel_max) = options.channel_max {
71+
builder = builder.channel_max(channel_max);
72+
}
73+
if let Some(idle_timeout) = options.idle_timeout {
74+
builder = builder.idle_time_out(idle_timeout.whole_milliseconds() as u32);
75+
}
76+
if let Some(outgoing_locales) = options.outgoing_locales.as_ref() {
77+
for locale in outgoing_locales {
78+
builder = builder.add_outgoing_locales(locale.as_str());
8379
}
84-
if let Some(offered_capabilities) = options.offered_capabilities.as_ref() {
85-
for capability in offered_capabilities {
86-
let capability: fe2o3_amqp_types::primitives::Symbol =
87-
capability.clone().into();
88-
builder = builder.add_offered_capabilities(capability);
89-
}
80+
}
81+
if let Some(incoming_locales) = options.incoming_locales {
82+
for locale in incoming_locales {
83+
builder = builder.add_incoming_locales(locale.as_str());
9084
}
91-
if let Some(desired_capabilities) = options.desired_capabilities.as_ref() {
92-
for capability in desired_capabilities {
93-
let capability: fe2o3_amqp_types::primitives::Symbol =
94-
capability.clone().into();
95-
builder = builder.add_desired_capabilities(capability);
96-
}
85+
}
86+
if let Some(offered_capabilities) = options.offered_capabilities.as_ref() {
87+
for capability in offered_capabilities {
88+
let capability: fe2o3_amqp_types::primitives::Symbol =
89+
capability.clone().into();
90+
builder = builder.add_offered_capabilities(capability);
9791
}
98-
if let Some(properties) = options.properties.as_ref() {
99-
let mut fields = fe2o3_amqp::types::definitions::Fields::new();
100-
for property in properties.iter() {
101-
let k = fe2o3_amqp_types::primitives::Symbol::from(property.0);
102-
let v = fe2o3_amqp_types::primitives::Value::from(property.1);
103-
104-
fields.insert(k, v);
105-
}
106-
builder = builder.properties(fields);
92+
}
93+
if let Some(desired_capabilities) = options.desired_capabilities.as_ref() {
94+
for capability in desired_capabilities {
95+
let capability: fe2o3_amqp_types::primitives::Symbol =
96+
capability.clone().into();
97+
builder = builder.add_desired_capabilities(capability);
10798
}
108-
if let Some(buffer_size) = options.buffer_size {
109-
builder = builder.buffer_size(buffer_size);
99+
}
100+
if let Some(properties) = options.properties.as_ref() {
101+
let mut fields = fe2o3_amqp::types::definitions::Fields::new();
102+
for property in properties.iter() {
103+
let k = fe2o3_amqp_types::primitives::Symbol::from(property.0);
104+
let v = fe2o3_amqp_types::primitives::Value::from(property.1);
105+
106+
fields.insert(k, v);
110107
}
108+
builder = builder.properties(fields);
111109
}
110+
if let Some(buffer_size) = options.buffer_size {
111+
builder = builder.buffer_size(buffer_size);
112+
}
113+
114+
if let Some(custom_endpoint) = options.custom_endpoint {
115+
endpoint = custom_endpoint;
116+
builder = builder.hostname(url.host_str());
117+
}
118+
112119
self.connection
113-
.set(Mutex::new(builder.open(url).await.map_err(|e| {
120+
.set(Mutex::new(builder.open(endpoint).await.map_err(|e| {
114121
azure_core::Error::from(Fe2o3ConnectionOpenError(e))
115122
})?))
116123
.map_err(|_| Self::connection_already_set())?;
@@ -142,7 +149,7 @@ impl AmqpConnectionApis for Fe2o3AmqpConnection {
142149
let mut connection = self
143150
.connection
144151
.get()
145-
.ok_or(Self::connection_not_set())?
152+
.ok_or_else(Self::connection_not_set)?
146153
.lock()
147154
.await;
148155
let res = connection

sdk/core/azure_core_amqp/src/fe2o3/management.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -82,7 +82,7 @@ impl AmqpManagementApis for Fe2o3AmqpManagement {
8282
let management = self
8383
.management
8484
.take()
85-
.ok_or(Self::amqp_management_not_attached())?;
85+
.ok_or_else(Self::amqp_management_not_attached)?;
8686
let management = management.into_inner();
8787
management.close().await.map_err(AmqpError::from)?;
8888
Ok(())
@@ -96,7 +96,7 @@ impl AmqpManagementApis for Fe2o3AmqpManagement {
9696
let mut management = self
9797
.management
9898
.get()
99-
.ok_or(Self::amqp_management_not_attached())?
99+
.ok_or_else(Self::amqp_management_not_attached)?
100100
.lock()
101101
.await;
102102

sdk/core/azure_core_amqp/src/fe2o3/messaging/message_fields.rs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -49,8 +49,7 @@ impl From<AmqpMessageId> for fe2o3_amqp_types::messaging::MessageId {
4949

5050
#[test]
5151
fn test_message_id_conversion() {
52-
use crate::Uuid;
53-
52+
use azure_core::Uuid;
5453
{
5554
let message_id = fe2o3_amqp_types::messaging::MessageId::String("test".into());
5655
let amqp_message_id: AmqpMessageId = message_id.clone().into();

sdk/core/azure_core_amqp/src/fe2o3/receiver.rs

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -78,7 +78,7 @@ impl AmqpReceiverApis for Fe2o3AmqpReceiver {
7878
}
7979

8080
async fn detach(mut self) -> Result<()> {
81-
let receiver = self.receiver.take().ok_or(Self::receiver_not_set())?;
81+
let receiver = self.receiver.take().ok_or_else(Self::receiver_not_set)?;
8282
let res = receiver
8383
.into_inner()
8484
.detach()
@@ -100,21 +100,21 @@ impl AmqpReceiverApis for Fe2o3AmqpReceiver {
100100
}
101101

102102
async fn set_credit_mode(&self, credit_mode: ReceiverCreditMode) -> Result<()> {
103-
let receiver = self.receiver.get().ok_or(Self::receiver_not_set())?;
103+
let receiver = self.receiver.get().ok_or_else(Self::receiver_not_set)?;
104104
receiver.lock().await.set_credit_mode(credit_mode.into());
105105
Ok(())
106106
}
107107

108108
async fn credit_mode(&self) -> Result<ReceiverCreditMode> {
109-
let receiver = self.receiver.get().ok_or(Self::receiver_not_set())?;
109+
let receiver = self.receiver.get().ok_or_else(Self::receiver_not_set)?;
110110
Ok(receiver.lock().await.credit_mode().into())
111111
}
112112

113113
async fn receive_delivery(&self) -> Result<AmqpDelivery> {
114114
let mut receiver = self
115115
.receiver
116116
.get()
117-
.ok_or(Self::receiver_not_set())?
117+
.ok_or_else(Self::receiver_not_set)?
118118
.lock()
119119
.await;
120120

@@ -132,7 +132,7 @@ impl AmqpReceiverApis for Fe2o3AmqpReceiver {
132132
let receiver = self
133133
.receiver
134134
.get()
135-
.ok_or(Self::receiver_not_set())?
135+
.ok_or_else(Self::receiver_not_set)?
136136
.lock()
137137
.await;
138138

@@ -150,7 +150,7 @@ impl AmqpReceiverApis for Fe2o3AmqpReceiver {
150150
let receiver = self
151151
.receiver
152152
.get()
153-
.ok_or(Self::receiver_not_set())?
153+
.ok_or_else(Self::receiver_not_set)?
154154
.lock()
155155
.await;
156156

@@ -168,7 +168,7 @@ impl AmqpReceiverApis for Fe2o3AmqpReceiver {
168168
let receiver = self
169169
.receiver
170170
.get()
171-
.ok_or(Self::receiver_not_set())?
171+
.ok_or_else(Self::receiver_not_set)?
172172
.lock()
173173
.await;
174174

sdk/core/azure_core_amqp/src/fe2o3/sender.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -94,7 +94,7 @@ impl AmqpSenderApis for Fe2o3AmqpSender {
9494
let sender = self
9595
.sender
9696
.take()
97-
.ok_or(Self::could_not_get_message_sender())?;
97+
.ok_or_else(Self::could_not_get_message_sender)?;
9898
let res = sender
9999
.into_inner()
100100
.detach()
@@ -119,7 +119,7 @@ impl AmqpSenderApis for Fe2o3AmqpSender {
119119
Ok(self
120120
.sender
121121
.get()
122-
.ok_or(Self::could_not_get_message_sender())?
122+
.ok_or_else(Self::could_not_get_message_sender)?
123123
.lock()
124124
.await
125125
.max_message_size())
@@ -149,7 +149,7 @@ impl AmqpSenderApis for Fe2o3AmqpSender {
149149
let outcome = self
150150
.sender
151151
.get()
152-
.ok_or(Self::could_not_get_message_sender())?
152+
.ok_or_else(Self::could_not_get_message_sender)?
153153
.lock()
154154
.await
155155
.borrow_mut()

0 commit comments

Comments
 (0)