Skip to content

Commit 60fa7c4

Browse files
committed
Session: extract read_node_schema_version
Schema agreement logic will become more complicated in next commits. Splitting this function should aid readability.
1 parent 4c7e189 commit 60fa7c4

File tree

1 file changed

+25
-19
lines changed

1 file changed

+25
-19
lines changed

scylla/src/client/session.rs

Lines changed: 25 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -12,8 +12,9 @@ use crate::cluster::node::CloudEndpoint;
1212
use crate::cluster::node::{InternalKnownNode, KnownNode, NodeRef};
1313
use crate::cluster::{Cluster, ClusterNeatDebug, ClusterState};
1414
use crate::errors::{
15-
BadQuery, ExecutionError, MetadataError, NewSessionError, PagerExecutionError, PrepareError,
16-
RequestAttemptError, RequestError, SchemaAgreementError, TracingError, UseKeyspaceError,
15+
BadQuery, BrokenConnectionError, ExecutionError, MetadataError, NewSessionError,
16+
PagerExecutionError, PrepareError, RequestAttemptError, RequestError, SchemaAgreementError,
17+
TracingError, UseKeyspaceError,
1718
};
1819
use crate::frame::response::result;
1920
use crate::network::tls::TlsProvider;
@@ -2193,23 +2194,7 @@ impl Session {
21932194
let per_node_connections = cluster_state.iter_working_connections_per_node()?;
21942195

21952196
// Therefore, this iterator is guaranteed to be nonempty, too.
2196-
let handles = per_node_connections.map(|connections_to_node| async move {
2197-
// Iterate over connections to the node. Fail if fetching schema version failed on all connections.
2198-
// Else, return the first fetched schema version, because all shards have the same schema version.
2199-
let mut first_err = None;
2200-
for connection in connections_to_node {
2201-
match connection.fetch_schema_version().await {
2202-
Ok(schema_version) => return Ok(schema_version),
2203-
Err(err) => {
2204-
if first_err.is_none() {
2205-
first_err = Some(err);
2206-
}
2207-
}
2208-
}
2209-
}
2210-
// The iterator was guaranteed to be nonempty, so there must have been at least one error.
2211-
Err(first_err.unwrap())
2212-
});
2197+
let handles = per_node_connections.map(Session::read_node_schema_version);
22132198
// Hence, this is nonempty, too.
22142199
let versions = try_join_all(handles).await?;
22152200

@@ -2219,6 +2204,27 @@ impl Session {
22192204
Ok(in_agreement.then_some(local_version))
22202205
}
22212206

2207+
// Iterator must be non-empty!
2208+
async fn read_node_schema_version(
2209+
connections_to_node: impl Iterator<Item = Arc<Connection>>,
2210+
) -> Result<Uuid, SchemaAgreementError> {
2211+
// Iterate over connections to the node. Fail if fetching schema version failed on all connections.
2212+
// Else, return the first fetched schema version, because all shards have the same schema version.
2213+
let mut first_err = None;
2214+
for connection in connections_to_node {
2215+
match connection.fetch_schema_version().await {
2216+
Ok(schema_version) => return Ok(schema_version),
2217+
Err(err) => {
2218+
if first_err.is_none() {
2219+
first_err = Some(err);
2220+
}
2221+
}
2222+
}
2223+
}
2224+
// The iterator was guaranteed to be nonempty, so there must have been at least one error.
2225+
Err(first_err.unwrap())
2226+
}
2227+
22222228
/// Retrieves the handle to execution profile that is used by this session
22232229
/// by default, i.e. when an executed statement does not define its own handle.
22242230
pub fn get_default_execution_profile_handle(&self) -> &ExecutionProfileHandle {

0 commit comments

Comments
 (0)