@@ -118,6 +118,8 @@ pub(crate) struct MetadataReader {
118
118
pub ( crate ) struct Metadata {
119
119
pub ( crate ) peers : Vec < Peer > ,
120
120
pub ( crate ) keyspaces : HashMap < String , Result < Keyspace , SingleKeyspaceMetadataError > > ,
121
+ /// The release_version obtained from `system.local` on control connection.
122
+ pub ( crate ) cluster_version : Option < String > ,
121
123
}
122
124
123
125
#[ non_exhaustive] // <- so that we can add more fields in a backwards-compatible way
@@ -420,6 +422,7 @@ impl Metadata {
420
422
Metadata {
421
423
peers,
422
424
keyspaces : HashMap :: new ( ) ,
425
+ cluster_version : None ,
423
426
}
424
427
}
425
428
}
@@ -728,10 +731,10 @@ impl ControlConnection {
728
731
keyspace_to_fetch : & [ String ] ,
729
732
fetch_schema : bool ,
730
733
) -> Result < Metadata , MetadataError > {
731
- let peers_query = self . query_peers ( connect_port) ;
734
+ let peers_query = self . query_peers_and_cluster_version ( connect_port) ;
732
735
let keyspaces_query = self . query_keyspaces ( keyspace_to_fetch, fetch_schema) ;
733
736
734
- let ( peers, keyspaces) = tokio:: try_join!( peers_query, keyspaces_query) ?;
737
+ let ( ( peers, cluster_version ) , keyspaces) = tokio:: try_join!( peers_query, keyspaces_query) ?;
735
738
736
739
// There must be at least one peer
737
740
if peers. is_empty ( ) {
@@ -743,7 +746,11 @@ impl ControlConnection {
743
746
return Err ( MetadataError :: Peers ( PeersMetadataError :: EmptyTokenLists ) ) ;
744
747
}
745
748
746
- Ok ( Metadata { peers, keyspaces } )
749
+ Ok ( Metadata {
750
+ peers,
751
+ keyspaces,
752
+ cluster_version,
753
+ } )
747
754
}
748
755
}
749
756
@@ -779,7 +786,13 @@ impl NodeInfoSource {
779
786
const METADATA_QUERY_PAGE_SIZE : i32 = 1024 ;
780
787
781
788
impl ControlConnection {
782
- async fn query_peers ( & self , connect_port : u16 ) -> Result < Vec < Peer > , MetadataError > {
789
+ /// Returns the vector of peers and the cluster version.
790
+ /// Cluster version is the `release_version` column from `system.local` query
791
+ /// executed on control connection.
792
+ async fn query_peers_and_cluster_version (
793
+ & self ,
794
+ connect_port : u16 ,
795
+ ) -> Result < ( Vec < Peer > , Option < String > ) , MetadataError > {
783
796
let mut peers_query = Statement :: new (
784
797
"select host_id, release_version, rpc_address, data_center, rack, tokens from system.peers" ,
785
798
) ;
@@ -828,7 +841,9 @@ impl ControlConnection {
828
841
829
842
let translated_peers_futures = untranslated_rows. map ( |row_result| async {
830
843
match row_result {
831
- Ok ( ( source, row) ) => Self :: create_peer_from_row ( source, row, local_address) . await ,
844
+ Ok ( ( source, row) ) => Self :: create_peer_from_row ( source, row, local_address)
845
+ . await
846
+ . map ( |peer| ( source, peer) ) ,
832
847
Err ( err) => {
833
848
warn ! (
834
849
"system.peers or system.local has an invalid row, skipping it: {}" ,
@@ -839,12 +854,24 @@ impl ControlConnection {
839
854
}
840
855
} ) ;
841
856
842
- let peers = translated_peers_futures
857
+ let vec_capacity = translated_peers_futures. size_hint ( ) . 0 ;
858
+ let ( peers, cluster_version) = translated_peers_futures
843
859
. buffer_unordered ( 256 )
844
860
. filter_map ( std:: future:: ready)
845
- . collect :: < Vec < _ > > ( )
861
+ . fold (
862
+ ( Vec :: with_capacity ( vec_capacity) , None ) ,
863
+ |( mut peers, cluster_version) , ( source, peer) | async move {
864
+ let new_cluster_version = match ( & cluster_version, source) {
865
+ ( None , NodeInfoSource :: Local ) => peer. server_version . clone ( ) ,
866
+ _ => cluster_version,
867
+ } ;
868
+
869
+ peers. push ( peer) ;
870
+ ( peers, new_cluster_version)
871
+ } ,
872
+ )
846
873
. await ;
847
- Ok ( peers)
874
+ Ok ( ( peers, cluster_version ) )
848
875
}
849
876
850
877
async fn create_peer_from_row (
0 commit comments