Skip to content

Commit ae1c51b

Browse files
authored
refactor(cold): split ColdStorage into ColdStorageRead + ColdStorageWrite (ENG-1979) (#42)
1 parent 1756951 commit ae1c51b

File tree

9 files changed

+192
-128
lines changed

9 files changed

+192
-128
lines changed

crates/cold-mdbx/src/backend.rs

Lines changed: 13 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -10,9 +10,9 @@ use crate::{
1010
};
1111
use alloy::{consensus::transaction::Recovered, primitives::BlockNumber};
1212
use signet_cold::{
13-
BlockData, ColdReceipt, ColdResult, ColdStorage, ColdStorageError, Confirmed, Filter,
14-
HeaderSpecifier, ReceiptSpecifier, RpcLog, SignetEventsSpecifier, TransactionSpecifier,
15-
ZenithHeaderSpecifier,
13+
BlockData, ColdReceipt, ColdResult, ColdStorage, ColdStorageError, ColdStorageRead,
14+
ColdStorageWrite, Confirmed, Filter, HeaderSpecifier, ReceiptSpecifier, RpcLog,
15+
SignetEventsSpecifier, TransactionSpecifier, ZenithHeaderSpecifier,
1616
};
1717
use signet_hot::{
1818
KeySer, MAX_KEY_SIZE, ValSer,
@@ -191,6 +191,7 @@ fn produce_log_stream_blocking(
191191
/// This backend stores historical blockchain data in an MDBX database.
192192
/// It implements the [`ColdStorage`] trait for use with the cold storage
193193
/// task runner.
194+
#[derive(Clone)]
194195
pub struct MdbxColdBackend {
195196
/// The MDBX environment.
196197
env: DatabaseEnv,
@@ -646,7 +647,7 @@ impl MdbxColdBackend {
646647
}
647648
}
648649

649-
impl ColdStorage for MdbxColdBackend {
650+
impl ColdStorageRead for MdbxColdBackend {
650651
async fn get_header(&self, spec: HeaderSpecifier) -> ColdResult<Option<SealedHeader>> {
651652
Ok(self.get_header_inner(spec)?)
652653
}
@@ -757,20 +758,24 @@ impl ColdStorage for MdbxColdBackend {
757758
.map_err(MdbxColdError::from)?;
758759
Ok(latest)
759760
}
761+
}
760762

761-
async fn append_block(&self, data: BlockData) -> ColdResult<()> {
763+
impl ColdStorageWrite for MdbxColdBackend {
764+
async fn append_block(&mut self, data: BlockData) -> ColdResult<()> {
762765
Ok(self.append_block_inner(data)?)
763766
}
764767

765-
async fn append_blocks(&self, data: Vec<BlockData>) -> ColdResult<()> {
768+
async fn append_blocks(&mut self, data: Vec<BlockData>) -> ColdResult<()> {
766769
Ok(self.append_blocks_inner(data)?)
767770
}
768771

769-
async fn truncate_above(&self, block: BlockNumber) -> ColdResult<()> {
772+
async fn truncate_above(&mut self, block: BlockNumber) -> ColdResult<()> {
770773
Ok(self.truncate_above_inner(block)?)
771774
}
775+
}
772776

773-
async fn drain_above(&self, block: BlockNumber) -> ColdResult<Vec<Vec<ColdReceipt>>> {
777+
impl ColdStorage for MdbxColdBackend {
778+
async fn drain_above(&mut self, block: BlockNumber) -> ColdResult<Vec<Vec<ColdReceipt>>> {
774779
Ok(self.drain_above_inner(block)?)
775780
}
776781
}

crates/cold-sql/src/backend.rs

Lines changed: 11 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -35,9 +35,9 @@ use alloy::{
3535
},
3636
};
3737
use signet_cold::{
38-
BlockData, ColdReceipt, ColdResult, ColdStorage, ColdStorageError, Confirmed, Filter,
39-
HeaderSpecifier, ReceiptSpecifier, RpcLog, SignetEventsSpecifier, TransactionSpecifier,
40-
ZenithHeaderSpecifier,
38+
BlockData, ColdReceipt, ColdResult, ColdStorage, ColdStorageError, ColdStorageRead,
39+
ColdStorageWrite, Confirmed, Filter, HeaderSpecifier, ReceiptSpecifier, RpcLog,
40+
SignetEventsSpecifier, TransactionSpecifier, ZenithHeaderSpecifier,
4141
};
4242
use signet_storage_types::{
4343
ConfirmationMeta, DbSignetEvent, DbZenithHeader, IndexedReceipt, RecoveredTx, SealedHeader,
@@ -971,7 +971,7 @@ fn build_log_filter_clause(filter: &Filter, start_idx: u32) -> (String, Vec<&[u8
971971
// ColdStorage implementation
972972
// ============================================================================
973973

974-
impl ColdStorage for SqlColdBackend {
974+
impl ColdStorageRead for SqlColdBackend {
975975
async fn get_header(&self, spec: HeaderSpecifier) -> ColdResult<Option<SealedHeader>> {
976976
let Some(block_num) = self.resolve_header_spec(spec).await? else {
977977
return Ok(None);
@@ -1364,12 +1364,14 @@ impl ColdStorage for SqlColdBackend {
13641364
.map_err(SqlColdError::from)?;
13651365
Ok(row.get::<Option<i64>, _>(COL_MAX_BN).map(from_i64))
13661366
}
1367+
}
13671368

1368-
async fn append_block(&self, data: BlockData) -> ColdResult<()> {
1369+
impl ColdStorageWrite for SqlColdBackend {
1370+
async fn append_block(&mut self, data: BlockData) -> ColdResult<()> {
13691371
self.insert_block(data).await.map_err(ColdStorageError::from)
13701372
}
13711373

1372-
async fn append_blocks(&self, data: Vec<BlockData>) -> ColdResult<()> {
1374+
async fn append_blocks(&mut self, data: Vec<BlockData>) -> ColdResult<()> {
13731375
let mut tx = self.pool.begin().await.map_err(SqlColdError::from)?;
13741376
for block_data in data {
13751377
write_block_to_tx(&mut tx, block_data).await.map_err(ColdStorageError::from)?;
@@ -1378,7 +1380,7 @@ impl ColdStorage for SqlColdBackend {
13781380
Ok(())
13791381
}
13801382

1381-
async fn truncate_above(&self, block: BlockNumber) -> ColdResult<()> {
1383+
async fn truncate_above(&mut self, block: BlockNumber) -> ColdResult<()> {
13821384
let bn = to_i64(block);
13831385
let mut tx = self.pool.begin().await.map_err(SqlColdError::from)?;
13841386

@@ -1398,6 +1400,8 @@ impl ColdStorage for SqlColdBackend {
13981400
}
13991401
}
14001402

1403+
impl ColdStorage for SqlColdBackend {}
1404+
14011405
#[cfg(all(test, feature = "test-utils"))]
14021406
mod tests {
14031407
use super::*;

crates/cold/src/lib.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -159,7 +159,7 @@ pub use cold_receipt::ColdReceipt;
159159
mod stream;
160160
pub use stream::{StreamParams, produce_log_stream_default};
161161
mod traits;
162-
pub use traits::{BlockData, ColdStorage, LogStream};
162+
pub use traits::{BlockData, ColdStorage, ColdStorageRead, ColdStorageWrite, LogStream};
163163

164164
pub mod connect;
165165
pub use connect::ColdConnect;

crates/cold/src/mem.rs

Lines changed: 13 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -4,9 +4,9 @@
44
//! It is primarily intended for testing and development.
55
66
use crate::{
7-
BlockData, ColdReceipt, ColdResult, ColdStorage, ColdStorageError, Confirmed, Filter,
8-
HeaderSpecifier, ReceiptSpecifier, RpcLog, SignetEventsSpecifier, TransactionSpecifier,
9-
ZenithHeaderSpecifier,
7+
BlockData, ColdReceipt, ColdResult, ColdStorage, ColdStorageError, ColdStorageRead,
8+
ColdStorageWrite, Confirmed, Filter, HeaderSpecifier, ReceiptSpecifier, RpcLog,
9+
SignetEventsSpecifier, TransactionSpecifier, ZenithHeaderSpecifier,
1010
};
1111
use alloy::primitives::{B256, BlockNumber};
1212
use signet_storage_types::{
@@ -47,6 +47,7 @@ struct MemColdBackendInner {
4747
///
4848
/// This backend is thread-safe and suitable for concurrent access.
4949
/// All operations are protected by an async read-write lock.
50+
#[derive(Clone)]
5051
pub struct MemColdBackend {
5152
inner: Arc<RwLock<MemColdBackendInner>>,
5253
}
@@ -100,7 +101,7 @@ impl MemColdBackendInner {
100101
}
101102
}
102103

103-
impl ColdStorage for MemColdBackend {
104+
impl ColdStorageRead for MemColdBackend {
104105
async fn get_header(&self, spec: HeaderSpecifier) -> ColdResult<Option<SealedHeader>> {
105106
let inner = self.inner.read().await;
106107
match spec {
@@ -274,8 +275,10 @@ impl ColdStorage for MemColdBackend {
274275
let inner = self.inner.read().await;
275276
Ok(inner.headers.last_key_value().map(|(k, _)| *k))
276277
}
278+
}
277279

278-
async fn append_block(&self, data: BlockData) -> ColdResult<()> {
280+
impl ColdStorageWrite for MemColdBackend {
281+
async fn append_block(&mut self, data: BlockData) -> ColdResult<()> {
279282
let mut inner = self.inner.write().await;
280283

281284
let block = data.block_number();
@@ -323,20 +326,22 @@ impl ColdStorage for MemColdBackend {
323326
Ok(())
324327
}
325328

326-
async fn append_blocks(&self, data: Vec<BlockData>) -> ColdResult<()> {
329+
async fn append_blocks(&mut self, data: Vec<BlockData>) -> ColdResult<()> {
327330
for block_data in data {
328331
self.append_block(block_data).await?;
329332
}
330333
Ok(())
331334
}
332335

333-
async fn truncate_above(&self, block: BlockNumber) -> ColdResult<()> {
336+
async fn truncate_above(&mut self, block: BlockNumber) -> ColdResult<()> {
334337
let mut inner = self.inner.write().await;
335338
inner.truncate_above(block);
336339
Ok(())
337340
}
341+
}
338342

339-
async fn drain_above(&self, block: BlockNumber) -> ColdResult<Vec<Vec<ColdReceipt>>> {
343+
impl ColdStorage for MemColdBackend {
344+
async fn drain_above(&mut self, block: BlockNumber) -> ColdResult<Vec<Vec<ColdReceipt>>> {
340345
let mut inner = self.inner.write().await;
341346

342347
// Collect receipts for blocks above `block` in ascending order

crates/cold/src/stream.rs

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,13 @@
11
//! Log-streaming helper for backends without snapshot semantics.
22
3-
use crate::{ColdResult, ColdStorage, ColdStorageError, Filter, HeaderSpecifier, RpcLog};
3+
use crate::{ColdResult, ColdStorageError, ColdStorageRead, Filter, HeaderSpecifier, RpcLog};
44
use alloy::{primitives::BlockNumber, rpc::types::FilterBlockOption};
55
use tokio::sync::mpsc;
66

77
/// Parameters for a log-streaming request.
88
///
99
/// Bundles the block range, limits, channel, and deadline that every
10-
/// [`ColdStorage::produce_log_stream`] implementation needs.
10+
/// [`ColdStorageRead::produce_log_stream`] implementation needs.
1111
#[derive(Debug)]
1212
pub struct StreamParams {
1313
/// First block in range (inclusive).
@@ -28,13 +28,13 @@ pub struct StreamParams {
2828
///
2929
/// Captures an anchor hash from the `to` block at the start and
3030
/// re-checks it before each block to detect reorgs. Uses
31-
/// [`ColdStorage::get_header`] for anchor checks and
32-
/// [`ColdStorage::get_logs`] with single-block filters per block.
31+
/// [`ColdStorageRead::get_header`] for anchor checks and
32+
/// [`ColdStorageRead::get_logs`] with single-block filters per block.
3333
///
3434
/// Backends that hold a consistent read snapshot (MDBX, PostgreSQL
3535
/// with REPEATABLE READ) should provide their own
36-
/// [`ColdStorage::produce_log_stream`] implementation instead.
37-
pub async fn produce_log_stream_default<B: ColdStorage + ?Sized>(
36+
/// [`ColdStorageRead::produce_log_stream`] implementation instead.
37+
pub async fn produce_log_stream_default<B: ColdStorageRead>(
3838
backend: &B,
3939
filter: &Filter,
4040
params: StreamParams,

0 commit comments

Comments
 (0)