Skip to content

Commit c4becf6

Browse files
committed
Read message with timeout backend support
1 parent 87f865d commit c4becf6

File tree

4 files changed

+51
-2
lines changed

4 files changed

+51
-2
lines changed

Cargo.toml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,11 +29,12 @@ byteorder = "0.3"
2929
log = "0.3"
3030
phf = "0.7"
3131
rustc-serialize = "0.3"
32+
net2 = { version = "0.2", features = ["nightly"] }
3233
chrono = { version = "0.2.14", optional = true }
3334
openssl = { version = "0.6.4", optional = true }
3435
serde = { version = "0.3", optional = true }
3536
time = { version = "0.1.14", optional = true }
36-
unix_socket = { version = ">= 0.3, < 0.5", optional = true }
37+
unix_socket = { version = ">= 0.3, < 0.5", optional = true, features = ["socket_timeout"] }
3738
uuid = { version = "0.1", optional = true }
3839

3940
[dev-dependencies]

src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,7 @@ extern crate phf;
5252
extern crate rustc_serialize as serialize;
5353
#[cfg(feature = "unix_socket")]
5454
extern crate unix_socket;
55+
extern crate net2;
5556

5657
use bufstream::BufStream;
5758
use md5::Md5;

src/message.rs

Lines changed: 30 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,12 @@
11
use std::io;
22
use std::io::prelude::*;
33
use std::mem;
4+
use std::time::Duration;
45
use byteorder::{BigEndian, ReadBytesExt, WriteBytesExt};
56

67
use types::Oid;
78
use util;
9+
use priv_io::ReadTimeout;
810

911
use self::BackendMessage::*;
1012
use self::FrontendMessage::*;
@@ -282,12 +284,39 @@ impl<R: BufRead> ReadCStr for R {
282284
#[doc(hidden)]
283285
pub trait ReadMessage {
284286
fn read_message(&mut self) -> io::Result<BackendMessage>;
287+
288+
fn read_message_timeout(&mut self, timeout: Duration)
289+
-> io::Result<Option<BackendMessage>>;
290+
291+
fn finish_read_message(&mut self, ident: u8) -> io::Result<BackendMessage>;
285292
}
286293

287-
impl<R: BufRead> ReadMessage for R {
294+
impl<R: BufRead + ReadTimeout> ReadMessage for R {
288295
fn read_message(&mut self) -> io::Result<BackendMessage> {
289296
let ident = try!(self.read_u8());
297+
self.finish_read_message(ident)
298+
}
299+
300+
fn read_message_timeout(&mut self, timeout: Duration)
301+
-> io::Result<Option<BackendMessage>> {
302+
try!(self.set_read_timeout(Some(timeout)));
303+
let ident = self.read_u8();
304+
try!(self.set_read_timeout(None));
305+
306+
match ident {
307+
Ok(ident) => self.finish_read_message(ident).map(Some),
308+
Err(e) => {
309+
let e: io::Error = e.into();
310+
if e.kind() == io::ErrorKind::WouldBlock || e.kind() == io::ErrorKind::TimedOut {
311+
Ok(None)
312+
} else {
313+
Err(e)
314+
}
315+
}
316+
}
317+
}
290318

319+
fn finish_read_message(&mut self, ident: u8) -> io::Result<BackendMessage> {
291320
// subtract size of length value
292321
let len = try!(self.read_u32::<BigEndian>()) - mem::size_of::<u32>() as u32;
293322
let mut rdr = self.by_ref().take(len as u64);

src/priv_io.rs

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,10 @@
11
use byteorder::ReadBytesExt;
2+
use net2::TcpStreamExt;
23
use std::io;
34
use std::io::prelude::*;
45
use std::net::TcpStream;
6+
use std::time::Duration;
7+
use bufstream::BufStream;
58
#[cfg(feature = "unix_socket")]
69
use unix_socket::UnixStream;
710
#[cfg(unix)]
@@ -17,6 +20,21 @@ use message::FrontendMessage::SslRequest;
1720

1821
const DEFAULT_PORT: u16 = 5432;
1922

23+
#[doc(hidden)]
24+
pub trait ReadTimeout {
25+
fn set_read_timeout(&self, timeout: Option<Duration>) -> io::Result<()>;
26+
}
27+
28+
impl ReadTimeout for BufStream<Box<StreamWrapper>> {
29+
fn set_read_timeout(&self, timeout: Option<Duration>) -> io::Result<()> {
30+
match self.get_ref().get_ref().0 {
31+
InternalStream::Tcp(ref s) => <TcpStream as TcpStreamExt>::set_read_timeout(s, timeout),
32+
#[cfg(feature = "unix_socket")]
33+
InternalStream::Unix(ref s) => s.set_read_timeout(timeout),
34+
}
35+
}
36+
}
37+
2038
/// A connection to the Postgres server.
2139
///
2240
/// It implements `Read`, `Write` and `StreamWrapper`, as well as `AsRawFd` on

0 commit comments

Comments
 (0)