Skip to content

Commit 4e65a04

Browse files
prestwich and Claude committed
feat(cold): add eth_getLogs support and first_log_index to ReceiptContext
Add log filtering to cold storage following eth_getLogs semantics, and include block-level log indexing for RPC response construction.

- Add `LogFilter` type with block range, address, and topic filters
- Add `RichLog` type with full block/tx context and block_log_index
- Add `ColdStorage::get_logs` with implementations for in-memory, SQLite, and PostgreSQL backends
- Add `first_log_index` to `ReceiptContext` (cherry-picked from #23)
- Replace `idx_logs_address` with composite `idx_logs_address_block`
- Factor out `row_to_log_row` helper in SQL backend
- Wire through task channel plumbing (request, handle, runner)
- Comprehensive conformance tests covering all filter combinations

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
1 parent 9dce356 commit 4e65a04

File tree

12 files changed

+519
-28
lines changed

12 files changed

+519
-28
lines changed

crates/cold-sql/migrations/001_initial.sql

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -79,7 +79,7 @@ CREATE TABLE IF NOT EXISTS logs (
7979
PRIMARY KEY (block_number, tx_index, log_index)
8080
);
8181

82-
CREATE INDEX IF NOT EXISTS idx_logs_address ON logs (address);
82+
CREATE INDEX IF NOT EXISTS idx_logs_address_block ON logs (address, block_number);
8383
CREATE INDEX IF NOT EXISTS idx_logs_topic0 ON logs (topic0);
8484

8585
CREATE TABLE IF NOT EXISTS signet_events (

crates/cold-sql/migrations/001_initial_pg.sql

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -77,7 +77,7 @@ CREATE TABLE IF NOT EXISTS logs (
7777
PRIMARY KEY (block_number, tx_index, log_index)
7878
);
7979

80-
CREATE INDEX IF NOT EXISTS idx_logs_address ON logs (address);
80+
CREATE INDEX IF NOT EXISTS idx_logs_address_block ON logs (address, block_number);
8181
CREATE INDEX IF NOT EXISTS idx_logs_topic0 ON logs (topic0);
8282

8383
CREATE TABLE IF NOT EXISTS signet_events (

crates/cold-sql/src/backend.rs

Lines changed: 190 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -11,8 +11,8 @@ use crate::convert::{
1111
};
1212
use alloy::{consensus::Header, primitives::BlockNumber};
1313
use signet_cold::{
14-
BlockData, ColdResult, ColdStorage, ColdStorageError, Confirmed, HeaderSpecifier,
15-
ReceiptSpecifier, SignetEventsSpecifier, TransactionSpecifier, ZenithHeaderSpecifier,
14+
BlockData, ColdResult, ColdStorage, ColdStorageError, Confirmed, HeaderSpecifier, LogFilter,
15+
ReceiptSpecifier, RichLog, SignetEventsSpecifier, TransactionSpecifier, ZenithHeaderSpecifier,
1616
};
1717
use signet_storage_types::{
1818
ConfirmationMeta, DbSignetEvent, DbZenithHeader, Receipt, TransactionSigned,
@@ -150,6 +150,78 @@ impl SqlColdBackend {
150150
.transpose()
151151
}
152152

153+
async fn fetch_block_hash(
154+
&self,
155+
block_num: BlockNumber,
156+
) -> Result<Option<alloy::primitives::B256>, SqlColdError> {
157+
let bn = to_i64(block_num);
158+
let row = sqlx::query("SELECT block_hash FROM headers WHERE block_number = $1")
159+
.bind(bn)
160+
.fetch_optional(&self.pool)
161+
.await?;
162+
Ok(row.map(|r| {
163+
let bytes: Vec<u8> = r.get("block_hash");
164+
alloy::primitives::B256::from_slice(&bytes)
165+
}))
166+
}
167+
168+
async fn fetch_tx_by_location(
169+
&self,
170+
block: BlockNumber,
171+
index: u64,
172+
) -> Result<Option<TransactionSigned>, SqlColdError> {
173+
let bn = to_i64(block);
174+
let idx = to_i64(index);
175+
let row =
176+
sqlx::query("SELECT * FROM transactions WHERE block_number = $1 AND tx_index = $2")
177+
.bind(bn)
178+
.bind(idx)
179+
.fetch_optional(&self.pool)
180+
.await?;
181+
182+
row.map(|r| row_to_tx_row(&r).into_tx()).transpose()
183+
}
184+
185+
async fn fetch_receipt_by_location(
186+
&self,
187+
block: BlockNumber,
188+
index: u64,
189+
) -> Result<Option<Receipt>, SqlColdError> {
190+
let bn = to_i64(block);
191+
let idx = to_i64(index);
192+
193+
let receipt_row =
194+
sqlx::query("SELECT * FROM receipts WHERE block_number = $1 AND tx_index = $2")
195+
.bind(bn)
196+
.bind(idx)
197+
.fetch_optional(&self.pool)
198+
.await?;
199+
200+
let Some(rr) = receipt_row else {
201+
return Ok(None);
202+
};
203+
204+
let receipt = ReceiptRow {
205+
block_number: rr.get("block_number"),
206+
tx_index: rr.get("tx_index"),
207+
tx_type: rr.get::<i32, _>("tx_type") as i16,
208+
success: rr.get::<i32, _>("success") != 0,
209+
cumulative_gas_used: rr.get("cumulative_gas_used"),
210+
};
211+
212+
let log_rows = sqlx::query(
213+
"SELECT * FROM logs WHERE block_number = $1 AND tx_index = $2 ORDER BY log_index",
214+
)
215+
.bind(bn)
216+
.bind(idx)
217+
.fetch_all(&self.pool)
218+
.await?;
219+
220+
let logs = log_rows.into_iter().map(|r| row_to_log_row(&r)).collect();
221+
222+
receipt_from_rows(receipt, logs).map(Some)
223+
}
224+
153225
// ========================================================================
154226
// Write helpers
155227
// ========================================================================
@@ -370,6 +442,20 @@ fn row_to_signet_event_row(r: &sqlx::any::AnyRow) -> SignetEventRow {
370442
}
371443
}
372444

445+
fn row_to_log_row(r: &sqlx::any::AnyRow) -> LogRow {
446+
LogRow {
447+
block_number: r.get("block_number"),
448+
tx_index: r.get("tx_index"),
449+
log_index: r.get("log_index"),
450+
address: r.get("address"),
451+
topic0: r.get("topic0"),
452+
topic1: r.get("topic1"),
453+
topic2: r.get("topic2"),
454+
topic3: r.get("topic3"),
455+
data: r.get("data"),
456+
}
457+
}
458+
373459
fn row_to_zenith_header_row(r: &sqlx::any::AnyRow) -> ZenithHeaderRow {
374460
ZenithHeaderRow {
375461
block_number: r.get("block_number"),
@@ -574,18 +660,10 @@ impl ColdStorage for SqlColdBackend {
574660
let mut logs_by_tx: std::collections::BTreeMap<i64, Vec<LogRow>> =
575661
std::collections::BTreeMap::new();
576662
for r in all_log_rows {
577-
let tx_idx: i64 = r.get("tx_index");
578-
logs_by_tx.entry(tx_idx).or_default().push(LogRow {
579-
block_number: r.get("block_number"),
580-
tx_index: tx_idx,
581-
log_index: r.get("log_index"),
582-
address: r.get("address"),
583-
topic0: r.get("topic0"),
584-
topic1: r.get("topic1"),
585-
topic2: r.get("topic2"),
586-
topic3: r.get("topic3"),
587-
data: r.get("data"),
588-
});
663+
logs_by_tx
664+
.entry(r.get::<i64, _>("tx_index"))
665+
.or_default()
666+
.push(row_to_log_row(&r));
589667
}
590668

591669
receipt_rows
@@ -692,6 +770,104 @@ impl ColdStorage for SqlColdBackend {
692770
.collect()
693771
}
694772

773+
async fn get_logs(&self, filter: LogFilter) -> ColdResult<Vec<RichLog>> {
774+
// Build dynamic SQL with positional $N placeholders.
775+
// The correlated subquery computes block_log_index: the absolute
776+
// position of each log among all logs in its block, leveraging the
777+
// PK index on (block_number, tx_index, log_index).
778+
let mut sql = String::from(
779+
"SELECT l.*, h.block_hash, t.tx_hash, \
780+
(SELECT COUNT(*) FROM logs l2 \
781+
WHERE l2.block_number = l.block_number \
782+
AND (l2.tx_index < l.tx_index \
783+
OR (l2.tx_index = l.tx_index AND l2.log_index < l.log_index)) \
784+
) AS block_log_index \
785+
FROM logs l \
786+
JOIN headers h ON l.block_number = h.block_number \
787+
JOIN transactions t ON l.block_number = t.block_number \
788+
AND l.tx_index = t.tx_index \
789+
WHERE l.block_number >= $1 AND l.block_number <= $2",
790+
);
791+
let mut params: Vec<Vec<u8>> = Vec::new();
792+
let mut idx = 3u32;
793+
794+
// Address filter
795+
if let Some(ref addrs) = filter.address {
796+
if addrs.len() == 1 {
797+
sql.push_str(&format!(" AND l.address = ${idx}"));
798+
params.push(addrs[0].as_slice().to_vec());
799+
idx += 1;
800+
} else if !addrs.is_empty() {
801+
let placeholders: String = addrs
802+
.iter()
803+
.enumerate()
804+
.map(|(i, _)| format!("${}", idx + i as u32))
805+
.collect::<Vec<_>>()
806+
.join(", ");
807+
sql.push_str(&format!(" AND l.address IN ({placeholders})"));
808+
for addr in addrs {
809+
params.push(addr.as_slice().to_vec());
810+
}
811+
idx += addrs.len() as u32;
812+
}
813+
}
814+
815+
// Topic filters
816+
let topic_cols = ["l.topic0", "l.topic1", "l.topic2", "l.topic3"];
817+
for (i, topic_filter) in filter.topics.iter().enumerate() {
818+
let Some(values) = topic_filter else { continue };
819+
if values.is_empty() {
820+
continue;
821+
}
822+
if values.len() == 1 {
823+
sql.push_str(&format!(" AND {} = ${idx}", topic_cols[i]));
824+
params.push(values[0].as_slice().to_vec());
825+
idx += 1;
826+
} else {
827+
let placeholders: String = values
828+
.iter()
829+
.enumerate()
830+
.map(|(j, _)| format!("${}", idx + j as u32))
831+
.collect::<Vec<_>>()
832+
.join(", ");
833+
sql.push_str(&format!(" AND {} IN ({placeholders})", topic_cols[i]));
834+
for v in values {
835+
params.push(v.as_slice().to_vec());
836+
}
837+
idx += values.len() as u32;
838+
}
839+
}
840+
841+
sql.push_str(" ORDER BY l.block_number, l.tx_index, l.log_index");
842+
843+
// Bind parameters and execute.
844+
let mut query =
845+
sqlx::query(&sql).bind(to_i64(filter.from_block)).bind(to_i64(filter.to_block));
846+
for param in &params {
847+
query = query.bind(param.as_slice());
848+
}
849+
850+
let rows = query.fetch_all(&self.pool).await.map_err(SqlColdError::from)?;
851+
852+
rows.into_iter()
853+
.map(|r| {
854+
let log = row_to_log_row(&r).into_log();
855+
let block_number = from_i64(r.get::<i64, _>("block_number"));
856+
let block_hash_bytes: Vec<u8> = r.get("block_hash");
857+
let tx_hash_bytes: Vec<u8> = r.get("tx_hash");
858+
Ok(RichLog {
859+
log,
860+
block_number,
861+
block_hash: alloy::primitives::B256::from_slice(&block_hash_bytes),
862+
tx_hash: alloy::primitives::B256::from_slice(&tx_hash_bytes),
863+
tx_index: from_i64(r.get::<i64, _>("tx_index")),
864+
block_log_index: from_i64(r.get::<i64, _>("block_log_index")),
865+
tx_log_index: from_i64(r.get::<i64, _>("log_index")),
866+
})
867+
})
868+
.collect::<ColdResult<Vec<_>>>()
869+
}
870+
695871
async fn get_latest_block(&self) -> ColdResult<Option<BlockNumber>> {
696872
let row = sqlx::query("SELECT MAX(block_number) as max_bn FROM headers")
697873
.fetch_one(&self.pool)

0 commit comments

Comments (0)