Skip to content

Commit bc6b089

Browse files
committed
Add type_id as primary key to ScyllaDb schema
1 parent 8493ce6 commit bc6b089

File tree

1 file changed

+131
-54
lines changed

1 file changed

+131
-54
lines changed

linera-views/src/backends/scylla_db.rs

Lines changed: 131 additions & 54 deletions
Original file line numberDiff line numberDiff line change
@@ -93,6 +93,9 @@ const VISIBLE_MAX_VALUE_SIZE: usize = RAW_MAX_VALUE_SIZE
9393
/// correct.
9494
const MAX_BATCH_SIZE: usize = 5000;
9595

96+
type OccurencesMap = HashMap<Vec<u8>, Vec<usize>>;
97+
type OccurencesMapByType = HashMap<u8, OccurencesMap>;
98+
9699
/// The client for ScyllaDB:
97100
/// * The session allows to pass queries
98101
/// * The namespace that is being assigned to the database
@@ -126,7 +129,7 @@ impl ScyllaDbClient {
126129
.collect::<Vec<_>>()
127130
.join(",");
128131
let query = format!(
129-
"SELECT k,v FROM kv.{} WHERE root_key = ? AND k IN ({})",
132+
"SELECT k,v FROM kv.{} WHERE root_key = ? AND type_id = ? AND k IN ({})",
130133
self.namespace, markers
131134
);
132135
let prepared_statement = self.session.prepare(query).await?;
@@ -146,7 +149,7 @@ impl ScyllaDbClient {
146149
.collect::<Vec<_>>()
147150
.join(",");
148151
let query = format!(
149-
"SELECT k FROM kv.{} WHERE root_key = ? AND k IN ({})",
152+
"SELECT k FROM kv.{} WHERE root_key = ? AND type_id = ? AND k IN ({})",
150153
self.namespace, markers
151154
);
152155
let prepared_statement = self.session.prepare(query).await?;
@@ -183,70 +186,70 @@ impl ScyllaDbClient {
183186
let namespace = namespace.to_string();
184187
let read_value = session
185188
.prepare(format!(
186-
"SELECT v FROM kv.{} WHERE root_key = ? AND k = ?",
189+
"SELECT v FROM kv.{} WHERE root_key = ? AND type_id = ? AND k = ?",
187190
namespace
188191
))
189192
.await?;
190193

191194
let contains_key = session
192195
.prepare(format!(
193-
"SELECT root_key FROM kv.{} WHERE root_key = ? AND k = ?",
196+
"SELECT root_key FROM kv.{} WHERE root_key = ? AND type_id = ? AND k = ?",
194197
namespace
195198
))
196199
.await?;
197200

198201
let write_batch_delete_prefix_unbounded = session
199202
.prepare(format!(
200-
"DELETE FROM kv.{} WHERE root_key = ? AND k >= ?",
203+
"DELETE FROM kv.{} WHERE root_key = ? AND type_id = ? AND k >= ?",
201204
namespace
202205
))
203206
.await?;
204207

205208
let write_batch_delete_prefix_bounded = session
206209
.prepare(format!(
207-
"DELETE FROM kv.{} WHERE root_key = ? AND k >= ? AND k < ?",
210+
"DELETE FROM kv.{} WHERE root_key = ? AND type_id = ? AND k >= ? AND k < ?",
208211
namespace
209212
))
210213
.await?;
211214

212215
let write_batch_deletion = session
213216
.prepare(format!(
214-
"DELETE FROM kv.{} WHERE root_key = ? AND k = ?",
217+
"DELETE FROM kv.{} WHERE root_key = ? AND type_id = ? AND k = ?",
215218
namespace
216219
))
217220
.await?;
218221

219222
let write_batch_insertion = session
220223
.prepare(format!(
221-
"INSERT INTO kv.{} (root_key, k, v) VALUES (?, ?, ?)",
224+
"INSERT INTO kv.{} (root_key, type_id, k, v) VALUES (?, ?, ?, ?)",
222225
namespace
223226
))
224227
.await?;
225228

226229
let find_keys_by_prefix_unbounded = session
227230
.prepare(format!(
228-
"SELECT k FROM kv.{} WHERE root_key = ? AND k >= ?",
231+
"SELECT k FROM kv.{} WHERE root_key = ? AND type_id = ? AND k >= ?",
229232
namespace
230233
))
231234
.await?;
232235

233236
let find_keys_by_prefix_bounded = session
234237
.prepare(format!(
235-
"SELECT k FROM kv.{} WHERE root_key = ? AND k >= ? AND k < ?",
238+
"SELECT k FROM kv.{} WHERE root_key = ? AND type_id = ? AND k >= ? AND k < ?",
236239
namespace
237240
))
238241
.await?;
239242

240243
let find_key_values_by_prefix_unbounded = session
241244
.prepare(format!(
242-
"SELECT k,v FROM kv.{} WHERE root_key = ? AND k >= ?",
245+
"SELECT k,v FROM kv.{} WHERE root_key = ? AND type_id = ? AND k >= ?",
243246
namespace
244247
))
245248
.await?;
246249

247250
let find_key_values_by_prefix_bounded = session
248251
.prepare(format!(
249-
"SELECT k,v FROM kv.{} WHERE root_key = ? AND k >= ? AND k < ?",
252+
"SELECT k,v FROM kv.{} WHERE root_key = ? AND type_id = ? AND k >= ? AND k < ?",
250253
namespace
251254
))
252255
.await?;
@@ -277,7 +280,7 @@ impl ScyllaDbClient {
277280
Self::check_key_size(&key)?;
278281
let session = &self.session;
279282
// Read the value of a key
280-
let values = (root_key.to_vec(), key);
283+
let values = (root_key.to_vec(), vec![key[0]], key);
281284

282285
let (result, _) = session
283286
.execute_single_page(&self.read_value, &values, PagingState::start())
@@ -292,13 +295,23 @@ impl ScyllaDbClient {
292295

293296
fn get_occurences_map(
294297
keys: Vec<Vec<u8>>,
295-
) -> Result<HashMap<Vec<u8>, Vec<usize>>, ScyllaDbStoreInternalError> {
296-
let mut map = HashMap::<Vec<u8>, Vec<usize>>::new();
298+
) -> Result<OccurencesMapByType, ScyllaDbStoreInternalError> {
299+
let mut map = OccurencesMapByType::new();
297300
for (i_key, key) in keys.into_iter().enumerate() {
298301
Self::check_key_size(&key)?;
299-
map.entry(key)
300-
.and_modify(|entry| entry.push(i_key))
301-
.or_insert(vec![i_key]);
302+
let type_id = key[0];
303+
map.entry(type_id)
304+
.and_modify(|entry| {
305+
entry
306+
.entry(key.clone())
307+
.and_modify(|entry| entry.push(i_key))
308+
.or_insert(vec![i_key]);
309+
})
310+
.or_insert_with(|| {
311+
let mut map = HashMap::new();
312+
map.insert(key, vec![i_key]);
313+
map
314+
});
302315
}
303316
Ok(map)
304317
}
@@ -310,21 +323,48 @@ impl ScyllaDbClient {
310323
) -> Result<Vec<Option<Vec<u8>>>, ScyllaDbStoreInternalError> {
311324
let mut values = vec![None; keys.len()];
312325
let map = Self::get_occurences_map(keys)?;
313-
let statement = self.get_multi_key_values_statement(map.len()).await?;
314-
let mut inputs = vec![root_key.to_vec()];
315-
inputs.extend(map.keys().cloned());
316-
let mut rows = self
317-
.session
318-
.execute_iter(statement, &inputs)
319-
.await?
320-
.rows_stream::<(Vec<u8>, Vec<u8>)>()?;
326+
let statements = map
327+
.iter()
328+
.map(|(type_id, map)| async {
329+
let statement = self.get_multi_key_values_statement(map.len()).await?;
330+
let mut inputs = vec![root_key.to_vec(), vec![*type_id]];
331+
inputs.extend(map.keys().cloned());
332+
Ok::<_, ScyllaDbStoreInternalError>((*type_id, statement, inputs))
333+
})
334+
.collect::<Vec<_>>();
335+
let statements = try_join_all(statements).await?;
336+
337+
let mut futures = Vec::new();
338+
let map_ref = &map;
339+
for (type_id, statement, inputs) in statements {
340+
futures.push(async move {
341+
let mut rows = self
342+
.session
343+
.execute_iter(statement, &inputs)
344+
.await?
345+
.rows_stream::<(Vec<u8>, Vec<u8>)>()?;
346+
let mut value_pairs = Vec::new();
347+
while let Some(row) = rows.next().await {
348+
let (key, value) = row?;
349+
for i_key in map_ref
350+
.get(&type_id)
351+
.expect("type_id is supposed to be in map")
352+
.get(&key)
353+
.expect("key is supposed to be in map")
354+
{
355+
value_pairs.push((*i_key, value.clone()));
356+
}
357+
}
321358

322-
while let Some(row) = rows.next().await {
323-
let (key, value) = row?;
324-
for i_key in map.get(&key).expect("key is supposed to be in map") {
325-
values[*i_key] = Some(value.clone());
326-
}
359+
Ok::<_, ScyllaDbStoreInternalError>(value_pairs)
360+
});
361+
}
362+
363+
let values_pairs = try_join_all(futures).await?;
364+
for (i_key, value) in values_pairs.iter().flatten() {
365+
values[*i_key] = Some(value.clone());
327366
}
367+
328368
Ok(values)
329369
}
330370

@@ -335,20 +375,46 @@ impl ScyllaDbClient {
335375
) -> Result<Vec<bool>, ScyllaDbStoreInternalError> {
336376
let mut values = vec![false; keys.len()];
337377
let map = Self::get_occurences_map(keys)?;
338-
let statement = self.get_multi_keys_statement(map.len()).await?;
339-
let mut inputs = vec![root_key.to_vec()];
340-
inputs.extend(map.keys().cloned());
341-
let mut rows = self
342-
.session
343-
.execute_iter(statement, &inputs)
344-
.await?
345-
.rows_stream::<(Vec<u8>,)>()?;
378+
let statements = map
379+
.iter()
380+
.map(|(type_id, map)| async {
381+
let statement = self.get_multi_keys_statement(map.len()).await?;
382+
let mut inputs = vec![root_key.to_vec(), vec![*type_id]];
383+
inputs.extend(map.keys().cloned());
384+
Ok::<_, ScyllaDbStoreInternalError>((*type_id, statement, inputs))
385+
})
386+
.collect::<Vec<_>>();
387+
let statements = try_join_all(statements).await?;
388+
389+
let mut futures = Vec::new();
390+
let map_ref = &map;
391+
for (type_id, statement, inputs) in statements {
392+
futures.push(async move {
393+
let mut rows = self
394+
.session
395+
.execute_iter(statement, &inputs)
396+
.await?
397+
.rows_stream::<(Vec<u8>,)>()?;
398+
let mut keys = Vec::new();
399+
while let Some(row) = rows.next().await {
400+
let (key,) = row?;
401+
for i_key in map_ref
402+
.get(&type_id)
403+
.expect("type_id is supposed to be in map")
404+
.get(&key)
405+
.expect("key is supposed to be in map")
406+
{
407+
keys.push(*i_key);
408+
}
409+
}
346410

347-
while let Some(row) = rows.next().await {
348-
let (key,) = row?;
349-
for i_key in map.get(&key).expect("key is supposed to be in map") {
350-
values[*i_key] = true;
351-
}
411+
Ok::<_, ScyllaDbStoreInternalError>(keys)
412+
});
413+
}
414+
let keys = try_join_all(futures).await?;
415+
416+
for i_key in keys.iter().flatten() {
417+
values[*i_key] = true;
352418
}
353419

354420
Ok(values)
@@ -362,7 +428,7 @@ impl ScyllaDbClient {
362428
Self::check_key_size(&key)?;
363429
let session = &self.session;
364430
// Read the value of a key
365-
let values = (root_key.to_vec(), key);
431+
let values = (root_key.to_vec(), vec![key[0]], key);
366432

367433
let (result, _) = session
368434
.execute_single_page(&self.contains_key, &values, PagingState::start())
@@ -387,7 +453,7 @@ impl ScyllaDbClient {
387453
match get_upper_bound_option(&key_prefix) {
388454
None => {
389455
let prepared_statement = &self.write_batch_delete_prefix_unbounded;
390-
let values = (root_key.clone(), key_prefix);
456+
let values = (root_key.clone(), vec![key_prefix[0]], key_prefix);
391457
futures.push(Box::pin(async move {
392458
session
393459
.execute_single_page(prepared_statement, values, PagingState::start())
@@ -398,7 +464,7 @@ impl ScyllaDbClient {
398464
}
399465
Some(upper) => {
400466
let prepared_statement = &self.write_batch_delete_prefix_bounded;
401-
let values = (root_key.clone(), key_prefix, upper);
467+
let values = (root_key.clone(), vec![key_prefix[0]], key_prefix, upper);
402468
futures.push(Box::pin(async move {
403469
session
404470
.execute_single_page(prepared_statement, values, PagingState::start())
@@ -415,7 +481,7 @@ impl ScyllaDbClient {
415481
for key in batch.simple_unordered_batch.deletions {
416482
Self::check_key_size(&key)?;
417483
let prepared_statement = &self.write_batch_deletion;
418-
let values = (root_key.clone(), key);
484+
let values = (root_key.clone(), vec![key[0]], key);
419485
futures.push(Box::pin(async move {
420486
session
421487
.execute_single_page(prepared_statement, values, PagingState::start())
@@ -429,7 +495,7 @@ impl ScyllaDbClient {
429495
Self::check_key_size(&key)?;
430496
Self::check_value_size(&value)?;
431497
let prepared_statement = &self.write_batch_insertion;
432-
let values = (root_key.clone(), key, value);
498+
let values = (root_key.clone(), vec![key[0]], key, value);
433499
futures.push(Box::pin(async move {
434500
session
435501
.execute_single_page(prepared_statement, values, PagingState::start())
@@ -456,13 +522,18 @@ impl ScyllaDbClient {
456522
let query_bounded = &self.find_keys_by_prefix_bounded;
457523
let rows = match get_upper_bound_option(&key_prefix) {
458524
None => {
459-
let values = (root_key.to_vec(), key_prefix.clone());
525+
let values = (root_key.to_vec(), vec![key_prefix[0]], key_prefix.clone());
460526
session
461527
.execute_iter(query_unbounded.clone(), values)
462528
.await?
463529
}
464530
Some(upper_bound) => {
465-
let values = (root_key.to_vec(), key_prefix.clone(), upper_bound);
531+
let values = (
532+
root_key.to_vec(),
533+
vec![key_prefix[0]],
534+
key_prefix.clone(),
535+
upper_bound,
536+
);
466537
session.execute_iter(query_bounded.clone(), values).await?
467538
}
468539
};
@@ -489,13 +560,18 @@ impl ScyllaDbClient {
489560
let query_bounded = &self.find_key_values_by_prefix_bounded;
490561
let rows = match get_upper_bound_option(&key_prefix) {
491562
None => {
492-
let values = (root_key.to_vec(), key_prefix.clone());
563+
let values = (root_key.to_vec(), vec![key_prefix[0]], key_prefix.clone());
493564
session
494565
.execute_iter(query_unbounded.clone(), values)
495566
.await?
496567
}
497568
Some(upper_bound) => {
498-
let values = (root_key.to_vec(), key_prefix.clone(), upper_bound);
569+
let values = (
570+
root_key.to_vec(),
571+
vec![key_prefix[0]],
572+
key_prefix.clone(),
573+
upper_bound,
574+
);
499575
session.execute_iter(query_bounded.clone(), values).await?
500576
}
501577
};
@@ -912,9 +988,10 @@ impl AdminKeyValueStore for ScyllaDbStoreInternal {
912988
.prepare(format!(
913989
"CREATE TABLE kv.{} (\
914990
root_key blob, \
991+
type_id tinyint, \
915992
k blob, \
916993
v blob, \
917-
PRIMARY KEY (root_key, k) \
994+
PRIMARY KEY ((root_key, type_id), k) \
918995
) \
919996
WITH compaction = {{ \
920997
'class' : 'SizeTieredCompactionStrategy', \

0 commit comments

Comments
 (0)