Skip to content

Commit dc2f04d

Browse files
committed
Start work on COPY TO statements
1 parent 0d5f254 commit dc2f04d

File tree

3 files changed

+66
-0
lines changed

3 files changed

+66
-0
lines changed

src/lib.rs

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1450,6 +1450,20 @@ impl<'conn> Statement<'conn> {
14501450
},
14511451
Sync]));
14521452
}
1453+
CopyOutResponse { .. } => {
1454+
loop {
1455+
match try!(conn.read_message()) {
1456+
BCopyDone => break,
1457+
ErrorResponse { fields } => {
1458+
try!(conn.wait_for_ready());
1459+
return DbError::new(fields);
1460+
}
1461+
_ => {}
1462+
}
1463+
}
1464+
num = 0;
1465+
break;
1466+
}
14531467
_ => {
14541468
conn.desynchronized = true;
14551469
return Err(Error::IoError(bad_response()));
@@ -1690,6 +1704,17 @@ fn read_rows(conn: &mut InnerConnection, buf: &mut VecDeque<Vec<Option<Vec<u8>>>
16901704
},
16911705
Sync]));
16921706
}
1707+
CopyOutResponse { .. } => {
1708+
loop {
1709+
match try!(conn.read_message()) {
1710+
ReadyForQuery { .. } => break,
1711+
_ => {}
1712+
}
1713+
}
1714+
return Err(Error::IoError(std_io::Error::new(
1715+
std_io::ErrorKind::InvalidInput,
1716+
"COPY queries cannot be directly executed")));
1717+
}
16931718
_ => {
16941719
conn.desynchronized = true;
16951720
return Err(Error::IoError(bad_response()));

src/message.rs

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,10 +32,19 @@ pub enum BackendMessage {
3232
CommandComplete {
3333
tag: String,
3434
},
35+
// FIXME naming
36+
BCopyData {
37+
data: Vec<u8>,
38+
},
39+
BCopyDone,
3540
CopyInResponse {
3641
format: u8,
3742
column_formats: Vec<u16>,
3843
},
44+
CopyOutResponse {
45+
format: u8,
46+
column_formats: Vec<u16>,
47+
},
3948
DataRow {
4049
row: Vec<Option<Vec<u8>>>
4150
},
@@ -292,7 +301,15 @@ impl<R: BufRead> ReadMessage for R {
292301
channel: try!(rdr.read_cstr()),
293302
payload: try!(rdr.read_cstr())
294303
},
304+
b'c' => BCopyDone,
295305
b'C' => CommandComplete { tag: try!(rdr.read_cstr()) },
306+
b'd' => {
307+
let mut data = vec![];
308+
try!(rdr.read_to_end(&mut data));
309+
BCopyData {
310+
data: data,
311+
}
312+
}
296313
b'D' => try!(read_data_row(&mut rdr)),
297314
b'E' => ErrorResponse { fields: try!(read_fields(&mut rdr)) },
298315
b'G' => {
@@ -306,6 +323,17 @@ impl<R: BufRead> ReadMessage for R {
306323
column_formats: column_formats,
307324
}
308325
}
326+
b'H' => {
327+
let format = try!(rdr.read_u8());
328+
let mut column_formats = vec![];
329+
for _ in 0..try!(rdr.read_u16::<BigEndian>()) {
330+
column_formats.push(try!(rdr.read_u16::<BigEndian>()));
331+
}
332+
CopyOutResponse {
333+
format: format,
334+
column_formats: column_formats,
335+
}
336+
}
309337
b'I' => EmptyQueryResponse,
310338
b'K' => BackendKeyData {
311339
process_id: try!(rdr.read_u32::<BigEndian>()),

tests/test.rs

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -756,6 +756,19 @@ fn test_copy() {
756756
stmt.query(&[]).unwrap().iter().map(|r| r.get(0)).collect::<Vec<i32>>());
757757
}
758758

759+
#[test]
760+
fn test_copy_out_query() {
761+
let conn = or_panic!(Connection::connect("postgres://postgres@localhost", &SslMode::None));
762+
or_panic!(conn.batch_execute("
763+
CREATE TEMPORARY TABLE foo (id INT);
764+
INSERT INTO foo (id) VALUES (0), (1), (2), (3)"));
765+
let stmt = or_panic!(conn.prepare("COPY foo (id) TO STDOUT"));
766+
match stmt.query(&[]) {
767+
Ok(_) => panic!("unexpected success"),
768+
Err(Error::IoError(ref e)) if e.to_string().contains("COPY") => {}
769+
Err(e) => panic!("unexpected error {:?}", e),
770+
}
771+
}
759772

760773
#[test]
761774
// Just make sure the impls don't infinite loop

0 commit comments

Comments
 (0)