Skip to content

Commit 115e961

Browse files
starknet_patricia_storage: spawn blocking tasks for rocksDB reads
1 parent 94e1db2 commit 115e961

File tree

4 files changed

+21
-10
lines changed

4 files changed

+21
-10
lines changed

crates/starknet_patricia_storage/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ serde_json.workspace = true
3131
starknet-types-core.workspace = true
3232
starknet_api.workspace = true
3333
thiserror.workspace = true
34+
tokio.workspace = true
3435
validator = { workspace = true, features = ["derive"] }
3536

3637
[dev-dependencies]

crates/starknet_patricia_storage/src/errors.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,5 +45,5 @@ pub enum DeserializationError {
4545
// TODO(Ariel): This is only used for EdgeNode construction failures (path length etc.), add
4646
// error types here and use them instead of the general ValueError.
4747
#[error("Invalid value for deserialization: {0}.")]
48-
ValueError(Box<dyn std::error::Error>),
48+
ValueError(Box<dyn std::error::Error + Send>),
4949
}

crates/starknet_patricia_storage/src/rocksdb_storage.rs

Lines changed: 16 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ use rust_rocksdb::{
1717
DB,
1818
};
1919
use serde::{Deserialize, Serialize};
20+
use tokio::task::spawn_blocking;
2021
use validator::Validate;
2122

2223
use crate::storage_trait::{
@@ -278,22 +279,28 @@ impl Storage for RocksDbStorage {
278279
type Config = RocksDbStorageConfig;
279280

280281
async fn get(&self, key: &DbKey) -> PatriciaStorageResult<Option<DbValue>> {
281-
Ok(self.db.get(&key.0)?.map(DbValue))
282+
// TODO(Nimrod): Config should indicate whether to spawn a task or not.
283+
let db = self.db.clone();
284+
let key = key.clone();
285+
Ok(spawn_blocking(move || db.get(&key.0).map(|opt| opt.map(DbValue))).await??)
282286
}
283287

284288
async fn set(&mut self, key: DbKey, value: DbValue) -> PatriciaStorageResult<()> {
285289
Ok(self.db.put_opt(&key.0, &value.0, &self.options.write_options)?)
286290
}
287291

288292
async fn mget(&self, keys: &[&DbKey]) -> PatriciaStorageResult<Vec<Option<DbValue>>> {
289-
let raw_keys = keys.iter().map(|k| &k.0);
290-
let res = self
291-
.db
292-
.multi_get(raw_keys)
293-
.into_iter()
294-
.map(|r| r.map(|opt| opt.map(DbValue)))
295-
.collect::<Result<_, _>>()?;
296-
Ok(res)
293+
// TODO(Nimrod): Config should indicate whether to spawn a task or not.
294+
let db = self.db.clone();
295+
let keys: Vec<Vec<u8>> = keys.iter().map(|k| k.0.clone()).collect();
296+
spawn_blocking(move || {
297+
let raw_keys = keys.iter().map(|k| k.as_slice());
298+
db.multi_get(raw_keys)
299+
.into_iter()
300+
.map(|r| r.map(|opt| opt.map(DbValue)).map_err(|e| e.into()))
301+
.collect::<Result<Vec<_>, PatriciaStorageError>>()
302+
})
303+
.await?
297304
}
298305

299306
async fn mset(&mut self, key_to_value: DbHashMap) -> PatriciaStorageResult<()> {

crates/starknet_patricia_storage/src/storage_trait.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ use apollo_config::dumping::{ser_param, SerializeConfig};
77
use apollo_config::{ParamPath, ParamPrivacyInput, SerializedParam};
88
use serde::{Deserialize, Serialize, Serializer};
99
use starknet_types_core::felt::Felt;
10+
use tokio::task::JoinError;
1011
use validator::Validate;
1112

1213
use crate::errors::DeserializationError;
@@ -37,6 +38,8 @@ pub enum PatriciaStorageError {
3738
AerospikeStorage(#[from] crate::aerospike_storage::AerospikeStorageError),
3839
#[error(transparent)]
3940
Deserialization(#[from] DeserializationError),
41+
#[error(transparent)]
42+
Join(#[from] JoinError),
4043
#[cfg(feature = "mdbx_storage")]
4144
#[error(transparent)]
4245
Mdbx(#[from] libmdbx::Error),

0 commit comments

Comments
 (0)