Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/coordinator_handler/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ pub struct GetTaskRequest {
pub prover_height: Option<u64>,
}

#[derive(Serialize, Deserialize, Clone)]
#[derive(Serialize, Deserialize, Clone, Default)]
pub struct GetTaskResponseData {
pub uuid: String,
pub task_id: String,
Expand Down
8 changes: 6 additions & 2 deletions src/prover/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,8 +64,12 @@ where
.map(|i| {
let keys_dir = PathBuf::from(&self.cfg.keys_dir);
if !keys_dir.exists() {
std::fs::create_dir_all(&keys_dir)
.map_err(|e| anyhow::anyhow!("failed to create keys directory {}: {e}", keys_dir.display()))?;
std::fs::create_dir_all(&keys_dir).map_err(|e| {
anyhow::anyhow!(
"failed to create keys directory {}: {e}",
keys_dir.display()
)
})?;
}
let key_path = keys_dir.join(i.to_string());
KeySigner::new(&key_path)
Expand Down
42 changes: 38 additions & 4 deletions src/prover/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ use ethers_providers::Middleware;
use proving_service::{ProveRequest, QueryTaskRequest, TaskStatus};
use std::net::SocketAddr;
use std::str::FromStr;
use std::thread;
use tokio::time::{sleep, Duration};
use tokio::{sync::RwLock, task::JoinSet};
use tracing::{error, info, instrument};
Expand Down Expand Up @@ -60,6 +61,7 @@ where
provers.spawn(async move {
self_clone.working_loop(i).await;
});
thread::sleep(Duration::from_secs(3)); // Sleep for 3 seconds to avoid overwhelming the l2geth/coordinator with requests.
}

tokio::select! {
Expand Down Expand Up @@ -99,7 +101,9 @@ where
let task_id = coordinator_task.clone().task_id;
log::debug!("got previous task from db, task_id: {task_id}");
if self.proving_service.read().await.is_local() {
let proving_task = self.request_proving(&coordinator_task).await?;
let proving_task = self
.request_proving(coordinator_client, &coordinator_task)
.await?;
proving_task_id = proving_task.task_id
}
return self
Expand All @@ -108,7 +112,9 @@ where
}

let coordinator_task = self.get_coordinator_task(coordinator_client).await?;
let proving_task = self.request_proving(&coordinator_task).await?;
let proving_task = self
.request_proving(coordinator_client, &coordinator_task)
.await?;
self.handle_proving_progress(coordinator_client, &coordinator_task, proving_task.task_id)
.await
}
Expand All @@ -135,17 +141,45 @@ where

async fn request_proving(
&self,
coordinator_client: &CoordinatorClient,
coordinator_task: &GetTaskResponseData,
) -> anyhow::Result<proving_service::ProveResponse> {
let proving_input = self.build_proving_input(coordinator_task).await?;
let proving_input = match self.build_proving_input(coordinator_task).await {
Ok(result) => result,
Err(error) => {
self.submit_proof(
coordinator_client,
coordinator_task,
proving_service::QueryTaskResponse::default(),
ProofStatus::Error,
Some(format!("failed to build proving input: error {:?}", error)),
)
.await?;
anyhow::bail!(
"Failed to build proving input. task_type: {:?}, coordinator_task_uuid: {:?}, coordinator_task_id: {:?}, err: {:?}",
coordinator_task.task_type,
coordinator_task.uuid,
coordinator_task.task_id,
error,
);
}
};

let proving_task = self
.proving_service
.write()
.await
.prove(proving_input)
.await;

if let Some(error) = proving_task.error {
self.submit_proof(
coordinator_client,
coordinator_task,
proving_service::QueryTaskResponse::default(),
ProofStatus::Error,
Some(format!("failed to request proving: error {:?}", error)),
)
.await?;
anyhow::bail!(
"Failed to request proving_service to prove. task_type: {:?}, coordinator_task_uuid: {:?}, coordinator_task_id: {:?}, err: {:?}",
coordinator_task.task_type,
Expand Down