Skip to content

Commit 077671d

Browse files
committed
silence newer clippy warnings
Signed-off-by: Marc-Antoine Perennou <Marc-Antoine@Perennou.com>
1 parent 83b5da9 commit 077671d

File tree

5 files changed

+44
-47
lines changed

5 files changed

+44
-47
lines changed

src/channel.rs

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -129,11 +129,11 @@ impl Channel {
129129
}
130130

131131
pub async fn wait_for_recovery(&self, error: Error) -> Result<()> {
132-
if self.recovery_config.can_recover(&error) {
133-
if let Some(notifier) = error.notifier() {
134-
notifier.await;
135-
return Ok(());
136-
}
132+
if self.recovery_config.can_recover(&error)
133+
&& let Some(notifier) = error.notifier()
134+
{
135+
notifier.await;
136+
return Ok(());
137137
}
138138
Err(error)
139139
}

src/error.rs

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -85,19 +85,19 @@ impl Error {
8585
}
8686

8787
pub fn is_amqp_soft_error(&self) -> bool {
88-
if let ErrorKind::ProtocolError(e) = self.kind() {
89-
if let AMQPErrorKind::Soft(_) = e.kind() {
90-
return true;
91-
}
88+
if let ErrorKind::ProtocolError(e) = self.kind()
89+
&& let AMQPErrorKind::Soft(_) = e.kind()
90+
{
91+
return true;
9292
}
9393
false
9494
}
9595

9696
pub fn is_amqp_hard_error(&self) -> bool {
97-
if let ErrorKind::ProtocolError(e) = self.kind() {
98-
if let AMQPErrorKind::Hard(_) = e.kind() {
99-
return true;
100-
}
97+
if let ErrorKind::ProtocolError(e) = self.kind()
98+
&& let AMQPErrorKind::Hard(_) = e.kind()
99+
{
100+
return true;
101101
}
102102
false
103103
}

src/frames.rs

Lines changed: 21 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -312,33 +312,31 @@ impl Inner {
312312
{
313313
return Some(frame);
314314
}
315-
if flow {
316-
if let Some(frame) = self.low_prio_frames.pop_front() {
317-
// If the next frame is a header, that means we're a basic.publish
318-
// Header frame needs to follow directly the basic.publish frame, and Body frames
319-
// need to be sent just after those or the AMQP server will close the connection.
320-
// Push the header into publish_frames which is there to handle just that.
321-
if self
322-
.low_prio_frames
323-
.front()
324-
.map(|frame| frame.is_header())
325-
.unwrap_or(false)
326-
{
327-
while let Some(next_frame) = self.low_prio_frames.pop_front() {
328-
match *next_frame {
329-
AMQPFrame::Header(..) | AMQPFrame::Body(..) => {
330-
self.publish_frames.push_back(next_frame);
331-
}
332-
_ => {
333-
// We've exhausted Body frames for this publish, push back the next one and exit
334-
self.low_prio_frames.push_front(next_frame);
335-
break;
336-
}
315+
if flow && let Some(frame) = self.low_prio_frames.pop_front() {
316+
// If the next frame is a header, that means we're a basic.publish
317+
// Header frame needs to follow directly the basic.publish frame, and Body frames
318+
// need to be sent just after those or the AMQP server will close the connection.
319+
// Push the header into publish_frames which is there to handle just that.
320+
if self
321+
.low_prio_frames
322+
.front()
323+
.map(|frame| frame.is_header())
324+
.unwrap_or(false)
325+
{
326+
while let Some(next_frame) = self.low_prio_frames.pop_front() {
327+
match *next_frame {
328+
AMQPFrame::Header(..) | AMQPFrame::Body(..) => {
329+
self.publish_frames.push_back(next_frame);
330+
}
331+
_ => {
332+
// We've exhausted Body frames for this publish, push back the next one and exit
333+
self.low_prio_frames.push_front(next_frame);
334+
break;
337335
}
338336
}
339337
}
340-
return Some(frame);
341338
}
339+
return Some(frame);
342340
}
343341
None
344342
}

src/io_loop.rs

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -231,14 +231,13 @@ impl<
231231

232232
trace!(status=?self.status, connection_status=?self.connection_status.state(), "io_loop entering exit/cleanup phase");
233233
self.internal_rpc.stop();
234-
if self.heartbeat.killswitch().killed() {
235-
if let Err(err) = self.runtime.block_on(std::future::poll_fn(move |cx| {
234+
if self.heartbeat.killswitch().killed()
235+
&& let Err(err) = self.runtime.block_on(std::future::poll_fn(move |cx| {
236236
Pin::new(&mut stream)
237237
.poll_close(cx)
238238
})) {
239239
error!(?err, "Failed to close IO stream");
240240
}
241-
}
242241
res
243242
})?;
244243
waker.wake();

src/thread.rs

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -28,14 +28,14 @@ impl ThreadHandle {
2828
}
2929

3030
pub(crate) fn wait(&self, context: &'static str) -> Result<()> {
31-
if let Some(handle) = self.take() {
32-
if handle.thread().id() != thread::current().id() {
33-
match handle.join() {
34-
Ok(res) => return res,
35-
Err(e) => {
36-
error!(%context, "Failed waiting for thread");
37-
panic::resume_unwind(e);
38-
}
31+
if let Some(handle) = self.take()
32+
&& handle.thread().id() != thread::current().id()
33+
{
34+
match handle.join() {
35+
Ok(res) => return res,
36+
Err(e) => {
37+
error!(%context, "Failed waiting for thread");
38+
panic::resume_unwind(e);
3939
}
4040
}
4141
}

0 commit comments

Comments
 (0)