Skip to content

Commit 96dfa58

Browse files
committed
session: Use NonErrorQueryResponse in run_query
run_query takes as a paramter a closure used to run a single query ona connection. Result of such a query is Result<ResT, QueryResult>. In Case of Result::Err the retry policy decides whether to retry the query based on the error type. There was a bug - in a few cases ResT was of type QueryResponse, which has an Error variant. Such errors weren't detected by RetryPolicy and were never retried. This commit fixes the bug by making the closure return NonErrorQueryResponse. All errors will be collected in a single QueryResult and detected by RetryPolicy. Fixes: #501 Signed-off-by: Jan Ciolek <[email protected]>
1 parent d199c74 commit 96dfa58

File tree

2 files changed

+20
-17
lines changed

2 files changed

+20
-17
lines changed

scylla/src/transport/connection.rs

Lines changed: 14 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -158,20 +158,6 @@ pub struct BatchResult {
158158
}
159159

160160
impl QueryResponse {
161-
pub fn as_set_keyspace(&self) -> Option<&result::SetKeyspace> {
162-
match &self.response {
163-
Response::Result(result::Result::SetKeyspace(sk)) => Some(sk),
164-
_ => None,
165-
}
166-
}
167-
168-
pub fn as_schema_change(&self) -> Option<&result::SchemaChange> {
169-
match &self.response {
170-
Response::Result(result::Result::SchemaChange(sc)) => Some(sc),
171-
_ => None,
172-
}
173-
}
174-
175161
pub fn into_non_error_query_response(self) -> Result<NonErrorQueryResponse, QueryError> {
176162
Ok(NonErrorQueryResponse {
177163
response: self.response.into_non_error_response()?,
@@ -186,6 +172,20 @@ impl QueryResponse {
186172
}
187173

188174
impl NonErrorQueryResponse {
175+
pub fn as_set_keyspace(&self) -> Option<&result::SetKeyspace> {
176+
match &self.response {
177+
NonErrorResponse::Result(result::Result::SetKeyspace(sk)) => Some(sk),
178+
_ => None,
179+
}
180+
}
181+
182+
pub fn as_schema_change(&self) -> Option<&result::SchemaChange> {
183+
match &self.response {
184+
NonErrorResponse::Result(result::Result::SchemaChange(sc)) => Some(sc),
185+
_ => None,
186+
}
187+
}
188+
189189
pub fn into_query_result(self) -> Result<QueryResult, QueryError> {
190190
let (rows, paging_state, col_specs) = match self.response {
191191
NonErrorResponse::Result(result::Result::Rows(rs)) => (

scylla/src/transport/session.rs

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ use tokio::time::timeout;
1414
use tracing::{debug, error, trace, trace_span, Instrument};
1515
use uuid::Uuid;
1616

17+
use super::connection::NonErrorQueryResponse;
1718
use super::connection::QueryResponse;
1819
use super::errors::{BadQuery, NewSessionError, QueryError};
1920
use crate::cql_to_rust::FromRow;
@@ -447,6 +448,7 @@ impl Session {
447448
connection
448449
.query(query_ref, values_ref, paging_state_ref.clone())
449450
.await
451+
.and_then(QueryResponse::into_non_error_query_response)
450452
}
451453
},
452454
)
@@ -461,7 +463,7 @@ impl Session {
461463

462464
async fn handle_set_keyspace_response(
463465
&self,
464-
response: &QueryResponse,
466+
response: &NonErrorQueryResponse,
465467
) -> Result<(), QueryError> {
466468
if let Some(set_keyspace) = response.as_set_keyspace() {
467469
debug!(
@@ -478,7 +480,7 @@ impl Session {
478480
async fn handle_auto_await_schema_agreement(
479481
&self,
480482
contents: &str,
481-
response: &QueryResponse,
483+
response: &NonErrorQueryResponse,
482484
) -> Result<(), QueryError> {
483485
if let Some(timeout) = self.auto_await_schema_agreement_timeout {
484486
if response.as_schema_change().is_some()
@@ -725,7 +727,7 @@ impl Session {
725727
"Request",
726728
prepared_id = format!("{:X}", prepared.get_id()).as_str()
727729
);
728-
let response = self
730+
let response: NonErrorQueryResponse = self
729731
.run_query(
730732
statement_info,
731733
&prepared.config,
@@ -739,6 +741,7 @@ impl Session {
739741
connection
740742
.execute(prepared, values_ref, paging_state_ref.clone())
741743
.await
744+
.and_then(QueryResponse::into_non_error_query_response)
742745
},
743746
)
744747
.instrument(span)

0 commit comments

Comments
 (0)