Skip to content

Commit 60eb0ff

Browse files
committed
Added DowngradingConsistencyRetryPolicy
Such policy, though deprecated in many drivers, may be useful for some users.
1 parent c3a1a51 commit 60eb0ff

File tree

2 files changed

+171
-0
lines changed

2 files changed

+171
-0
lines changed
Lines changed: 170 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,170 @@
1+
use scylla_cql::{
2+
errors::{DbError, QueryError, WriteType},
3+
frame::types::LegacyConsistency,
4+
Consistency,
5+
};
6+
use tracing::debug;
7+
8+
use crate::retry_policy::{QueryInfo, RetryDecision, RetryPolicy, RetrySession};
9+
10+
/// Downgrading consistency retry policy - retries with lower consistency level if it knows\
11+
/// that the initial CL is unreachable. Also, it behaves as [DefaultRetryPolicy] when it believes that the initial CL is reachable.
12+
/// Behaviour based on [DataStax Java Driver]\
13+
///(https://docs.datastax.com/en/drivers/java/3.11/com/datastax/driver/core/policies/DowngradingConsistencyRetryPolicy.html)
14+
#[derive(Debug)]
15+
pub struct DowngradingConsistencyRetryPolicy;
16+
17+
impl DowngradingConsistencyRetryPolicy {
18+
pub fn new() -> DowngradingConsistencyRetryPolicy {
19+
DowngradingConsistencyRetryPolicy
20+
}
21+
}
22+
23+
impl Default for DowngradingConsistencyRetryPolicy {
24+
fn default() -> DowngradingConsistencyRetryPolicy {
25+
DowngradingConsistencyRetryPolicy::new()
26+
}
27+
}
28+
29+
impl RetryPolicy for DowngradingConsistencyRetryPolicy {
30+
fn new_session(&self) -> Box<dyn RetrySession> {
31+
Box::new(DowngradingConsistencyRetrySession::new())
32+
}
33+
34+
fn clone_boxed(&self) -> Box<dyn RetryPolicy> {
35+
Box::new(DowngradingConsistencyRetryPolicy)
36+
}
37+
}
38+
39+
pub struct DowngradingConsistencyRetrySession {
40+
was_retry: bool,
41+
}
42+
43+
impl DowngradingConsistencyRetrySession {
44+
pub fn new() -> DowngradingConsistencyRetrySession {
45+
DowngradingConsistencyRetrySession { was_retry: false }
46+
}
47+
}
48+
49+
impl Default for DowngradingConsistencyRetrySession {
50+
fn default() -> DowngradingConsistencyRetrySession {
51+
DowngradingConsistencyRetrySession::new()
52+
}
53+
}
54+
55+
impl RetrySession for DowngradingConsistencyRetrySession {
56+
fn decide_should_retry(&mut self, query_info: QueryInfo) -> RetryDecision {
57+
let cl = match query_info.consistency {
58+
LegacyConsistency::Serial(_) => return RetryDecision::DontRetry, // FIXME: is this proper behaviour?
59+
LegacyConsistency::Regular(cl) => cl,
60+
};
61+
62+
fn max_likely_to_work_cl(known_ok: i32, previous_cl: Consistency) -> RetryDecision {
63+
let decision = if known_ok >= 3 {
64+
RetryDecision::RetrySameNode(Consistency::Three)
65+
} else if known_ok == 2 {
66+
RetryDecision::RetrySameNode(Consistency::Two)
67+
} else if known_ok == 1 || previous_cl == Consistency::EachQuorum {
68+
// JAVA-1005: EACH_QUORUM does not report a global number of alive replicas
69+
// so even if we get 0 alive replicas, there might be
70+
// a node up in some other datacenter
71+
RetryDecision::RetrySameNode(Consistency::One)
72+
} else {
73+
RetryDecision::DontRetry
74+
};
75+
if let RetryDecision::RetrySameNode(new_cl) = decision {
76+
debug!(
77+
"Decided to lower required consistency from {} to {}.",
78+
previous_cl, new_cl
79+
);
80+
}
81+
decision
82+
}
83+
84+
match query_info.error {
85+
// Basic errors - there are some problems on this node
86+
// Retry on a different one if possible
87+
QueryError::IoError(_)
88+
| QueryError::DbError(DbError::Overloaded, _)
89+
| QueryError::DbError(DbError::ServerError, _)
90+
| QueryError::DbError(DbError::TruncateError, _) => {
91+
if query_info.is_idempotent {
92+
RetryDecision::RetryNextNode(cl)
93+
} else {
94+
RetryDecision::DontRetry
95+
}
96+
}
97+
// Unavailable - the current node believes that not enough nodes
98+
// are alive to satisfy specified consistency requirements.
99+
QueryError::DbError(DbError::Unavailable { alive, .. }, _) => {
100+
if !self.was_retry {
101+
self.was_retry = true;
102+
max_likely_to_work_cl(*alive, cl)
103+
} else {
104+
RetryDecision::DontRetry
105+
}
106+
}
107+
// ReadTimeout - coordinator didn't receive enough replies in time.
108+
QueryError::DbError(
109+
DbError::ReadTimeout {
110+
received,
111+
required,
112+
data_present,
113+
..
114+
},
115+
_,
116+
) => {
117+
if self.was_retry {
118+
RetryDecision::DontRetry
119+
} else if received < required {
120+
self.was_retry = true;
121+
max_likely_to_work_cl(*received, cl)
122+
} else if !*data_present {
123+
self.was_retry = true;
124+
RetryDecision::RetrySameNode(cl)
125+
} else {
126+
RetryDecision::DontRetry
127+
}
128+
}
129+
// Write timeout - coordinator didn't receive enough replies in time.
130+
QueryError::DbError(
131+
DbError::WriteTimeout {
132+
write_type,
133+
received,
134+
..
135+
},
136+
_,
137+
) => {
138+
if self.was_retry || !query_info.is_idempotent {
139+
RetryDecision::DontRetry
140+
} else {
141+
self.was_retry = true;
142+
match write_type {
143+
WriteType::Batch | WriteType::Simple if *received > 0 => {
144+
RetryDecision::IgnoreWriteError
145+
}
146+
147+
WriteType::UnloggedBatch => {
148+
// Since only part of the batch could have been persisted,
149+
// retry with whatever consistency should allow to persist all
150+
max_likely_to_work_cl(*received, cl)
151+
}
152+
WriteType::BatchLog => RetryDecision::RetrySameNode(cl),
153+
154+
_ => RetryDecision::DontRetry,
155+
}
156+
}
157+
}
158+
// The node is still bootstrapping it can't execute the query, we should try another one
159+
QueryError::DbError(DbError::IsBootstrapping, _) => RetryDecision::RetryNextNode(cl),
160+
// Connection to the contacted node is overloaded, try another one
161+
QueryError::UnableToAllocStreamId => RetryDecision::RetryNextNode(cl),
162+
// In all other cases propagate the error to the user
163+
_ => RetryDecision::DontRetry,
164+
}
165+
}
166+
167+
fn reset(&mut self) {
168+
*self = DowngradingConsistencyRetrySession::new();
169+
}
170+
}

scylla/src/transport/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ pub(crate) mod caching_session;
22
mod cluster;
33
pub(crate) mod connection;
44
mod connection_pool;
5+
pub mod downgrading_consistency_retry_policy;
56
pub mod iterator;
67
pub mod load_balancing;
78
pub(crate) mod metrics;

0 commit comments

Comments
 (0)