Skip to content

Commit 725b715

Browse files
committed
Move futures/streams to a submodule
1 parent 0ca084d commit 725b715

File tree

7 files changed

+215
-198
lines changed

7 files changed

+215
-198
lines changed

postgres/src/copy_out_reader.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,10 +2,11 @@ use bytes::{Buf, Bytes};
22
use futures::stream::{self, Stream};
33
use std::io::{self, BufRead, Cursor, Read};
44
use std::marker::PhantomData;
5+
use tokio_postgres::impls;
56
use tokio_postgres::Error;
67

78
pub struct CopyOutReader<'a> {
8-
it: stream::Wait<tokio_postgres::CopyOut>,
9+
it: stream::Wait<impls::CopyOut>,
910
cur: Cursor<Bytes>,
1011
_p: PhantomData<&'a mut ()>,
1112
}
@@ -17,7 +18,7 @@ impl<'a> Drop for CopyOutReader<'a> {
1718

1819
impl<'a> CopyOutReader<'a> {
1920
#[allow(clippy::new_ret_no_self)]
20-
pub(crate) fn new(stream: tokio_postgres::CopyOut) -> Result<CopyOutReader<'a>, Error> {
21+
pub(crate) fn new(stream: impls::CopyOut) -> Result<CopyOutReader<'a>, Error> {
2122
let mut it = stream.wait();
2223
let cur = match it.next() {
2324
Some(Ok(cur)) => cur,

postgres/src/query.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,11 @@
11
use fallible_iterator::FallibleIterator;
22
use futures::stream::{self, Stream};
33
use std::marker::PhantomData;
4+
use tokio_postgres::impls;
45
use tokio_postgres::{Error, Row};
56

67
pub struct Query<'a> {
7-
it: stream::Wait<tokio_postgres::Query>,
8+
it: stream::Wait<impls::Query>,
89
_p: PhantomData<&'a mut ()>,
910
}
1011

@@ -14,7 +15,7 @@ impl<'a> Drop for Query<'a> {
1415
}
1516

1617
impl<'a> Query<'a> {
17-
pub(crate) fn new(stream: tokio_postgres::Query) -> Query<'a> {
18+
pub(crate) fn new(stream: impls::Query) -> Query<'a> {
1819
Query {
1920
it: stream.wait(),
2021
_p: PhantomData,

postgres/src/query_portal.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,11 @@
11
use fallible_iterator::FallibleIterator;
22
use futures::stream::{self, Stream};
33
use std::marker::PhantomData;
4+
use tokio_postgres::impls;
45
use tokio_postgres::{Error, Row};
56

67
pub struct QueryPortal<'a> {
7-
it: stream::Wait<tokio_postgres::QueryPortal>,
8+
it: stream::Wait<impls::QueryPortal>,
89
_p: PhantomData<&'a mut ()>,
910
}
1011

@@ -14,7 +15,7 @@ impl<'a> Drop for QueryPortal<'a> {
1415
}
1516

1617
impl<'a> QueryPortal<'a> {
17-
pub(crate) fn new(stream: tokio_postgres::QueryPortal) -> QueryPortal<'a> {
18+
pub(crate) fn new(stream: impls::QueryPortal) -> QueryPortal<'a> {
1819
QueryPortal {
1920
it: stream.wait(),
2021
_p: PhantomData,

tokio-postgres/src/config.rs

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,12 +15,15 @@ use std::sync::Arc;
1515
use std::time::Duration;
1616
use tokio_io::{AsyncRead, AsyncWrite};
1717

18+
#[cfg(feature = "runtime")]
19+
use crate::impls::Connect;
20+
use crate::impls::ConnectRaw;
1821
#[cfg(feature = "runtime")]
1922
use crate::proto::ConnectFuture;
2023
use crate::proto::ConnectRawFuture;
24+
use crate::{Error, TlsConnect};
2125
#[cfg(feature = "runtime")]
22-
use crate::{Connect, MakeTlsConnect, Socket};
23-
use crate::{ConnectRaw, Error, TlsConnect};
26+
use crate::{MakeTlsConnect, Socket};
2427

2528
/// Properties required of a session.
2629
#[cfg(feature = "runtime")]

tokio-postgres/src/impls.rs

Lines changed: 178 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,178 @@
1+
use bytes::{Bytes, IntoBuf};
2+
use futures::{try_ready, Async, Future, Poll, Stream};
3+
use std::error;
4+
use tokio_io::{AsyncRead, AsyncWrite};
5+
6+
use crate::proto;
7+
use crate::{Client, Connection, Error, Portal, Row, Statement, TlsConnect};
8+
#[cfg(feature = "runtime")]
9+
use crate::{MakeTlsConnect, Socket};
10+
11+
#[must_use = "futures do nothing unless polled"]
12+
pub struct CancelQueryRaw<S, T>(pub(crate) proto::CancelQueryRawFuture<S, T>)
13+
where
14+
S: AsyncRead + AsyncWrite,
15+
T: TlsConnect<S>;
16+
17+
impl<S, T> Future for CancelQueryRaw<S, T>
18+
where
19+
S: AsyncRead + AsyncWrite,
20+
T: TlsConnect<S>,
21+
{
22+
type Item = ();
23+
type Error = Error;
24+
25+
fn poll(&mut self) -> Poll<(), Error> {
26+
self.0.poll()
27+
}
28+
}
29+
30+
#[cfg(feature = "runtime")]
31+
#[must_use = "futures do nothing unless polled"]
32+
pub struct CancelQuery<T>(pub(crate) proto::CancelQueryFuture<T>)
33+
where
34+
T: MakeTlsConnect<Socket>;
35+
36+
#[cfg(feature = "runtime")]
37+
impl<T> Future for CancelQuery<T>
38+
where
39+
T: MakeTlsConnect<Socket>,
40+
{
41+
type Item = ();
42+
type Error = Error;
43+
44+
fn poll(&mut self) -> Poll<(), Error> {
45+
self.0.poll()
46+
}
47+
}
48+
49+
#[must_use = "futures do nothing unless polled"]
50+
pub struct ConnectRaw<S, T>(pub(crate) proto::ConnectRawFuture<S, T>)
51+
where
52+
S: AsyncRead + AsyncWrite,
53+
T: TlsConnect<S>;
54+
55+
impl<S, T> Future for ConnectRaw<S, T>
56+
where
57+
S: AsyncRead + AsyncWrite,
58+
T: TlsConnect<S>,
59+
{
60+
type Item = (Client, Connection<S, T::Stream>);
61+
type Error = Error;
62+
63+
fn poll(&mut self) -> Poll<(Client, Connection<S, T::Stream>), Error> {
64+
let (client, connection) = try_ready!(self.0.poll());
65+
66+
Ok(Async::Ready((Client(client), Connection(connection))))
67+
}
68+
}
69+
70+
#[cfg(feature = "runtime")]
71+
#[must_use = "futures do nothing unless polled"]
72+
pub struct Connect<T>(pub(crate) proto::ConnectFuture<T>)
73+
where
74+
T: MakeTlsConnect<Socket>;
75+
76+
#[cfg(feature = "runtime")]
77+
impl<T> Future for Connect<T>
78+
where
79+
T: MakeTlsConnect<Socket>,
80+
{
81+
type Item = (Client, Connection<Socket, T::Stream>);
82+
type Error = Error;
83+
84+
fn poll(&mut self) -> Poll<(Client, Connection<Socket, T::Stream>), Error> {
85+
let (client, connection) = try_ready!(self.0.poll());
86+
87+
Ok(Async::Ready((Client(client), Connection(connection))))
88+
}
89+
}
90+
91+
#[must_use = "futures do nothing unless polled"]
92+
pub struct Prepare(pub(crate) proto::PrepareFuture);
93+
94+
impl Future for Prepare {
95+
type Item = Statement;
96+
type Error = Error;
97+
98+
fn poll(&mut self) -> Poll<Statement, Error> {
99+
let statement = try_ready!(self.0.poll());
100+
101+
Ok(Async::Ready(Statement(statement)))
102+
}
103+
}
104+
105+
#[must_use = "streams do nothing unless polled"]
106+
pub struct Query(pub(crate) proto::QueryStream<proto::Statement>);
107+
108+
impl Stream for Query {
109+
type Item = Row;
110+
type Error = Error;
111+
112+
fn poll(&mut self) -> Poll<Option<Row>, Error> {
113+
self.0.poll()
114+
}
115+
}
116+
117+
#[must_use = "futures do nothing unless polled"]
118+
pub struct Bind(pub(crate) proto::BindFuture);
119+
120+
impl Future for Bind {
121+
type Item = Portal;
122+
type Error = Error;
123+
124+
fn poll(&mut self) -> Poll<Portal, Error> {
125+
match self.0.poll() {
126+
Ok(Async::Ready(portal)) => Ok(Async::Ready(Portal(portal))),
127+
Ok(Async::NotReady) => Ok(Async::NotReady),
128+
Err(e) => Err(e),
129+
}
130+
}
131+
}
132+
133+
#[must_use = "streams do nothing unless polled"]
134+
pub struct QueryPortal(pub(crate) proto::QueryStream<proto::Portal>);
135+
136+
impl Stream for QueryPortal {
137+
type Item = Row;
138+
type Error = Error;
139+
140+
fn poll(&mut self) -> Poll<Option<Row>, Error> {
141+
self.0.poll()
142+
}
143+
}
144+
145+
#[must_use = "futures do nothing unless polled"]
146+
pub struct CopyIn<S>(pub(crate) proto::CopyInFuture<S>)
147+
where
148+
S: Stream,
149+
S::Item: IntoBuf,
150+
<S::Item as IntoBuf>::Buf: Send,
151+
S::Error: Into<Box<dyn error::Error + Sync + Send>>;
152+
153+
impl<S> Future for CopyIn<S>
154+
where
155+
S: Stream,
156+
S::Item: IntoBuf,
157+
<S::Item as IntoBuf>::Buf: Send,
158+
S::Error: Into<Box<dyn error::Error + Sync + Send>>,
159+
{
160+
type Item = u64;
161+
type Error = Error;
162+
163+
fn poll(&mut self) -> Poll<u64, Error> {
164+
self.0.poll()
165+
}
166+
}
167+
168+
#[must_use = "streams do nothing unless polled"]
169+
pub struct CopyOut(pub(crate) proto::CopyOutStream);
170+
171+
impl Stream for CopyOut {
172+
type Item = Bytes;
173+
type Error = Error;
174+
175+
fn poll(&mut self) -> Poll<Option<Bytes>, Error> {
176+
self.0.poll()
177+
}
178+
}

0 commit comments

Comments
 (0)