Skip to content

Commit cdd7583

Browse files
minor: fix clippy (#502)
1 parent b7e15ba commit cdd7583

File tree

26 files changed

+151
-113
lines changed

26 files changed

+151
-113
lines changed

src/cmap/conn/stream_description.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ pub(crate) struct StreamDescription {
1515
pub(crate) max_wire_version: Option<i32>,
1616

1717
/// The minimum wire version that the server understands.
18+
#[allow(dead_code)]
1819
pub(crate) min_wire_version: Option<i32>,
1920

2021
/// The supported authentication mechanisms that the server understands.

src/cmap/connection_requester.rs

Lines changed: 4 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -3,21 +3,18 @@ use tokio::sync::{mpsc, oneshot};
33
use super::{worker::PoolWorkerHandle, Connection};
44
use crate::{
55
error::{Error, Result},
6-
options::ServerAddress,
76
runtime::AsyncJoinHandle,
87
};
98

109
/// Returns a new requester/receiver pair.
1110
pub(super) fn channel(
12-
address: ServerAddress,
1311
handle: PoolWorkerHandle,
1412
) -> (ConnectionRequester, ConnectionRequestReceiver) {
1513
let (sender, receiver) = mpsc::unbounded_channel();
1614
(
1715
ConnectionRequester {
18-
address,
1916
sender,
20-
handle,
17+
_handle: handle,
2118
},
2219
ConnectionRequestReceiver { receiver },
2320
)
@@ -28,9 +25,8 @@ pub(super) fn channel(
2825
/// the pool will stop servicing requests, drop its available connections, and close.
2926
#[derive(Clone, Debug)]
3027
pub(super) struct ConnectionRequester {
31-
address: ServerAddress,
3228
sender: mpsc::UnboundedSender<oneshot::Sender<ConnectionRequestResult>>,
33-
handle: PoolWorkerHandle,
29+
_handle: PoolWorkerHandle,
3430
}
3531

3632
impl ConnectionRequester {
@@ -83,7 +79,7 @@ impl ConnectionRequest {
8379
#[derive(Debug)]
8480
pub(super) enum ConnectionRequestResult {
8581
/// A connection that was already established and was simply checked out of the pool.
86-
Pooled(Connection),
82+
Pooled(Box<Connection>),
8783

8884
/// A new connection in the process of being established.
8985
/// The handle can be awaited upon to receive the established connection.
@@ -97,7 +93,7 @@ pub(super) enum ConnectionRequestResult {
9793
impl ConnectionRequestResult {
9894
pub(super) fn unwrap_pooled_connection(self) -> Connection {
9995
match self {
100-
ConnectionRequestResult::Pooled(c) => c,
96+
ConnectionRequestResult::Pooled(c) => *c,
10197
_ => panic!("attempted to unwrap pooled connection when was establishing"),
10298
}
10399
}

src/cmap/establish/handshake/mod.rs

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -146,6 +146,8 @@ pub(crate) struct Handshaker {
146146
credential: Option<Credential>,
147147
#[cfg(test)]
148148
mock_service_id: bool,
149+
// This field is not read without a compression feature flag turned on.
150+
#[allow(dead_code)]
149151
compressors: Option<Vec<Compressor>>,
150152
}
151153

@@ -286,8 +288,13 @@ impl Handshaker {
286288
.iter()
287289
.find(|c| server_compressors.iter().any(|x| c.name() == x))
288290
{
291+
// Without a feature flag turned on, the Compressor enum is empty which causes an
292+
// unreachable code warning.
293+
#[allow(unreachable_code)]
289294
// zlib compression level is already set
290-
conn.compressor = Some(compressor.clone());
295+
{
296+
conn.compressor = Some(compressor.clone());
297+
}
291298
}
292299
}
293300

src/cmap/manager.rs

Lines changed: 11 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ impl PoolManager {
2727
if self
2828
.sender
2929
.send(PoolManagementRequest::Clear {
30-
completion_handler: message,
30+
_completion_handler: message,
3131
cause,
3232
service_id,
3333
})
@@ -43,7 +43,7 @@ impl PoolManager {
4343
if self
4444
.sender
4545
.send(PoolManagementRequest::MarkAsReady {
46-
completion_handler: message,
46+
_completion_handler: message,
4747
})
4848
.is_ok()
4949
{
@@ -54,7 +54,10 @@ impl PoolManager {
5454
/// Check in the given connection to the pool.
5555
/// This returns an error containing the connection if the pool has been dropped already.
5656
pub(crate) fn check_in(&self, connection: Connection) -> std::result::Result<(), Connection> {
57-
if let Err(request) = self.sender.send(PoolManagementRequest::CheckIn(connection)) {
57+
if let Err(request) = self
58+
.sender
59+
.send(PoolManagementRequest::CheckIn(Box::new(connection)))
60+
{
5861
let conn = request.0.unwrap_check_in();
5962
return Err(conn);
6063
}
@@ -99,18 +102,18 @@ impl ManagementRequestReceiver {
99102
pub(super) enum PoolManagementRequest {
100103
/// Clear the pool, transitioning it to Paused.
101104
Clear {
102-
completion_handler: AcknowledgedMessage<()>,
105+
_completion_handler: AcknowledgedMessage<()>,
103106
cause: Error,
104107
service_id: Option<ObjectId>,
105108
},
106109

107110
/// Mark the pool as Ready, allowing connections to be created and checked out.
108111
MarkAsReady {
109-
completion_handler: AcknowledgedMessage<()>,
112+
_completion_handler: AcknowledgedMessage<()>,
110113
},
111114

112115
/// Check in the given connection.
113-
CheckIn(Connection),
116+
CheckIn(Box<Connection>),
114117

115118
/// Update the pool based on the given establishment error.
116119
HandleConnectionFailed,
@@ -127,15 +130,15 @@ pub(super) enum PoolManagementRequest {
127130
impl PoolManagementRequest {
128131
fn unwrap_check_in(self) -> Connection {
129132
match self {
130-
PoolManagementRequest::CheckIn(conn) => conn,
133+
PoolManagementRequest::CheckIn(conn) => *conn,
131134
_ => panic!("tried to unwrap checkin but got {:?}", self),
132135
}
133136
}
134137
}
135138

136139
#[derive(Debug)]
137140
pub(super) enum ConnectionSucceeded {
138-
ForPool(Connection),
141+
ForPool(Box<Connection>),
139142
Used { service_id: Option<ObjectId> },
140143
}
141144

src/cmap/mod.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -98,7 +98,7 @@ impl ConnectionPool {
9898
pub(crate) fn new_mocked(address: ServerAddress) -> Self {
9999
let (manager, _) = manager::channel();
100100
let handle = PoolWorkerHandle::new_mocked();
101-
let (connection_requester, _) = connection_requester::channel(Default::default(), handle);
101+
let (connection_requester, _) = connection_requester::channel(handle);
102102
let (_, generation_subscriber) = status::channel(PoolGeneration::normal());
103103

104104
Self {
@@ -134,7 +134,7 @@ impl ConnectionPool {
134134
let response = self.connection_requester.request().await;
135135

136136
let conn = match response {
137-
ConnectionRequestResult::Pooled(c) => Ok(c),
137+
ConnectionRequestResult::Pooled(c) => Ok(*c),
138138
ConnectionRequestResult::Establishing(task) => task.await,
139139
ConnectionRequestResult::PoolCleared(e) => {
140140
Err(Error::pool_cleared_error(&self.address, &e))

src/cmap/test/file.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,9 @@ use crate::{bson_util, cmap::options::ConnectionPoolOptions, error::Result, test
77
use bson::Document;
88

99
#[derive(Debug, Deserialize)]
10+
// TODO RUST-1079: remove the #[allow(dead_code)] tag and add #[serde(deny_unknown_fields)] to
11+
// ensure these tests are being fully run
12+
#[allow(dead_code)]
1013
#[serde(rename_all = "camelCase")]
1114
pub struct TestFile {
1215
version: u8,
@@ -83,6 +86,9 @@ pub enum Operation {
8386
}
8487

8588
#[derive(Debug, Deserialize)]
89+
// TODO RUST-1077: remove the #[allow(dead_code)] tag and add #[serde(deny_unknown_fields)] to
90+
// ensure these tests are being fully run
91+
#[allow(dead_code)]
8692
pub struct Error {
8793
#[serde(rename = "type")]
8894
pub type_: String,

src/cmap/test/integration.rs

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -140,9 +140,10 @@ async fn concurrent_connections() {
140140
consecutive_creations += 1;
141141
}
142142
Event::ConnectionReady(_) => {
143-
if consecutive_creations < 2 {
144-
panic!("connections not created concurrently");
145-
}
143+
assert!(
144+
consecutive_creations >= 2,
145+
"connections not created concurrently"
146+
);
146147
}
147148
_ => (),
148149
}

src/cmap/test/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -127,7 +127,7 @@ impl Executor {
127127
let handler = Arc::new(EventHandler::new());
128128
let error = test_file.error;
129129

130-
let mut pool_options = test_file.pool_options.unwrap_or_else(Default::default);
130+
let mut pool_options = test_file.pool_options.unwrap_or_default();
131131
pool_options.tls_options = CLIENT_OPTIONS.tls_options();
132132
pool_options.cmap_event_handler = Some(handler.clone());
133133
pool_options.server_api = SERVER_API.clone();

src/cmap/worker.rs

Lines changed: 37 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -171,8 +171,7 @@ impl ConnectionPoolWorker {
171171
.map(|pool_options| ConnectionOptions::from(pool_options.clone()));
172172

173173
let (handle, handle_listener) = handle_channel();
174-
let (connection_requester, request_receiver) =
175-
connection_requester::channel(address.clone(), handle);
174+
let (connection_requester, request_receiver) = connection_requester::channel(handle);
176175
let (manager, management_receiver) = manager::channel();
177176

178177
let is_load_balanced = options
@@ -292,31 +291,29 @@ impl ConnectionPoolWorker {
292291
));
293292
}
294293
},
295-
PoolTask::HandleManagementRequest(PoolManagementRequest::CheckIn(connection)) => {
296-
self.check_in(connection);
297-
}
298-
PoolTask::HandleManagementRequest(PoolManagementRequest::Clear {
299-
completion_handler: _,
300-
cause,
301-
service_id,
302-
}) => {
303-
self.clear(cause, service_id);
304-
}
305-
PoolTask::HandleManagementRequest(PoolManagementRequest::MarkAsReady {
306-
completion_handler: _handler,
307-
}) => {
308-
self.mark_as_ready();
309-
}
310-
PoolTask::HandleManagementRequest(
311-
PoolManagementRequest::HandleConnectionSucceeded(conn),
312-
) => self.handle_connection_succeeded(conn),
313-
PoolTask::HandleManagementRequest(
314-
PoolManagementRequest::HandleConnectionFailed,
315-
) => self.handle_connection_failed(),
316-
#[cfg(test)]
317-
PoolTask::HandleManagementRequest(PoolManagementRequest::Sync(tx)) => {
318-
let _ = tx.send(());
319-
}
294+
PoolTask::HandleManagementRequest(request) => match *request {
295+
PoolManagementRequest::CheckIn(connection) => {
296+
self.check_in(*connection);
297+
}
298+
PoolManagementRequest::Clear {
299+
cause, service_id, ..
300+
} => {
301+
self.clear(cause, service_id);
302+
}
303+
PoolManagementRequest::MarkAsReady { .. } => {
304+
self.mark_as_ready();
305+
}
306+
PoolManagementRequest::HandleConnectionSucceeded(conn) => {
307+
self.handle_connection_succeeded(conn);
308+
}
309+
PoolManagementRequest::HandleConnectionFailed => {
310+
self.handle_connection_failed();
311+
}
312+
#[cfg(test)]
313+
PoolManagementRequest::Sync(tx) => {
314+
let _ = tx.send(());
315+
}
316+
},
320317
PoolTask::Maintenance => {
321318
self.perform_maintenance();
322319
}
@@ -372,7 +369,7 @@ impl ConnectionPoolWorker {
372369
}
373370

374371
conn.mark_as_in_use(self.manager.clone());
375-
if let Err(request) = request.fulfill(ConnectionRequestResult::Pooled(conn)) {
372+
if let Err(request) = request.fulfill(ConnectionRequestResult::Pooled(Box::new(conn))) {
376373
// checking out thread stopped listening, indicating it hit the WaitQueue
377374
// timeout, so we put connection back into pool.
378375
let mut connection = request.unwrap_pooled_connection();
@@ -466,7 +463,8 @@ impl ConnectionPoolWorker {
466463
let count = self.service_connection_count.entry(sid).or_insert(0);
467464
*count += 1;
468465
}
469-
if let ConnectionSucceeded::ForPool(mut connection) = connection {
466+
if let ConnectionSucceeded::ForPool(connection) = connection {
467+
let mut connection = *connection;
470468
connection.mark_as_available();
471469
self.available_connections.push_back(connection);
472470
}
@@ -623,8 +621,9 @@ impl ConnectionPoolWorker {
623621
.await;
624622

625623
if let Ok(connection) = connection {
626-
manager
627-
.handle_connection_succeeded(ConnectionSucceeded::ForPool(connection))
624+
manager.handle_connection_succeeded(ConnectionSucceeded::ForPool(Box::new(
625+
connection,
626+
)))
628627
}
629628
});
630629
}
@@ -690,7 +689,7 @@ enum PoolState {
690689
#[derive(Debug)]
691690
enum PoolTask {
692691
/// Handle a management request from a `PoolManager`.
693-
HandleManagementRequest(PoolManagementRequest),
692+
HandleManagementRequest(Box<PoolManagementRequest>),
694693

695694
/// Fulfill the given connection request.
696695
CheckOut(ConnectionRequest),
@@ -701,22 +700,25 @@ enum PoolTask {
701700

702701
impl From<PoolManagementRequest> for PoolTask {
703702
fn from(request: PoolManagementRequest) -> Self {
704-
PoolTask::HandleManagementRequest(request)
703+
PoolTask::HandleManagementRequest(Box::new(request))
705704
}
706705
}
707706

708707
/// Constructs a new channel for for monitoring whether this pool still has references
709708
/// to it.
710709
fn handle_channel() -> (PoolWorkerHandle, HandleListener) {
711710
let (sender, receiver) = mpsc::channel(1);
712-
(PoolWorkerHandle { sender }, HandleListener { receiver })
711+
(
712+
PoolWorkerHandle { _sender: sender },
713+
HandleListener { receiver },
714+
)
713715
}
714716

715717
/// Handle to the worker. Once all handles have been dropped, the worker
716718
/// will stop waiting for new requests and drop the pool itself.
717719
#[derive(Debug, Clone)]
718720
pub(super) struct PoolWorkerHandle {
719-
sender: mpsc::Sender<()>,
721+
_sender: mpsc::Sender<()>,
720722
}
721723

722724
impl PoolWorkerHandle {

src/cursor/mod.rs

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -124,9 +124,10 @@ where
124124
/// on.
125125
#[cfg(test)]
126126
pub(crate) fn set_kill_watcher(&mut self, tx: oneshot::Sender<()>) {
127-
if self.kill_watcher.is_some() {
128-
panic!("cursor already has a kill_watcher");
129-
}
127+
assert!(
128+
self.kill_watcher.is_none(),
129+
"cursor already has a kill_watcher"
130+
);
130131
self.kill_watcher = Some(tx);
131132
}
132133
}

0 commit comments

Comments
 (0)