diff --git a/CHANGES.md b/CHANGES.md index 0733c01..09e7279 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -1,5 +1,9 @@ # Changes +## [5.3.0] - 2026-01-xx + +* Add `Begin` frame to `Session` + ## [5.2.1] - 2026-01-16 * Fix link credit handling diff --git a/Cargo.toml b/Cargo.toml index 530e74e..6381178 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "ntex-amqp" -version = "5.2.1" +version = "5.3.0" authors = ["ntex contributors "] description = "AMQP 1.0 Client/Server framework" documentation = "https://docs.rs/ntex-amqp" @@ -27,7 +27,7 @@ frame-trace = [] [dependencies] ntex-amqp-codec = "2.0.0" -ntex-bytes = "1.2" +ntex-bytes = "1.4" ntex-router = "1" ntex-rt = "3.4" ntex-io = "3.3" diff --git a/codec/src/types/variant.rs b/codec/src/types/variant.rs index 6be5484..c8f9fdb 100644 --- a/codec/src/types/variant.rs +++ b/codec/src/types/variant.rs @@ -141,7 +141,7 @@ impl DescribedCompound { /// ```text /// 0xc0 0x02 0x01 0x50 0x03 /// ``` - pub fn decode(self) -> Result { + pub fn decode(&self) -> Result { let mut buf = self.data.clone(); let result = T::decode(&mut buf)?; if buf.is_empty() { diff --git a/src/connection.rs b/src/connection.rs index b2e1f8a..a2a5dd7 100644 --- a/src/connection.rs +++ b/src/connection.rs @@ -245,7 +245,7 @@ impl ConnectionInner { pub(crate) fn register_remote_session( &mut self, remote_channel_id: u16, - begin: &Begin, + begin: Begin, cell: &Cell, ) -> Result<(), AmqpProtocolError> { log::trace!( @@ -256,24 +256,23 @@ impl ConnectionInner { let entry = self.sessions.vacant_entry(); let local_token = entry.key(); + let outgoing_window = begin.incoming_window(); let session = Cell::new(SessionInner::new( local_token, false, ConnectionRef(cell.clone()), remote_channel_id, - begin.next_outgoing_id(), - begin.incoming_window(), - begin.outgoing_window(), + begin, )); entry.insert(SessionState::Established(session)); self.sessions_map.insert(remote_channel_id, local_token); let begin = Begin(Box::new(codec::BeginInner { + outgoing_window, remote_channel: Some(remote_channel_id), next_outgoing_id: 1, incoming_window: u32::MAX, - outgoing_window: begin.incoming_window(), handle_max: u32::MAX, offered_capabilities: None, desired_capabilities: None, @@ -293,7 +292,7 @@ impl ConnectionInner { &mut self, local_channel_id: u16, remote_channel_id: u16, - begin: &Begin, + begin: Begin, ) { log::trace!( "{}: Begin response received: local {:?} remote {:?}", @@ -312,9 +311,7 @@ impl ConnectionInner { true, ConnectionRef(cell.clone()), remote_channel_id, - begin.next_outgoing_id(), - begin.incoming_window(), - begin.outgoing_window(), + begin, )); self.sessions_map.insert(remote_channel_id, local_token); @@ -381,9 +378,9 @@ impl ConnectionInner { // the remote-channel property in the frame is the local channel id // we previously sent to the remote if let Some(local_channel_id) = begin.remote_channel() { - self.complete_session_creation(local_channel_id, channel_id, &begin); + self.complete_session_creation(local_channel_id, channel_id, begin); } else { - self.register_remote_session(channel_id, &begin, inner)?; + self.register_remote_session(channel_id, begin, inner)?; } return Ok(Action::None); } diff --git a/src/error.rs b/src/error.rs index 6011995..79c7b11 100644 --- a/src/error.rs +++ b/src/error.rs @@ -66,6 +66,12 @@ pub enum AmqpProtocolError { ConnectionDropped, } +impl From for AmqpProtocolError { + fn from(err: AmqpParseError) -> Self { + Self::Codec(err.into()) + } +} + #[derive(Clone, Debug, thiserror::Error)] #[error("Amqp error: {:?} {:?} ({:?})", err, description, info)] pub struct AmqpError { diff --git a/src/rcvlink.rs b/src/rcvlink.rs index 26fa1c8..4222887 100644 --- a/src/rcvlink.rs +++ b/src/rcvlink.rs @@ -390,8 +390,7 @@ impl ReceiverLinkInner { .connection() .config() .write_buf() - .buf_with_capacity(msg.encoded_size()) - .into(); + .buf_with_capacity(msg.encoded_size()); msg.encode(&mut buf); buf } @@ -402,7 +401,6 @@ impl ReceiverLinkInner { .config() .write_buf() .buf_with_capacity(16) - .into() }; self.partial_body = Some(body); diff --git a/src/session.rs b/src/session.rs index c36f3cb..4134177 100644 --- a/src/session.rs +++ b/src/session.rs @@ -6,9 +6,9 @@ use ntex_util::{HashMap, future::Either, future::Ready}; use slab::Slab; use ntex_amqp_codec::protocol::{ - self as codec, Accepted, Attach, DeliveryNumber, DeliveryState, Detach, Disposition, End, - Error, Flow, Frame, Handle, MessageFormat, ReceiverSettleMode, Role, SenderSettleMode, Source, - Transfer, TransferBody, TransferNumber, + self as codec, Accepted, Attach, Begin, DeliveryNumber, DeliveryState, Detach, Disposition, + End, Error, Flow, Frame, Handle, MessageFormat, ReceiverSettleMode, Role, SenderSettleMode, + Source, Transfer, TransferBody, TransferNumber, }; use ntex_amqp_codec::{AmqpFrame, Encode}; @@ -33,6 +33,7 @@ pub(crate) struct SessionInner { sink: ConnectionRef, next_outgoing_id: TransferNumber, flags: Flags, + begin: Begin, remote_channel_id: u16, next_incoming_id: TransferNumber, @@ -64,6 +65,12 @@ impl Session { Session { inner } } + #[inline] + /// Get begin frame reference + pub fn frame(&self) -> &Begin { + &self.inner.get_ref().begin + } + #[inline] /// Get io tag for current connection pub fn tag(&self) -> &'static str { @@ -282,17 +289,12 @@ impl SessionInner { local: bool, sink: ConnectionRef, remote_channel_id: u16, - next_incoming_id: DeliveryNumber, - remote_incoming_window: u32, - remote_outgoing_window: u32, + begin: Begin, ) -> SessionInner { SessionInner { - id, - sink, - next_incoming_id, - remote_channel_id, - remote_incoming_window, - remote_outgoing_window, + next_incoming_id: begin.next_outgoing_id(), + remote_incoming_window: begin.incoming_window(), + remote_outgoing_window: begin.outgoing_window(), flags: if local { Flags::LOCAL } else { Flags::empty() }, next_outgoing_id: INITIAL_NEXT_OUTGOING_ID, unsettled_snd_deliveries: HashMap::default(), @@ -305,6 +307,10 @@ impl SessionInner { pool_notify: pool::new(), pool_credit: pool::new(), closed: condition::Condition::new(), + id, + sink, + begin, + remote_channel_id, } } @@ -1255,8 +1261,7 @@ impl SessionInner { .sink .config() .write_buf() - .buf_with_capacity(msg.encoded_size()) - .into(); + .buf_with_capacity(msg.encoded_size()); msg.encode(&mut buf); buf.freeze() }