wip Contract event stream #495
Conversation
# Conflicts:
#   timeboost/src/binaries/timeboost.rs
I merged.
yep same here, i tested this as well. thanks for confirming
}
}

pub async fn get_committee_created_events(
Remove the get_ prefix from these getters, as per Rust convention.
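For instance, the rename on a hypothetical reader type (placeholder names and types, not the actual signature in this PR) would look like:

// Hypothetical struct used only to illustrate the naming convention; the real
// method returns KeyManager event types, not strings.
struct EventReader {
    events: Vec<String>,
}

impl EventReader {
    // Rust convention: name the getter after what it returns, without a get_
    // prefix (get_committee_created_events -> committee_created_events).
    fn committee_created_events(&self) -> &[String] {
        &self.events
    }
}

fn main() {
    let reader = EventReader { events: vec!["CommitteeCreated".to_string()] };
    assert_eq!(reader.committee_created_events().len(), 1);
}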
pub async fn get_committees_for_startup(
    &self,
    current_id: u64,
    previous_id: Option<u64>,
) -> Result<(KeyManager::Committee, Option<KeyManager::Committee>)> {
I find the CommitteeManager struct an overkill, and I also find this function a bit unnecessary.
Usually, wherever I need to fetch committee info, I simply have two lines: one to get a KeyManager contract instance, another to call .getCommitteeById() on it. That's it.
If we use this struct, we first need CommitteeManager::new(), then call this for_startup().
IMO the former is clearer and has less indirection.
The decision of which committee to fetch during startup could be logic that lives in timeboost code, not here.
I vote for dropping this file.
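Concretely, the two-line version would look roughly like this. It is only a sketch: the KeyManager ABI below is a stand-in for the repo's generated bindings, and the exact method and return shapes may differ.

use alloy::{primitives::Address, providers::ProviderBuilder, sol};

// Stand-in for the sol!-generated KeyManager binding in timeboost-contract;
// the real Committee struct and getter signature will differ.
sol! {
    #[sol(rpc)]
    contract KeyManager {
        struct Committee { uint64 id; address[] members; }
        function getCommitteeById(uint64 id) external view returns (Committee memory);
    }
}

async fn committee_at_startup(
    rpc_url: &str,
    key_manager_addr: Address,
    current_id: u64,
) -> eyre::Result<KeyManager::Committee> {
    let provider = ProviderBuilder::new().connect(rpc_url).await?;
    // One line for the contract instance, one line for the call.
    let key_manager = KeyManager::new(key_manager_addr, provider);
    Ok(key_manager.getCommitteeById(current_id).call().await?)
}

The caller (e.g. timeboost startup code) can then decide whether it also needs the previous committee and issue a second call, which keeps that policy out of timeboost-contract.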
}

/// Start monitoring contract events in the background
pub async fn start_monitoring(self) -> Result<()> {
I find the EventMonitor struct a little superfluous given that we already have KeyManagerEventMonitor.
Now I realize that maybe how the event stream should work in my mind is different. I was thinking of not using Provider::get_logs() with a known starting point and end point, but event streams via Filter().into_stream() (like this), and then putting that async log-receiving logic into the timeboost main loop so it knows when to trigger NextCommittee.
For this to happen, we also need an extra websocket endpoint besides the current parent.rpc_url, and when we construct the provider we need ProviderBuilder::connect_ws().
@akonring @lukeiannucci @twittner care to comment here?
(You can take a look at the tests in timeboost-contract/events.rs for how we are getting events at the moment, for context on my comments above.)
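For context, here is a minimal sketch of that shape, assuming a websocket endpoint and a CommitteeCreated(uint64 id) event on the KeyManager contract (both the event shape and the names are assumptions, not the actual bindings in this PR):

use alloy::{
    primitives::Address,
    providers::{Provider, ProviderBuilder, WsConnect},
    rpc::types::Filter,
    sol,
    sol_types::SolEvent,
};
use futures_util::StreamExt;

sol! {
    // Assumed event shape; the real KeyManager bindings define the actual fields.
    event CommitteeCreated(uint64 indexed id);
}

async fn watch_committees(ws_url: &str, key_manager: Address) -> eyre::Result<()> {
    // Requires a websocket endpoint in addition to the current parent.rpc_url.
    let provider = ProviderBuilder::new().connect_ws(WsConnect::new(ws_url)).await?;

    // Only logs from the KeyManager contract with this event signature.
    let filter = Filter::new()
        .address(key_manager)
        .event_signature(CommitteeCreated::SIGNATURE_HASH);

    // Open-ended subscription: logs arrive as the node sees them.
    let mut stream = provider.subscribe_logs(&filter).await?.into_stream();
    while let Some(log) = stream.next().await {
        if let Ok(decoded) = log.log_decode::<CommitteeCreated>() {
            // In the timeboost main loop, this is where NextCommittee would be triggered.
            println!("new committee id = {}", decoded.inner.data.id);
        }
    }
    Ok(())
}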
BTW, alloy_pubsub, which I have never used before, also provides a full-fledged pubsub layer; with it we could automate reconnection, duplicate handling, reordering, etc.
use alloy_primitives::{keccak256, Address, B256};
// NOTE: rough sketch — the exact alloy_pubsub frontend/service wiring below is
// approximate; check the crate docs before copying.
use alloy_pubsub::{connect::PubSubConnect, service::PubSubService};
use alloy_rpc_types::{Filter, Log};
use alloy_sol_types::{sol, SolEvent};
use alloy_transport_ws::WsConnect;
use serde_json::Value;
use tokio::task;

sol! {
    event Transfer(address indexed from, address indexed to, uint256 value);
}

#[tokio::main]
async fn main() -> eyre::Result<()> {
    // Example ERC20 (USDC mainnet)
    let contract: Address = "0xA0b86991c6218b36c1d19D4a2e9Eb0cE3606eB48".parse()?;
    // Topic0: keccak("Transfer(address,address,uint256)")
    let transfer_sig: B256 = keccak256("Transfer(address,address,uint256)");
    // JSON-RPC filter: only this contract + this event signature
    let filter = Filter::new().address(contract).event_signature(transfer_sig);

    // 1. WebSocket connection
    let ws = WsConnect::new("wss://mainnet.infura.io/ws/v3/YOUR_PROJECT_ID");
    // 2. Set up the pubsub frontend + service (approximate API)
    let (frontend, service) = PubSubService::new();
    // 3. Spawn the service in the background so it drives the connection
    task::spawn(async move {
        if let Err(err) = PubSubConnect::new(ws).serve(service).await {
            eprintln!("pubsub service ended: {err:?}");
        }
    });
    // 4. Subscribe to logs matching the filter
    let mut sub = frontend
        .subscribe("eth_subscribe", ["logs", serde_json::to_value(filter)?])
        .await?;
    // 5. Process messages as they arrive
    while let Some(msg) = sub.recv().await {
        let val: Value = msg;
        // Parse the raw JSON into an RPC Log
        if let Ok(log) = serde_json::from_value::<Log>(val.clone()) {
            // Decode the log data with the Transfer ABI
            if let Ok(decoded) = Transfer::decode_log_data(&log.inner.data, true) {
                println!(
                    "Transfer: from={} to={} value={}",
                    decoded.from, decoded.to, decoded.value
                );
            } else {
                eprintln!("Failed to decode log: {val:?}");
            }
        } else {
            eprintln!("Unexpected message: {val:?}");
        }
    }
    Ok(())
}
What do we prefer? A simple but manual event stream (https://alloy.rs/examples/subscriptions/event_multiplexer), or using pubsub?
I think this is a good idea if we can refactor it to use a websocket; I didn't know alloy had this support. It is also worth noting that committee changes will be very infrequent, per my understanding, so I'm just wondering whether the websocket will constantly be disconnecting and reconnecting, which is still fine. Perhaps I can also refactor the delayed inbox task to use a websocket connection.
EDIT: I missed the pubsub comment. I personally like this approach, but I can defer to what others say.
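If we stay with plain log subscriptions instead of the pubsub layer, a minimal reconnect loop could look like the sketch below (illustrative only; the function name and backoff policy are assumptions, not part of this PR):

use alloy::{
    providers::{Provider, ProviderBuilder, WsConnect},
    rpc::types::Filter,
};
use futures_util::StreamExt;
use std::time::Duration;

// Re-subscribe whenever the websocket drops. Committee events are infrequent,
// so a simple fixed backoff is enough for a sketch.
async fn run_with_reconnect(ws_url: &str, filter: Filter) -> eyre::Result<()> {
    loop {
        match ProviderBuilder::new().connect_ws(WsConnect::new(ws_url)).await {
            Ok(provider) => {
                let mut stream = provider.subscribe_logs(&filter).await?.into_stream();
                while let Some(log) = stream.next().await {
                    // Hand the log to whatever triggers NextCommittee.
                    println!("got log from block {:?}", log.block_number);
                }
                // Stream ended: the connection closed, so fall through and retry.
            }
            Err(err) => eprintln!("ws connect failed: {err}"),
        }
        tokio::time::sleep(Duration::from_secs(5)).await;
    }
}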
heads up: i'm gonna work on a different branch on the
Closes #<ISSUE_NUMBER>
This PR:
This PR does not:
Key places to review: