@@ -93,6 +93,9 @@ const VISIBLE_MAX_VALUE_SIZE: usize = RAW_MAX_VALUE_SIZE
9393/// correct.
9494const MAX_BATCH_SIZE : usize = 5000 ;
9595
96+ type OccurencesMap = HashMap < Vec < u8 > , Vec < usize > > ;
97+ type OccurencesMapByType = HashMap < u8 , OccurencesMap > ;
98+
9699/// The client for ScyllaDB:
97100/// * The session allows to pass queries
98101/// * The namespace that is being assigned to the database
@@ -126,7 +129,7 @@ impl ScyllaDbClient {
126129 . collect :: < Vec < _ > > ( )
127130 . join ( "," ) ;
128131 let query = format ! (
129- "SELECT k,v FROM kv.{} WHERE root_key = ? AND k IN ({})" ,
132+ "SELECT k,v FROM kv.{} WHERE root_key = ? AND type_id = ? AND k IN ({})" ,
130133 self . namespace, markers
131134 ) ;
132135 let prepared_statement = self . session . prepare ( query) . await ?;
@@ -146,7 +149,7 @@ impl ScyllaDbClient {
146149 . collect :: < Vec < _ > > ( )
147150 . join ( "," ) ;
148151 let query = format ! (
149- "SELECT k FROM kv.{} WHERE root_key = ? AND k IN ({})" ,
152+ "SELECT k FROM kv.{} WHERE root_key = ? AND type_id = ? AND k IN ({})" ,
150153 self . namespace, markers
151154 ) ;
152155 let prepared_statement = self . session . prepare ( query) . await ?;
@@ -183,70 +186,70 @@ impl ScyllaDbClient {
183186 let namespace = namespace. to_string ( ) ;
184187 let read_value = session
185188 . prepare ( format ! (
186- "SELECT v FROM kv.{} WHERE root_key = ? AND k = ?" ,
189+ "SELECT v FROM kv.{} WHERE root_key = ? AND type_id = ? AND k = ?" ,
187190 namespace
188191 ) )
189192 . await ?;
190193
191194 let contains_key = session
192195 . prepare ( format ! (
193- "SELECT root_key FROM kv.{} WHERE root_key = ? AND k = ?" ,
196+ "SELECT root_key FROM kv.{} WHERE root_key = ? AND type_id = ? AND k = ?" ,
194197 namespace
195198 ) )
196199 . await ?;
197200
198201 let write_batch_delete_prefix_unbounded = session
199202 . prepare ( format ! (
200- "DELETE FROM kv.{} WHERE root_key = ? AND k >= ?" ,
203+ "DELETE FROM kv.{} WHERE root_key = ? AND type_id = ? AND k >= ?" ,
201204 namespace
202205 ) )
203206 . await ?;
204207
205208 let write_batch_delete_prefix_bounded = session
206209 . prepare ( format ! (
207- "DELETE FROM kv.{} WHERE root_key = ? AND k >= ? AND k < ?" ,
210+ "DELETE FROM kv.{} WHERE root_key = ? AND type_id = ? AND k >= ? AND k < ?" ,
208211 namespace
209212 ) )
210213 . await ?;
211214
212215 let write_batch_deletion = session
213216 . prepare ( format ! (
214- "DELETE FROM kv.{} WHERE root_key = ? AND k = ?" ,
217+ "DELETE FROM kv.{} WHERE root_key = ? AND type_id = ? AND k = ?" ,
215218 namespace
216219 ) )
217220 . await ?;
218221
219222 let write_batch_insertion = session
220223 . prepare ( format ! (
221- "INSERT INTO kv.{} (root_key, k, v) VALUES (?, ?, ?)" ,
224+ "INSERT INTO kv.{} (root_key, type_id, k, v) VALUES (?, ?, ?, ?)" ,
222225 namespace
223226 ) )
224227 . await ?;
225228
226229 let find_keys_by_prefix_unbounded = session
227230 . prepare ( format ! (
228- "SELECT k FROM kv.{} WHERE root_key = ? AND k >= ?" ,
231+ "SELECT k FROM kv.{} WHERE root_key = ? AND type_id = ? AND k >= ?" ,
229232 namespace
230233 ) )
231234 . await ?;
232235
233236 let find_keys_by_prefix_bounded = session
234237 . prepare ( format ! (
235- "SELECT k FROM kv.{} WHERE root_key = ? AND k >= ? AND k < ?" ,
238+ "SELECT k FROM kv.{} WHERE root_key = ? AND type_id = ? AND k >= ? AND k < ?" ,
236239 namespace
237240 ) )
238241 . await ?;
239242
240243 let find_key_values_by_prefix_unbounded = session
241244 . prepare ( format ! (
242- "SELECT k,v FROM kv.{} WHERE root_key = ? AND k >= ?" ,
245+ "SELECT k,v FROM kv.{} WHERE root_key = ? AND type_id = ? AND k >= ?" ,
243246 namespace
244247 ) )
245248 . await ?;
246249
247250 let find_key_values_by_prefix_bounded = session
248251 . prepare ( format ! (
249- "SELECT k,v FROM kv.{} WHERE root_key = ? AND k >= ? AND k < ?" ,
252+ "SELECT k,v FROM kv.{} WHERE root_key = ? AND type_id = ? AND k >= ? AND k < ?" ,
250253 namespace
251254 ) )
252255 . await ?;
@@ -277,7 +280,7 @@ impl ScyllaDbClient {
277280 Self :: check_key_size ( & key) ?;
278281 let session = & self . session ;
279282 // Read the value of a key
280- let values = ( root_key. to_vec ( ) , key) ;
283+ let values = ( root_key. to_vec ( ) , vec ! [ key [ 0 ] ] , key) ;
281284
282285 let ( result, _) = session
283286 . execute_single_page ( & self . read_value , & values, PagingState :: start ( ) )
@@ -292,11 +295,16 @@ impl ScyllaDbClient {
292295
293296 fn get_occurences_map (
294297 keys : Vec < Vec < u8 > > ,
295- ) -> Result < HashMap < Vec < u8 > , Vec < usize > > , ScyllaDbStoreInternalError > {
296- let mut map = HashMap :: < Vec < u8 > , Vec < usize > > :: new ( ) ;
298+ ) -> Result < OccurencesMapByType , ScyllaDbStoreInternalError > {
299+ let mut map = OccurencesMapByType :: new ( ) ;
297300 for ( i_key, key) in keys. into_iter ( ) . enumerate ( ) {
298301 Self :: check_key_size ( & key) ?;
299- map. entry ( key) . or_default ( ) . push ( i_key) ;
302+ let type_id = key[ 0 ] ;
303+ map. entry ( type_id)
304+ . or_default ( )
305+ . entry ( key)
306+ . or_default ( )
307+ . push ( i_key) ;
300308 }
301309 Ok ( map)
302310 }
@@ -308,21 +316,48 @@ impl ScyllaDbClient {
308316 ) -> Result < Vec < Option < Vec < u8 > > > , ScyllaDbStoreInternalError > {
309317 let mut values = vec ! [ None ; keys. len( ) ] ;
310318 let map = Self :: get_occurences_map ( keys) ?;
311- let statement = self . get_multi_key_values_statement ( map. len ( ) ) . await ?;
312- let mut inputs = vec ! [ root_key. to_vec( ) ] ;
313- inputs. extend ( map. keys ( ) . cloned ( ) ) ;
314- let mut rows = self
315- . session
316- . execute_iter ( statement, & inputs)
317- . await ?
318- . rows_stream :: < ( Vec < u8 > , Vec < u8 > ) > ( ) ?;
319+ let statements = map
320+ . iter ( )
321+ . map ( |( type_id, map) | async {
322+ let statement = self . get_multi_key_values_statement ( map. len ( ) ) . await ?;
323+ let mut inputs = vec ! [ root_key. to_vec( ) , vec![ * type_id] ] ;
324+ inputs. extend ( map. keys ( ) . cloned ( ) ) ;
325+ Ok :: < _ , ScyllaDbStoreInternalError > ( ( * type_id, statement, inputs) )
326+ } )
327+ . collect :: < Vec < _ > > ( ) ;
328+ let statements = try_join_all ( statements) . await ?;
329+
330+ let mut futures = Vec :: new ( ) ;
331+ let map_ref = & map;
332+ for ( type_id, statement, inputs) in statements {
333+ futures. push ( async move {
334+ let mut rows = self
335+ . session
336+ . execute_iter ( statement, & inputs)
337+ . await ?
338+ . rows_stream :: < ( Vec < u8 > , Vec < u8 > ) > ( ) ?;
339+ let mut value_pairs = Vec :: new ( ) ;
340+ while let Some ( row) = rows. next ( ) . await {
341+ let ( key, value) = row?;
342+ for i_key in map_ref
343+ . get ( & type_id)
344+ . expect ( "type_id is supposed to be in map" )
345+ . get ( & key)
346+ . expect ( "key is supposed to be in map" )
347+ {
348+ value_pairs. push ( ( * i_key, value. clone ( ) ) ) ;
349+ }
350+ }
319351
320- while let Some ( row) = rows. next ( ) . await {
321- let ( key, value) = row?;
322- for i_key in map. get ( & key) . expect ( "key is supposed to be in map" ) {
323- values[ * i_key] = Some ( value. clone ( ) ) ;
324- }
352+ Ok :: < _ , ScyllaDbStoreInternalError > ( value_pairs)
353+ } ) ;
354+ }
355+
356+ let values_pairs = try_join_all ( futures) . await ?;
357+ for ( i_key, value) in values_pairs. iter ( ) . flatten ( ) {
358+ values[ * i_key] = Some ( value. clone ( ) ) ;
325359 }
360+
326361 Ok ( values)
327362 }
328363
@@ -333,20 +368,46 @@ impl ScyllaDbClient {
333368 ) -> Result < Vec < bool > , ScyllaDbStoreInternalError > {
334369 let mut values = vec ! [ false ; keys. len( ) ] ;
335370 let map = Self :: get_occurences_map ( keys) ?;
336- let statement = self . get_multi_keys_statement ( map. len ( ) ) . await ?;
337- let mut inputs = vec ! [ root_key. to_vec( ) ] ;
338- inputs. extend ( map. keys ( ) . cloned ( ) ) ;
339- let mut rows = self
340- . session
341- . execute_iter ( statement, & inputs)
342- . await ?
343- . rows_stream :: < ( Vec < u8 > , ) > ( ) ?;
371+ let statements = map
372+ . iter ( )
373+ . map ( |( type_id, map) | async {
374+ let statement = self . get_multi_keys_statement ( map. len ( ) ) . await ?;
375+ let mut inputs = vec ! [ root_key. to_vec( ) , vec![ * type_id] ] ;
376+ inputs. extend ( map. keys ( ) . cloned ( ) ) ;
377+ Ok :: < _ , ScyllaDbStoreInternalError > ( ( * type_id, statement, inputs) )
378+ } )
379+ . collect :: < Vec < _ > > ( ) ;
380+ let statements = try_join_all ( statements) . await ?;
381+
382+ let mut futures = Vec :: new ( ) ;
383+ let map_ref = & map;
384+ for ( type_id, statement, inputs) in statements {
385+ futures. push ( async move {
386+ let mut rows = self
387+ . session
388+ . execute_iter ( statement, & inputs)
389+ . await ?
390+ . rows_stream :: < ( Vec < u8 > , ) > ( ) ?;
391+ let mut keys = Vec :: new ( ) ;
392+ while let Some ( row) = rows. next ( ) . await {
393+ let ( key, ) = row?;
394+ for i_key in map_ref
395+ . get ( & type_id)
396+ . expect ( "type_id is supposed to be in map" )
397+ . get ( & key)
398+ . expect ( "key is supposed to be in map" )
399+ {
400+ keys. push ( * i_key) ;
401+ }
402+ }
344403
345- while let Some ( row) = rows. next ( ) . await {
346- let ( key, ) = row?;
347- for i_key in map. get ( & key) . expect ( "key is supposed to be in map" ) {
348- values[ * i_key] = true ;
349- }
404+ Ok :: < _ , ScyllaDbStoreInternalError > ( keys)
405+ } ) ;
406+ }
407+ let keys = try_join_all ( futures) . await ?;
408+
409+ for i_key in keys. iter ( ) . flatten ( ) {
410+ values[ * i_key] = true ;
350411 }
351412
352413 Ok ( values)
@@ -360,7 +421,7 @@ impl ScyllaDbClient {
360421 Self :: check_key_size ( & key) ?;
361422 let session = & self . session ;
362423 // Read the value of a key
363- let values = ( root_key. to_vec ( ) , key) ;
424+ let values = ( root_key. to_vec ( ) , vec ! [ key [ 0 ] ] , key) ;
364425
365426 let ( result, _) = session
366427 . execute_single_page ( & self . contains_key , & values, PagingState :: start ( ) )
@@ -385,7 +446,7 @@ impl ScyllaDbClient {
385446 match get_upper_bound_option ( & key_prefix) {
386447 None => {
387448 let prepared_statement = & self . write_batch_delete_prefix_unbounded ;
388- let values = ( root_key. clone ( ) , key_prefix) ;
449+ let values = ( root_key. clone ( ) , vec ! [ key_prefix [ 0 ] ] , key_prefix) ;
389450 futures. push ( Box :: pin ( async move {
390451 session
391452 . execute_single_page ( prepared_statement, values, PagingState :: start ( ) )
@@ -396,7 +457,7 @@ impl ScyllaDbClient {
396457 }
397458 Some ( upper) => {
398459 let prepared_statement = & self . write_batch_delete_prefix_bounded ;
399- let values = ( root_key. clone ( ) , key_prefix, upper) ;
460+ let values = ( root_key. clone ( ) , vec ! [ key_prefix [ 0 ] ] , key_prefix, upper) ;
400461 futures. push ( Box :: pin ( async move {
401462 session
402463 . execute_single_page ( prepared_statement, values, PagingState :: start ( ) )
@@ -413,7 +474,7 @@ impl ScyllaDbClient {
413474 for key in batch. simple_unordered_batch . deletions {
414475 Self :: check_key_size ( & key) ?;
415476 let prepared_statement = & self . write_batch_deletion ;
416- let values = ( root_key. clone ( ) , key) ;
477+ let values = ( root_key. clone ( ) , vec ! [ key [ 0 ] ] , key) ;
417478 futures. push ( Box :: pin ( async move {
418479 session
419480 . execute_single_page ( prepared_statement, values, PagingState :: start ( ) )
@@ -427,7 +488,7 @@ impl ScyllaDbClient {
427488 Self :: check_key_size ( & key) ?;
428489 Self :: check_value_size ( & value) ?;
429490 let prepared_statement = & self . write_batch_insertion ;
430- let values = ( root_key. clone ( ) , key, value) ;
491+ let values = ( root_key. clone ( ) , vec ! [ key [ 0 ] ] , key, value) ;
431492 futures. push ( Box :: pin ( async move {
432493 session
433494 . execute_single_page ( prepared_statement, values, PagingState :: start ( ) )
@@ -454,13 +515,18 @@ impl ScyllaDbClient {
454515 let query_bounded = & self . find_keys_by_prefix_bounded ;
455516 let rows = match get_upper_bound_option ( & key_prefix) {
456517 None => {
457- let values = ( root_key. to_vec ( ) , key_prefix. clone ( ) ) ;
518+ let values = ( root_key. to_vec ( ) , vec ! [ key_prefix [ 0 ] ] , key_prefix. clone ( ) ) ;
458519 session
459520 . execute_iter ( query_unbounded. clone ( ) , values)
460521 . await ?
461522 }
462523 Some ( upper_bound) => {
463- let values = ( root_key. to_vec ( ) , key_prefix. clone ( ) , upper_bound) ;
524+ let values = (
525+ root_key. to_vec ( ) ,
526+ vec ! [ key_prefix[ 0 ] ] ,
527+ key_prefix. clone ( ) ,
528+ upper_bound,
529+ ) ;
464530 session. execute_iter ( query_bounded. clone ( ) , values) . await ?
465531 }
466532 } ;
@@ -487,13 +553,18 @@ impl ScyllaDbClient {
487553 let query_bounded = & self . find_key_values_by_prefix_bounded ;
488554 let rows = match get_upper_bound_option ( & key_prefix) {
489555 None => {
490- let values = ( root_key. to_vec ( ) , key_prefix. clone ( ) ) ;
556+ let values = ( root_key. to_vec ( ) , vec ! [ key_prefix [ 0 ] ] , key_prefix. clone ( ) ) ;
491557 session
492558 . execute_iter ( query_unbounded. clone ( ) , values)
493559 . await ?
494560 }
495561 Some ( upper_bound) => {
496- let values = ( root_key. to_vec ( ) , key_prefix. clone ( ) , upper_bound) ;
562+ let values = (
563+ root_key. to_vec ( ) ,
564+ vec ! [ key_prefix[ 0 ] ] ,
565+ key_prefix. clone ( ) ,
566+ upper_bound,
567+ ) ;
497568 session. execute_iter ( query_bounded. clone ( ) , values) . await ?
498569 }
499570 } ;
@@ -910,9 +981,10 @@ impl AdminKeyValueStore for ScyllaDbStoreInternal {
910981 . prepare ( format ! (
911982 "CREATE TABLE kv.{} (\
912983 root_key blob, \
984+ type_id blob, \
913985 k blob, \
914986 v blob, \
915- PRIMARY KEY (root_key, k) \
987+ PRIMARY KEY (( root_key, type_id) , k) \
916988 ) \
917989 WITH compaction = {{ \
918990 'class' : 'SizeTieredCompactionStrategy', \
0 commit comments