Skip to content

Commit 0fec9c6

Browse files
authored
Merge pull request #70 from bilinearlabs/async_trait
Refactor the Storage trait and implementors to use async_trait #66
2 parents 5214276 + 1852b49 commit 0fec9c6

File tree

5 files changed

+592
-730
lines changed

5 files changed

+592
-730
lines changed

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ test-utils = ["dep:fake"]
1515
[dependencies]
1616
alloy = { version = "1.4.0", features = ["rpc-types", "json", "rpc-client", "json-rpc"] }
1717
anyhow = "1.0.100"
18+
async-trait = "0.1"
1819
axum = { version = "0.8.7", features = ["default", "http2"] }
1920
hex = "0.4"
2021
clap = { version = "4.5.51", features = ["derive"] }

src/storage/storage_api.rs

Lines changed: 17 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -7,41 +7,30 @@ use crate::{
77
};
88
use alloy::{json_abi::Event, primitives::B256, rpc::types::Log};
99
use anyhow::Result;
10+
use async_trait::async_trait;
1011
use serde_json::Value;
11-
use std::{any::Any, future::Future, pin::Pin};
12+
use std::any::Any;
1213

1314
/// Trait that defines the API between the producer task and the storage.
15+
#[async_trait]
1416
pub trait Storage: Send + Sync + 'static + Any {
1517
/// Adds a list of events to the storage for a specific chain.
16-
fn add_events(
17-
&self,
18-
chain_id: u64,
19-
events: &[Log],
20-
) -> Pin<Box<dyn Future<Output = Result<()>> + Send + '_>>;
18+
async fn add_events(&self, chain_id: u64, events: &[Log]) -> Result<()>;
2119

2220
/// Lists the events that are registered in the storage along their indexing status.
23-
fn list_indexed_events(
24-
&self,
25-
) -> Pin<Box<dyn Future<Output = Result<Vec<EventDescriptorDb>>> + Send + '_>>;
21+
async fn list_indexed_events(&self) -> Result<Vec<EventDescriptorDb>>;
2622

2723
/// Get the status of an event in the database for a specific chain.
28-
fn event_index_status(
29-
&self,
30-
chain_id: u64,
31-
event: &Event,
32-
) -> Pin<Box<dyn Future<Output = Result<Option<EventStatus>>> + Send + '_>>;
24+
async fn event_index_status(&self, chain_id: u64, event: &Event)
25+
-> Result<Option<EventStatus>>;
3326

3427
/// Includes a list of events in the storage for a specific chain.
3528
///
3629
/// # Description
3730
///
3831
/// This method shall be used once at the beginning of the application to include the events that
3932
/// are to going be indexed.
40-
fn include_events(
41-
&self,
42-
chain_id: u64,
43-
events: &[Event],
44-
) -> Pin<Box<dyn Future<Output = Result<()>> + Send + '_>>;
33+
async fn include_events(&self, chain_id: u64, events: &[Event]) -> Result<()>;
4534

4635
/// Gets the full signature of an event type by its hash.
4736
///
@@ -52,32 +41,16 @@ pub trait Storage: Send + Sync + 'static + Any {
5241
/// "Transfer(address indexed from,address indexed to,uint256 value)".
5342
///
5443
/// This signature is used to build a an event object.
55-
fn get_event_signature(
56-
&self,
57-
event_hash: &str,
58-
) -> Pin<Box<dyn Future<Output = Result<String>> + Send + '_>>;
44+
async fn get_event_signature(&self, event_hash: &str) -> Result<String>;
5945

6046
/// Gets the latest block number that has been indexed for a specific chain.
61-
fn last_block(
62-
&self,
63-
chain_id: u64,
64-
event: &Event,
65-
) -> Pin<Box<dyn Future<Output = Result<u64>> + Send + '_>>;
47+
async fn last_block(&self, chain_id: u64, event: &Event) -> Result<u64>;
6648

6749
/// Gets the first block number that has been indexed for a specific chain.
68-
fn first_block(
69-
&self,
70-
chain_id: u64,
71-
event: &Event,
72-
) -> Pin<Box<dyn Future<Output = Result<u64>> + Send + '_>>;
50+
async fn first_block(&self, chain_id: u64, event: &Event) -> Result<u64>;
7351

7452
/// Sets the first block number for an event on a specific chain.
75-
fn set_first_block(
76-
&self,
77-
chain_id: u64,
78-
event: &Event,
79-
block_number: u64,
80-
) -> Pin<Box<dyn Future<Output = Result<()>> + Send + '_>>;
53+
async fn set_first_block(&self, chain_id: u64, event: &Event, block_number: u64) -> Result<()>;
8154

8255
/// Sets the last block number for the specified events on a specific chain.
8356
///
@@ -93,26 +66,21 @@ pub trait Storage: Send + Sync + 'static + Any {
9366
/// * `event_selectors` - The list of event selectors (topic0 hashes) to update.
9467
/// * `last_processed` - The block number to set as `last_block`. If `None`, uses the
9568
/// maximum `last_block` among the specified events.
96-
fn synchronize_events(
69+
async fn synchronize_events(
9770
&self,
9871
chain_id: u64,
9972
event_selectors: &[B256],
10073
last_processed: Option<u64>,
101-
) -> Pin<Box<dyn Future<Output = Result<()>> + Send + '_>>;
74+
) -> Result<()>;
10275

10376
/// Sends a raw SQL query to the storage and returns a JSON value.
104-
fn send_raw_query(
105-
&self,
106-
query: &str,
107-
) -> Pin<Box<dyn Future<Output = Result<Value>> + Send + '_>>;
77+
async fn send_raw_query(&self, query: &str) -> Result<Value>;
10878

10979
/// Lists the contracts indexed in the storage.
110-
fn list_contracts(
111-
&self,
112-
) -> Pin<Box<dyn Future<Output = Result<Vec<ContractDescriptorDb>>> + Send + '_>>;
80+
async fn list_contracts(&self) -> Result<Vec<ContractDescriptorDb>>;
11381

11482
/// Returns the database schema including all tables and their column definitions.
115-
fn describe_database(&self) -> Pin<Box<dyn Future<Output = Result<Value>> + Send + '_>>;
83+
async fn describe_database(&self) -> Result<Value>;
11684
}
11785

11886
/// Trait for creating storage instances.

src/storage/storage_duckdb.rs

Lines changed: 28 additions & 73 deletions
Original file line numberDiff line numberDiff line change
@@ -16,12 +16,11 @@ use alloy::{
1616
rpc::types::Log,
1717
};
1818
use anyhow::{Context, Result};
19+
use async_trait::async_trait;
1920
use duckdb::{Connection, OptionalExt, types::ValueRef};
2021
use serde_json::{Map, Number, Value, json};
2122
use std::{
2223
collections::{HashMap, HashSet},
23-
future::Future,
24-
pin::Pin,
2524
string::ToString,
2625
sync::{Mutex, RwLock},
2726
};
@@ -61,107 +60,63 @@ impl Clone for DuckDBStorage {
6160
}
6261
}
6362

63+
#[async_trait]
6464
impl Storage for DuckDBStorage {
65-
fn add_events(
66-
&self,
67-
chain_id: u64,
68-
events: &[Log],
69-
) -> Pin<Box<dyn Future<Output = Result<()>> + Send + '_>> {
70-
Box::pin(std::future::ready(self.add_events_sync(chain_id, events)))
65+
async fn add_events(&self, chain_id: u64, events: &[Log]) -> Result<()> {
66+
self.add_events_sync(chain_id, events)
7167
}
7268

73-
fn list_indexed_events(
74-
&self,
75-
) -> Pin<Box<dyn Future<Output = Result<Vec<EventDescriptorDb>>> + Send + '_>> {
76-
Box::pin(std::future::ready(self.list_indexed_events_sync()))
69+
async fn list_indexed_events(&self) -> Result<Vec<EventDescriptorDb>> {
70+
self.list_indexed_events_sync()
7771
}
7872

79-
fn last_block(
80-
&self,
81-
chain_id: u64,
82-
event: &Event,
83-
) -> Pin<Box<dyn Future<Output = Result<u64>> + Send + '_>> {
84-
Box::pin(std::future::ready(self.last_block_sync(chain_id, event)))
73+
async fn last_block(&self, chain_id: u64, event: &Event) -> Result<u64> {
74+
self.last_block_sync(chain_id, event)
8575
}
8676

87-
fn first_block(
88-
&self,
89-
chain_id: u64,
90-
event: &Event,
91-
) -> Pin<Box<dyn Future<Output = Result<u64>> + Send + '_>> {
92-
Box::pin(std::future::ready(self.first_block_sync(chain_id, event)))
77+
async fn first_block(&self, chain_id: u64, event: &Event) -> Result<u64> {
78+
self.first_block_sync(chain_id, event)
9379
}
9480

95-
fn include_events(
96-
&self,
97-
chain_id: u64,
98-
events: &[Event],
99-
) -> Pin<Box<dyn Future<Output = Result<()>> + Send + '_>> {
100-
Box::pin(std::future::ready(
101-
self.include_events_sync(chain_id, events),
102-
))
81+
async fn include_events(&self, chain_id: u64, events: &[Event]) -> Result<()> {
82+
self.include_events_sync(chain_id, events)
10383
}
10484

105-
fn get_event_signature(
106-
&self,
107-
event_hash: &str,
108-
) -> Pin<Box<dyn Future<Output = Result<String>> + Send + '_>> {
109-
Box::pin(std::future::ready(
110-
self.get_event_signature_sync(event_hash),
111-
))
85+
async fn get_event_signature(&self, event_hash: &str) -> Result<String> {
86+
self.get_event_signature_sync(event_hash)
11287
}
11388

114-
fn event_index_status(
89+
async fn event_index_status(
11590
&self,
11691
chain_id: u64,
11792
event: &Event,
118-
) -> Pin<Box<dyn Future<Output = Result<Option<EventStatus>>> + Send + '_>> {
119-
Box::pin(std::future::ready(
120-
self.event_index_status_sync(chain_id, event),
121-
))
93+
) -> Result<Option<EventStatus>> {
94+
self.event_index_status_sync(chain_id, event)
12295
}
12396

124-
fn synchronize_events(
97+
async fn synchronize_events(
12598
&self,
12699
chain_id: u64,
127100
event_selectors: &[B256],
128101
last_processed: Option<u64>,
129-
) -> Pin<Box<dyn Future<Output = Result<()>> + Send + '_>> {
130-
Box::pin(std::future::ready(self.synchronize_events_sync(
131-
chain_id,
132-
event_selectors,
133-
last_processed,
134-
)))
102+
) -> Result<()> {
103+
self.synchronize_events_sync(chain_id, event_selectors, last_processed)
135104
}
136105

137-
fn send_raw_query(
138-
&self,
139-
query: &str,
140-
) -> Pin<Box<dyn Future<Output = Result<Value>> + Send + '_>> {
141-
Box::pin(std::future::ready(self.send_raw_query_sync(query)))
106+
async fn send_raw_query(&self, query: &str) -> Result<Value> {
107+
self.send_raw_query_sync(query)
142108
}
143109

144-
fn list_contracts(
145-
&self,
146-
) -> Pin<Box<dyn Future<Output = Result<Vec<ContractDescriptorDb>>> + Send + '_>> {
147-
Box::pin(std::future::ready(self.list_contracts_sync()))
110+
async fn list_contracts(&self) -> Result<Vec<ContractDescriptorDb>> {
111+
self.list_contracts_sync()
148112
}
149113

150-
fn set_first_block(
151-
&self,
152-
chain_id: u64,
153-
event: &Event,
154-
block_number: u64,
155-
) -> Pin<Box<dyn Future<Output = Result<()>> + Send + '_>> {
156-
Box::pin(std::future::ready(self.set_first_block_sync(
157-
chain_id,
158-
event,
159-
block_number,
160-
)))
114+
async fn set_first_block(&self, chain_id: u64, event: &Event, block_number: u64) -> Result<()> {
115+
self.set_first_block_sync(chain_id, event, block_number)
161116
}
162117

163-
fn describe_database(&self) -> Pin<Box<dyn Future<Output = Result<Value>> + Send + '_>> {
164-
Box::pin(std::future::ready(self.describe_database_sync()))
118+
async fn describe_database(&self) -> Result<Value> {
119+
self.describe_database_sync()
165120
}
166121
}
167122

0 commit comments

Comments
 (0)