Skip to content

Commit 3aab188

Browse files
committed
QueryPager: Typecheck each page
1 parent 3e61348 commit 3aab188

File tree

1 file changed

+32
-11
lines changed

1 file changed

+32
-11
lines changed

scylla/src/client/pager.rs

Lines changed: 32 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -615,36 +615,40 @@ pub struct QueryPager {
615615
impl QueryPager {
616616
/// Returns the next item (`ColumnIterator`) from the stream.
617617
///
618-
/// This can be used with `type_check() for manual deserialization - see example below.
618+
/// Because pages may have different result metadata, each one needs to be type-checked before deserialization.
619+
/// The bool returned in second element of the tuple indicates whether the page was fresh or not.
620+
/// This allows user to then perform the type check for fresh pages.
619621
///
620622
/// This is not a part of the `Stream` interface because the returned iterator
621623
/// borrows from self.
622624
///
623625
/// This is cancel-safe.
624-
async fn next(&mut self) -> Option<Result<ColumnIterator<'_, '_>, NextRowError>> {
626+
async fn next(&mut self) -> Option<Result<(ColumnIterator<'_, '_>, bool), NextRowError>> {
625627
let res = std::future::poll_fn(|cx| Pin::new(&mut *self).poll_fill_page(cx)).await;
626-
match res {
627-
Some(Ok(())) => {}
628+
let fresh_page = match res {
629+
Some(Ok(f)) => f,
628630
Some(Err(err)) => return Some(Err(err)),
629631
None => return None,
630-
}
632+
};
631633

632634
// We are guaranteed here to have a non-empty page, so unwrap
633635
Some(
634636
self.current_page
635637
.next()
636638
.unwrap()
637-
.map_err(NextRowError::RowDeserializationError),
639+
.map_err(NextRowError::RowDeserializationError)
640+
.map(|x| (x, fresh_page)),
638641
)
639642
}
640643

641644
/// Tries to acquire a non-empty page, if current page is exhausted.
645+
/// Boolean value in `Some(Ok(r))` is true if a new page was fetched.
642646
fn poll_fill_page(
643647
mut self: Pin<&mut Self>,
644648
cx: &mut Context<'_>,
645-
) -> Poll<Option<Result<(), NextRowError>>> {
649+
) -> Poll<Option<Result<bool, NextRowError>>> {
646650
if !self.is_current_page_exhausted() {
647-
return Poll::Ready(Some(Ok(())));
651+
return Poll::Ready(Some(Ok(false)));
648652
}
649653
ready_some_ok!(self.as_mut().poll_next_page(cx));
650654
if self.is_current_page_exhausted() {
@@ -653,7 +657,7 @@ impl QueryPager {
653657
cx.waker().wake_by_ref();
654658
Poll::Pending
655659
} else {
656-
Poll::Ready(Some(Ok(())))
660+
Poll::Ready(Some(Ok(true)))
657661
}
658662
}
659663

@@ -1040,6 +1044,7 @@ impl QueryPager {
10401044
/// To use [Stream] API (only accessible for owned types), use [QueryPager::rows_stream].
10411045
pub struct TypedRowStream<RowT> {
10421046
raw_row_lending_stream: QueryPager,
1047+
current_page_typechecked: bool,
10431048
_phantom: std::marker::PhantomData<RowT>,
10441049
}
10451050

@@ -1061,10 +1066,12 @@ where
10611066
RowT: for<'frame, 'metadata> DeserializeRow<'frame, 'metadata>,
10621067
{
10631068
fn new(raw_stream: QueryPager) -> Result<Self, TypeCheckError> {
1069+
#[allow(deprecated)] // In TypedRowStream we take care to type check each page.
10641070
raw_stream.type_check::<RowT>()?;
10651071

10661072
Ok(Self {
10671073
raw_row_lending_stream: raw_stream,
1074+
current_page_typechecked: true,
10681075
_phantom: Default::default(),
10691076
})
10701077
}
@@ -1101,8 +1108,18 @@ where
11011108

11021109
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
11031110
let next_fut = async {
1104-
self.raw_row_lending_stream.next().await.map(|res| {
1105-
res.and_then(|column_iterator| {
1111+
let real_self: &mut Self = &mut self; // Self is Unpin, and this lets us perform partial borrows.
1112+
real_self.raw_row_lending_stream.next().await.map(|res| {
1113+
res.and_then(|(column_iterator, fresh_page)| {
1114+
if fresh_page {
1115+
real_self.current_page_typechecked = false;
1116+
}
1117+
if !real_self.current_page_typechecked {
1118+
column_iterator.type_check::<RowT>().map_err(|e| {
1119+
NextRowError::NextPageError(NextPageError::TypeCheckError(e))
1120+
})?;
1121+
real_self.current_page_typechecked = true;
1122+
}
11061123
<RowT as DeserializeRow>::deserialize(column_iterator)
11071124
.map_err(NextRowError::RowDeserializationError)
11081125
})
@@ -1130,6 +1147,10 @@ pub enum NextPageError {
11301147
/// Failed to deserialize result metadata associated with next page response.
11311148
#[error("Failed to deserialize result metadata associated with next page response: {0}")]
11321149
ResultMetadataParseError(#[from] ResultMetadataAndRowsCountParseError),
1150+
1151+
/// Failed to type check a received page.
1152+
#[error("Failed to type check a received page: {0}")]
1153+
TypeCheckError(#[from] TypeCheckError),
11331154
}
11341155

11351156
/// An error returned by async iterator API.

0 commit comments

Comments
 (0)