// if the futures can still grow, handle the next batch.
if this.pipeline_futures.len() < MAX_CONCURRENT_DERIVATION_PIPELINE_FUTS {
    this.handle_next_batch(|queue, fut| queue.push_back(fut));
}
this limitation mechanism could be removed: even if we have up to 1000 pending futures, all fetching blobs, this would still represent only ~128 MB of data.
In the future we should replace the constant with a value from config so we can modify it at runtime.
Err((index, err)) => {
    tracing::error!(target: "scroll::node::derivation_pipeline", ?index, ?err, "failed to derive payload attributes for batch");
    // retry polling the same batch index.
    this.batch_index_queue.push_front(index);
    this.handle_next_batch(|queue, fut| queue.push_front(fut));
}
I implemented a sort of retry mechanism here, but maybe this is better suited to live in the various providers?
Let's leave it as is for now and then replace it with provider retry in the future. Let's open an issue to track it for milestone 5.
frisitano left a comment
Looks good, I left a few small nits inline. Let me know what you think. CI checks are failing.
/// Handles the next batch index in the batch index queue, pushing the future in the pipeline
/// futures.
fn handle_next_batch<
    F: FnMut(&mut FuturesOrdered<DerivationPipelineFuture>, DerivationPipelineFuture),
>(
    &mut self,
    mut queue_fut: F,
) {
    let database = self.database.clone();
    let provider = self.l1_provider.clone();

    if let Some(index) = self.batch_index_queue.pop_front() {
        let fut = Box::pin(async move {
            let batch = database
                .get_batch_by_index(index)
                .await
                .map_err(|err| (index, err.into()))?
                .ok_or((index, DerivationPipelineError::UnknownBatch(index)))?;

            derive(batch, provider).await.map_err(|err| (index, err))
        });
        queue_fut(&mut self.pipeline_futures, fut);
    }
}
nit: I think this function should return a future instead of executing queue_fut. When the caller receives the future, they can add it to the back/front of the queue as required. I think the queue_fut argument makes the code harder to parse. However, I'm not strongly opinionated if you would prefer to keep it as is.
pub fn new(
    network: NetworkManager,
    engine: EngineDriver<EC, P>,
    l1_provider: L1P,
    l1_notification_rx: Option<Receiver<Arc<L1Notification>>>,
    indexer: Indexer,
    forkchoice_state: ForkchoiceState,
    consensus: C,
    new_block_rx: Option<UnboundedReceiver<NewBlockWithPeer>>,
) -> Self {
    let database = indexer.database();
    let derivation_pipeline = DerivationPipeline::new(l1_provider, database);
We should remove the indexer as an argument and provide database: Database instead; we can then instantiate the indexer and pipeline here. What do you think?
// if the log is missing the block timestamp, we need to fetch it.
// the block timestamp is necessary in order to derive the beacon
// slot and query the blobs.
let block_timestamp = if log_timestamp.is_none() {
    self.execution_provider
        .get_block(block_number.into())
        .await?
        .map(|b| b.header.timestamp)
        .ok_or(FilterLogError::MissingBlockTimestamp)?
} else {
    log_timestamp.expect("checked for Some(...)")
};
@frisitano I wanted to have your opinion on this as well.
This looks good to me. I have a slight preference for using if let (so we avoid the expect) but am not strongly opinionated.
Integrates the derivation pipeline in the RNM. Adds a DerivationPipeline stateful structure, which implements the Future trait and yields a Stream<Item = Result<ScrollPayloadAttributes, DerivationPipelineError>>.