Skip to content

Commit 9b86669

Browse files
greged93frisitano
andauthored
fix: engine and signer panics (#137)
* fix: engine and signer panics * feat: answer comments * fix: lints * feat: add a ticker in the signer loop * update signer control flow and graceful shutdown --------- Co-authored-by: frisitano <35734660+frisitano@users.noreply.github.com> Co-authored-by: frisitano <francesco.risitano95@gmail.com>
1 parent 3e02fc3 commit 9b86669

File tree

8 files changed

+129
-37
lines changed

8 files changed

+129
-37
lines changed

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

crates/engine/src/future/mod.rs

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -234,8 +234,7 @@ where
234234
// this payload has already been passed to the EN in the form of a P2P gossiped
235235
// execution payload. We can advance the safe head by one by issuing a
236236
// forkchoiceUpdated.
237-
let safe_block_info: L2BlockInfoWithL1Messages =
238-
execution_payload.try_into().map_err(EngineDriverError::InvalidExecutionPayload)?;
237+
let safe_block_info: L2BlockInfoWithL1Messages = (&execution_payload).into();
239238
fcs.safe_block_hash = safe_block_info.block_info.hash;
240239
forkchoice_updated(client, fcs, None).await?;
241240
Ok((safe_block_info, false, batch_info))
@@ -255,8 +254,7 @@ where
255254
)
256255
.await?;
257256
// issue the execution payload to the EL
258-
// TODO: remove clone
259-
let safe_block_info: L2BlockInfoWithL1Messages = execution_payload.clone().try_into()?;
257+
let safe_block_info: L2BlockInfoWithL1Messages = (&execution_payload).into();
260258
let result = new_payload(client.clone(), execution_payload.into_v1()).await?;
261259

262260
// we should only have a valid payload when deriving from payload attributes (should not

crates/manager/src/manager/mod.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -289,7 +289,7 @@ where
289289
if let Some(event_sender) = self.event_sender.as_ref() {
290290
event_sender.notify(RollupManagerEvent::BlockImported(block.clone()));
291291
}
292-
self.indexer.handle_block(block.into(), None);
292+
self.indexer.handle_block((&block).into(), None);
293293
}
294294
self.network.handle().block_import_outcome(outcome);
295295
}
@@ -302,7 +302,7 @@ where
302302
event_sender.notify(RollupManagerEvent::BlockSequenced(payload.clone()));
303303
}
304304

305-
self.indexer.handle_block(payload.into(), None);
305+
self.indexer.handle_block((&payload).into(), None);
306306
}
307307
EngineDriverEvent::L1BlockConsolidated((block_info, batch_info)) => {
308308
self.indexer.handle_block(block_info.clone(), Some(batch_info));

crates/node/src/add_ons/handle.rs

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,9 +2,8 @@ use reth_node_api::FullNodeComponents;
22
use reth_node_builder::rpc::{RpcHandle, RpcHandleProvider};
33
use reth_rpc_eth_api::EthApiTypes;
44
use rollup_node_manager::RollupManagerHandle;
5-
use rollup_node_watcher::L1Notification;
6-
use std::sync::Arc;
7-
use tokio::sync::mpsc::Sender;
5+
#[cfg(feature = "test-utils")]
6+
use {rollup_node_watcher::L1Notification, std::sync::Arc, tokio::sync::mpsc::Sender};
87

98
/// A handle for scroll addons, which includes handles for the rollup manager and RPC server.
109
#[derive(Debug, Clone)]

crates/node/src/add_ons/rollup.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ use crate::{
22
args::{L1ProviderArgs, ScrollRollupNodeConfig},
33
constants::PROVIDER_BLOB_CACHE_SIZE,
44
};
5+
56
use alloy_provider::ProviderBuilder;
67
use alloy_rpc_client::RpcClient;
78
use alloy_signer_local::PrivateKeySigner;
@@ -211,7 +212,7 @@ impl RollupManagerAddOn {
211212
.then_some(ctx.node.network().eth_wire_block_listener().await?);
212213

213214
// Instantiate the signer
214-
let signer = self.config.test.then_some(Signer::spawn(PrivateKeySigner::random()).await);
215+
let signer = self.config.test.then_some(Signer::spawn(PrivateKeySigner::random()));
215216

216217
// Spawn the rollup node manager
217218
let rnm = RollupNodeManager::new(

crates/primitives/src/block.rs

Lines changed: 30 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,9 @@
1-
use alloy_eips::BlockNumHash;
1+
use alloy_eips::{BlockNumHash, Decodable2718};
22
use alloy_primitives::{B256, U256};
3-
use alloy_rpc_types_engine::{ExecutionPayload, PayloadError};
3+
use alloy_rpc_types_engine::ExecutionPayload;
44
use reth_primitives_traits::transaction::signed::SignedTransaction;
5-
use reth_scroll_primitives::ScrollBlock;
5+
use reth_scroll_primitives::{ScrollBlock, ScrollTransactionSigned};
6+
use scroll_alloy_consensus::L1_MESSAGE_TRANSACTION_TYPE;
67
use std::vec::Vec;
78

89
/// The default block difficulty for a scroll block.
@@ -68,26 +69,37 @@ pub struct L2BlockInfoWithL1Messages {
6869
pub l1_messages: Vec<B256>,
6970
}
7071

71-
impl TryFrom<ExecutionPayload> for L2BlockInfoWithL1Messages {
72-
type Error = PayloadError;
73-
74-
fn try_from(value: ExecutionPayload) -> Result<Self, Self::Error> {
75-
value.try_into_block().map(Into::into)
76-
}
77-
}
78-
79-
impl From<ScrollBlock> for L2BlockInfoWithL1Messages {
80-
// TODO: It would be more efficient to convert payload transactions from bytes to
81-
// ScrollSignedTransaction directly, instead of converting the whole block (we would avoid
82-
// calculating the transactions state root)
83-
fn from(value: ScrollBlock) -> Self {
72+
impl From<&ScrollBlock> for L2BlockInfoWithL1Messages {
73+
fn from(value: &ScrollBlock) -> Self {
8474
let block_number = value.number;
8575
let block_hash = value.hash_slow();
8676
let l1_messages = value
8777
.body
8878
.transactions
89-
.into_iter()
90-
.filter_map(|tx| tx.is_l1_message().then(|| *tx.tx_hash()))
79+
.iter()
80+
.filter(|tx| tx.is_l1_message())
81+
.map(|tx| *tx.tx_hash())
82+
.collect();
83+
Self { block_info: BlockInfo { number: block_number, hash: block_hash }, l1_messages }
84+
}
85+
}
86+
87+
impl From<&ExecutionPayload> for L2BlockInfoWithL1Messages {
88+
fn from(value: &ExecutionPayload) -> Self {
89+
let block_number = value.block_number();
90+
let block_hash = value.block_hash();
91+
let l1_messages = value
92+
.as_v1()
93+
.transactions
94+
.iter()
95+
.filter_map(|raw| {
96+
(raw.as_ref().first() == Some(&L1_MESSAGE_TRANSACTION_TYPE))
97+
.then(|| {
98+
let tx = ScrollTransactionSigned::decode_2718(&mut raw.as_ref()).ok()?;
99+
Some(*tx.tx_hash())
100+
})
101+
.flatten()
102+
})
91103
.collect();
92104
Self { block_info: BlockInfo { number: block_number, hash: block_hash }, l1_messages }
93105
}

crates/signer/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ tracing.workspace = true
2828

2929
[dev-dependencies]
3030
alloy-signer-local.workspace = true
31+
reth-tracing.workspace = true
3132

3233
[features]
3334
default = []

crates/signer/src/lib.rs

Lines changed: 89 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -39,8 +39,8 @@ pub struct Signer {
3939
}
4040

4141
impl Signer {
42-
/// Creates a new `Signer` instance.
43-
pub async fn spawn(signer: impl alloy_signer::Signer + Send + Sync + 'static) -> SignerHandle {
42+
/// Creates a new [`Signer`] instance and [`SignerHandle`] with the provided signer.
43+
fn new(signer: impl alloy_signer::Signer + Send + Sync + 'static) -> (Self, SignerHandle) {
4444
let (req_tx, req_rx) = tokio::sync::mpsc::unbounded_channel();
4545
let (event_tx, event_rx) = tokio::sync::mpsc::unbounded_channel();
4646
let signer = Self {
@@ -49,8 +49,14 @@ impl Signer {
4949
in_progress: FuturesOrdered::new(),
5050
sender: event_tx,
5151
};
52+
(signer, SignerHandle::new(req_tx, event_rx.into()))
53+
}
54+
55+
/// Spawns a new `Signer` instance onto the tokio runtime.
56+
pub fn spawn(signer: impl alloy_signer::Signer + Send + Sync + 'static) -> SignerHandle {
57+
let (signer, handle) = Self::new(signer);
5258
tokio::spawn(signer.run());
53-
SignerHandle::new(req_tx, event_rx.into())
59+
handle
5460
}
5561

5662
/// Execution loop for the signer.
@@ -67,16 +73,24 @@ impl Signer {
6773

6874
}
6975
}
70-
Some(result) = self.in_progress.next() => {
76+
Some(result) = self.in_progress.next(), if !self.in_progress.is_empty() => {
7177
match result {
72-
Ok(event) => self.sender.send(event).expect("The event channel is closed"),
78+
Ok(event) => {
79+
if self.sender.send(event).is_err() {
80+
tracing::info!(target: "scroll::signer", "The event channel has been closed - shutting down.");
81+
break;
82+
}
83+
},
7384
Err(err) => {
74-
tracing::error!(target: "rollup_node::signer", ?err, "An error occurred while signing");
85+
tracing::error!(target: "scroll::signer", ?err, "An error occurred while signing");
7586
}
7687
}
77-
7888
}
79-
else => ()
89+
else => {
90+
// The request channel is closed, exit the loop.
91+
tracing::info!(target: "scroll::signer", "Signer request channel has been closed - shutting down.");
92+
break;
93+
}
8094
}
8195
}
8296
}
@@ -102,8 +116,9 @@ mod tests {
102116

103117
#[tokio::test]
104118
async fn test_signer_local() {
119+
reth_tracing::init_test_tracing();
105120
let signer = PrivateKeySigner::random();
106-
let mut handle = Signer::spawn(Box::new(signer.clone())).await;
121+
let mut handle = Signer::spawn(Box::new(signer.clone()));
107122

108123
// Test sending a request
109124
let block = ScrollBlock::default();
@@ -120,4 +135,69 @@ mod tests {
120135
assert_eq!(event_block, block);
121136
assert_eq!(recovered_address, signer.address());
122137
}
138+
139+
// The following tests do not have any assertions, they are just to ensure that the
140+
// shutdown logic works correctly and does not panic. You can observer the logs to see if the
141+
// shutdown logic is executed correctly.
142+
143+
#[tokio::test]
144+
async fn test_drop_signer_handle() {
145+
reth_tracing::init_test_tracing();
146+
147+
// Create a local signer and the signer service
148+
let key = PrivateKeySigner::random();
149+
let (signer, handle) = Signer::new(Box::new(key.clone()));
150+
151+
// Spawn the signer task and capture the JoinHandle
152+
let task = tokio::spawn(signer.run());
153+
154+
// Drop the handle to simulate shutdown
155+
drop(handle);
156+
157+
// Wait for the signer task to complete
158+
task.await.expect("Signer task panicked");
159+
}
160+
161+
#[tokio::test]
162+
async fn test_drop_signer_handle_with_wait() {
163+
reth_tracing::init_test_tracing();
164+
165+
// Create a local signer and the signer service
166+
let key = PrivateKeySigner::random();
167+
let (signer, handle) = Signer::new(Box::new(key.clone()));
168+
169+
// Spawn the signer task and capture the JoinHandle
170+
let task = tokio::spawn(signer.run());
171+
172+
// wait to observe if the else block will be executed.
173+
tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
174+
175+
// Drop the handle to simulate shutdown
176+
drop(handle);
177+
178+
// Wait for the signer task to complete
179+
task.await.expect("signer task panicked");
180+
}
181+
182+
#[tokio::test]
183+
async fn test_drop_signer_handle_with_request() {
184+
reth_tracing::init_test_tracing();
185+
186+
// Create a local signer and the signer service
187+
let key = PrivateKeySigner::random();
188+
let (signer, handle) = Signer::new(Box::new(key.clone()));
189+
190+
// Spawn the signer task and capture the JoinHandle
191+
let task = tokio::spawn(signer.run());
192+
193+
// Send a signing request through the handle
194+
let block = ScrollBlock::default();
195+
handle.sign_block(block.clone()).unwrap();
196+
197+
// Drop the handle to simulate shutdown
198+
drop(handle);
199+
200+
// Wait for the signer task to complete
201+
task.await.expect("Signer task panicked");
202+
}
123203
}

0 commit comments

Comments
 (0)