@@ -73,7 +73,7 @@ use std::path::PathBuf;
73
73
use error:: { Error , ConnectError , SqlState , DbError } ;
74
74
use types:: { ToSql , FromSql } ;
75
75
use io:: { StreamWrapper , NegotiateSsl } ;
76
- use types:: { IsNull , Kind , Type , SessionInfo , Oid , Other } ;
76
+ use types:: { IsNull , Kind , Type , SessionInfo , Oid , Other , ReadWithInfo } ;
77
77
use message:: BackendMessage :: * ;
78
78
use message:: FrontendMessage :: * ;
79
79
use message:: { FrontendMessage , BackendMessage , RowDescriptionEntry } ;
@@ -1530,9 +1530,10 @@ impl<'conn> Statement<'conn> {
1530
1530
/// Executes a `COPY FROM STDIN` statement, returning the number of rows
1531
1531
/// added.
1532
1532
///
1533
- /// The contents of the provided `Read`er are passed to the Postgres server
1534
- /// verbatim; it is the caller's responsibility to ensure the data is in
1535
- /// the proper format. See the [Postgres documentation](http://www.postgresql.org/docs/9.4/static/sql-copy.html)
1533
+ /// The data read out of the provided `Read`er are passed to the Postgres
1534
+ /// server verbatim; it is the caller's responsibility to ensure the data
1535
+ /// is in the proper format. See the
1536
+ /// [Postgres documentation](http://www.postgresql.org/docs/9.4/static/sql-copy.html)
1536
1537
/// for details.
1537
1538
///
1538
1539
/// If the statement is not a `COPY FROM STDIN` statement, it will still be
@@ -1547,7 +1548,7 @@ impl<'conn> Statement<'conn> {
1547
1548
/// let stmt = conn.prepare("COPY people FROM STDIN").unwrap();
1548
1549
/// stmt.copy_in(&[], &mut "1\tjohn\n2\tjane\n".as_bytes()).unwrap();
1549
1550
/// ```
1550
- pub fn copy_in < R : Read > ( & self , params : & [ & ToSql ] , r : & mut R ) -> Result < u64 > {
1551
+ pub fn copy_in < R : ReadWithInfo > ( & self , params : & [ & ToSql ] , r : & mut R ) -> Result < u64 > {
1551
1552
try!( self . inner_execute ( "" , 0 , params) ) ;
1552
1553
let mut conn = self . conn . conn . borrow_mut ( ) ;
1553
1554
@@ -1567,16 +1568,15 @@ impl<'conn> Statement<'conn> {
1567
1568
}
1568
1569
}
1569
1570
1570
- let mut buf = vec ! [ ] ;
1571
+ let mut buf = [ 0 ; 16 * 1024 ] ;
1571
1572
loop {
1572
- match r . take ( 16 * 1024 ) . read_to_end ( & mut buf) {
1573
+ match fill_copy_buf ( & mut buf, r , & SessionInfo :: new ( & conn ) ) {
1573
1574
Ok ( 0 ) => break ,
1574
- Ok ( _ ) => {
1575
+ Ok ( len ) => {
1575
1576
try_desync ! ( conn, conn. stream. write_message(
1576
1577
& CopyData {
1577
- data: & buf,
1578
+ data: & buf[ ..len ] ,
1578
1579
} ) ) ;
1579
- buf. clear ( ) ;
1580
1580
}
1581
1581
Err ( err) => {
1582
1582
try!( conn. write_messages ( & [
@@ -1628,6 +1628,20 @@ impl<'conn> Statement<'conn> {
1628
1628
}
1629
1629
}
1630
1630
1631
+ fn fill_copy_buf < R : ReadWithInfo > ( buf : & mut [ u8 ] , r : & mut R , info : & SessionInfo )
1632
+ -> std_io:: Result < usize > {
1633
+ let mut nread = 0 ;
1634
+ while nread < buf. len ( ) {
1635
+ match r. read_with_info ( & mut buf[ nread..] , info) {
1636
+ Ok ( 0 ) => break ,
1637
+ Ok ( n) => nread += n,
1638
+ Err ( ref e) if e. kind ( ) == std_io:: ErrorKind :: Interrupted => { }
1639
+ Err ( e) => return Err ( e) ,
1640
+ }
1641
+ }
1642
+ Ok ( nread)
1643
+ }
1644
+
1631
1645
/// Information about a column of the result of a query.
1632
1646
#[ derive( PartialEq , Eq , Clone , Debug ) ]
1633
1647
pub struct Column {
0 commit comments