diff --git a/lgn-messages/src/types/mod.rs b/lgn-messages/src/types/mod.rs index 79d1627..15556df 100644 --- a/lgn-messages/src/types/mod.rs +++ b/lgn-messages/src/types/mod.rs @@ -16,8 +16,8 @@ use v1::query::tasks::ProofInputKind; use v1::query::tasks::QueryStep; use v1::query::tasks::RevelationInput; -use crate::routing::RoutingKey; use crate::KeyedPayload; +use crate::routing::RoutingKey; pub mod v1; @@ -35,6 +35,148 @@ pub enum ReplyType { V1Groth16(WorkerReply), } +#[derive(Debug, Clone, Deserialize, Serialize)] +#[serde(tag = "version")] +pub enum RequestVersioned { + /// Version 1 of the request. + #[serde(rename = "1")] + V1(V1), + + /// Variant for backwards compatibility. + /// + /// TODO: Remove this once phased out. + #[serde(untagged)] + Legacy(MessageEnvelope), +} + +impl RequestVersioned { + /// Returns a reference to the requested mp2 library version. + pub fn mp2_version(&self) -> &str { + match self { + RequestVersioned::V1(v1) => v1.mp2_version(), + RequestVersioned::Legacy(message_envelope) => message_envelope.mp2_version(), + } + } + + /// Returns a unique identifier for this task. + /// + /// Note: For legacy messages, the unique identifier is created by adjoining the task and query + /// ids. + pub fn id(&self) -> String { + match self { + RequestVersioned::V1(v1) => v1.id(), + RequestVersioned::Legacy(message_envelope) => message_envelope.id(), + } + } + + /// Returns a reference to this task's id. + pub fn task_id(&self) -> &str { + match self { + RequestVersioned::V1(v1) => v1.task_id(), + RequestVersioned::Legacy(message_envelope) => message_envelope.task_id(), + } + } + + /// Returns [ProverType] which supports proving this [TaskType]. + /// + /// This is used to dispatch the message to the correct underlying prover. + pub fn to_prover_type(&self) -> ProverType { + match self { + RequestVersioned::V1(v1) => v1.to_prover_type(), + RequestVersioned::Legacy(message_envelope) => message_envelope.to_prover_type(), + } + } + + /// Returns the task's type name. + /// + /// This is used to classify the tasks for metrics. + pub fn to_task_type(&self) -> &str { + match self { + RequestVersioned::V1(v1) => v1.to_task_type(), + RequestVersioned::Legacy(message_envelope) => message_envelope.to_task_type(), + } + } + + /// Returns the inner [TaskType]. + pub fn inner(&self) -> &TaskType { + match self { + RequestVersioned::V1(v1) => v1.inner(), + RequestVersioned::Legacy(message_envelope) => message_envelope.inner(), + } + } + + /// Consumes self and returns the inner [TaskType]. + pub fn into_inner(self) -> TaskType { + match self { + RequestVersioned::V1(v1) => v1.into_inner(), + RequestVersioned::Legacy(message_envelope) => message_envelope.into_inner(), + } + } + + /// Returns the `query_id` part of the message. + /// + /// Note: This is only available for legacy messages. + pub fn query_id(&self) -> &str { + match self { + RequestVersioned::V1(_v1) => "", + RequestVersioned::Legacy(message_envelope) => message_envelope.query_id(), + } + } +} + +#[derive(Debug, Clone, Deserialize, Serialize)] +pub struct V1 { + /// Task id is unique for each task and helps to map replies to tasks. + pub task_id: String, + + /// Details of the task to be executed. + pub inner: TaskType, + + /// The proving system version. + pub mp2_version: String, +} + +impl V1 { + /// Returns a reference to the requested mp2 library version. + pub fn mp2_version(&self) -> &str { + &self.mp2_version + } + + /// Returns a copy of this task's id. + pub fn id(&self) -> String { + self.task_id.to_owned() + } + + /// Returns a reference to this task's id. + pub fn task_id(&self) -> &str { + &self.task_id + } + + /// Returns [ProverType] which supports proving this [TaskType]. + /// + /// This is used to dispatch the message to the correct underlying prover. + pub fn to_prover_type(&self) -> ProverType { + self.inner.to_prover_type() + } + + /// Returns the task's type name. + /// + /// This is used to classify the tasks for metrics. + pub fn to_task_type(&self) -> &str { + self.inner.to_task_type() + } + + /// Returns the inner [TaskType]. + pub fn inner(&self) -> &TaskType { + &self.inner + } + + /// Consumes self and returns the inner [TaskType]. + fn into_inner(self) -> TaskType { + self.inner + } +} + #[derive(Clone, Deserialize, Serialize)] pub struct MessageEnvelope { /// Query id is unique for each query and shared between all its tasks @@ -113,6 +255,7 @@ impl MessageEnvelope { format!("{}-{}", self.query_id, self.task_id) } + /// Returns the inner [TaskType]. pub fn inner(&self) -> &TaskType { &self.inner } @@ -128,6 +271,11 @@ impl MessageEnvelope { self.inner.to_prover_type() } + /// Returns a reference to the requested mp2 library version. + pub fn mp2_version(&self) -> &str { + &self.version + } + /// Returns the task's type name. /// /// This is used to classify the tasks for metrics. @@ -210,6 +358,12 @@ impl MessageEnvelope { }, TaskType::V1Groth16(..) => "groth16", } + self.inner.to_task_type() + } + + /// Consumes self and returns the inner [TaskType]. + fn into_inner(self) -> TaskType { + self.inner } } @@ -391,4 +545,87 @@ impl TaskType { TaskType::V1Groth16(_) => ProverType::V1Groth16, } } + + /// Returns the task's type name. + /// + /// This is used to classify the tasks for metrics. + fn to_task_type(&self) -> &str { + match self { + TaskType::V1Preprocessing(worker_task) => { + match &worker_task.task_type { + v1::preprocessing::WorkerTaskType::Extraction(extraction_type) => { + match extraction_type { + ExtractionType::MptExtraction(mpt) => { + match mpt.mpt_type { + MptType::MappingLeaf(..) => "mapping_leaf", + MptType::MappingBranch(..) => "mapping_branch", + MptType::VariableLeaf(..) => "multi_var_leaf", + MptType::VariableBranch(..) => "multi_var_branch", + } + }, + ExtractionType::LengthExtraction(..) => "length", + ExtractionType::ContractExtraction(..) => "contract", + ExtractionType::BlockExtraction(..) => "block", + ExtractionType::FinalExtraction(final_extraction) => { + match &**final_extraction { + FinalExtraction::Single(single_table_extraction) => { + match single_table_extraction.extraction_type { + FinalExtractionType::Simple(..) => "final_extraction", + FinalExtractionType::Lengthed => { + "final_extraction_lengthed" + }, + } + }, + FinalExtraction::Merge(..) => "final_extraction_merge", + } + }, + } + }, + v1::preprocessing::WorkerTaskType::Database(database_type) => { + match database_type { + DatabaseType::Cell(db_cell_type) => { + match db_cell_type { + DbCellType::Leaf(..) => "cell_leaf", + DbCellType::Partial(..) => "cell_partial", + DbCellType::Full(..) => "cell_full", + } + }, + DatabaseType::Row(db_row_type) => { + match db_row_type { + DbRowType::Leaf(..) => "row_leaf", + DbRowType::Partial(..) => "row_partial", + DbRowType::Full(..) => "row_full", + } + }, + DatabaseType::Index(..) => "index", + DatabaseType::IVC(..) => "ivc", + } + }, + } + }, + TaskType::V1Query(worker_task) => { + match &worker_task.task_type { + v1::query::WorkerTaskType::Query(query_input) => { + match &query_input.query_step { + QueryStep::Tabular(..) => "tabular", + QueryStep::Aggregation(aggregation_input) => { + match aggregation_input.input_kind { + ProofInputKind::RowsChunk(..) => "rows_chunk", + ProofInputKind::ChunkAggregation(..) => "chunk_aggregation", + ProofInputKind::NonExistence(..) => "non_existence", + } + }, + QueryStep::Revelation(revelation_input) => { + match revelation_input { + RevelationInput::Aggregated { .. } => "revelation_aggregated", + RevelationInput::Tabular { .. } => "revelation_tabular", + } + }, + } + }, + } + }, + TaskType::V1Groth16(..) => "groth16", + } + } } diff --git a/lgn-provers/src/provers/mod.rs b/lgn-provers/src/provers/mod.rs index 4b5d015..15462b0 100644 --- a/lgn-provers/src/provers/mod.rs +++ b/lgn-provers/src/provers/mod.rs @@ -1,5 +1,5 @@ -use lgn_messages::types::MessageEnvelope; use lgn_messages::types::MessageReplyEnvelope; +use lgn_messages::types::RequestVersioned; pub mod v1; @@ -10,6 +10,6 @@ pub mod v1; pub trait LgnProver { fn run( &self, - envelope: MessageEnvelope, + envelope: RequestVersioned, ) -> anyhow::Result; } diff --git a/lgn-provers/src/provers/v1/groth16/dummy_prover.rs b/lgn-provers/src/provers/v1/groth16/dummy_prover.rs index 61646db..059faf2 100644 --- a/lgn-provers/src/provers/v1/groth16/dummy_prover.rs +++ b/lgn-provers/src/provers/v1/groth16/dummy_prover.rs @@ -1,9 +1,9 @@ use anyhow::bail; use lgn_messages::types::v1::groth16::keys::ProofKey; -use lgn_messages::types::MessageEnvelope; use lgn_messages::types::MessageReplyEnvelope; use lgn_messages::types::ProofCategory; use lgn_messages::types::ReplyType; +use lgn_messages::types::RequestVersioned; use lgn_messages::types::TaskType; use lgn_messages::types::WorkerReply; @@ -18,17 +18,18 @@ pub struct Groth16DummyProver; impl LgnProver for Groth16DummyProver { fn run( &self, - envelope: MessageEnvelope, + envelope: RequestVersioned, ) -> anyhow::Result { - let query_id = envelope.query_id.clone(); - let task_id = envelope.task_id.clone(); + let query_id = envelope.query_id(); + let task_id = envelope.task_id(); if let TaskType::V1Groth16(task) = envelope.inner() { let key = ProofKey(query_id.to_string()).to_string(); let proof = dummy_proof(PROOF_SIZE); let reply = WorkerReply::new(task.chain_id, Some((key, proof)), ProofCategory::Querying); let reply_type = ReplyType::V1Groth16(reply); - let reply_envelope = MessageReplyEnvelope::new(query_id, task_id, reply_type); + let reply_envelope = + MessageReplyEnvelope::new(query_id.to_owned(), task_id.to_owned(), reply_type); Ok(reply_envelope) } else { bail!("Unexpected task: {:?}", envelope); diff --git a/lgn-provers/src/provers/v1/groth16/task.rs b/lgn-provers/src/provers/v1/groth16/task.rs index ee308cb..144812d 100644 --- a/lgn-provers/src/provers/v1/groth16/task.rs +++ b/lgn-provers/src/provers/v1/groth16/task.rs @@ -1,16 +1,11 @@ -use std::time::Instant; - use anyhow::bail; -use anyhow::Context; use lgn_messages::types::v1::groth16::keys::ProofKey; -use lgn_messages::types::MessageEnvelope; use lgn_messages::types::MessageReplyEnvelope; use lgn_messages::types::ProofCategory; use lgn_messages::types::ReplyType; +use lgn_messages::types::RequestVersioned; use lgn_messages::types::TaskType; use lgn_messages::types::WorkerReply; -use tracing::debug; -use tracing::info; use super::euclid_prover::Groth16EuclidProver; use crate::provers::LgnProver; @@ -18,51 +13,23 @@ use crate::provers::LgnProver; impl LgnProver for Groth16EuclidProver { fn run( &self, - envelope: MessageEnvelope, + envelope: RequestVersioned, ) -> anyhow::Result { - let query_id = envelope.query_id.clone(); - let task_id = envelope.task_id.clone(); + let query_id = envelope.query_id(); + let task_id = envelope.task_id(); + if let TaskType::V1Groth16(task) = envelope.inner() { - let proof = self.generate_proof( - &query_id, - &task_id, - task.revelation_proof.proof().as_slice(), - )?; - let reply = WorkerReply::new(task.chain_id, Some(proof), ProofCategory::Querying); + let key = ProofKey(query_id.to_string()).to_string(); + let proof = self.prove(task.revelation_proof.proof().as_slice())?; + + let reply = + WorkerReply::new(task.chain_id, Some((key, proof)), ProofCategory::Querying); let reply_type = ReplyType::V1Groth16(reply); - let reply_envelope = MessageReplyEnvelope::new(query_id, task_id, reply_type); + let reply_envelope = + MessageReplyEnvelope::new(query_id.to_owned(), task_id.to_owned(), reply_type); Ok(reply_envelope) } else { bail!("Unexpected task: {:?}", envelope); } } } - -impl Groth16EuclidProver { - /// Generate the Groth proof. - fn generate_proof( - &self, - query_id: &str, - task_id: &str, - revelation: &[u8], - ) -> anyhow::Result<(String, Vec)> { - // Generate the Groth16 proof. - let now = Instant::now(); - let key = ProofKey(query_id.to_string()).to_string(); - let proof = self.prove(revelation).with_context(|| { - format!( - " Failed to generate the Groth16 proof: query_id = {query_id}, task_id = {task_id}" - ) - })?; - debug!("Finish generating the Groth16 proof: query_id = {query_id}, task_id = {task_id}",); - - info!( - time = now.elapsed().as_secs_f32(), - proof_type = "groth16", - "proof generation time: {:?}", - now.elapsed() - ); - - Ok((key, proof)) - } -} diff --git a/lgn-provers/src/provers/v1/preprocessing/dummy_prover.rs b/lgn-provers/src/provers/v1/preprocessing/dummy_prover.rs index d4e2cdc..77e9aea 100644 --- a/lgn-provers/src/provers/v1/preprocessing/dummy_prover.rs +++ b/lgn-provers/src/provers/v1/preprocessing/dummy_prover.rs @@ -3,10 +3,10 @@ use lgn_messages::types::v1::preprocessing::db_keys; use lgn_messages::types::v1::preprocessing::ext_keys; use lgn_messages::types::v1::preprocessing::WorkerTask; use lgn_messages::types::v1::preprocessing::WorkerTaskType; -use lgn_messages::types::MessageEnvelope; use lgn_messages::types::MessageReplyEnvelope; use lgn_messages::types::ProofCategory; use lgn_messages::types::ReplyType; +use lgn_messages::types::RequestVersioned; use lgn_messages::types::TaskType; use lgn_messages::types::WorkerReply; @@ -21,30 +21,31 @@ pub struct PreprocessingDummyProver; impl LgnProver for PreprocessingDummyProver { fn run( &self, - envelope: MessageEnvelope, + envelope: RequestVersioned, ) -> anyhow::Result { - let query_id = envelope.query_id.clone(); - let task_id = envelope.task_id.clone(); - if let TaskType::V1Preprocessing(task @ WorkerTask { chain_id, .. }) = &envelope.inner { + let query_id = envelope.query_id().to_owned(); + let task_id = envelope.task_id().to_owned(); + if let TaskType::V1Preprocessing(task @ WorkerTask { chain_id, .. }) = envelope.into_inner() + { let key = match &task.task_type { WorkerTaskType::Extraction(_) => { - let key: ext_keys::ProofKey = task.into(); + let key: ext_keys::ProofKey = (&task).into(); key.to_string() }, WorkerTaskType::Database(_) => { - let key: db_keys::ProofKey = task.into(); + let key: db_keys::ProofKey = (&task).into(); key.to_string() }, }; let result = dummy_proof(PROOF_SIZE); let reply_type = ReplyType::V1Preprocessing(WorkerReply::new( - *chain_id, + chain_id, Some((key, result)), ProofCategory::Querying, )); Ok(MessageReplyEnvelope::new(query_id, task_id, reply_type)) } else { - bail!("Unexpected task: {:?}", envelope); + bail!("Unexpected task. task_id: {}", task_id); } } } diff --git a/lgn-provers/src/provers/v1/preprocessing/task.rs b/lgn-provers/src/provers/v1/preprocessing/task.rs index 55db74a..2374cdd 100644 --- a/lgn-provers/src/provers/v1/preprocessing/task.rs +++ b/lgn-provers/src/provers/v1/preprocessing/task.rs @@ -11,10 +11,10 @@ use lgn_messages::types::v1::preprocessing::ext_tasks::FinalExtractionType; use lgn_messages::types::v1::preprocessing::ext_tasks::MptType; use lgn_messages::types::v1::preprocessing::WorkerTask; use lgn_messages::types::v1::preprocessing::WorkerTaskType; -use lgn_messages::types::MessageEnvelope; use lgn_messages::types::MessageReplyEnvelope; use lgn_messages::types::ProofCategory; use lgn_messages::types::ReplyType; +use lgn_messages::types::RequestVersioned; use lgn_messages::types::TaskType; use lgn_messages::types::WorkerReply; use mp2_v1::api::TableRow; @@ -25,11 +25,13 @@ use crate::provers::LgnProver; impl LgnProver for PreprocessingEuclidProver { fn run( &self, - envelope: MessageEnvelope, + envelope: RequestVersioned, ) -> anyhow::Result { - let query_id = envelope.query_id.clone(); - let task_id = envelope.task_id.clone(); - if let TaskType::V1Preprocessing(task @ WorkerTask { chain_id, .. }) = envelope.inner { + let query_id = envelope.query_id().to_owned(); + let task_id = envelope.task_id().to_owned(); + + if let TaskType::V1Preprocessing(task @ WorkerTask { chain_id, .. }) = envelope.into_inner() + { let key = match &task.task_type { WorkerTaskType::Extraction(_) => { let key: ext_keys::ProofKey = (&task).into(); @@ -46,9 +48,13 @@ impl LgnProver for PreprocessingEuclidProver { Some((key, result)), ProofCategory::Querying, )); - Ok(MessageReplyEnvelope::new(query_id, task_id, reply_type)) + Ok(MessageReplyEnvelope::new( + query_id.to_owned(), + task_id.to_owned(), + reply_type, + )) } else { - bail!("Unexpected task: {:?}", envelope); + bail!("Unexpected task. task_id: {}", task_id); } } } diff --git a/lgn-provers/src/provers/v1/query/dummy_prover.rs b/lgn-provers/src/provers/v1/query/dummy_prover.rs index a1544b7..a54eec2 100644 --- a/lgn-provers/src/provers/v1/query/dummy_prover.rs +++ b/lgn-provers/src/provers/v1/query/dummy_prover.rs @@ -1,10 +1,10 @@ use anyhow::bail; use lgn_messages::types::v1::query::keys::ProofKey; use lgn_messages::types::v1::query::WorkerTask; -use lgn_messages::types::MessageEnvelope; use lgn_messages::types::MessageReplyEnvelope; use lgn_messages::types::ProofCategory; use lgn_messages::types::ReplyType; +use lgn_messages::types::RequestVersioned; use lgn_messages::types::TaskType; use lgn_messages::types::WorkerReply; @@ -19,16 +19,16 @@ pub struct QueryDummyProver; impl LgnProver for QueryDummyProver { fn run( &self, - envelope: MessageEnvelope, + envelope: RequestVersioned, ) -> anyhow::Result { - let query_id = envelope.query_id.clone(); - let task_id = envelope.task_id.clone(); + let query_id = envelope.query_id().to_owned(); + let task_id = envelope.task_id().to_owned(); - if let TaskType::V1Query(ref task @ WorkerTask { chain_id, .. }) = envelope.inner { + if let TaskType::V1Query(ref task @ WorkerTask { chain_id, .. }) = envelope.inner() { let key: ProofKey = task.into(); let result = dummy_proof(PROOF_SIZE); let reply_type = ReplyType::V1Query(WorkerReply::new( - chain_id, + *chain_id, Some((key.to_string(), result)), ProofCategory::Querying, )); diff --git a/lgn-provers/src/provers/v1/query/task.rs b/lgn-provers/src/provers/v1/query/task.rs index 7e9dc03..9ed56a5 100644 --- a/lgn-provers/src/provers/v1/query/task.rs +++ b/lgn-provers/src/provers/v1/query/task.rs @@ -7,10 +7,10 @@ use lgn_messages::types::v1::query::tasks::QueryStep; use lgn_messages::types::v1::query::tasks::RevelationInput; use lgn_messages::types::v1::query::WorkerTask; use lgn_messages::types::v1::query::WorkerTaskType; -use lgn_messages::types::MessageEnvelope; use lgn_messages::types::MessageReplyEnvelope; use lgn_messages::types::ProofCategory; use lgn_messages::types::ReplyType; +use lgn_messages::types::RequestVersioned; use lgn_messages::types::TaskType; use lgn_messages::types::WorkerReply; use parsil::assembler::DynamicCircuitPis; @@ -21,20 +21,24 @@ use crate::provers::LgnProver; impl LgnProver for QueryEuclidProver { fn run( &self, - envelope: MessageEnvelope, + envelope: RequestVersioned, ) -> anyhow::Result { - let query_id = envelope.query_id.clone(); - let task_id = envelope.task_id.clone(); + let query_id = envelope.query_id(); + let task_id = envelope.task_id(); - if let TaskType::V1Query(ref task @ WorkerTask { chain_id, .. }) = envelope.inner { + if let TaskType::V1Query(ref task @ WorkerTask { chain_id, .. }) = envelope.inner() { let key: ProofKey = task.into(); let result = self.run_inner(task)?; let reply_type = ReplyType::V1Query(WorkerReply::new( - chain_id, + *chain_id, Some((key.to_string(), result)), ProofCategory::Querying, )); - Ok(MessageReplyEnvelope::new(query_id, task_id, reply_type)) + Ok(MessageReplyEnvelope::new( + query_id.to_owned(), + task_id.to_owned(), + reply_type, + )) } else { bail!("Unexpected task: {:?}", envelope); } diff --git a/lgn-worker/src/main.rs b/lgn-worker/src/main.rs index 00b769f..ec26d8b 100755 --- a/lgn-worker/src/main.rs +++ b/lgn-worker/src/main.rs @@ -11,14 +11,14 @@ use std::path::Path; use std::process::ExitCode; use std::result::Result::Ok; use std::str::FromStr; +use std::sync::Arc; use std::sync::atomic::AtomicU64; use std::sync::atomic::Ordering; -use std::sync::Arc; use std::time::SystemTime; use std::time::UNIX_EPOCH; -use anyhow::bail; use anyhow::Context; +use anyhow::bail; use backtrace::Backtrace; use checksum::fetch_checksums; use clap::Parser; @@ -26,13 +26,14 @@ use ethers::signers::Wallet; use jwt::Claims; use jwt::RegisteredClaims; use k256::ecdsa::SigningKey; -use lagrange::worker_done::Reply; use lagrange::WorkerDone; use lagrange::WorkerToGwRequest; use lagrange::WorkerToGwResponse; +use lagrange::worker_done::Reply; use lgn_auth::jwt::JWTAuth; -use lgn_messages::types::MessageEnvelope; use lgn_messages::types::MessageReplyEnvelope; +use lgn_messages::types::ProverType; +use lgn_messages::types::RequestVersioned; use lgn_worker::avs::utils::read_keystore; use metrics::counter; use metrics::histogram; @@ -41,18 +42,18 @@ use semver::Version; use thiserror::Error; use tokio::sync::mpsc; use tokio_stream::StreamExt; -use tonic::metadata::MetadataValue; -use tonic::transport::ClientTlsConfig; use tonic::Request; use tonic::Streaming; +use tonic::metadata::MetadataValue; +use tonic::transport::ClientTlsConfig; +use tracing::Level; use tracing::debug; use tracing::error; use tracing::info; use tracing::level_filters::LevelFilter; use tracing::span; -use tracing::Level; -use tracing_subscriber::fmt::format::FmtSpan; use tracing_subscriber::EnvFilter; +use tracing_subscriber::fmt::format::FmtSpan; use warp::Filter; use crate::config::Config; @@ -554,19 +555,18 @@ async fn process_message_from_gateway( .map_err(|uuid| Error::UUIDInvalid { uuid })?; let uuid = uuid::Uuid::from_bytes_le(uuid); - let envelope = serde_json::from_slice::(&message.task) - .map_err(|err| Error::EnvelopeParseFailed { uuid, err, message })?; + let request = serde_json::from_slice::(&message.task) + .map_err(|err| Error::EnvelopeParseFailed { uuid, err })?; - let envelope_version = semver::Version::parse(&envelope.version) + let envelope_version = semver::Version::parse(request.mp2_version()) .map_err(|err| Error::EnvelopeInvalidMP2Version { uuid, err })?; + let task_id = request.id(); let span = span!( Level::INFO, "msg", %uuid, - task_id = envelope.task_id, - query_id = envelope.query_id, - db_id = ?envelope.db_task_id, + task_id, ); let _guard = span.enter(); @@ -578,14 +578,10 @@ async fn process_message_from_gateway( }); }; - let task_type = envelope.to_task_type().to_owned(); - let task_id = envelope.task_id().to_string(); - let query_id = envelope.query_id().to_string(); + let task_type = request.to_task_type().to_owned(); + let task_id = request.id().to_string(); - info!( - "Received Task. uuid: {} task_id: {} query_id: {}", - uuid, task_id, query_id - ); + info!("Received Task. uuid: {} task_id: {}", uuid, task_id); counter!( "zkmr_worker_tasks_received_total", @@ -599,7 +595,7 @@ async fn process_message_from_gateway( // gateway. std::panic::catch_unwind(|| { provers_manager - .delegate_proving(envelope) + .delegate_proving(request) .map_err(|err| Error::ProofFailed { uuid, err }) }) .map_err(|panic| { @@ -648,10 +644,9 @@ async fn process_message_from_gateway( .record(serialised.len() as f64); info!( - "Processed task. uuid: {} task_id: {} query_id: {} time: {:?}", + "Processed task. uuid: {} task_id: {} time: {:?}", uuid, task_id, - query_id, start_time.elapsed(), ); Ok(serialised) @@ -669,10 +664,9 @@ async fn process_message_from_gateway( .record(start_time.elapsed().as_secs_f64()); error!( - "Failed to process task. uuid: {} task_id: {} query_id: {} time: {:?} err: {:?}", + "Failed to process task. uuid: {} task_id: {} time: {:?} err: {:?}", uuid, task_id, - query_id, start_time.elapsed(), err, ); diff --git a/lgn-worker/src/manager.rs b/lgn-worker/src/manager.rs index 1647880..12babb1 100644 --- a/lgn-worker/src/manager.rs +++ b/lgn-worker/src/manager.rs @@ -4,9 +4,9 @@ use std::panic::UnwindSafe; use anyhow::bail; use anyhow::Context; -use lgn_messages::types::MessageEnvelope; use lgn_messages::types::MessageReplyEnvelope; use lgn_messages::types::ProverType; +use lgn_messages::types::RequestVersioned; use lgn_messages::types::TaskDifficulty; use lgn_provers::provers::LgnProver; use tracing::info; @@ -92,9 +92,9 @@ impl ProversManager { /// A message reply envelope containing the result of the proving task pub(crate) fn delegate_proving( &self, - envelope: MessageEnvelope, + envelope: RequestVersioned, ) -> anyhow::Result { - let prover_type: ProverType = envelope.inner.to_prover_type(); + let prover_type: ProverType = envelope.to_prover_type(); match self.provers.get(&prover_type) { Some(prover) => prover.run(envelope), diff --git a/lgn-worker/src/one-shot.rs b/lgn-worker/src/one-shot.rs index a428820..e3a8fdc 100644 --- a/lgn-worker/src/one-shot.rs +++ b/lgn-worker/src/one-shot.rs @@ -4,7 +4,7 @@ use anyhow::*; use checksum::fetch_checksums; use clap::Parser; -use lgn_messages::types::MessageEnvelope; +use lgn_messages::types::RequestVersioned; use manager::ProversManager; use tracing::error; use tracing::level_filters::LevelFilter; @@ -81,7 +81,7 @@ async fn main() -> Result<()> { let envelope = std::fs::read_to_string(&cli.input) .with_context(|| format!("failed to open `{}`", cli.input)) .and_then(|content| { - serde_json::from_str::(&content).context("failed to parse input JSON") + serde_json::from_str::(&content).context("failed to parse input JSON") })?; provers_manager