Skip to content

Commit 3410592

Browse files
committed
updated
1 parent 2b48eea commit 3410592

37 files changed

+2087
-1671
lines changed

offchain/_aggregator/Cargo.toml

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
[package]
2+
name = "square-number-dss-aggregator"
3+
description = "Square Number DSS Aggregator"
4+
version = { workspace = true }
5+
authors = { workspace = true }
6+
repository = { workspace = true }
7+
edition = { workspace = true }
8+
9+
[dependencies]
10+
alloy = { version = "0.2.1", features = ["full"] }
11+
axum = { version = "0.7.5", features = ["macros"] }
12+
chrono = { version = "0.4", features = ["serde"] }
13+
dotenvy = "0.15.7"
14+
envy = "0.4.2"
15+
eyre = "0.6.12"
16+
karak-rs = "=0.1.1"
17+
serde = { version = "1.0.207", features = ["derive"] }
18+
serde_json = "1.0.124"
19+
thiserror = "1.0"
20+
tokio = { version = "1.39.2", features = ["full"] }
21+
tower = { version = "0.5.0", features = ["buffer", "limit", "util"] }
22+
tower-http = { version = "0.5.2", features = ["full"] }
23+
tower_governor = { version = "0.4.2" }
24+
tracing = "0.1.40"
25+
tracing-subscriber = { version = "0.3.18", features = ["env-filter"] }
26+
url = { version = "2.5.2", features = ["serde"] }

offchain/_aggregator/Dockerfile

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
FROM rust:1.83-slim AS builder
2+
3+
WORKDIR /app
4+
5+
COPY . .
6+
7+
WORKDIR /app/aggregator
8+
9+
# Install necessary dependencies in a single step
10+
RUN apt-get update && \
11+
apt-get install -y --no-install-recommends \
12+
pkg-config \
13+
libssl-dev \
14+
ca-certificates \
15+
openssl \
16+
libterm-readline-perl-perl && \
17+
rm -rf /var/lib/apt/lists/*
18+
19+
RUN cargo build --release --bin square-number-dss-aggregator
20+
21+
# ---- Runtime stage ----
22+
FROM debian:bookworm-slim
23+
24+
WORKDIR /app
25+
26+
# Set environment to avoid interactive prompts
27+
ENV DEBIAN_FRONTEND=noninteractive
28+
29+
# Install necessary runtime dependencies
30+
RUN apt-get update && \
31+
apt-get install -y --no-install-recommends curl ca-certificates && \
32+
rm -rf /var/lib/apt/lists/*
33+
34+
COPY --from=builder /app/target/release/square-number-dss-aggregator /app/square-number-dss-aggregator
35+
COPY --from=builder /app/contracts /app/contracts
36+
37+
ENTRYPOINT ["/app/square-number-dss-aggregator"]

offchain/_aggregator/README.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
# Aggregator
2+
3+
The aggregator, functioning as an offline entity within the DSS, acts as a trusted central figure. Its primary role is to monitor the DSS contract for any new task requests. As soon as a task request is detected, the aggregator disseminates this request to all [Operators](../operators/README.md) registered in the DSS. After the operators finish executing the requests, the aggregator collects all responses from the operators and verifies their signatures to confirm that the responses are genuinely from the registered operators. Once verified, the aggregator calculates the median of all the received responses. The median is chosen to mitigate the impact of any outliers or erroneous calculations.
Lines changed: 104 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,104 @@
1+
use alloy::primitives::Address;
2+
use axum::{extract::State, routing::post, Json, Router};
3+
use eyre::Result;
4+
use serde::{Deserialize, Serialize};
5+
use std::{
6+
collections::HashSet,
7+
sync::{Arc, RwLock},
8+
};
9+
use tracing::info;
10+
use url::Url;
11+
12+
use crate::error::AppError;
13+
14+
#[derive(Clone, Debug, PartialEq, Eq, Hash, Serialize, Deserialize)]
15+
#[serde(rename_all = "camelCase")]
16+
pub struct Operator {
17+
public_key: Address,
18+
url: Url,
19+
}
20+
21+
impl Operator {
22+
pub fn new(public_key: Address, url: Url) -> Self {
23+
Self { public_key, url }
24+
}
25+
26+
pub fn url(&self) -> &Url {
27+
&self.url
28+
}
29+
30+
pub fn public_key(&self) -> &Address {
31+
&self.public_key
32+
}
33+
}
34+
35+
#[derive(Clone, Debug, Default)]
36+
pub struct OperatorState {
37+
pub operators: Arc<RwLock<HashSet<Operator>>>,
38+
}
39+
40+
impl OperatorState {
41+
pub fn new() -> Self {
42+
Self {
43+
operators: Arc::new(RwLock::new(HashSet::new())),
44+
}
45+
}
46+
47+
pub fn register_operator(&self, operator: Operator) -> Result<()> {
48+
49+
50+
info!("register_operator calledc : {:?}", operator);
51+
52+
let mut operators = self
53+
.operators
54+
.write()
55+
.map_err(|_| eyre::eyre!("Could not lock"))?;
56+
if operators.insert(operator.clone()) {
57+
info!("Operator registered: {}", serde_json::to_string(&operator)?);
58+
} else {
59+
info!(
60+
"Operator already registered: {}",
61+
serde_json::to_string(&operator)?
62+
);
63+
}
64+
Ok(())
65+
}
66+
67+
pub fn is_operator_registered(&self, operator: Operator) -> Result<bool> {
68+
Ok(self
69+
.operators
70+
.read()
71+
.map_err(|_| eyre::eyre!("Could not lock"))?
72+
.contains(&operator))
73+
}
74+
}
75+
76+
pub async fn register_operator(
77+
State(operators): State<Arc<OperatorState>>,
78+
Json(operator): Json<Operator>,
79+
) -> Result<Json<bool>, AppError> {
80+
81+
info!("register_operator {:?}", operators);
82+
83+
operators.register_operator(operator)?;
84+
Ok(Json(true))
85+
}
86+
87+
pub async fn is_operator_registered(
88+
State(operators): State<Arc<OperatorState>>,
89+
Json(operator): Json<Operator>,
90+
) -> Result<Json<bool>, AppError> {
91+
info!("is_operator_registered {:?}", operators);
92+
93+
let registered = operators.is_operator_registered(operator)?;
94+
info!("is_operator_registered_after {:?}", registered);
95+
96+
Ok(Json(registered))
97+
}
98+
99+
pub fn aggregator_router(operator_state: Arc<OperatorState>) -> Router {
100+
Router::new()
101+
.route("/registerOperator", post(register_operator))
102+
.route("/isOperatorRegistered", post(is_operator_registered))
103+
.with_state(operator_state)
104+
}
Lines changed: 134 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,134 @@
1+
use alloy::network::{Ethereum, EthereumWallet};
2+
use alloy::primitives::Address;
3+
use alloy::providers::fillers::{FillProvider, JoinFill, RecommendedFiller, WalletFiller};
4+
use alloy::providers::{ProviderBuilder, ReqwestProvider};
5+
use alloy::sol;
6+
use alloy::transports::http::ReqwestTransport;
7+
use karak_rs::contracts::Core::CoreInstance;
8+
use url::Url;
9+
use SquareNumberDSS::{TaskRequest, TaskResponse};
10+
use TxnVerifier::{Task,OperatorResponse};
11+
use tracing::info;
12+
13+
use crate::Config;
14+
use crate::TaskError;
15+
16+
sol!(
17+
#[sol(rpc)]
18+
SquareNumberDSS,
19+
"../abi/SquareNumberDSS.json",
20+
);
21+
22+
23+
sol!(
24+
#[sol(rpc)]
25+
TxnVerifier,
26+
"../abi/TxnVerifier.json",
27+
);
28+
29+
sol!(
30+
#[sol(rpc)]
31+
#[allow(clippy::too_many_arguments)]
32+
VaultAbi,
33+
"../abi/Vault.json",
34+
);
35+
36+
type RecommendedProvider = FillProvider<
37+
JoinFill<RecommendedFiller, WalletFiller<EthereumWallet>>,
38+
ReqwestProvider,
39+
ReqwestTransport,
40+
Ethereum,
41+
>;
42+
43+
pub struct ContractManager {
44+
pub dss_instance:
45+
TxnVerifier::TxnVerifierInstance<ReqwestTransport, RecommendedProvider>,
46+
pub core_instance: CoreInstance<ReqwestTransport, RecommendedProvider>,
47+
pub provider: RecommendedProvider,
48+
}
49+
50+
impl ContractManager {
51+
pub fn new(config: &Config) -> Result<Self, TaskError> {
52+
let rpc_url = config
53+
.get_rpc_url()
54+
.map_err(|e| TaskError::CustomUrlError(e.to_string()))?;
55+
let private_key = config.get_private_key()?;
56+
57+
let provider = ProviderBuilder::new()
58+
.with_recommended_fillers()
59+
.wallet(EthereumWallet::from(private_key))
60+
.on_http(rpc_url);
61+
62+
let txn_verifier_address = config.txn_verifier_address;
63+
let dss_instance = TxnVerifier::new(txn_verifier_address, provider.clone());
64+
65+
let core_address = config.core_address;
66+
let core_instance = CoreInstance::new(core_address, provider.clone());
67+
68+
Ok(Self {
69+
dss_instance,
70+
core_instance,
71+
provider,
72+
})
73+
}
74+
75+
pub async fn fetch_vaults_staked_in_dss(
76+
&self,
77+
operator: Address,
78+
dss_address: Address,
79+
) -> Result<Vec<Address>, TaskError> {
80+
let result = self
81+
.core_instance
82+
.fetchVaultsStakedInDSS(operator, dss_address)
83+
.call()
84+
.await
85+
.map_err(|_| TaskError::ContractCallError)?;
86+
87+
Ok(result.vaults)
88+
}
89+
90+
pub async fn submit_task_response(
91+
&self,
92+
dss_task_request: Task,
93+
task_response: OperatorResponse,
94+
) -> Result<(), TaskError> {
95+
96+
// info!("submit_task_response {:?}",dss_task_request);
97+
info!("submit_task_responseafter {:?}",dss_task_request.transaction_hash);
98+
99+
100+
let _ = self
101+
.dss_instance
102+
.submitTaskResponse(dss_task_request.transaction_hash, task_response)
103+
.send()
104+
.await
105+
.map_err(|_| TaskError::ContractCallError)?;
106+
107+
Ok(())
108+
}
109+
}
110+
111+
pub struct VaultContract {
112+
pub vault_instance: VaultAbi::VaultAbiInstance<ReqwestTransport, RecommendedProvider>,
113+
pub provider: RecommendedProvider,
114+
}
115+
116+
impl VaultContract {
117+
pub fn new(
118+
rpc_url: Url,
119+
private_key: alloy::signers::local::PrivateKeySigner,
120+
vault_address: Address,
121+
) -> Result<Self, TaskError> {
122+
let provider = ProviderBuilder::new()
123+
.with_recommended_fillers()
124+
.wallet(EthereumWallet::from(private_key))
125+
.on_http(rpc_url);
126+
127+
let vault_instance = VaultAbi::new(vault_address, provider.clone());
128+
129+
Ok(Self {
130+
vault_instance,
131+
provider,
132+
})
133+
}
134+
}

offchain/_aggregator/src/error.rs

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
use axum::{
2+
http::StatusCode,
3+
response::{IntoResponse, Response},
4+
Json,
5+
};
6+
use serde_json::json;
7+
8+
pub struct AppError(eyre::Report);
9+
10+
impl IntoResponse for AppError {
11+
fn into_response(self) -> Response {
12+
tracing::error!(error = %self.0);
13+
let status = StatusCode::INTERNAL_SERVER_ERROR;
14+
(status, Json(json!({ "error": self.0.to_string() }))).into_response()
15+
}
16+
}
17+
18+
impl From<eyre::Report> for AppError {
19+
fn from(error: eyre::Report) -> Self {
20+
Self(error)
21+
}
22+
}
23+
24+
pub type Result<T> = std::result::Result<T, AppError>;

offchain/_aggregator/src/health.rs

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
use axum::{
2+
http::StatusCode,
3+
response::{IntoResponse, Response},
4+
Json,
5+
};
6+
use serde::{Deserialize, Serialize};
7+
8+
#[derive(Debug, Serialize, Deserialize)]
9+
pub enum Status {
10+
Ok,
11+
Warn,
12+
Fail,
13+
}
14+
15+
#[derive(Debug, Serialize, Deserialize)]
16+
pub struct HealthCheck {
17+
status: Status,
18+
}
19+
20+
impl IntoResponse for HealthCheck {
21+
fn into_response(self) -> Response {
22+
let code = match self.status {
23+
Status::Ok | Status::Warn => StatusCode::OK,
24+
Status::Fail => StatusCode::INTERNAL_SERVER_ERROR,
25+
};
26+
(code, Json(self)).into_response()
27+
}
28+
}
29+
30+
pub async fn health_check() -> HealthCheck {
31+
HealthCheck { status: Status::Ok }
32+
}

0 commit comments

Comments
 (0)