Skip to content

Commit 2fb3447

Browse files
committed
apollo_batcher: perform commitment tasks
1 parent fa9e1b7 commit 2fb3447

File tree

5 files changed

+100
-12
lines changed

5 files changed

+100
-12
lines changed

crates/apollo_batcher/src/batcher.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1159,6 +1159,7 @@ impl Batcher {
11591159
let commitment_results = self.commitment_manager.get_commitment_results().await;
11601160
for commitment_task_output in commitment_results.into_iter() {
11611161
let height = commitment_task_output.height;
1162+
info!("Writing commitment results to storage for height {}.", height);
11621163

11631164
// Decide whether to finalize the block hash based on the config.
11641165
let should_finalize_block_hash =

crates/apollo_batcher/src/commitment_manager/commitment_manager_impl.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -263,12 +263,12 @@ impl<S: StateCommitterTrait> CommitmentManager<S> {
263263
if height == BlockNumber::ZERO {
264264
parent_hash = Some(BlockHash::GENESIS_PARENT_HASH);
265265
}
266-
let parent_hash = parent_hash.ok_or(CommitmentManagerError::MissingBlockHash(
267-
height.prev().expect(
266+
let parent_hash = parent_hash.ok_or_else(|| {
267+
CommitmentManagerError::MissingBlockHash(height.prev().expect(
268268
"For the genesis block, the block hash is constant and should not be \
269269
fetched from storage.",
270-
),
271-
))?;
270+
))
271+
})?;
272272
let partial_block_hash_components = partial_block_hash_components
273273
.ok_or(CommitmentManagerError::MissingPartialBlockHashComponents(height))?;
274274
let block_hash =

crates/apollo_batcher/src/commitment_manager/state_committer.rs

Lines changed: 87 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,19 @@
11
#![allow(dead_code, unused_variables, unused_mut)]
22

3-
use apollo_committer_types::communication::SharedCommitterClient;
3+
use apollo_committer_types::committer_types::{CommitBlockRequest, RevertBlockRequest};
4+
use apollo_committer_types::communication::{CommitterRequest, SharedCommitterClient};
5+
use apollo_committer_types::errors::{CommitterClientError, CommitterClientResult};
6+
use tokio::sync::mpsc::error::TrySendError;
47
use tokio::sync::mpsc::{Receiver, Sender};
58
use tokio::task::JoinHandle;
9+
use tracing::{error, info};
610

7-
use crate::commitment_manager::types::{CommitterTaskInput, CommitterTaskOutput};
11+
use crate::commitment_manager::types::{
12+
CommitmentTaskOutput,
13+
CommitterTaskInput,
14+
CommitterTaskOutput,
15+
RevertTaskOutput,
16+
};
817

918
/// Commits state changes by calling the committer.
1019
pub(crate) trait StateCommitterTrait {
@@ -29,7 +38,7 @@ impl StateCommitterTrait for StateCommitter {
2938
committer_client: SharedCommitterClient,
3039
) -> Self {
3140
let handle = tokio::spawn(async move {
32-
Self::perform_commitment_tasks(tasks_receiver, results_sender, committer_client).await;
41+
Self::perform_tasks(tasks_receiver, results_sender, committer_client).await;
3342
});
3443
Self { task_performer_handle: handle }
3544
}
@@ -39,13 +48,84 @@ impl StateCommitterTrait for StateCommitter {
3948
}
4049

4150
impl StateCommitter {
42-
pub(crate) async fn perform_commitment_tasks(
51+
/// Performs the tasks in the channel. Retries at recoverable errors.
52+
pub(crate) async fn perform_tasks(
4353
mut tasks_receiver: Receiver<CommitterTaskInput>,
4454
mut results_sender: Sender<CommitterTaskOutput>,
4555
committer_client: SharedCommitterClient,
4656
) {
47-
// Placeholder: simply drain the receiver and do nothing.
48-
// TODO(Amos): Implement the actual commitment tasks logic.
49-
while let Some(_task) = tasks_receiver.recv().await {}
57+
// TODO(Yoav): Test this function.
58+
while let Some(CommitterTaskInput(request)) = tasks_receiver.recv().await {
59+
let output = perform_task(request, &committer_client).await;
60+
let height = output.height();
61+
match results_sender.try_send(output.clone()) {
62+
Ok(_) => {
63+
info!(
64+
"Successfully sent the committer result to the results channel: \
65+
{output:?}."
66+
);
67+
}
68+
Err(TrySendError::Full(_)) => {
69+
panic!("Results channel is full for height {height}.")
70+
}
71+
Err(err) => panic!("Failed to send results for height {height}. error: {err}"),
72+
}
73+
}
74+
}
75+
}
76+
77+
/// Performs a commitment task by calling the committer.
78+
/// Retries at recoverable errors.
79+
async fn perform_task(
80+
request: CommitterRequest,
81+
committer_client: &SharedCommitterClient,
82+
) -> CommitterTaskOutput {
83+
loop {
84+
let result = match &request {
85+
CommitterRequest::CommitBlock(commit_block_request) => {
86+
perform_commit_block_task(commit_block_request.clone(), committer_client).await
87+
}
88+
CommitterRequest::RevertBlock(revert_block_request) => {
89+
perform_revert_block_task(revert_block_request.clone(), committer_client).await
90+
}
91+
};
92+
match result {
93+
Ok(output) => return output,
94+
Err(err) => {
95+
log_error_and_maybe_panic(err).await;
96+
continue;
97+
}
98+
}
99+
}
100+
}
101+
102+
async fn perform_commit_block_task(
103+
commit_block_request: CommitBlockRequest,
104+
committer_client: &SharedCommitterClient,
105+
) -> CommitterClientResult<CommitterTaskOutput> {
106+
let height = commit_block_request.height;
107+
let response = committer_client.commit_block(commit_block_request).await?;
108+
Ok(CommitterTaskOutput::Commit(CommitmentTaskOutput { response, height }))
109+
}
110+
111+
async fn perform_revert_block_task(
112+
revert_block_request: RevertBlockRequest,
113+
committer_client: &SharedCommitterClient,
114+
) -> CommitterClientResult<CommitterTaskOutput> {
115+
let height = revert_block_request.height;
116+
let response = committer_client.revert_block(revert_block_request).await?;
117+
Ok(CommitterTaskOutput::Revert(RevertTaskOutput { response, height }))
118+
}
119+
120+
/// Panics on unrecoverable errors.
121+
async fn log_error_and_maybe_panic(error: CommitterClientError) {
122+
// TODO(Amos): Handle errors.
123+
match error {
124+
CommitterClientError::ClientError(client_error) => {
125+
error!("Committer client error: {client_error}. retrying...");
126+
}
127+
CommitterClientError::CommitterError(committer_error) => {
128+
panic!("Committer internal error: {committer_error}");
129+
}
50130
}
51131
}

crates/apollo_batcher/src/commitment_manager/types.rs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,13 @@ impl CommitterTaskOutput {
3333
Self::Revert(_) => panic!("Got revert output: {self:?}"),
3434
}
3535
}
36+
37+
pub(crate) fn height(&self) -> BlockNumber {
38+
match self {
39+
Self::Commit(CommitmentTaskOutput { height, .. })
40+
| Self::Revert(RevertTaskOutput { height, .. }) => *height,
41+
}
42+
}
3643
}
3744

3845
pub(crate) struct FinalBlockCommitment {

crates/apollo_integration_tests/src/end_to_end_flow_utils.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -82,7 +82,7 @@ pub async fn end_to_end_flow(args: EndToEndFlowArgs) {
8282
.install_recorder()
8383
.expect("Should be able to install global prometheus recorder");
8484

85-
const TEST_SCENARIO_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(50);
85+
const TEST_SCENARIO_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(100);
8686
// Setup.
8787
let mock_running_system = FlowTestSetup::new_from_tx_generator(
8888
&tx_generator,

0 commit comments

Comments
 (0)