diff --git a/linera-views/src/backends/scylla_db.rs b/linera-views/src/backends/scylla_db.rs index 130818ab760..849ccea36b8 100644 --- a/linera-views/src/backends/scylla_db.rs +++ b/linera-views/src/backends/scylla_db.rs @@ -19,10 +19,11 @@ use futures::{future::join_all, FutureExt as _, StreamExt}; use linera_base::ensure; use scylla::{ batch::BatchStatement, + load_balancing::DefaultPolicy, prepared_statement::PreparedStatement, - statement::batch::BatchType, + statement::{batch::BatchType, Consistency}, transport::errors::{DbError, QueryError}, - Session, SessionBuilder, + ExecutionProfile, Session, SessionBuilder, }; use serde::{Deserialize, Serialize}; use thiserror::Error; @@ -54,8 +55,8 @@ const MAX_MULTI_KEYS: usize = 99; /// So, we set up the maximal size of 16 MB - 10 KB for the values and 10 KB for the keys /// We also arbitrarily decrease the size by 4000 bytes because an amount of size is /// taken internally by the database. -const RAW_MAX_VALUE_SIZE: usize = 16762976; -const MAX_KEY_SIZE: usize = 10240; +const RAW_MAX_VALUE_SIZE: usize = 16 * 1024 * 1024; +const MAX_KEY_SIZE: usize = 10 * 1024; const MAX_BATCH_TOTAL_SIZE: usize = RAW_MAX_VALUE_SIZE + MAX_KEY_SIZE; /// The `RAW_MAX_VALUE_SIZE` is the maximum size on the ScyllaDB storage. @@ -132,13 +133,13 @@ impl ScyllaDbClient { async fn new(session: Session, namespace: &str) -> Result { let namespace = namespace.to_string(); let query = format!( - "SELECT v FROM kv.{} WHERE root_key = ? AND k = ? ALLOW FILTERING", + "SELECT v FROM kv.{} WHERE root_key = ? AND k = ?", namespace ); let read_value = session.prepare(query).await?; let query = format!( - "SELECT root_key FROM kv.{} WHERE root_key = ? AND k = ? ALLOW FILTERING", + "SELECT root_key FROM kv.{} WHERE root_key = ? AND k = ?", namespace ); let contains_key = session.prepare(query).await?; @@ -159,23 +160,23 @@ impl ScyllaDbClient { let write_batch_insertion = session.prepare(query).await?.into(); let query = format!( - "SELECT k FROM kv.{} WHERE root_key = ? AND k >= ? ALLOW FILTERING", + "SELECT k FROM kv.{} WHERE root_key = ? AND k >= ?", namespace ); let find_keys_by_prefix_unbounded = session.prepare(query).await?; let query = format!( - "SELECT k FROM kv.{} WHERE root_key = ? AND k >= ? AND k < ? ALLOW FILTERING", + "SELECT k FROM kv.{} WHERE root_key = ? AND k >= ? AND k < ?", namespace ); let find_keys_by_prefix_bounded = session.prepare(query).await?; let query = format!( - "SELECT k,v FROM kv.{} WHERE root_key = ? AND k >= ? ALLOW FILTERING", + "SELECT k,v FROM kv.{} WHERE root_key = ? AND k >= ?", namespace ); let find_key_values_by_prefix_unbounded = session.prepare(query).await?; let query = format!( - "SELECT k,v FROM kv.{} WHERE root_key = ? AND k >= ? AND k < ? ALLOW FILTERING", + "SELECT k,v FROM kv.{} WHERE root_key = ? AND k >= ? AND k < ?", namespace ); let find_key_values_by_prefix_bounded = session.prepare(query).await?; @@ -244,7 +245,7 @@ impl ScyllaDbClient { let mut group_query = "?".to_string(); group_query.push_str(&",?".repeat(num_unique_keys - 1)); let query = format!( - "SELECT k,v FROM kv.{} WHERE root_key = ? AND k IN ({}) ALLOW FILTERING", + "SELECT k,v FROM kv.{} WHERE root_key = ? AND k IN ({})", self.namespace, group_query ); @@ -291,7 +292,7 @@ impl ScyllaDbClient { let mut group_query = "?".to_string(); group_query.push_str(&",?".repeat(num_unique_keys - 1)); let query = format!( - "SELECT k FROM kv.{} WHERE root_key = ? AND k IN ({}) ALLOW FILTERING", + "SELECT k FROM kv.{} WHERE root_key = ? AND k IN ({})", self.namespace, group_query ); @@ -725,7 +726,7 @@ impl AdminKeyValueStore for ScyllaDbStoreInternal { .build() .boxed() .await?; - let query = format!("SELECT root_key FROM kv.{} ALLOW FILTERING", namespace); + let query = format!("SELECT root_key FROM kv.{}", namespace); // Execute the query let rows = session.query_iter(query, &[]).await?; @@ -764,10 +765,7 @@ impl AdminKeyValueStore for ScyllaDbStoreInternal { .boxed() .await?; // We check the way the test can fail. It can fail in different ways. - let query = format!( - "SELECT root_key FROM kv.{} LIMIT 1 ALLOW FILTERING", - namespace - ); + let query = format!("SELECT root_key FROM kv.{} LIMIT 1", namespace); // Execute the query let result = session.prepare(&*query).await; @@ -811,6 +809,7 @@ impl AdminKeyValueStore for ScyllaDbStoreInternal { .boxed() .await?; // Create a keyspace if it doesn't exist + // TODO(#3754): Increase the replication factor to at least 3 before mainnet let query = "CREATE KEYSPACE IF NOT EXISTS kv WITH REPLICATION = { \ 'class' : 'SimpleStrategy', \ 'replication_factor' : 3 \ @@ -820,11 +819,13 @@ impl AdminKeyValueStore for ScyllaDbStoreInternal { let prepared = session.prepare(query).await?; session.execute_unpaged(&prepared, &[]).await?; - // Create a table if it doesn't exist - // The schema appears too complicated for non-trivial reasons. - // See TODO(#1069). let query = format!( - "CREATE TABLE kv.{} (root_key blob, k blob, v blob, primary key (root_key, k))", + "CREATE TABLE kv.{} ( + root_key blob, + k blob, + v blob, + PRIMARY KEY (root_key, k) + )", namespace );