Skip to content

Commit 522ea10

Browse files
committed
Automatically close dead statements
1 parent 29210bb commit 522ea10

File tree

2 files changed

+49
-28
lines changed

2 files changed

+49
-28
lines changed

postgres-tokio/src/lib.rs

Lines changed: 48 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,8 @@ use postgres_shared::RowData;
1616
use std::collections::HashMap;
1717
use std::fmt;
1818
use std::io;
19+
use std::mem;
20+
use std::sync::mpsc::{self, Sender, Receiver};
1921
use tokio_core::reactor::Handle;
2022

2123
#[doc(inline)]
@@ -40,6 +42,8 @@ pub struct CancelData {
4042

4143
struct InnerConnection {
4244
stream: PostgresStream,
45+
close_receiver: Receiver<(u8, String)>,
46+
close_sender: Sender<(u8, String)>,
4347
parameters: HashMap<String, String>,
4448
cancel_data: CancelData,
4549
next_stmt_id: u32,
@@ -122,8 +126,11 @@ impl Connection {
122126
stream::connect(params.host(), params.port(), handle)
123127
.map_err(ConnectError::Io)
124128
.map(|s| {
129+
let (sender, receiver) = mpsc::channel();
125130
Connection(InnerConnection {
126131
stream: s,
132+
close_sender: sender,
133+
close_receiver: receiver,
127134
parameters: HashMap::new(),
128135
cancel_data: CancelData {
129136
process_id: 0,
@@ -321,6 +328,7 @@ impl Connection {
321328
_ => Err(bad_message())
322329
}
323330
})
331+
.and_then(|(t, s)| s.close_gc().map(|s| (t, s)))
324332
.boxed()
325333
}
326334

@@ -336,7 +344,9 @@ impl Connection {
336344
}
337345

338346
pub fn batch_execute(self, query: &str) -> BoxFuture<Connection, Error> {
339-
self.simple_query(query).map(|r| r.1).boxed()
347+
self.simple_query(query)
348+
.map(|r| r.1)
349+
.boxed()
340350
}
341351

342352
fn raw_prepare(self,
@@ -346,9 +356,9 @@ impl Connection {
346356
let mut parse = vec![];
347357
let mut describe = vec![];
348358
let mut sync = vec![];
359+
frontend::sync(&mut sync);
349360
frontend::parse(name, query, None, &mut parse)
350361
.and_then(|()| frontend::describe(b'S', name, &mut describe))
351-
.and_then(|()| Ok(frontend::sync(&mut sync)))
352362
.into_future()
353363
.and_then(move |()| {
354364
let it = Some(parse).into_iter()
@@ -457,6 +467,7 @@ impl Connection {
457467
let mut bind = vec![];
458468
let mut execute = vec![];
459469
let mut sync = vec![];
470+
frontend::sync(&mut sync);
460471
let r = frontend::bind(portal,
461472
stmt,
462473
Some(1),
@@ -482,10 +493,6 @@ impl Connection {
482493
.map(|()| s)
483494
.map_err(Error::Io)
484495
})
485-
.map(|s| {
486-
frontend::sync(&mut sync);
487-
s
488-
})
489496
.into_future()
490497
.and_then(|s| {
491498
let it = Some(bind).into_iter()
@@ -546,6 +553,7 @@ impl Connection {
546553
self.raw_prepare(&name, query)
547554
.map(|(params, columns, conn)| {
548555
let stmt = Statement {
556+
close_sender: conn.0.close_sender.clone(),
549557
name: name,
550558
params: params,
551559
columns: columns,
@@ -555,29 +563,41 @@ impl Connection {
555563
.boxed()
556564
}
557565

558-
fn raw_close(self, type_: u8, name: &str) -> BoxFuture<Connection, Error> {
559-
let mut close = vec![];
560-
let mut sync = vec![];
561-
frontend::close(type_, name, &mut close)
562-
.map(|()| frontend::sync(&mut sync))
563-
.into_future()
564-
.and_then(move |()| {
565-
let it = Some(close).into_iter().chain(Some(sync)).map(Ok::<_, io::Error>);
566-
self.0.send_all(futures::stream::iter(it))
567-
})
568-
.and_then(|s| s.0.read())
566+
fn close_gc(self) -> BoxFuture<Connection, Error> {
567+
let mut messages = vec![];
568+
while let Ok((type_, name)) = self.0.close_receiver.try_recv() {
569+
let mut buf = vec![];
570+
frontend::close(type_, &name, &mut buf).unwrap(); // this can only fail on bad names
571+
messages.push(buf);
572+
}
573+
if messages.is_empty() {
574+
return Ok(self).into_future().boxed();
575+
}
576+
577+
let mut buf = vec![];
578+
frontend::sync(&mut buf);
579+
messages.push(buf);
580+
self.0.send_all(futures::stream::iter(messages.into_iter().map(Ok::<_, io::Error>)))
581+
.map_err(Error::Io)
582+
.and_then(|s| Connection(s.0).finish_close_gc())
583+
.boxed()
584+
}
585+
586+
fn finish_close_gc(self) -> BoxFuture<Connection, Error> {
587+
self.0.read()
569588
.map_err(Error::Io)
570589
.and_then(|(m, s)| {
571590
match m {
572-
backend::Message::CloseComplete => Either::A(Ok(Connection(s)).into_future()),
591+
backend::Message::ReadyForQuery(_) => {
592+
Either::A(Ok(Connection(s)).into_future())
593+
}
594+
backend::Message::CloseComplete => Either::B(Connection(s).finish_close_gc()),
573595
backend::Message::ErrorResponse(body) => {
574596
Either::B(Connection(s).ready_err(body))
575597
}
576598
_ => Either::A(Err(bad_message()).into_future()),
577599
}
578600
})
579-
.and_then(|s| s.ready(()))
580-
.map(|((), s)| s)
581601
.boxed()
582602
}
583603

@@ -601,11 +621,19 @@ struct Column {
601621
}
602622

603623
pub struct Statement {
624+
close_sender: Sender<(u8, String)>,
604625
name: String,
605626
params: Vec<Type>,
606627
columns: Vec<Column>,
607628
}
608629

630+
impl Drop for Statement {
631+
fn drop(&mut self) {
632+
let name = mem::replace(&mut self.name, String::new());
633+
let _ = self.close_sender.send((b'S', name));
634+
}
635+
}
636+
609637
impl Statement {
610638
pub fn execute(self,
611639
params: &[&ToSql],
@@ -616,10 +644,6 @@ impl Statement {
616644
.map(|(n, conn)| (n, self, conn))
617645
.boxed()
618646
}
619-
620-
pub fn close(self, conn: Connection) -> BoxFuture<Connection, Error> {
621-
conn.raw_close(b'S', &self.name)
622-
}
623647
}
624648

625649
fn connect_err(fields: &mut ErrorFields) -> ConnectError {

postgres-tokio/src/test.rs

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -113,11 +113,8 @@ fn prepare_execute() {
113113
c.unwrap().prepare("CREATE TEMPORARY TABLE foo (id SERIAL PRIMARY KEY, name VARCHAR)")
114114
})
115115
.and_then(|(s, c)| s.execute(&[], c))
116-
.and_then(|(n, s, c)| {
116+
.and_then(|(n, _, c)| {
117117
assert_eq!(0, n);
118-
s.close(c)
119-
})
120-
.and_then(|c| {
121118
c.prepare("INSERT INTO foo (name) VALUES ($1), ($2)")
122119
})
123120
.and_then(|(s, c)| s.execute(&[&"steven", &"bob"], c))

0 commit comments

Comments
 (0)