Skip to content

Commit f18a454

Browse files
committed
Add type_id as primary key to ScyllaDb schema
1 parent 8493ce6 commit f18a454

File tree

1 file changed

+124
-54
lines changed

1 file changed

+124
-54
lines changed

linera-views/src/backends/scylla_db.rs

Lines changed: 124 additions & 54 deletions
Original file line numberDiff line numberDiff line change
@@ -93,6 +93,9 @@ const VISIBLE_MAX_VALUE_SIZE: usize = RAW_MAX_VALUE_SIZE
9393
/// correct.
9494
const MAX_BATCH_SIZE: usize = 5000;
9595

96+
type OccurencesMap = HashMap<Vec<u8>, Vec<usize>>;
97+
type OccurencesMapByType = HashMap<u8, OccurencesMap>;
98+
9699
/// The client for ScyllaDB:
97100
/// * The session allows to pass queries
98101
/// * The namespace that is being assigned to the database
@@ -126,7 +129,7 @@ impl ScyllaDbClient {
126129
.collect::<Vec<_>>()
127130
.join(",");
128131
let query = format!(
129-
"SELECT k,v FROM kv.{} WHERE root_key = ? AND k IN ({})",
132+
"SELECT k,v FROM kv.{} WHERE root_key = ? AND type_id = ? AND k IN ({})",
130133
self.namespace, markers
131134
);
132135
let prepared_statement = self.session.prepare(query).await?;
@@ -146,7 +149,7 @@ impl ScyllaDbClient {
146149
.collect::<Vec<_>>()
147150
.join(",");
148151
let query = format!(
149-
"SELECT k FROM kv.{} WHERE root_key = ? AND k IN ({})",
152+
"SELECT k FROM kv.{} WHERE root_key = ? AND type_id = ? AND k IN ({})",
150153
self.namespace, markers
151154
);
152155
let prepared_statement = self.session.prepare(query).await?;
@@ -183,70 +186,70 @@ impl ScyllaDbClient {
183186
let namespace = namespace.to_string();
184187
let read_value = session
185188
.prepare(format!(
186-
"SELECT v FROM kv.{} WHERE root_key = ? AND k = ?",
189+
"SELECT v FROM kv.{} WHERE root_key = ? AND type_id = ? AND k = ?",
187190
namespace
188191
))
189192
.await?;
190193

191194
let contains_key = session
192195
.prepare(format!(
193-
"SELECT root_key FROM kv.{} WHERE root_key = ? AND k = ?",
196+
"SELECT root_key FROM kv.{} WHERE root_key = ? AND type_id = ? AND k = ?",
194197
namespace
195198
))
196199
.await?;
197200

198201
let write_batch_delete_prefix_unbounded = session
199202
.prepare(format!(
200-
"DELETE FROM kv.{} WHERE root_key = ? AND k >= ?",
203+
"DELETE FROM kv.{} WHERE root_key = ? AND type_id = ? AND k >= ?",
201204
namespace
202205
))
203206
.await?;
204207

205208
let write_batch_delete_prefix_bounded = session
206209
.prepare(format!(
207-
"DELETE FROM kv.{} WHERE root_key = ? AND k >= ? AND k < ?",
210+
"DELETE FROM kv.{} WHERE root_key = ? AND type_id = ? AND k >= ? AND k < ?",
208211
namespace
209212
))
210213
.await?;
211214

212215
let write_batch_deletion = session
213216
.prepare(format!(
214-
"DELETE FROM kv.{} WHERE root_key = ? AND k = ?",
217+
"DELETE FROM kv.{} WHERE root_key = ? AND type_id = ? AND k = ?",
215218
namespace
216219
))
217220
.await?;
218221

219222
let write_batch_insertion = session
220223
.prepare(format!(
221-
"INSERT INTO kv.{} (root_key, k, v) VALUES (?, ?, ?)",
224+
"INSERT INTO kv.{} (root_key, type_id, k, v) VALUES (?, ?, ?, ?)",
222225
namespace
223226
))
224227
.await?;
225228

226229
let find_keys_by_prefix_unbounded = session
227230
.prepare(format!(
228-
"SELECT k FROM kv.{} WHERE root_key = ? AND k >= ?",
231+
"SELECT k FROM kv.{} WHERE root_key = ? AND type_id = ? AND k >= ?",
229232
namespace
230233
))
231234
.await?;
232235

233236
let find_keys_by_prefix_bounded = session
234237
.prepare(format!(
235-
"SELECT k FROM kv.{} WHERE root_key = ? AND k >= ? AND k < ?",
238+
"SELECT k FROM kv.{} WHERE root_key = ? AND type_id = ? AND k >= ? AND k < ?",
236239
namespace
237240
))
238241
.await?;
239242

240243
let find_key_values_by_prefix_unbounded = session
241244
.prepare(format!(
242-
"SELECT k,v FROM kv.{} WHERE root_key = ? AND k >= ?",
245+
"SELECT k,v FROM kv.{} WHERE root_key = ? AND type_id = ? AND k >= ?",
243246
namespace
244247
))
245248
.await?;
246249

247250
let find_key_values_by_prefix_bounded = session
248251
.prepare(format!(
249-
"SELECT k,v FROM kv.{} WHERE root_key = ? AND k >= ? AND k < ?",
252+
"SELECT k,v FROM kv.{} WHERE root_key = ? AND type_id = ? AND k >= ? AND k < ?",
250253
namespace
251254
))
252255
.await?;
@@ -277,7 +280,7 @@ impl ScyllaDbClient {
277280
Self::check_key_size(&key)?;
278281
let session = &self.session;
279282
// Read the value of a key
280-
let values = (root_key.to_vec(), key);
283+
let values = (root_key.to_vec(), vec![key[0]], key);
281284

282285
let (result, _) = session
283286
.execute_single_page(&self.read_value, &values, PagingState::start())
@@ -292,13 +295,16 @@ impl ScyllaDbClient {
292295

293296
fn get_occurences_map(
294297
keys: Vec<Vec<u8>>,
295-
) -> Result<HashMap<Vec<u8>, Vec<usize>>, ScyllaDbStoreInternalError> {
296-
let mut map = HashMap::<Vec<u8>, Vec<usize>>::new();
298+
) -> Result<OccurencesMapByType, ScyllaDbStoreInternalError> {
299+
let mut map = OccurencesMapByType::new();
297300
for (i_key, key) in keys.into_iter().enumerate() {
298301
Self::check_key_size(&key)?;
299-
map.entry(key)
300-
.and_modify(|entry| entry.push(i_key))
301-
.or_insert(vec![i_key]);
302+
let type_id = key[0];
303+
map.entry(type_id)
304+
.or_default()
305+
.entry(key)
306+
.or_default()
307+
.push(i_key);
302308
}
303309
Ok(map)
304310
}
@@ -310,21 +316,48 @@ impl ScyllaDbClient {
310316
) -> Result<Vec<Option<Vec<u8>>>, ScyllaDbStoreInternalError> {
311317
let mut values = vec![None; keys.len()];
312318
let map = Self::get_occurences_map(keys)?;
313-
let statement = self.get_multi_key_values_statement(map.len()).await?;
314-
let mut inputs = vec![root_key.to_vec()];
315-
inputs.extend(map.keys().cloned());
316-
let mut rows = self
317-
.session
318-
.execute_iter(statement, &inputs)
319-
.await?
320-
.rows_stream::<(Vec<u8>, Vec<u8>)>()?;
319+
let statements = map
320+
.iter()
321+
.map(|(type_id, map)| async {
322+
let statement = self.get_multi_key_values_statement(map.len()).await?;
323+
let mut inputs = vec![root_key.to_vec(), vec![*type_id]];
324+
inputs.extend(map.keys().cloned());
325+
Ok::<_, ScyllaDbStoreInternalError>((*type_id, statement, inputs))
326+
})
327+
.collect::<Vec<_>>();
328+
let statements = try_join_all(statements).await?;
329+
330+
let mut futures = Vec::new();
331+
let map_ref = &map;
332+
for (type_id, statement, inputs) in statements {
333+
futures.push(async move {
334+
let mut rows = self
335+
.session
336+
.execute_iter(statement, &inputs)
337+
.await?
338+
.rows_stream::<(Vec<u8>, Vec<u8>)>()?;
339+
let mut value_pairs = Vec::new();
340+
while let Some(row) = rows.next().await {
341+
let (key, value) = row?;
342+
for i_key in map_ref
343+
.get(&type_id)
344+
.expect("type_id is supposed to be in map")
345+
.get(&key)
346+
.expect("key is supposed to be in map")
347+
{
348+
value_pairs.push((*i_key, value.clone()));
349+
}
350+
}
321351

322-
while let Some(row) = rows.next().await {
323-
let (key, value) = row?;
324-
for i_key in map.get(&key).expect("key is supposed to be in map") {
325-
values[*i_key] = Some(value.clone());
326-
}
352+
Ok::<_, ScyllaDbStoreInternalError>(value_pairs)
353+
});
354+
}
355+
356+
let values_pairs = try_join_all(futures).await?;
357+
for (i_key, value) in values_pairs.iter().flatten() {
358+
values[*i_key] = Some(value.clone());
327359
}
360+
328361
Ok(values)
329362
}
330363

@@ -335,20 +368,46 @@ impl ScyllaDbClient {
335368
) -> Result<Vec<bool>, ScyllaDbStoreInternalError> {
336369
let mut values = vec![false; keys.len()];
337370
let map = Self::get_occurences_map(keys)?;
338-
let statement = self.get_multi_keys_statement(map.len()).await?;
339-
let mut inputs = vec![root_key.to_vec()];
340-
inputs.extend(map.keys().cloned());
341-
let mut rows = self
342-
.session
343-
.execute_iter(statement, &inputs)
344-
.await?
345-
.rows_stream::<(Vec<u8>,)>()?;
371+
let statements = map
372+
.iter()
373+
.map(|(type_id, map)| async {
374+
let statement = self.get_multi_keys_statement(map.len()).await?;
375+
let mut inputs = vec![root_key.to_vec(), vec![*type_id]];
376+
inputs.extend(map.keys().cloned());
377+
Ok::<_, ScyllaDbStoreInternalError>((*type_id, statement, inputs))
378+
})
379+
.collect::<Vec<_>>();
380+
let statements = try_join_all(statements).await?;
381+
382+
let mut futures = Vec::new();
383+
let map_ref = &map;
384+
for (type_id, statement, inputs) in statements {
385+
futures.push(async move {
386+
let mut rows = self
387+
.session
388+
.execute_iter(statement, &inputs)
389+
.await?
390+
.rows_stream::<(Vec<u8>,)>()?;
391+
let mut keys = Vec::new();
392+
while let Some(row) = rows.next().await {
393+
let (key,) = row?;
394+
for i_key in map_ref
395+
.get(&type_id)
396+
.expect("type_id is supposed to be in map")
397+
.get(&key)
398+
.expect("key is supposed to be in map")
399+
{
400+
keys.push(*i_key);
401+
}
402+
}
346403

347-
while let Some(row) = rows.next().await {
348-
let (key,) = row?;
349-
for i_key in map.get(&key).expect("key is supposed to be in map") {
350-
values[*i_key] = true;
351-
}
404+
Ok::<_, ScyllaDbStoreInternalError>(keys)
405+
});
406+
}
407+
let keys = try_join_all(futures).await?;
408+
409+
for i_key in keys.iter().flatten() {
410+
values[*i_key] = true;
352411
}
353412

354413
Ok(values)
@@ -362,7 +421,7 @@ impl ScyllaDbClient {
362421
Self::check_key_size(&key)?;
363422
let session = &self.session;
364423
// Read the value of a key
365-
let values = (root_key.to_vec(), key);
424+
let values = (root_key.to_vec(), vec![key[0]], key);
366425

367426
let (result, _) = session
368427
.execute_single_page(&self.contains_key, &values, PagingState::start())
@@ -387,7 +446,7 @@ impl ScyllaDbClient {
387446
match get_upper_bound_option(&key_prefix) {
388447
None => {
389448
let prepared_statement = &self.write_batch_delete_prefix_unbounded;
390-
let values = (root_key.clone(), key_prefix);
449+
let values = (root_key.clone(), vec![key_prefix[0]], key_prefix);
391450
futures.push(Box::pin(async move {
392451
session
393452
.execute_single_page(prepared_statement, values, PagingState::start())
@@ -398,7 +457,7 @@ impl ScyllaDbClient {
398457
}
399458
Some(upper) => {
400459
let prepared_statement = &self.write_batch_delete_prefix_bounded;
401-
let values = (root_key.clone(), key_prefix, upper);
460+
let values = (root_key.clone(), vec![key_prefix[0]], key_prefix, upper);
402461
futures.push(Box::pin(async move {
403462
session
404463
.execute_single_page(prepared_statement, values, PagingState::start())
@@ -415,7 +474,7 @@ impl ScyllaDbClient {
415474
for key in batch.simple_unordered_batch.deletions {
416475
Self::check_key_size(&key)?;
417476
let prepared_statement = &self.write_batch_deletion;
418-
let values = (root_key.clone(), key);
477+
let values = (root_key.clone(), vec![key[0]], key);
419478
futures.push(Box::pin(async move {
420479
session
421480
.execute_single_page(prepared_statement, values, PagingState::start())
@@ -429,7 +488,7 @@ impl ScyllaDbClient {
429488
Self::check_key_size(&key)?;
430489
Self::check_value_size(&value)?;
431490
let prepared_statement = &self.write_batch_insertion;
432-
let values = (root_key.clone(), key, value);
491+
let values = (root_key.clone(), vec![key[0]], key, value);
433492
futures.push(Box::pin(async move {
434493
session
435494
.execute_single_page(prepared_statement, values, PagingState::start())
@@ -456,13 +515,18 @@ impl ScyllaDbClient {
456515
let query_bounded = &self.find_keys_by_prefix_bounded;
457516
let rows = match get_upper_bound_option(&key_prefix) {
458517
None => {
459-
let values = (root_key.to_vec(), key_prefix.clone());
518+
let values = (root_key.to_vec(), vec![key_prefix[0]], key_prefix.clone());
460519
session
461520
.execute_iter(query_unbounded.clone(), values)
462521
.await?
463522
}
464523
Some(upper_bound) => {
465-
let values = (root_key.to_vec(), key_prefix.clone(), upper_bound);
524+
let values = (
525+
root_key.to_vec(),
526+
vec![key_prefix[0]],
527+
key_prefix.clone(),
528+
upper_bound,
529+
);
466530
session.execute_iter(query_bounded.clone(), values).await?
467531
}
468532
};
@@ -489,13 +553,18 @@ impl ScyllaDbClient {
489553
let query_bounded = &self.find_key_values_by_prefix_bounded;
490554
let rows = match get_upper_bound_option(&key_prefix) {
491555
None => {
492-
let values = (root_key.to_vec(), key_prefix.clone());
556+
let values = (root_key.to_vec(), vec![key_prefix[0]], key_prefix.clone());
493557
session
494558
.execute_iter(query_unbounded.clone(), values)
495559
.await?
496560
}
497561
Some(upper_bound) => {
498-
let values = (root_key.to_vec(), key_prefix.clone(), upper_bound);
562+
let values = (
563+
root_key.to_vec(),
564+
vec![key_prefix[0]],
565+
key_prefix.clone(),
566+
upper_bound,
567+
);
499568
session.execute_iter(query_bounded.clone(), values).await?
500569
}
501570
};
@@ -912,9 +981,10 @@ impl AdminKeyValueStore for ScyllaDbStoreInternal {
912981
.prepare(format!(
913982
"CREATE TABLE kv.{} (\
914983
root_key blob, \
984+
type_id blob, \
915985
k blob, \
916986
v blob, \
917-
PRIMARY KEY (root_key, k) \
987+
PRIMARY KEY ((root_key, type_id), k) \
918988
) \
919989
WITH compaction = {{ \
920990
'class' : 'SizeTieredCompactionStrategy', \

0 commit comments

Comments
 (0)