Skip to content

Commit 080514c

Browse files
committed
add delayed index to forwarder
1 parent 8378cb6 commit 080514c

File tree

4 files changed

+27
-22
lines changed

4 files changed

+27
-22
lines changed

timeboost-sequencer/src/delayed_inbox.rs

Lines changed: 18 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -7,30 +7,31 @@ use alloy_rpc_types::{BlockId, BlockNumberOrTag, Filter};
77
use alloy_sol_types::{SolEvent, sol};
88
use multisig::PublicKey;
99
use tokio::time::sleep;
10-
use tracing::{error, info};
10+
use tracing::{error, info, warn};
1111

1212
use crate::queue::BundleQueue;
1313

14-
const INBOX_DELAY: Duration = Duration::from_secs(120);
14+
const INBOX_DELAY: Duration = Duration::from_secs(60);
1515

1616
sol! {
1717
event InboxMessageDelivered(uint256 indexed messageNum, bytes data);
1818
event InboxMessageDeliveredFromOrigin(uint256 indexed messageNum);
1919
}
2020

21-
pub struct DelayedInbox<N: Network> {
21+
pub struct DelayedInbox<'a, N: Network> {
2222
node: PublicKey,
2323
ibox_addr: Address,
2424
provider: RootProvider<N>,
2525
queue: BundleQueue,
26+
url: &'a str,
2627
}
2728

28-
impl<N: Network> DelayedInbox<N> {
29+
impl<'a, N: Network> DelayedInbox<'a, N> {
2930
pub async fn connect(
3031
node: PublicKey,
31-
url: &str,
32+
url: &'a str,
3233
ibox_addr: Address,
33-
l1_chain_id: u64,
34+
parent_chain_id: u64,
3435
queue: BundleQueue,
3536
) -> Result<Self, Error> {
3637
let provider = RootProvider::<N>::connect(url)
@@ -40,22 +41,20 @@ impl<N: Network> DelayedInbox<N> {
4041
.get_chain_id()
4142
.await
4243
.map_err(|e| Error::RpcError(e.to_string()))?;
43-
if l1_chain_id != rpc_chain_id {
44-
error!(%l1_chain_id, %rpc_chain_id, "mismatching chain id");
45-
return Err(Error::MismatchingChainID(l1_chain_id, rpc_chain_id));
44+
if parent_chain_id != rpc_chain_id {
45+
error!(%parent_chain_id, %rpc_chain_id, "mismatching chain id");
46+
return Err(Error::MismatchingChainID(parent_chain_id, rpc_chain_id));
4647
}
4748
Ok(Self {
4849
node,
4950
ibox_addr,
5051
provider,
5152
queue,
53+
url,
5254
})
5355
}
5456

5557
pub async fn go(self) {
56-
let Self {
57-
node, ibox_addr, ..
58-
} = self;
5958
let mut last_finalized = 0;
6059
let mut last_delayed_index = 0;
6160
let events = vec![
@@ -71,6 +70,7 @@ impl<N: Network> DelayedInbox<N> {
7170
{
7271
let finalized = b.header().number();
7372
if finalized == last_finalized {
73+
sleep(INBOX_DELAY).await;
7474
continue;
7575
}
7676
if last_finalized == 0 {
@@ -84,31 +84,33 @@ impl<N: Network> DelayedInbox<N> {
8484
.from_block(last_finalized)
8585
.to_block(finalized)
8686
.events(&events);
87+
last_finalized = finalized;
8788

8889
if let Ok(mut logs) = self.provider.get_logs(&filter).await {
89-
last_finalized = finalized;
9090
// Make sure event logs are in order, we need highest block number first
9191
logs.sort_by(|a, b| {
9292
b.block_number
9393
.cmp(&a.block_number)
9494
.then(b.log_index.cmp(&a.log_index))
9595
});
96-
if let Some((Some(tx_hash), Some(msg_num))) = logs
96+
if let Some((Some(tx_hash), Some(index))) = logs
9797
.first()
9898
.map(|log| (log.transaction_hash, log.topics().get(1)))
9999
{
100100
// Update delayed index if newer
101-
let delayed_index = U256::from_be_bytes(msg_num.0)
101+
let delayed_index = U256::from_be_bytes(index.0)
102102
.try_into()
103103
.expect("valid msg number");
104104
if delayed_index != last_delayed_index {
105105
debug_assert!(delayed_index > last_delayed_index);
106-
info!(%node, ?ibox_addr, %last_finalized, %delayed_index, %tx_hash, "delayed index updated");
106+
info!(node = %self.node, ibox_addr = %self.ibox_addr, parent_finalized_block = %finalized, %delayed_index, %tx_hash, "delayed index updated");
107107
last_delayed_index = delayed_index;
108108
self.queue.set_delayed_inbox_index(delayed_index.into());
109109
}
110110
}
111111
}
112+
} else {
113+
warn!(node = %self.node, ibox_addr = %self.ibox_addr, url = %self.url, "failed to get latest finalized block");
112114
}
113115
sleep(INBOX_DELAY).await;
114116
}

timeboost-sequencer/src/lib.rs

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ use sailfish::consensus::{Consensus, ConsensusMetrics};
1919
use sailfish::rbc::{Rbc, RbcError, RbcMetrics};
2020
use sailfish::types::{Action, ConsensusTime, Evidence, Round, RoundNumber};
2121
use sailfish::{Coordinator, Event};
22-
use timeboost_types::{BundleVariant, Timestamp, Transaction};
22+
use timeboost_types::{BundleVariant, DelayedInboxIndex, Timestamp, Transaction};
2323
use timeboost_types::{CandidateList, CandidateListBytes, InclusionList};
2424
use tokio::select;
2525
use tokio::sync::mpsc::{self, Receiver, Sender};
@@ -44,6 +44,7 @@ pub enum Output {
4444
round: RoundNumber,
4545
timestamp: Timestamp,
4646
transactions: Vec<Transaction>,
47+
delayed_inbox_index: DelayedInboxIndex,
4748
},
4849
UseCommittee(Round),
4950
}
@@ -300,9 +301,10 @@ impl Task {
300301
Ok(incl) => {
301302
let round = incl.round();
302303
let timestamp = incl.timestamp();
304+
let delayed_inbox_index = incl.delayed_inbox_index();
303305
let transactions = self.sorter.sort(incl);
304306
if !transactions.is_empty() {
305-
let out = Output::Transactions { round, timestamp, transactions };
307+
let out = Output::Transactions { round, timestamp, transactions, delayed_inbox_index };
306308
self.output.send(out).await.map_err(|_| TimeboostError::ChannelClosed)?;
307309
}
308310
if self.decrypter.has_capacity() {

timeboost/src/forwarder/nitro_forwarder.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ use multisig::PublicKey;
88
use sailfish::types::RoundNumber;
99
use timeboost_proto::forward::forward_api_client::ForwardApiClient;
1010
use timeboost_proto::inclusion::InclusionList;
11-
use timeboost_types::{Timestamp, Transaction};
11+
use timeboost_types::{DelayedInboxIndex, Timestamp, Transaction};
1212
use tokio::sync::mpsc::{Sender, channel};
1313
use tokio::task::JoinHandle;
1414
use tonic::transport::Endpoint;
@@ -44,6 +44,7 @@ impl NitroForwarder {
4444
round: RoundNumber,
4545
timestamp: Timestamp,
4646
txns: &[Transaction],
47+
index: DelayedInboxIndex,
4748
) -> Result<(), Error> {
4849
let incl = InclusionList {
4950
round: *round,
@@ -56,7 +57,7 @@ impl NitroForwarder {
5657
})
5758
.collect(),
5859
consensus_timestamp: timestamp.into(),
59-
delayed_messages_read: 0,
60+
delayed_messages_read: index.into(),
6061
};
6162
self.incls_tx
6263
.send(incl)

timeboost/src/lib.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -104,15 +104,15 @@ impl Timeboost {
104104
}
105105
},
106106
out = self.sequencer.next() => match out {
107-
Ok(Output::Transactions { round, timestamp, transactions }) => {
107+
Ok(Output::Transactions { round, timestamp, transactions, delayed_inbox_index }) => {
108108
info!(
109109
node = %self.label,
110110
round = %round,
111111
trxs = %transactions.len(),
112112
"sequencer output"
113113
);
114114
if let Some(ref mut f) = self.nitro_forwarder {
115-
f.enqueue(round, timestamp, &transactions).await?;
115+
f.enqueue(round, timestamp, &transactions, delayed_inbox_index).await?;
116116
}
117117
else {
118118
warn!(node = %self.label, %round, "no forwarder => dropping output")

0 commit comments

Comments
 (0)