|
3 | 3 | use std::sync::Arc; |
4 | 4 |
|
5 | 5 | use apollo_batcher_config::config::{BatcherConfig, CommitmentManagerConfig}; |
6 | | -use apollo_committer_types::committer_types::{CommitBlockRequest, CommitBlockResponse}; |
| 6 | +use apollo_committer_types::committer_types::{ |
| 7 | + CommitBlockRequest, |
| 8 | + CommitBlockResponse, |
| 9 | + RevertBlockRequest, |
| 10 | +}; |
7 | 11 | use apollo_committer_types::communication::SharedCommitterClient; |
8 | 12 | use starknet_api::block::BlockNumber; |
9 | 13 | use starknet_api::block_hash::block_hash_calculator::{ |
@@ -73,57 +77,63 @@ impl<S: StateCommitterTrait> CommitmentManager<S> { |
73 | 77 | } |
74 | 78 |
|
75 | 79 | /// Adds a commitment task to the state committer. If the task height does not match the |
76 | | - /// task offset, an error is returned. If the tasks channel is full, the behavior depends on |
77 | | - /// the config: if `wait_for_tasks_channel` is true, it will wait until there is space in the |
78 | | - /// channel; otherwise, it will panic. Any other error when sending the task will also cause a |
79 | | - /// panic. |
| 80 | + /// task offset, an error is returned. |
80 | 81 | pub(crate) async fn add_commitment_task( |
81 | 82 | &mut self, |
82 | 83 | height: BlockNumber, |
83 | 84 | state_diff: ThinStateDiff, |
84 | 85 | state_diff_commitment: Option<StateDiffCommitment>, |
85 | 86 | ) -> CommitmentManagerResult<()> { |
86 | 87 | if height != self.commitment_task_offset { |
87 | | - return Err(CommitmentManagerError::WrongTaskHeight { |
| 88 | + return Err(CommitmentManagerError::WrongCommitmentTaskHeight { |
88 | 89 | expected: self.commitment_task_offset, |
89 | 90 | actual: height, |
90 | 91 | state_diff_commitment, |
91 | 92 | }); |
92 | 93 | } |
93 | | - let commitment_task_input = CommitterTaskInput::Commit(CommitBlockRequest { |
94 | | - height, |
95 | | - state_diff, |
96 | | - state_diff_commitment, |
97 | | - }); |
98 | | - let error_message = format!( |
99 | | - "Failed to send commitment task to state committer. Block: {height}, state diff \ |
100 | | - commitment: {state_diff_commitment:?}", |
101 | | - ); |
| 94 | + let commitment_task_input = |
| 95 | + CommitterTaskInput::Commit(CommitBlockRequest { |
| 96 | + height, |
| 97 | + state_diff, |
| 98 | + state_diff_commitment, |
| 99 | + }); |
| 100 | + let task_details = |
| 101 | + format!("Commit block {height}, state diff commitment: {state_diff_commitment:?}",); |
| 102 | + self.add_task(commitment_task_input, &task_details).await?; |
| 103 | + self.successfully_added_commitment_task(height, state_diff_commitment); |
| 104 | + Ok(()) |
| 105 | + } |
| 106 | + |
| 107 | + /// If the tasks channel is full, the behavior depends on the config: if |
| 108 | + /// `wait_for_tasks_channel` is true, it will wait until there is space in the channel; |
| 109 | + /// otherwise, it will panic. Any other error when sending the task will also cause a panic. |
| 110 | + async fn add_task( |
| 111 | + &self, |
| 112 | + task_input: CommitterTaskInput, |
| 113 | + task_details: &str, |
| 114 | + ) -> CommitmentManagerResult<()> { |
| 115 | + let error_message = format!("Failed to send task to state committer: {task_details}.",); |
102 | 116 |
|
103 | 117 | if self.config.wait_for_tasks_channel { |
104 | | - info!( |
105 | | - "Waiting to send commitment task for block {height} and state diff \ |
106 | | - {state_diff_commitment:?} to state committer." |
107 | | - ); |
108 | | - match self.tasks_sender.send(commitment_task_input).await { |
109 | | - Ok(_) => self.successfully_added_commitment_task(height, state_diff_commitment), |
110 | | - Err(err) => panic!("{error_message}. error: {err}"), |
111 | | - } |
| 118 | + info!("Waiting to send task for {task_details} to state committer."); |
| 119 | + self.tasks_sender |
| 120 | + .send(task_input) |
| 121 | + .await |
| 122 | + .unwrap_or_else(|err| panic!("{error_message}. error: {err}")); |
112 | 123 | } else { |
113 | | - match self.tasks_sender.try_send(commitment_task_input) { |
114 | | - Ok(_) => self.successfully_added_commitment_task(height, state_diff_commitment), |
| 124 | + match self.tasks_sender.try_send(task_input) { |
| 125 | + Ok(_) => (), |
115 | 126 | Err(TrySendError::Full(_)) => { |
116 | 127 | let channel_size = self.tasks_sender.max_capacity(); |
117 | 128 | panic!( |
118 | | - "Failed to send commitment task to state committer because the channel is \ |
119 | | - full. Block: {height}, state diff commitment: {state_diff_commitment:?}, \ |
120 | | - channel size: {channel_size}. Consider increasing the channel size or \ |
121 | | - enabling waiting in the config.", |
| 129 | + "{error_message}. The channel is full. channel size: {channel_size}. \ |
| 130 | + Consider increasing the channel size or enabling waiting in the config.", |
122 | 131 | ); |
123 | 132 | } |
124 | 133 | Err(err) => panic!("{error_message}. error: {err}"), |
125 | 134 | } |
126 | 135 | } |
| 136 | + Ok(()) |
127 | 137 | } |
128 | 138 |
|
129 | 139 | /// Fetches all ready commitment results from the state committer. Panics if any task is a |
@@ -168,13 +178,12 @@ impl<S: StateCommitterTrait> CommitmentManager<S> { |
168 | 178 | &mut self, |
169 | 179 | height: BlockNumber, |
170 | 180 | state_diff_commitment: Option<StateDiffCommitment>, |
171 | | - ) -> CommitmentManagerResult<()> { |
| 181 | + ) { |
172 | 182 | info!( |
173 | 183 | "Sent commitment task for block {height} and state diff {state_diff_commitment:?} to \ |
174 | 184 | state committer." |
175 | 185 | ); |
176 | 186 | self.increase_commitment_task_offset(); |
177 | | - Ok(()) |
178 | 187 | } |
179 | 188 |
|
180 | 189 | /// Initializes the CommitmentManager. This includes starting the state committer task. |
@@ -256,8 +265,28 @@ impl<S: StateCommitterTrait> CommitmentManager<S> { |
256 | 265 |
|
257 | 266 | // Associated functions. |
258 | 267 |
|
259 | | - pub(crate) async fn revert_block(height: BlockNumber, reversed_state_diff: ThinStateDiff) { |
260 | | - unimplemented!() |
| 268 | + pub(crate) async fn add_revert_task( |
| 269 | + &mut self, |
| 270 | + height: BlockNumber, |
| 271 | + reversed_state_diff: ThinStateDiff, |
| 272 | + ) -> CommitmentManagerResult<()> { |
| 273 | + let expected_height = |
| 274 | + self.commitment_task_offset.prev().expect("Can't revert before the genesis block."); |
| 275 | + if height != expected_height { |
| 276 | + return Err(CommitmentManagerError::WrongRevertTaskHeight { |
| 277 | + expected: expected_height, |
| 278 | + actual: height, |
| 279 | + }); |
| 280 | + } |
| 281 | + let revert_task_input = |
| 282 | + CommitterTaskInput::Revert(RevertBlockRequest { |
| 283 | + height, |
| 284 | + reversed_state_diff, |
| 285 | + }); |
| 286 | + let task_details = format!("Revert block {height}",); |
| 287 | + self.add_task(revert_task_input, &task_details).await?; |
| 288 | + info!("Sent revert task for block {height}."); |
| 289 | + Ok(()) |
261 | 290 | } |
262 | 291 |
|
263 | 292 | /// Returns the final commitment output for a given commitment task output. |
|
0 commit comments