Skip to content

Commit c21fc29

Browse files
committed
Provide more information for copy in readers
1 parent 6c0a7c3 commit c21fc29

File tree

2 files changed

+75
-55
lines changed

2 files changed

+75
-55
lines changed

src/stmt.rs

Lines changed: 75 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ use std::fmt;
77
use std::io::{self, Cursor, BufRead, Read};
88

99
use error::{Error, DbError};
10-
use types::{ReadWithInfo, SessionInfo, Type, ToSql, IsNull};
10+
use types::{SessionInfo, Type, ToSql, IsNull};
1111
use message::FrontendMessage::*;
1212
use message::BackendMessage::*;
1313
use message::WriteMessage;
@@ -309,8 +309,8 @@ impl<'conn> Statement<'conn> {
309309
try!(self.inner_execute("", 0, params));
310310
let mut conn = self.conn.conn.borrow_mut();
311311

312-
match try!(conn.read_message()) {
313-
CopyInResponse { .. } => {}
312+
let (format, column_formats) = match try!(conn.read_message()) {
313+
CopyInResponse { format, column_formats } => (format, column_formats),
314314
_ => {
315315
loop {
316316
match try!(conn.read_message()) {
@@ -323,53 +323,59 @@ impl<'conn> Statement<'conn> {
323323
}
324324
}
325325
}
326-
}
326+
};
327+
328+
let mut info = CopyInfo {
329+
conn: conn,
330+
format: Format::from_u16(format as u16),
331+
column_formats: column_formats.iter().map(|&f| Format::from_u16(f)).collect(),
332+
};
327333

328334
let mut buf = [0; 16 * 1024];
329335
loop {
330-
match fill_copy_buf(&mut buf, r, &SessionInfo::new(&conn)) {
336+
match fill_copy_buf(&mut buf, r, &info) {
331337
Ok(0) => break,
332338
Ok(len) => {
333-
try_desync!(conn, conn.stream.write_message(
339+
try_desync!(info.conn, info.conn.stream.write_message(
334340
&CopyData {
335341
data: &buf[..len],
336342
}));
337343
}
338344
Err(err) => {
339-
try!(conn.write_messages(&[
345+
try!(info.conn.write_messages(&[
340346
CopyFail {
341347
message: "",
342348
},
343349
CopyDone,
344350
Sync]));
345-
match try!(conn.read_message()) {
351+
match try!(info.conn.read_message()) {
346352
ErrorResponse { .. } => { /* expected from the CopyFail */ }
347353
_ => {
348-
conn.desynchronized = true;
354+
info.conn.desynchronized = true;
349355
return Err(Error::IoError(bad_response()));
350356
}
351357
}
352-
try!(conn.wait_for_ready());
358+
try!(info.conn.wait_for_ready());
353359
return Err(Error::IoError(err));
354360
}
355361
}
356362
}
357363

358-
try!(conn.write_messages(&[CopyDone, Sync]));
364+
try!(info.conn.write_messages(&[CopyDone, Sync]));
359365

360-
let num = match try!(conn.read_message()) {
366+
let num = match try!(info.conn.read_message()) {
361367
CommandComplete { tag } => util::parse_update_count(tag),
362368
ErrorResponse { fields } => {
363-
try!(conn.wait_for_ready());
369+
try!(info.conn.wait_for_ready());
364370
return DbError::new(fields);
365371
}
366372
_ => {
367-
conn.desynchronized = true;
373+
info.conn.desynchronized = true;
368374
return Err(Error::IoError(bad_response()));
369375
}
370376
};
371377

372-
try!(conn.wait_for_ready());
378+
try!(info.conn.wait_for_ready());
373379
Ok(num)
374380
}
375381

@@ -443,9 +449,11 @@ impl<'conn> Statement<'conn> {
443449
};
444450

445451
Ok(CopyOutReader {
446-
conn: conn,
447-
format: Format::from_u16(format as u16),
448-
column_formats: column_formats.iter().map(|&f| Format::from_u16(f)).collect(),
452+
info: CopyInfo {
453+
conn: conn,
454+
format: Format::from_u16(format as u16),
455+
column_formats: column_formats.iter().map(|&f| Format::from_u16(f)).collect(),
456+
},
449457
buf: Cursor::new(vec![]),
450458
finished: false,
451459
})
@@ -463,7 +471,7 @@ impl<'conn> Statement<'conn> {
463471
}
464472
}
465473

466-
fn fill_copy_buf<R: ReadWithInfo>(buf: &mut [u8], r: &mut R, info: &SessionInfo)
474+
fn fill_copy_buf<R: ReadWithInfo>(buf: &mut [u8], r: &mut R, info: &CopyInfo)
467475
-> io::Result<usize> {
468476
let mut nread = 0;
469477
while nread < buf.len() {
@@ -493,6 +501,44 @@ impl ColumnNew for Column {
493501
}
494502
}
495503

504+
/// A struct containing information relevant for a `COPY` operation.
505+
pub struct CopyInfo<'a> {
506+
conn: RefMut<'a, InnerConnection>,
507+
format: Format,
508+
column_formats: Vec<Format>,
509+
}
510+
511+
impl<'a> CopyInfo<'a> {
512+
/// Returns the format of the overall data.
513+
pub fn format(&self) -> Format {
514+
self.format
515+
}
516+
517+
/// Returns the format of the individual columns.
518+
pub fn column_formats(&self) -> &[Format] {
519+
&self.column_formats
520+
}
521+
522+
/// Returns session info for the associated connection.
523+
pub fn session_info<'b>(&'b self) -> SessionInfo<'b> {
524+
SessionInfo::new(&*self.conn)
525+
}
526+
}
527+
528+
/// Like `Read` except that a `CopyInfo` object is provided as well.
529+
///
530+
/// All types that implement `Read` also implement this trait.
531+
pub trait ReadWithInfo {
532+
/// Like `Read::read`.
533+
fn read_with_info(&mut self, buf: &mut [u8], info: &CopyInfo) -> io::Result<usize>;
534+
}
535+
536+
impl<R: Read> ReadWithInfo for R {
537+
fn read_with_info(&mut self, buf: &mut [u8], _: &CopyInfo) -> io::Result<usize> {
538+
self.read(buf)
539+
}
540+
}
541+
496542
impl Column {
497543
/// The name of the column.
498544
pub fn name(&self) -> &str {
@@ -530,9 +576,7 @@ impl Format {
530576
/// The underlying connection may not be used while a `CopyOutReader` exists.
531577
/// Any attempt to do so will panic.
532578
pub struct CopyOutReader<'a> {
533-
conn: RefMut<'a, InnerConnection>,
534-
format: Format,
535-
column_formats: Vec<Format>,
579+
info: CopyInfo<'a>,
536580
buf: Cursor<Vec<u8>>,
537581
finished: bool,
538582
}
@@ -544,19 +588,9 @@ impl<'a> Drop for CopyOutReader<'a> {
544588
}
545589

546590
impl<'a> CopyOutReader<'a> {
547-
/// Returns the format of the overall data.
548-
pub fn format(&self) -> Format {
549-
self.format
550-
}
551-
552-
/// Returns the format of the individual columns.
553-
pub fn column_formats(&self) -> &[Format] {
554-
&self.column_formats
555-
}
556-
557-
/// Returns session info for the associated connection.
558-
pub fn session_info<'b>(&'b self) -> SessionInfo<'b> {
559-
SessionInfo::new(&*self.conn)
591+
/// Returns the `CopyInfo` for the current operation.
592+
pub fn info(&self) -> &CopyInfo {
593+
&self.info
560594
}
561595

562596
/// Consumes the `CopyOutReader`, throwing away any unread data.
@@ -581,26 +615,26 @@ impl<'a> CopyOutReader<'a> {
581615
return Ok(());
582616
}
583617

584-
match try!(self.conn.read_message()) {
618+
match try!(self.info.conn.read_message()) {
585619
BCopyData { data } => self.buf = Cursor::new(data),
586620
BCopyDone => {
587621
self.finished = true;
588-
match try!(self.conn.read_message()) {
622+
match try!(self.info.conn.read_message()) {
589623
CommandComplete { .. } => {}
590624
_ => {
591-
self.conn.desynchronized = true;
625+
self.info.conn.desynchronized = true;
592626
return Err(Error::IoError(bad_response()));
593627
}
594628
}
595-
try!(self.conn.wait_for_ready());
629+
try!(self.info.conn.wait_for_ready());
596630
}
597631
ErrorResponse { fields } => {
598632
self.finished = true;
599-
try!(self.conn.wait_for_ready());
633+
try!(self.info.conn.wait_for_ready());
600634
return DbError::new(fields);
601635
}
602636
_ => {
603-
self.conn.desynchronized = true;
637+
self.info.conn.desynchronized = true;
604638
return Err(Error::IoError(bad_response()));
605639
}
606640
}

src/types/mod.rs

Lines changed: 0 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -76,20 +76,6 @@ impl<'a> SessionInfo<'a> {
7676
}
7777
}
7878

79-
/// Like `Read` except that a `SessionInfo` object is provided as well.
80-
///
81-
/// All types that implement `Read` also implement this trait.
82-
pub trait ReadWithInfo {
83-
/// Like `Read::read`.
84-
fn read_with_info(&mut self, buf: &mut [u8], info: &SessionInfo) -> io::Result<usize>;
85-
}
86-
87-
impl<R: Read> ReadWithInfo for R {
88-
fn read_with_info(&mut self, buf: &mut [u8], _: &SessionInfo) -> io::Result<usize> {
89-
self.read(buf)
90-
}
91-
}
92-
9379
/// A Postgres OID.
9480
pub type Oid = u32;
9581

0 commit comments

Comments
 (0)