Skip to content

Commit 0f33e3c

Browse files
authored
Merge pull request #516 from cvybhu/retry_retry_policy
Fix RetryPolicy not retrying on Response::Error
2 parents aa3addc + 50873fa commit 0f33e3c

File tree

3 files changed

+76
-10
lines changed

3 files changed

+76
-10
lines changed

scylla-cql/src/frame/response/mod.rs

Lines changed: 26 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ pub mod event;
55
pub mod result;
66
pub mod supported;
77

8-
use crate::frame::frame_errors::ParseError;
8+
use crate::{errors::QueryError, frame::frame_errors::ParseError};
99
use num_enum::TryFromPrimitive;
1010

1111
pub use error::Error;
@@ -57,4 +57,29 @@ impl Response {
5757

5858
Ok(response)
5959
}
60+
61+
pub fn into_non_error_response(self) -> Result<NonErrorResponse, QueryError> {
62+
Ok(match self {
63+
Response::Error(err) => return Err(QueryError::from(err)),
64+
Response::Ready => NonErrorResponse::Ready,
65+
Response::Result(res) => NonErrorResponse::Result(res),
66+
Response::Authenticate(auth) => NonErrorResponse::Authenticate(auth),
67+
Response::AuthSuccess(auth_succ) => NonErrorResponse::AuthSuccess(auth_succ),
68+
Response::AuthChallenge(auth_chal) => NonErrorResponse::AuthChallenge(auth_chal),
69+
Response::Supported(sup) => NonErrorResponse::Supported(sup),
70+
Response::Event(eve) => NonErrorResponse::Event(eve),
71+
})
72+
}
73+
}
74+
75+
// A Response which can not be Response::Error
76+
#[derive(Debug)]
77+
pub enum NonErrorResponse {
78+
Ready,
79+
Result(result::Result),
80+
Authenticate(authenticate::Authenticate),
81+
AuthSuccess(authenticate::AuthSuccess),
82+
AuthChallenge(authenticate::AuthChallenge),
83+
Supported(Supported),
84+
Event(event::Event),
6085
}

scylla/src/transport/connection.rs

Lines changed: 26 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ use crate::batch::{Batch, BatchStatement};
3232
use crate::frame::{
3333
self,
3434
request::{self, batch, execute, query, register, Request},
35-
response::{event::Event, result, Response, ResponseOpcode},
35+
response::{event::Event, result, NonErrorResponse, Response, ResponseOpcode},
3636
server_event_type::EventType,
3737
value::{BatchValues, ValueList},
3838
FrameParams, SerializedRequest,
@@ -142,6 +142,13 @@ pub struct QueryResponse {
142142
pub warnings: Vec<String>,
143143
}
144144

145+
// A QueryResponse in which response can not be Response::Error
146+
pub struct NonErrorQueryResponse {
147+
pub response: NonErrorResponse,
148+
pub tracing_id: Option<Uuid>,
149+
pub warnings: Vec<String>,
150+
}
151+
145152
/// Result of Session::batch(). Contains no rows, only some useful information.
146153
pub struct BatchResult {
147154
/// Warnings returned by the database
@@ -151,29 +158,42 @@ pub struct BatchResult {
151158
}
152159

153160
impl QueryResponse {
161+
pub fn into_non_error_query_response(self) -> Result<NonErrorQueryResponse, QueryError> {
162+
Ok(NonErrorQueryResponse {
163+
response: self.response.into_non_error_response()?,
164+
tracing_id: self.tracing_id,
165+
warnings: self.warnings,
166+
})
167+
}
168+
169+
pub fn into_query_result(self) -> Result<QueryResult, QueryError> {
170+
self.into_non_error_query_response()?.into_query_result()
171+
}
172+
}
173+
174+
impl NonErrorQueryResponse {
154175
pub fn as_set_keyspace(&self) -> Option<&result::SetKeyspace> {
155176
match &self.response {
156-
Response::Result(result::Result::SetKeyspace(sk)) => Some(sk),
177+
NonErrorResponse::Result(result::Result::SetKeyspace(sk)) => Some(sk),
157178
_ => None,
158179
}
159180
}
160181

161182
pub fn as_schema_change(&self) -> Option<&result::SchemaChange> {
162183
match &self.response {
163-
Response::Result(result::Result::SchemaChange(sc)) => Some(sc),
184+
NonErrorResponse::Result(result::Result::SchemaChange(sc)) => Some(sc),
164185
_ => None,
165186
}
166187
}
167188

168189
pub fn into_query_result(self) -> Result<QueryResult, QueryError> {
169190
let (rows, paging_state, col_specs) = match self.response {
170-
Response::Error(err) => return Err(err.into()),
171-
Response::Result(result::Result::Rows(rs)) => (
191+
NonErrorResponse::Result(result::Result::Rows(rs)) => (
172192
Some(rs.rows),
173193
rs.metadata.paging_state,
174194
rs.metadata.col_specs,
175195
),
176-
Response::Result(_) => (None, None, vec![]),
196+
NonErrorResponse::Result(_) => (None, None, vec![]),
177197
_ => {
178198
return Err(QueryError::ProtocolError(
179199
"Unexpected server response, expected Result or Error",

scylla/src/transport/session.rs

Lines changed: 24 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ use tokio::time::timeout;
1414
use tracing::{debug, error, trace, trace_span, Instrument};
1515
use uuid::Uuid;
1616

17+
use super::connection::NonErrorQueryResponse;
1718
use super::connection::QueryResponse;
1819
use super::errors::{BadQuery, NewSessionError, QueryError};
1920
use crate::cql_to_rust::FromRow;
@@ -470,6 +471,7 @@ impl Session {
470471
connection
471472
.query(query_ref, values_ref, paging_state_ref.clone())
472473
.await
474+
.and_then(QueryResponse::into_non_error_query_response)
473475
}
474476
},
475477
)
@@ -484,7 +486,7 @@ impl Session {
484486

485487
async fn handle_set_keyspace_response(
486488
&self,
487-
response: &QueryResponse,
489+
response: &NonErrorQueryResponse,
488490
) -> Result<(), QueryError> {
489491
if let Some(set_keyspace) = response.as_set_keyspace() {
490492
debug!(
@@ -501,7 +503,7 @@ impl Session {
501503
async fn handle_auto_await_schema_agreement(
502504
&self,
503505
contents: &str,
504-
response: &QueryResponse,
506+
response: &NonErrorQueryResponse,
505507
) -> Result<(), QueryError> {
506508
if let Some(timeout) = self.auto_await_schema_agreement_timeout {
507509
if response.as_schema_change().is_some()
@@ -748,7 +750,7 @@ impl Session {
748750
"Request",
749751
prepared_id = format!("{:X}", prepared.get_id()).as_str()
750752
);
751-
let response = self
753+
let response: NonErrorQueryResponse = self
752754
.run_query(
753755
statement_info,
754756
&prepared.config,
@@ -762,6 +764,7 @@ impl Session {
762764
connection
763765
.execute(prepared, values_ref, paging_state_ref.clone())
764766
.await
767+
.and_then(QueryResponse::into_non_error_query_response)
765768
},
766769
)
767770
.instrument(span)
@@ -1104,6 +1107,7 @@ impl Session {
11041107
where
11051108
ConnFut: Future<Output = Result<Arc<Connection>, QueryError>>,
11061109
QueryFut: Future<Output = Result<ResT, QueryError>>,
1110+
ResT: AllowedRunQueryResTType,
11071111
{
11081112
let cluster_data = self.cluster.get_data();
11091113
let query_plan = self.load_balancer.plan(&statement_info, &cluster_data);
@@ -1196,6 +1200,7 @@ impl Session {
11961200
where
11971201
ConnFut: Future<Output = Result<Arc<Connection>, QueryError>>,
11981202
QueryFut: Future<Output = Result<ResT, QueryError>>,
1203+
ResT: AllowedRunQueryResTType,
11991204
{
12001205
let mut last_error: Option<QueryError> = None;
12011206

@@ -1303,6 +1308,7 @@ impl Session {
13031308
) -> Result<ResT, QueryError>
13041309
where
13051310
QueryFut: Future<Output = Result<ResT, QueryError>>,
1311+
ResT: AllowedRunQueryResTType,
13061312
{
13071313
let info = Statement::default();
13081314
let config = StatementConfig {
@@ -1395,3 +1401,18 @@ async fn resolve_hostname(hostname: &str) -> Result<SocketAddr, NewSessionError>
13951401

13961402
ret.ok_or(failed_err)
13971403
}
1404+
1405+
// run_query, execute_query, etc have a template type called ResT.
1406+
// There was a bug where ResT was set to QueryResponse, which could
1407+
// be an error response. This was not caught by retry policy which
1408+
// assumed all errors would come from analyzing Result<ResT, QueryError>.
1409+
// This trait is a guard to make sure that this mistake doesn't
1410+
// happen again.
1411+
// When using run_query make sure that the ResT type is NOT able
1412+
// to contain any errors.
1413+
// See https://github.com/scylladb/scylla-rust-driver/issues/501
1414+
pub trait AllowedRunQueryResTType {}
1415+
1416+
impl AllowedRunQueryResTType for Uuid {}
1417+
impl AllowedRunQueryResTType for BatchResult {}
1418+
impl AllowedRunQueryResTType for NonErrorQueryResponse {}

0 commit comments

Comments
 (0)