Skip to content

Commit 3340b9d

Browse files
piodulwprzytula
andcommitted
iterator: adjust to the new deserialization framework
This commit makes the RowIteratorWorker pass raw rows to the main tokio task, instead of the eagerly deserialized Rows. The equivalent of the old RowIterator is now QueryPager. It cannot be conveniently iterated on, as it does not have any information about the column types. It features (yet not exposes) a `next()` method for deserializing consecutive `ColumnIterator`s. Users cannot manually perform deserialization using this method directly, because will be able to utilise the preferred (typed) API that will be added in the next commit. If they prefer manual deserialization, they will be be able to specify `ColumnIterator` as the deserialized row type, and proceed with manual deserialization from there. The legacy iterators are preserved by wrapping around QueryPager. Co-authored-by: Wojciech Przytuła <[email protected]>
1 parent 814fbe6 commit 3340b9d

File tree

3 files changed

+141
-77
lines changed

3 files changed

+141
-77
lines changed

scylla/src/transport/connection.rs

Lines changed: 6 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@ use std::{
4747
};
4848

4949
use super::errors::{ProtocolError, UseKeyspaceProtocolError};
50-
use super::iterator::LegacyRowIterator;
50+
use super::iterator::{LegacyRowIterator, QueryPager};
5151
use super::locator::tablets::{RawTablet, TabletParsingError};
5252
use super::query_result::QueryResult;
5353
use super::session::AddressTranslator;
@@ -1188,13 +1188,9 @@ impl Connection {
11881188
.determine_consistency(self.config.default_consistency);
11891189
let serial_consistency = query.config.serial_consistency.flatten();
11901190

1191-
LegacyRowIterator::new_for_connection_query_iter(
1192-
query,
1193-
self,
1194-
consistency,
1195-
serial_consistency,
1196-
)
1197-
.await
1191+
QueryPager::new_for_connection_query_iter(query, self, consistency, serial_consistency)
1192+
.await
1193+
.map(QueryPager::into_legacy)
11981194
}
11991195

12001196
/// Executes a prepared statements and fetches its results over multiple pages, using
@@ -1209,14 +1205,15 @@ impl Connection {
12091205
.determine_consistency(self.config.default_consistency);
12101206
let serial_consistency = prepared_statement.config.serial_consistency.flatten();
12111207

1212-
LegacyRowIterator::new_for_connection_execute_iter(
1208+
QueryPager::new_for_connection_execute_iter(
12131209
prepared_statement,
12141210
values,
12151211
self,
12161212
consistency,
12171213
serial_consistency,
12181214
)
12191215
.await
1216+
.map(QueryPager::into_legacy)
12201217
}
12211218

12221219
#[allow(dead_code)]

scylla/src/transport/iterator.rs

Lines changed: 125 additions & 63 deletions
Original file line numberDiff line numberDiff line change
@@ -1,29 +1,31 @@
11
//! Iterators over rows returned by paged queries
22
33
use std::future::Future;
4-
use std::mem;
54
use std::net::SocketAddr;
65
use std::ops::ControlFlow;
76
use std::pin::Pin;
87
use std::sync::Arc;
98
use std::task::{Context, Poll};
109

1110
use futures::Stream;
12-
use scylla_cql::frame::request::query::PagingStateResponse;
11+
use scylla_cql::frame::frame_errors::RowsParseError;
1312
use scylla_cql::frame::response::result::RawMetadataAndRawRows;
1413
use scylla_cql::frame::response::NonErrorResponse;
14+
use scylla_cql::types::deserialize::result::RawRowLendingIterator;
15+
use scylla_cql::types::deserialize::row::{ColumnIterator, DeserializeRow};
1516
use scylla_cql::types::serialize::row::SerializedValues;
1617
use std::result::Result;
1718
use thiserror::Error;
1819
use tokio::sync::mpsc;
1920

2021
use super::execution_profile::ExecutionProfileInner;
22+
use super::query_result::ColumnSpecs;
2123
use super::session::RequestSpan;
2224
use crate::cql_to_rust::{FromRow, FromRowError};
2325

2426
use crate::frame::response::{
2527
result,
26-
result::{ColumnSpec, Row, Rows},
28+
result::{ColumnSpec, Row},
2729
};
2830
use crate::history::{self, HistoryListener};
2931
use crate::statement::{prepared_statement::PreparedStatement, query::Query};
@@ -535,46 +537,70 @@ where
535537
}
536538
}
537539

538-
/// Iterator over rows returned by paged queries\
539-
/// Allows to easily access rows without worrying about handling multiple pages
540-
pub struct LegacyRowIterator {
541-
current_row_idx: usize,
542-
current_page: Rows,
540+
/// An intermediate object that allows to construct an iterator over a query
541+
/// that is asynchronously paged in the background.
542+
///
543+
/// TODO: implement and describe the new API
544+
///
545+
/// A pre-0.15.0 interface is also available, although deprecated:
546+
/// `into_legacy()` method converts QueryPager to LegacyRowIterator,
547+
/// enabling Stream'ed operation on rows being eagerly deserialized
548+
/// to the middle-man [Row] type. This is inefficient, especially if
549+
/// [Row] is not the intended target type.
550+
pub struct QueryPager {
551+
current_page: RawRowLendingIterator,
543552
page_receiver: mpsc::Receiver<Result<ReceivedPage, QueryError>>,
544553
tracing_ids: Vec<Uuid>,
545554
}
546555

547-
/// Fetching pages is asynchronous so `RowIterator` does not implement the `Iterator` trait.\
548-
/// Instead it uses the asynchronous `Stream` trait
549-
impl Stream for LegacyRowIterator {
550-
type Item = Result<Row, QueryError>;
556+
// QueryPager is not an iterator or a stream! However, it implements
557+
// a `next()` method that returns a [ColumnIterator], which can be used
558+
// to manually deserialize a row.
559+
// The `ColumnIterator` borrows from the `QueryPager`, and the [futures::Stream] trait
560+
// does not allow for such a pattern. Lending streams are not a thing yet.
561+
impl QueryPager {
562+
/// Returns the next item (`ColumnIterator`) from the stream.
563+
///
564+
/// This can be used with `type_check() for manual deserialization - see example below.
565+
///
566+
/// This is not a part of the `Stream` interface because the returned iterator
567+
/// borrows from self.
568+
///
569+
/// This is cancel-safe.
570+
async fn next(&mut self) -> Option<Result<ColumnIterator, QueryError>> {
571+
let res = std::future::poll_fn(|cx| Pin::new(&mut *self).poll_fill_page(cx)).await;
572+
match res {
573+
Some(Ok(())) => {}
574+
Some(Err(err)) => return Some(Err(err)),
575+
None => return None,
576+
}
551577

552-
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
553-
self.poll_next_internal(cx)
578+
// We are guaranteed here to have a non-empty page, so unwrap
579+
Some(
580+
self.current_page
581+
.next()
582+
.unwrap()
583+
.map_err(|e| RowsParseError::from(e).into()),
584+
)
554585
}
555-
}
556586

557-
impl LegacyRowIterator {
558-
fn poll_next_internal(
559-
mut self: Pin<&mut Self>,
587+
/// Tries to acquire a non-empty page, if current page is exhausted.
588+
fn poll_fill_page<'r>(
589+
mut self: Pin<&'r mut Self>,
560590
cx: &mut Context<'_>,
561-
) -> Poll<Option<Result<Row, QueryError>>> {
562-
if self.as_ref().is_current_page_exhausted() {
563-
ready_some_ok!(self.as_mut().poll_next_page(cx));
591+
) -> Poll<Option<Result<(), QueryError>>> {
592+
if !self.is_current_page_exhausted() {
593+
return Poll::Ready(Some(Ok(())));
564594
}
565-
566-
let mut s = self.as_mut();
567-
568-
let idx = s.current_row_idx;
569-
if idx < s.current_page.rows.len() {
570-
let row = mem::take(&mut s.current_page.rows[idx]);
571-
s.current_row_idx += 1;
572-
return Poll::Ready(Some(Ok(row)));
595+
ready_some_ok!(self.as_mut().poll_next_page(cx));
596+
if self.is_current_page_exhausted() {
597+
// We most likely got a zero-sized page.
598+
// Try again later.
599+
cx.waker().wake_by_ref();
600+
Poll::Pending
601+
} else {
602+
Poll::Ready(Some(Ok(())))
573603
}
574-
// We probably got a zero-sized page
575-
// Yield, but tell that we are ready
576-
cx.waker().wake_by_ref();
577-
Poll::Pending
578604
}
579605

580606
/// Makes an attempt to acquire the next page (which may be empty).
@@ -589,18 +615,8 @@ impl LegacyRowIterator {
589615
let mut s = self.as_mut();
590616

591617
let received_page = ready_some_ok!(Pin::new(&mut s.page_receiver).poll_recv(cx));
592-
let rows = match received_page
593-
.rows
594-
// As RowIteratorWorker manages paging itself, the paging state response
595-
// returned to the user is always NoMorePages. It used to be so before
596-
// the deserialization refactor, too.
597-
.into_legacy_rows(PagingStateResponse::NoMorePages)
598-
{
599-
Ok(rows) => rows,
600-
Err(err) => return Poll::Ready(Some(Err(err.into()))),
601-
};
602-
s.current_page = rows;
603-
s.current_row_idx = 0;
618+
let raw_rows_with_deserialized_metadata = received_page.rows.deserialize_metadata()?;
619+
s.current_page = RawRowLendingIterator::new(raw_rows_with_deserialized_metadata);
604620

605621
if let Some(tracing_id) = received_page.tracing_id {
606622
s.tracing_ids.push(tracing_id);
@@ -609,12 +625,13 @@ impl LegacyRowIterator {
609625
Poll::Ready(Some(Ok(())))
610626
}
611627

612-
/// Converts this iterator into an iterator over rows parsed as given type
613-
pub fn into_typed<RowT: FromRow>(self) -> LegacyTypedRowIterator<RowT> {
614-
LegacyTypedRowIterator {
615-
row_iterator: self,
616-
phantom_data: Default::default(),
617-
}
628+
/// Converts this iterator into an iterator over rows parsed as given type,
629+
/// using the legacy deserialization framework.
630+
/// This is inefficient, because all rows are being eagerly deserialized
631+
/// to a middle-man [Row] type.
632+
#[inline]
633+
pub fn into_legacy(self) -> LegacyRowIterator {
634+
LegacyRowIterator { raw_iterator: self }
618635
}
619636

620637
pub(crate) async fn new_for_query(
@@ -888,16 +905,13 @@ impl LegacyRowIterator {
888905
// to the channel (the PageSendAttemptedProof helps enforce this)
889906
// - That future is polled in a tokio::task which isn't going to be
890907
// cancelled
891-
let pages_received = receiver.recv().await.unwrap()?;
892-
let rows = pages_received
893-
.rows
894-
.into_legacy_rows(PagingStateResponse::NoMorePages)?;
908+
let page_received = receiver.recv().await.unwrap()?;
909+
let raw_rows_with_deserialized_metadata = page_received.rows.deserialize_metadata()?;
895910

896911
Ok(Self {
897-
current_row_idx: 0,
898-
current_page: rows,
912+
current_page: RawRowLendingIterator::new(raw_rows_with_deserialized_metadata),
899913
page_receiver: receiver,
900-
tracing_ids: if let Some(tracing_id) = pages_received.tracing_id {
914+
tracing_ids: if let Some(tracing_id) = page_received.tracing_id {
901915
vec![tracing_id]
902916
} else {
903917
Vec::new()
@@ -906,17 +920,63 @@ impl LegacyRowIterator {
906920
}
907921

908922
/// If tracing was enabled returns tracing ids of all finished page queries
909-
pub fn get_tracing_ids(&self) -> &[Uuid] {
923+
#[inline]
924+
pub fn tracing_ids(&self) -> &[Uuid] {
910925
&self.tracing_ids
911926
}
912927

913928
/// Returns specification of row columns
914-
pub fn get_column_specs(&self) -> &[ColumnSpec<'_>] {
915-
self.current_page.metadata.inner().col_specs()
929+
#[inline]
930+
pub fn column_specs(&self) -> ColumnSpecs<'_> {
931+
ColumnSpecs::new(self.current_page.metadata().col_specs())
916932
}
917933

918934
fn is_current_page_exhausted(&self) -> bool {
919-
self.current_row_idx >= self.current_page.rows.len()
935+
self.current_page.rows_remaining() == 0
936+
}
937+
}
938+
939+
/// Iterator over rows returned by paged queries.
940+
///
941+
/// Allows to easily access rows without worrying about handling multiple pages.
942+
pub struct LegacyRowIterator {
943+
raw_iterator: QueryPager,
944+
}
945+
946+
impl Stream for LegacyRowIterator {
947+
type Item = Result<Row, QueryError>;
948+
949+
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
950+
let mut s = self.as_mut();
951+
952+
let next_fut = s.raw_iterator.next();
953+
futures::pin_mut!(next_fut);
954+
955+
let next_column_iter = ready_some_ok!(next_fut.poll(cx));
956+
957+
let next_ready_row =
958+
Row::deserialize(next_column_iter).map_err(|e| RowsParseError::from(e).into());
959+
960+
Poll::Ready(Some(next_ready_row))
961+
}
962+
}
963+
964+
impl LegacyRowIterator {
965+
/// If tracing was enabled returns tracing ids of all finished page queries
966+
pub fn get_tracing_ids(&self) -> &[Uuid] {
967+
self.raw_iterator.tracing_ids()
968+
}
969+
970+
/// Returns specification of row columns
971+
pub fn get_column_specs(&self) -> &[ColumnSpec<'_>] {
972+
self.raw_iterator.column_specs().inner()
973+
}
974+
975+
pub fn into_typed<RowT>(self) -> LegacyTypedRowIterator<RowT> {
976+
LegacyTypedRowIterator {
977+
row_iterator: self,
978+
_phantom_data: Default::default(),
979+
}
920980
}
921981
}
922982

@@ -925,16 +985,18 @@ impl LegacyRowIterator {
925985
/// Returned by `RowIterator::into_typed`
926986
pub struct LegacyTypedRowIterator<RowT> {
927987
row_iterator: LegacyRowIterator,
928-
phantom_data: std::marker::PhantomData<RowT>,
988+
_phantom_data: std::marker::PhantomData<RowT>,
929989
}
930990

931991
impl<RowT> LegacyTypedRowIterator<RowT> {
932992
/// If tracing was enabled returns tracing ids of all finished page queries
993+
#[inline]
933994
pub fn get_tracing_ids(&self) -> &[Uuid] {
934995
self.row_iterator.get_tracing_ids()
935996
}
936997

937998
/// Returns specification of row columns
999+
#[inline]
9381000
pub fn get_column_specs(&self) -> &[ColumnSpec<'_>] {
9391001
self.row_iterator.get_column_specs()
9401002
}

scylla/src/transport/session.rs

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
use crate::batch::batch_values;
55
#[cfg(feature = "cloud")]
66
use crate::cloud::CloudConfig;
7+
use crate::LegacyQueryResult;
78

89
use crate::history;
910
use crate::history::HistoryListener;
@@ -42,7 +43,8 @@ use super::connection::QueryResponse;
4243
use super::connection::SslConfig;
4344
use super::errors::TracingProtocolError;
4445
use super::execution_profile::{ExecutionProfile, ExecutionProfileHandle, ExecutionProfileInner};
45-
use super::legacy_query_result::{LegacyQueryResult, MaybeFirstRowTypedError};
46+
use super::iterator::QueryPager;
47+
use super::legacy_query_result::MaybeFirstRowTypedError;
4648
#[cfg(feature = "cloud")]
4749
use super::node::CloudEndpoint;
4850
use super::node::{InternalKnownNode, KnownNode};
@@ -885,27 +887,29 @@ impl Session {
885887
.access();
886888

887889
if values.is_empty() {
888-
LegacyRowIterator::new_for_query(
890+
QueryPager::new_for_query(
889891
query,
890892
execution_profile,
891893
self.cluster.get_data(),
892894
self.metrics.clone(),
893895
)
894896
.await
897+
.map(QueryPager::into_legacy)
895898
} else {
896-
// Making LegacyRowIterator::new_for_query work with values is too hard (if even possible)
899+
// Making QueryPager::new_for_query work with values is too hard (if even possible)
897900
// so instead of sending one prepare to a specific connection on each iterator query,
898901
// we fully prepare a statement beforehand.
899902
let prepared = self.prepare(query).await?;
900903
let values = prepared.serialize_values(&values)?;
901-
LegacyRowIterator::new_for_prepared_statement(PreparedIteratorConfig {
904+
QueryPager::new_for_prepared_statement(PreparedIteratorConfig {
902905
prepared,
903906
values,
904907
execution_profile,
905908
cluster_data: self.cluster.get_data(),
906909
metrics: self.metrics.clone(),
907910
})
908911
.await
912+
.map(QueryPager::into_legacy)
909913
}
910914
}
911915

@@ -1288,14 +1292,15 @@ impl Session {
12881292
.unwrap_or_else(|| self.get_default_execution_profile_handle())
12891293
.access();
12901294

1291-
LegacyRowIterator::new_for_prepared_statement(PreparedIteratorConfig {
1295+
QueryPager::new_for_prepared_statement(PreparedIteratorConfig {
12921296
prepared,
12931297
values: serialized_values,
12941298
execution_profile,
12951299
cluster_data: self.cluster.get_data(),
12961300
metrics: self.metrics.clone(),
12971301
})
12981302
.await
1303+
.map(QueryPager::into_legacy)
12991304
}
13001305

13011306
/// Perform a batch request.\

0 commit comments

Comments
 (0)