Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
# Changes

## [5.5.0] - 2026-02-16

* SharedCfg is not Copy anymore

## [5.4.0] - 2026-01-29

* Use ntex_dispatcher::Dispatcher instead of ntex-io
Expand Down
22 changes: 16 additions & 6 deletions Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "ntex-amqp"
version = "5.4.0"
version = "5.5.0"
authors = ["ntex contributors <team@ntex.rs>"]
description = "AMQP 1.0 Client/Server framework"
documentation = "https://docs.rs/ntex-amqp"
Expand All @@ -18,6 +18,16 @@ members = [
"codec",
]

[workspace.lints.rust]
async_fn_in_trait = { level = "allow", priority = -1 }
rust_2018_idioms = "deny"
warnings = "deny"
unreachable_pub = "deny"
#missing_debug_implementations = "deny"

[lints]
workspace = true

[features]
default = []

Expand All @@ -27,14 +37,14 @@ frame-trace = []
[dependencies]
ntex-amqp-codec = "2.0.0"

ntex-bytes = "1.4"
ntex-bytes = "1.5"
ntex-dispatcher = "3"
ntex-router = "1"
ntex-rt = "3.5"
ntex-io = "3.5"
ntex-net = "3.5"
ntex-rt = "3.7"
ntex-io = "3.8"
ntex-net = "3.6"
ntex-util = "3.4"
ntex-service = "4"
ntex-service = "4.4"

bitflags = "2"
log = "0.4"
Expand Down
5 changes: 4 additions & 1 deletion codec/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,11 @@ license = "MIT/Apache-2.0"
edition = "2024"
rust-version = "1.88"

[lints]
workspace = true

[dependencies]
ntex-bytes = "1.4"
ntex-bytes = "1.5"
ntex-codec = "1.1"
ntex-util = "3"
byteorder = "1"
Expand Down
7 changes: 5 additions & 2 deletions codec/src/codec/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ impl<T: DecodeFormatted> Decode for T {
}
}

pub fn decode_format_code(input: &mut Bytes) -> Result<u8, AmqpParseError> {
pub(crate) fn decode_format_code(input: &mut Bytes) -> Result<u8, AmqpParseError> {
decode_check_len!(input, 1);
let code = input.get_u8();
Ok(code)
Expand Down Expand Up @@ -108,18 +108,21 @@ pub mod format_codes {
pub const FORMATCODE_ARRAY32: u8 = 0xf0;
}

pub use self::format_codes::*;
pub(crate) use self::format_codes::*;

#[derive(Copy, Clone, Debug)]
pub struct ListHeader {
pub size: u32,
pub count: u32,
}

#[derive(Copy, Clone, Debug)]
pub struct MapHeader {
pub size: u32,
pub count: u32,
}

#[derive(Copy, Clone, Debug)]
pub struct ArrayHeader {
pub size: u32,
pub count: u32,
Expand Down
3 changes: 1 addition & 2 deletions codec/src/error.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
use crate::protocol::{AmqpError, ProtocolId};
pub use crate::protocol::{Error, ErrorInner};
use crate::protocol::{AmqpError, Error, ErrorInner, ProtocolId};
use crate::types::Descriptor;

#[derive(Debug, Clone, thiserror::Error)]
Expand Down
6 changes: 3 additions & 3 deletions codec/src/framing.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
use super::protocol;

/// Length in bytes of the fixed frame header
pub const HEADER_LEN: usize = 8;
pub(crate) const HEADER_LEN: usize = 8;

/// AMQP Frame type marker (0)
pub const FRAME_TYPE_AMQP: u8 = 0x00;
pub const FRAME_TYPE_SASL: u8 = 0x01;
pub(crate) const FRAME_TYPE_AMQP: u8 = 0x00;
pub(crate) const FRAME_TYPE_SASL: u8 = 0x01;

/// Represents an AMQP Frame
#[derive(Clone, Debug, PartialEq, Eq)]
Expand Down
2 changes: 1 addition & 1 deletion codec/src/protocol/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ use crate::types::{
use crate::{HashMap, error::AmqpParseError, message::Message};

impl fmt::Display for Error {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "{self:?}")
}
}
Expand Down
2 changes: 1 addition & 1 deletion codec/src/types/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,7 @@ impl PartialEq<str> for Str {
}

impl fmt::Debug for Str {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
self.0.fmt(f)
}
}
4 changes: 2 additions & 2 deletions examples/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,9 @@ async fn main() -> std::io::Result<()> {
}
})
.finish(
server::Router::new()
server::Router::builder()
.service("test", fn_factory_with_config(server))
.finish(),
.build(),
)
})?
.workers(1)
Expand Down
18 changes: 9 additions & 9 deletions src/client/connector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -110,14 +110,14 @@ where
let io = IoBoxed::from(io);

if let Some(auth) = sasl {
_connect_sasl(io, auth, self.config, hostname).await
connect_sasl_inner(io, auth, self.config.clone(), hostname).await
} else {
_connect_plain(io, self.config, hostname).await
connect_plain_inner(io, self.config.clone(), hostname).await
}
};
timeout_checked(self.config.handshake_timeout, fut)
.await
.map_err(|_| ConnectError::HandshakeTimeout)
.map_err(|()| ConnectError::HandshakeTimeout)
.and_then(|res| res)
}
}
Expand All @@ -136,7 +136,7 @@ where
) -> Result<Client, ConnectError> {
log::trace!("{}: Negotiation client protocol id: Amqp", io.tag());

_connect_plain(io, self.config, hostname).await
connect_plain_inner(io, self.config.clone(), hostname).await
}

/// Negotiate amqp sasl protocol over opened socket
Expand All @@ -148,11 +148,11 @@ where
) -> Result<Client, ConnectError> {
log::trace!("{}: Negotiation client protocol id: Amqp", io.tag());

_connect_sasl(io, auth, self.config, hostname).await
connect_sasl_inner(io, auth, self.config.clone(), hostname).await
}
}

async fn _connect_sasl(
async fn connect_sasl_inner(
io: IoBoxed,
auth: SaslAuth,
config: Cfg<AmqpServiceConfig>,
Expand Down Expand Up @@ -204,10 +204,10 @@ async fn _connect_sasl(
return Err(ConnectError::Disconnected);
}

_connect_plain(io, config, hostname).await
connect_plain_inner(io, config, hostname).await
}

async fn _connect_plain(
async fn connect_plain_inner(
io: IoBoxed,
config: Cfg<AmqpServiceConfig>,
hostname: Option<ByteString>,
Expand Down Expand Up @@ -250,7 +250,7 @@ async fn _connect_plain(
if let Frame::Open(open) = frame.performative() {
log::trace!("{}: Open confirmed: {:?}", io.tag(), open);
let remote_config = config.from_remote(open);
let connection = Connection::new(io.get_ref(), &config, &remote_config);
let connection = Connection::new(io.get_ref(), config, &remote_config);
let client = Client::new(io, codec, connection, remote_config);
Ok(client)
} else {
Expand Down
4 changes: 2 additions & 2 deletions src/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ pub struct Connect<T: Address> {
}

impl<T: Address> Connect<T> {
#[inline]
pub fn new(addr: T) -> Self {
Self {
addr,
Expand All @@ -27,7 +26,7 @@ impl<T: Address> Connect<T> {
}
}

#[inline]
#[must_use]
/// Use Sasl auth
pub fn sasl_auth(
mut self,
Expand All @@ -43,6 +42,7 @@ impl<T: Address> Connect<T> {
self
}

#[must_use]
/// Set connection hostname
///
/// Hostname is not set by default
Expand Down
Loading
Loading