diff --git a/CHANGES.md b/CHANGES.md index ef16a0d..11a59db 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -1,8 +1,12 @@ # Changes +## [5.6.0] - 2026-02-16 + +* AmqpServiceConfig is not Clone + ## [5.5.0] - 2026-02-16 -* SharedCfg is not Copy anymore +* SharedCfg is not Copy ## [5.4.0] - 2026-01-29 diff --git a/Cargo.toml b/Cargo.toml index 2b10d3c..51a4843 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "ntex-amqp" -version = "5.5.0" +version = "5.6.0" authors = ["ntex contributors "] description = "AMQP 1.0 Client/Server framework" documentation = "https://docs.rs/ntex-amqp" @@ -58,7 +58,7 @@ thiserror = "2" env_logger = "0.11" rand = "0.9" ntex-amqp = { path = ".", features = ["frame-trace"] } -ntex = "3.0.0-pre.14" +ntex = "3.3" [patch.crates-io] ntex-amqp = { path = "." } diff --git a/src/client/connection.rs b/src/client/connection.rs index 9a42275..7d8ea6d 100644 --- a/src/client/connection.rs +++ b/src/client/connection.rs @@ -5,15 +5,16 @@ use ntex_util::future::Ready; use crate::codec::{AmqpCodec, AmqpFrame}; use crate::control::ControlFrame; +use crate::dispatcher::Dispatcher; use crate::error::{AmqpDispatcherError, LinkError}; -use crate::{AmqpServiceConfig, Connection, ConnectionRef, State, dispatcher::Dispatcher}; +use crate::{Connection, ConnectionRef, RemoteServiceConfig, State}; /// Mqtt client pub struct Client { io: IoBoxed, codec: AmqpCodec, connection: Connection, - remote_config: AmqpServiceConfig, + remote_config: RemoteServiceConfig, _st: State, } @@ -23,7 +24,7 @@ impl Client { io: IoBoxed, codec: AmqpCodec, connection: Connection, - remote_config: AmqpServiceConfig, + remote_config: RemoteServiceConfig, ) -> Self { Client { io, diff --git a/src/client/connector.rs b/src/client/connector.rs index f7984fc..62ef3e1 100644 --- a/src/client/connector.rs +++ b/src/client/connector.rs @@ -8,7 +8,7 @@ use ntex_util::time::timeout_checked; use crate::codec::protocol::{Frame, ProtocolId, SaslCode, SaslFrameBody, SaslInit}; use crate::codec::{AmqpCodec, AmqpFrame, ProtocolIdCodec, SaslFrame, types::Symbol}; -use crate::{AmqpServiceConfig, Connection, error::ProtocolIdError}; +use crate::{AmqpServiceConfig, Connection, RemoteServiceConfig, error::ProtocolIdError}; use super::{Connect, SaslAuth, connection::Client, error::ConnectError}; @@ -249,7 +249,7 @@ async fn connect_plain_inner( if let Frame::Open(open) = frame.performative() { log::trace!("{}: Open confirmed: {:?}", io.tag(), open); - let remote_config = config.from_remote(open); + let remote_config = RemoteServiceConfig::new(open); let connection = Connection::new(io.get_ref(), config, &remote_config); let client = Client::new(io, codec, connection, remote_config); Ok(client) diff --git a/src/connection.rs b/src/connection.rs index 85496ee..ff39e8a 100644 --- a/src/connection.rs +++ b/src/connection.rs @@ -10,7 +10,9 @@ use crate::codec::{AmqpCodec, AmqpFrame, types}; use crate::control::ControlQueue; use crate::session::{INITIAL_NEXT_OUTGOING_ID, Session, SessionInner}; use crate::sndlink::{SenderLink, SenderLinkInner}; -use crate::{AmqpServiceConfig, cell::Cell, error::AmqpProtocolError, types::Action}; +use crate::{ + AmqpServiceConfig, RemoteServiceConfig, cell::Cell, error::AmqpProtocolError, types::Action, +}; pub struct Connection(ConnectionRef); @@ -56,7 +58,7 @@ impl Connection { pub(crate) fn new( io: IoRef, local_config: Cfg, - remote_config: &AmqpServiceConfig, + remote_config: &RemoteServiceConfig, ) -> Connection { Connection(ConnectionRef(Cell::new(ConnectionInner { io, diff --git a/src/lib.rs b/src/lib.rs index 53f84c9..c0489ef 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -58,7 +58,7 @@ pub mod codec { } /// Amqp1 transport configuration. -#[derive(Debug, Clone)] +#[derive(Debug)] pub struct AmqpServiceConfig { pub max_frame_size: u32, pub channel_max: u16, @@ -71,6 +71,17 @@ pub struct AmqpServiceConfig { config: CfgContext, } +/// Amqp1 transport configuration. +#[derive(Debug)] +pub struct RemoteServiceConfig { + pub max_frame_size: u32, + pub channel_max: u16, + pub idle_time_out: Milliseconds, + pub hostname: Option, + pub offered_capabilities: Option, + pub desired_capabilities: Option, +} + impl Default for AmqpServiceConfig { fn default() -> Self { Self::new() @@ -219,28 +230,27 @@ impl AmqpServiceConfig { properties: None, })) } +} - #[allow(clippy::cast_sign_loss, clippy::cast_precision_loss)] - pub(crate) fn timeout_remote_secs(&self) -> Seconds { - if self.idle_time_out > 0 { - Seconds::checked_new(((self.idle_time_out as f32) * 0.75 / 1000.0) as usize) - } else { - Seconds::ZERO - } - } - +impl RemoteServiceConfig { #[must_use] - pub fn from_remote(&self, open: &Open) -> AmqpServiceConfig { - AmqpServiceConfig { + pub fn new(open: &Open) -> RemoteServiceConfig { + RemoteServiceConfig { max_frame_size: open.max_frame_size(), channel_max: open.channel_max(), idle_time_out: open.idle_time_out().unwrap_or(0), hostname: open.hostname().cloned(), - max_size: self.max_size, - handshake_timeout: self.handshake_timeout, offered_capabilities: open.0.offered_capabilities.clone(), desired_capabilities: open.0.desired_capabilities.clone(), - config: self.config.clone(), + } + } + + #[allow(clippy::cast_sign_loss, clippy::cast_precision_loss)] + pub(crate) fn timeout_remote_secs(&self) -> Seconds { + if self.idle_time_out > 0 { + Seconds::checked_new(((self.idle_time_out as f32) * 0.75 / 1000.0) as usize) + } else { + Seconds::ZERO } } } diff --git a/src/server/handshake.rs b/src/server/handshake.rs index d057532..eb02bf0 100644 --- a/src/server/handshake.rs +++ b/src/server/handshake.rs @@ -3,7 +3,7 @@ use ntex_service::cfg::Cfg; use ntex_util::time::Seconds; use crate::codec::{AmqpCodec, AmqpFrame, protocol::Frame, protocol::Open}; -use crate::{AmqpServiceConfig, connection::Connection}; +use crate::{AmqpServiceConfig, RemoteServiceConfig, connection::Connection}; use super::{error::HandshakeError, sasl::Sasl}; @@ -66,8 +66,8 @@ impl HandshakeAmqp { match frame { Frame::Open(frame) => { log::trace!("{}: Got open frame: {:?}", state.tag(), frame); - let remote_config = local_config.from_remote(&frame); - let sink = Connection::new(state.get_ref(), local_config, &remote_config); + let remote_config = RemoteServiceConfig::new(&frame); + let sink = Connection::new(state.get_ref(), local_config.clone(), &remote_config); Ok(HandshakeAmqpOpened { frame, sink, @@ -87,7 +87,7 @@ pub struct HandshakeAmqpOpened { sink: Connection, state: IoBoxed, local_config: Cfg, - remote_config: AmqpServiceConfig, + remote_config: RemoteServiceConfig, } impl HandshakeAmqpOpened { @@ -96,7 +96,7 @@ impl HandshakeAmqpOpened { sink: Connection, state: IoBoxed, local_config: Cfg, - remote_config: AmqpServiceConfig, + remote_config: RemoteServiceConfig, ) -> Self { Self { frame, @@ -123,7 +123,7 @@ impl HandshakeAmqpOpened { } /// Get remote configuration - pub fn remote_config(&self) -> &AmqpServiceConfig { + pub fn remote_config(&self) -> &RemoteServiceConfig { &self.remote_config } diff --git a/src/server/sasl.rs b/src/server/sasl.rs index 2dfdd9b..86b1cfd 100644 --- a/src/server/sasl.rs +++ b/src/server/sasl.rs @@ -8,9 +8,9 @@ use crate::codec::protocol::{ self, ProtocolId, SaslChallenge, SaslCode, SaslFrameBody, SaslMechanisms, SaslOutcome, Symbols, }; use crate::codec::{AmqpCodec, AmqpFrame, ProtocolIdCodec, ProtocolIdError, SaslFrame}; +use crate::{AmqpServiceConfig, RemoteServiceConfig, connection::Connection}; use super::{HandshakeError, handshake::HandshakeAmqpOpened}; -use crate::{AmqpServiceConfig, connection::Connection}; #[derive(Debug)] pub struct Sasl { @@ -264,8 +264,9 @@ impl SaslSuccess { log::trace!("{}: Got open frame: {:?}", state.tag(), frame); let local_config = self.local_config; - let remote_config = local_config.from_remote(&frame); - let sink = Connection::new(state.clone(), local_config, &remote_config); + let remote_config = RemoteServiceConfig::new(&frame); + let sink = + Connection::new(state.clone(), local_config.clone(), &remote_config); Ok(HandshakeAmqpOpened::new( frame,