Skip to content

Commit 253c514

Browse files
authored
Merge pull request #1191 from muzarski/query-pager-public-error
errors: Narrow return error type of Session::[query/execute]_iter
2 parents e23e316 + 6744694 commit 253c514

File tree

5 files changed

+37
-28
lines changed

5 files changed

+37
-28
lines changed

scylla/src/client/caching_session.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
use crate::batch::{Batch, BatchStatement};
2-
use crate::errors::{ExecutionError, PrepareError};
2+
use crate::errors::{ExecutionError, PagerExecutionError, PrepareError};
33
use crate::prepared_statement::PreparedStatement;
44
use crate::query::Query;
55
use crate::response::query_result::QueryResult;
@@ -108,7 +108,7 @@ where
108108
&self,
109109
query: impl Into<Query>,
110110
values: impl SerializeRow,
111-
) -> Result<QueryPager, ExecutionError> {
111+
) -> Result<QueryPager, PagerExecutionError> {
112112
let query = query.into();
113113
let prepared = self.add_prepared_statement_owned(query).await?;
114114
self.session.execute_iter(prepared, values).await

scylla/src/client/pager.rs

Lines changed: 6 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -664,7 +664,7 @@ impl QueryPager {
664664
execution_profile: Arc<ExecutionProfileInner>,
665665
cluster_state: Arc<ClusterState>,
666666
metrics: Arc<Metrics>,
667-
) -> Result<Self, NextRowError> {
667+
) -> Result<Self, NextPageError> {
668668
let (sender, receiver) = mpsc::channel::<Result<ReceivedPage, NextPageError>>(1);
669669

670670
let consistency = query
@@ -743,7 +743,7 @@ impl QueryPager {
743743

744744
pub(crate) async fn new_for_prepared_statement(
745745
config: PreparedIteratorConfig,
746-
) -> Result<Self, NextRowError> {
746+
) -> Result<Self, NextPageError> {
747747
let (sender, receiver) = mpsc::channel::<Result<ReceivedPage, NextPageError>>(1);
748748

749749
let consistency = config
@@ -866,7 +866,7 @@ impl QueryPager {
866866
connection: Arc<Connection>,
867867
consistency: Consistency,
868868
serial_consistency: Option<SerialConsistency>,
869-
) -> Result<Self, NextRowError> {
869+
) -> Result<Self, NextPageError> {
870870
let (sender, receiver) = mpsc::channel::<Result<ReceivedPage, NextPageError>>(1);
871871

872872
let page_size = query.get_validated_page_size();
@@ -896,7 +896,7 @@ impl QueryPager {
896896
connection: Arc<Connection>,
897897
consistency: Consistency,
898898
serial_consistency: Option<SerialConsistency>,
899-
) -> Result<Self, NextRowError> {
899+
) -> Result<Self, NextPageError> {
900900
let (sender, receiver) = mpsc::channel::<Result<ReceivedPage, NextPageError>>(1);
901901

902902
let page_size = prepared.get_validated_page_size();
@@ -924,7 +924,7 @@ impl QueryPager {
924924
async fn new_from_worker_future(
925925
worker_task: impl Future<Output = PageSendAttemptedProof> + Send + 'static,
926926
mut receiver: mpsc::Receiver<Result<ReceivedPage, NextPageError>>,
927-
) -> Result<Self, NextRowError> {
927+
) -> Result<Self, NextPageError> {
928928
tokio::task::spawn(worker_task);
929929

930930
// This unwrap is safe because:
@@ -933,10 +933,7 @@ impl QueryPager {
933933
// - That future is polled in a tokio::task which isn't going to be
934934
// cancelled
935935
let page_received = receiver.recv().await.unwrap()?;
936-
let raw_rows_with_deserialized_metadata =
937-
page_received.rows.deserialize_metadata().map_err(|err| {
938-
NextRowError::NextPageError(NextPageError::ResultMetadataParseError(err))
939-
})?;
936+
let raw_rows_with_deserialized_metadata = page_received.rows.deserialize_metadata()?;
940937

941938
Ok(Self {
942939
current_page: RawRowLendingIterator::new(raw_rows_with_deserialized_metadata),

scylla/src/client/session.rs

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -14,8 +14,8 @@ use crate::cluster::node::CloudEndpoint;
1414
use crate::cluster::node::{InternalKnownNode, KnownNode, NodeRef};
1515
use crate::cluster::{Cluster, ClusterNeatDebug, ClusterState};
1616
use crate::errors::{
17-
BadQuery, ExecutionError, MetadataError, NewSessionError, PrepareError, ProtocolError,
18-
RequestAttemptError, RequestError, TracingError, UseKeyspaceError,
17+
BadQuery, ExecutionError, MetadataError, NewSessionError, PagerExecutionError, PrepareError,
18+
ProtocolError, RequestAttemptError, RequestError, TracingError, UseKeyspaceError,
1919
};
2020
use crate::frame::response::result;
2121
#[cfg(feature = "ssl")]
@@ -511,7 +511,7 @@ impl Session {
511511
&self,
512512
query: impl Into<Query>,
513513
values: impl SerializeRow,
514-
) -> Result<QueryPager, ExecutionError> {
514+
) -> Result<QueryPager, PagerExecutionError> {
515515
self.do_query_iter(query.into(), values).await
516516
}
517517

@@ -675,7 +675,7 @@ impl Session {
675675
&self,
676676
prepared: impl Into<PreparedStatement>,
677677
values: impl SerializeRow,
678-
) -> Result<QueryPager, ExecutionError> {
678+
) -> Result<QueryPager, PagerExecutionError> {
679679
self.do_execute_iter(prepared.into(), values).await
680680
}
681681

@@ -1035,7 +1035,7 @@ impl Session {
10351035
&self,
10361036
query: Query,
10371037
values: impl SerializeRow,
1038-
) -> Result<QueryPager, ExecutionError> {
1038+
) -> Result<QueryPager, PagerExecutionError> {
10391039
let execution_profile = query
10401040
.get_execution_profile_handle()
10411041
.unwrap_or_else(|| self.get_default_execution_profile_handle())
@@ -1049,7 +1049,7 @@ impl Session {
10491049
self.metrics.clone(),
10501050
)
10511051
.await
1052-
.map_err(ExecutionError::from)
1052+
.map_err(PagerExecutionError::NextPageError)
10531053
} else {
10541054
// Making QueryPager::new_for_query work with values is too hard (if even possible)
10551055
// so instead of sending one prepare to a specific connection on each iterator query,
@@ -1064,7 +1064,7 @@ impl Session {
10641064
metrics: self.metrics.clone(),
10651065
})
10661066
.await
1067-
.map_err(ExecutionError::from)
1067+
.map_err(PagerExecutionError::NextPageError)
10681068
}
10691069
}
10701070

@@ -1307,7 +1307,7 @@ impl Session {
13071307
&self,
13081308
prepared: PreparedStatement,
13091309
values: impl SerializeRow,
1310-
) -> Result<QueryPager, ExecutionError> {
1310+
) -> Result<QueryPager, PagerExecutionError> {
13111311
let serialized_values = prepared.serialize_values(&values)?;
13121312

13131313
let execution_profile = prepared
@@ -1323,7 +1323,7 @@ impl Session {
13231323
metrics: self.metrics.clone(),
13241324
})
13251325
.await
1326-
.map_err(ExecutionError::from)
1326+
.map_err(PagerExecutionError::NextPageError)
13271327
}
13281328

13291329
async fn do_batch(

scylla/src/errors.rs

Lines changed: 18 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ use scylla_cql::{
2929

3030
use thiserror::Error;
3131

32+
use crate::client::pager::NextPageError;
3233
use crate::{authentication::AuthError, frame::response};
3334

3435
use crate::client::pager::NextRowError;
@@ -110,14 +111,6 @@ pub enum ExecutionError {
110111
/// 'USE KEYSPACE <>' request failed.
111112
#[error("'USE KEYSPACE <>' request failed: {0}")]
112113
UseKeyspaceError(#[from] UseKeyspaceError),
113-
114-
// TODO: This should not belong here, but it requires changes to error types
115-
// returned in async iterator API. This should be handled in separate PR.
116-
// The reason this needs to be included is that topology.rs makes use of iter API and returns ExecutionError.
117-
// Once iter API is adjusted, we can then adjust errors returned by topology module (e.g. refactor MetadataError and not include it in ExecutionError).
118-
/// An error occurred during async iteration over rows of result.
119-
#[error("An error occurred during async iteration over rows of result: {0}")]
120-
NextRowError(#[from] NextRowError),
121114
}
122115

123116
impl From<SerializationError> for ExecutionError {
@@ -151,6 +144,23 @@ pub enum PrepareError {
151144
PreparedStatementIdsMismatch,
152145
}
153146

147+
/// An error that occurred during construction of [`QueryPager`][crate::client::pager::QueryPager].
148+
#[derive(Error, Debug, Clone)]
149+
#[non_exhaustive]
150+
pub enum PagerExecutionError {
151+
/// Failed to prepare the statement.
152+
#[error("Failed to prepare the statement to be used by the pager: {0}")]
153+
PrepareError(#[from] PrepareError),
154+
155+
/// Failed to serialize statement parameters.
156+
#[error("Failed to serialize statement parameters: {0}")]
157+
SerializationError(#[from] SerializationError),
158+
159+
/// Failed to fetch the first page of the result.
160+
#[error("Failed to fetch the first page of the result: {0}")]
161+
NextPageError(#[from] NextPageError),
162+
}
163+
154164
/// Error that occurred during session creation
155165
#[derive(Error, Debug, Clone)]
156166
#[non_exhaustive]

scylla/src/network/connection.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1017,6 +1017,7 @@ impl Connection {
10171017

10181018
QueryPager::new_for_connection_query_iter(query, self, consistency, serial_consistency)
10191019
.await
1020+
.map_err(NextRowError::NextPageError)
10201021
}
10211022

10221023
/// Executes a prepared statements and fetches its results over multiple pages, using
@@ -1039,6 +1040,7 @@ impl Connection {
10391040
serial_consistency,
10401041
)
10411042
.await
1043+
.map_err(NextRowError::NextPageError)
10421044
}
10431045

10441046
#[allow(dead_code)]

0 commit comments

Comments
 (0)