Skip to content

Commit 45e57b9

Browse files
committed
use async-rs to check for runtime shutdown errors
1 parent 6a64fc6 commit 45e57b9

File tree

5 files changed

+16
-31
lines changed

5 files changed

+16
-31
lines changed

Cargo.lock

Lines changed: 2 additions & 2 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 2 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ default-runtime = ["tokio"]
2020
async-global-executor = ["amq-protocol/async-global-executor", "async-rs/async-global-executor", "async-rs/async-io"]
2121
hickory-dns = ["amq-protocol/hickory-dns", "async-rs/hickory-dns"]
2222
smol = ["amq-protocol/smol", "async-rs/smol"]
23-
tokio = ["amq-protocol/tokio", "async-rs/tokio", "dep:tokio"]
23+
tokio = ["amq-protocol/tokio", "async-rs/tokio"]
2424

2525
codegen = ["codegen-internal", "amq-protocol/codegen"]
2626
codegen-internal = ["dep:amq-protocol-codegen", "dep:serde_json"]
@@ -56,7 +56,7 @@ version = "^10.0.1"
5656
default-features = false
5757

5858
[dependencies.async-rs]
59-
version = "^0.8"
59+
version = "^0.8.1"
6060
default-features = false
6161

6262
[dependencies.backon]
@@ -69,12 +69,6 @@ version = "^0.12"
6969
default-features = false
7070
features = ["async"]
7171

72-
[dependencies.tokio]
73-
version = "^1.50"
74-
default-features = false
75-
features = ["rt"]
76-
optional = true
77-
7872
[dependencies.tracing]
7973
version = "^0.1"
8074
default-features = false

src/connection.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -317,7 +317,7 @@ impl Connect for AMQPUri {
317317
async move |uri, runtime| {
318318
AMQPUriTcpExt::connect_with_config_async(&uri, config.as_ref(), &runtime)
319319
.await
320-
.map_err(Error::io)
320+
.map_err(|err| Error::io(err, &runtime))
321321
},
322322
options,
323323
)

src/error.rs

Lines changed: 3 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ use amq_protocol::{
55
frame::{GenError, ParserError, ProtocolVersion},
66
protocol::AMQPErrorKind,
77
};
8-
use cfg_if::cfg_if;
8+
use async_rs::{Runtime, traits::*};
99
use std::{error, fmt, io, sync::Arc};
1010

1111
/// A std Result with a lapin::Error error type
@@ -47,8 +47,8 @@ impl Error {
4747
io::Error::other(error).into()
4848
}
4949

50-
pub(crate) fn io(error: io::Error) -> Self {
51-
if io_error_is_runtime_shutdown(&error) {
50+
pub(crate) fn io<RK: RuntimeKit>(error: io::Error, rt: &Runtime<RK>) -> Self {
51+
if rt.is_runtime_shutdown_error(&error) {
5252
ErrorKind::RuntimeShutdownError(Arc::new(error)).into()
5353
} else {
5454
error.into()
@@ -144,18 +144,6 @@ impl Error {
144144
}
145145
}
146146

147-
cfg_if! {
148-
if #[cfg(feature = "tokio")] {
149-
fn io_error_is_runtime_shutdown(err: &io::Error) -> bool {
150-
tokio::runtime::is_rt_shutdown_err(err)
151-
}
152-
} else {
153-
fn io_error_is_runtime_shutdown(_: &io::Error) -> bool {
154-
false
155-
}
156-
}
157-
}
158-
159147
impl fmt::Display for Error {
160148
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
161149
match self.kind() {

src/io_loop.rs

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -199,6 +199,7 @@ impl<
199199
}
200200

201201
pub(crate) fn start(mut self) -> Result<JoinHandle> {
202+
let runtime = self.runtime.clone();
202203
let waker = self.socket_state.handle();
203204
let connect_span = tracing::Span::current();
204205
let handle = ThreadBuilder::new()
@@ -270,7 +271,7 @@ impl<
270271
error!(?err, "Failed to close IO stream");
271272
}
272273
res
273-
}).map_err(Error::io)?;
274+
}).map_err(|err| Error::io(err, &runtime))?;
274275
waker.wake();
275276
Ok(handle)
276277
}
@@ -391,7 +392,9 @@ impl<
391392
stream: Pin<&mut T>,
392393
writable_context: &mut Context<'_>,
393394
) -> Result<()> {
394-
let res = stream.poll_flush(writable_context).map_err(Error::io)?;
395+
let res = stream
396+
.poll_flush(writable_context)
397+
.map_err(|err| Error::io(err, &self.runtime))?;
395398
self.socket_state.handle_write_poll(res);
396399
Ok(())
397400
}
@@ -440,7 +443,7 @@ impl<
440443
let res = self
441444
.send_buffer
442445
.poll_write_to(writable_context, stream.as_mut())
443-
.map_err(Error::io)?;
446+
.map_err(|err| Error::io(err, &self.runtime))?;
444447

445448
if let Some(sz) = self.socket_state.handle_write_poll(res) {
446449
if sz > 0 {
@@ -499,7 +502,7 @@ impl<
499502
let res = self
500503
.receive_buffer
501504
.poll_read_from(readable_context, stream)
502-
.map_err(Error::io)?;
505+
.map_err(|err| Error::io(err, &self.runtime))?;
503506

504507
if let Some(sz) = self.socket_state.handle_read_poll(res) {
505508
if sz > 0 {
@@ -540,7 +543,7 @@ impl<
540543
return Ok(true);
541544
}
542545
self.socket_state
543-
.handle_io_result(Err(Error::io(io::ErrorKind::ConnectionAborted.into())))?;
546+
.handle_io_result(Err(io::Error::from(io::ErrorKind::ConnectionAborted).into()))?;
544547
Ok(false)
545548
}
546549

0 commit comments

Comments
 (0)