Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion CHANGES.md
Original file line number Diff line number Diff line change
@@ -1,8 +1,12 @@
# Changes

## [5.6.0] - 2026-02-16

* AmqpServiceConfig is not Clone

## [5.5.0] - 2026-02-16

* SharedCfg is not Copy anymore
* SharedCfg is not Copy

## [5.4.0] - 2026-01-29

Expand Down
4 changes: 2 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "ntex-amqp"
version = "5.5.0"
version = "5.6.0"
authors = ["ntex contributors <team@ntex.rs>"]
description = "AMQP 1.0 Client/Server framework"
documentation = "https://docs.rs/ntex-amqp"
Expand Down Expand Up @@ -58,7 +58,7 @@ thiserror = "2"
env_logger = "0.11"
rand = "0.9"
ntex-amqp = { path = ".", features = ["frame-trace"] }
ntex = "3.0.0-pre.14"
ntex = "3.3"

[patch.crates-io]
ntex-amqp = { path = "." }
Expand Down
7 changes: 4 additions & 3 deletions src/client/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,16 @@ use ntex_util::future::Ready;

use crate::codec::{AmqpCodec, AmqpFrame};
use crate::control::ControlFrame;
use crate::dispatcher::Dispatcher;
use crate::error::{AmqpDispatcherError, LinkError};
use crate::{AmqpServiceConfig, Connection, ConnectionRef, State, dispatcher::Dispatcher};
use crate::{Connection, ConnectionRef, RemoteServiceConfig, State};

/// Mqtt client
pub struct Client<St = ()> {
io: IoBoxed,
codec: AmqpCodec<AmqpFrame>,
connection: Connection,
remote_config: AmqpServiceConfig,
remote_config: RemoteServiceConfig,
_st: State<St>,
}

Expand All @@ -23,7 +24,7 @@ impl Client {
io: IoBoxed,
codec: AmqpCodec<AmqpFrame>,
connection: Connection,
remote_config: AmqpServiceConfig,
remote_config: RemoteServiceConfig,
) -> Self {
Client {
io,
Expand Down
4 changes: 2 additions & 2 deletions src/client/connector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use ntex_util::time::timeout_checked;

use crate::codec::protocol::{Frame, ProtocolId, SaslCode, SaslFrameBody, SaslInit};
use crate::codec::{AmqpCodec, AmqpFrame, ProtocolIdCodec, SaslFrame, types::Symbol};
use crate::{AmqpServiceConfig, Connection, error::ProtocolIdError};
use crate::{AmqpServiceConfig, Connection, RemoteServiceConfig, error::ProtocolIdError};

use super::{Connect, SaslAuth, connection::Client, error::ConnectError};

Expand Down Expand Up @@ -249,7 +249,7 @@ async fn connect_plain_inner(

if let Frame::Open(open) = frame.performative() {
log::trace!("{}: Open confirmed: {:?}", io.tag(), open);
let remote_config = config.from_remote(open);
let remote_config = RemoteServiceConfig::new(open);
let connection = Connection::new(io.get_ref(), config, &remote_config);
let client = Client::new(io, codec, connection, remote_config);
Ok(client)
Expand Down
6 changes: 4 additions & 2 deletions src/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,9 @@ use crate::codec::{AmqpCodec, AmqpFrame, types};
use crate::control::ControlQueue;
use crate::session::{INITIAL_NEXT_OUTGOING_ID, Session, SessionInner};
use crate::sndlink::{SenderLink, SenderLinkInner};
use crate::{AmqpServiceConfig, cell::Cell, error::AmqpProtocolError, types::Action};
use crate::{
AmqpServiceConfig, RemoteServiceConfig, cell::Cell, error::AmqpProtocolError, types::Action,
};

pub struct Connection(ConnectionRef);

Expand Down Expand Up @@ -56,7 +58,7 @@ impl Connection {
pub(crate) fn new(
io: IoRef,
local_config: Cfg<AmqpServiceConfig>,
remote_config: &AmqpServiceConfig,
remote_config: &RemoteServiceConfig,
) -> Connection {
Connection(ConnectionRef(Cell::new(ConnectionInner {
io,
Expand Down
40 changes: 25 additions & 15 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ pub mod codec {
}

/// Amqp1 transport configuration.
#[derive(Debug, Clone)]
#[derive(Debug)]
pub struct AmqpServiceConfig {
pub max_frame_size: u32,
pub channel_max: u16,
Expand All @@ -71,6 +71,17 @@ pub struct AmqpServiceConfig {
config: CfgContext,
}

/// Amqp1 transport configuration.
#[derive(Debug)]
pub struct RemoteServiceConfig {
pub max_frame_size: u32,
pub channel_max: u16,
pub idle_time_out: Milliseconds,
pub hostname: Option<ByteString>,
pub offered_capabilities: Option<Symbols>,
pub desired_capabilities: Option<Symbols>,
}

impl Default for AmqpServiceConfig {
fn default() -> Self {
Self::new()
Expand Down Expand Up @@ -219,28 +230,27 @@ impl AmqpServiceConfig {
properties: None,
}))
}
}

#[allow(clippy::cast_sign_loss, clippy::cast_precision_loss)]
pub(crate) fn timeout_remote_secs(&self) -> Seconds {
if self.idle_time_out > 0 {
Seconds::checked_new(((self.idle_time_out as f32) * 0.75 / 1000.0) as usize)
} else {
Seconds::ZERO
}
}

impl RemoteServiceConfig {
#[must_use]
pub fn from_remote(&self, open: &Open) -> AmqpServiceConfig {
AmqpServiceConfig {
pub fn new(open: &Open) -> RemoteServiceConfig {
RemoteServiceConfig {
max_frame_size: open.max_frame_size(),
channel_max: open.channel_max(),
idle_time_out: open.idle_time_out().unwrap_or(0),
hostname: open.hostname().cloned(),
max_size: self.max_size,
handshake_timeout: self.handshake_timeout,
offered_capabilities: open.0.offered_capabilities.clone(),
desired_capabilities: open.0.desired_capabilities.clone(),
config: self.config.clone(),
}
}

#[allow(clippy::cast_sign_loss, clippy::cast_precision_loss)]
pub(crate) fn timeout_remote_secs(&self) -> Seconds {
if self.idle_time_out > 0 {
Seconds::checked_new(((self.idle_time_out as f32) * 0.75 / 1000.0) as usize)
} else {
Seconds::ZERO
}
}
}
12 changes: 6 additions & 6 deletions src/server/handshake.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use ntex_service::cfg::Cfg;
use ntex_util::time::Seconds;

use crate::codec::{AmqpCodec, AmqpFrame, protocol::Frame, protocol::Open};
use crate::{AmqpServiceConfig, connection::Connection};
use crate::{AmqpServiceConfig, RemoteServiceConfig, connection::Connection};

use super::{error::HandshakeError, sasl::Sasl};

Expand Down Expand Up @@ -66,8 +66,8 @@ impl HandshakeAmqp {
match frame {
Frame::Open(frame) => {
log::trace!("{}: Got open frame: {:?}", state.tag(), frame);
let remote_config = local_config.from_remote(&frame);
let sink = Connection::new(state.get_ref(), local_config, &remote_config);
let remote_config = RemoteServiceConfig::new(&frame);
let sink = Connection::new(state.get_ref(), local_config.clone(), &remote_config);
Ok(HandshakeAmqpOpened {
frame,
sink,
Expand All @@ -87,7 +87,7 @@ pub struct HandshakeAmqpOpened {
sink: Connection,
state: IoBoxed,
local_config: Cfg<AmqpServiceConfig>,
remote_config: AmqpServiceConfig,
remote_config: RemoteServiceConfig,
}

impl HandshakeAmqpOpened {
Expand All @@ -96,7 +96,7 @@ impl HandshakeAmqpOpened {
sink: Connection,
state: IoBoxed,
local_config: Cfg<AmqpServiceConfig>,
remote_config: AmqpServiceConfig,
remote_config: RemoteServiceConfig,
) -> Self {
Self {
frame,
Expand All @@ -123,7 +123,7 @@ impl HandshakeAmqpOpened {
}

/// Get remote configuration
pub fn remote_config(&self) -> &AmqpServiceConfig {
pub fn remote_config(&self) -> &RemoteServiceConfig {
&self.remote_config
}

Expand Down
7 changes: 4 additions & 3 deletions src/server/sasl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,9 @@ use crate::codec::protocol::{
self, ProtocolId, SaslChallenge, SaslCode, SaslFrameBody, SaslMechanisms, SaslOutcome, Symbols,
};
use crate::codec::{AmqpCodec, AmqpFrame, ProtocolIdCodec, ProtocolIdError, SaslFrame};
use crate::{AmqpServiceConfig, RemoteServiceConfig, connection::Connection};

use super::{HandshakeError, handshake::HandshakeAmqpOpened};
use crate::{AmqpServiceConfig, connection::Connection};

#[derive(Debug)]
pub struct Sasl {
Expand Down Expand Up @@ -264,8 +264,9 @@ impl SaslSuccess {
log::trace!("{}: Got open frame: {:?}", state.tag(), frame);

let local_config = self.local_config;
let remote_config = local_config.from_remote(&frame);
let sink = Connection::new(state.clone(), local_config, &remote_config);
let remote_config = RemoteServiceConfig::new(&frame);
let sink =
Connection::new(state.clone(), local_config.clone(), &remote_config);

Ok(HandshakeAmqpOpened::new(
frame,
Expand Down
Loading