Skip to content

Commit a3f611d

Browse files
committed
Overhaul copy_out
1 parent b8577b4 commit a3f611d

File tree

6 files changed

+48
-46
lines changed

6 files changed

+48
-46
lines changed

postgres/src/client.rs

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -275,15 +275,15 @@ impl Client {
275275
/// # Ok(())
276276
/// # }
277277
/// ```
278-
pub fn copy_out<'a, T>(
279-
&'a mut self,
280-
query: &'a T,
281-
params: &'a [&(dyn ToSql + Sync)],
282-
) -> Result<impl BufRead + 'a, Error>
278+
pub fn copy_out<T>(
279+
&mut self,
280+
query: &T,
281+
params: &[&(dyn ToSql + Sync)],
282+
) -> Result<impl BufRead, Error>
283283
where
284284
T: ?Sized + ToStatement,
285285
{
286-
let stream = self.0.copy_out(query, params);
286+
let stream = executor::block_on(self.0.copy_out(query, params))?;
287287
CopyOutReader::new(stream)
288288
}
289289

postgres/src/transaction.rs

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -123,15 +123,15 @@ impl<'a> Transaction<'a> {
123123
}
124124

125125
/// Like `Client::copy_out`.
126-
pub fn copy_out<'b, T>(
127-
&'b mut self,
128-
query: &'b T,
129-
params: &'b [&(dyn ToSql + Sync)],
130-
) -> Result<impl BufRead + 'b, Error>
126+
pub fn copy_out<T>(
127+
&mut self,
128+
query: &T,
129+
params: &[&(dyn ToSql + Sync)],
130+
) -> Result<impl BufRead, Error>
131131
where
132132
T: ?Sized + ToStatement,
133133
{
134-
let stream = self.0.copy_out(query, params);
134+
let stream = executor::block_on(self.0.copy_out(query, params))?;
135135
CopyOutReader::new(stream)
136136
}
137137

tokio-postgres/src/client.rs

Lines changed: 11 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ use crate::cancel_query;
33
use crate::codec::BackendMessages;
44
use crate::config::{Host, SslMode};
55
use crate::connection::{Request, RequestMessages};
6+
use crate::copy_out::CopyStream;
67
use crate::query::RowStream;
78
use crate::slice_iter;
89
#[cfg(feature = "runtime")]
@@ -16,10 +17,10 @@ use crate::{cancel_query_raw, copy_in, copy_out, query, Transaction};
1617
use crate::{prepare, SimpleQueryMessage};
1718
use crate::{simple_query, Row};
1819
use crate::{Error, Statement};
19-
use bytes::{Bytes, IntoBuf};
20+
use bytes::IntoBuf;
2021
use fallible_iterator::FallibleIterator;
2122
use futures::channel::mpsc;
22-
use futures::{future, Stream, TryFutureExt, TryStream, TryStreamExt};
23+
use futures::{future, Stream, TryStream, TryStreamExt};
2324
use futures::{ready, StreamExt};
2425
use parking_lot::Mutex;
2526
use postgres_protocol::message::backend::Message;
@@ -293,20 +294,17 @@ impl Client {
293294
/// # Panics
294295
///
295296
/// Panics if the number of parameters provided does not match the number expected.
296-
pub fn copy_out<'a, T>(
297-
&'a self,
298-
statement: &'a T,
299-
params: &'a [&(dyn ToSql + Sync)],
300-
) -> impl Stream<Item = Result<Bytes, Error>> + 'a
297+
pub async fn copy_out<T>(
298+
&self,
299+
statement: &T,
300+
params: &[&(dyn ToSql + Sync)],
301+
) -> Result<CopyStream, Error>
301302
where
302303
T: ?Sized + ToStatement,
303304
{
304-
let f = async move {
305-
let statement = statement.__convert().into_statement(self).await?;
306-
let params = slice_iter(params);
307-
Ok(copy_out::copy_out(self.inner(), statement, params))
308-
};
309-
f.try_flatten_stream()
305+
let statement = statement.__convert().into_statement(self).await?;
306+
let params = slice_iter(params);
307+
copy_out::copy_out(self.inner(), statement, params).await
310308
}
311309

312310
/// Executes a sequence of SQL statements using the simple query protocol, returning the resulting rows.

tokio-postgres/src/copy_out.rs

Lines changed: 10 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -4,26 +4,23 @@ use crate::connection::RequestMessages;
44
use crate::types::ToSql;
55
use crate::{query, Error, Statement};
66
use bytes::Bytes;
7-
use futures::{ready, Stream, TryFutureExt};
7+
use futures::{ready, Stream};
88
use postgres_protocol::message::backend::Message;
99
use std::pin::Pin;
1010
use std::task::{Context, Poll};
1111

12-
pub fn copy_out<'a, I>(
13-
client: &'a InnerClient,
12+
pub async fn copy_out<'a, I>(
13+
client: &InnerClient,
1414
statement: Statement,
1515
params: I,
16-
) -> impl Stream<Item = Result<Bytes, Error>> + 'a
16+
) -> Result<CopyStream, Error>
1717
where
18-
I: IntoIterator<Item = &'a dyn ToSql> + 'a,
18+
I: IntoIterator<Item = &'a dyn ToSql>,
1919
I::IntoIter: ExactSizeIterator,
2020
{
21-
let f = async move {
22-
let buf = query::encode(&statement, params)?;
23-
let responses = start(client, buf).await?;
24-
Ok(CopyOut { responses })
25-
};
26-
f.try_flatten_stream()
21+
let buf = query::encode(&statement, params)?;
22+
let responses = start(client, buf).await?;
23+
Ok(CopyStream { responses })
2724
}
2825

2926
async fn start(client: &InnerClient, buf: Vec<u8>) -> Result<Responses, Error> {
@@ -42,11 +39,11 @@ async fn start(client: &InnerClient, buf: Vec<u8>) -> Result<Responses, Error> {
4239
Ok(responses)
4340
}
4441

45-
struct CopyOut {
42+
pub struct CopyStream {
4643
responses: Responses,
4744
}
4845

49-
impl Stream for CopyOut {
46+
impl Stream for CopyStream {
5047
type Item = Result<Bytes, Error>;
5148

5249
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {

tokio-postgres/src/transaction.rs

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
use crate::codec::FrontendMessage;
22
use crate::connection::RequestMessages;
3+
use crate::copy_out::CopyStream;
34
use crate::query::RowStream;
45
#[cfg(feature = "runtime")]
56
use crate::tls::MakeTlsConnect;
@@ -10,7 +11,7 @@ use crate::Socket;
1011
use crate::{
1112
bind, query, slice_iter, Client, Error, Portal, Row, SimpleQueryMessage, Statement, ToStatement,
1213
};
13-
use bytes::{Bytes, IntoBuf};
14+
use bytes::IntoBuf;
1415
use futures::{Stream, TryStream, TryStreamExt};
1516
use postgres_protocol::message::frontend;
1617
use std::error;
@@ -211,15 +212,15 @@ impl<'a> Transaction<'a> {
211212
}
212213

213214
/// Like `Client::copy_out`.
214-
pub fn copy_out<'b, T>(
215-
&'b self,
216-
statement: &'b T,
217-
params: &'b [&(dyn ToSql + Sync)],
218-
) -> impl Stream<Item = Result<Bytes, Error>> + 'b
215+
pub async fn copy_out<T>(
216+
&self,
217+
statement: &T,
218+
params: &[&(dyn ToSql + Sync)],
219+
) -> Result<CopyStream, Error>
219220
where
220221
T: ?Sized + ToStatement,
221222
{
222-
self.client.copy_out(statement, params)
223+
self.client.copy_out(statement, params).await
223224
}
224225

225226
/// Like `Client::simple_query`.

tokio-postgres/tests/test/main.rs

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -506,7 +506,13 @@ async fn copy_out() {
506506
.unwrap();
507507

508508
let stmt = client.prepare("COPY foo TO STDOUT").await.unwrap();
509-
let data = client.copy_out(&stmt, &[]).try_concat().await.unwrap();
509+
let data = client
510+
.copy_out(&stmt, &[])
511+
.await
512+
.unwrap()
513+
.try_concat()
514+
.await
515+
.unwrap();
510516
assert_eq!(&data[..], b"1\tjim\n2\tjoe\n");
511517
}
512518

0 commit comments

Comments
 (0)