Skip to content

Commit 6d9d971

Browse files
wprzytulapiodul
andcommitted
connection: migrate query_iter to new deserialization framework
The Connection::query_iter method is changed to use the new deserialization framework. All the internal uses of it in topology.rs are adjusted. Co-authored-by: Piotr Dulikowski <[email protected]>
1 parent db6bee0 commit 6d9d971

File tree

3 files changed

+86
-69
lines changed

3 files changed

+86
-69
lines changed

scylla/src/transport/connection.rs

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@ use std::{
4747
};
4848

4949
use super::errors::{ProtocolError, SchemaVersionFetchError, UseKeyspaceProtocolError};
50-
use super::iterator::{LegacyRowIterator, QueryPager};
50+
use super::iterator::QueryPager;
5151
use super::locator::tablets::{RawTablet, TabletParsingError};
5252
use super::query_result::QueryResult;
5353
use super::session::AddressTranslator;
@@ -1182,15 +1182,14 @@ impl Connection {
11821182
pub(crate) async fn query_iter(
11831183
self: Arc<Self>,
11841184
query: Query,
1185-
) -> Result<LegacyRowIterator, QueryError> {
1185+
) -> Result<QueryPager, QueryError> {
11861186
let consistency = query
11871187
.config
11881188
.determine_consistency(self.config.default_consistency);
11891189
let serial_consistency = query.config.serial_consistency.flatten();
11901190

11911191
QueryPager::new_for_connection_query_iter(query, self, consistency, serial_consistency)
11921192
.await
1193-
.map(QueryPager::into_legacy)
11941193
}
11951194

11961195
/// Executes a prepared statements and fetches its results over multiple pages, using
@@ -1199,7 +1198,7 @@ impl Connection {
11991198
self: Arc<Self>,
12001199
prepared_statement: PreparedStatement,
12011200
values: SerializedValues,
1202-
) -> Result<LegacyRowIterator, QueryError> {
1201+
) -> Result<QueryPager, QueryError> {
12031202
let consistency = prepared_statement
12041203
.config
12051204
.determine_consistency(self.config.default_consistency);
@@ -1213,7 +1212,6 @@ impl Connection {
12131212
serial_consistency,
12141213
)
12151214
.await
1216-
.map(QueryPager::into_legacy)
12171215
}
12181216

12191217
#[allow(dead_code)]
@@ -2479,6 +2477,8 @@ mod tests {
24792477
.query_iter(select_query.clone())
24802478
.await
24812479
.unwrap()
2480+
.rows_stream::<(i32,)>()
2481+
.unwrap()
24822482
.try_collect::<Vec<_>>()
24832483
.await
24842484
.unwrap();
@@ -2503,7 +2503,8 @@ mod tests {
25032503
.query_iter(select_query.clone())
25042504
.await
25052505
.unwrap()
2506-
.into_typed::<(i32,)>()
2506+
.rows_stream::<(i32,)>()
2507+
.unwrap()
25072508
.map(|ret| ret.unwrap().0)
25082509
.collect::<Vec<_>>()
25092510
.await;
@@ -2517,6 +2518,8 @@ mod tests {
25172518
))
25182519
.await
25192520
.unwrap()
2521+
.rows_stream::<()>()
2522+
.unwrap()
25202523
.try_collect::<Vec<_>>()
25212524
.await
25222525
.unwrap();

scylla/src/transport/errors.rs

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ use scylla_cql::{
2525
response::CqlResponseKind,
2626
value::SerializeValuesError,
2727
},
28-
types::serialize::SerializationError,
28+
types::{deserialize::TypeCheckError, serialize::SerializationError},
2929
};
3030

3131
use thiserror::Error;
@@ -436,7 +436,7 @@ pub enum PeersMetadataError {
436436
pub enum KeyspacesMetadataError {
437437
/// system_schema.keyspaces has invalid column type.
438438
#[error("system_schema.keyspaces has invalid column type: {0}")]
439-
SchemaKeyspacesInvalidColumnType(FromRowError),
439+
SchemaKeyspacesInvalidColumnType(TypeCheckError),
440440

441441
/// Bad keyspace replication strategy.
442442
#[error("Bad keyspace <{keyspace}> replication strategy: {error}")]
@@ -474,7 +474,7 @@ pub enum KeyspaceStrategyError {
474474
pub enum UdtMetadataError {
475475
/// system_schema.types has invalid column type.
476476
#[error("system_schema.types has invalid column type: {0}")]
477-
SchemaTypesInvalidColumnType(FromRowError),
477+
SchemaTypesInvalidColumnType(TypeCheckError),
478478

479479
/// Circular UDT dependency detected.
480480
#[error("Detected circular dependency between user defined types - toposort is impossible!")]
@@ -487,11 +487,11 @@ pub enum UdtMetadataError {
487487
pub enum TablesMetadataError {
488488
/// system_schema.tables has invalid column type.
489489
#[error("system_schema.tables has invalid column type: {0}")]
490-
SchemaTablesInvalidColumnType(FromRowError),
490+
SchemaTablesInvalidColumnType(TypeCheckError),
491491

492492
/// system_schema.columns has invalid column type.
493493
#[error("system_schema.columns has invalid column type: {0}")]
494-
SchemaColumnsInvalidColumnType(FromRowError),
494+
SchemaColumnsInvalidColumnType(TypeCheckError),
495495

496496
/// Unknown column kind.
497497
#[error("Unknown column kind '{column_kind}' for {keyspace_name}.{table_name}.{column_name}")]
@@ -509,7 +509,7 @@ pub enum TablesMetadataError {
509509
pub enum ViewsMetadataError {
510510
/// system_schema.views has invalid column type.
511511
#[error("system_schema.views has invalid column type: {0}")]
512-
SchemaViewsInvalidColumnType(FromRowError),
512+
SchemaViewsInvalidColumnType(TypeCheckError),
513513
}
514514

515515
/// Error caused by caller creating an invalid query

scylla/src/transport/topology.rs

Lines changed: 71 additions & 57 deletions
Original file line numberDiff line numberDiff line change
@@ -13,8 +13,10 @@ use futures::stream::{self, StreamExt, TryStreamExt};
1313
use futures::Stream;
1414
use rand::seq::SliceRandom;
1515
use rand::{thread_rng, Rng};
16-
use scylla_cql::frame::response::result::Row;
17-
use scylla_macros::FromRow;
16+
use scylla_cql::frame::frame_errors::RowsParseError;
17+
use scylla_cql::types::deserialize::row::DeserializeRow;
18+
use scylla_cql::types::deserialize::TypeCheckError;
19+
use scylla_macros::DeserializeRow;
1820
use std::borrow::BorrowMut;
1921
use std::cell::Cell;
2022
use std::collections::HashMap;
@@ -765,11 +767,13 @@ async fn query_metadata(
765767
Ok(Metadata { peers, keyspaces })
766768
}
767769

768-
#[derive(FromRow)]
769-
#[scylla_crate = "scylla_cql"]
770+
#[derive(DeserializeRow)]
771+
#[scylla(crate = "scylla_cql")]
770772
struct NodeInfoRow {
771773
host_id: Option<Uuid>,
774+
#[scylla(rename = "rpc_address")]
772775
untranslated_ip_addr: IpAddr,
776+
#[scylla(rename = "data_center")]
773777
datacenter: Option<String>,
774778
rack: Option<String>,
775779
tokens: Option<Vec<String>>,
@@ -799,6 +803,13 @@ async fn query_peers(conn: &Arc<Connection>, connect_port: u16) -> Result<Vec<Pe
799803
let peers_query_stream = conn
800804
.clone()
801805
.query_iter(peers_query)
806+
.map(|pager_res| {
807+
let pager = pager_res?;
808+
let rows_stream = pager
809+
.rows_stream::<NodeInfoRow>()
810+
.map_err(RowsParseError::from)?;
811+
Ok::<_, QueryError>(rows_stream)
812+
})
802813
.into_stream()
803814
.try_flatten()
804815
.and_then(|row_result| future::ok((NodeInfoSource::Peer, row_result)));
@@ -809,6 +820,13 @@ async fn query_peers(conn: &Arc<Connection>, connect_port: u16) -> Result<Vec<Pe
809820
let local_query_stream = conn
810821
.clone()
811822
.query_iter(local_query)
823+
.map(|pager_res| {
824+
let pager = pager_res?;
825+
let rows_stream = pager
826+
.rows_stream::<NodeInfoRow>()
827+
.map_err(RowsParseError::from)?;
828+
Ok::<_, QueryError>(rows_stream)
829+
})
812830
.into_stream()
813831
.try_flatten()
814832
.and_then(|row_result| future::ok((NodeInfoSource::Local, row_result)));
@@ -819,9 +837,8 @@ async fn query_peers(conn: &Arc<Connection>, connect_port: u16) -> Result<Vec<Pe
819837
let local_address = SocketAddr::new(local_ip, connect_port);
820838

821839
let translated_peers_futures = untranslated_rows.map(|row_result| async {
822-
let (source, raw_row) = row_result?;
823-
match raw_row.into_typed() {
824-
Ok(row) => create_peer_from_row(source, row, local_address).await,
840+
match row_result {
841+
Ok((source, row)) => create_peer_from_row(source, row, local_address).await,
825842
Err(err) => {
826843
warn!(
827844
"system.peers or system.local has an invalid row, skipping it: {}",
@@ -905,15 +922,19 @@ async fn create_peer_from_row(
905922
}))
906923
}
907924

908-
fn query_filter_keyspace_name<'a>(
925+
fn query_filter_keyspace_name<'a, R>(
909926
conn: &Arc<Connection>,
910927
query_str: &'a str,
911928
keyspaces_to_fetch: &'a [String],
912-
) -> impl Stream<Item = Result<Row, QueryError>> + 'a {
929+
convert_typecheck_error: impl FnOnce(TypeCheckError) -> MetadataError + 'a,
930+
) -> impl Stream<Item = Result<R, QueryError>> + 'a
931+
where
932+
R: for<'r> DeserializeRow<'r, 'r> + 'static,
933+
{
913934
let conn = conn.clone();
914935

915936
let fut = async move {
916-
if keyspaces_to_fetch.is_empty() {
937+
let pager = if keyspaces_to_fetch.is_empty() {
917938
let mut query = Query::new(query_str);
918939
query.set_page_size(METADATA_QUERY_PAGE_SIZE);
919940

@@ -928,7 +949,11 @@ fn query_filter_keyspace_name<'a>(
928949
let prepared = conn.prepare(&query).await?;
929950
let serialized_values = prepared.serialize_values(&keyspaces)?;
930951
conn.execute_iter(prepared, serialized_values).await
931-
}
952+
}?;
953+
954+
let stream: super::iterator::TypedRowStream<R> =
955+
pager.rows_stream::<R>().map_err(convert_typecheck_error)?;
956+
Ok::<_, QueryError>(stream)
932957
};
933958
fut.into_stream().try_flatten()
934959
}
@@ -938,10 +963,15 @@ async fn query_keyspaces(
938963
keyspaces_to_fetch: &[String],
939964
fetch_schema: bool,
940965
) -> Result<HashMap<String, Keyspace>, QueryError> {
941-
let rows = query_filter_keyspace_name(
966+
let rows = query_filter_keyspace_name::<(String, HashMap<String, String>)>(
942967
conn,
943968
"select keyspace_name, replication from system_schema.keyspaces",
944969
keyspaces_to_fetch,
970+
|err| {
971+
MetadataError::Keyspaces(KeyspacesMetadataError::SchemaKeyspacesInvalidColumnType(
972+
err,
973+
))
974+
},
945975
);
946976

947977
let (mut all_tables, mut all_views, mut all_user_defined_types) = if fetch_schema {
@@ -956,12 +986,7 @@ async fn query_keyspaces(
956986
};
957987

958988
rows.map(|row_result| {
959-
let row = row_result?;
960-
let (keyspace_name, strategy_map) = row.into_typed::<(String, _)>().map_err(|err| {
961-
MetadataError::Keyspaces(KeyspacesMetadataError::SchemaKeyspacesInvalidColumnType(
962-
err,
963-
))
964-
})?;
989+
let (keyspace_name, strategy_map) = row_result?;
965990

966991
let strategy: Strategy = strategy_from_string_map(strategy_map).map_err(|error| {
967992
MetadataError::Keyspaces(KeyspacesMetadataError::Strategy {
@@ -988,8 +1013,8 @@ async fn query_keyspaces(
9881013
.await
9891014
}
9901015

991-
#[derive(FromRow, Debug)]
992-
#[scylla_crate = "crate"]
1016+
#[derive(DeserializeRow, Debug)]
1017+
#[scylla(crate = "crate")]
9931018
struct UdtRow {
9941019
keyspace_name: String,
9951020
type_name: String,
@@ -1031,21 +1056,16 @@ async fn query_user_defined_types(
10311056
conn: &Arc<Connection>,
10321057
keyspaces_to_fetch: &[String],
10331058
) -> Result<HashMap<String, HashMap<String, Arc<UserDefinedType>>>, QueryError> {
1034-
let rows = query_filter_keyspace_name(
1059+
let rows = query_filter_keyspace_name::<UdtRow>(
10351060
conn,
10361061
"select keyspace_name, type_name, field_names, field_types from system_schema.types",
10371062
keyspaces_to_fetch,
1063+
|err| MetadataError::Udts(UdtMetadataError::SchemaTypesInvalidColumnType(err)),
10381064
);
10391065

10401066
let mut udt_rows: Vec<UdtRowWithParsedFieldTypes> = rows
10411067
.map(|row_result| {
1042-
let row = row_result?;
1043-
let udt_row = row
1044-
.into_typed::<UdtRow>()
1045-
.map_err(|err| {
1046-
MetadataError::Udts(UdtMetadataError::SchemaTypesInvalidColumnType(err))
1047-
})?
1048-
.try_into()?;
1068+
let udt_row = row_result?.try_into()?;
10491069

10501070
Ok::<_, QueryError>(udt_row)
10511071
})
@@ -1355,21 +1375,17 @@ async fn query_tables(
13551375
keyspaces_to_fetch: &[String],
13561376
udts: &HashMap<String, HashMap<String, Arc<UserDefinedType>>>,
13571377
) -> Result<HashMap<String, HashMap<String, Table>>, QueryError> {
1358-
let rows = query_filter_keyspace_name(
1378+
let rows = query_filter_keyspace_name::<(String, String)>(
13591379
conn,
13601380
"SELECT keyspace_name, table_name FROM system_schema.tables",
13611381
keyspaces_to_fetch,
1382+
|err| MetadataError::Tables(TablesMetadataError::SchemaTablesInvalidColumnType(err)),
13621383
);
13631384
let mut result = HashMap::new();
13641385
let mut tables = query_tables_schema(conn, keyspaces_to_fetch, udts).await?;
13651386

13661387
rows.map(|row_result| {
1367-
let row = row_result?;
1368-
let (keyspace_name, table_name) = row.into_typed().map_err(|err| {
1369-
MetadataError::Tables(TablesMetadataError::SchemaTablesInvalidColumnType(err))
1370-
})?;
1371-
1372-
let keyspace_and_table_name = (keyspace_name, table_name);
1388+
let keyspace_and_table_name = row_result?;
13731389

13741390
let table = tables.remove(&keyspace_and_table_name).unwrap_or(Table {
13751391
columns: HashMap::new(),
@@ -1396,20 +1412,18 @@ async fn query_views(
13961412
keyspaces_to_fetch: &[String],
13971413
udts: &HashMap<String, HashMap<String, Arc<UserDefinedType>>>,
13981414
) -> Result<HashMap<String, HashMap<String, MaterializedView>>, QueryError> {
1399-
let rows = query_filter_keyspace_name(
1415+
let rows = query_filter_keyspace_name::<(String, String, String)>(
14001416
conn,
14011417
"SELECT keyspace_name, view_name, base_table_name FROM system_schema.views",
14021418
keyspaces_to_fetch,
1419+
|err| MetadataError::Views(ViewsMetadataError::SchemaViewsInvalidColumnType(err)),
14031420
);
14041421

14051422
let mut result = HashMap::new();
14061423
let mut tables = query_tables_schema(conn, keyspaces_to_fetch, udts).await?;
14071424

14081425
rows.map(|row_result| {
1409-
let row = row_result?;
1410-
let (keyspace_name, view_name, base_table_name) = row.into_typed().map_err(|err| {
1411-
MetadataError::Views(ViewsMetadataError::SchemaViewsInvalidColumnType(err))
1412-
})?;
1426+
let (keyspace_name, view_name, base_table_name) = row_result?;
14131427

14141428
let keyspace_and_view_name = (keyspace_name, view_name);
14151429

@@ -1447,24 +1461,18 @@ async fn query_tables_schema(
14471461
// This column shouldn't be exposed to the user but is currently exposed in system tables.
14481462
const THRIFT_EMPTY_TYPE: &str = "empty";
14491463

1450-
let rows = query_filter_keyspace_name(conn,
1451-
"select keyspace_name, table_name, column_name, kind, position, type from system_schema.columns", keyspaces_to_fetch
1464+
type RowType = (String, String, String, String, i32, String);
1465+
1466+
let rows = query_filter_keyspace_name::<RowType>(conn,
1467+
"select keyspace_name, table_name, column_name, kind, position, type from system_schema.columns", keyspaces_to_fetch, |err| {
1468+
MetadataError::Tables(TablesMetadataError::SchemaColumnsInvalidColumnType(err))
1469+
}
14521470
);
14531471

14541472
let mut tables_schema = HashMap::new();
14551473

14561474
rows.map(|row_result| {
1457-
let row = row_result?;
1458-
let (keyspace_name, table_name, column_name, kind, position, type_): (
1459-
String,
1460-
String,
1461-
String,
1462-
String,
1463-
i32,
1464-
String,
1465-
) = row.into_typed().map_err(|err| {
1466-
MetadataError::Tables(TablesMetadataError::SchemaColumnsInvalidColumnType(err))
1467-
})?;
1475+
let (keyspace_name, table_name, column_name, kind, position, type_) = row_result?;
14681476

14691477
if type_ == THRIFT_EMPTY_TYPE {
14701478
return Ok::<_, QueryError>(());
@@ -1674,15 +1682,21 @@ async fn query_table_partitioners(
16741682
let rows = conn
16751683
.clone()
16761684
.query_iter(partitioner_query)
1685+
.map(|pager_res| {
1686+
let pager = pager_res?;
1687+
let stream = pager
1688+
.rows_stream::<(String, String, Option<String>)>()
1689+
.map_err(|err| {
1690+
MetadataError::Tables(TablesMetadataError::SchemaTablesInvalidColumnType(err))
1691+
})?;
1692+
Ok::<_, QueryError>(stream)
1693+
})
16771694
.into_stream()
16781695
.try_flatten();
16791696

16801697
let result = rows
16811698
.map(|row_result| {
1682-
let (keyspace_name, table_name, partitioner) =
1683-
row_result?.into_typed().map_err(|err| {
1684-
MetadataError::Tables(TablesMetadataError::SchemaTablesInvalidColumnType(err))
1685-
})?;
1699+
let (keyspace_name, table_name, partitioner) = row_result?;
16861700
Ok::<_, QueryError>(((keyspace_name, table_name), partitioner))
16871701
})
16881702
.try_collect::<HashMap<_, _>>()

0 commit comments

Comments
 (0)