Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
# Changes

## [5.3.0] - 2026-01-xx

* Add `Begin` frame to `Session`

## [5.2.1] - 2026-01-16

* Fix link credit handling
Expand Down
4 changes: 2 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "ntex-amqp"
version = "5.2.1"
version = "5.3.0"
authors = ["ntex contributors <team@ntex.rs>"]
description = "AMQP 1.0 Client/Server framework"
documentation = "https://docs.rs/ntex-amqp"
Expand All @@ -27,7 +27,7 @@ frame-trace = []
[dependencies]
ntex-amqp-codec = "2.0.0"

ntex-bytes = "1.2"
ntex-bytes = "1.4"
ntex-router = "1"
ntex-rt = "3.4"
ntex-io = "3.3"
Expand Down
2 changes: 1 addition & 1 deletion codec/src/types/variant.rs
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ impl DescribedCompound {
/// ```text
/// 0xc0 0x02 0x01 0x50 0x03
/// ```
pub fn decode<T: Decode>(self) -> Result<T, AmqpParseError> {
pub fn decode<T: Decode>(&self) -> Result<T, AmqpParseError> {
let mut buf = self.data.clone();
let result = T::decode(&mut buf)?;
if buf.is_empty() {
Expand Down
19 changes: 8 additions & 11 deletions src/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -245,7 +245,7 @@ impl ConnectionInner {
pub(crate) fn register_remote_session(
&mut self,
remote_channel_id: u16,
begin: &Begin,
begin: Begin,
cell: &Cell<ConnectionInner>,
) -> Result<(), AmqpProtocolError> {
log::trace!(
Expand All @@ -256,24 +256,23 @@ impl ConnectionInner {

let entry = self.sessions.vacant_entry();
let local_token = entry.key();
let outgoing_window = begin.incoming_window();

let session = Cell::new(SessionInner::new(
local_token,
false,
ConnectionRef(cell.clone()),
remote_channel_id,
begin.next_outgoing_id(),
begin.incoming_window(),
begin.outgoing_window(),
begin,
));
entry.insert(SessionState::Established(session));
self.sessions_map.insert(remote_channel_id, local_token);

let begin = Begin(Box::new(codec::BeginInner {
outgoing_window,
remote_channel: Some(remote_channel_id),
next_outgoing_id: 1,
incoming_window: u32::MAX,
outgoing_window: begin.incoming_window(),
handle_max: u32::MAX,
offered_capabilities: None,
desired_capabilities: None,
Expand All @@ -293,7 +292,7 @@ impl ConnectionInner {
&mut self,
local_channel_id: u16,
remote_channel_id: u16,
begin: &Begin,
begin: Begin,
) {
log::trace!(
"{}: Begin response received: local {:?} remote {:?}",
Expand All @@ -312,9 +311,7 @@ impl ConnectionInner {
true,
ConnectionRef(cell.clone()),
remote_channel_id,
begin.next_outgoing_id(),
begin.incoming_window(),
begin.outgoing_window(),
begin,
));
self.sessions_map.insert(remote_channel_id, local_token);

Expand Down Expand Up @@ -381,9 +378,9 @@ impl ConnectionInner {
// the remote-channel property in the frame is the local channel id
// we previously sent to the remote
if let Some(local_channel_id) = begin.remote_channel() {
self.complete_session_creation(local_channel_id, channel_id, &begin);
self.complete_session_creation(local_channel_id, channel_id, begin);
} else {
self.register_remote_session(channel_id, &begin, inner)?;
self.register_remote_session(channel_id, begin, inner)?;
}
return Ok(Action::None);
}
Expand Down
6 changes: 6 additions & 0 deletions src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,12 @@ pub enum AmqpProtocolError {
ConnectionDropped,
}

impl From<AmqpParseError> for AmqpProtocolError {
fn from(err: AmqpParseError) -> Self {
Self::Codec(err.into())
}
}

#[derive(Clone, Debug, thiserror::Error)]
#[error("Amqp error: {:?} {:?} ({:?})", err, description, info)]
pub struct AmqpError {
Expand Down
4 changes: 1 addition & 3 deletions src/rcvlink.rs
Original file line number Diff line number Diff line change
Expand Up @@ -390,8 +390,7 @@ impl ReceiverLinkInner {
.connection()
.config()
.write_buf()
.buf_with_capacity(msg.encoded_size())
.into();
.buf_with_capacity(msg.encoded_size());
msg.encode(&mut buf);
buf
}
Expand All @@ -402,7 +401,6 @@ impl ReceiverLinkInner {
.config()
.write_buf()
.buf_with_capacity(16)
.into()
};
self.partial_body = Some(body);

Expand Down
33 changes: 19 additions & 14 deletions src/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,9 @@ use ntex_util::{HashMap, future::Either, future::Ready};
use slab::Slab;

use ntex_amqp_codec::protocol::{
self as codec, Accepted, Attach, DeliveryNumber, DeliveryState, Detach, Disposition, End,
Error, Flow, Frame, Handle, MessageFormat, ReceiverSettleMode, Role, SenderSettleMode, Source,
Transfer, TransferBody, TransferNumber,
self as codec, Accepted, Attach, Begin, DeliveryNumber, DeliveryState, Detach, Disposition,
End, Error, Flow, Frame, Handle, MessageFormat, ReceiverSettleMode, Role, SenderSettleMode,
Source, Transfer, TransferBody, TransferNumber,
};
use ntex_amqp_codec::{AmqpFrame, Encode};

Expand All @@ -33,6 +33,7 @@ pub(crate) struct SessionInner {
sink: ConnectionRef,
next_outgoing_id: TransferNumber,
flags: Flags,
begin: Begin,

remote_channel_id: u16,
next_incoming_id: TransferNumber,
Expand Down Expand Up @@ -64,6 +65,12 @@ impl Session {
Session { inner }
}

#[inline]
/// Get begin frame reference
pub fn frame(&self) -> &Begin {
&self.inner.get_ref().begin
}

#[inline]
/// Get io tag for current connection
pub fn tag(&self) -> &'static str {
Expand Down Expand Up @@ -282,17 +289,12 @@ impl SessionInner {
local: bool,
sink: ConnectionRef,
remote_channel_id: u16,
next_incoming_id: DeliveryNumber,
remote_incoming_window: u32,
remote_outgoing_window: u32,
begin: Begin,
) -> SessionInner {
SessionInner {
id,
sink,
next_incoming_id,
remote_channel_id,
remote_incoming_window,
remote_outgoing_window,
next_incoming_id: begin.next_outgoing_id(),
remote_incoming_window: begin.incoming_window(),
remote_outgoing_window: begin.outgoing_window(),
flags: if local { Flags::LOCAL } else { Flags::empty() },
next_outgoing_id: INITIAL_NEXT_OUTGOING_ID,
unsettled_snd_deliveries: HashMap::default(),
Expand All @@ -305,6 +307,10 @@ impl SessionInner {
pool_notify: pool::new(),
pool_credit: pool::new(),
closed: condition::Condition::new(),
id,
sink,
begin,
remote_channel_id,
}
}

Expand Down Expand Up @@ -1255,8 +1261,7 @@ impl SessionInner {
.sink
.config()
.write_buf()
.buf_with_capacity(msg.encoded_size())
.into();
.buf_with_capacity(msg.encoded_size());
msg.encode(&mut buf);
buf.freeze()
}
Expand Down
Loading