Skip to content

Commit a024399

Browse files
authored
Use ntex_dispatcher::Dispatcher instead of ntex-io (#82)
1 parent 8437629 commit a024399

File tree

14 files changed

+75
-70
lines changed

14 files changed

+75
-70
lines changed

.github/workflows/linux.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ jobs:
88
fail-fast: false
99
matrix:
1010
version:
11-
- 1.86.0 # MSRV
11+
- 1.88.0 # MSRV
1212
- stable
1313
- nightly
1414

CHANGES.md

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,10 @@
11
# Changes
22

3-
## [5.3.0] - 2026-01-xx
3+
## [5.4.0] - 2026-01-29
4+
5+
* Use ntex_dispatcher::Dispatcher instead of ntex-io
6+
7+
## [5.3.0] - 2026-01-26
48

59
* Add `Begin` frame to `Session`
610

Cargo.toml

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
[package]
22
name = "ntex-amqp"
3-
version = "5.3.0"
3+
version = "5.4.0"
44
authors = ["ntex contributors <team@ntex.rs>"]
55
description = "AMQP 1.0 Client/Server framework"
66
documentation = "https://docs.rs/ntex-amqp"
@@ -10,7 +10,7 @@ keywords = ["AMQP", "IoT", "messaging"]
1010
license = "MIT OR Apache-2.0"
1111
exclude = [".gitignore", ".travis.yml", ".cargo/config"]
1212
edition = "2024"
13-
rust-version = "1.86"
13+
rust-version = "1.88"
1414

1515
[workspace]
1616
members = [
@@ -28,9 +28,10 @@ frame-trace = []
2828
ntex-amqp-codec = "2.0.0"
2929

3030
ntex-bytes = "1.4"
31+
ntex-dispatcher = "3"
3132
ntex-router = "1"
32-
ntex-rt = "3.4"
33-
ntex-io = "3.3"
33+
ntex-rt = "3.5"
34+
ntex-io = "3.5"
3435
ntex-net = "3.5"
3536
ntex-util = "3.4"
3637
ntex-service = "4"
@@ -47,7 +48,7 @@ thiserror = "2"
4748
env_logger = "0.11"
4849
rand = "0.9"
4950
ntex-amqp = { path = ".", features = ["frame-trace"] }
50-
ntex = "3.0.0-pre.11"
51+
ntex = "3.0.0-pre.14"
5152

5253
[patch.crates-io]
5354
ntex-amqp = { path = "." }

codec/Cargo.toml

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,14 @@
11
[package]
22
name = "ntex-amqp-codec"
3-
version = "2.1.0"
3+
version = "2.1.1"
44
description = "AMQP 1.0 Protocol Codec"
55
authors = ["Nikolay Kim <fafhrd91@gmail.com>", "Max Gortman <mgortman@microsoft.com>", "Mike Yagley <myagley@gmail.com>"]
66
license = "MIT/Apache-2.0"
77
edition = "2024"
8-
rust-version = "1.86"
8+
rust-version = "1.88"
99

1010
[dependencies]
11-
ntex-bytes = "1.3"
11+
ntex-bytes = "1.4"
1212
ntex-codec = "1.1"
1313
ntex-util = "3"
1414
byteorder = "1"

examples/server.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ async fn main() -> std::io::Result<()> {
1313
// std::env::set_var("RUST_LOG", "trace,ntex_io=info");
1414
env_logger::init();
1515

16-
ntex::server::Server::build()
16+
ntex::server::Server::builder()
1717
.bind("amqp", "127.0.0.1:5671", async |_| {
1818
server::Server::build(|con: server::Handshake| async move {
1919
match con {

src/client/connection.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
1-
use ntex_io::{Dispatcher as IoDispatcher, IoBoxed};
1+
use ntex_dispatcher::Dispatcher as IoDispatcher;
2+
use ntex_io::IoBoxed;
23
use ntex_service::{IntoService, Pipeline, Service, fn_service};
34
use ntex_util::future::Ready;
45

src/connection.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -182,10 +182,10 @@ impl ConnectionRef {
182182
}
183183

184184
pub(crate) fn close_session(&self, id: usize) {
185-
if let Some(state) = self.0.get_mut().sessions.get_mut(id) {
186-
if let SessionState::Established(inner) = state {
187-
*state = SessionState::Closing(inner.clone());
188-
}
185+
if let Some(state) = self.0.get_mut().sessions.get_mut(id)
186+
&& let SessionState::Established(inner) = state
187+
{
188+
*state = SessionState::Closing(inner.clone());
189189
}
190190
}
191191

src/delivery.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -330,10 +330,10 @@ impl TransferBuilder {
330330
} else if inner.closed {
331331
Err(AmqpProtocolError::Disconnected)
332332
} else {
333-
if let Some(limit) = inner.max_message_size {
334-
if self.data.len() > limit as usize {
335-
return Err(AmqpProtocolError::BodyTooLarge);
336-
}
333+
if let Some(limit) = inner.max_message_size
334+
&& self.data.len() > limit as usize
335+
{
336+
return Err(AmqpProtocolError::BodyTooLarge);
337337
}
338338

339339
let (id, tag) = self

src/dispatcher.rs

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
use std::task::{Context, Poll, ready};
22
use std::{cell, cmp, future::Future, future::poll_fn, marker, pin::Pin};
33

4-
use ntex_io::DispatchItem;
4+
use ntex_dispatcher::{DispatchItem, Reason};
55
use ntex_rt::spawn;
66
use ntex_service::{Pipeline, PipelineBinding, PipelineCall, Service, ServiceCtx};
77
use ntex_util::time::{Millis, Sleep, sleep};
@@ -261,31 +261,31 @@ where
261261

262262
Ok(None)
263263
}
264-
DispatchItem::EncoderError(err) | DispatchItem::DecoderError(err) => {
264+
DispatchItem::Stop(Reason::Encoder(err)) | DispatchItem::Stop(Reason::Decoder(err)) => {
265265
self.call_control_service(ControlFrame::new_kind(ControlFrameKind::ProtocolError(
266266
err.into(),
267267
)));
268268
Ok(None)
269269
}
270-
DispatchItem::KeepAliveTimeout => {
270+
DispatchItem::Stop(Reason::KeepAliveTimeout) => {
271271
self.call_control_service(ControlFrame::new_kind(ControlFrameKind::ProtocolError(
272272
AmqpProtocolError::KeepAliveTimeout,
273273
)));
274274
Ok(None)
275275
}
276-
DispatchItem::ReadTimeout => {
276+
DispatchItem::Stop(Reason::ReadTimeout) => {
277277
self.call_control_service(ControlFrame::new_kind(ControlFrameKind::ProtocolError(
278278
AmqpProtocolError::ReadTimeout,
279279
)));
280280
Ok(None)
281281
}
282-
DispatchItem::Disconnect(e) => {
282+
DispatchItem::Stop(Reason::Io(e)) => {
283283
self.call_control_service(ControlFrame::new_kind(ControlFrameKind::Disconnected(
284284
e,
285285
)));
286286
Ok(None)
287287
}
288-
DispatchItem::WBackPressureEnabled | DispatchItem::WBackPressureDisabled => Ok(None),
288+
DispatchItem::Control(_) => Ok(None),
289289
}
290290
}
291291
}

src/router.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -182,10 +182,10 @@ impl<S: 'static> Service<Message> for RouterService<S> {
182182
Ok(())
183183
}
184184
Message::Transfer(link) => {
185-
if let Some(Some(_)) = self.0.get_ref().handlers.get(&link) {
186-
if let Some((delivery, tr)) = link.get_delivery() {
187-
service_call(link, delivery, tr, &self.0).await?;
188-
}
185+
if let Some(Some(_)) = self.0.get_ref().handlers.get(&link)
186+
&& let Some((delivery, tr)) = link.get_delivery()
187+
{
188+
service_call(link, delivery, tr, &self.0).await?;
189189
}
190190
Ok(())
191191
}

0 commit comments

Comments
 (0)