Skip to content

Commit 18a2ce6

Browse files
committed
Require schema agreement response from DDL coordinator
This fixes the issue described in parent commit. Internal schema agreement APIs now accept a `Option<Uuid>` that is a host id of a node that must be a part of schema agreement. For user-requested agreements it will be None, and for agreements after DDL it will be the coordinator.
1 parent 3416b9a commit 18a2ce6

File tree

3 files changed

+106
-20
lines changed

3 files changed

+106
-20
lines changed

scylla/src/client/session.rs

Lines changed: 83 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1116,7 +1116,8 @@ impl Session {
11161116
};
11171117

11181118
self.handle_set_keyspace_response(&response).await?;
1119-
self.handle_auto_await_schema_agreement(&response).await?;
1119+
self.handle_auto_await_schema_agreement(&response, coordinator.node().host_id)
1120+
.await?;
11201121

11211122
let (result, paging_state_response) =
11221123
response.into_query_result_and_paging_state(coordinator)?;
@@ -1144,10 +1145,12 @@ impl Session {
11441145
async fn handle_auto_await_schema_agreement(
11451146
&self,
11461147
response: &NonErrorQueryResponse,
1148+
coordinator_id: Uuid,
11471149
) -> Result<(), ExecutionError> {
11481150
if self.schema_agreement_automatic_waiting {
11491151
if response.as_schema_change().is_some() {
1150-
self.await_schema_agreement().await?;
1152+
self.await_schema_agreement_with_required_node(Some(coordinator_id))
1153+
.await?;
11511154
}
11521155

11531156
if self.refresh_metadata_on_auto_schema_agreement
@@ -1489,7 +1492,8 @@ impl Session {
14891492
};
14901493

14911494
self.handle_set_keyspace_response(&response).await?;
1492-
self.handle_auto_await_schema_agreement(&response).await?;
1495+
self.handle_auto_await_schema_agreement(&response, coordinator.node().host_id)
1496+
.await?;
14931497

14941498
let (result, paging_state_response) =
14951499
response.into_query_result_and_paging_state(coordinator)?;
@@ -2160,10 +2164,19 @@ impl Session {
21602164
///
21612165
/// Issues an agreement check each `Session::schema_agreement_interval`.
21622166
/// Loops indefinitely until the agreement is reached.
2163-
async fn await_schema_agreement_indefinitely(&self) -> Result<Uuid, SchemaAgreementError> {
2167+
///
2168+
/// If `required_node` is Some, only returns Ok if this node successfully
2169+
/// returned its schema version during the agreement process.
2170+
async fn await_schema_agreement_indefinitely(
2171+
&self,
2172+
required_node: Option<Uuid>,
2173+
) -> Result<Uuid, SchemaAgreementError> {
21642174
loop {
21652175
tokio::time::sleep(self.schema_agreement_interval).await;
2166-
if let Some(agreed_version) = self.check_schema_agreement().await? {
2176+
if let Some(agreed_version) = self
2177+
.check_schema_agreement_with_required_node(required_node)
2178+
.await?
2179+
{
21672180
return Ok(agreed_version);
21682181
}
21692182
}
@@ -2177,7 +2190,29 @@ impl Session {
21772190
pub async fn await_schema_agreement(&self) -> Result<Uuid, SchemaAgreementError> {
21782191
timeout(
21792192
self.schema_agreement_timeout,
2180-
self.await_schema_agreement_indefinitely(),
2193+
self.await_schema_agreement_indefinitely(None),
2194+
)
2195+
.await
2196+
.unwrap_or(Err(SchemaAgreementError::Timeout(
2197+
self.schema_agreement_timeout,
2198+
)))
2199+
}
2200+
2201+
/// Awaits schema agreement among all reachable nodes.
2202+
///
2203+
/// Issues an agreement check each `Session::schema_agreement_interval`.
2204+
/// If agreement is not reached in `Session::schema_agreement_timeout`,
2205+
/// `SchemaAgreementError::Timeout` is returned.
2206+
///
2207+
/// If `required_node` is Some, only returns Ok if this node successfully
2208+
/// returned its schema version during the agreement process.
2209+
async fn await_schema_agreement_with_required_node(
2210+
&self,
2211+
required_node: Option<Uuid>,
2212+
) -> Result<Uuid, SchemaAgreementError> {
2213+
timeout(
2214+
self.schema_agreement_timeout,
2215+
self.await_schema_agreement_indefinitely(required_node),
21812216
)
21822217
.await
21832218
.unwrap_or(Err(SchemaAgreementError::Timeout(
@@ -2189,14 +2224,54 @@ impl Session {
21892224
///
21902225
/// If so, returns that agreed upon version.
21912226
pub async fn check_schema_agreement(&self) -> Result<Option<Uuid>, SchemaAgreementError> {
2227+
self.check_schema_agreement_with_required_node(None).await
2228+
}
2229+
2230+
/// Checks if all reachable nodes have the same schema version.
2231+
/// If so, returns that agreed upon version.
2232+
///
2233+
/// If `required_node` is Some, only returns Ok if this node successfully
2234+
/// returned its schema version.
2235+
async fn check_schema_agreement_with_required_node(
2236+
&self,
2237+
required_node: Option<Uuid>,
2238+
) -> Result<Option<Uuid>, SchemaAgreementError> {
21922239
let cluster_state = self.get_cluster_state();
21932240
// The iterator is guaranteed to be nonempty.
21942241
let per_node_connections = cluster_state.iter_working_connections_per_node()?;
21952242

21962243
// Therefore, this iterator is guaranteed to be nonempty, too.
2197-
let handles = per_node_connections.map(Session::read_node_schema_version);
2244+
let handles = per_node_connections.map(|(host_id, pool)| async move {
2245+
(host_id, Session::read_node_schema_version(pool).await)
2246+
});
21982247
// Hence, this is nonempty, too.
2199-
let versions_results = try_join_all(handles).await?;
2248+
let versions_results = join_all(handles).await;
2249+
2250+
// Verify that required host is present, and returned success.
2251+
if let Some(required_node) = required_node {
2252+
match versions_results
2253+
.iter()
2254+
.find(|(host_id, _)| *host_id == required_node)
2255+
{
2256+
Some((_, Ok(SchemaNodeResult::Success(_version)))) => (),
2257+
// For other connections we can ignore Broken error, but for required
2258+
// host we need an actual schema version.
2259+
Some((_, Ok(SchemaNodeResult::BrokenConnection(e)))) => {
2260+
return Err(SchemaAgreementError::RequestError(
2261+
RequestAttemptError::BrokenConnectionError(e.clone()),
2262+
))
2263+
}
2264+
Some((_, Err(e))) => return Err(e.clone()),
2265+
None => return Err(SchemaAgreementError::RequiredHostAbsent(required_node)),
2266+
}
2267+
}
2268+
2269+
// Now we no longer need all the errors. We can return if there is
2270+
// irrecoverable one, and collect the Ok values otherwise.
2271+
let versions_results: Vec<_> = versions_results
2272+
.into_iter()
2273+
.map(|(_, result)| result)
2274+
.try_collect()?;
22002275

22012276
// unwrap is safe because iterator is still not empty.
22022277
let local_version = match versions_results

scylla/src/cluster/state.rs

Lines changed: 16 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -324,17 +324,21 @@ impl ClusterState {
324324
/// Internal iterator iterates over working connections to all shards of given node.
325325
pub(crate) fn iter_working_connections_per_node(
326326
&self,
327-
) -> Result<impl Iterator<Item = impl Iterator<Item = Arc<Connection>>> + '_, ConnectionPoolError>
328-
{
327+
) -> Result<
328+
impl Iterator<Item = (Uuid, impl Iterator<Item = Arc<Connection>>)> + '_,
329+
ConnectionPoolError,
330+
> {
329331
// The returned iterator is nonempty by nonemptiness invariant of `self.known_peers`.
330332
assert!(!self.known_peers.is_empty());
331333
let nodes_iter = self.known_peers.values();
332-
let mut connection_pool_per_node_iter =
333-
nodes_iter.map(|node| node.get_working_connections());
334+
let mut connection_pool_per_node_iter = nodes_iter.map(|node| {
335+
node.get_working_connections()
336+
.map(|pool| (node.host_id, pool))
337+
});
334338

335339
// First we try to find the first working pool of connections.
336340
// If none is found, return error.
337-
let first_working_pool_or_error: Result<Vec<Arc<Connection>>, ConnectionPoolError> =
341+
let first_working_pool_or_error: Result<(Uuid, Vec<Arc<Connection>>), ConnectionPoolError> =
338342
connection_pool_per_node_iter
339343
.by_ref()
340344
.find_or_first(Result::is_ok)
@@ -344,19 +348,19 @@ impl ClusterState {
344348
// 1. either consumed the whole iterator without success and got the first error,
345349
// in which case we propagate it;
346350
// 2. or found the first working pool of connections.
347-
let first_working_pool: Vec<Arc<Connection>> = first_working_pool_or_error?;
351+
let first_working_pool: (Uuid, Vec<Arc<Connection>>) = first_working_pool_or_error?;
348352

349353
// We retrieve connection pools for remaining nodes (those that are left in the iterator
350354
// once the first working pool has been found).
351355
let remaining_pools_iter = connection_pool_per_node_iter;
352356
// Errors (non-working pools) are filtered out.
353357
let remaining_working_pools_iter = remaining_pools_iter.filter_map(Result::ok);
354-
// Pools are made iterators, so now we have `impl Iterator<Item = impl Iterator<Item = Arc<Connection>>>`.
355-
let remaining_working_per_node_connections_iter =
356-
remaining_working_pools_iter.map(IntoIterator::into_iter);
357358

358-
Ok(std::iter::once(first_working_pool.into_iter())
359-
.chain(remaining_working_per_node_connections_iter))
359+
// First pool is chained with the rest.
360+
// Then, pools are made iterators, so now we have `impl Iterator<Item = (Uuid, impl Iterator<Item = Arc<Connection>>)>`.
361+
Ok(std::iter::once(first_working_pool)
362+
.chain(remaining_working_pools_iter)
363+
.map(|(host_id, pool)| (host_id, IntoIterator::into_iter(pool))))
360364
// By an invariant `self.known_peers` is nonempty, so the returned iterator
361365
// is nonempty, too.
362366
}
@@ -366,7 +370,7 @@ impl ClusterState {
366370
&self,
367371
) -> Result<impl Iterator<Item = Arc<Connection>> + '_, ConnectionPoolError> {
368372
self.iter_working_connections_per_node()
369-
.map(|iter| iter.flatten())
373+
.map(|outer_iter| outer_iter.flat_map(|(_, inner_iter)| inner_iter))
370374
}
371375

372376
/// Returns nonempty iterator of working connections to all nodes.

scylla/src/errors.rs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ use std::net::{AddrParseError, IpAddr, SocketAddr};
66
use std::num::ParseIntError;
77
use std::sync::Arc;
88
use thiserror::Error;
9+
use uuid::Uuid;
910

1011
use crate::frame::response;
1112

@@ -209,6 +210,12 @@ pub enum SchemaAgreementError {
209210
/// Schema agreement timed out.
210211
#[error("Schema agreement exceeded {}ms", std::time::Duration::as_millis(.0))]
211212
Timeout(std::time::Duration),
213+
214+
#[error(
215+
"Host with id {} required for schema agreement is not present in connection pool",
216+
0
217+
)]
218+
RequiredHostAbsent(Uuid),
212219
}
213220

214221
/// An error that occurred during tracing info fetch.

0 commit comments

Comments
 (0)