Skip to content

Commit 798f977

Browse files
Implement rust management APIs (Azure#1854)
* Implement rust management APIs
1 parent e96116b commit 798f977

File tree

5 files changed

+49
-46
lines changed

5 files changed

+49
-46
lines changed

sdk/core/azure_core_amqp/src/connection.rs

Lines changed: 11 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,6 @@ impl AmqpConnectionOptions {
3030
pub fn builder() -> builders::AmqpConnectionOptionsBuilder {
3131
builders::AmqpConnectionOptionsBuilder::new()
3232
}
33-
3433
pub fn max_frame_size(&self) -> Option<u32> {
3534
self.max_frame_size
3635
}
@@ -124,47 +123,38 @@ pub mod builders {
124123
options: Default::default(),
125124
}
126125
}
127-
pub fn build(&mut self) -> AmqpConnectionOptions {
128-
self.options.clone()
126+
pub fn build(self) -> AmqpConnectionOptions {
127+
self.options
129128
}
130-
pub fn with_max_frame_size(&mut self, max_frame_size: u32) -> &mut Self {
129+
pub fn with_max_frame_size(mut self, max_frame_size: u32) -> Self {
131130
self.options.max_frame_size = Some(max_frame_size);
132131
self
133132
}
134-
pub fn with_channel_max(&mut self, channel_max: u16) -> &mut Self {
133+
pub fn with_channel_max(mut self, channel_max: u16) -> Self {
135134
self.options.channel_max = Some(channel_max);
136135
self
137136
}
138-
pub fn with_idle_timeout(&mut self, idle_timeout: Duration) -> &mut Self {
137+
pub fn with_idle_timeout(mut self, idle_timeout: Duration) -> Self {
139138
self.options.idle_timeout = Some(idle_timeout);
140139
self
141140
}
142-
pub fn with_outgoing_locales(&mut self, outgoing_locales: Vec<String>) -> &mut Self {
141+
pub fn with_outgoing_locales(mut self, outgoing_locales: Vec<String>) -> Self {
143142
self.options.outgoing_locales = Some(outgoing_locales);
144143
self
145144
}
146-
pub fn with_incoming_locales(&mut self, incoming_locales: Vec<String>) -> &mut Self {
145+
pub fn with_incoming_locales(mut self, incoming_locales: Vec<String>) -> Self {
147146
self.options.incoming_locales = Some(incoming_locales);
148147
self
149148
}
150-
pub fn with_offered_capabilities(
151-
&mut self,
152-
offered_capabilities: Vec<AmqpSymbol>,
153-
) -> &mut Self {
149+
pub fn with_offered_capabilities(mut self, offered_capabilities: Vec<AmqpSymbol>) -> Self {
154150
self.options.offered_capabilities = Some(offered_capabilities);
155151
self
156152
}
157-
pub fn with_desired_capabilities(
158-
&mut self,
159-
desired_capabilities: Vec<AmqpSymbol>,
160-
) -> &mut Self {
153+
pub fn with_desired_capabilities(mut self, desired_capabilities: Vec<AmqpSymbol>) -> Self {
161154
self.options.desired_capabilities = Some(desired_capabilities);
162155
self
163156
}
164-
pub fn with_properties<K, V>(
165-
&mut self,
166-
properties: impl Into<AmqpOrderedMap<K, V>>,
167-
) -> &mut Self
157+
pub fn with_properties<K, V>(mut self, properties: impl Into<AmqpOrderedMap<K, V>>) -> Self
168158
where
169159
K: Into<AmqpSymbol> + Debug + Default + PartialEq,
170160
V: Into<AmqpValue> + Debug + Default,
@@ -177,7 +167,7 @@ pub mod builders {
177167
self.options.properties = Some(properties_map);
178168
self
179169
}
180-
pub fn with_buffer_size(&mut self, buffer_size: usize) -> &mut Self {
170+
pub fn with_buffer_size(mut self, buffer_size: usize) -> Self {
181171
self.options.buffer_size = Some(buffer_size);
182172
self
183173
}

sdk/core/azure_core_amqp/src/fe2o3/management.rs

Lines changed: 18 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,13 +11,17 @@ use crate::{
1111
};
1212

1313
use async_std::sync::Mutex;
14-
use azure_core::{credentials::AccessToken, error::Result};
14+
use azure_core::{
15+
credentials::AccessToken,
16+
error::{ErrorKind, Result},
17+
Error,
18+
};
1519
use fe2o3_amqp_management::operations::ReadResponse;
1620
use fe2o3_amqp_types::{messaging::ApplicationProperties, primitives::SimpleValue};
1721
use std::sync::{Arc, OnceLock};
1822
use tracing::debug;
1923

20-
use super::error::{AmqpManagement, AmqpManagementAttach};
24+
use super::error::{AmqpLinkDetach, AmqpManagement, AmqpManagementAttach};
2125

2226
#[derive(Debug)]
2327
pub(crate) struct Fe2o3AmqpManagement {
@@ -67,6 +71,18 @@ impl AmqpManagementApis for Fe2o3AmqpManagement {
6771
})?;
6872
Ok(())
6973
}
74+
75+
async fn detach(mut self) -> Result<()> {
76+
// Detach the management client from the session.
77+
let management = self
78+
.management
79+
.take()
80+
.ok_or_else(|| Error::message(ErrorKind::Other, "Unattached management node."))?;
81+
let management = management.into_inner();
82+
management.close().await.map_err(AmqpLinkDetach::from)?;
83+
Ok(())
84+
}
85+
7086
async fn call(
7187
&self,
7288
operation_type: impl Into<String>,

sdk/core/azure_core_amqp/src/management.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ type ManagementImplementation = super::noop::NoopAmqpManagement;
1717

1818
pub trait AmqpManagementApis {
1919
fn attach(&self) -> impl std::future::Future<Output = Result<()>>;
20+
fn detach(self) -> impl std::future::Future<Output = Result<()>>;
2021

2122
#[allow(unused_variables)]
2223
fn call(
@@ -35,6 +36,9 @@ impl AmqpManagementApis for AmqpManagement {
3536
async fn attach(&self) -> Result<()> {
3637
self.implementation.attach().await
3738
}
39+
async fn detach(self) -> Result<()> {
40+
self.implementation.detach().await
41+
}
3842
async fn call(
3943
&self,
4044
operation_type: impl Into<String>,

sdk/core/azure_core_amqp/src/noop.rs

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -123,6 +123,10 @@ impl AmqpManagementApis for NoopAmqpManagement {
123123
unimplemented!();
124124
}
125125

126+
async fn detach(self) -> Result<()> {
127+
unimplemented!();
128+
}
129+
126130
async fn call(
127131
&self,
128132
operation_type: impl Into<String>,
@@ -148,11 +152,9 @@ impl AmqpSenderApis for NoopAmqpSender {
148152
) -> Result<()> {
149153
unimplemented!();
150154
}
151-
152155
async fn detach(self) -> Result<()> {
153156
unimplemented!();
154157
}
155-
156158
fn max_message_size(&self) -> Result<Option<u64>> {
157159
unimplemented!();
158160
}

sdk/core/azure_core_amqp/src/sender.rs

Lines changed: 12 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -146,65 +146,56 @@ pub mod builders {
146146
}
147147
}
148148

149-
pub fn with_sender_settle_mode(
150-
&mut self,
151-
sender_settle_mode: SenderSettleMode,
152-
) -> &mut Self {
149+
pub fn with_sender_settle_mode(mut self, sender_settle_mode: SenderSettleMode) -> Self {
153150
self.options.sender_settle_mode = Some(sender_settle_mode);
154151
self
155152
}
156153

157154
pub fn with_receiver_settle_mode(
158-
&mut self,
155+
mut self,
159156
receiver_settle_mode: ReceiverSettleMode,
160-
) -> &mut Self {
157+
) -> Self {
161158
self.options.receiver_settle_mode = Some(receiver_settle_mode);
162159
self
163160
}
164161

165-
pub fn with_source(&mut self, source: impl Into<AmqpSource>) -> &mut Self {
162+
pub fn with_source(mut self, source: impl Into<AmqpSource>) -> Self {
166163
self.options.source = Some(source.into());
167164
self
168165
}
169-
pub fn with_offered_capabilities(
170-
&mut self,
171-
offered_capabilities: Vec<AmqpSymbol>,
172-
) -> &mut Self {
166+
pub fn with_offered_capabilities(mut self, offered_capabilities: Vec<AmqpSymbol>) -> Self {
173167
self.options.offered_capabilities = Some(offered_capabilities);
174168
self
175169
}
176170
#[allow(dead_code)]
177-
pub fn with_desired_capabilities(
178-
&mut self,
179-
desired_capabilities: Vec<AmqpSymbol>,
180-
) -> &mut Self {
171+
pub fn with_desired_capabilities(mut self, desired_capabilities: Vec<AmqpSymbol>) -> Self {
181172
self.options.desired_capabilities = Some(desired_capabilities);
182173
self
183174
}
184175

185176
pub fn with_properties(
186-
&mut self,
177+
mut self,
187178
properties: impl Into<AmqpOrderedMap<AmqpSymbol, AmqpValue>>,
188-
) -> &mut Self {
179+
) -> Self {
189180
let properties_map: AmqpOrderedMap<AmqpSymbol, AmqpValue> =
190181
properties.into().iter().collect();
191182

192183
self.options.properties = Some(properties_map);
193184
self
194185
}
195186

196-
pub fn with_initial_delivery_count(&mut self, initial_delivery_count: u32) -> &mut Self {
187+
pub fn with_initial_delivery_count(mut self, initial_delivery_count: u32) -> Self {
197188
self.options.initial_delivery_count = Some(initial_delivery_count);
198189
self
199190
}
200191

201-
pub fn with_max_message_size(&mut self, max_message_size: u64) -> &mut Self {
192+
pub fn with_max_message_size(mut self, max_message_size: u64) -> Self {
202193
self.options.max_message_size = Some(max_message_size);
203194
self
204195
}
205196

206-
pub fn build(&mut self) -> AmqpSenderOptions {
207-
self.options.clone()
197+
pub fn build(self) -> AmqpSenderOptions {
198+
self.options
208199
}
209200
}
210201
}

0 commit comments

Comments
 (0)