@@ -5,8 +5,9 @@ use scylla::client::session_builder::{GenericSessionBuilder, SessionBuilderKind}
5
5
use scylla:: cluster:: ClusterState ;
6
6
use scylla:: cluster:: NodeRef ;
7
7
use scylla:: deserialize:: DeserializeValue ;
8
- use scylla:: errors:: ExecutionError ;
8
+ use scylla:: errors:: { DbError , ExecutionError , RequestAttemptError } ;
9
9
use scylla:: policies:: load_balancing:: { FallbackPlan , LoadBalancingPolicy , RoutingInfo } ;
10
+ use scylla:: policies:: retry:: { RequestInfo , RetryDecision , RetryPolicy , RetrySession } ;
10
11
use scylla:: query:: Query ;
11
12
use scylla:: routing:: Shard ;
12
13
use std:: collections:: HashMap ;
@@ -17,6 +18,7 @@ use std::str::FromStr;
17
18
use std:: sync:: atomic:: { AtomicUsize , Ordering } ;
18
19
use std:: sync:: Arc ;
19
20
use std:: time:: { Duration , SystemTime , UNIX_EPOCH } ;
21
+ use tracing:: { error, warn} ;
20
22
21
23
use scylla_proxy:: { Node , Proxy , ProxyError , RunningProxy , ShardAwareness } ;
22
24
@@ -207,12 +209,54 @@ impl LoadBalancingPolicy for SchemaQueriesLBP {
207
209
}
208
210
}
209
211
212
+ #[ derive( Debug , Default ) ]
213
+ struct SchemaQueriesRetrySession {
214
+ count : usize ,
215
+ }
216
+
217
+ impl RetrySession for SchemaQueriesRetrySession {
218
+ fn decide_should_retry ( & mut self , request_info : RequestInfo ) -> RetryDecision {
219
+ match request_info. error {
220
+ RequestAttemptError :: DbError ( DbError :: ServerError , s)
221
+ if s == "Failed to apply group 0 change due to concurrent modification" =>
222
+ {
223
+ self . count += 1 ;
224
+ // Give up if there are many failures.
225
+ // In this case we really should do something about it in the
226
+ // core, because it is absurd for DDL queries to fail this often.
227
+ if self . count >= 10 {
228
+ error ! ( "Received TENTH(!) group 0 concurrent modification error during DDL. Please fix Scylla Core." ) ;
229
+ RetryDecision :: DontRetry
230
+ } else {
231
+ warn ! ( "Received group 0 concurrent modification error during DDL. Performing retry #{}." , self . count) ;
232
+ RetryDecision :: RetrySameNode ( None )
233
+ }
234
+ }
235
+ _ => RetryDecision :: DontRetry ,
236
+ }
237
+ }
238
+
239
+ fn reset ( & mut self ) {
240
+ * self = Default :: default ( )
241
+ }
242
+ }
243
+
244
+ #[ derive( Debug ) ]
245
+ struct SchemaQueriesRetryPolicy ;
246
+
247
+ impl RetryPolicy for SchemaQueriesRetryPolicy {
248
+ fn new_session ( & self ) -> Box < dyn RetrySession > {
249
+ Box :: new ( SchemaQueriesRetrySession :: default ( ) )
250
+ }
251
+ }
252
+
210
253
fn apply_ddl_lbp ( query : & mut Query ) {
211
254
let policy = query
212
255
. get_execution_profile_handle ( )
213
256
. map ( |profile| profile. pointee_to_builder ( ) )
214
257
. unwrap_or ( ExecutionProfile :: builder ( ) )
215
258
. load_balancing_policy ( Arc :: new ( SchemaQueriesLBP ) )
259
+ . retry_policy ( Arc :: new ( SchemaQueriesRetryPolicy ) )
216
260
. build ( ) ;
217
261
query. set_execution_profile_handle ( Some ( policy. into_handle ( ) ) ) ;
218
262
}
0 commit comments