Skip to content

Commit 7a11683

Browse files
committed
errors: re-introduce RequestError
Introduced "new" error type and adjusted session.rs, speculative_execution module and iterator module to this type. This error represents a definite request failure (after potential retries).
1 parent 4a9367c commit 7a11683

File tree

4 files changed

+132
-84
lines changed

4 files changed

+132
-84
lines changed

scylla/src/client/pager.rs

Lines changed: 11 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ use crate::cluster::{ClusterState, NodeRef};
2929
#[allow(deprecated)]
3030
use crate::cql_to_rust::{FromRow, FromRowError};
3131
use crate::deserialize::DeserializeOwnedRow;
32-
use crate::errors::ProtocolError;
32+
use crate::errors::{ProtocolError, RequestError};
3333
use crate::errors::{QueryError, RequestAttemptError};
3434
use crate::frame::response::result;
3535
use crate::network::Connection;
@@ -168,7 +168,7 @@ where
168168
let query_plan =
169169
load_balancing::Plan::new(load_balancer.as_ref(), &statement_info, &cluster_data);
170170

171-
let mut last_error: QueryError = QueryError::EmptyPlan;
171+
let mut last_error: RequestError = RequestError::EmptyPlan;
172172
let mut current_consistency: Consistency = self.query_consistency;
173173

174174
self.log_query_start();
@@ -235,8 +235,12 @@ where
235235
retry_decision = ?retry_decision
236236
);
237237

238-
last_error = request_error.into_query_error();
239-
self.log_attempt_error(&last_error, &retry_decision);
238+
// TODO: This is a temporary measure. Will be able to remove it later in this PR
239+
// once I narrow the error type in history module.
240+
let q_error: QueryError = request_error.clone().into_query_error();
241+
self.log_attempt_error(&q_error, &retry_decision);
242+
243+
last_error = request_error.into();
240244

241245
match retry_decision {
242246
RetryDecision::RetrySameNode(cl) => {
@@ -266,8 +270,9 @@ where
266270
}
267271

268272
// Send last_error to QueryPager - query failed fully
269-
self.log_query_error(&last_error);
270-
let (proof, _) = self.sender.send(Err(last_error)).await;
273+
let q_error = last_error.into_query_error();
274+
self.log_query_error(&q_error);
275+
let (proof, _) = self.sender.send(Err(q_error)).await;
271276
proof
272277
}
273278

scylla/src/client/session.rs

Lines changed: 13 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,8 @@ use crate::cluster::node::CloudEndpoint;
1515
use crate::cluster::node::{InternalKnownNode, KnownNode, NodeRef};
1616
use crate::cluster::{Cluster, ClusterNeatDebug, ClusterState};
1717
use crate::errors::{
18-
BadQuery, NewSessionError, ProtocolError, QueryError, RequestAttemptError, TracingProtocolError,
18+
BadQuery, NewSessionError, ProtocolError, QueryError, RequestAttemptError, RequestError,
19+
TracingProtocolError,
1920
};
2021
use crate::frame::response::result;
2122
#[cfg(feature = "ssl")]
@@ -1966,7 +1967,7 @@ where
19661967
},
19671968
)
19681969
.await
1969-
.unwrap_or(Err(QueryError::EmptyPlan))
1970+
.unwrap_or(Err(RequestError::EmptyPlan))
19701971
}
19711972
}
19721973
};
@@ -1977,14 +1978,15 @@ where
19771978
let result = match effective_timeout {
19781979
Some(timeout) => tokio::time::timeout(timeout, runner)
19791980
.await
1981+
.map(|res| res.map_err(RequestError::into_query_error))
19801982
.unwrap_or_else(|e| {
19811983
Err(QueryError::RequestTimeout(format!(
19821984
"Request took longer than {}ms: {}",
19831985
timeout.as_millis(),
19841986
e
19851987
)))
19861988
}),
1987-
None => runner.await,
1989+
None => runner.await.map_err(RequestError::into_query_error),
19881990
};
19891991

19901992
if let Some((history_listener, query_id)) = history_listener_and_id {
@@ -2008,12 +2010,12 @@ where
20082010
run_request_once: impl Fn(Arc<Connection>, Consistency, &ExecutionProfileInner) -> QueryFut,
20092011
execution_profile: &ExecutionProfileInner,
20102012
mut context: ExecuteRequestContext<'a>,
2011-
) -> Option<Result<RunRequestResult<ResT>, QueryError>>
2013+
) -> Option<Result<RunRequestResult<ResT>, RequestError>>
20122014
where
20132015
QueryFut: Future<Output = Result<ResT, RequestAttemptError>>,
20142016
ResT: AllowedRunRequestResTType,
20152017
{
2016-
let mut last_error: Option<QueryError> = None;
2018+
let mut last_error: Option<RequestError> = None;
20172019
let mut current_consistency: Consistency = context
20182020
.consistency_set_on_statement
20192021
.unwrap_or(execution_profile.consistency);
@@ -2097,12 +2099,12 @@ where
20972099
retry_decision = ?retry_decision
20982100
);
20992101

2100-
last_error = Some(request_error.into_query_error());
2101-
context.log_attempt_error(
2102-
&attempt_id,
2103-
last_error.as_ref().unwrap(),
2104-
&retry_decision,
2105-
);
2102+
// TODO: This is a temporary measure. Will be able to remove it later in this PR
2103+
// once I narrow the error type in history module.
2104+
let q_error: QueryError = request_error.clone().into_query_error();
2105+
context.log_attempt_error(&attempt_id, &q_error, &retry_decision);
2106+
2107+
last_error = Some(request_error.into());
21062108

21072109
match retry_decision {
21082110
RetryDecision::RetrySameNode(new_cl) => {

scylla/src/errors.rs

Lines changed: 41 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -871,7 +871,47 @@ pub enum CqlEventHandlingError {
871871
SendError,
872872
}
873873

874-
/// An error type that occurred during single attempt of:
874+
/// An error that occurred during execution of
875+
/// - `QUERY`
876+
/// - `PREPARE`
877+
/// - `EXECUTE`
878+
/// - `BATCH`
879+
///
880+
/// request. This error represents a definite request failure, unlike
881+
/// [`RequestAttemptError`] which represents a failure of a single
882+
/// attempt.
883+
#[derive(Error, Debug, Clone)]
884+
#[non_exhaustive]
885+
pub enum RequestError {
886+
/// Load balancing policy returned an empty plan.
887+
#[error(
888+
"Load balancing policy returned an empty plan. \
889+
First thing to investigate should be the logic of custom LBP implementation. \
890+
If you think that your LBP implementation is correct, or you make use of `DefaultPolicy`, \
891+
then this is most probably a driver bug!"
892+
)]
893+
EmptyPlan,
894+
895+
/// Selected node's connection pool is in invalid state.
896+
#[error("No connections in the pool: {0}")]
897+
ConnectionPoolError(#[from] ConnectionPoolError),
898+
899+
/// Failed to execute request.
900+
#[error(transparent)]
901+
LastAttemptError(#[from] RequestAttemptError),
902+
}
903+
904+
impl RequestError {
905+
pub fn into_query_error(self) -> QueryError {
906+
match self {
907+
RequestError::EmptyPlan => QueryError::EmptyPlan,
908+
RequestError::ConnectionPoolError(e) => e.into(),
909+
RequestError::LastAttemptError(e) => e.into_query_error(),
910+
}
911+
}
912+
}
913+
914+
/// An error that occurred during a single attempt of:
875915
/// - `QUERY`
876916
/// - `PREPARE`
877917
/// - `EXECUTE`

scylla/src/policies/speculative_execution.rs

Lines changed: 67 additions & 66 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ use scylla_cql::frame::response::error::DbError;
66
use std::{future::Future, sync::Arc, time::Duration};
77
use tracing::{trace_span, warn, Instrument};
88

9-
use crate::errors::QueryError;
9+
use crate::errors::{RequestAttemptError, RequestError};
1010
use crate::observability::metrics::Metrics;
1111

1212
/// Context is passed as an argument to `SpeculativeExecutionPolicy` methods
@@ -85,94 +85,95 @@ impl SpeculativeExecutionPolicy for PercentileSpeculativeExecutionPolicy {
8585
///
8686
/// We should ignore errors such that their presence when executing the request
8787
/// on one node, does not imply that the same error will appear during retry on some other node.
88-
fn can_be_ignored<ResT>(result: &Result<ResT, QueryError>) -> bool {
88+
fn can_be_ignored<ResT>(result: &Result<ResT, RequestError>) -> bool {
8989
match result {
9090
Ok(_) => false,
9191
// Do not remove this lint!
9292
// It's there for a reason - we don't want new variants
9393
// automatically fall under `_` pattern when they are introduced.
9494
#[deny(clippy::wildcard_enum_match_arm)]
9595
Err(e) => match e {
96-
// Errors that will almost certainly appear for other nodes as well
97-
QueryError::BadQuery(_)
98-
| QueryError::CqlRequestSerialization(_)
99-
| QueryError::BodyExtensionsParseError(_)
100-
| QueryError::CqlResultParseError(_)
101-
| QueryError::CqlErrorParseError(_)
102-
| QueryError::ProtocolError(_) => false,
103-
104-
// EmptyPlan is not returned by `Session::execute_query`.
105-
// It is represented by None, which is then transformed
106-
// to QueryError::EmptyPlan by the caller
107-
// (either here is speculative_execution module, or for non-speculative execution).
108-
// I believe this should not be ignored, since we do not expect it here.
109-
QueryError::EmptyPlan => false,
110-
111-
// Errors that should not appear here, thus should not be ignored
112-
#[allow(deprecated)]
113-
QueryError::NextRowError(_)
114-
| QueryError::IntoLegacyQueryResultError(_)
115-
| QueryError::TimeoutError
116-
| QueryError::RequestTimeout(_)
117-
| QueryError::MetadataError(_) => false,
118-
119-
// Errors that can be ignored
120-
QueryError::BrokenConnection(_)
121-
| QueryError::UnableToAllocStreamId
122-
| QueryError::ConnectionPoolError(_) => true,
123-
124-
// Handle DbErrors
125-
QueryError::DbError(db_error, _) => {
96+
// This error should not appear here. Anyway, if it possibly could
97+
// in the future, it should not be ignored.
98+
RequestError::EmptyPlan => false,
99+
100+
// Can try on another node.
101+
RequestError::ConnectionPoolError { .. } => true,
102+
103+
RequestError::LastAttemptError(e) => {
126104
// Do not remove this lint!
127105
// It's there for a reason - we don't want new variants
128106
// automatically fall under `_` pattern when they are introduced.
129107
#[deny(clippy::wildcard_enum_match_arm)]
130-
match db_error {
131-
// Errors that will almost certainly appear on other nodes as well
132-
DbError::SyntaxError
133-
| DbError::Invalid
134-
| DbError::AlreadyExists { .. }
135-
| DbError::Unauthorized
136-
| DbError::ProtocolError => false,
137-
138-
// Errors that should not appear there - thus, should not be ignored.
139-
DbError::AuthenticationError | DbError::Other(_) => false,
140-
141-
// For now, let's assume that UDF failure is not transient - don't ignore it
142-
// TODO: investigate
143-
DbError::FunctionFailure { .. } => false,
144-
145-
// Not sure when these can appear - don't ignore them
146-
// TODO: Investigate these errors
147-
DbError::ConfigError | DbError::TruncateError => false,
148-
149-
// Errors that we can ignore and perform a retry on some other node
150-
DbError::Unavailable { .. }
151-
| DbError::Overloaded
152-
| DbError::IsBootstrapping
153-
| DbError::ReadTimeout { .. }
154-
| DbError::WriteTimeout { .. }
155-
| DbError::ReadFailure { .. }
156-
| DbError::WriteFailure { .. }
157-
// Preparation may succeed on some other node.
158-
| DbError::Unprepared { .. }
159-
| DbError::ServerError
160-
| DbError::RateLimitReached { .. } => true,
108+
match e {
109+
// Errors that will almost certainly appear for other nodes as well
110+
RequestAttemptError::SerializationError(_)
111+
| RequestAttemptError::CqlRequestSerialization(_)
112+
| RequestAttemptError::BodyExtensionsParseError(_)
113+
| RequestAttemptError::CqlResultParseError(_)
114+
| RequestAttemptError::CqlErrorParseError(_)
115+
| RequestAttemptError::UnexpectedResponse(_)
116+
| RequestAttemptError::RepreparedIdChanged { .. }
117+
| RequestAttemptError::RepreparedIdMissingInBatch => false,
118+
119+
// Errors that can be ignored
120+
RequestAttemptError::BrokenConnectionError(_)
121+
| RequestAttemptError::UnableToAllocStreamId => true,
122+
123+
// Handle DbErrors
124+
RequestAttemptError::DbError(db_error, _) => {
125+
// Do not remove this lint!
126+
// It's there for a reason - we don't want new variants
127+
// automatically fall under `_` pattern when they are introduced.
128+
#[deny(clippy::wildcard_enum_match_arm)]
129+
match db_error {
130+
// Errors that will almost certainly appear on other nodes as well
131+
DbError::SyntaxError
132+
| DbError::Invalid
133+
| DbError::AlreadyExists { .. }
134+
| DbError::Unauthorized
135+
| DbError::ProtocolError => false,
136+
137+
// Errors that should not appear there - thus, should not be ignored.
138+
DbError::AuthenticationError | DbError::Other(_) => false,
139+
140+
// For now, let's assume that UDF failure is not transient - don't ignore it
141+
// TODO: investigate
142+
DbError::FunctionFailure { .. } => false,
143+
144+
// Not sure when these can appear - don't ignore them
145+
// TODO: Investigate these errors
146+
DbError::ConfigError | DbError::TruncateError => false,
147+
148+
// Errors that we can ignore and perform a retry on some other node
149+
DbError::Unavailable { .. }
150+
| DbError::Overloaded
151+
| DbError::IsBootstrapping
152+
| DbError::ReadTimeout { .. }
153+
| DbError::WriteTimeout { .. }
154+
| DbError::ReadFailure { .. }
155+
| DbError::WriteFailure { .. }
156+
// Preparation may succeed on some other node.
157+
| DbError::Unprepared { .. }
158+
| DbError::ServerError
159+
| DbError::RateLimitReached { .. } => true,
160+
}
161161
}
162+
}
162163
}
163164
},
164165
}
165166
}
166167

167-
const EMPTY_PLAN_ERROR: QueryError = QueryError::EmptyPlan;
168+
const EMPTY_PLAN_ERROR: RequestError = RequestError::EmptyPlan;
168169

169170
pub(crate) async fn execute<QueryFut, ResT>(
170171
policy: &dyn SpeculativeExecutionPolicy,
171172
context: &Context,
172173
query_runner_generator: impl Fn(bool) -> QueryFut,
173-
) -> Result<ResT, QueryError>
174+
) -> Result<ResT, RequestError>
174175
where
175-
QueryFut: Future<Output = Option<Result<ResT, QueryError>>>,
176+
QueryFut: Future<Output = Option<Result<ResT, RequestError>>>,
176177
{
177178
let mut retries_remaining = policy.max_retry_count(context);
178179
let retry_interval = policy.retry_interval(context);

0 commit comments

Comments
 (0)