+use crate::load_balancing::{FallbackPlan, LoadBalancingPolicy, RoutingInfo};
+use crate::query::Query;
+use crate::routing::Shard;
+use crate::transport::connection::Connection;
+use crate::transport::errors::QueryError;
 use crate::transport::session_builder::{GenericSessionBuilder, SessionBuilderKind};
-use crate::Session;
+use crate::transport::{ClusterData, NodeRef};
+use crate::{CachingSession, ExecutionProfile, Session};
+use std::sync::Arc;
 use std::{num::NonZeroU32, time::Duration};
 use std::{
     sync::atomic::{AtomicUsize, Ordering},
@@ -100,3 +107,81 @@ pub(crate) fn setup_tracing() {
         .with_writer(tracing_subscriber::fmt::TestWriter::new())
         .try_init();
 }
+
+// This LBP produces a predictable query plan - it orders the nodes
+// by their position in the ring.
+// This is to make sure that all DDL queries land on the same node,
+// to prevent errors from concurrent DDL queries executed on different nodes.
+#[derive(Debug)]
+struct SchemaQueriesLBP;
+impl LoadBalancingPolicy for SchemaQueriesLBP {
+    fn pick<'a>(
+        &'a self,
+        _query: &'a RoutingInfo,
+        cluster: &'a ClusterData,
+    ) -> Option<(NodeRef<'a>, Option<Shard>)> {
+        // I'm not sure if Scylla can handle concurrent DDL queries to different shards,
+        // in other words if its local lock is per-node or per-shard.
+        // Just to be safe, let's use an explicit shard.
+        cluster.get_nodes_info().first().map(|node| (node, Some(0)))
+    }
+
+    fn fallback<'a>(
+        &'a self,
+        _query: &'a RoutingInfo,
+        cluster: &'a ClusterData,
+    ) -> FallbackPlan<'a> {
+        Box::new(cluster.get_nodes_info().iter().map(|node| (node, Some(0))))
+    }
+
+    fn name(&self) -> String {
+        "SchemaQueriesLBP".to_owned()
+    }
+}
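+
+// A minimal sketch of the resulting plan (illustrative only; `ri` and
+// `cluster` are assumed to come from a live session's routing info and
+// cluster data):
+//
+//     let plan: Vec<_> = SchemaQueriesLBP.fallback(&ri, &cluster).collect();
+//     // Every element targets shard 0, in ring order, so the first
+//     // coordinator tried is the same for every DDL statement.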
+
+fn apply_ddl_lbp(query: &mut Query) {
+    let profile = query
+        .get_execution_profile_handle()
+        .map(|handle| handle.pointee_to_builder())
+        .unwrap_or_else(ExecutionProfile::builder)
+        .load_balancing_policy(Arc::new(SchemaQueriesLBP))
+        .build();
+    query.set_execution_profile_handle(Some(profile.into_handle()));
+}
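+
+// Usage sketch (the statement text is made up):
+//
+//     let mut query = Query::new("CREATE TABLE ks.t (a int PRIMARY KEY)");
+//     apply_ddl_lbp(&mut query);
+//     // The query now carries a profile whose LBP is SchemaQueriesLBP;
+//     // settings from any pre-existing profile handle are preserved
+//     // via pointee_to_builder().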
+
+// This is just to make the above function easier to use: we can write
+// session.ddl(...) instead of something like perform_ddl(&session, ...).
+#[allow(unused)]
+#[async_trait::async_trait]
+pub(crate) trait PerformDDL {
+    async fn ddl(&self, query: impl Into<Query> + Send) -> Result<(), QueryError>;
+}
+
+#[async_trait::async_trait]
+impl PerformDDL for Session {
+    async fn ddl(&self, query: impl Into<Query> + Send) -> Result<(), QueryError> {
+        let mut query = query.into();
+        apply_ddl_lbp(&mut query);
+        self.query_unpaged(query, &[]).await.map(|_| ())
+    }
+}
+
+#[async_trait::async_trait]
+impl PerformDDL for CachingSession {
+    async fn ddl(&self, query: impl Into<Query> + Send) -> Result<(), QueryError> {
+        let mut query = query.into();
+        apply_ddl_lbp(&mut query);
+        self.execute_unpaged(query, &[]).await.map(|_| ())
+    }
+}
+
+#[async_trait::async_trait]
+impl PerformDDL for Connection {
+    async fn ddl(&self, query: impl Into<Query> + Send) -> Result<(), QueryError> {
+        let mut query = query.into();
+        apply_ddl_lbp(&mut query);
+        self.query_unpaged(query).await.map(|_| ())
+    }
+}
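+
+// Example of the resulting call style (a sketch; assumes a connected
+// test session and a made-up keyspace/table):
+//
+//     async fn create_schema(session: &Session) -> Result<(), QueryError> {
+//         session
+//             .ddl("CREATE KEYSPACE ks WITH replication = \
+//                  {'class': 'SimpleStrategy', 'replication_factor': 1}")
+//             .await?;
+//         session.ddl("CREATE TABLE ks.t (a int PRIMARY KEY)").await
+//     }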