
 use std::{
     collections::{BTreeSet, HashMap},
+    num::NonZeroUsize,
     ops::Deref,
-    sync::Arc,
+    sync::{
+        atomic::{AtomicUsize, Ordering},
+        Arc, Mutex,
+    },
 };
 
 use async_lock::{Semaphore, SemaphoreGuard};
 use dashmap::{mapref::entry::Entry, DashMap};
 use futures::{future::join_all, FutureExt as _, StreamExt};
-use linera_base::ensure;
+use linera_base::{
+    data_types::{TimeDelta, Timestamp},
+    ensure,
+};
+use lru::LruCache;
 use scylla::{
     client::{
         execution_profile::{ExecutionProfile, ExecutionProfileHandle},
         session::Session,
         session_builder::SessionBuilder,
     },
+    cluster::{ClusterState, Node, NodeRef},
     deserialize::{DeserializationError, TypeCheckError},
     errors::{
-        DbError, ExecutionError, IntoRowsResultError, NewSessionError, NextPageError, NextRowError,
-        PagerExecutionError, PrepareError, RequestAttemptError, RequestError, RowsError,
+        ClusterStateTokenError, DbError, ExecutionError, IntoRowsResultError, NewSessionError,
+        NextPageError, NextRowError, PagerExecutionError, PrepareError, RequestAttemptError,
+        RequestError, RowsError,
     },
     policies::{
-        load_balancing::{DefaultPolicy, LoadBalancingPolicy},
+        load_balancing::{DefaultPolicy, FallbackPlan, LoadBalancingPolicy, RoutingInfo},
         retry::DefaultRetryPolicy,
     },
     response::PagingState,
-    statement::{batch::BatchType, prepared::PreparedStatement, Consistency},
+    routing::{Shard, Token},
+    statement::{
+        batch::{Batch, BatchType},
+        prepared::PreparedStatement,
+        Consistency,
+    },
 };
 use serde::{Deserialize, Serialize};
 use thiserror::Error;
@@ -97,6 +112,15 @@ const MAX_BATCH_SIZE: usize = 5000;
 /// The keyspace to use for the ScyllaDB database.
 const KEYSPACE: &str = "kv";
 
+/// The default size of the cache for the load balancing policies.
+const DEFAULT_LOAD_BALANCING_POLICY_CACHE_SIZE: usize = 50_000;
+
+enum LoadBalancingPolicyCacheEntry {
+    Ready(Arc<dyn LoadBalancingPolicy>),
+    // The timestamp of the last time the policy creation was attempted.
+    NotReady(Timestamp, Option<Token>),
+}
+
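For readers unfamiliar with the `lru` crate: a minimal standalone sketch of the semantics the cache above relies on. `push` inserts and evicts the least-recently-used entry once capacity is reached, and `get` refreshes an entry's recency; the keys and values here are placeholders.

```rust
use std::num::NonZeroUsize;

use lru::LruCache;

fn main() {
    // Capacity of two, so a third insertion evicts the coldest entry.
    let mut cache: LruCache<Vec<u8>, &str> = LruCache::new(NonZeroUsize::new(2).unwrap());
    cache.push(b"a".to_vec(), "policy-a");
    cache.push(b"b".to_vec(), "policy-b");
    cache.get(&b"a".to_vec()); // "a" becomes the most recently used entry.
    cache.push(b"c".to_vec(), "policy-c"); // Evicts "b", the least recently used.
    assert!(cache.get(&b"b".to_vec()).is_none());
}
```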
 /// The client for ScyllaDB:
 /// * The session allows passing queries
 /// * The namespace assigned to the database
@@ -116,6 +140,7 @@ struct ScyllaDbClient {
     find_key_values_by_prefix_bounded: PreparedStatement,
     multi_key_values: DashMap<usize, PreparedStatement>,
     multi_keys: DashMap<usize, PreparedStatement>,
+    batch_load_balancing_policies: Mutex<LruCache<Vec<u8>, LoadBalancingPolicyCacheEntry>>,
 }
 
 impl ScyllaDbClient {
@@ -206,6 +231,10 @@ impl ScyllaDbClient {
             find_key_values_by_prefix_bounded,
             multi_key_values: DashMap::new(),
             multi_keys: DashMap::new(),
+            batch_load_balancing_policies: Mutex::new(LruCache::new(
+                NonZeroUsize::try_from(DEFAULT_LOAD_BALANCING_POLICY_CACHE_SIZE)
+                    .expect("DEFAULT_LOAD_BALANCING_POLICY_CACHE_SIZE should not be zero"),
+            )),
         })
     }
 
@@ -411,46 +440,122 @@ impl ScyllaDbClient {
         Ok(rows.next().is_some())
     }
 
+    fn attempt_sticky_shard_policy_creation(
+        &self,
+        partition_key: &[u8],
+        token: Option<Token>,
+        cache: &mut LruCache<Vec<u8>, LoadBalancingPolicyCacheEntry>,
+    ) -> Arc<dyn LoadBalancingPolicy> {
+        match StickyShardPolicy::new(
+            &self.session,
+            &self.namespace,
+            partition_key,
+            token,
+            ScyllaDbClient::build_default_policy(),
+        ) {
+            Ok(policy) => {
+                let policy = Arc::new(policy);
+                cache.push(
+                    partition_key.to_vec(),
+                    LoadBalancingPolicyCacheEntry::Ready(policy.clone()),
+                );
+                policy
+            }
+            Err(error) => {
+                // Cache that the policy creation failed, so we don't try again too soon, and
+                // don't recalculate the token if not needed.
+                let token = match error {
+                    ScyllaDbStoreInternalError::MissingTokenEndpoints(token) => Some(token),
+                    _ => None,
+                };
+                cache.push(
+                    partition_key.to_vec(),
+                    LoadBalancingPolicyCacheEntry::NotReady(Timestamp::now(), token),
+                );
+                ScyllaDbClient::build_default_policy()
+            }
+        }
+    }
+
+    // Returns a batch query with a sticky shard policy that always tries to route to the same
+    // ScyllaDB shard. Should be used only on batches where all statements are to the same
+    // partition key.
+    fn get_sticky_batch_query(
+        &self,
+        partition_key: &[u8],
+    ) -> Result<Batch, ScyllaDbStoreInternalError> {
+        // Since we assume this is all to the same partition key, we can use an unlogged batch.
+        // We could use a logged batch to get atomicity across different partitions, but that
+        // comes with a huge performance penalty (it seems to double write latency).
+        let mut batch_query = Batch::new(BatchType::Unlogged);
+        // Getting the sticky shard policy does some serializing and hashing under the hood, so
+        // we cache the policy to avoid that extra work.
+        let policy = {
+            let mut cache = self
+                .batch_load_balancing_policies
+                .lock()
+                .map_err(|_| ScyllaDbStoreInternalError::PoisonedMutex)?;
+            if let Some(policy) = cache.get(partition_key) {
+                match policy {
+                    LoadBalancingPolicyCacheEntry::Ready(policy) => policy.clone(),
+                    LoadBalancingPolicyCacheEntry::NotReady(timestamp, token) => {
+                        if Timestamp::now().delta_since(*timestamp) > TimeDelta::from_secs(2) {
+                            self.attempt_sticky_shard_policy_creation(
+                                partition_key,
+                                *token,
+                                &mut cache,
+                            )
+                        } else {
+                            ScyllaDbClient::build_default_policy()
+                        }
+                    }
+                }
+            } else {
+                self.attempt_sticky_shard_policy_creation(partition_key, None, &mut cache)
+            }
+        };
+        let handle = Self::build_default_execution_profile_handle(policy);
+        batch_query.set_execution_profile_handle(Some(handle));
+
+        Ok(batch_query)
+    }
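The `NotReady` entries above gate retries behind a two-second window. A minimal sketch of that check in isolation, using the same `linera_base` types; the `should_retry` helper name is made up for illustration.

```rust
use linera_base::data_types::{TimeDelta, Timestamp};

// Returns true once at least two seconds have passed since the failed attempt,
// mirroring the retry-window check in `get_sticky_batch_query`.
fn should_retry(last_attempt: Timestamp) -> bool {
    Timestamp::now().delta_since(last_attempt) > TimeDelta::from_secs(2)
}
```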
+
+    // Batches should always be to the same partition key. Batches across different partitions
+    // will not be atomic; if the caller wants atomicity, it is the caller's responsibility to
+    // make sure that the batch only has statements to the same partition key.
     async fn write_batch_internal(
         &self,
         root_key: &[u8],
         batch: UnorderedBatch,
     ) -> Result<(), ScyllaDbStoreInternalError> {
-        let session = &self.session;
-        let mut batch_query = scylla::statement::batch::Batch::new(BatchType::Unlogged);
-        let mut batch_values = Vec::new();
-        let query1 = &self.write_batch_delete_prefix_unbounded;
-        let query2 = &self.write_batch_delete_prefix_bounded;
         Self::check_batch_len(&batch)?;
+        let session = &self.session;
+        let mut batch_query = self.get_sticky_batch_query(root_key)?;
+        let mut batch_values = Vec::with_capacity(batch.len());
+
         for key_prefix in batch.key_prefix_deletions {
             Self::check_key_size(&key_prefix)?;
             match get_upper_bound_option(&key_prefix) {
                 None => {
-                    let values = vec![root_key.to_vec(), key_prefix];
-                    batch_values.push(values);
-                    batch_query.append_statement(query1.clone());
+                    batch_query.append_statement(self.write_batch_delete_prefix_unbounded.clone());
+                    batch_values.push(vec![root_key.to_vec(), key_prefix]);
                 }
                 Some(upper_bound) => {
-                    let values = vec![root_key.to_vec(), key_prefix, upper_bound];
-                    batch_values.push(values);
-                    batch_query.append_statement(query2.clone());
+                    batch_query.append_statement(self.write_batch_delete_prefix_bounded.clone());
+                    batch_values.push(vec![root_key.to_vec(), key_prefix, upper_bound]);
                 }
             }
         }
-        let query3 = &self.write_batch_deletion;
         for key in batch.simple_unordered_batch.deletions {
             Self::check_key_size(&key)?;
-            let values = vec![root_key.to_vec(), key];
-            batch_values.push(values);
-            batch_query.append_statement(query3.clone());
+            batch_query.append_statement(self.write_batch_deletion.clone());
+            batch_values.push(vec![root_key.to_vec(), key]);
         }
-        let query4 = &self.write_batch_insertion;
         for (key, value) in batch.simple_unordered_batch.insertions {
             Self::check_key_size(&key)?;
             Self::check_value_size(&value)?;
-            let values = vec![root_key.to_vec(), key, value];
-            batch_values.push(values);
-            batch_query.append_statement(query4.clone());
+            batch_query.append_statement(self.write_batch_insertion.clone());
+            batch_values.push(vec![root_key.to_vec(), key, value]);
         }
         session.batch(&batch_query, batch_values).await?;
         Ok(())
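For context on the `session.batch` call: the driver pairs appended statements with rows of values positionally, so the i-th statement is bound with the i-th row. A standalone sketch of that contract, assuming a prepared three-column insert like `write_batch_insertion`; the helper name and literal keys are illustrative.

```rust
use scylla::{
    client::session::Session,
    errors::ExecutionError,
    statement::{
        batch::{Batch, BatchType},
        prepared::PreparedStatement,
    },
};

// Appends the same prepared insert twice; the first row of values binds to the
// first statement, the second row to the second statement.
async fn two_inserts(
    session: &Session,
    insert: &PreparedStatement,
    root_key: &[u8],
) -> Result<(), ExecutionError> {
    let mut batch = Batch::new(BatchType::Unlogged);
    batch.append_statement(insert.clone());
    batch.append_statement(insert.clone());
    let values = vec![
        vec![root_key.to_vec(), b"k1".to_vec(), b"v1".to_vec()],
        vec![root_key.to_vec(), b"k2".to_vec(), b"v2".to_vec()],
    ];
    session.batch(&batch, values).await?;
    Ok(())
}
```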
@@ -523,6 +628,83 @@ impl ScyllaDbClient {
     }
 }
 
+// Batch statements in ScyllaDB are currently not token-aware; the batch gets sent to a random
+// node: https://rust-driver.docs.scylladb.com/stable/statements/batch.html#performance
+// However, for batches where all statements are to the same partition key, we can use a sticky
+// shard policy to route to the same shard, making batches token-aware.
+//
+// This is a policy that always tries to route to the ScyllaDB shards that contain the token, in
+// a round-robin fashion.
+#[derive(Debug)]
+struct StickyShardPolicy {
+    replicas: Vec<(Arc<Node>, Shard)>,
+    current_replica_index: AtomicUsize,
+    fallback: Arc<dyn LoadBalancingPolicy>,
+}
+
+impl StickyShardPolicy {
+    fn new(
+        session: &Session,
+        namespace: &str,
+        partition_key: &[u8],
+        token: Option<Token>,
+        fallback: Arc<dyn LoadBalancingPolicy>,
+    ) -> Result<Self, ScyllaDbStoreInternalError> {
+        let cluster = session.get_cluster_state();
+        let token = if let Some(token) = token {
+            token
+        } else {
+            cluster.compute_token(KEYSPACE, namespace, &(partition_key,))?
+        };
+        let replicas = cluster.get_token_endpoints(KEYSPACE, namespace, token);
+        if replicas.is_empty() {
+            // The driver won't always have all the token information available,
+            // but we can try again later.
+            return Err(ScyllaDbStoreInternalError::MissingTokenEndpoints(token));
+        }
+        Ok(Self {
+            replicas,
+            current_replica_index: AtomicUsize::new(0),
+            fallback,
+        })
+    }
+}
+
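As an aside, the two `ClusterState` calls above can also be used on their own to inspect data placement. A hypothetical debugging helper along those lines; the function, the `"kv"` keyspace literal, and the printout are illustrative only.

```rust
use scylla::client::session::Session;

// Prints the (node, shard) pairs that own a given partition key, using the same
// two ClusterState calls as StickyShardPolicy::new.
fn print_owners(session: &Session, namespace: &str, partition_key: &[u8]) {
    let cluster = session.get_cluster_state();
    let token = cluster
        .compute_token("kv", namespace, &(partition_key,))
        .expect("table metadata should be available");
    for (node, shard) in cluster.get_token_endpoints("kv", namespace, token) {
        println!("{:?} shard {}", node.address, shard);
    }
}
```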
+impl LoadBalancingPolicy for StickyShardPolicy {
+    fn name(&self) -> String {
+        "StickyShardPolicy".to_string()
+    }
+
+    // Always try first to route to the sticky shard.
+    fn pick<'a>(
+        &'a self,
+        _request: &'a RoutingInfo<'a>,
+        _cluster: &'a ClusterState,
+    ) -> Option<(NodeRef<'a>, Option<Shard>)> {
+        let current_replica_index = self.current_replica_index.load(Ordering::Acquire);
+        let new_replica_index = (current_replica_index + 1) % self.replicas.len();
+        self.current_replica_index
+            .compare_exchange(
+                current_replica_index,
+                new_replica_index,
+                Ordering::Release,
+                Ordering::Relaxed,
+            )
+            .ok()?;
+        let (node, shard) = &self.replicas[new_replica_index];
+        Some((node, Some(*shard)))
+    }
+
+    // Fall back to the default policy.
+    fn fallback<'a>(
+        &'a self,
+        request: &'a RoutingInfo,
+        cluster: &'a ClusterState,
+    ) -> FallbackPlan<'a> {
+        self.fallback.fallback(request, cluster)
+    }
+}
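For reference, this is roughly how a custom policy like `StickyShardPolicy` is attached to a batch through the driver's execution profiles; `build_default_execution_profile_handle` in this file presumably wraps something similar. A sketch under that assumption, with hypothetical helper names.

```rust
use std::sync::Arc;

use scylla::{
    client::execution_profile::{ExecutionProfile, ExecutionProfileHandle},
    policies::load_balancing::LoadBalancingPolicy,
    statement::batch::{Batch, BatchType},
};

// Wraps a load balancing policy into an execution profile handle.
fn handle_for(policy: Arc<dyn LoadBalancingPolicy>) -> ExecutionProfileHandle {
    ExecutionProfile::builder()
        .load_balancing_policy(policy)
        .build()
        .into_handle()
}

// Builds an unlogged batch whose requests are routed by `policy`.
fn sticky_batch(policy: Arc<dyn LoadBalancingPolicy>) -> Batch {
    let mut batch = Batch::new(BatchType::Unlogged);
    batch.set_execution_profile_handle(Some(handle_for(policy)));
    batch
}
```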
+
 /// The client itself and the tracking of the number of active connections.
 #[derive(Clone)]
 pub struct ScyllaDbStoreInternal {
@@ -594,6 +776,18 @@ pub enum ScyllaDbStoreInternalError {
     /// A next row error in ScyllaDB
     #[error(transparent)]
     NextRowError(#[from] NextRowError),
+
+    /// A token error in ScyllaDB
+    #[error(transparent)]
+    ClusterStateTokenError(#[from] ClusterStateTokenError),
+
+    /// The token endpoint information is currently missing from the driver
+    #[error("The token endpoint information is currently missing from the driver")]
+    MissingTokenEndpoints(Token),
+
+    /// The mutex is poisoned
+    #[error("The mutex is poisoned")]
+    PoisonedMutex,
 }
 
 impl KeyValueStoreError for ScyllaDbStoreInternalError {
@@ -705,6 +899,9 @@ impl DirectWritableKeyValueStore for ScyllaDbStoreInternal {
     // https://github.com/scylladb/scylladb/blob/master/docs/dev/timestamp-conflict-resolution.md
     type Batch = UnorderedBatch;
 
+    // Batches should always be to the same partition key. Batches across different partitions
+    // will not be atomic; if the caller wants atomicity, it is the caller's responsibility to
+    // make sure that the batch only has statements to the same partition key.
     async fn write_batch(&self, batch: Self::Batch) -> Result<(), ScyllaDbStoreInternalError> {
         let store = self.store.deref();
         let _guard = self.acquire().await;