Skip to content

Commit 6a64fc6

Browse files
committed
split RuntimeShutdownError into its own error kind
1 parent 1acf700 commit 6a64fc6

File tree

4 files changed

+36
-20
lines changed

4 files changed

+36
-20
lines changed

src/connection.rs

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ use amq_protocol::frame::{AMQPFrame, ProtocolVersion};
2121
use async_rs::{Runtime, traits::*};
2222
use async_trait::async_trait;
2323
use futures_core::Stream;
24-
use std::{fmt, io, sync::Arc};
24+
use std::{fmt, sync::Arc};
2525
use tracing::trace;
2626

2727
/// A TCP connection to the AMQP server.
@@ -315,10 +315,9 @@ impl Connect for AMQPUri {
315315
self,
316316
runtime,
317317
async move |uri, runtime| {
318-
Ok(
319-
AMQPUriTcpExt::connect_with_config_async(&uri, config.as_ref(), &runtime)
320-
.await?,
321-
)
318+
AMQPUriTcpExt::connect_with_config_async(&uri, config.as_ref(), &runtime)
319+
.await
320+
.map_err(Error::io)
322321
},
323322
options,
324323
)

src/connection_builder.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@ use crate::{
44
};
55

66
use async_rs::{Runtime, traits::*};
7-
use std::io;
87

98
#[derive(Debug)]
109
pub struct ConnectionBuilder<RK: RuntimeKit + Send + Sync + Clone + 'static> {

src/error.rs

Lines changed: 25 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ pub enum ErrorKind {
3333
InvalidConnectionState(ConnectionState),
3434

3535
IOError(Arc<io::Error>),
36+
RuntimeShutdownError(Arc<io::Error>),
3637
ParsingError(ParserError),
3738
ProtocolError(AMQPError),
3839
SerialisationError(Arc<GenError>),
@@ -43,7 +44,15 @@ pub enum ErrorKind {
4344

4445
impl Error {
4546
pub(crate) fn other<E: Into<Box<dyn error::Error + Send + Sync>>>(error: E) -> Self {
46-
ErrorKind::IOError(Arc::new(io::Error::other(error))).into()
47+
io::Error::other(error).into()
48+
}
49+
50+
pub(crate) fn io(error: io::Error) -> Self {
51+
if io_error_is_runtime_shutdown(&error) {
52+
ErrorKind::RuntimeShutdownError(Arc::new(error)).into()
53+
} else {
54+
error.into()
55+
}
4756
}
4857

4958
pub fn kind(&self) -> &ErrorKind {
@@ -79,6 +88,13 @@ impl Error {
7988
if let ErrorKind::IOError(_) = self.kind() {
8089
return true;
8190
}
91+
self.is_runtime_shutdown_error()
92+
}
93+
94+
pub fn is_runtime_shutdown_error(&self) -> bool {
95+
if let ErrorKind::RuntimeShutdownError(_) = self.kind() {
96+
return true;
97+
}
8298
false
8399
}
84100

@@ -107,13 +123,6 @@ impl Error {
107123
false
108124
}
109125

110-
pub fn is_runtime_shutdown_error(&self) -> bool {
111-
if let ErrorKind::IOError(e) = self.kind() {
112-
return io_error_is_runtime_shutdown(e);
113-
}
114-
false
115-
}
116-
117126
pub fn can_be_recovered(&self) -> bool {
118127
match self.kind() {
119128
ErrorKind::ChannelsLimitReached => false,
@@ -123,7 +132,8 @@ impl Error {
123132
ErrorKind::InvalidChannelState(..) => true,
124133
ErrorKind::InvalidConnectionState(_) => true,
125134

126-
ErrorKind::IOError(e) => !io_error_is_runtime_shutdown(e),
135+
ErrorKind::IOError(_) => true,
136+
ErrorKind::RuntimeShutdownError(_) => false,
127137
ErrorKind::ParsingError(_) => false,
128138
ErrorKind::ProtocolError(_) => true,
129139
ErrorKind::SerialisationError(_) => false,
@@ -166,6 +176,7 @@ impl fmt::Display for Error {
166176
}
167177

168178
ErrorKind::IOError(e) => write!(f, "IO error: {e}"),
179+
ErrorKind::RuntimeShutdownError(e) => write!(f, "runtime shutdown error: {e}"),
169180
ErrorKind::ParsingError(e) => write!(f, "failed to parse: {e}"),
170181
ErrorKind::ProtocolError(e) => write!(f, "protocol error: {e}"),
171182
ErrorKind::SerialisationError(e) => write!(f, "failed to serialise: {e}"),
@@ -182,6 +193,7 @@ impl error::Error for Error {
182193
fn source(&self) -> Option<&(dyn error::Error + 'static)> {
183194
match self.kind() {
184195
ErrorKind::IOError(e) => Some(&**e),
196+
ErrorKind::RuntimeShutdownError(e) => Some(&**e),
185197
ErrorKind::ParsingError(e) => Some(e),
186198
ErrorKind::ProtocolError(e) => Some(e),
187199
ErrorKind::SerialisationError(e) => Some(&**e),
@@ -229,6 +241,10 @@ impl PartialEq for Error {
229241
error!("Unable to compare lapin::ErrorKind::IOError");
230242
false
231243
}
244+
(RuntimeShutdownError(_), RuntimeShutdownError(_)) => {
245+
error!("Unable to compare lapin::ErrorKind::RuntimeShutdownError");
246+
false
247+
}
232248
(ParsingError(left_inner), ParsingError(right_inner)) => left_inner == right_inner,
233249
(ProtocolError(left_inner), ProtocolError(right_inner)) => left_inner == right_inner,
234250
(SerialisationError(_), SerialisationError(_)) => {

src/io_loop.rs

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -270,7 +270,7 @@ impl<
270270
error!(?err, "Failed to close IO stream");
271271
}
272272
res
273-
})?;
273+
}).map_err(Error::io)?;
274274
waker.wake();
275275
Ok(handle)
276276
}
@@ -391,7 +391,7 @@ impl<
391391
stream: Pin<&mut T>,
392392
writable_context: &mut Context<'_>,
393393
) -> Result<()> {
394-
let res = stream.poll_flush(writable_context)?;
394+
let res = stream.poll_flush(writable_context).map_err(Error::io)?;
395395
self.socket_state.handle_write_poll(res);
396396
Ok(())
397397
}
@@ -439,7 +439,8 @@ impl<
439439

440440
let res = self
441441
.send_buffer
442-
.poll_write_to(writable_context, stream.as_mut())?;
442+
.poll_write_to(writable_context, stream.as_mut())
443+
.map_err(Error::io)?;
443444

444445
if let Some(sz) = self.socket_state.handle_write_poll(res) {
445446
if sz > 0 {
@@ -497,7 +498,8 @@ impl<
497498
_ => {
498499
let res = self
499500
.receive_buffer
500-
.poll_read_from(readable_context, stream)?;
501+
.poll_read_from(readable_context, stream)
502+
.map_err(Error::io)?;
501503

502504
if let Some(sz) = self.socket_state.handle_read_poll(res) {
503505
if sz > 0 {
@@ -538,7 +540,7 @@ impl<
538540
return Ok(true);
539541
}
540542
self.socket_state
541-
.handle_io_result(Err(io::Error::from(io::ErrorKind::ConnectionAborted).into()))?;
543+
.handle_io_result(Err(Error::io(io::ErrorKind::ConnectionAborted.into())))?;
542544
Ok(false)
543545
}
544546

0 commit comments

Comments
 (0)