Skip to content

Commit 8f26bfc

Browse files
patrickfreedsaghm
authored andcommitted
RUST-530 Additional fixes
1 parent 4d4e007 commit 8f26bfc

File tree

3 files changed

+15
-21
lines changed

3 files changed

+15
-21
lines changed

src/cmap/conn/mod.rs

Lines changed: 11 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -161,6 +161,11 @@ impl Connection {
161161
self.generation != current_generation
162162
}
163163

164+
/// Checks if the connection is currently executing an operation.
165+
pub(super) fn is_executing(&self) -> bool {
166+
self.command_executing
167+
}
168+
164169
/// Helper to create a `ConnectionCheckedOutEvent` for the connection.
165170
pub(super) fn checked_out_event(&self) -> ConnectionCheckedOutEvent {
166171
ConnectionCheckedOutEvent {
@@ -257,14 +262,13 @@ impl Connection {
257262
stream: std::mem::replace(&mut self.stream, AsyncStream::Null),
258263
handler: self.handler.take(),
259264
stream_description: self.stream_description.take(),
265+
command_executing: self.command_executing,
260266
}
261267
}
262268
}
263269

264270
impl Drop for Connection {
265271
fn drop(&mut self) {
266-
let command_executing = self.command_executing;
267-
268272
// If the connection has a weak reference to a pool, that means that the connection is
269273
// being dropped when it's checked out. If the pool is still alive, it
270274
// should check itself back in. Otherwise, the connection should close
@@ -279,15 +283,9 @@ impl Drop for Connection {
279283
if let Some(strong_pool_ref) = weak_pool_ref.upgrade() {
280284
let dropped_connection_state = self.take();
281285
RUNTIME.execute(async move {
282-
if command_executing {
283-
strong_pool_ref
284-
.dropped(dropped_connection_state.into())
285-
.await;
286-
} else {
287-
strong_pool_ref
288-
.check_in(dropped_connection_state.into())
289-
.await;
290-
}
286+
strong_pool_ref
287+
.check_in(dropped_connection_state.into())
288+
.await;
291289
});
292290
} else {
293291
self.close(ConnectionClosedReason::PoolClosed);
@@ -315,6 +313,7 @@ struct DroppedConnectionState {
315313
#[derivative(Debug = "ignore")]
316314
handler: Option<Arc<dyn CmapEventHandler>>,
317315
stream_description: Option<StreamDescription>,
316+
command_executing: bool,
318317
}
319318

320319
impl Drop for DroppedConnectionState {
@@ -338,7 +337,7 @@ impl From<DroppedConnectionState> for Connection {
338337
id: state.id,
339338
address: state.address.clone(),
340339
generation: state.generation,
341-
command_executing: false,
340+
command_executing: state.command_executing,
342341
stream: std::mem::replace(&mut state.stream, AsyncStream::Null),
343342
handler: state.handler.take(),
344343
stream_description: state.stream_description.take(),

src/cmap/mod.rs

Lines changed: 2 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -300,14 +300,6 @@ impl ConnectionPoolInner {
300300
}
301301
}
302302

303-
async fn dropped(&self, conn: Connection) {
304-
let mut connection_manager = self.connection_manager.lock().await;
305-
306-
connection_manager.close_connection(conn, ConnectionClosedReason::Dropped);
307-
308-
self.wait_queue.wake_front();
309-
}
310-
311303
async fn check_in(&self, mut conn: Connection) {
312304
self.emit_event(|handler| {
313305
handler.handle_connection_checked_in_event(conn.checked_in_event());
@@ -320,6 +312,8 @@ impl ConnectionPoolInner {
320312
// Close the connection if it's stale.
321313
if conn.is_stale(connection_manager.generation) {
322314
connection_manager.close_connection(conn, ConnectionClosedReason::Stale);
315+
} else if conn.is_executing() {
316+
connection_manager.close_connection(conn, ConnectionClosedReason::Dropped)
323317
} else {
324318
connection_manager.checked_in_connections.push(conn);
325319
}

src/test/client.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,8 @@ async fn metadata_sent_in_handshake() {
6464
async fn connection_drop_during_read() {
6565
let _guard = LOCK.run_concurrently().await;
6666

67-
let options = CLIENT_OPTIONS.clone();
67+
let mut options = CLIENT_OPTIONS.clone();
68+
options.max_pool_size = Some(1);
6869

6970
let client = Client::with_options(options.clone()).unwrap();
7071
let db = client.database("test");

0 commit comments

Comments
 (0)