1+ mod utils;
2+
13use crate :: AsyncConnectionCore ;
24use diesel:: associations:: HasTable ;
35use diesel:: query_builder:: IntoUpdateTarget ;
46use diesel:: result:: QueryResult ;
57use diesel:: AsChangeset ;
68use futures_core:: future:: BoxFuture ;
7- use futures_core:: Stream ;
8- use futures_util:: { future, stream, FutureExt , StreamExt , TryFutureExt , TryStreamExt } ;
9+ use futures_util:: { stream, FutureExt , StreamExt , TryStreamExt } ;
910use std:: future:: Future ;
10- use std:: pin:: Pin ;
1111
1212/// The traits used by `QueryDsl`.
1313///
@@ -22,7 +22,7 @@ pub mod methods {
2222 use diesel:: expression:: QueryMetadata ;
2323 use diesel:: query_builder:: { AsQuery , QueryFragment , QueryId } ;
2424 use diesel:: query_dsl:: CompatibleType ;
25- use futures_util:: { Future , Stream , TryFutureExt } ;
25+ use futures_util:: { Future , Stream } ;
2626
2727 /// The `execute` method
2828 ///
@@ -74,6 +74,7 @@ pub mod methods {
7474 type LoadFuture < ' conn > : Future < Output = QueryResult < Self :: Stream < ' conn > > > + Send
7575 where
7676 Conn : ' conn ;
77+
7778 /// The inner stream returned by [`LoadQuery::internal_load`]
7879 type Stream < ' conn > : Stream < Item = QueryResult < U > > + Send
7980 where
@@ -96,10 +97,7 @@ pub mod methods {
9697 ST : ' static ,
9798 {
9899 type LoadFuture < ' conn >
99- = future:: MapOk <
100- Conn :: LoadFuture < ' conn , ' query > ,
101- fn ( Conn :: Stream < ' conn , ' query > ) -> Self :: Stream < ' conn > ,
102- >
100+ = utils:: Map < Conn :: LoadFuture < ' conn , ' query > , Self :: Stream < ' conn > >
103101 where
104102 Conn : ' conn ;
105103
@@ -112,33 +110,13 @@ pub mod methods {
112110 Conn : ' conn ;
113111
114112 fn internal_load ( self , conn : & mut Conn ) -> Self :: LoadFuture < ' _ > {
115- conn. load ( self )
116- . map_ok ( map_result_stream_future :: < U , _ , _ , DB , ST > )
113+ utils:: Map :: new ( conn. load ( self ) , |stream| {
114+ Ok ( stream?. map ( |row| {
115+ U :: build_from_row ( & row?) . map_err ( diesel:: result:: Error :: DeserializationError )
116+ } ) )
117+ } )
117118 }
118119 }
119-
120- #[ allow( clippy:: type_complexity) ]
121- fn map_result_stream_future < ' s , ' a , U , S , R , DB , ST > (
122- stream : S ,
123- ) -> stream:: Map < S , fn ( QueryResult < R > ) -> QueryResult < U > >
124- where
125- S : Stream < Item = QueryResult < R > > + Send + ' s ,
126- R : diesel:: row:: Row < ' a , DB > + ' s ,
127- DB : Backend + ' static ,
128- U : FromSqlRow < ST , DB > + ' static ,
129- ST : ' static ,
130- {
131- stream. map ( map_row_helper :: < _ , DB , U , ST > )
132- }
133-
134- fn map_row_helper < ' a , R , DB , U , ST > ( row : QueryResult < R > ) -> QueryResult < U >
135- where
136- U : FromSqlRow < ST , DB > ,
137- R : diesel:: row:: Row < ' a , DB > ,
138- DB : Backend ,
139- {
140- U :: build_from_row ( & row?) . map_err ( diesel:: result:: Error :: DeserializationError )
141- }
142120}
143121
144122/// The return types produced by the various [`RunQueryDsl`] methods
@@ -149,37 +127,24 @@ pub mod methods {
149127// the same connection
150128#[ allow( type_alias_bounds) ] // we need these bounds otherwise we cannot use GAT's
151129pub mod return_futures {
130+ use crate :: run_query_dsl:: utils;
131+
152132 use super :: methods:: LoadQuery ;
153- use diesel:: QueryResult ;
154- use futures_util:: { future, stream} ;
133+ use futures_util:: stream;
155134 use std:: pin:: Pin ;
156135
157136 /// The future returned by [`RunQueryDsl::load`](super::RunQueryDsl::load)
158137 /// and [`RunQueryDsl::get_results`](super::RunQueryDsl::get_results)
159138 ///
160139 /// This is essentially `impl Future<Output = QueryResult<Vec<U>>>`
161- pub type LoadFuture < ' conn , ' query , Q : LoadQuery < ' query , Conn , U > , Conn , U > = future:: AndThen <
162- Q :: LoadFuture < ' conn > ,
163- stream:: TryCollect < Q :: Stream < ' conn > , Vec < U > > ,
164- fn ( Q :: Stream < ' conn > ) -> stream:: TryCollect < Q :: Stream < ' conn > , Vec < U > > ,
165- > ;
140+ pub type LoadFuture < ' conn , ' query , Q : LoadQuery < ' query , Conn , U > , Conn , U > =
141+ utils:: AndThen < Q :: LoadFuture < ' conn > , stream:: TryCollect < Q :: Stream < ' conn > , Vec < U > > > ;
166142
167143 /// The future returned by [`RunQueryDsl::get_result`](super::RunQueryDsl::get_result)
168144 ///
169145 /// This is essentially `impl Future<Output = QueryResult<U>>`
170- pub type GetResult < ' conn , ' query , Q : LoadQuery < ' query , Conn , U > , Conn , U > = future:: AndThen <
171- Q :: LoadFuture < ' conn > ,
172- future:: Map <
173- stream:: StreamFuture < Pin < Box < Q :: Stream < ' conn > > > > ,
174- fn ( ( Option < QueryResult < U > > , Pin < Box < Q :: Stream < ' conn > > > ) ) -> QueryResult < U > ,
175- > ,
176- fn (
177- Q :: Stream < ' conn > ,
178- ) -> future:: Map <
179- stream:: StreamFuture < Pin < Box < Q :: Stream < ' conn > > > > ,
180- fn ( ( Option < QueryResult < U > > , Pin < Box < Q :: Stream < ' conn > > > ) ) -> QueryResult < U > ,
181- > ,
182- > ;
146+ pub type GetResult < ' conn , ' query , Q : LoadQuery < ' query , Conn , U > , Conn , U > =
147+ utils:: AndThen < Q :: LoadFuture < ' conn > , utils:: LoadNext < Pin < Box < Q :: Stream < ' conn > > > > > ;
183148}
184149
185150/// Methods used to execute queries.
@@ -346,13 +311,9 @@ pub trait RunQueryDsl<Conn>: Sized {
346311 Conn : AsyncConnectionCore ,
347312 Self : methods:: LoadQuery < ' query , Conn , U > + ' query ,
348313 {
349- fn collect_result < U , S > ( stream : S ) -> stream:: TryCollect < S , Vec < U > >
350- where
351- S : Stream < Item = QueryResult < U > > ,
352- {
353- stream. try_collect ( )
354- }
355- self . internal_load ( conn) . and_then ( collect_result :: < U , _ > )
314+ utils:: AndThen :: new ( self . internal_load ( conn) , |stream| {
315+ Ok ( stream?. try_collect :: < Vec < _ > > ( ) )
316+ } )
356317 }
357318
358319 /// Executes the given query, returning a [`Stream`] with the returned rows.
@@ -547,29 +508,9 @@ pub trait RunQueryDsl<Conn>: Sized {
547508 Conn : AsyncConnectionCore ,
548509 Self : methods:: LoadQuery < ' query , Conn , U > + ' query ,
549510 {
550- #[ allow( clippy:: type_complexity) ]
551- fn get_next_stream_element < S , U > (
552- stream : S ,
553- ) -> future:: Map <
554- stream:: StreamFuture < Pin < Box < S > > > ,
555- fn ( ( Option < QueryResult < U > > , Pin < Box < S > > ) ) -> QueryResult < U > ,
556- >
557- where
558- S : Stream < Item = QueryResult < U > > ,
559- {
560- fn map_option_to_result < U , S > (
561- ( o, _) : ( Option < QueryResult < U > > , Pin < Box < S > > ) ,
562- ) -> QueryResult < U > {
563- match o {
564- Some ( s) => s,
565- None => Err ( diesel:: result:: Error :: NotFound ) ,
566- }
567- }
568-
569- Box :: pin ( stream) . into_future ( ) . map ( map_option_to_result)
570- }
571-
572- self . load_stream ( conn) . and_then ( get_next_stream_element)
511+ utils:: AndThen :: new ( self . internal_load ( conn) , |stream| {
512+ Ok ( utils:: LoadNext :: new ( Box :: pin ( stream?) ) )
513+ } )
573514 }
574515
575516 /// Runs the command, returning an `Vec` with the affected rows.
0 commit comments