Skip to content

Commit 86efc40

Browse files
authored
Merge pull request #1204 from muzarski/inline-request-error
errors: final cleanups (restructure ExecutionError and introduce SchemaAgreementError)
2 parents 43f758f + 34afac7 commit 86efc40

File tree

13 files changed

+179
-262
lines changed

13 files changed

+179
-262
lines changed

examples/schema_agreement.rs

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ use anyhow::{bail, Result};
22
use futures::TryStreamExt as _;
33
use scylla::client::session::Session;
44
use scylla::client::session_builder::SessionBuilder;
5-
use scylla::errors::ExecutionError;
5+
use scylla::errors::SchemaAgreementError;
66
use std::env;
77
use std::time::Duration;
88

@@ -27,7 +27,9 @@ async fn main() -> Result<()> {
2727

2828
match session.await_schema_agreement().await {
2929
Ok(_schema_version) => println!("Schema is in agreement in time"),
30-
Err(ExecutionError::RequestTimeout(_)) => println!("Schema is NOT in agreement in time"),
30+
Err(SchemaAgreementError::Timeout(_)) => {
31+
println!("Schema is NOT in agreement in time")
32+
}
3133
Err(err) => bail!(err),
3234
};
3335
session

scylla-cql/src/frame/response/error.rs

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -151,6 +151,7 @@ impl Error {
151151
/// An error sent from the database in response to a query
152152
/// as described in the [specification](https://github.com/apache/cassandra/blob/5ed5e84613ef0e9664a774493db7d2604e3596e0/doc/native_protocol_v4.spec#L1029)\
153153
#[derive(Error, Debug, Clone, PartialEq, Eq)]
154+
#[non_exhaustive]
154155
pub enum DbError {
155156
/// The submitted query has a syntax error
156157
#[error("The submitted query has a syntax error")]
@@ -386,6 +387,47 @@ impl DbError {
386387
} => protocol_features.rate_limit_error.unwrap(),
387388
}
388389
}
390+
391+
/// Decides whether the error can be ignored. If true, the driver can perform
392+
/// a speculative retry to the next target.
393+
pub fn can_speculative_retry(&self) -> bool {
394+
// Do not remove this lint!
395+
// It's there for a reason - we don't want new variants
396+
// automatically fall under `_` pattern when they are introduced.
397+
#[deny(clippy::wildcard_enum_match_arm)]
398+
match self {
399+
// Errors that will almost certainly appear on other nodes as well
400+
DbError::SyntaxError
401+
| DbError::Invalid
402+
| DbError::AlreadyExists { .. }
403+
| DbError::Unauthorized
404+
| DbError::ProtocolError => false,
405+
406+
// Errors that should not appear there - thus, should not be ignored.
407+
DbError::AuthenticationError | DbError::Other(_) => false,
408+
409+
// For now, let's assume that UDF failure is not transient - don't ignore it
410+
// TODO: investigate
411+
DbError::FunctionFailure { .. } => false,
412+
413+
// Not sure when these can appear - don't ignore them
414+
// TODO: Investigate these errors
415+
DbError::ConfigError | DbError::TruncateError => false,
416+
417+
// Errors that we can ignore and perform a retry on some other node
418+
DbError::Unavailable { .. }
419+
| DbError::Overloaded
420+
| DbError::IsBootstrapping
421+
| DbError::ReadTimeout { .. }
422+
| DbError::WriteTimeout { .. }
423+
| DbError::ReadFailure { .. }
424+
| DbError::WriteFailure { .. }
425+
// Preparation may succeed on some other node.
426+
| DbError::Unprepared { .. }
427+
| DbError::ServerError
428+
| DbError::RateLimitReached { .. } => true,
429+
}
430+
}
389431
}
390432

391433
/// Type of the operation rejected by rate limiting

scylla/src/client/session.rs

Lines changed: 13 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ use crate::cluster::node::{InternalKnownNode, KnownNode, NodeRef};
1515
use crate::cluster::{Cluster, ClusterNeatDebug, ClusterState};
1616
use crate::errors::{
1717
BadQuery, ExecutionError, MetadataError, NewSessionError, PagerExecutionError, PrepareError,
18-
ProtocolError, RequestAttemptError, RequestError, TracingError, UseKeyspaceError,
18+
RequestAttemptError, RequestError, SchemaAgreementError, TracingError, UseKeyspaceError,
1919
};
2020
use crate::frame::response::result;
2121
#[cfg(feature = "ssl")]
@@ -867,7 +867,9 @@ impl Session {
867867
.await?;
868868
if !paging_state_response.finished() {
869869
error!("Unpaged unprepared query returned a non-empty paging state! This is a driver-side or server-side bug.");
870-
return Err(ProtocolError::NonfinishedPagingState.into());
870+
return Err(ExecutionError::LastAttemptError(
871+
RequestAttemptError::NonfinishedPagingState,
872+
));
871873
}
872874
Ok(result)
873875
}
@@ -988,9 +990,7 @@ impl Session {
988990
self.handle_set_keyspace_response(&response).await?;
989991
self.handle_auto_await_schema_agreement(&response).await?;
990992

991-
let (result, paging_state_response) = response
992-
.into_query_result_and_paging_state()
993-
.map_err(RequestAttemptError::into_execution_error)?;
993+
let (result, paging_state_response) = response.into_query_result_and_paging_state()?;
994994
span.record_result_fields(&result);
995995

996996
Ok((result, paging_state_response))
@@ -1175,7 +1175,9 @@ impl Session {
11751175
.await?;
11761176
if !paging_state.finished() {
11771177
error!("Unpaged prepared query returned a non-empty paging state! This is a driver-side or server-side bug.");
1178-
return Err(ProtocolError::NonfinishedPagingState.into());
1178+
return Err(ExecutionError::LastAttemptError(
1179+
RequestAttemptError::NonfinishedPagingState,
1180+
));
11791181
}
11801182
Ok(result)
11811183
}
@@ -1295,9 +1297,7 @@ impl Session {
12951297
self.handle_set_keyspace_response(&response).await?;
12961298
self.handle_auto_await_schema_agreement(&response).await?;
12971299

1298-
let (result, paging_state_response) = response
1299-
.into_query_result_and_paging_state()
1300-
.map_err(RequestAttemptError::into_execution_error)?;
1300+
let (result, paging_state_response) = response.into_query_result_and_paging_state()?;
13011301
span.record_result_fields(&result);
13021302

13031303
Ok((result, paging_state_response))
@@ -1938,7 +1938,7 @@ impl Session {
19381938
last_error.map(Result::Err)
19391939
}
19401940

1941-
async fn await_schema_agreement_indefinitely(&self) -> Result<Uuid, ExecutionError> {
1941+
async fn await_schema_agreement_indefinitely(&self) -> Result<Uuid, SchemaAgreementError> {
19421942
loop {
19431943
tokio::time::sleep(self.schema_agreement_interval).await;
19441944
if let Some(agreed_version) = self.check_schema_agreement().await? {
@@ -1947,18 +1947,18 @@ impl Session {
19471947
}
19481948
}
19491949

1950-
pub async fn await_schema_agreement(&self) -> Result<Uuid, ExecutionError> {
1950+
pub async fn await_schema_agreement(&self) -> Result<Uuid, SchemaAgreementError> {
19511951
timeout(
19521952
self.schema_agreement_timeout,
19531953
self.await_schema_agreement_indefinitely(),
19541954
)
19551955
.await
1956-
.unwrap_or(Err(ExecutionError::SchemaAgreementTimeout(
1956+
.unwrap_or(Err(SchemaAgreementError::Timeout(
19571957
self.schema_agreement_timeout,
19581958
)))
19591959
}
19601960

1961-
pub async fn check_schema_agreement(&self) -> Result<Option<Uuid>, ExecutionError> {
1961+
pub async fn check_schema_agreement(&self) -> Result<Option<Uuid>, SchemaAgreementError> {
19621962
let cluster_state = self.get_cluster_state();
19631963
let connections_iter = cluster_state.iter_working_connections()?;
19641964

scylla/src/client/session_test.rs

Lines changed: 12 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,9 @@ use crate::cluster::metadata::{
99
CollectionType, ColumnKind, ColumnType, NativeType, UserDefinedType,
1010
};
1111
use crate::deserialize::DeserializeOwnedValue;
12-
use crate::errors::{BadKeyspaceName, DbError, ExecutionError, UseKeyspaceError};
12+
use crate::errors::{
13+
BadKeyspaceName, DbError, ExecutionError, RequestAttemptError, UseKeyspaceError,
14+
};
1315
use crate::observability::tracing::TracingInfo;
1416
use crate::policies::retry::{RequestInfo, RetryDecision, RetryPolicy, RetrySession};
1517
use crate::prepared_statement::PreparedStatement;
@@ -948,15 +950,17 @@ async fn test_db_errors() {
948950
// SyntaxError on bad query
949951
assert!(matches!(
950952
session.query_unpaged("gibberish", &[]).await,
951-
Err(ExecutionError::DbError(DbError::SyntaxError, _))
953+
Err(ExecutionError::LastAttemptError(
954+
RequestAttemptError::DbError(DbError::SyntaxError, _)
955+
))
952956
));
953957

954958
// AlreadyExists when creating a keyspace for the second time
955959
session.ddl(format!("CREATE KEYSPACE IF NOT EXISTS {} WITH REPLICATION = {{'class' : 'NetworkTopologyStrategy', 'replication_factor' : 1}}", ks)).await.unwrap();
956960

957961
let create_keyspace_res = session.ddl(format!("CREATE KEYSPACE {} WITH REPLICATION = {{'class' : 'NetworkTopologyStrategy', 'replication_factor' : 1}}", ks)).await;
958962
let keyspace_exists_error: DbError = match create_keyspace_res {
959-
Err(ExecutionError::DbError(e, _)) => e,
963+
Err(ExecutionError::LastAttemptError(RequestAttemptError::DbError(e, _))) => e,
960964
_ => panic!("Second CREATE KEYSPACE didn't return an error!"),
961965
};
962966

@@ -981,7 +985,7 @@ async fn test_db_errors() {
981985
.ddl(format!("CREATE TABLE {}.tab (a text primary key)", ks))
982986
.await;
983987
let create_tab_error: DbError = match create_table_res {
984-
Err(ExecutionError::DbError(e, _)) => e,
988+
Err(ExecutionError::LastAttemptError(RequestAttemptError::DbError(e, _))) => e,
985989
_ => panic!("Second CREATE TABLE didn't return an error!"),
986990
};
987991

@@ -2621,7 +2625,10 @@ async fn test_rate_limit_exceeded_exception() {
26212625
use crate::errors::OperationType;
26222626

26232627
match maybe_err.expect("Rate limit error didn't occur") {
2624-
ExecutionError::DbError(DbError::RateLimitReached { op_type, .. }, _) => {
2628+
ExecutionError::LastAttemptError(RequestAttemptError::DbError(
2629+
DbError::RateLimitReached { op_type, .. },
2630+
_,
2631+
)) => {
26252632
assert_eq!(op_type, OperationType::Write);
26262633
}
26272634
err => panic!("Unexpected error type received: {:?}", err),

scylla/src/cluster/state.rs

Lines changed: 4 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
use crate::errors::{BadQuery, ConnectionPoolError};
1+
use crate::errors::ConnectionPoolError;
22
use crate::network::{Connection, PoolConfig, VerifiedKeyspaceName};
33
use crate::policies::host_filter::HostFilter;
44
use crate::prepared_statement::TokenCalculationError;
@@ -204,7 +204,7 @@ impl ClusterState {
204204
keyspace: &str,
205205
table: &str,
206206
partition_key: &SerializedValues,
207-
) -> Result<Token, BadQuery> {
207+
) -> Result<Token, TokenCalculationError> {
208208
let partitioner = self
209209
.keyspaces
210210
.get(keyspace)
@@ -213,11 +213,7 @@ impl ClusterState {
213213
.and_then(PartitionerName::from_str)
214214
.unwrap_or_default();
215215

216-
calculate_token_for_partition_key(partition_key, &partitioner).map_err(|err| match err {
217-
TokenCalculationError::ValueTooLong(values_len) => {
218-
BadQuery::ValuesTooLongForKey(values_len, u16::MAX.into())
219-
}
220-
})
216+
calculate_token_for_partition_key(partition_key, &partitioner)
221217
}
222218

223219
/// Access to replicas owning a given token
@@ -255,7 +251,7 @@ impl ClusterState {
255251
keyspace: &str,
256252
table: &str,
257253
partition_key: &SerializedValues,
258-
) -> Result<Vec<(Arc<Node>, Shard)>, BadQuery> {
254+
) -> Result<Vec<(Arc<Node>, Shard)>, TokenCalculationError> {
259255
let token = self.compute_token(keyspace, table, partition_key)?;
260256
Ok(self.get_token_endpoints(keyspace, table, token))
261257
}

0 commit comments

Comments
 (0)