|
1 | 1 | #![allow(dead_code, unused_variables, unused_mut)] |
2 | 2 |
|
3 | | -use apollo_committer_types::communication::SharedCommitterClient; |
| 3 | +use apollo_committer_types::committer_types::{CommitBlockRequest, RevertBlockRequest}; |
| 4 | +use apollo_committer_types::communication::{CommitterRequest, SharedCommitterClient}; |
| 5 | +use apollo_committer_types::errors::{CommitterClientError, CommitterClientResult}; |
4 | 6 | use tokio::sync::mpsc::{Receiver, Sender}; |
5 | 7 | use tokio::task::JoinHandle; |
6 | 8 |
|
7 | | -use crate::commitment_manager::types::{CommitterTaskInput, CommitterTaskOutput}; |
| 9 | +use crate::commitment_manager::types::{ |
| 10 | + CommitmentTaskOutput, |
| 11 | + CommitterTaskInput, |
| 12 | + CommitterTaskOutput, |
| 13 | + RevertTaskOutput, |
| 14 | +}; |
8 | 15 |
|
9 | 16 | /// Commits state changes by calling the committer. |
10 | 17 | pub(crate) trait StateCommitterTrait { |
@@ -39,13 +46,65 @@ impl StateCommitterTrait for StateCommitter { |
39 | 46 | } |
40 | 47 |
|
41 | 48 | impl StateCommitter { |
| 49 | + /// Repeatedly performs any task in the channel. |
42 | 50 | pub(crate) async fn perform_commitment_tasks( |
43 | 51 | mut tasks_receiver: Receiver<CommitterTaskInput>, |
44 | 52 | mut results_sender: Sender<CommitterTaskOutput>, |
45 | 53 | committer_client: SharedCommitterClient, |
46 | 54 | ) { |
47 | | - // Placeholder: simply drain the receiver and do nothing. |
48 | | - // TODO(Amos): Implement the actual commitment tasks logic. |
49 | | - while let Some(_task) = tasks_receiver.recv().await {} |
| 55 | + while let Some(CommitterTaskInput(request)) = tasks_receiver.recv().await { |
| 56 | + let output = perform_commitment_task(request, &committer_client).await; |
| 57 | + // TODO(Yoav): wait for task channel by config. |
| 58 | + results_sender.send(output).await.unwrap(); |
| 59 | + } |
50 | 60 | } |
51 | 61 | } |
| 62 | + |
| 63 | +/// Performs a commitment task by calling the committer. |
| 64 | +/// Retries at recoverable errors. |
| 65 | +async fn perform_commitment_task( |
| 66 | + request: CommitterRequest, |
| 67 | + committer_client: &SharedCommitterClient, |
| 68 | +) -> CommitterTaskOutput { |
| 69 | + loop { |
| 70 | + let result = match &request { |
| 71 | + CommitterRequest::CommitBlock(commit_block_request) => { |
| 72 | + perform_commit_block_task(commit_block_request.clone(), committer_client).await |
| 73 | + } |
| 74 | + CommitterRequest::RevertBlock(revert_block_request) => { |
| 75 | + perform_revert_block_task(revert_block_request.clone(), committer_client).await |
| 76 | + } |
| 77 | + }; |
| 78 | + match result { |
| 79 | + Ok(output) => return output, |
| 80 | + Err(err) => { |
| 81 | + handle_task_error(err).await; |
| 82 | + continue; |
| 83 | + } |
| 84 | + } |
| 85 | + } |
| 86 | +} |
| 87 | + |
| 88 | +async fn perform_commit_block_task( |
| 89 | + commit_block_request: CommitBlockRequest, |
| 90 | + committer_client: &SharedCommitterClient, |
| 91 | +) -> CommitterClientResult<CommitterTaskOutput> { |
| 92 | + let height = commit_block_request.height; |
| 93 | + let response = committer_client.commit_block(commit_block_request).await?; |
| 94 | + Ok(CommitterTaskOutput::Commit(CommitmentTaskOutput { response, height })) |
| 95 | +} |
| 96 | + |
| 97 | +async fn perform_revert_block_task( |
| 98 | + revert_block_request: RevertBlockRequest, |
| 99 | + committer_client: &SharedCommitterClient, |
| 100 | +) -> CommitterClientResult<CommitterTaskOutput> { |
| 101 | + let height = revert_block_request.height; |
| 102 | + let response = committer_client.revert_block(revert_block_request).await?; |
| 103 | + Ok(CommitterTaskOutput::Revert(RevertTaskOutput { response, height })) |
| 104 | +} |
| 105 | + |
| 106 | +/// Panics on unrecoverable errors. |
| 107 | +async fn handle_task_error(error: CommitterClientError) { |
| 108 | + // TODO(Amos): Handle errors. |
| 109 | + unimplemented!(); |
| 110 | +} |
0 commit comments