@@ -180,7 +180,7 @@ impl L1Committer {
             Self::get_checkpoint_from_path(
                 genesis.clone(),
                 blockchain.options.clone(),
-                &checkpoints_dir.join(format!("checkpoint_batch_{}", last_committed_batch)),
+                &checkpoints_dir.join(batch_checkpoint_name(last_committed_batch)),
                 &rollup_store,
             )
             .await?;
@@ -261,114 +261,14 @@ impl L1Committer {
         let batch = match self.rollup_store.get_batch(batch_to_commit).await? {
             Some(batch) => batch,
             None => {
-                let last_committed_blocks = self
-                    .rollup_store
-                    .get_block_numbers_by_batch(last_committed_batch_number)
-                    .await?
-                    .ok_or(
-                        CommitterError::RetrievalError(format!("Failed to get batch with batch number {last_committed_batch_number}. Batch is missing when it should be present. This is a bug"))
-                    )?;
-                let last_block = last_committed_blocks
-                    .last()
-                    .ok_or(
-                        CommitterError::RetrievalError(format!("Last committed batch ({last_committed_batch_number}) doesn't have any blocks. This is probably a bug."))
-                    )?;
-                let first_block_to_commit = last_block + 1;
-
-                // We need to guarantee that the checkpoint path is new
-                // to avoid causing a lock error under rocksdb feature.
-                let rand_suffix: u32 = rand::thread_rng().r#gen();
-                let one_time_checkpoint_path = self
-                    .checkpoints_dir
-                    .join(format!("temp_checkpoint_{batch_to_commit}_{rand_suffix}"));
-
-                // For re-execution we need to use a checkpoint to the previous state
-                // (i.e. checkpoint of the state to the latest block from the previous
-                // batch, or the state of the genesis if this is the first batch).
-                // We already have this initial checkpoint as part of the L1Committer
-                // struct, but we need to create a one-time copy of it because
-                // we still need to use the current checkpoint store later for witness
-                // generation.
-
-                let (one_time_checkpoint_store, one_time_checkpoint_blockchain) = self
-                    .create_checkpoint(
-                        &self.current_checkpoint_store,
-                        &one_time_checkpoint_path,
-                        &self.rollup_store,
-                    )
-                    .await?;
-
-                // Try to prepare batch
-                let result = self
-                    .prepare_batch_from_block(
-                        *last_block,
-                        batch_to_commit,
-                        one_time_checkpoint_store,
-                        one_time_checkpoint_blockchain,
-                    )
-                    .await;
-
-                if one_time_checkpoint_path.exists() {
-                    let _ = remove_dir_all(&one_time_checkpoint_path).inspect_err(|e| {
-                        error!(
-                            "Failed to remove one-time checkpoint directory at path {one_time_checkpoint_path:?}. Should be removed manually. Error: {}", e.to_string()
-                        )
-                    });
-                }
-
-                let (
-                    blobs_bundle,
-                    new_state_root,
-                    message_hashes,
-                    privileged_transactions_hash,
-                    last_block_of_batch,
-                ) = result?;
-
-                if *last_block == last_block_of_batch {
-                    debug!("No new blocks to commit, skipping");
+                let Some(batch) = self.produce_batch(batch_to_commit).await? else {
+                    // The batch is empty (there are no new blocks since the last batch)
                     return Ok(());
-                }
-
-                let batch = Batch {
-                    number: batch_to_commit,
-                    first_block: first_block_to_commit,
-                    last_block: last_block_of_batch,
-                    state_root: new_state_root,
-                    privileged_transactions_hash,
-                    message_hashes,
-                    blobs_bundle,
-                    commit_tx: None,
-                    verify_tx: None,
                 };
-
-                self.rollup_store.seal_batch(batch.clone()).await?;
-
-                debug!(
-                    first_block = batch.first_block,
-                    last_block = batch.last_block,
-                    "Batch {} stored in database",
-                    batch.number
-                );
-
                 batch
             }
         };
 
-        info!(
-            first_block = batch.first_block,
-            last_block = batch.last_block,
-            "Generating and storing witness for batch {}",
-            batch.number,
-        );
-
-        self.generate_and_store_batch_prover_input(&batch).await?;
-
-        // We need to update the current checkpoint after generating the witness
-        // with it, and before sending the commitment.
-        // The actual checkpoint store directory is not pruned until the batch
-        // it served in is verified on L1.
-        self.update_current_checkpoint(&batch).await?;
-
         info!(
             first_block = batch.first_block,
             last_block = batch.last_block,
@@ -409,12 +309,116 @@ impl L1Committer {
         }
     }
 
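+    /// Builds, seals, and stores the next batch by re-executing its blocks on a
+    /// fresh checkpoint of the previous state, then generates and stores the
+    /// batch witness and promotes that checkpoint to the committer's current one.
+    /// Returns `Ok(None)` when there are no new blocks to commit.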
+    async fn produce_batch(&mut self, batch_number: u64) -> Result<Option<Batch>, CommitterError> {
+        let last_committed_blocks = self
+            .rollup_store
+            .get_block_numbers_by_batch(batch_number - 1)
+            .await?
+            .ok_or(
+                CommitterError::RetrievalError(format!("Failed to get batch with batch number {}. Batch is missing when it should be present. This is a bug", batch_number - 1))
+            )?;
+        let last_block = last_committed_blocks
+            .last()
+            .ok_or(CommitterError::RetrievalError(format!(
323+ "Last committed batch ({}) doesn't have any blocks. This is probably a bug." ,
324+ batch_number
325+ ) ) ) ?;
+        let first_block_to_commit = last_block + 1;
+
+        // The checkpoint path must not already exist, otherwise opening it
+        // causes a lock error under the rocksdb feature.
+        let new_checkpoint_path = self
+            .checkpoints_dir
+            .join(batch_checkpoint_name(batch_number));
+
+        // For re-execution we need a checkpoint of the previous state
+        // (i.e. the state at the latest block of the previous batch, or the
+        // genesis state if this is the first batch).
+        // We already have this initial checkpoint as part of the L1Committer
+        // struct, but we need to create a copy of it because we still need
+        // the current checkpoint store later for witness generation.
+
+        let (new_checkpoint_store, new_checkpoint_blockchain) = self
+            .create_checkpoint(
+                &self.current_checkpoint_store,
+                &new_checkpoint_path,
+                &self.rollup_store,
+            )
+            .await?;
+
+        // Try to prepare batch
+        let result = self
+            .prepare_batch_from_block(
+                *last_block,
+                batch_number,
+                new_checkpoint_store.clone(),
+                new_checkpoint_blockchain.clone(),
+            )
+            .await;
+
+        let (
+            blobs_bundle,
+            new_state_root,
+            message_hashes,
+            privileged_transactions_hash,
+            last_block_of_batch,
+        ) = result?;
+
+        if *last_block == last_block_of_batch {
+            debug!("No new blocks to commit, skipping");
+            return Ok(None);
+        }
+
+        let batch = Batch {
+            number: batch_number,
+            first_block: first_block_to_commit,
+            last_block: last_block_of_batch,
+            state_root: new_state_root,
+            privileged_transactions_hash,
+            message_hashes,
+            blobs_bundle,
+            commit_tx: None,
+            verify_tx: None,
+        };
+
+        self.rollup_store.seal_batch(batch.clone()).await?;
+
+        debug!(
+            first_block = batch.first_block,
+            last_block = batch.last_block,
+            "Batch {} stored in database",
+            batch.number
+        );
+
+        info!(
+            first_block = batch.first_block,
+            last_block = batch.last_block,
+            "Generating and storing witness for batch {}",
+            batch.number,
+        );
+
+        self.generate_and_store_batch_prover_input(&batch).await?;
+
+        // We need to update the current checkpoint after generating the witness
+        // with it, and before sending the commitment. The reference to the
+        // previous checkpoint is lost here, but its directory is not deleted
+        // until the batch it served is verified on L1.
+        self.current_checkpoint_store = new_checkpoint_store;
+        self.current_checkpoint_blockchain = new_checkpoint_blockchain;
+
+        Ok(Some(batch))
+    }
+
     async fn prepare_batch_from_block(
         &mut self,
         mut last_added_block_number: BlockNumber,
         batch_number: u64,
-        one_time_checkpoint_store: Store,
-        one_time_checkpoint_blockchain: Arc<Blockchain>,
+        checkpoint_store: Store,
+        checkpoint_blockchain: Arc<Blockchain>,
     ) -> Result<(BlobsBundle, H256, Vec<H256>, H256, BlockNumber), CommitterError> {
         let first_block_of_batch = last_added_block_number + 1;
         let mut blobs_bundle = BlobsBundle::default();
@@ -426,6 +430,7 @@ impl L1Committer {
         let mut privileged_transactions_hashes = vec![];
         let mut new_state_root = H256::default();
         let mut acc_gas_used = 0_u64;
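+        // (block number, hash) of each block added to the batch; used after the
+        // loop to update the checkpoint store's fork choice.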
+        let mut blocks = vec![];
 
         #[cfg(feature = "metrics")]
         let mut tx_count = 0_u64;
@@ -521,7 +526,7 @@ impl L1Committer {
             // Here we use the checkpoint store because we need the previous
             // state available (i.e. not pruned) for re-execution.
             let vm_db = StoreVmDatabase::new(
-                one_time_checkpoint_store.clone(),
+                checkpoint_store.clone(),
                 potential_batch_block.header.parent_hash,
             );
 
@@ -545,7 +550,7 @@ impl L1Committer {
             // account updates of each block as we go, to be able to continue
             // re-executing the next blocks in the batch.
             {
-                let account_updates_list = one_time_checkpoint_store
+                let account_updates_list = checkpoint_store
                     .apply_account_updates_batch(
                         potential_batch_block.header.parent_hash,
                         &account_updates,
@@ -554,7 +559,7 @@ impl L1Committer {
                         "no account updated".to_owned(),
                     ))?;
 
-                one_time_checkpoint_blockchain.store_block(
+                checkpoint_blockchain.store_block(
                     potential_batch_block.clone(),
                     account_updates_list,
                     BlockExecutionResult {
@@ -588,8 +593,7 @@ impl L1Committer {
 
             // Again, here the VM database should be instantiated from the checkpoint
             // store to have access to the previous state
-            let parent_db =
-                StoreVmDatabase::new(one_time_checkpoint_store.clone(), parent_block_hash);
+            let parent_db = StoreVmDatabase::new(checkpoint_store.clone(), parent_block_hash);
 
             let acc_privileged_txs_len: u64 = acc_privileged_txs.len().try_into()?;
             if acc_privileged_txs_len > PRIVILEGED_TX_BUDGET {
@@ -645,7 +649,7 @@ impl L1Committer {
 
             message_hashes.extend(messages.iter().map(get_l1_message_hash));
 
-            new_state_root = one_time_checkpoint_store
+            new_state_root = checkpoint_store
                 .state_trie(potential_batch_block.hash())?
                 .ok_or(CommitterError::FailedToGetInformationFromStorage(
                     "Failed to get state root from storage".to_owned(),
@@ -654,6 +658,7 @@ impl L1Committer {
                 ))?;
 
             last_added_block_number += 1;
             acc_gas_used += current_block_gas_used;
+            blocks.push((last_added_block_number, potential_batch_block.hash()));
         } // end loop
 
         metrics!(if let (Ok(privileged_transaction_count), Ok(messages_count)) = (
@@ -687,6 +692,23 @@ impl L1Committer {
         let privileged_transactions_hash =
             compute_privileged_transactions_hash(privileged_transactions_hashes)?;
 
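+        // Make the re-executed blocks canonical in the checkpoint store and
+        // advance its head to the last block of the batch.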
+        let last_block_hash = blocks
+            .last()
+            .ok_or(CommitterError::Unreachable(
+                "There should always be blocks".to_string(),
+            ))?
+            .1;
+
+        checkpoint_store
+            .forkchoice_update(
+                Some(blocks),
+                last_added_block_number,
+                last_block_hash,
+                None,
+                None,
+            )
+            .await?;
+
         Ok((
             blobs_bundle,
             new_state_root,
@@ -794,41 +816,6 @@ impl L1Committer {
         Ok(())
     }
 
-    /// Updates the current checkpoint store and blockchain to the state at the
-    /// given latest batch.
-    ///
-    /// The reference to the previous checkpoint is lost after this operation,
-    /// but the directory is not deleted until the batch it serves in is verified
-    /// on L1.
-    async fn update_current_checkpoint(
-        &mut self,
-        latest_batch: &Batch,
-    ) -> Result<(), CommitterError> {
-        let new_checkpoint_path = self
-            .checkpoints_dir
-            .join(format!("checkpoint_batch_{}", latest_batch.number));
-
-        // CAUTION
-        // We need to skip checkpoint creation if the directory already exists.
-        // Sometimes the commit_next_batch task is retried after a failure, and in
-        // that case we would try to create a checkpoint again at the same path,
-        // causing a lock error under rocksdb feature.
-        if new_checkpoint_path.exists() {
-            // TODO: we should validate that the existing checkpoint is correct. otherwise we may want to update our `current_checkpoint_store` to point to the correct one.
-            return Ok(());
-        }
-
-        let (new_checkpoint_store, new_checkpoint_blockchain) = self
-            .create_checkpoint(&self.store, &new_checkpoint_path, &self.rollup_store)
-            .await?;
-
-        self.current_checkpoint_store = new_checkpoint_store;
-
-        self.current_checkpoint_blockchain = new_checkpoint_blockchain;
-
-        Ok(())
-    }
-
     /// Creates a checkpoint of the given store at the specified path.
     ///
     /// This function performs the following steps:
@@ -1326,3 +1313,7 @@ pub async fn regenerate_head_state(
 
     Ok(())
 }
+
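+/// Name of the checkpoint directory used for the given batch.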
+fn batch_checkpoint_name(batch_number: u64) -> String {
+    format!("checkpoint_batch_{batch_number}")
+}