@@ -128,6 +128,7 @@ impl Default for ScyllaDbClientConfig {
128128
129129/// Map from partition_key to a map from keys to a list of their occurrences in the original vector.
130130type OccurrencesMap = HashMap < Vec < u8 > , HashMap < Vec < u8 > , Vec < usize > > > ;
131+ type BatchAndValues = ( Batch , Vec < Vec < Vec < u8 > > > ) ;
131132
132133/// The client for ScyllaDB:
133134/// * The session allows to pass queries
@@ -376,61 +377,75 @@ impl ScyllaDbClient {
376377 Ok ( ( ) )
377378 }
378379
379- fn check_batch_len ( batch : & UnorderedBatch ) -> Result < ( ) , ScyllaDbStoreInternalError > {
380+ fn check_batch_len ( batch : & Batch ) -> Result < ( ) , ScyllaDbStoreInternalError > {
380381 ensure ! (
381- batch. len( ) <= MAX_BATCH_SIZE ,
382+ batch. statements . len( ) <= MAX_BATCH_SIZE ,
382383 ScyllaDbStoreInternalError :: BatchTooLong
383384 ) ;
384385 Ok ( ( ) )
385386 }
386387
387- fn check_batch_partition_key (
388- & self ,
389- partition_key_prefix : & [ u8 ] ,
390- key : & [ u8 ] ,
391- batch_partition_key : & mut Option < Vec < u8 > > ,
392- ) -> Result < ( ) , ScyllaDbStoreInternalError > {
393- let partition_key = self . get_partition_key ( partition_key_prefix, key) ?;
394- if let Some ( batch_partition_key) = batch_partition_key {
395- ensure ! (
396- * batch_partition_key == partition_key,
397- ScyllaDbStoreInternalError :: MultiplePartitionKeysInBatch
398- ) ;
399- } else {
400- * batch_partition_key = Some ( partition_key. to_vec ( ) ) ;
401- }
402- Ok ( ( ) )
403- }
404-
405- fn check_batch_and_partition_keys (
388+ fn get_per_partition_batches (
406389 & self ,
407390 partition_key_prefix : & [ u8 ] ,
408391 exclusive_mode : bool ,
409392 batch : & UnorderedBatch ,
410- ) -> Result < Vec < u8 > , ScyllaDbStoreInternalError > {
393+ ) -> Result < HashMap < Vec < u8 > , BatchAndValues > , ScyllaDbStoreInternalError > {
411394 if !exclusive_mode {
412395 ensure ! (
413396 batch. key_prefix_deletions. is_empty( ) ,
414397 ScyllaDbStoreInternalError :: PrefixDeletionsNotAllowedInNonExclusiveMode
415398 ) ;
416399 }
417400
418- let mut batch_partition_key = None ;
401+ let mut batches = HashMap :: new ( ) ;
419402 for key_prefix in & batch. key_prefix_deletions {
420- self . check_batch_partition_key (
421- partition_key_prefix,
422- key_prefix,
423- & mut batch_partition_key,
424- ) ?;
403+ Self :: check_key_size ( key_prefix) ?;
404+ let partition_key = self . get_partition_key ( partition_key_prefix, key_prefix) ?;
405+
406+ let ( batch_query, batch_values) = batches
407+ . entry ( partition_key. clone ( ) )
408+ . or_insert ( ( self . get_sticky_batch_query ( & partition_key) ?, Vec :: new ( ) ) ) ;
409+
410+ match get_upper_bound_option ( key_prefix) {
411+ None => {
412+ batch_query. append_statement ( self . write_batch_delete_prefix_unbounded . clone ( ) ) ;
413+ batch_values. push ( vec ! [ partition_key. clone( ) , key_prefix. clone( ) ] ) ;
414+ }
415+ Some ( upper_bound) => {
416+ batch_query. append_statement ( self . write_batch_delete_prefix_bounded . clone ( ) ) ;
417+ batch_values. push ( vec ! [ partition_key. clone( ) , key_prefix. clone( ) , upper_bound] ) ;
418+ }
419+ }
425420 }
421+
426422 for key in & batch. simple_unordered_batch . deletions {
427- self . check_batch_partition_key ( partition_key_prefix, key, & mut batch_partition_key) ?;
423+ Self :: check_key_size ( key) ?;
424+ let partition_key = self . get_partition_key ( partition_key_prefix, key) ?;
425+ let ( batch_query, batch_values) = batches
426+ . entry ( partition_key. clone ( ) )
427+ . or_insert ( ( self . get_sticky_batch_query ( & partition_key) ?, Vec :: new ( ) ) ) ;
428+
429+ batch_query. append_statement ( self . write_batch_deletion . clone ( ) ) ;
430+ batch_values. push ( vec ! [ partition_key. clone( ) , key. clone( ) ] ) ;
431+ }
432+ for ( key, value) in & batch. simple_unordered_batch . insertions {
433+ Self :: check_key_size ( key) ?;
434+ Self :: check_value_size ( value) ?;
435+ let partition_key = self . get_partition_key ( partition_key_prefix, key) ?;
436+ let ( batch_query, batch_values) = batches
437+ . entry ( partition_key. clone ( ) )
438+ . or_insert ( ( self . get_sticky_batch_query ( & partition_key) ?, Vec :: new ( ) ) ) ;
439+
440+ batch_query. append_statement ( self . write_batch_insertion . clone ( ) ) ;
441+ batch_values. push ( vec ! [ partition_key. clone( ) , key. clone( ) , value. clone( ) ] ) ;
428442 }
429- for ( key, _) in & batch. simple_unordered_batch . insertions {
430- self . check_batch_partition_key ( partition_key_prefix, key, & mut batch_partition_key) ?;
443+
444+ for ( batch, _) in batches. values ( ) {
445+ Self :: check_batch_len ( batch) ?;
431446 }
432447
433- batch_partition_key . ok_or ( ScyllaDbStoreInternalError :: NoPartitionKeyInBatch )
448+ Ok ( batches )
434449 }
435450
436451 async fn read_value_internal (
@@ -675,42 +690,15 @@ impl ScyllaDbClient {
675690 return Ok ( ( ) ) ;
676691 }
677692
678- Self :: check_batch_len ( & batch) ?;
679- let partition_key =
680- self . check_batch_and_partition_keys ( partition_key_prefix, exclusive_mode, & batch) ?;
681- let session = & self . session ;
682- let mut batch_query = self . get_sticky_batch_query ( & partition_key) ?;
683- let mut batch_values: Vec < Vec < Vec < u8 > > > = Vec :: new ( ) ;
684-
685- for key_prefix in batch. key_prefix_deletions {
686- // We'll be always on exclusive mode here, which check_batch_and_partition_keys
687- // guarantees.
688- Self :: check_key_size ( & key_prefix) ?;
689- match get_upper_bound_option ( & key_prefix) {
690- None => {
691- batch_query. append_statement ( self . write_batch_delete_prefix_unbounded . clone ( ) ) ;
692- batch_values. push ( vec ! [ partition_key. clone( ) , key_prefix] ) ;
693- }
694- Some ( upper_bound) => {
695- batch_query. append_statement ( self . write_batch_delete_prefix_bounded . clone ( ) ) ;
696- batch_values. push ( vec ! [ partition_key. clone( ) , key_prefix, upper_bound] ) ;
697- }
698- }
699- }
693+ let batches =
694+ self . get_per_partition_batches ( partition_key_prefix, exclusive_mode, & batch) ?;
700695
701- for key in batch. simple_unordered_batch . deletions {
702- Self :: check_key_size ( & key) ?;
703- batch_query. append_statement ( self . write_batch_deletion . clone ( ) ) ;
704- batch_values. push ( vec ! [ partition_key. clone( ) , key] ) ;
705- }
706- for ( key, value) in batch. simple_unordered_batch . insertions {
707- Self :: check_key_size ( & key) ?;
708- Self :: check_value_size ( & value) ?;
709- batch_query. append_statement ( self . write_batch_insertion . clone ( ) ) ;
710- batch_values. push ( vec ! [ partition_key. clone( ) , key, value] ) ;
696+ let mut futures = Vec :: new ( ) ;
697+ for ( _, ( batch, batch_values) ) in batches {
698+ futures. push ( async move { self . session . batch ( & batch, batch_values) . await } ) ;
711699 }
712700
713- session . batch ( & batch_query , batch_values ) . await ?;
701+ try_join_all ( futures ) . await ?;
714702 Ok ( ( ) )
715703 }
716704
0 commit comments