Skip to content

Commit 4eb15d6

Browse files
prestwich and claude committed
feat(cold): add eth_getLogs support and first_log_index to ReceiptContext
Add log filtering to cold storage following eth_getLogs semantics, and include block-level log indexing for RPC response construction. - Add `LogFilter` type with block range, address, and topic filters - Add `RichLog` type with full block/tx context and block_log_index - Add `ColdStorage::get_logs` with implementations for in-memory, SQLite, and PostgreSQL backends - Add `first_log_index` to `ReceiptContext` (cherry-picked from #23) - Replace `idx_logs_address` with composite `idx_logs_address_block` - Factor out `row_to_log_row` helper in SQL backend - Wire through task channel plumbing (request, handle, runner) - Comprehensive conformance tests covering all filter combinations Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
1 parent a1c0e80 commit 4eb15d6

File tree

12 files changed

+445
-44
lines changed

12 files changed

+445
-44
lines changed

crates/cold-sql/migrations/001_initial.sql

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -79,7 +79,7 @@ CREATE TABLE IF NOT EXISTS logs (
7979
PRIMARY KEY (block_number, tx_index, log_index)
8080
);
8181

82-
CREATE INDEX IF NOT EXISTS idx_logs_address ON logs (address);
82+
-- Composite index for address-filtered log queries. The leading `address`
-- column still serves address-only lookups (so this supersedes the old
-- single-column idx_logs_address), while the trailing `block_number`
-- lets block-range-bounded filters (eth_getLogs style) stay in the index.
CREATE INDEX IF NOT EXISTS idx_logs_address_block ON logs (address, block_number);
8383
CREATE INDEX IF NOT EXISTS idx_logs_topic0 ON logs (topic0);
8484

8585
CREATE TABLE IF NOT EXISTS signet_events (

crates/cold-sql/migrations/001_initial_pg.sql

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -77,7 +77,7 @@ CREATE TABLE IF NOT EXISTS logs (
7777
PRIMARY KEY (block_number, tx_index, log_index)
7878
);
7979

80-
CREATE INDEX IF NOT EXISTS idx_logs_address ON logs (address);
80+
-- Composite index for address-filtered log queries. The leading `address`
-- column still serves address-only lookups (so this supersedes the old
-- single-column idx_logs_address), while the trailing `block_number`
-- lets block-range-bounded filters (eth_getLogs style) stay in the index.
CREATE INDEX IF NOT EXISTS idx_logs_address_block ON logs (address, block_number);
8181
CREATE INDEX IF NOT EXISTS idx_logs_topic0 ON logs (topic0);
8282

8383
CREATE TABLE IF NOT EXISTS signet_events (

crates/cold-sql/src/backend.rs

Lines changed: 116 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -11,8 +11,8 @@ use crate::convert::{
1111
};
1212
use alloy::{consensus::Header, primitives::BlockNumber};
1313
use signet_cold::{
14-
BlockData, ColdResult, ColdStorage, ColdStorageError, Confirmed, HeaderSpecifier,
15-
ReceiptSpecifier, SignetEventsSpecifier, TransactionSpecifier, ZenithHeaderSpecifier,
14+
BlockData, ColdResult, ColdStorage, ColdStorageError, Confirmed, HeaderSpecifier, LogFilter,
15+
ReceiptSpecifier, RichLog, SignetEventsSpecifier, TransactionSpecifier, ZenithHeaderSpecifier,
1616
};
1717
use signet_storage_types::{
1818
ConfirmationMeta, DbSignetEvent, DbZenithHeader, Receipt, TransactionSigned,
@@ -269,20 +269,7 @@ impl SqlColdBackend {
269269
.fetch_all(&self.pool)
270270
.await?;
271271

272-
let logs = log_rows
273-
.into_iter()
274-
.map(|r| LogRow {
275-
block_number: r.get("block_number"),
276-
tx_index: r.get("tx_index"),
277-
log_index: r.get("log_index"),
278-
address: r.get("address"),
279-
topic0: r.get("topic0"),
280-
topic1: r.get("topic1"),
281-
topic2: r.get("topic2"),
282-
topic3: r.get("topic3"),
283-
data: r.get("data"),
284-
})
285-
.collect();
272+
let logs = log_rows.into_iter().map(|r| row_to_log_row(&r)).collect();
286273

287274
receipt_from_rows(receipt, logs).map(Some)
288275
}
@@ -538,6 +525,20 @@ fn row_to_signet_event_row(r: &sqlx::any::AnyRow) -> SignetEventRow {
538525
}
539526
}
540527

528+
fn row_to_log_row(r: &sqlx::any::AnyRow) -> LogRow {
529+
LogRow {
530+
block_number: r.get("block_number"),
531+
tx_index: r.get("tx_index"),
532+
log_index: r.get("log_index"),
533+
address: r.get("address"),
534+
topic0: r.get("topic0"),
535+
topic1: r.get("topic1"),
536+
topic2: r.get("topic2"),
537+
topic3: r.get("topic3"),
538+
data: r.get("data"),
539+
}
540+
}
541+
541542
fn row_to_zenith_header_row(r: &sqlx::any::AnyRow) -> ZenithHeaderRow {
542543
ZenithHeaderRow {
543544
block_number: r.get("block_number"),
@@ -655,20 +656,7 @@ impl ColdStorage for SqlColdBackend {
655656
.await
656657
.map_err(SqlColdError::from)?;
657658

658-
let logs = log_rows
659-
.into_iter()
660-
.map(|r| LogRow {
661-
block_number: r.get("block_number"),
662-
tx_index: r.get("tx_index"),
663-
log_index: r.get("log_index"),
664-
address: r.get("address"),
665-
topic0: r.get("topic0"),
666-
topic1: r.get("topic1"),
667-
topic2: r.get("topic2"),
668-
topic3: r.get("topic3"),
669-
data: r.get("data"),
670-
})
671-
.collect();
659+
let logs = log_rows.into_iter().map(|r| row_to_log_row(&r)).collect();
672660

673661
receipts.push(receipt_from_rows(receipt, logs)?);
674662
}
@@ -763,6 +751,104 @@ impl ColdStorage for SqlColdBackend {
763751
.collect()
764752
}
765753

754+
async fn get_logs(&self, filter: LogFilter) -> ColdResult<Vec<RichLog>> {
755+
// Build dynamic SQL with positional $N placeholders.
756+
// The correlated subquery computes block_log_index: the absolute
757+
// position of each log among all logs in its block, leveraging the
758+
// PK index on (block_number, tx_index, log_index).
759+
let mut sql = String::from(
760+
"SELECT l.*, h.block_hash, t.tx_hash, \
761+
(SELECT COUNT(*) FROM logs l2 \
762+
WHERE l2.block_number = l.block_number \
763+
AND (l2.tx_index < l.tx_index \
764+
OR (l2.tx_index = l.tx_index AND l2.log_index < l.log_index)) \
765+
) AS block_log_index \
766+
FROM logs l \
767+
JOIN headers h ON l.block_number = h.block_number \
768+
JOIN transactions t ON l.block_number = t.block_number \
769+
AND l.tx_index = t.tx_index \
770+
WHERE l.block_number >= $1 AND l.block_number <= $2",
771+
);
772+
let mut params: Vec<Vec<u8>> = Vec::new();
773+
let mut idx = 3u32;
774+
775+
// Address filter
776+
if let Some(ref addrs) = filter.address {
777+
if addrs.len() == 1 {
778+
sql.push_str(&format!(" AND l.address = ${idx}"));
779+
params.push(addrs[0].as_slice().to_vec());
780+
idx += 1;
781+
} else if !addrs.is_empty() {
782+
let placeholders: String = addrs
783+
.iter()
784+
.enumerate()
785+
.map(|(i, _)| format!("${}", idx + i as u32))
786+
.collect::<Vec<_>>()
787+
.join(", ");
788+
sql.push_str(&format!(" AND l.address IN ({placeholders})"));
789+
for addr in addrs {
790+
params.push(addr.as_slice().to_vec());
791+
}
792+
idx += addrs.len() as u32;
793+
}
794+
}
795+
796+
// Topic filters
797+
let topic_cols = ["l.topic0", "l.topic1", "l.topic2", "l.topic3"];
798+
for (i, topic_filter) in filter.topics.iter().enumerate() {
799+
let Some(values) = topic_filter else { continue };
800+
if values.is_empty() {
801+
continue;
802+
}
803+
if values.len() == 1 {
804+
sql.push_str(&format!(" AND {} = ${idx}", topic_cols[i]));
805+
params.push(values[0].as_slice().to_vec());
806+
idx += 1;
807+
} else {
808+
let placeholders: String = values
809+
.iter()
810+
.enumerate()
811+
.map(|(j, _)| format!("${}", idx + j as u32))
812+
.collect::<Vec<_>>()
813+
.join(", ");
814+
sql.push_str(&format!(" AND {} IN ({placeholders})", topic_cols[i]));
815+
for v in values {
816+
params.push(v.as_slice().to_vec());
817+
}
818+
idx += values.len() as u32;
819+
}
820+
}
821+
822+
sql.push_str(" ORDER BY l.block_number, l.tx_index, l.log_index");
823+
824+
// Bind parameters and execute.
825+
let mut query =
826+
sqlx::query(&sql).bind(to_i64(filter.from_block)).bind(to_i64(filter.to_block));
827+
for param in &params {
828+
query = query.bind(param.as_slice());
829+
}
830+
831+
let rows = query.fetch_all(&self.pool).await.map_err(SqlColdError::from)?;
832+
833+
rows.into_iter()
834+
.map(|r| {
835+
let log = row_to_log_row(&r).into_log();
836+
let block_number = from_i64(r.get::<i64, _>("block_number"));
837+
let block_hash_bytes: Vec<u8> = r.get("block_hash");
838+
let tx_hash_bytes: Vec<u8> = r.get("tx_hash");
839+
Ok(RichLog {
840+
log,
841+
block_number,
842+
block_hash: alloy::primitives::B256::from_slice(&block_hash_bytes),
843+
tx_hash: alloy::primitives::B256::from_slice(&tx_hash_bytes),
844+
tx_index: from_i64(r.get::<i64, _>("tx_index")),
845+
block_log_index: from_i64(r.get::<i64, _>("block_log_index")),
846+
tx_log_index: from_i64(r.get::<i64, _>("log_index")),
847+
})
848+
})
849+
.collect::<ColdResult<Vec<_>>>()
850+
}
851+
766852
async fn get_latest_block(&self) -> ColdResult<Option<BlockNumber>> {
767853
let row = sqlx::query("SELECT block_number FROM metadata WHERE key = $1")
768854
.bind("latest_block")

0 commit comments

Comments (0)