Skip to content

Commit c3a1a51

Browse files
committed
Added RetryDecision::IgnoreWriteError
Such variant may be useful to quietly consider a write operation successful in case it wasn't replicated on enough number of nodes that the consistency level would require. DowngradingConsistencyPolicy will rely on such behaviour.
1 parent a9e5448 commit c3a1a51

File tree

3 files changed

+66
-16
lines changed

3 files changed

+66
-16
lines changed

scylla/src/transport/iterator.rs

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ use crate::transport::load_balancing::{LoadBalancingPolicy, Statement};
3333
use crate::transport::metrics::Metrics;
3434
use crate::transport::node::Node;
3535
use crate::transport::retry_policy::{QueryInfo, RetryDecision, RetrySession};
36-
use tracing::{trace, trace_span, Instrument};
36+
use tracing::{trace, trace_span, warn, Instrument};
3737
use uuid::Uuid;
3838

3939
// #424
@@ -375,6 +375,10 @@ where
375375
continue 'nodes_in_plan;
376376
}
377377
RetryDecision::DontRetry => break 'nodes_in_plan,
378+
RetryDecision::IgnoreWriteError => {
379+
warn!("Ignoring error during fetching pages; stopping fetching.");
380+
return;
381+
}
378382
};
379383
}
380384
}

scylla/src/transport/retry_policy.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ pub enum RetryDecision {
2222
RetrySameNode(Consistency),
2323
RetryNextNode(Consistency),
2424
DontRetry,
25+
IgnoreWriteError,
2526
}
2627

2728
/// Specifies a policy used to decide when to retry a query

scylla/src/transport/session.rs

Lines changed: 60 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ use crate::frame::types::LegacyConsistency;
55
use bytes::Bytes;
66
use futures::future::join_all;
77
use futures::future::try_join_all;
8+
use scylla_cql::frame::response::NonErrorResponse;
89
use std::future::Future;
910
use std::net::SocketAddr;
1011
use std::sync::Arc;
@@ -316,6 +317,11 @@ impl<RowT: FromRow> Iterator for TypedRowIter<RowT> {
316317
}
317318
}
318319

320+
pub enum RunQueryResult<ResT> {
321+
IgnoredWriteError,
322+
Completed(ResT),
323+
}
324+
319325
/// Represents a CQL session, which can be used to communicate
320326
/// with the database
321327
impl Session {
@@ -463,7 +469,7 @@ impl Session {
463469
let serialized_values = values.serialized()?;
464470

465471
let span = trace_span!("Request", query = query.contents.as_str());
466-
let response = self
472+
let run_query_result = self
467473
.run_query(
468474
Statement::default(),
469475
&query.config,
@@ -488,6 +494,16 @@ impl Session {
488494
)
489495
.instrument(span)
490496
.await?;
497+
498+
let response = match run_query_result {
499+
RunQueryResult::IgnoredWriteError => NonErrorQueryResponse {
500+
response: NonErrorResponse::Result(result::Result::Void),
501+
tracing_id: None,
502+
warnings: Vec::new(),
503+
},
504+
RunQueryResult::Completed(response) => response,
505+
};
506+
491507
self.handle_set_keyspace_response(&response).await?;
492508
self.handle_auto_await_schema_agreement(&query.contents, &response)
493509
.await?;
@@ -761,7 +777,7 @@ impl Session {
761777
"Request",
762778
prepared_id = format!("{:X}", prepared.get_id()).as_str()
763779
);
764-
let response: NonErrorQueryResponse = self
780+
let run_query_result: RunQueryResult<NonErrorQueryResponse> = self
765781
.run_query(
766782
statement_info,
767783
&prepared.config,
@@ -785,6 +801,16 @@ impl Session {
785801
)
786802
.instrument(span)
787803
.await?;
804+
805+
let response = match run_query_result {
806+
RunQueryResult::IgnoredWriteError => NonErrorQueryResponse {
807+
response: NonErrorResponse::Result(result::Result::Void),
808+
tracing_id: None,
809+
warnings: Vec::new(),
810+
},
811+
RunQueryResult::Completed(response) => response,
812+
};
813+
788814
self.handle_set_keyspace_response(&response).await?;
789815
self.handle_auto_await_schema_agreement(prepared.get_statement(), &response)
790816
.await?;
@@ -913,7 +939,7 @@ impl Session {
913939
) -> Result<BatchResult, QueryError> {
914940
let values_ref = &values;
915941

916-
self
942+
let run_query_result = self
917943
.run_query(
918944
Statement::default(),
919945
&batch.config,
@@ -925,7 +951,15 @@ impl Session {
925951
},
926952
)
927953
.instrument(trace_span!("Batch"))
928-
.await
954+
.await?;
955+
956+
Ok(match run_query_result {
957+
RunQueryResult::IgnoredWriteError => BatchResult {
958+
tracing_id: None,
959+
warnings: Vec::new(),
960+
},
961+
RunQueryResult::Completed(response) => response,
962+
})
929963
}
930964

931965
/// Sends `USE <keyspace_name>` request on all connections\
@@ -1124,7 +1158,7 @@ impl Session {
11241158
statement_config: &StatementConfig,
11251159
choose_connection: impl Fn(Arc<Node>) -> ConnFut,
11261160
do_query: impl Fn(Arc<Connection>, Consistency) -> QueryFut,
1127-
) -> Result<ResT, QueryError>
1161+
) -> Result<RunQueryResult<ResT>, QueryError>
11281162
where
11291163
ConnFut: Future<Output = Result<Arc<Connection>, QueryError>>,
11301164
QueryFut: Future<Output = Result<ResT, QueryError>>,
@@ -1233,7 +1267,7 @@ impl Session {
12331267
mut retry_session: Box<dyn RetrySession>,
12341268
choose_connection: impl Fn(Arc<Node>) -> ConnFut,
12351269
do_query: impl Fn(Arc<Connection>, Consistency) -> QueryFut,
1236-
) -> Option<Result<ResT, QueryError>>
1270+
) -> Option<Result<RunQueryResult<ResT>, QueryError>>
12371271
where
12381272
ConnFut: Future<Output = Result<Arc<Connection>, QueryError>>,
12391273
QueryFut: Future<Output = Result<ResT, QueryError>>,
@@ -1282,7 +1316,7 @@ impl Session {
12821316
let _ = self
12831317
.metrics
12841318
.log_query_latency(query_start.elapsed().as_millis() as u64);
1285-
return Some(Ok(response));
1319+
return Some(Ok(RunQueryResult::Completed(response)));
12861320
}
12871321
Err(e) => {
12881322
trace!(
@@ -1320,7 +1354,11 @@ impl Session {
13201354
current_consistency = cl;
13211355
continue 'nodes_in_plan;
13221356
}
1323-
RetryDecision::DontRetry => return last_error.map(Result::Err),
1357+
RetryDecision::DontRetry => break 'nodes_in_plan,
1358+
1359+
RetryDecision::IgnoreWriteError => {
1360+
return Some(Ok(RunQueryResult::IgnoredWriteError))
1361+
}
13241362
};
13251363
}
13261364
}
@@ -1359,13 +1397,20 @@ impl Session {
13591397
..Default::default()
13601398
};
13611399

1362-
self.run_query(
1363-
info,
1364-
&config,
1365-
|node: Arc<Node>| async move { node.random_connection().await },
1366-
do_query,
1367-
)
1368-
.await
1400+
match self
1401+
.run_query(
1402+
info,
1403+
&config,
1404+
|node: Arc<Node>| async move { node.random_connection().await },
1405+
do_query,
1406+
)
1407+
.await?
1408+
{
1409+
RunQueryResult::IgnoredWriteError => Err(QueryError::ProtocolError(
1410+
"Retry policy has made the driver ignore schema's agreement query.",
1411+
)),
1412+
RunQueryResult::Completed(result) => Ok(result),
1413+
}
13691414
}
13701415

13711416
pub async fn check_schema_agreement(&self) -> Result<bool, QueryError> {

0 commit comments

Comments
 (0)