Skip to content

Commit d478a91

Browse files
committed
Read full stream before returning first result to ensure it is not dropped prematurely, fixes #269
1 parent e5e71da commit d478a91

File tree

1 file changed

+23
-13
lines changed

1 file changed

+23
-13
lines changed

src/run_query_dsl/utils.rs

Lines changed: 23 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ use std::pin::Pin;
33
use std::task::{Context, Poll};
44

55
use diesel::QueryResult;
6-
use futures_core::{ready, TryFuture, TryStream};
6+
use futures_core::{TryFuture, TryStream};
77
use futures_util::{TryFutureExt, TryStreamExt};
88

99
// We use a custom future implementation here to erase some lifetimes
@@ -80,14 +80,23 @@ where
8080

8181
/// Converts a stream into a future, only yielding the first element.
8282
/// Based on [`futures_util::stream::StreamFuture`].
83-
pub struct LoadNext<St> {
84-
stream: Option<St>,
83+
///
84+
/// Consumes the entire stream to ensure proper cleanup before returning which is
85+
/// required to fix: https://github.com/weiznich/diesel_async/issues/269
86+
pub struct LoadNext<St>
87+
where
88+
St: TryStream<Error = diesel::result::Error> + Unpin,
89+
{
90+
future: futures_util::stream::TryCollect<St, Vec<St::Ok>>,
8591
}
8692

87-
impl<St> LoadNext<St> {
93+
impl<St> LoadNext<St>
94+
where
95+
St: TryStream<Error = diesel::result::Error> + Unpin,
96+
{
8897
pub(crate) fn new(stream: St) -> Self {
8998
Self {
90-
stream: Some(stream),
99+
future: stream.try_collect(),
91100
}
92101
}
93102
}
@@ -99,14 +108,15 @@ where
99108
type Output = QueryResult<St::Ok>;
100109

101110
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
102-
let first = {
103-
let s = self.stream.as_mut().expect("polling LoadNext twice");
104-
ready!(s.try_poll_next_unpin(cx))
105-
};
106-
self.stream = None;
107-
match first {
108-
Some(first) => Poll::Ready(first),
109-
None => Poll::Ready(Err(diesel::result::Error::NotFound)),
111+
match Pin::new(&mut self.future).poll(cx) {
112+
Poll::Ready(Ok(results)) => {
113+
match results.into_iter().next() {
114+
Some(first) => Poll::Ready(Ok(first)),
115+
None => Poll::Ready(Err(diesel::result::Error::NotFound)),
116+
}
117+
}
118+
Poll::Ready(Err(e)) => Poll::Ready(Err(e)),
119+
Poll::Pending => Poll::Pending,
110120
}
111121
}
112122
}

0 commit comments

Comments
 (0)