Skip to content

Commit f0d1e7b

Browse files
starknet_patricia_storage: spawn blocking tasks for rocksDB reads
1 parent 25d8851 commit f0d1e7b

File tree

4 files changed

+21
-10
lines changed

4 files changed

+21
-10
lines changed

crates/starknet_patricia_storage/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ serde_json.workspace = true
3131
starknet-types-core.workspace = true
3232
starknet_api.workspace = true
3333
thiserror.workspace = true
34+
tokio.workspace = true
3435
validator = { workspace = true, features = ["derive"] }
3536

3637
[dev-dependencies]

crates/starknet_patricia_storage/src/errors.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,5 +45,5 @@ pub enum DeserializationError {
4545
// TODO(Ariel): This is only used for EdgeNode construction failures (path length etc.), add
4646
// error types here and use them instead of the general ValueError.
4747
#[error("Invalid value for deserialization: {0}.")]
48-
ValueError(Box<dyn std::error::Error>),
48+
ValueError(Box<dyn std::error::Error + Send>),
4949
}

crates/starknet_patricia_storage/src/rocksdb_storage.rs

Lines changed: 16 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ use rust_rocksdb::{
1313
WriteOptions,
1414
DB,
1515
};
16+
use tokio::task::spawn_blocking;
1617

1718
use crate::storage_trait::{
1819
AsyncStorage,
@@ -142,22 +143,28 @@ impl Storage for RocksDbStorage {
142143
type Config = EmptyStorageConfig;
143144

144145
async fn get(&self, key: &DbKey) -> PatriciaStorageResult<Option<DbValue>> {
145-
Ok(self.db.get(&key.0)?.map(DbValue))
146+
// TODO(Nimrod): Config should indicate whether to spawn a task or not.
147+
let db = self.db.clone();
148+
let key = key.clone();
149+
Ok(spawn_blocking(move || db.get(&key.0).map(|opt| opt.map(DbValue))).await??)
146150
}
147151

148152
async fn set(&mut self, key: DbKey, value: DbValue) -> PatriciaStorageResult<()> {
149153
Ok(self.db.put_opt(&key.0, &value.0, &self.options.write_options)?)
150154
}
151155

152156
async fn mget(&self, keys: &[&DbKey]) -> PatriciaStorageResult<Vec<Option<DbValue>>> {
153-
let raw_keys = keys.iter().map(|k| &k.0);
154-
let res = self
155-
.db
156-
.multi_get(raw_keys)
157-
.into_iter()
158-
.map(|r| r.map(|opt| opt.map(DbValue)))
159-
.collect::<Result<_, _>>()?;
160-
Ok(res)
157+
// TODO(Nimrod): Config should indicate whether to spawn a task or not.
158+
let db = self.db.clone();
159+
let keys: Vec<Vec<u8>> = keys.iter().map(|k| k.0.clone()).collect();
160+
spawn_blocking(move || {
161+
let raw_keys = keys.iter().map(|k| k.as_slice());
162+
db.multi_get(raw_keys)
163+
.into_iter()
164+
.map(|r| r.map(|opt| opt.map(DbValue)).map_err(|e| e.into()))
165+
.collect::<Result<Vec<_>, PatriciaStorageError>>()
166+
})
167+
.await?
161168
}
162169

163170
async fn mset(&mut self, key_to_value: DbHashMap) -> PatriciaStorageResult<()> {

crates/starknet_patricia_storage/src/storage_trait.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ use apollo_config::dumping::{ser_param, SerializeConfig};
77
use apollo_config::{ParamPath, ParamPrivacyInput, SerializedParam};
88
use serde::{Deserialize, Serialize, Serializer};
99
use starknet_types_core::felt::Felt;
10+
use tokio::task::JoinError;
1011
use validator::Validate;
1112

1213
use crate::errors::DeserializationError;
@@ -37,6 +38,8 @@ pub enum PatriciaStorageError {
3738
AerospikeStorage(#[from] crate::aerospike_storage::AerospikeStorageError),
3839
#[error(transparent)]
3940
Deserialization(#[from] DeserializationError),
41+
#[error(transparent)]
42+
Join(#[from] JoinError),
4043
#[cfg(feature = "mdbx_storage")]
4144
#[error(transparent)]
4245
Mdbx(#[from] libmdbx::Error),

0 commit comments

Comments
 (0)