|
| 1 | +//! This example show how to enforce the target the request is sent to. |
| 2 | +
|
| 3 | +use std::net::{IpAddr, SocketAddr}; |
| 4 | +use std::sync::Arc; |
| 5 | + |
| 6 | +use anyhow::Result; |
| 7 | +use scylla::client::session::Session; |
| 8 | +use scylla::client::session_builder::SessionBuilder; |
| 9 | +use scylla::cluster::Node; |
| 10 | +use scylla::policies::load_balancing::{NodeIdentifier, SingleTargetLoadBalancingPolicy}; |
| 11 | +use scylla::statement::prepared::PreparedStatement; |
| 12 | + |
| 13 | +/// Executes "SELECT host_id, rpc_address FROM system.local" query with `node` as the enforced target. |
| 14 | +/// Checks whether the result matches the expected values (i.e. ones stored in peers metadata). |
| 15 | +async fn query_system_local_and_verify( |
| 16 | + session: &Session, |
| 17 | + node: &Arc<Node>, |
| 18 | + query_local: &PreparedStatement, |
| 19 | +) { |
| 20 | + let (actual_host_id, actual_node_ip) = session |
| 21 | + .execute_unpaged(query_local, ()) |
| 22 | + .await |
| 23 | + .unwrap() |
| 24 | + .into_rows_result() |
| 25 | + .unwrap() |
| 26 | + .single_row::<(uuid::Uuid, IpAddr)>() |
| 27 | + .unwrap(); |
| 28 | + |
| 29 | + println!( |
| 30 | + "queried host_id: {}; queried node_ip: {}", |
| 31 | + actual_host_id, actual_node_ip |
| 32 | + ); |
| 33 | + assert_eq!(node.host_id, actual_host_id); |
| 34 | + assert_eq!(node.address.ip(), actual_node_ip); |
| 35 | +} |
| 36 | + |
| 37 | +#[tokio::main] |
| 38 | +async fn main() -> Result<()> { |
| 39 | + let uri = std::env::var("SCYLLA_URI").unwrap_or_else(|_| "127.0.0.1:9042".to_string()); |
| 40 | + |
| 41 | + let session: Session = SessionBuilder::new().known_node(uri).build().await?; |
| 42 | + |
| 43 | + let state = session.get_cluster_state(); |
| 44 | + let node = state |
| 45 | + .get_nodes_info() |
| 46 | + .first() |
| 47 | + .ok_or_else(|| anyhow::anyhow!("No nodes in metadata!"))?; |
| 48 | + |
| 49 | + let expected_host_id = node.host_id; |
| 50 | + let expected_node_ip = node.address.ip(); |
| 51 | + |
| 52 | + let mut query_local = session |
| 53 | + .prepare("SELECT host_id, rpc_address FROM system.local where key='local'") |
| 54 | + .await?; |
| 55 | + |
| 56 | + // Enforce the node using `Arc<Node>`. |
| 57 | + { |
| 58 | + let node_identifier = NodeIdentifier::Node(Arc::clone(node)); |
| 59 | + println!("Enforcing target using {:?}...", node_identifier); |
| 60 | + query_local.set_load_balancing_policy(Some(SingleTargetLoadBalancingPolicy::new( |
| 61 | + node_identifier, |
| 62 | + None, |
| 63 | + ))); |
| 64 | + |
| 65 | + query_system_local_and_verify(&session, node, &query_local).await; |
| 66 | + } |
| 67 | + |
| 68 | + // Enforce the node using host_id. |
| 69 | + { |
| 70 | + let node_identifier = NodeIdentifier::HostId(expected_host_id); |
| 71 | + println!("Enforcing target using {:?}...", node_identifier); |
| 72 | + query_local.set_load_balancing_policy(Some(SingleTargetLoadBalancingPolicy::new( |
| 73 | + node_identifier, |
| 74 | + None, |
| 75 | + ))); |
| 76 | + |
| 77 | + query_system_local_and_verify(&session, node, &query_local).await; |
| 78 | + } |
| 79 | + |
| 80 | + // Enforce the node using **untranslated** node address. |
| 81 | + { |
| 82 | + let node_identifier = |
| 83 | + NodeIdentifier::NodeAddress(SocketAddr::new(expected_node_ip, node.address.port())); |
| 84 | + println!("Enforcing target using {:?}...", node_identifier); |
| 85 | + query_local.set_load_balancing_policy(Some(SingleTargetLoadBalancingPolicy::new( |
| 86 | + node_identifier, |
| 87 | + None, |
| 88 | + ))); |
| 89 | + |
| 90 | + query_system_local_and_verify(&session, node, &query_local).await; |
| 91 | + } |
| 92 | + |
| 93 | + Ok(()) |
| 94 | +} |
0 commit comments