Skip to content
This repository was archived by the owner on Oct 18, 2021. It is now read-only.

Commit 1aeebaf

Browse files
committed
Merge pull request #120 from ayosec/buffering
Buffered Connections
2 parents a10ad9d + b69a4a8 commit 1aeebaf

File tree

7 files changed

+40
-30
lines changed

7 files changed

+40
-30
lines changed

Cargo.lock

Lines changed: 6 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ scan_fmt = "0.1.0"
2323
separator = "0.3.0"
2424
textnonce = "0.1.1"
2525
time = "0.1"
26+
bufstream = "0.1.1"
2627

2728
[dev-dependencies]
2829
nalgebra = "0.2"

src/connstring.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -105,7 +105,7 @@ pub fn parse(address: &str) -> Result<ConnectionString> {
105105
// Remove scheme
106106
let addr = &address[URI_SCHEME.len()..];
107107

108-
let mut hosts: Vec<Host>;
108+
let hosts: Vec<Host>;
109109
let mut user: Option<String> = None;
110110
let mut password: Option<String> = None;
111111
let mut database: Option<String> = Some("test".to_owned());

src/cursor.rs

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -250,14 +250,15 @@ impl Cursor {
250250
is_cmd_cursor: bool,
251251
read_pref: Option<ReadPreference>) -> Result<Cursor> {
252252

253+
let mut stream = stream;
253254
let mut socket = stream.get_socket();
254255
let req_id = client.get_req_id();
255256

256257
let index = namespace.find(".").unwrap_or(namespace.len());
257258
let db_name = namespace[..index].to_owned();
258259
let coll_name = namespace[index + 1..].to_owned();
259260
let cmd_name = cmd_type.to_str();
260-
let connstring = format!("{}", try!(socket.peer_addr()));
261+
let connstring = format!("{}", try!(socket.get_ref().peer_addr()));
261262

262263
let filter : bson::Document = match query.get("$query") {
263264
Some(&Bson::Document(ref doc)) => doc.clone(),
@@ -310,8 +311,8 @@ impl Cursor {
310311
}
311312
}
312313

313-
try_or_emit!(cmd_type, cmd_name, req_id, connstring, message.write(&mut socket), client);
314-
let reply = try_or_emit!(cmd_type, cmd_name, req_id, connstring, Message::read(&mut socket),
314+
try_or_emit!(cmd_type, cmd_name, req_id, connstring, message.write(socket), client);
315+
let reply = try_or_emit!(cmd_type, cmd_name, req_id, connstring, Message::read(socket),
315316
client);
316317

317318
let fin_time = time::precise_time_ns();
@@ -360,7 +361,7 @@ impl Cursor {
360361
}
361362

362363
fn get_from_stream(&mut self) -> Result<()> {
363-
let (stream, _, _) = try!(self.client.acquire_stream(self.read_preference.to_owned()));
364+
let (mut stream, _, _) = try!(self.client.acquire_stream(self.read_preference.to_owned()));
364365
let mut socket = stream.get_socket();
365366

366367
let req_id = self.client.get_req_id();
@@ -370,7 +371,7 @@ impl Cursor {
370371
let index = self.namespace.rfind(".").unwrap_or(self.namespace.len());
371372
let db_name = self.namespace[..index].to_owned();
372373
let cmd_name = "get_more".to_owned();
373-
let connstring = format!("{}", try!(socket.peer_addr()));
374+
let connstring = format!("{}", try!(socket.get_ref().peer_addr()));
374375

375376
if self.cmd_type != CommandType::Suppressed {
376377
let hook_result = self.client.run_start_hooks(&CommandStarted {
@@ -386,8 +387,8 @@ impl Cursor {
386387
}
387388
}
388389

389-
try_or_emit!(self.cmd_type, cmd_name, req_id, connstring, get_more.write(&mut socket), self.client);
390-
let reply = try!(Message::read(&mut socket));
390+
try_or_emit!(self.cmd_type, cmd_name, req_id, connstring, get_more.write(socket.get_mut()), self.client);
391+
let reply = try!(Message::read(socket.get_mut()));
391392

392393
let (_, v, _) = try!(Cursor::get_bson_and_cid_from_message(reply));
393394
self.buffer.extend(v);

src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -101,6 +101,7 @@ extern crate scan_fmt;
101101
extern crate separator;
102102
extern crate textnonce;
103103
extern crate time;
104+
extern crate bufstream;
104105

105106
pub mod db;
106107
pub mod coll;

src/pool.rs

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ use Result;
44

55
use connstring::Host;
66

7+
use bufstream::BufStream;
78
use std::net::TcpStream;
89
use std::sync::{Arc, Condvar, Mutex};
910
use std::sync::atomic::{AtomicUsize, Ordering, ATOMIC_USIZE_INIT};
@@ -28,7 +29,7 @@ struct Pool {
2829
// The current number of open connections.
2930
pub len: Arc<AtomicUsize>,
3031
// The idle socket pool.
31-
sockets: Vec<TcpStream>,
32+
sockets: Vec<BufStream<TcpStream>>,
3233
// The pool iteration. When a server monitor fails to execute ismaster,
3334
// the connection pool is cleared and the iteration is incremented.
3435
iteration: usize,
@@ -39,7 +40,7 @@ struct Pool {
3940
pub struct PooledStream {
4041
// This socket option will always be Some(stream) until it is
4142
// returned to the pool using take().
42-
socket: Option<TcpStream>,
43+
socket: Option<BufStream<TcpStream>>,
4344
// A reference to the pool that the stream was taken from.
4445
pool: Arc<Mutex<Pool>>,
4546
// A reference to the waiting condvar associated with the pool.
@@ -50,8 +51,8 @@ pub struct PooledStream {
5051

5152
impl PooledStream {
5253
/// Returns a reference to the socket.
53-
pub fn get_socket<'a>(&'a self) -> &'a TcpStream {
54-
self.socket.as_ref().unwrap()
54+
pub fn get_socket<'a>(&'a mut self) -> &'a mut BufStream<TcpStream> {
55+
self.socket.as_mut().unwrap()
5556
}
5657
}
5758

@@ -150,10 +151,10 @@ impl ConnectionPool {
150151
}
151152

152153
// Connects to a MongoDB server as defined by the initial configuration.
153-
fn connect(&self) -> Result<TcpStream> {
154+
fn connect(&self) -> Result<BufStream<TcpStream>> {
154155
let ref host_name = self.host.host_name;
155156
let port = self.host.port;
156-
let stream = try!(TcpStream::connect((&host_name[..], port)));
157+
let stream = BufStream::new(try!(TcpStream::connect((&host_name[..], port))));
157158
Ok(stream)
158159
}
159160
}

src/wire_protocol/operations.rs

Lines changed: 16 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -229,8 +229,8 @@ impl Message {
229229
/// # Return value
230230
///
231231
/// Returns nothing on success, or an Error on failure.
232-
fn write_bson_document(buffer: &mut Write,
233-
bson: &bson::Document) -> Result<()>{
232+
fn write_bson_document<W: Write>(buffer: &mut W,
233+
bson: &bson::Document) -> Result<()>{
234234
let mut temp_buffer = vec![];
235235

236236
try!(bson::encode_document(&mut temp_buffer, bson));
@@ -253,9 +253,9 @@ impl Message {
253253
/// # Return value
254254
///
255255
/// Returns nothing on success, or an Error on failure.
256-
pub fn write_update(buffer: &mut Write, header: &Header, namespace: &str,
257-
flags: &OpUpdateFlags, selector: &bson::Document,
258-
update: &bson::Document) -> Result<()> {
256+
pub fn write_update<W: Write>(buffer: &mut W, header: &Header, namespace: &str,
257+
flags: &OpUpdateFlags, selector: &bson::Document,
258+
update: &bson::Document) -> Result<()> {
259259

260260
try!(header.write(buffer));
261261

@@ -292,8 +292,8 @@ impl Message {
292292
/// # Return value
293293
///
294294
/// Returns nothing on success, or an Error on failure.
295-
fn write_insert(buffer: &mut Write, header: &Header, flags: &OpInsertFlags,
296-
namespace: &str, documents: &[bson::Document]) -> Result<()> {
295+
fn write_insert<W: Write>(buffer: &mut W, header: &Header, flags: &OpInsertFlags,
296+
namespace: &str, documents: &[bson::Document]) -> Result<()> {
297297

298298
try!(header.write(buffer));
299299
try!(buffer.write_i32::<LittleEndian>(flags.to_i32()));
@@ -334,11 +334,11 @@ impl Message {
334334
/// # Return value
335335
///
336336
/// Returns nothing on success, or an Error on failure.
337-
fn write_query(buffer: &mut Write, header: &Header,
338-
flags: &OpQueryFlags, namespace: &str,
339-
number_to_skip: i32, number_to_return: i32,
340-
query: &bson::Document,
341-
return_field_selector: &Option<bson::Document>) -> Result<()> {
337+
fn write_query<W: Write>(buffer: &mut W, header: &Header,
338+
flags: &OpQueryFlags, namespace: &str,
339+
number_to_skip: i32, number_to_return: i32,
340+
query: &bson::Document,
341+
return_field_selector: &Option<bson::Document>) -> Result<()> {
342342

343343
try!(header.write(buffer));
344344
try!(buffer.write_i32::<LittleEndian>(flags.to_i32()));
@@ -378,8 +378,8 @@ impl Message {
378378
/// # Return value
379379
///
380380
/// Returns nothing on success, or an Error on failure.
381-
pub fn write_get_more(buffer: &mut Write, header: &Header, namespace: &str,
382-
number_to_return: i32, cursor_id: i64) -> Result<()> {
381+
pub fn write_get_more<W: Write>(buffer: &mut W, header: &Header, namespace: &str,
382+
number_to_return: i32, cursor_id: i64) -> Result<()> {
383383

384384
try!(header.write(buffer));
385385

@@ -409,7 +409,7 @@ impl Message {
409409
/// # Return value
410410
///
411411
/// Returns nothing on success, or an error string on failure.
412-
pub fn write(&self, buffer: &mut Write) -> Result<()> {
412+
pub fn write<W: Write>(&self, buffer: &mut W) -> Result<()> {
413413
match self {
414414
/// Only the server should send replies
415415
&Message::OpReply {..} =>
@@ -445,7 +445,7 @@ impl Message {
445445
/// # Return value
446446
///
447447
/// Returns the reply message on success, or an Error on failure.
448-
fn read_reply(buffer: &mut Read, header: Header) -> Result<Message> {
448+
fn read_reply<R: Read>(buffer: &mut R, header: Header) -> Result<Message> {
449449
let mut length = header.message_length - mem::size_of::<Header>() as i32;
450450

451451
// Read flags

0 commit comments

Comments
 (0)