Skip to content

Commit bab93de

Browse files
committed
Merge remote-tracking branch 'origin/staging' into feat/integrate-agg-mode-with-db
2 parents dd500f5 + d7830a2 commit bab93de

File tree

11 files changed

+242
-74
lines changed

11 files changed

+242
-74
lines changed

Makefile

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -306,10 +306,10 @@ proof_aggregator_write_program_ids: ## Write proof aggregator zkvm programs ids
306306
@cd aggregation_mode/proof_aggregator && ./scripts/build_programs.sh
307307

308308
agg_mode_docker_up:
309-
@cd aggregation_mode && docker-compose up -d
309+
@cd aggregation_mode && docker compose up -d
310310

311311
agg_mode_docker_down:
312-
@cd aggregation_mode && docker-compose down
312+
@cd aggregation_mode && docker compose down
313313

314314
agg_mode_docker_clean: agg_mode_docker_down
315315
docker volume rm aggregation-mode_postgres_data

aggregation_mode/batcher/abi/AggregationModePaymentService.json

Lines changed: 1 addition & 1 deletion
Large diffs are not rendered by default.

aggregation_mode/batcher/src/db.rs

Lines changed: 56 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,23 @@ pub enum DbError {
1414
ConnectError(String),
1515
}
1616

17+
#[derive(Debug, Clone, sqlx::Type, serde::Serialize)]
18+
#[sqlx(type_name = "task_status")]
19+
#[sqlx(rename_all = "lowercase")]
20+
pub enum TaskStatus {
21+
Pending,
22+
Processing,
23+
Verified,
24+
}
25+
26+
#[derive(Debug, Clone, sqlx::FromRow, sqlx::Type, serde::Serialize)]
27+
pub struct Receipt {
28+
pub status: TaskStatus,
29+
pub merkle_path: Option<Vec<u8>>,
30+
pub nonce: i64,
31+
pub address: String,
32+
}
33+
1734
impl Db {
1835
pub async fn try_new(connection_url: &str) -> Result<Self, DbError> {
1936
let pool = PgPoolOptions::new()
@@ -45,29 +62,66 @@ impl Db {
4562
.map(|res| res.flatten())
4663
}
4764

65+
pub async fn get_tasks_by_address_and_nonce(
66+
&self,
67+
address: &str,
68+
nonce: i64,
69+
) -> Result<Vec<Receipt>, sqlx::Error> {
70+
sqlx::query_as::<_, Receipt>(
71+
"SELECT status,merkle_path,nonce,address FROM tasks
72+
WHERE address = $1
73+
AND nonce = $2
74+
ORDER BY nonce DESC",
75+
)
76+
.bind(address.to_lowercase())
77+
.bind(nonce)
78+
.fetch_all(&self.pool)
79+
.await
80+
}
81+
82+
pub async fn get_tasks_by_address_with_limit(
83+
&self,
84+
address: &str,
85+
limit: i64,
86+
) -> Result<Vec<Receipt>, sqlx::Error> {
87+
sqlx::query_as::<_, Receipt>(
88+
"SELECT status,merkle_path,nonce,address FROM tasks
89+
WHERE address = $1
90+
ORDER BY nonce DESC
91+
LIMIT $2",
92+
)
93+
.bind(address.to_lowercase())
94+
.bind(limit)
95+
.fetch_all(&self.pool)
96+
.await
97+
}
98+
4899
pub async fn insert_task(
49100
&self,
50101
address: &str,
51102
proving_system_id: i64,
52103
proof: &[u8],
53104
program_commitment: &[u8],
54105
merkle_path: Option<&[u8]>,
106+
nonce: i64,
55107
) -> Result<Uuid, sqlx::Error> {
56108
sqlx::query_scalar::<_, Uuid>(
57109
"INSERT INTO tasks (
58110
address,
59111
proving_system_id,
60112
proof,
61113
program_commitment,
62-
merkle_path
63-
) VALUES ($1, $2, $3, $4, $5)
114+
merkle_path,
115+
nonce
116+
) VALUES ($1, $2, $3, $4, $5, $6)
64117
RETURNING task_id",
65118
)
66119
.bind(address.to_lowercase())
67120
.bind(proving_system_id)
68121
.bind(proof)
69122
.bind(program_commitment)
70123
.bind(merkle_path)
124+
.bind(nonce)
71125
.fetch_one(&self.pool)
72126
.await
73127
}

aggregation_mode/batcher/src/server/http.rs

Lines changed: 79 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -14,13 +14,13 @@ use sqlx::types::BigDecimal;
1414

1515
use super::{
1616
helpers::format_merkle_path,
17-
types::{AppResponse, GetProofMerklePathQueryParams},
17+
types::{AppResponse, GetReceiptsQueryParams},
1818
};
1919

2020
use crate::{
2121
config::Config,
2222
db::Db,
23-
server::types::{SubmitProofRequestRisc0, SubmitProofRequestSP1},
23+
server::types::{GetReceiptsResponse, SubmitProofRequestRisc0, SubmitProofRequestSP1},
2424
verifiers::{verify_sp1_proof, VerificationError},
2525
};
2626

@@ -47,7 +47,7 @@ impl BatcherServer {
4747
// Note: this is temporary and should be lowered when we accept proofs via multipart form data instead of json
4848
.app_data(web::JsonConfig::default().limit(50 * 1024 * 1024)) // 50mb
4949
.route("/nonce/{address}", web::get().to(Self::get_nonce))
50-
.route("/proof/merkle", web::get().to(Self::get_proof_merkle_path))
50+
.route("/receipts", web::get().to(Self::get_receipts))
5151
.route("/proof/sp1", web::post().to(Self::post_proof_sp1))
5252
.route("/proof/risc0", web::post().to(Self::post_proof_risc0))
5353
})
@@ -58,20 +58,28 @@ impl BatcherServer {
5858
.expect("Server to never end");
5959
}
6060

61+
// Returns the nonce (number of submitted tasks) for a given address
6162
async fn get_nonce(req: HttpRequest) -> impl Responder {
62-
let Some(address) = req.match_info().get("address") else {
63+
let Some(address_raw) = req.match_info().get("address") else {
6364
return HttpResponse::BadRequest()
6465
.json(AppResponse::new_unsucessfull("Missing address", 400));
6566
};
6667

67-
// TODO: validate valid ethereum address
68+
// Check that the address is a valid ethereum address
69+
if alloy::primitives::Address::from_str(address_raw.trim()).is_err() {
70+
return HttpResponse::BadRequest()
71+
.json(AppResponse::new_unsucessfull("Invalid address", 400));
72+
}
73+
74+
let address = address_raw.to_lowercase();
75+
6876
let Some(state) = req.app_data::<Data<BatcherServer>>() else {
6977
return HttpResponse::InternalServerError()
7078
.json(AppResponse::new_unsucessfull("Internal server error", 500));
7179
};
7280

7381
let state = state.get_ref();
74-
match state.db.count_tasks_by_address(address).await {
82+
match state.db.count_tasks_by_address(&address).await {
7583
Ok(count) => HttpResponse::Ok().json(AppResponse::new_sucessfull(serde_json::json!(
7684
{
7785
"nonce": count
@@ -82,6 +90,7 @@ impl BatcherServer {
8290
}
8391
}
8492

93+
// Posts an SP1 proof to the batcher, recovering the address from the signature
8594
async fn post_proof_sp1(
8695
req: HttpRequest,
8796
MultipartForm(data): MultipartForm<SubmitProofRequestSP1>,
@@ -174,6 +183,7 @@ impl BatcherServer {
174183
&proof_content,
175184
&vk_content,
176185
None,
186+
data.nonce.0 as i64,
177187
)
178188
.await
179189
{
@@ -186,65 +196,89 @@ impl BatcherServer {
186196
}
187197

188198
/// TODO: complete for risc0 (see `post_proof_sp1`)
199+
// Posts a Risc0 proof to the batcher, recovering the address from the signature
189200
async fn post_proof_risc0(
190201
_req: HttpRequest,
191202
MultipartForm(_): MultipartForm<SubmitProofRequestRisc0>,
192203
) -> impl Responder {
193204
HttpResponse::Ok().json(AppResponse::new_sucessfull(serde_json::json!({})))
194205
}
195206

196-
async fn get_proof_merkle_path(
207+
// Returns the last 100 receipt merkle proofs for the address received in the URL.
208+
// In case of also receiving a nonce on the query param, it returns only the merkle proof for that nonce.
209+
async fn get_receipts(
197210
req: HttpRequest,
198-
params: web::Query<GetProofMerklePathQueryParams>,
211+
params: web::Query<GetReceiptsQueryParams>,
199212
) -> impl Responder {
200213
let Some(state) = req.app_data::<Data<BatcherServer>>() else {
201-
return HttpResponse::InternalServerError()
202-
.json(AppResponse::new_unsucessfull("Internal server error", 500));
214+
return HttpResponse::InternalServerError().json(AppResponse::new_unsucessfull(
215+
"Internal server error: Failed to get app data",
216+
500,
217+
));
203218
};
204219

205220
let state = state.get_ref();
206221

207-
// TODO: maybe also accept proof commitment in query param
208-
let Some(id) = params.id.clone() else {
209-
return HttpResponse::BadRequest().json(AppResponse::new_unsucessfull(
210-
"Provide task `id` query param",
211-
400,
212-
));
213-
};
214-
215-
if id.is_empty() {
216-
return HttpResponse::BadRequest().json(AppResponse::new_unsucessfull(
217-
"Proof id cannot be empty",
218-
400,
219-
));
222+
if alloy::primitives::Address::from_str(params.address.clone().trim()).is_err() {
223+
return HttpResponse::BadRequest()
224+
.json(AppResponse::new_unsucessfull("Invalid address", 400));
220225
}
221226

222-
let Ok(proof_id) = sqlx::types::Uuid::parse_str(&id) else {
223-
return HttpResponse::BadRequest()
224-
.json(AppResponse::new_unsucessfull("Proof id invalid uuid", 400));
227+
let limit = match params.limit {
228+
Some(received_limit) => received_limit.min(100),
229+
None => 100,
225230
};
226231

227-
let db_result = state.db.get_merkle_path_by_task_id(proof_id).await;
228-
let merkle_path = match db_result {
229-
Ok(Some(merkle_path)) => merkle_path,
230-
Ok(None) => {
231-
return HttpResponse::NotFound().json(AppResponse::new_unsucessfull(
232-
"Proof merkle path not found",
233-
404,
234-
))
235-
}
236-
Err(_) => {
237-
return HttpResponse::InternalServerError()
238-
.json(AppResponse::new_unsucessfull("Internal server error", 500));
239-
}
232+
let address = params.address.to_lowercase();
233+
234+
let query = if let Some(nonce) = params.nonce {
235+
state
236+
.db
237+
.get_tasks_by_address_and_nonce(&address, nonce)
238+
.await
239+
} else {
240+
state
241+
.db
242+
.get_tasks_by_address_with_limit(&address, limit)
243+
.await
240244
};
241245

242-
match format_merkle_path(&merkle_path) {
243-
Ok(merkle_path) => {
244-
HttpResponse::Ok().json(AppResponse::new_sucessfull(serde_json::json!({
245-
"merkle_path": merkle_path
246-
})))
247-
}
246+
let Ok(receipts) = query else {
247+
return HttpResponse::InternalServerError().json(AppResponse::new_unsucessfull(
248+
"Internal server error: Failed to get tasks by address and nonce",
249+
500,
250+
));
251+
};
252+
253+
let responses: Result<Vec<GetReceiptsResponse>, String> = receipts
254+
.into_iter()
255+
.map(|receipt| {
256+
let Some(merkle_path) = receipt.merkle_path else {
257+
return Ok(GetReceiptsResponse {
258+
status: receipt.status,
259+
merkle_path: Vec::new(),
260+
nonce: receipt.nonce,
261+
address: receipt.address,
262+
});
263+
};
264+
265+
let Ok(formatted) = format_merkle_path(&merkle_path) else {
266+
return Err("Error formatting merkle path".into());
267+
};
268+
269+
Ok(GetReceiptsResponse {
270+
status: receipt.status,
271+
merkle_path: formatted,
272+
nonce: receipt.nonce,
273+
address: receipt.address,
274+
})
275+
})
276+
.collect();
277+
278+
match responses {
279+
Ok(resp) => HttpResponse::Ok().json(AppResponse::new_sucessfull(serde_json::json!({
280+
"receipts": resp
281+
}))),
248282
Err(_) => HttpResponse::InternalServerError()
249283
.json(AppResponse::new_unsucessfull("Internal server error", 500)),
250284
}

aggregation_mode/batcher/src/server/types.rs

Lines changed: 17 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,8 @@ use actix_multipart::form::{tempfile::TempFile, text::Text, MultipartForm};
22
use serde::{Deserialize, Serialize};
33
use serde_json::Value;
44

5+
use crate::db::TaskStatus;
6+
57
#[derive(Serialize, Deserialize)]
68
pub(super) struct AppResponse {
79
status: u16,
@@ -27,10 +29,14 @@ impl AppResponse {
2729
}
2830
}
2931

30-
/// Query parameters accepted by `GET /proof/merkle`, containing an optional proof id.
32+
/// Query parameters accepted by `GET /receipts`. Requires an address, and accepts a nonce
33+
/// and a limit for the amount of tasks included in the query (the maximum value is 100).
34+
/// Note: The limit value will only be taken into account if nonce is None.
3135
#[derive(Deserialize, Clone)]
32-
pub(super) struct GetProofMerklePathQueryParams {
33-
pub id: Option<String>,
36+
pub(super) struct GetReceiptsQueryParams {
37+
pub address: String,
38+
pub nonce: Option<i64>,
39+
pub limit: Option<i64>,
3440
}
3541

3642
#[derive(Debug, MultipartForm)]
@@ -48,3 +54,11 @@ pub(super) struct SubmitProofRequestRisc0 {
4854
pub _program_image_id_hex: Text<String>,
4955
pub _signature_hex: Text<String>,
5056
}
57+
58+
#[derive(Debug, Clone, sqlx::FromRow, sqlx::Type, serde::Serialize)]
59+
pub struct GetReceiptsResponse {
60+
pub status: TaskStatus,
61+
pub merkle_path: Vec<String>,
62+
pub nonce: i64,
63+
pub address: String,
64+
}

aggregation_mode/db/migrations/001_init.sql

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,8 @@ CREATE TABLE tasks (
77
proof BYTEA,
88
program_commitment BYTEA,
99
merkle_path BYTEA,
10-
status task_status DEFAULT 'pending'
10+
status task_status DEFAULT 'pending',
11+
nonce BIGINT NOT NULL
1112
);
1213

1314
CREATE TABLE payment_events (

0 commit comments

Comments
 (0)