Skip to content

Commit be58b49

Browse files
committed
apollo_batcher: perform commitment tasks
1 parent 904480a commit be58b49

File tree

2 files changed

+94
-7
lines changed

2 files changed

+94
-7
lines changed

crates/apollo_batcher/src/commitment_manager/state_committer.rs

Lines changed: 87 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,19 @@
11
#![allow(dead_code, unused_variables, unused_mut)]
22

3-
use apollo_committer_types::communication::SharedCommitterClient;
3+
use apollo_committer_types::committer_types::{CommitBlockRequest, RevertBlockRequest};
4+
use apollo_committer_types::communication::{CommitterRequest, SharedCommitterClient};
5+
use apollo_committer_types::errors::{CommitterClientError, CommitterClientResult};
6+
use tokio::sync::mpsc::error::TrySendError;
47
use tokio::sync::mpsc::{Receiver, Sender};
58
use tokio::task::JoinHandle;
9+
use tracing::{error, info};
610

7-
use crate::commitment_manager::types::{CommitterTaskInput, CommitterTaskOutput};
11+
use crate::commitment_manager::types::{
12+
CommitmentTaskOutput,
13+
CommitterTaskInput,
14+
CommitterTaskOutput,
15+
RevertTaskOutput,
16+
};
817

918
/// Commits state changes by calling the committer.
1019
pub(crate) trait StateCommitterTrait {
@@ -29,7 +38,7 @@ impl StateCommitterTrait for StateCommitter {
2938
committer_client: SharedCommitterClient,
3039
) -> Self {
3140
let handle = tokio::spawn(async move {
32-
Self::perform_commitment_tasks(tasks_receiver, results_sender, committer_client).await;
41+
Self::perform_tasks(tasks_receiver, results_sender, committer_client).await;
3342
});
3443
Self { task_performer_handle: handle }
3544
}
@@ -39,13 +48,84 @@ impl StateCommitterTrait for StateCommitter {
3948
}
4049

4150
impl StateCommitter {
42-
pub(crate) async fn perform_commitment_tasks(
51+
/// Performs the tasks in the channel. Retries at recoverable errors.
52+
pub(crate) async fn perform_tasks(
4353
mut tasks_receiver: Receiver<CommitterTaskInput>,
4454
mut results_sender: Sender<CommitterTaskOutput>,
4555
committer_client: SharedCommitterClient,
4656
) {
47-
// Placeholder: simply drain the receiver and do nothing.
48-
// TODO(Amos): Implement the actual commitment tasks logic.
49-
while let Some(_task) = tasks_receiver.recv().await {}
57+
// TODO(Yoav): Test this function.
58+
while let Some(CommitterTaskInput(request)) = tasks_receiver.recv().await {
59+
let output = perform_task(request, &committer_client).await;
60+
let height = output.height();
61+
match results_sender.try_send(output.clone()) {
62+
Ok(_) => {
63+
info!(
64+
"Successfully sent the committer result to the results channel: \
65+
{output:?}."
66+
);
67+
}
68+
Err(TrySendError::Full(_)) => {
69+
panic!("Results channel is full for height {height}.")
70+
}
71+
Err(err) => panic!("Failed to send results for height {height}. error: {err}"),
72+
}
73+
}
74+
}
75+
}
76+
77+
/// Performs a commitment task by calling the committer.
78+
/// Retries at recoverable errors.
79+
async fn perform_task(
80+
request: CommitterRequest,
81+
committer_client: &SharedCommitterClient,
82+
) -> CommitterTaskOutput {
83+
loop {
84+
let result = match &request {
85+
CommitterRequest::CommitBlock(commit_block_request) => {
86+
perform_commit_block_task(commit_block_request.clone(), committer_client).await
87+
}
88+
CommitterRequest::RevertBlock(revert_block_request) => {
89+
perform_revert_block_task(revert_block_request.clone(), committer_client).await
90+
}
91+
};
92+
match result {
93+
Ok(output) => return output,
94+
Err(err) => {
95+
log_error_and_maybe_panic(err).await;
96+
continue;
97+
}
98+
}
99+
}
100+
}
101+
102+
async fn perform_commit_block_task(
103+
commit_block_request: CommitBlockRequest,
104+
committer_client: &SharedCommitterClient,
105+
) -> CommitterClientResult<CommitterTaskOutput> {
106+
let height = commit_block_request.height;
107+
let response = committer_client.commit_block(commit_block_request).await?;
108+
Ok(CommitterTaskOutput::Commit(CommitmentTaskOutput { response, height }))
109+
}
110+
111+
async fn perform_revert_block_task(
112+
revert_block_request: RevertBlockRequest,
113+
committer_client: &SharedCommitterClient,
114+
) -> CommitterClientResult<CommitterTaskOutput> {
115+
let height = revert_block_request.height;
116+
let response = committer_client.revert_block(revert_block_request).await?;
117+
Ok(CommitterTaskOutput::Revert(RevertTaskOutput { response, height }))
118+
}
119+
120+
/// Panics on unrecoverable errors.
121+
async fn log_error_and_maybe_panic(error: CommitterClientError) {
122+
// TODO(Amos): Handle errors.
123+
match error {
124+
CommitterClientError::ClientError(client_error) => {
125+
error!("Committer client error: {client_error}. retrying...");
126+
}
127+
CommitterClientError::CommitterError(committer_error) => {
128+
panic!("Committer internal error: {committer_error}");
129+
}
50130
}
51131
}

crates/apollo_batcher/src/commitment_manager/types.rs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,13 @@ impl CommitterTaskOutput {
3333
Self::Revert(_) => panic!("Got revert output: {self:?}"),
3434
}
3535
}
36+
37+
pub(crate) fn height(&self) -> BlockNumber {
38+
match self {
39+
Self::Commit(CommitmentTaskOutput { height, .. })
40+
| Self::Revert(RevertTaskOutput { height, .. }) => *height,
41+
}
42+
}
3643
}
3744

3845
pub(crate) struct FinalBlockCommitment {

0 commit comments

Comments
 (0)