Skip to content

Commit 28ed6c4

Browse files
authored
Merge pull request #1355 from Lorak-mmk/fix-schema-broken-connection
Ignore nodes with all connections broken during schema agreement.
2 parents 4c7e189 + 0156169 commit 28ed6c4

File tree

6 files changed

+387
-40
lines changed

6 files changed

+387
-40
lines changed

scylla/src/client/session.rs

Lines changed: 156 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -12,8 +12,9 @@ use crate::cluster::node::CloudEndpoint;
1212
use crate::cluster::node::{InternalKnownNode, KnownNode, NodeRef};
1313
use crate::cluster::{Cluster, ClusterNeatDebug, ClusterState};
1414
use crate::errors::{
15-
BadQuery, ExecutionError, MetadataError, NewSessionError, PagerExecutionError, PrepareError,
16-
RequestAttemptError, RequestError, SchemaAgreementError, TracingError, UseKeyspaceError,
15+
BadQuery, BrokenConnectionError, ExecutionError, MetadataError, NewSessionError,
16+
PagerExecutionError, PrepareError, RequestAttemptError, RequestError, SchemaAgreementError,
17+
TracingError, UseKeyspaceError,
1718
};
1819
use crate::frame::response::result;
1920
use crate::network::tls::TlsProvider;
@@ -1115,7 +1116,8 @@ impl Session {
11151116
};
11161117

11171118
self.handle_set_keyspace_response(&response).await?;
1118-
self.handle_auto_await_schema_agreement(&response).await?;
1119+
self.handle_auto_await_schema_agreement(&response, coordinator.node().host_id)
1120+
.await?;
11191121

11201122
let (result, paging_state_response) =
11211123
response.into_query_result_and_paging_state(coordinator)?;
@@ -1143,10 +1145,12 @@ impl Session {
11431145
async fn handle_auto_await_schema_agreement(
11441146
&self,
11451147
response: &NonErrorQueryResponse,
1148+
coordinator_id: Uuid,
11461149
) -> Result<(), ExecutionError> {
11471150
if self.schema_agreement_automatic_waiting {
11481151
if response.as_schema_change().is_some() {
1149-
self.await_schema_agreement().await?;
1152+
self.await_schema_agreement_with_required_node(Some(coordinator_id))
1153+
.await?;
11501154
}
11511155

11521156
if self.refresh_metadata_on_auto_schema_agreement
@@ -1488,7 +1492,8 @@ impl Session {
14881492
};
14891493

14901494
self.handle_set_keyspace_response(&response).await?;
1491-
self.handle_auto_await_schema_agreement(&response).await?;
1495+
self.handle_auto_await_schema_agreement(&response, coordinator.node().host_id)
1496+
.await?;
14921497

14931498
let (result, paging_state_response) =
14941499
response.into_query_result_and_paging_state(coordinator)?;
@@ -2159,10 +2164,19 @@ impl Session {
21592164
///
21602165
/// Issues an agreement check each `Session::schema_agreement_interval`.
21612166
/// Loops indefinitely until the agreement is reached.
2162-
async fn await_schema_agreement_indefinitely(&self) -> Result<Uuid, SchemaAgreementError> {
2167+
///
2168+
/// If `required_node` is Some, only returns Ok if this node successfully
2169+
/// returned its schema version during the agreement process.
2170+
async fn await_schema_agreement_indefinitely(
2171+
&self,
2172+
required_node: Option<Uuid>,
2173+
) -> Result<Uuid, SchemaAgreementError> {
21632174
loop {
21642175
tokio::time::sleep(self.schema_agreement_interval).await;
2165-
if let Some(agreed_version) = self.check_schema_agreement().await? {
2176+
if let Some(agreed_version) = self
2177+
.check_schema_agreement_with_required_node(required_node)
2178+
.await?
2179+
{
21662180
return Ok(agreed_version);
21672181
}
21682182
}
@@ -2176,7 +2190,29 @@ impl Session {
21762190
pub async fn await_schema_agreement(&self) -> Result<Uuid, SchemaAgreementError> {
21772191
timeout(
21782192
self.schema_agreement_timeout,
2179-
self.await_schema_agreement_indefinitely(),
2193+
self.await_schema_agreement_indefinitely(None),
2194+
)
2195+
.await
2196+
.unwrap_or(Err(SchemaAgreementError::Timeout(
2197+
self.schema_agreement_timeout,
2198+
)))
2199+
}
2200+
2201+
/// Awaits schema agreement among all reachable nodes.
2202+
///
2203+
/// Issues an agreement check each `Session::schema_agreement_interval`.
2204+
/// If agreement is not reached in `Session::schema_agreement_timeout`,
2205+
/// `SchemaAgreementError::Timeout` is returned.
2206+
///
2207+
/// If `required_node` is Some, only returns Ok if this node successfully
2208+
/// returned its schema version during the agreement process.
2209+
async fn await_schema_agreement_with_required_node(
2210+
&self,
2211+
required_node: Option<Uuid>,
2212+
) -> Result<Uuid, SchemaAgreementError> {
2213+
timeout(
2214+
self.schema_agreement_timeout,
2215+
self.await_schema_agreement_indefinitely(required_node),
21802216
)
21812217
.await
21822218
.unwrap_or(Err(SchemaAgreementError::Timeout(
@@ -2188,37 +2224,123 @@ impl Session {
21882224
///
21892225
/// If so, returns that agreed upon version.
21902226
pub async fn check_schema_agreement(&self) -> Result<Option<Uuid>, SchemaAgreementError> {
2227+
self.check_schema_agreement_with_required_node(None).await
2228+
}
2229+
2230+
/// Checks if all reachable nodes have the same schema version.
2231+
/// If so, returns that agreed upon version.
2232+
///
2233+
/// If `required_node` is Some, only returns Ok if this node successfully
2234+
/// returned its schema version.
2235+
async fn check_schema_agreement_with_required_node(
2236+
&self,
2237+
required_node: Option<Uuid>,
2238+
) -> Result<Option<Uuid>, SchemaAgreementError> {
21912239
let cluster_state = self.get_cluster_state();
21922240
// The iterator is guaranteed to be nonempty.
21932241
let per_node_connections = cluster_state.iter_working_connections_per_node()?;
21942242

21952243
// Therefore, this iterator is guaranteed to be nonempty, too.
2196-
let handles = per_node_connections.map(|connections_to_node| async move {
2197-
// Iterate over connections to the node. Fail if fetching schema version failed on all connections.
2198-
// Else, return the first fetched schema version, because all shards have the same schema version.
2199-
let mut first_err = None;
2200-
for connection in connections_to_node {
2201-
match connection.fetch_schema_version().await {
2202-
Ok(schema_version) => return Ok(schema_version),
2203-
Err(err) => {
2204-
if first_err.is_none() {
2205-
first_err = Some(err);
2206-
}
2207-
}
2208-
}
2209-
}
2210-
// The iterator was guaranteed to be nonempty, so there must have been at least one error.
2211-
Err(first_err.unwrap())
2244+
let handles = per_node_connections.map(|(host_id, pool)| async move {
2245+
(host_id, Session::read_node_schema_version(pool).await)
22122246
});
22132247
// Hence, this is nonempty, too.
2214-
let versions = try_join_all(handles).await?;
2248+
let versions_results = join_all(handles).await;
2249+
2250+
// Verify that required host is present, and returned success.
2251+
if let Some(required_node) = required_node {
2252+
match versions_results
2253+
.iter()
2254+
.find(|(host_id, _)| *host_id == required_node)
2255+
{
2256+
Some((_, Ok(SchemaNodeResult::Success(_version)))) => (),
2257+
// For other connections we can ignore Broken error, but for required
2258+
// host we need an actual schema version.
2259+
Some((_, Ok(SchemaNodeResult::BrokenConnection(e)))) => {
2260+
return Err(SchemaAgreementError::RequestError(
2261+
RequestAttemptError::BrokenConnectionError(e.clone()),
2262+
))
2263+
}
2264+
Some((_, Err(e))) => return Err(e.clone()),
2265+
None => return Err(SchemaAgreementError::RequiredHostAbsent(required_node)),
2266+
}
2267+
}
2268+
2269+
// Now we no longer need all the errors. We can return early if there is an
2270+
// irrecoverable one, and collect the Ok values otherwise.
2271+
let versions_results: Vec<_> = versions_results
2272+
.into_iter()
2273+
.map(|(_, result)| result)
2274+
.try_collect()?;
2275+
2276+
// unwrap is safe because iterator is still not empty.
2277+
let local_version = match versions_results
2278+
.iter()
2279+
.find_or_first(|r| matches!(r, SchemaNodeResult::Success(_)))
2280+
.unwrap()
2281+
{
2282+
SchemaNodeResult::Success(v) => *v,
2283+
SchemaNodeResult::BrokenConnection(err) => {
2284+
// There are only broken connection errors. Nothing better to do
2285+
// than to return an error.
2286+
return Err(SchemaAgreementError::RequestError(
2287+
RequestAttemptError::BrokenConnectionError(err.clone()),
2288+
));
2289+
}
2290+
};
22152291

2216-
// Therefore, taking the first element is safe.
2217-
let local_version: Uuid = versions[0];
2218-
let in_agreement = versions.into_iter().all(|v| v == local_version);
2292+
let in_agreement = versions_results
2293+
.into_iter()
2294+
.filter_map(|v_r| match v_r {
2295+
SchemaNodeResult::Success(v) => Some(v),
2296+
SchemaNodeResult::BrokenConnection(_) => None,
2297+
})
2298+
.all(|v| v == local_version);
22192299
Ok(in_agreement.then_some(local_version))
22202300
}
22212301

2302+
/// Iterate over connections to the node.
2303+
/// If fetching succeeds on some connection, return first fetched schema version,
2304+
/// as Ok(SchemaNodeResult::Success(...)).
2305+
/// Otherwise it means there are only errors:
2306+
/// - If, and only if, all connections returned BrokenConnectionError, first such error will be returned,
2307+
/// as Ok(SchemaNodeResult::BrokenConnection(...)).
2308+
/// - Otherwise there is some other type of error on some connection. First such error will be returned as Err(...).
2309+
///
2310+
/// `connections_to_node` iterator must be non-empty!
/// If it is empty, no error is recorded and the final `unwrap` panics.
2311+
async fn read_node_schema_version(
2312+
connections_to_node: impl Iterator<Item = Arc<Connection>>,
2313+
) -> Result<SchemaNodeResult, SchemaAgreementError> {
2314+
let mut first_broken_connection_err: Option<BrokenConnectionError> = None;
2315+
let mut first_unignorable_err: Option<SchemaAgreementError> = None;
2316+
// Connections are tried one after another; the first success ends the loop.
for connection in connections_to_node {
2317+
match connection.fetch_schema_version().await {
2318+
Ok(schema_version) => return Ok(SchemaNodeResult::Success(schema_version)),
2319+
// Broken-connection errors are tracked separately from all other errors.
Err(SchemaAgreementError::RequestError(
2320+
RequestAttemptError::BrokenConnectionError(conn_err),
2321+
)) => {
2322+
if first_broken_connection_err.is_none() {
2323+
first_broken_connection_err = Some(conn_err);
2324+
}
2325+
}
2326+
Err(err) => {
2327+
if first_unignorable_err.is_none() {
2328+
first_unignorable_err = Some(err);
2329+
}
2330+
}
2331+
}
2332+
}
2333+
// The iterator was guaranteed to be nonempty, so there must have been at least one error.
2334+
// It means at least one of `first_broken_connection_err` and `first_unignorable_err` is Some.
2335+
// An unignorable error takes precedence over any broken-connection error.
if let Some(err) = first_unignorable_err {
2336+
return Err(err);
2337+
}
2338+
2339+
Ok(SchemaNodeResult::BrokenConnection(
2340+
first_broken_connection_err.unwrap(),
2341+
))
2342+
}
2343+
22222344
/// Retrieves the handle to execution profile that is used by this session
22232345
/// by default, i.e. when an executed statement does not define its own handle.
22242346
pub fn get_default_execution_profile_handle(&self) -> &ExecutionProfileHandle {
@@ -2300,3 +2422,9 @@ impl ExecuteRequestContext<'_> {
23002422
.log_attempt_error(*attempt_id, error, retry_decision);
23012423
}
23022424
}
2425+
2426+
/// Outcome of reading the schema version from a single node, for the cases
/// that are not reported as a `SchemaAgreementError`.
#[derive(Debug)]
2427+
enum SchemaNodeResult {
2428+
/// Some connection to the node returned this schema version.
Success(Uuid),
2429+
/// No connection returned a version and every failure was a broken
/// connection; holds the first such error.
BrokenConnection(BrokenConnectionError),
2430+
}

scylla/src/cluster/state.rs

Lines changed: 16 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -324,17 +324,21 @@ impl ClusterState {
324324
/// Internal iterator iterates over working connections to all shards of given node.
325325
pub(crate) fn iter_working_connections_per_node(
326326
&self,
327-
) -> Result<impl Iterator<Item = impl Iterator<Item = Arc<Connection>>> + '_, ConnectionPoolError>
328-
{
327+
) -> Result<
328+
impl Iterator<Item = (Uuid, impl Iterator<Item = Arc<Connection>>)> + '_,
329+
ConnectionPoolError,
330+
> {
329331
// The returned iterator is nonempty by nonemptiness invariant of `self.known_peers`.
330332
assert!(!self.known_peers.is_empty());
331333
let nodes_iter = self.known_peers.values();
332-
let mut connection_pool_per_node_iter =
333-
nodes_iter.map(|node| node.get_working_connections());
334+
let mut connection_pool_per_node_iter = nodes_iter.map(|node| {
335+
node.get_working_connections()
336+
.map(|pool| (node.host_id, pool))
337+
});
334338

335339
// First we try to find the first working pool of connections.
336340
// If none is found, return error.
337-
let first_working_pool_or_error: Result<Vec<Arc<Connection>>, ConnectionPoolError> =
341+
let first_working_pool_or_error: Result<(Uuid, Vec<Arc<Connection>>), ConnectionPoolError> =
338342
connection_pool_per_node_iter
339343
.by_ref()
340344
.find_or_first(Result::is_ok)
@@ -344,19 +348,19 @@ impl ClusterState {
344348
// 1. either consumed the whole iterator without success and got the first error,
345349
// in which case we propagate it;
346350
// 2. or found the first working pool of connections.
347-
let first_working_pool: Vec<Arc<Connection>> = first_working_pool_or_error?;
351+
let first_working_pool: (Uuid, Vec<Arc<Connection>>) = first_working_pool_or_error?;
348352

349353
// We retrieve connection pools for remaining nodes (those that are left in the iterator
350354
// once the first working pool has been found).
351355
let remaining_pools_iter = connection_pool_per_node_iter;
352356
// Errors (non-working pools) are filtered out.
353357
let remaining_working_pools_iter = remaining_pools_iter.filter_map(Result::ok);
354-
// Pools are made iterators, so now we have `impl Iterator<Item = impl Iterator<Item = Arc<Connection>>>`.
355-
let remaining_working_per_node_connections_iter =
356-
remaining_working_pools_iter.map(IntoIterator::into_iter);
357358

358-
Ok(std::iter::once(first_working_pool.into_iter())
359-
.chain(remaining_working_per_node_connections_iter))
359+
// First pool is chained with the rest.
360+
// Then, pools are made iterators, so now we have `impl Iterator<Item = (Uuid, impl Iterator<Item = Arc<Connection>>)>`.
361+
Ok(std::iter::once(first_working_pool)
362+
.chain(remaining_working_pools_iter)
363+
.map(|(host_id, pool)| (host_id, IntoIterator::into_iter(pool))))
360364
// By an invariant `self.known_peers` is nonempty, so the returned iterator
361365
// is nonempty, too.
362366
}
@@ -366,7 +370,7 @@ impl ClusterState {
366370
&self,
367371
) -> Result<impl Iterator<Item = Arc<Connection>> + '_, ConnectionPoolError> {
368372
self.iter_working_connections_per_node()
369-
.map(|iter| iter.flatten())
373+
.map(|outer_iter| outer_iter.flat_map(|(_, inner_iter)| inner_iter))
370374
}
371375

372376
/// Returns nonempty iterator of working connections to all nodes.

scylla/src/errors.rs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ use std::net::{AddrParseError, IpAddr, SocketAddr};
66
use std::num::ParseIntError;
77
use std::sync::Arc;
88
use thiserror::Error;
9+
use uuid::Uuid;
910

1011
use crate::frame::response;
1112

@@ -209,6 +210,12 @@ pub enum SchemaAgreementError {
209210
/// Schema agreement timed out.
210211
#[error("Schema agreement exceeded {}ms", std::time::Duration::as_millis(.0))]
211212
Timeout(std::time::Duration),
213+
214+
/// The node that was required to report its schema version was not found
/// among the nodes with working connections. Holds that node's host id.
// The field must be referenced as `.0`; a bare `0` is the integer literal
// and would always print "Host with id 0".
#[error(
215+
"Host with id {} required for schema agreement is not present in connection pool",
216+
.0
217+
)]
218+
RequiredHostAbsent(Uuid),
212219
}
213220

214221
/// An error that occurred during tracing info fetch.

scylla/tests/integration/session/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ mod history;
55
mod new_session;
66
mod pager;
77
mod retries;
8+
mod schema_agreement;
89
mod self_identity;
910
mod tracing;
1011
mod use_keyspace;

0 commit comments

Comments
 (0)