Skip to content

Commit 3416b9a

Browse files
committed
Schema agreement: Ignore BrokenConnectionError
This should fix #1240 Such fix also makes sense from another perspective: `await_scheme_agreement` doc comment says "Awaits schema agreement among all reachable nodes.". If all connections to a given node are broken, we can definitely conclude that the node is not reachable. Previously I thought that doing this would introduce a bug: what if a coordinator of DDL becomes unreachable after returning a response, but before agreement is reached? We could reach agreement on old schema version! Now I see that this issue is pre-existing: `await_schema_agreement` reads ClusterState itself, so the following race is possible: - Driver sends DDL - Coordinator responds and dies - Driver reads the response - Driver detects that the coordinator is dead, notes that in ClusterState - Driver tries to perform schema agreement, and does that without using the coordinator. This issue will be fixed in next commits.
1 parent 60fa7c4 commit 3416b9a

File tree

1 file changed

+60
-13
lines changed

1 file changed

+60
-13
lines changed

scylla/src/client/session.rs

Lines changed: 60 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -2196,33 +2196,74 @@ impl Session {
21962196
// Therefore, this iterator is guaranteed to be nonempty, too.
21972197
let handles = per_node_connections.map(Session::read_node_schema_version);
21982198
// Hence, this is nonempty, too.
2199-
let versions = try_join_all(handles).await?;
2199+
let versions_results = try_join_all(handles).await?;
22002200

2201-
// Therefore, taking the first element is safe.
2202-
let local_version: Uuid = versions[0];
2203-
let in_agreement = versions.into_iter().all(|v| v == local_version);
2201+
// unwrap is safe because iterator is still not empty.
2202+
let local_version = match versions_results
2203+
.iter()
2204+
.find_or_first(|r| matches!(r, SchemaNodeResult::Success(_)))
2205+
.unwrap()
2206+
{
2207+
SchemaNodeResult::Success(v) => *v,
2208+
SchemaNodeResult::BrokenConnection(err) => {
2209+
// There are only broken connection errors. Nothing better to do
2210+
// than to return an error.
2211+
return Err(SchemaAgreementError::RequestError(
2212+
RequestAttemptError::BrokenConnectionError(err.clone()),
2213+
));
2214+
}
2215+
};
2216+
2217+
let in_agreement = versions_results
2218+
.into_iter()
2219+
.filter_map(|v_r| match v_r {
2220+
SchemaNodeResult::Success(v) => Some(v),
2221+
SchemaNodeResult::BrokenConnection(_) => None,
2222+
})
2223+
.all(|v| v == local_version);
22042224
Ok(in_agreement.then_some(local_version))
22052225
}
22062226

2207-
// Iterator must be non-empty!
2227+
/// Iterate over connections to the node.
2228+
/// If fetching succeeds on some connection, return first fetched schema version,
2229+
/// as Ok(SchemaNodeResult::Success(...)).
2230+
/// Otherwise it means there are only errors:
2231+
/// - If, and only if, all connections returned ConnectionBrokenError, first such error will be returned,
2232+
/// as Ok(SchemaNodeResult::BrokenConnection(...)).
2233+
/// - Otherwise there is some other type of error on some connection. First such error will be returned as Err(...).
2234+
///
2235+
/// `connections_to_node` iterator must be non-empty!
22082236
async fn read_node_schema_version(
22092237
connections_to_node: impl Iterator<Item = Arc<Connection>>,
2210-
) -> Result<Uuid, SchemaAgreementError> {
2211-
// Iterate over connections to the node. Fail if fetching schema version failed on all connections.
2212-
// Else, return the first fetched schema version, because all shards have the same schema version.
2213-
let mut first_err = None;
2238+
) -> Result<SchemaNodeResult, SchemaAgreementError> {
2239+
let mut first_broken_connection_err: Option<BrokenConnectionError> = None;
2240+
let mut first_unignorable_err: Option<SchemaAgreementError> = None;
22142241
for connection in connections_to_node {
22152242
match connection.fetch_schema_version().await {
2216-
Ok(schema_version) => return Ok(schema_version),
2243+
Ok(schema_version) => return Ok(SchemaNodeResult::Success(schema_version)),
2244+
Err(SchemaAgreementError::RequestError(
2245+
RequestAttemptError::BrokenConnectionError(conn_err),
2246+
)) => {
2247+
if first_broken_connection_err.is_none() {
2248+
first_broken_connection_err = Some(conn_err);
2249+
}
2250+
}
22172251
Err(err) => {
2218-
if first_err.is_none() {
2219-
first_err = Some(err);
2252+
if first_unignorable_err.is_none() {
2253+
first_unignorable_err = Some(err);
22202254
}
22212255
}
22222256
}
22232257
}
22242258
// The iterator was guaranteed to be nonempty, so there must have been at least one error.
2225-
Err(first_err.unwrap())
2259+
// It means at least one of `first_broken_connection_err` and `first_unrecoverable_err` is Some.
2260+
if let Some(err) = first_unignorable_err {
2261+
return Err(err);
2262+
}
2263+
2264+
Ok(SchemaNodeResult::BrokenConnection(
2265+
first_broken_connection_err.unwrap(),
2266+
))
22262267
}
22272268

22282269
/// Retrieves the handle to execution profile that is used by this session
@@ -2306,3 +2347,9 @@ impl ExecuteRequestContext<'_> {
23062347
.log_attempt_error(*attempt_id, error, retry_decision);
23072348
}
23082349
}
2350+
2351+
#[derive(Debug)]
2352+
enum SchemaNodeResult {
2353+
Success(Uuid),
2354+
BrokenConnection(BrokenConnectionError),
2355+
}

0 commit comments

Comments
 (0)