Skip to content

Commit d8a0b34

Browse files
authored
RUST-253 Implement Drop on Cursor to run killCursors (#101)
1 parent 09c8792 commit d8a0b34

File tree

3 files changed

+104
-3
lines changed

3 files changed

+104
-3
lines changed

src/cursor.rs

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
use std::{collections::VecDeque, time::Duration};
22

3-
use bson::Document;
3+
use bson::{bson, doc, Document};
44

55
use crate::{error::Result, operation::GetMore, options::StreamAddress, Client, Namespace};
66

@@ -84,6 +84,24 @@ impl Cursor {
8484
}
8585
}
8686

87+
impl Drop for Cursor {
88+
fn drop(&mut self) {
89+
if self.exhausted {
90+
return;
91+
}
92+
93+
let namespace = self.get_more.namespace();
94+
95+
let _ = self.client.database(&namespace.db).run_command(
96+
doc! {
97+
"killCursors": &namespace.coll,
98+
"cursors": [self.get_more.cursor_id()]
99+
},
100+
None,
101+
);
102+
}
103+
}
104+
87105
impl Iterator for Cursor {
88106
type Item = Result<Document>;
89107

src/operation/get_more/mod.rs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,14 @@ impl GetMore {
5757
max_time,
5858
}
5959
}
60+
61+
pub(crate) fn namespace(&self) -> &Namespace {
62+
&self.ns
63+
}
64+
65+
pub(crate) fn cursor_id(&self) -> i64 {
66+
self.cursor_id
67+
}
6068
}
6169

6270
impl Operation for GetMore {

src/test/coll.rs

Lines changed: 77 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,13 @@
11
use bson::{bson, doc, Bson};
22

33
use crate::{
4-
options::{AggregateOptions, UpdateOptions},
5-
test::{util::drop_collection, CLIENT, LOCK},
4+
event::command::CommandStartedEvent,
5+
options::{AggregateOptions, FindOptions, UpdateOptions},
6+
test::{
7+
util::{drop_collection, CommandEvent, EventClient},
8+
CLIENT,
9+
LOCK,
10+
},
611
};
712

813
#[test]
@@ -144,3 +149,73 @@ fn aggregate_out() {
144149
.into_iter()
145150
.any(|name| name.as_str() == out_coll.name()));
146151
}
152+
153+
fn kill_cursors_sent(client: &EventClient) -> bool {
154+
client
155+
.command_events
156+
.read()
157+
.unwrap()
158+
.iter()
159+
.any(|event| match event {
160+
CommandEvent::CommandStartedEvent(CommandStartedEvent { command_name, .. }) => {
161+
command_name == "killCursors"
162+
}
163+
_ => false,
164+
})
165+
}
166+
167+
#[test]
168+
#[function_name::named]
169+
fn kill_cursors_on_drop() {
170+
let _guard = LOCK.run_concurrently();
171+
172+
let db = CLIENT.database(function_name!());
173+
let coll = db.collection(function_name!());
174+
175+
drop_collection(&coll);
176+
177+
coll.insert_many(vec![doc! { "x": 1 }, doc! { "x": 2 }], None)
178+
.unwrap();
179+
180+
let event_client = EventClient::new();
181+
let coll = event_client
182+
.database(function_name!())
183+
.collection(function_name!());
184+
185+
let cursor = coll
186+
.find(None, FindOptions::builder().batch_size(1).build())
187+
.unwrap();
188+
189+
assert!(!kill_cursors_sent(&event_client));
190+
191+
std::mem::drop(cursor);
192+
193+
assert!(kill_cursors_sent(&event_client));
194+
}
195+
196+
#[test]
197+
#[function_name::named]
198+
fn no_kill_cursors_on_exhausted() {
199+
let _guard = LOCK.run_concurrently();
200+
201+
let db = CLIENT.database(function_name!());
202+
let coll = db.collection(function_name!());
203+
204+
drop_collection(&coll);
205+
206+
coll.insert_many(vec![doc! { "x": 1 }, doc! { "x": 2 }], None)
207+
.unwrap();
208+
209+
let event_client = EventClient::new();
210+
let coll = event_client
211+
.database(function_name!())
212+
.collection(function_name!());
213+
214+
let cursor = coll.find(None, FindOptions::builder().build()).unwrap();
215+
216+
assert!(!kill_cursors_sent(&event_client));
217+
218+
std::mem::drop(cursor);
219+
220+
assert!(!kill_cursors_sent(&event_client));
221+
}

0 commit comments

Comments
 (0)