diff --git a/linera-views/src/backends/scylla_db.rs b/linera-views/src/backends/scylla_db.rs index 02972ccffb47..ec4e008a8859 100644 --- a/linera-views/src/backends/scylla_db.rs +++ b/linera-views/src/backends/scylla_db.rs @@ -93,6 +93,9 @@ const VISIBLE_MAX_VALUE_SIZE: usize = RAW_MAX_VALUE_SIZE /// correct. const MAX_BATCH_SIZE: usize = 5000; +type OccurencesMap = HashMap, Vec>; +type OccurencesMapByType = HashMap; + /// The client for ScyllaDB: /// * The session allows to pass queries /// * The namespace that is being assigned to the database @@ -126,7 +129,7 @@ impl ScyllaDbClient { .collect::>() .join(","); let query = format!( - "SELECT k,v FROM kv.{} WHERE root_key = ? AND k IN ({})", + "SELECT k,v FROM kv.{} WHERE root_key = ? AND type_id = ? AND k IN ({})", self.namespace, markers ); let prepared_statement = self.session.prepare(query).await?; @@ -146,7 +149,7 @@ impl ScyllaDbClient { .collect::>() .join(","); let query = format!( - "SELECT k FROM kv.{} WHERE root_key = ? AND k IN ({})", + "SELECT k FROM kv.{} WHERE root_key = ? AND type_id = ? AND k IN ({})", self.namespace, markers ); let prepared_statement = self.session.prepare(query).await?; @@ -183,70 +186,70 @@ impl ScyllaDbClient { let namespace = namespace.to_string(); let read_value = session .prepare(format!( - "SELECT v FROM kv.{} WHERE root_key = ? AND k = ?", + "SELECT v FROM kv.{} WHERE root_key = ? AND type_id = ? AND k = ?", namespace )) .await?; let contains_key = session .prepare(format!( - "SELECT root_key FROM kv.{} WHERE root_key = ? AND k = ?", + "SELECT root_key FROM kv.{} WHERE root_key = ? AND type_id = ? AND k = ?", namespace )) .await?; let write_batch_delete_prefix_unbounded = session .prepare(format!( - "DELETE FROM kv.{} WHERE root_key = ? AND k >= ?", + "DELETE FROM kv.{} WHERE root_key = ? AND type_id = ? AND k >= ?", namespace )) .await?; let write_batch_delete_prefix_bounded = session .prepare(format!( - "DELETE FROM kv.{} WHERE root_key = ? AND k >= ? AND k < ?", + "DELETE FROM kv.{} WHERE root_key = ? AND type_id = ? AND k >= ? AND k < ?", namespace )) .await?; let write_batch_deletion = session .prepare(format!( - "DELETE FROM kv.{} WHERE root_key = ? AND k = ?", + "DELETE FROM kv.{} WHERE root_key = ? AND type_id = ? AND k = ?", namespace )) .await?; let write_batch_insertion = session .prepare(format!( - "INSERT INTO kv.{} (root_key, k, v) VALUES (?, ?, ?)", + "INSERT INTO kv.{} (root_key, type_id, k, v) VALUES (?, ?, ?, ?)", namespace )) .await?; let find_keys_by_prefix_unbounded = session .prepare(format!( - "SELECT k FROM kv.{} WHERE root_key = ? AND k >= ?", + "SELECT k FROM kv.{} WHERE root_key = ? AND type_id = ? AND k >= ?", namespace )) .await?; let find_keys_by_prefix_bounded = session .prepare(format!( - "SELECT k FROM kv.{} WHERE root_key = ? AND k >= ? AND k < ?", + "SELECT k FROM kv.{} WHERE root_key = ? AND type_id = ? AND k >= ? AND k < ?", namespace )) .await?; let find_key_values_by_prefix_unbounded = session .prepare(format!( - "SELECT k,v FROM kv.{} WHERE root_key = ? AND k >= ?", + "SELECT k,v FROM kv.{} WHERE root_key = ? AND type_id = ? AND k >= ?", namespace )) .await?; let find_key_values_by_prefix_bounded = session .prepare(format!( - "SELECT k,v FROM kv.{} WHERE root_key = ? AND k >= ? AND k < ?", + "SELECT k,v FROM kv.{} WHERE root_key = ? AND type_id = ? AND k >= ? AND k < ?", namespace )) .await?; @@ -277,7 +280,7 @@ impl ScyllaDbClient { Self::check_key_size(&key)?; let session = &self.session; // Read the value of a key - let values = (root_key.to_vec(), key); + let values = (root_key.to_vec(), vec![key[0]], key); let (result, _) = session .execute_single_page(&self.read_value, &values, PagingState::start()) @@ -292,11 +295,16 @@ impl ScyllaDbClient { fn get_occurences_map( keys: Vec>, - ) -> Result, Vec>, ScyllaDbStoreInternalError> { - let mut map = HashMap::, Vec>::new(); + ) -> Result { + let mut map = OccurencesMapByType::new(); for (i_key, key) in keys.into_iter().enumerate() { Self::check_key_size(&key)?; - map.entry(key).or_default().push(i_key); + let type_id = key[0]; + map.entry(type_id) + .or_default() + .entry(key) + .or_default() + .push(i_key); } Ok(map) } @@ -308,21 +316,48 @@ impl ScyllaDbClient { ) -> Result>>, ScyllaDbStoreInternalError> { let mut values = vec![None; keys.len()]; let map = Self::get_occurences_map(keys)?; - let statement = self.get_multi_key_values_statement(map.len()).await?; - let mut inputs = vec![root_key.to_vec()]; - inputs.extend(map.keys().cloned()); - let mut rows = self - .session - .execute_iter(statement, &inputs) - .await? - .rows_stream::<(Vec, Vec)>()?; + let statements = map + .iter() + .map(|(type_id, map)| async { + let statement = self.get_multi_key_values_statement(map.len()).await?; + let mut inputs = vec![root_key.to_vec(), vec![*type_id]]; + inputs.extend(map.keys().cloned()); + Ok::<_, ScyllaDbStoreInternalError>((*type_id, statement, inputs)) + }) + .collect::>(); + let statements = try_join_all(statements).await?; + + let mut futures = Vec::new(); + let map_ref = ↦ + for (type_id, statement, inputs) in statements { + futures.push(async move { + let mut rows = self + .session + .execute_iter(statement, &inputs) + .await? + .rows_stream::<(Vec, Vec)>()?; + let mut value_pairs = Vec::new(); + while let Some(row) = rows.next().await { + let (key, value) = row?; + for i_key in map_ref + .get(&type_id) + .expect("type_id is supposed to be in map") + .get(&key) + .expect("key is supposed to be in map") + { + value_pairs.push((*i_key, value.clone())); + } + } - while let Some(row) = rows.next().await { - let (key, value) = row?; - for i_key in map.get(&key).expect("key is supposed to be in map") { - values[*i_key] = Some(value.clone()); - } + Ok::<_, ScyllaDbStoreInternalError>(value_pairs) + }); + } + + let values_pairs = try_join_all(futures).await?; + for (i_key, value) in values_pairs.iter().flatten() { + values[*i_key] = Some(value.clone()); } + Ok(values) } @@ -333,20 +368,46 @@ impl ScyllaDbClient { ) -> Result, ScyllaDbStoreInternalError> { let mut values = vec![false; keys.len()]; let map = Self::get_occurences_map(keys)?; - let statement = self.get_multi_keys_statement(map.len()).await?; - let mut inputs = vec![root_key.to_vec()]; - inputs.extend(map.keys().cloned()); - let mut rows = self - .session - .execute_iter(statement, &inputs) - .await? - .rows_stream::<(Vec,)>()?; + let statements = map + .iter() + .map(|(type_id, map)| async { + let statement = self.get_multi_keys_statement(map.len()).await?; + let mut inputs = vec![root_key.to_vec(), vec![*type_id]]; + inputs.extend(map.keys().cloned()); + Ok::<_, ScyllaDbStoreInternalError>((*type_id, statement, inputs)) + }) + .collect::>(); + let statements = try_join_all(statements).await?; + + let mut futures = Vec::new(); + let map_ref = ↦ + for (type_id, statement, inputs) in statements { + futures.push(async move { + let mut rows = self + .session + .execute_iter(statement, &inputs) + .await? + .rows_stream::<(Vec,)>()?; + let mut keys = Vec::new(); + while let Some(row) = rows.next().await { + let (key,) = row?; + for i_key in map_ref + .get(&type_id) + .expect("type_id is supposed to be in map") + .get(&key) + .expect("key is supposed to be in map") + { + keys.push(*i_key); + } + } - while let Some(row) = rows.next().await { - let (key,) = row?; - for i_key in map.get(&key).expect("key is supposed to be in map") { - values[*i_key] = true; - } + Ok::<_, ScyllaDbStoreInternalError>(keys) + }); + } + let keys = try_join_all(futures).await?; + + for i_key in keys.iter().flatten() { + values[*i_key] = true; } Ok(values) @@ -360,7 +421,7 @@ impl ScyllaDbClient { Self::check_key_size(&key)?; let session = &self.session; // Read the value of a key - let values = (root_key.to_vec(), key); + let values = (root_key.to_vec(), vec![key[0]], key); let (result, _) = session .execute_single_page(&self.contains_key, &values, PagingState::start()) @@ -385,7 +446,7 @@ impl ScyllaDbClient { match get_upper_bound_option(&key_prefix) { None => { let prepared_statement = &self.write_batch_delete_prefix_unbounded; - let values = (root_key.clone(), key_prefix); + let values = (root_key.clone(), vec![key_prefix[0]], key_prefix); futures.push(Box::pin(async move { session .execute_single_page(prepared_statement, values, PagingState::start()) @@ -396,7 +457,7 @@ impl ScyllaDbClient { } Some(upper) => { let prepared_statement = &self.write_batch_delete_prefix_bounded; - let values = (root_key.clone(), key_prefix, upper); + let values = (root_key.clone(), vec![key_prefix[0]], key_prefix, upper); futures.push(Box::pin(async move { session .execute_single_page(prepared_statement, values, PagingState::start()) @@ -413,7 +474,7 @@ impl ScyllaDbClient { for key in batch.simple_unordered_batch.deletions { Self::check_key_size(&key)?; let prepared_statement = &self.write_batch_deletion; - let values = (root_key.clone(), key); + let values = (root_key.clone(), vec![key[0]], key); futures.push(Box::pin(async move { session .execute_single_page(prepared_statement, values, PagingState::start()) @@ -427,7 +488,7 @@ impl ScyllaDbClient { Self::check_key_size(&key)?; Self::check_value_size(&value)?; let prepared_statement = &self.write_batch_insertion; - let values = (root_key.clone(), key, value); + let values = (root_key.clone(), vec![key[0]], key, value); futures.push(Box::pin(async move { session .execute_single_page(prepared_statement, values, PagingState::start()) @@ -454,13 +515,18 @@ impl ScyllaDbClient { let query_bounded = &self.find_keys_by_prefix_bounded; let rows = match get_upper_bound_option(&key_prefix) { None => { - let values = (root_key.to_vec(), key_prefix.clone()); + let values = (root_key.to_vec(), vec![key_prefix[0]], key_prefix.clone()); session .execute_iter(query_unbounded.clone(), values) .await? } Some(upper_bound) => { - let values = (root_key.to_vec(), key_prefix.clone(), upper_bound); + let values = ( + root_key.to_vec(), + vec![key_prefix[0]], + key_prefix.clone(), + upper_bound, + ); session.execute_iter(query_bounded.clone(), values).await? } }; @@ -487,13 +553,18 @@ impl ScyllaDbClient { let query_bounded = &self.find_key_values_by_prefix_bounded; let rows = match get_upper_bound_option(&key_prefix) { None => { - let values = (root_key.to_vec(), key_prefix.clone()); + let values = (root_key.to_vec(), vec![key_prefix[0]], key_prefix.clone()); session .execute_iter(query_unbounded.clone(), values) .await? } Some(upper_bound) => { - let values = (root_key.to_vec(), key_prefix.clone(), upper_bound); + let values = ( + root_key.to_vec(), + vec![key_prefix[0]], + key_prefix.clone(), + upper_bound, + ); session.execute_iter(query_bounded.clone(), values).await? } }; @@ -910,9 +981,10 @@ impl AdminKeyValueStore for ScyllaDbStoreInternal { .prepare(format!( "CREATE TABLE kv.{} (\ root_key blob, \ + type_id blob, \ k blob, \ v blob, \ - PRIMARY KEY (root_key, k) \ + PRIMARY KEY ((root_key, type_id), k) \ ) \ WITH compaction = {{ \ 'class' : 'SizeTieredCompactionStrategy', \