Skip to content

Commit b947df8

Browse files
committed
feat: pipeline modifications
1 parent e8b4ca6 commit b947df8

File tree

4 files changed

+68
-43
lines changed

4 files changed

+68
-43
lines changed

Cargo.lock

Lines changed: 2 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

crates/derivation-pipeline/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ rollup-node-providers.workspace = true
2626
scroll-codec.workspace = true
2727

2828
# misc
29+
futures.workspace = true
2930
thiserror.workspace = true
3031

3132
[dev-dependencies]

crates/derivation-pipeline/src/error.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,9 @@ pub enum DerivationPipelineError {
1010
/// Missing L1 messages cursor.
1111
#[error("missing l1 message queue cursor")]
1212
MissingL1MessageQueueCursor,
13+
/// Missing L1 message.
14+
#[error("missing l1 message")]
15+
MissingL1Message,
1316
/// An error at the L1 provider.
1417
#[error(transparent)]
1518
L1Provider(#[from] L1ProviderError),

crates/derivation-pipeline/src/lib.rs

Lines changed: 62 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -12,30 +12,33 @@ mod error;
1212
extern crate alloc as std;
1313

1414
use crate::{data_source::CodecDataSource, error::DerivationPipelineError};
15-
use std::vec::Vec;
15+
use std::{sync::Arc, vec::Vec};
1616

1717
use alloy_primitives::B256;
1818
use alloy_rpc_types_engine::PayloadAttributes;
19+
use futures::{stream, Stream, StreamExt};
1920
use reth_scroll_chainspec::SCROLL_FEE_VAULT_ADDRESS;
2021
use rollup_node_primitives::BatchCommitData;
21-
use rollup_node_providers::{L1Provider, L1ProviderError};
22+
use rollup_node_providers::L1Provider;
2223
use scroll_alloy_rpc_types_engine::ScrollPayloadAttributes;
2324
use scroll_codec::Codec;
2425

2526
/// Returns an iterator over [`ScrollPayloadAttributes`] from the [`BatchCommitData`] and a
2627
/// [`L1Provider`].
27-
pub async fn derive<P: L1Provider<Error = L1ProviderError>>(
28+
pub async fn derive<P: L1Provider>(
2829
batch: BatchCommitData,
2930
l1_provider: &mut P,
30-
) -> Result<impl Iterator<Item = ScrollPayloadAttributes> + use<'_, P>, DerivationPipelineError> {
31+
) -> Result<
32+
impl Stream<Item = Result<ScrollPayloadAttributes, DerivationPipelineError>> + use<'_, P>,
33+
DerivationPipelineError,
34+
> {
3135
// fetch the blob then decode the input batch.
3236
let blob = if let Some(hash) = batch.blob_versioned_hash {
3337
l1_provider.blob(batch.block_timestamp, hash).await?
3438
} else {
3539
None
3640
};
37-
let data =
38-
CodecDataSource { calldata: batch.calldata.as_ref(), blob: blob.as_ref().map(|v| &**v) };
41+
let data = CodecDataSource { calldata: batch.calldata.as_ref(), blob: blob.as_deref() };
3942
let decoded = Codec::decode(&data)?;
4043

4144
// set the cursor for the l1 provider.
@@ -46,39 +49,45 @@ pub async fn derive<P: L1Provider<Error = L1ProviderError>>(
4649
l1_provider.set_hash_cursor(*hash);
4750
// we skip the first l1 message, as we are interested in the one starting after
4851
// prev_l1_message_queue_hash.
49-
let _ = l1_provider.next_l1_message();
52+
let _ = l1_provider.next_l1_message().await.map_err(Into::into)?;
5053
} else {
5154
return Err(DerivationPipelineError::MissingL1MessageQueueCursor)
5255
}
5356

54-
let iter = decoded.data.into_l2_blocks().into_iter().map(|mut block| {
55-
// query the appropriate amount of l1 messages.
56-
let mut txs = (0..block.context.num_l1_messages)
57-
.map(|_| l1_provider.next_l1_message())
58-
.map(|tx| {
57+
let provider = Arc::new(&*l1_provider);
58+
let iter = stream::iter(decoded.data.into_l2_blocks())
59+
.map(move |data| (data, provider.clone()))
60+
.then(|(mut block, provider)| async move {
61+
// query the appropriate amount of l1 messages.
62+
let mut txs = Vec::with_capacity(block.context.num_l1_messages as usize);
63+
for _ in 0..block.context.num_l1_messages {
64+
let l1_message = provider
65+
.next_l1_message()
66+
.await
67+
.map_err(Into::into)?
68+
.ok_or(DerivationPipelineError::MissingL1Message)?;
5969
let mut bytes = Vec::new();
60-
tx.eip2718_encode(&mut bytes);
61-
bytes.into()
70+
l1_message.eip2718_encode(&mut bytes);
71+
txs.push(bytes.into());
72+
}
73+
74+
// add the block transactions.
75+
txs.append(&mut block.transactions);
76+
77+
// construct the payload attributes.
78+
Ok(ScrollPayloadAttributes {
79+
payload_attributes: PayloadAttributes {
80+
timestamp: block.context.timestamp,
81+
prev_randao: B256::ZERO,
82+
// TODO: this should be based off the current configuration value.
83+
suggested_fee_recipient: SCROLL_FEE_VAULT_ADDRESS,
84+
withdrawals: None,
85+
parent_beacon_block_root: None,
86+
},
87+
transactions: Some(txs),
88+
no_tx_pool: true,
6289
})
63-
.collect::<Vec<_>>();
64-
65-
// add the block transactions.
66-
txs.append(&mut block.transactions);
67-
68-
// construct the payload attributes.
69-
ScrollPayloadAttributes {
70-
payload_attributes: PayloadAttributes {
71-
timestamp: block.context.timestamp,
72-
prev_randao: B256::ZERO,
73-
// TODO: this should be based off the current configuration value.
74-
suggested_fee_recipient: SCROLL_FEE_VAULT_ADDRESS,
75-
withdrawals: None,
76-
parent_beacon_block_root: None,
77-
},
78-
transactions: Some(txs),
79-
no_tx_pool: true,
80-
}
81-
});
90+
});
8291

8392
Ok(iter)
8493
}
@@ -90,7 +99,7 @@ mod tests {
9099

91100
use alloy_eips::eip4844::Blob;
92101
use alloy_primitives::{address, b256, bytes, U256};
93-
use rollup_node_providers::L1MessageProvider;
102+
use rollup_node_providers::{L1MessageProvider, L1ProviderError};
94103
use scroll_alloy_consensus::TxL1Message;
95104
use scroll_codec::decoding::test_utils::read_to_bytes;
96105
use tokio::sync::Mutex;
@@ -99,22 +108,30 @@ mod tests {
99108
messages: Arc<Mutex<Vec<TxL1Message>>>,
100109
}
101110

111+
struct Infallible;
112+
impl From<Infallible> for L1ProviderError {
113+
fn from(_value: Infallible) -> Self {
114+
Self::Other("infallible")
115+
}
116+
}
117+
102118
#[async_trait::async_trait]
103119
impl L1BlobProvider for TestL1MessageProvider {
104-
type Error = L1ProviderError;
105-
106120
async fn blob(
107121
&self,
108122
_block_timestamp: u64,
109123
_hash: B256,
110-
) -> Result<Option<Arc<Blob>>, Self::Error> {
124+
) -> Result<Option<Arc<Blob>>, L1ProviderError> {
111125
Ok(None)
112126
}
113127
}
114128

129+
#[async_trait::async_trait]
115130
impl L1MessageProvider for TestL1MessageProvider {
116-
fn next_l1_message(&self) -> TxL1Message {
117-
self.messages.try_lock().expect("lock is free").remove(0)
131+
type Error = Infallible;
132+
133+
async fn next_l1_message(&self) -> Result<Option<TxL1Message>, Self::Error> {
134+
Ok(Some(self.messages.try_lock().expect("lock is free").remove(0)))
118135
}
119136

120137
fn set_index_cursor(&mut self, _index: u64) {}
@@ -152,8 +169,10 @@ mod tests {
152169
}];
153170
let mut provider = TestL1MessageProvider { messages: Arc::new(Mutex::new(l1_messages)) };
154171

155-
let mut attributes = derive(batch_data, &mut provider).await?;
156-
let attribute = attributes.find(|a| a.payload_attributes.timestamp == 1696935384).unwrap();
172+
let attributes: Vec<_> = derive(batch_data, &mut provider).await?.collect().await;
173+
let attributes = attributes.into_iter().collect::<Result<Vec<_>, _>>()?;
174+
let attribute =
175+
attributes.iter().find(|a| a.payload_attributes.timestamp == 1696935384).unwrap();
157176

158177
let expected = ScrollPayloadAttributes{
159178
payload_attributes: PayloadAttributes{
@@ -166,7 +185,7 @@ mod tests {
166185
transactions: Some(vec![bytes!("7ef901b7218302904094781e90f1c8fc4611c9b7497c3b47f99ef6969cbc80b901848ef1332e0000000000000000000000007f2b8c31f88b6006c382775eea88297ec1e3e9050000000000000000000000006ea73e05adc79974b931123675ea8f78ffdacdf0000000000000000000000000000000000000000000000000006a94d74f430000000000000000000000000000000000000000000000000000000000000000002100000000000000000000000000000000000000000000000000000000000000a000000000000000000000000000000000000000000000000000000000000000a4232e8748000000000000000000000000ca266224613396a0e8d4c2497dbc4f33dd6cdeff000000000000000000000000ca266224613396a0e8d4c2497dbc4f33dd6cdeff000000000000000000000000000000000000000000000000006a94d74f4300000000000000000000000000000000000000000000000000000000000000000080000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000947885bcbd5cecef1336b5300fb5186a12ddd8c478"), bytes!("7ef901b7228302904094781e90f1c8fc4611c9b7497c3b47f99ef6969cbc80b901848ef1332e0000000000000000000000007f2b8c31f88b6006c382775eea88297ec1e3e9050000000000000000000000006ea73e05adc79974b931123675ea8f78ffdacdf000000000000000000000000000000000000000000000000000470de4df820000000000000000000000000000000000000000000000000000000000000000002200000000000000000000000000000000000000000000000000000000000000a000000000000000000000000000000000000000000000000000000000000000a4232e8748000000000000000000000000982fe4a7cbd74bb3422ebe46333c3e8046c12c7f000000000000000000000000982fe4a7cbd74bb3422ebe46333c3e8046c12c7f00000000000000000000000000000000000000000000000000470de4df8200000000000000000000000000000000000000000000000000000000000000000080000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000947885bcbd5cecef1336b5300fb5186a12ddd8c478")]),
167186
no_tx_pool: true,
168187
};
169-
assert_eq!(attribute, expected);
188+
assert_eq!(attribute, &expected);
170189

171190
let attribute = attributes.last().unwrap();
172191
let expected = ScrollPayloadAttributes{
@@ -180,7 +199,7 @@ mod tests {
180199
transactions: Some(vec![bytes!("f88c8202658417d7840082a4f294530000000000000000000000000000000000000280a4bede39b500000000000000000000000000000000000000000000000000000001669aa2f583104ec4a07461e6555f927393ebdf5f183738450c3842bc3b86a1db7549d9bee21fadd0b1a06d7ba96897bd9fb8e838a327d3ca34be66da11955f10d1fb2264949071e9e8cd")]),
181200
no_tx_pool: true,
182201
};
183-
assert_eq!(attribute, expected);
202+
assert_eq!(attribute, &expected);
184203

185204
Ok(())
186205
}

0 commit comments

Comments
 (0)