1
1
use crate :: client:: { InnerClient , Responses } ;
2
+ use pin_project_lite:: pin_project;
2
3
use crate :: codec:: FrontendMessage ;
3
4
use crate :: connection:: RequestMessages ;
4
5
use crate :: types:: ToSql ;
@@ -8,6 +9,7 @@ use futures::{ready, Stream};
8
9
use postgres_protocol:: message:: backend:: Message ;
9
10
use std:: pin:: Pin ;
10
11
use std:: task:: { Context , Poll } ;
12
+ use std:: marker:: PhantomPinned ;
11
13
12
14
pub async fn copy_out < ' a , I > (
13
15
client : & InnerClient ,
20
22
{
21
23
let buf = query:: encode ( client, & statement, params) ?;
22
24
let responses = start ( client, buf) . await ?;
23
- Ok ( CopyStream { responses } )
25
+ Ok ( CopyStream { responses, _p : PhantomPinned } )
24
26
}
25
27
26
28
async fn start ( client : & InnerClient , buf : Bytes ) -> Result < Responses , Error > {
@@ -39,16 +41,22 @@ async fn start(client: &InnerClient, buf: Bytes) -> Result<Responses, Error> {
39
41
Ok ( responses)
40
42
}
41
43
42
- /// A stream of `COPY ... TO STDOUT` query data.
43
- pub struct CopyStream {
44
- responses : Responses ,
44
+ pin_project ! {
45
+ /// A stream of `COPY ... TO STDOUT` query data.
46
+ pub struct CopyStream {
47
+ responses: Responses ,
48
+ #[ pin]
49
+ _p: PhantomPinned ,
50
+ }
45
51
}
46
52
47
53
impl Stream for CopyStream {
48
54
type Item = Result < Bytes , Error > ;
49
55
50
- fn poll_next ( mut self : Pin < & mut Self > , cx : & mut Context < ' _ > ) -> Poll < Option < Self :: Item > > {
51
- match ready ! ( self . responses. poll_next( cx) ?) {
56
+ fn poll_next ( self : Pin < & mut Self > , cx : & mut Context < ' _ > ) -> Poll < Option < Self :: Item > > {
57
+ let this = self . project ( ) ;
58
+
59
+ match ready ! ( this. responses. poll_next( cx) ?) {
52
60
Message :: CopyData ( body) => Poll :: Ready ( Some ( Ok ( body. into_bytes ( ) ) ) ) ,
53
61
Message :: CopyDone => Poll :: Ready ( None ) ,
54
62
_ => Poll :: Ready ( Some ( Err ( Error :: unexpected_message ( ) ) ) ) ,
0 commit comments