Skip to content
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
Show all changes
15 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 8 additions & 8 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ members = ["crates/*"]
resolver = "2"

[workspace.package]
version = "0.2.0"
version = "0.3.0"
edition = "2024"
rust-version = "1.92"
authors = ["init4"]
Expand Down Expand Up @@ -35,13 +35,13 @@ incremental = false

[workspace.dependencies]
# internal
signet-hot = { version = "0.2.0", path = "./crates/hot" }
signet-hot-mdbx = { version = "0.2.0", path = "./crates/hot-mdbx" }
signet-cold = { version = "0.2.0", path = "./crates/cold" }
signet-cold-mdbx = { version = "0.2.0", path = "./crates/cold-mdbx" }
signet-cold-sql = { version = "0.2.0", path = "./crates/cold-sql" }
signet-storage = { version = "0.2.0", path = "./crates/storage" }
signet-storage-types = { version = "0.2.0", path = "./crates/types" }
signet-hot = { version = "0.3.0", path = "./crates/hot" }
signet-hot-mdbx = { version = "0.3.0", path = "./crates/hot-mdbx" }
signet-cold = { version = "0.3.0", path = "./crates/cold" }
signet-cold-mdbx = { version = "0.3.0", path = "./crates/cold-mdbx" }
signet-cold-sql = { version = "0.3.0", path = "./crates/cold-sql" }
signet-storage = { version = "0.3.0", path = "./crates/storage" }
signet-storage-types = { version = "0.3.0", path = "./crates/types" }

# External, in-house
signet-libmdbx = { version = "0.8.0" }
Expand Down
96 changes: 82 additions & 14 deletions crates/cold-mdbx/src/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -403,6 +403,62 @@ impl MdbxColdBackend {
Ok(())
}

/// Scans the cold store for logs matching `filter` over the inclusive
/// block range `[filter.from_block, filter.to_block]`.
///
/// For each block present in `ColdHeaders`, receipts are probed by
/// `(block, tx_index)` starting at 0 until the first missing index,
/// and every log that passes `filter.matches_log` is returned with its
/// block/tx context and both per-block and per-tx log indices.
///
/// # Errors
/// Returns [`MdbxColdError`] if opening the read transaction, a cursor,
/// or any table lookup fails.
fn get_logs_inner(
    &self,
    filter: signet_cold::LogFilter,
) -> Result<Vec<signet_cold::RichLog>, MdbxColdError> {
    let tx = self.env.tx()?;

    // Open one cursor per table up front and reuse it for every lookup.
    // The original code created a fresh cursor per probe inside the
    // nested loops; `exact`/`exact_dual` reposition the cursor
    // absolutely, so a single cursor per table suffices.
    let mut header_cursor = tx.new_cursor::<ColdHeaders>()?;
    let mut receipt_cursor = tx.new_cursor::<ColdReceipts>()?;
    let mut txn_cursor = tx.new_cursor::<ColdTransactions>()?;

    let mut results = Vec::new();

    for block_num in filter.from_block..=filter.to_block {
        // Missing header => block not stored; skip rather than error.
        let Some(header) =
            TableTraverse::<ColdHeaders, _>::exact(&mut header_cursor, &block_num)?
        else {
            continue;
        };
        let block_hash = header.hash_slow();
        // Running count of logs emitted by earlier receipts in this block.
        let mut block_log_index = 0u64;

        // Walk receipts by index using exact_dual lookups; the first
        // missing index marks the end of the block's transactions.
        for tx_idx in 0u64.. {
            let Some(receipt) = DualTableTraverse::<ColdReceipts, _>::exact_dual(
                &mut receipt_cursor,
                &block_num,
                &tx_idx,
            )?
            else {
                break;
            };
            // NOTE(review): a receipt without a stored transaction falls
            // back to the zero hash via `unwrap_or_default` — confirm
            // that is the intended behavior for partially-pruned blocks.
            let tx_hash = DualTableTraverse::<ColdTransactions, _>::exact_dual(
                &mut txn_cursor,
                &block_num,
                &tx_idx,
            )?
            .map(|t: TransactionSigned| *t.hash())
            .unwrap_or_default();

            for (log_idx, log) in receipt.inner.logs.iter().enumerate() {
                if filter.matches_log(log) {
                    results.push(signet_cold::RichLog {
                        log: log.clone(),
                        block_number: block_num,
                        block_hash,
                        tx_hash,
                        tx_index: tx_idx,
                        block_log_index: block_log_index + log_idx as u64,
                        tx_log_index: log_idx as u64,
                    });
                }
            }
            block_log_index += receipt.inner.logs.len() as u64;
        }
    }

    Ok(results)
}

fn get_receipt_with_context_inner(
&self,
spec: ReceiptSpecifier,
Expand Down Expand Up @@ -443,23 +499,28 @@ impl MdbxColdBackend {
return Ok(None);
};

let prior_cumulative_gas = index
.checked_sub(1)
.map(|prev| {
DualTableTraverse::<ColdReceipts, _>::exact_dual(
&mut tx.new_cursor::<ColdReceipts>()?,
&block,
&prev,
)
})
.transpose()?
.flatten()
.map(|r: Receipt| r.inner.cumulative_gas_used)
.unwrap_or(0);
let mut first_log_index = 0u64;
let mut prior_cumulative_gas = 0u64;
for i in 0..index {
if let Some(r) = DualTableTraverse::<ColdReceipts, _>::exact_dual(
&mut tx.new_cursor::<ColdReceipts>()?,
&block,
&i,
)? {
prior_cumulative_gas = r.inner.cumulative_gas_used;
first_log_index += r.inner.logs.len() as u64;
}
}

let meta = ConfirmationMeta::new(block, header.hash_slow(), index);
let confirmed_receipt = Confirmed::new(receipt, meta);
Ok(Some(ReceiptContext::new(header, transaction, confirmed_receipt, prior_cumulative_gas)))
Ok(Some(ReceiptContext::new(
header,
transaction,
confirmed_receipt,
prior_cumulative_gas,
first_log_index,
)))
}
}

Expand Down Expand Up @@ -537,6 +598,13 @@ impl ColdStorage for MdbxColdBackend {
Ok(headers)
}

/// Returns all logs in the filter's block range that match `filter`,
/// delegating to the synchronous [`Self::get_logs_inner`] and widening
/// the MDBX error into the cold-storage error type.
async fn get_logs(
    &self,
    filter: signet_cold::LogFilter,
) -> ColdResult<Vec<signet_cold::RichLog>> {
    self.get_logs_inner(filter).map_err(Into::into)
}

async fn get_latest_block(&self) -> ColdResult<Option<BlockNumber>> {
let tx = self.env.tx().map_err(MdbxColdError::from)?;
let mut cursor = tx.new_cursor::<ColdHeaders>().map_err(MdbxColdError::from)?;
Expand Down
2 changes: 1 addition & 1 deletion crates/cold-sql/migrations/001_initial.sql
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ CREATE TABLE IF NOT EXISTS logs (
PRIMARY KEY (block_number, tx_index, log_index)
);

CREATE INDEX IF NOT EXISTS idx_logs_address ON logs (address);
CREATE INDEX IF NOT EXISTS idx_logs_address_block ON logs (address, block_number);
CREATE INDEX IF NOT EXISTS idx_logs_topic0 ON logs (topic0);

CREATE TABLE IF NOT EXISTS signet_events (
Expand Down
2 changes: 1 addition & 1 deletion crates/cold-sql/migrations/001_initial_pg.sql
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ CREATE TABLE IF NOT EXISTS logs (
PRIMARY KEY (block_number, tx_index, log_index)
);

CREATE INDEX IF NOT EXISTS idx_logs_address ON logs (address);
CREATE INDEX IF NOT EXISTS idx_logs_address_block ON logs (address, block_number);
CREATE INDEX IF NOT EXISTS idx_logs_topic0 ON logs (topic0);

CREATE TABLE IF NOT EXISTS signet_events (
Expand Down
129 changes: 115 additions & 14 deletions crates/cold-sql/src/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,8 @@ use crate::convert::{
};
use alloy::{consensus::Header, primitives::BlockNumber};
use signet_cold::{
BlockData, ColdResult, ColdStorage, ColdStorageError, Confirmed, HeaderSpecifier,
ReceiptSpecifier, SignetEventsSpecifier, TransactionSpecifier, ZenithHeaderSpecifier,
BlockData, ColdResult, ColdStorage, ColdStorageError, Confirmed, HeaderSpecifier, LogFilter,
ReceiptSpecifier, RichLog, SignetEventsSpecifier, TransactionSpecifier, ZenithHeaderSpecifier,
};
use signet_storage_types::{
ConfirmationMeta, DbSignetEvent, DbZenithHeader, Receipt, TransactionSigned,
Expand Down Expand Up @@ -370,6 +370,20 @@ fn row_to_signet_event_row(r: &sqlx::any::AnyRow) -> SignetEventRow {
}
}

/// Decodes one `logs` table row into a [`LogRow`].
///
/// Column types are driven by the `LogRow` field types through
/// `sqlx::Row::get` inference; a missing or mistyped column panics,
/// which is acceptable because the schema is migration-controlled.
fn row_to_log_row(r: &sqlx::any::AnyRow) -> LogRow {
    // Pull each column into a named local first, then assemble the
    // struct in one literal at the end.
    let block_number = r.get("block_number");
    let tx_index = r.get("tx_index");
    let log_index = r.get("log_index");
    let address = r.get("address");
    let topic0 = r.get("topic0");
    let topic1 = r.get("topic1");
    let topic2 = r.get("topic2");
    let topic3 = r.get("topic3");
    let data = r.get("data");
    LogRow { block_number, tx_index, log_index, address, topic0, topic1, topic2, topic3, data }
}

fn row_to_zenith_header_row(r: &sqlx::any::AnyRow) -> ZenithHeaderRow {
ZenithHeaderRow {
block_number: r.get("block_number"),
Expand Down Expand Up @@ -574,18 +588,7 @@ impl ColdStorage for SqlColdBackend {
let mut logs_by_tx: std::collections::BTreeMap<i64, Vec<LogRow>> =
std::collections::BTreeMap::new();
for r in all_log_rows {
let tx_idx: i64 = r.get("tx_index");
logs_by_tx.entry(tx_idx).or_default().push(LogRow {
block_number: r.get("block_number"),
tx_index: tx_idx,
log_index: r.get("log_index"),
address: r.get("address"),
topic0: r.get("topic0"),
topic1: r.get("topic1"),
topic2: r.get("topic2"),
topic3: r.get("topic3"),
data: r.get("data"),
});
logs_by_tx.entry(r.get::<i64, _>("tx_index")).or_default().push(row_to_log_row(&r));
}

receipt_rows
Expand Down Expand Up @@ -692,6 +695,104 @@ impl ColdStorage for SqlColdBackend {
.collect()
}

/// Queries matching logs from SQL storage, joined with their block hash
/// and transaction hash, ordered by (block, tx_index, log_index).
///
/// The address filter and the four topic filters all shared the same
/// inline "emit `= $N` or `IN ($N, ...)` and collect params" logic
/// five times over; that duplication is now factored into the local
/// `push_in_filter` helper, which behaves identically (empty value
/// sets add no predicate, singletons use `=`, larger sets use `IN`).
///
/// # Errors
/// Returns a [`ColdStorageError`] if the query fails to execute.
async fn get_logs(&self, filter: LogFilter) -> ColdResult<Vec<RichLog>> {
    // Appends " AND <col> = $N" or " AND <col> IN ($N, ...)" to `sql`,
    // pushes the encoded values onto `params`, and advances the
    // positional-placeholder counter. A zero-length `values` adds
    // nothing (wildcard).
    fn push_in_filter(
        sql: &mut String,
        params: &mut Vec<Vec<u8>>,
        idx: &mut u32,
        col: &str,
        values: Vec<Vec<u8>>,
    ) {
        match values.len() {
            0 => {}
            1 => {
                sql.push_str(&format!(" AND {col} = ${idx}"));
                params.extend(values);
                *idx += 1;
            }
            n => {
                let placeholders = (0..n as u32)
                    .map(|i| format!("${}", *idx + i))
                    .collect::<Vec<_>>()
                    .join(", ");
                sql.push_str(&format!(" AND {col} IN ({placeholders})"));
                params.extend(values);
                *idx += n as u32;
            }
        }
    }

    // Build dynamic SQL with positional $N placeholders.
    // The correlated subquery computes block_log_index: the absolute
    // position of each log among all logs in its block, leveraging the
    // PK index on (block_number, tx_index, log_index).
    let mut sql = String::from(
        "SELECT l.*, h.block_hash, t.tx_hash, \
         (SELECT COUNT(*) FROM logs l2 \
          WHERE l2.block_number = l.block_number \
          AND (l2.tx_index < l.tx_index \
          OR (l2.tx_index = l.tx_index AND l2.log_index < l.log_index)) \
         ) AS block_log_index \
         FROM logs l \
         JOIN headers h ON l.block_number = h.block_number \
         JOIN transactions t ON l.block_number = t.block_number \
         AND l.tx_index = t.tx_index \
         WHERE l.block_number >= $1 AND l.block_number <= $2",
    );
    let mut params: Vec<Vec<u8>> = Vec::new();
    // $1/$2 are the block range; extra predicates start at $3.
    let mut idx = 3u32;

    // Address filter.
    if let Some(ref addrs) = filter.address {
        push_in_filter(
            &mut sql,
            &mut params,
            &mut idx,
            "l.address",
            addrs.iter().map(|a| a.as_slice().to_vec()).collect(),
        );
    }

    // Topic filters: `None` means wildcard for that position.
    let topic_cols = ["l.topic0", "l.topic1", "l.topic2", "l.topic3"];
    for (i, topic_filter) in filter.topics.iter().enumerate() {
        if let Some(values) = topic_filter {
            push_in_filter(
                &mut sql,
                &mut params,
                &mut idx,
                topic_cols[i],
                values.iter().map(|v| v.as_slice().to_vec()).collect(),
            );
        }
    }

    sql.push_str(" ORDER BY l.block_number, l.tx_index, l.log_index");

    // Bind parameters and execute.
    let mut query =
        sqlx::query(&sql).bind(to_i64(filter.from_block)).bind(to_i64(filter.to_block));
    for param in &params {
        query = query.bind(param.as_slice());
    }

    let rows = query.fetch_all(&self.pool).await.map_err(SqlColdError::from)?;

    rows.into_iter()
        .map(|r| {
            let log = row_to_log_row(&r).into_log();
            let block_number = from_i64(r.get::<i64, _>("block_number"));
            let block_hash_bytes: Vec<u8> = r.get("block_hash");
            let tx_hash_bytes: Vec<u8> = r.get("tx_hash");
            Ok(RichLog {
                log,
                block_number,
                block_hash: alloy::primitives::B256::from_slice(&block_hash_bytes),
                tx_hash: alloy::primitives::B256::from_slice(&tx_hash_bytes),
                tx_index: from_i64(r.get::<i64, _>("tx_index")),
                block_log_index: from_i64(r.get::<i64, _>("block_log_index")),
                tx_log_index: from_i64(r.get::<i64, _>("log_index")),
            })
        })
        .collect::<ColdResult<Vec<_>>>()
}

async fn get_latest_block(&self) -> ColdResult<Option<BlockNumber>> {
let row = sqlx::query("SELECT MAX(block_number) as max_bn FROM headers")
.fetch_one(&self.pool)
Expand Down
Loading