Skip to content

Commit 4547bcb

Browse files
committed
Stream<CommitteeInfo> and separating pubsub backend handle
1 parent 76c2277 commit 4547bcb

File tree

5 files changed

+207
-94
lines changed

5 files changed

+207
-94
lines changed

timeboost-contract/src/provider.rs

Lines changed: 110 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,21 @@
11
//! Helper functions to build Ethereum [providers](https://docs.rs/alloy/latest/alloy/providers/trait.Provider.html)
22
//! Partial Credit: <https://github.com/EspressoSystems/espresso-network/tree/main/contracts/rust/deployer>
33
4+
use std::{ops::Deref, pin::Pin};
5+
46
use alloy::{
7+
eips::BlockNumberOrTag,
58
network::EthereumWallet,
6-
providers::ProviderBuilder,
9+
primitives::Address,
10+
providers::{Provider, ProviderBuilder},
11+
rpc::types::{Filter, Log},
712
signers::local::{LocalSignerError, MnemonicBuilder, PrivateKeySigner, coins_bip39::English},
8-
transports::http::reqwest::Url,
13+
sol_types::SolEvent,
14+
transports::{http::reqwest::Url, ws::WsConnect},
915
};
10-
use timeboost_types::HttpProviderWithWallet;
16+
use futures::{Stream, StreamExt};
17+
use timeboost_types::{HttpProvider, HttpProviderWithWallet};
18+
use tracing::error;
1119

1220
/// Build a local signer from wallet mnemonic and account index
1321
pub fn build_signer(
@@ -31,3 +39,102 @@ pub fn build_provider(
3139
let wallet = EthereumWallet::from(signer);
3240
Ok(ProviderBuilder::new().wallet(wallet).connect_http(url))
3341
}
42+
43+
/// A PubSub service (with backend handle), disconnect on drop.
44+
#[derive(Clone)]
45+
pub struct PubSubProvider(HttpProvider);
46+
47+
impl Deref for PubSubProvider {
48+
type Target = HttpProvider;
49+
fn deref(&self) -> &Self::Target {
50+
&self.0
51+
}
52+
}
53+
54+
impl PubSubProvider {
55+
pub async fn new(ws_url: Url) -> anyhow::Result<Self> {
56+
let ws = WsConnect::new(ws_url);
57+
let provider = ProviderBuilder::new()
58+
.connect_pubsub_with(ws)
59+
.await
60+
.map_err(|err| {
61+
error!(?err, "event pubsub failed to start");
62+
err
63+
})?;
64+
Ok(Self(provider))
65+
}
66+
67+
pub fn inner(&self) -> &HttpProvider {
68+
&self.0
69+
}
70+
71+
/// create an event stream of event type `E`, subscribing since `from_block` on `contract`
72+
pub async fn event_stream<E: SolEvent>(
73+
&self,
74+
contract: Address,
75+
from_block: BlockNumberOrTag,
76+
) -> anyhow::Result<Pin<Box<dyn Stream<Item = Log<E>>>>> {
77+
let filter = Filter::new()
78+
.address(contract)
79+
.event(E::SIGNATURE)
80+
.from_block(from_block);
81+
82+
let events = self
83+
.subscribe_logs(&filter)
84+
.await
85+
.map_err(|err| {
86+
error!(?err, "pubsub subscription failed");
87+
err
88+
})?
89+
.into_stream();
90+
91+
let validated = events.filter_map(|log| async move {
92+
let Ok(event) = log.log_decode_validate::<E>() else {
93+
error!("fail to parse CommitteeCreated event log");
94+
return None;
95+
};
96+
Some(event)
97+
});
98+
99+
Ok(Box::pin(validated))
100+
}
101+
102+
/// Returns the smallest block number whose timestamp is >= `target_ts` through binary search.
103+
/// Useful to determine `from_block` input of `Self::event_strea()` subscription.
104+
pub async fn get_block_number_by_timestamp(
105+
&self,
106+
target_ts: u64,
107+
) -> anyhow::Result<Option<u64>> {
108+
let latest = self.get_block_number().await?;
109+
let mut lo: u64 = 0;
110+
let mut hi: u64 = latest;
111+
112+
while lo <= hi {
113+
let mid = lo + (hi - lo) / 2;
114+
115+
let block = match self
116+
.0
117+
.get_block_by_number(BlockNumberOrTag::Number(mid))
118+
.await?
119+
{
120+
Some(b) => b,
121+
None => {
122+
lo = mid + 1;
123+
continue;
124+
}
125+
};
126+
127+
if block.header.timestamp >= target_ts {
128+
if mid == 0 {
129+
return Ok(Some(0));
130+
}
131+
hi = mid - 1;
132+
} else {
133+
lo = mid + 1;
134+
}
135+
}
136+
137+
// At this point, `lo` is the smallest index with ts >= target_ts (if any)
138+
if lo > latest { Ok(None) } else { Ok(Some(lo)) }
139+
}
140+
}

timeboost/src/binaries/timeboost.rs

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -94,8 +94,20 @@ async fn main() -> Result<()> {
9494

9595
let pubkey = sign_keypair.public_key();
9696

97+
// optionally fetch previous committee info
98+
let cid: u64 = cli.committee_id.into();
99+
let prev_comm = if cid > 0u64 {
100+
let c = &node_config.chain.parent;
101+
let prev_comm =
102+
CommitteeInfo::fetch(c.rpc_url.clone(), c.key_manager_contract, cid - 1).await?;
103+
Some(prev_comm)
104+
} else {
105+
None
106+
};
107+
97108
let config = TimeboostConfig::builder()
98109
.sailfish_committee(sailfish_committee)
110+
.maybe_prev_committee(prev_comm)
99111
.decrypt_committee(decrypt_committee)
100112
.certifier_committee(certifier_committee)
101113
.sign_keypair(sign_keypair)

timeboost/src/committee.rs

Lines changed: 49 additions & 58 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,11 @@
11
//! Syncing committee info from the KeyManager contract
22
33
use std::pin::Pin;
4-
use std::task::{Context, Poll};
54

65
use alloy::{
76
eips::BlockNumberOrTag,
87
primitives::Address,
98
providers::{Provider, ProviderBuilder},
10-
rpc::types::Filter,
11-
sol_types::SolEvent,
12-
transports::ws::WsConnect,
139
};
1410
use anyhow::{Context as AnyhowContext, Result};
1511
use cliquenet::AddressableCommittee;
@@ -18,11 +14,15 @@ use itertools::{Itertools, izip};
1814
use multisig::{Committee, CommitteeId, x25519};
1915
use timeboost_config::{CERTIFIER_PORT_OFFSET, DECRYPTER_PORT_OFFSET, ParentChain};
2016
use timeboost_contract::KeyManager::{self, CommitteeCreated};
17+
use timeboost_contract::provider::PubSubProvider;
2118
use timeboost_crypto::prelude::DkgEncKey;
2219
use timeboost_types::{KeyStore, Timestamp};
2320
use tracing::error;
2421
use url::Url;
2522

23+
/// Type alias for the committee stream
24+
pub type NewCommitteeStream = Pin<Box<dyn Stream<Item = CommitteeInfo>>>;
25+
2626
/// The committee info stored on the KeyManager contract, a subset of [`CommitteeConfig`]
2727
/// Keys and hosts are ordered in the same as they were registered (with KeyId from 0..n)
2828
#[derive(Debug, Clone)]
@@ -39,7 +39,14 @@ impl CommitteeInfo {
3939
/// Fetch the committee info for `committee_id` from `key_manager_addr` on chain
4040
pub async fn fetch(rpc: Url, key_manager_addr: Address, committee_id: u64) -> Result<Self> {
4141
let provider = ProviderBuilder::new().connect_http(rpc);
42+
Self::fetch_with(provider, key_manager_addr, committee_id).await
43+
}
4244

45+
pub(crate) async fn fetch_with(
46+
provider: impl Provider,
47+
key_manager_addr: Address,
48+
committee_id: u64,
49+
) -> Result<Self> {
4350
let contract = KeyManager::new(key_manager_addr, &provider);
4451
let c = contract.getCommitteeById(committee_id).call().await?;
4552

@@ -71,6 +78,10 @@ impl CommitteeInfo {
7178
})
7279
}
7380

81+
pub fn id(&self) -> CommitteeId {
82+
self.id
83+
}
84+
7485
pub fn effective_timestamp(&self) -> Timestamp {
7586
self.timestamp
7687
}
@@ -138,64 +149,44 @@ impl CommitteeInfo {
138149
.map(|(i, k)| (i as u8, k.clone())),
139150
)
140151
}
141-
}
142152

143-
/// An pubsub-provider-holding event stream. (the pubsub will close on drop)
144-
pub struct NewCommitteeStream {
145-
_provider: Box<dyn Provider>,
146-
inner: Pin<Box<dyn Stream<Item = u64> + Send>>,
147-
}
148-
149-
impl Stream for NewCommitteeStream {
150-
type Item = u64;
151-
152-
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
153-
self.inner.as_mut().poll_next(cx)
154-
}
155-
}
156-
157-
impl NewCommitteeStream {
158-
pub async fn create(config: &ParentChain) -> Result<Self> {
159-
// setup the websocket for contract event stream
160-
let ws = WsConnect::new(config.ws_url.clone());
161-
// spawn the pubsub service (and backend) and the frontend is registered at the provider
162-
let provider = ProviderBuilder::new()
163-
.connect_pubsub_with(ws)
153+
/// subscribe an event stream
154+
pub async fn new_committee_stream(
155+
provider: &PubSubProvider,
156+
start_ts: Timestamp,
157+
config: &ParentChain,
158+
) -> Result<NewCommitteeStream> {
159+
let from_block = provider
160+
.get_block_number_by_timestamp(start_ts.into())
161+
.await?
162+
.unwrap_or_default();
163+
let events = provider
164+
.event_stream::<CommitteeCreated>(
165+
config.key_manager_contract,
166+
BlockNumberOrTag::Number(from_block),
167+
)
164168
.await
165-
.map_err(|err| {
166-
error!(?err, "event pubsub failed to start");
167-
err
169+
.map_err(|e| {
170+
error!("Failed to create CommitteeCreated stream: {:?}", e);
171+
e
168172
})?;
169173

170-
let chain_id = config.id;
171-
let tag = if chain_id == 31337 || chain_id == 1337 {
172-
// local test chain, we start scanning from the genesis
173-
BlockNumberOrTag::Number(0)
174-
} else {
175-
config.block_tag
176-
};
177-
178-
let filter = Filter::new()
179-
.address(config.key_manager_contract)
180-
.event(KeyManager::CommitteeCreated::SIGNATURE)
181-
.from_block(tag);
182-
let events = provider
183-
.subscribe_logs(&filter)
184-
.await
185-
.map_err(|err| {
186-
error!(?err, "pubsub subscription failed");
187-
err
188-
})?
189-
.into_stream();
190-
191-
let validated = events.filter_map(|log| async move {
192-
log.log_decode_validate::<CommitteeCreated>()
193-
.ok()
194-
.map(|v| v.data().id)
174+
let provider = provider.clone();
175+
let key_manager_contract = config.key_manager_contract;
176+
let s = events.filter_map(move |log| {
177+
let provider = provider.clone();
178+
async move {
179+
let id = log.data().id;
180+
match CommitteeInfo::fetch_with(provider.inner(), key_manager_contract, id).await {
181+
Ok(comm_info) => Some(comm_info),
182+
Err(_) => {
183+
error!(committee_id = %id, "fail to fetch new CommitteeInfo");
184+
None
185+
}
186+
}
187+
}
195188
});
196-
Ok(Self {
197-
_provider: Box::new(provider),
198-
inner: Box::pin(validated),
199-
})
189+
190+
Ok(Box::pin(s))
200191
}
201192
}

timeboost/src/conf.rs

Lines changed: 12 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -7,20 +7,19 @@ use timeboost_crypto::prelude::DkgDecKey;
77
use timeboost_sequencer::SequencerConfig;
88
use timeboost_types::{KeyStore, ThresholdKeyCell};
99

10+
use crate::committee::CommitteeInfo;
11+
1012
#[derive(Debug, Clone, Builder)]
1113
pub struct TimeboostConfig {
1214
/// The sailfish peers that this node will connect to.
1315
pub(crate) sailfish_committee: AddressableCommittee,
1416

15-
/// Previous sailfish peers
16-
pub(crate) prev_sailfish_committee: Option<AddressableCommittee>,
17+
/// Previous committee info stored on chain
18+
pub(crate) prev_committee: Option<CommitteeInfo>,
1719

1820
/// The decrypt peers that this node will connect to.
1921
pub(crate) decrypt_committee: AddressableCommittee,
2022

21-
/// Previous decrypt peers
22-
pub(crate) prev_decrypt_committee: Option<(AddressableCommittee, KeyStore)>,
23-
2423
/// The block certifier peers that this node will connect to.
2524
pub(crate) certifier_committee: AddressableCommittee,
2625

@@ -80,8 +79,14 @@ impl TimeboostConfig {
8079
.sailfish_committee(self.sailfish_committee.clone())
8180
.decrypt_committee((self.decrypt_committee.clone(), self.key_store.clone()))
8281
.recover(self.recover)
83-
.maybe_previous_sailfish_committee(self.prev_sailfish_committee.clone())
84-
.maybe_previous_decrypt_committee(self.prev_decrypt_committee.clone())
82+
.maybe_previous_sailfish_committee(
83+
self.prev_committee.as_ref().map(|c| c.sailfish_committee()),
84+
)
85+
.maybe_previous_decrypt_committee(
86+
self.prev_committee
87+
.as_ref()
88+
.map(|c| (c.decrypt_committee(), c.dkg_key_store())),
89+
)
8590
.leash_len(self.leash_len)
8691
.threshold_dec_key(self.threshold_dec_key.clone())
8792
.chain_config(self.chain_config.clone())

0 commit comments

Comments
 (0)