diff --git a/Cargo.toml b/Cargo.toml index 8bc982d..89c31ad 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -14,21 +14,22 @@ members = [ [workspace.dependencies] async-trait = "0.1" -axum = { version = "0.7", features = ["ws"] } +axum = { version = "0.8", features = ["ws"] } bytes = "1.6" -eth2 = { git = "https://github.com/sigp/lighthouse.git", rev = "c33307d70287fd3b7a70785f89dadcb737214903" } +eth2 = { git = "https://github.com/sigp/lighthouse.git", rev = "3bc5f1f2a58b1df9454884672c8100fd5f79ba8b" } ethereum_serde_utils = "0.7" ethereum_ssz = "0.7" ethereum_ssz_derive = "0.7" flate2 = "1.0" futures = "0.3.30" -http = "1" -reqwest = { version = "0.12.5", features = ["json"] } +http = "1.2" +mediatype = "0.19.13" +reqwest = { version = "0.12.12", features = ["json"] } serde = { version = "1.0", features = ["derive"] } serde_json = { version = "1", features = ["raw_value"] } superstruct = "0.8" tokio = { version = "1", default-features = false, features = ["signal", "rt-multi-thread"] } tokio-tungstenite = "0.24.0" tracing = { version = "0.1", features = ["attributes"] } -types = { git = "https://github.com/sigp/lighthouse.git", rev = "c33307d70287fd3b7a70785f89dadcb737214903" } +types = { git = "https://github.com/sigp/lighthouse.git", rev = "3bc5f1f2a58b1df9454884672c8100fd5f79ba8b" } rand = "0.8" diff --git a/builder-client/Cargo.toml b/builder-client/Cargo.toml index 2968923..48f45a5 100644 --- a/builder-client/Cargo.toml +++ b/builder-client/Cargo.toml @@ -9,3 +9,5 @@ ethereum-apis-common = { path = "../common" } reqwest.workspace = true serde.workspace = true serde_json.workspace = true +ethereum_ssz.workspace = true +axum.workspace = true diff --git a/builder-client/src/lib.rs b/builder-client/src/lib.rs index 713520c..64b5e1f 100644 --- a/builder-client/src/lib.rs +++ b/builder-client/src/lib.rs @@ -1,14 +1,21 @@ -pub use builder_api_types::*; +use axum::http::HeaderMap; +use axum::http::HeaderValue; +use builder_api_types::*; pub use builder_bid::SignedBuilderBid; +use ethereum_apis_common::ContentType; pub use ethereum_apis_common::ErrorResponse; +use reqwest::header::{ACCEPT, CONTENT_TYPE}; use reqwest::Client; use reqwest::Url; use serde::de::DeserializeOwned; +use ssz::DecodeError; +use ssz::Encode; #[derive(Debug)] pub enum Error { Reqwest(reqwest::Error), InvalidJson(serde_json::Error, String), + InvalidSsz(DecodeError), ServerMessage(ErrorResponse), StatusCode(reqwest::StatusCode), InvalidUrl(Url), @@ -34,6 +41,34 @@ impl BuilderClient { } } + async fn build_response_with_headers( + &self, + response: reqwest::Response, + content_type: ContentType, + fork_name: ForkName, + ) -> Result + where + T: DeserializeOwned + ForkVersionDecode, + { + let status = response.status(); + let text = response.text().await?; + + if status.is_success() { + match content_type { + ContentType::Json => { + serde_json::from_str(&text).map_err(|e| Error::InvalidJson(e, text)) + } + ContentType::Ssz => { + T::from_ssz_bytes_by_fork(text.as_bytes(), fork_name).map_err(Error::InvalidSsz) + } + } + } else { + Err(Error::ServerMessage( + serde_json::from_str(&text).map_err(|e| Error::InvalidJson(e, text))?, + )) + } + } + async fn build_response(&self, response: reqwest::Response) -> Result where T: DeserializeOwned, @@ -67,15 +102,41 @@ impl BuilderClient { pub async fn submit_blinded_block( &self, block: &SignedBlindedBeaconBlock, + content_type: ContentType, + fork_name: ForkName, ) -> Result, Error> { let mut url = self.base_url.clone(); url.path_segments_mut() .map_err(|_| Error::InvalidUrl(self.base_url.clone()))? .extend(&["eth", "v1", "builder", "blinded_blocks"]); - let response = self.client.post(url).json(block).send().await?; - - self.build_response(response).await + let mut headers = HeaderMap::new(); + headers.insert( + CONTENT_TYPE, + HeaderValue::from_str(&content_type.to_string()).unwrap(), + ); + + let response = match content_type { + ContentType::Json => { + self.client + .post(url) + .headers(headers) + .json(block) + .send() + .await? + } + ContentType::Ssz => { + self.client + .post(url) + .headers(headers) + .body(block.as_ssz_bytes()) + .send() + .await? + } + }; + + self.build_response_with_headers(response, content_type, fork_name) + .await } pub async fn get_header( @@ -83,6 +144,8 @@ impl BuilderClient { slot: Slot, parent_hash: ExecutionBlockHash, pubkey: &PublicKeyBytes, + content_type: ContentType, + fork_name: ForkName, ) -> Result, Error> { let mut url = self.base_url.clone(); url.path_segments_mut() @@ -97,9 +160,16 @@ impl BuilderClient { &pubkey.to_string(), ]); - let response = self.client.get(url).send().await?; + let mut headers = HeaderMap::new(); + headers.insert( + ACCEPT, + HeaderValue::from_str(&content_type.to_string()).unwrap(), + ); - self.build_response(response).await + let response = self.client.get(url).headers(headers).send().await?; + + self.build_response_with_headers(response, content_type, fork_name) + .await } pub async fn get_status(&self) -> Result<(), Error> { diff --git a/builder-server/Cargo.toml b/builder-server/Cargo.toml index 32c2e50..0a47c18 100644 --- a/builder-server/Cargo.toml +++ b/builder-server/Cargo.toml @@ -17,3 +17,6 @@ serde.workspace = true serde_json.workspace = true tokio.workspace = true tracing.workspace = true + +[dev-dependencies] +tower = "0.5.2" diff --git a/builder-server/src/server.rs b/builder-server/src/server.rs index 18b2772..c3913ba 100644 --- a/builder-server/src/server.rs +++ b/builder-server/src/server.rs @@ -1,17 +1,26 @@ +use std::str::FromStr; + use axum::{ body::Body, extract::{Path, State}, http::StatusCode, response::Response, routing::{get, post}, - Json, Router, + Router, }; use builder_api_types::{ - eth_spec::EthSpec, fork_versioned_response::EmptyMetadata, ExecutionBlockHash, - ForkVersionedResponse, FullPayloadContents, PublicKeyBytes, SignedBlindedBeaconBlock, - SignedValidatorRegistrationData, Slot, + EthSpec, ExecutionBlockHash, PublicKeyBytes, SignedBlindedBeaconBlock, + SignedValidatorRegistrationData, Slot, VariableList, +}; +use ethereum_apis_common::{ + build_response, build_response_with_headers, Accept, ContentType, JsonOrSsz, JsonOrSszWithFork, +}; +use http::{ + header::{ACCEPT, CONTENT_TYPE}, + HeaderMap, }; -use ethereum_apis_common::build_response; +pub type ValidatorRegistrations = + VariableList::ValidatorRegistryLimit>; use crate::builder::Builder; @@ -32,7 +41,7 @@ where ) .route("/eth/v1/builder/status", get(get_status)) .route( - "/eth/v1/builder/header/:slot/:parent_hash/:pubkey", + "/eth/v1/builder/header/{slot}/{parent_hash}/{pubkey}", get(get_header::), ) .with_state(api_impl) @@ -40,44 +49,41 @@ where async fn register_validators( State(api_impl): State, - Json(registrations): Json>, + JsonOrSsz(registrations): JsonOrSsz>, ) -> Result, StatusCode> where E: EthSpec, I: AsRef + Send + Sync, A: Builder, { - let res = api_impl.as_ref().register_validators(registrations).await; - build_response(res).await + let res = api_impl + .as_ref() + .register_validators(registrations.to_vec()) + .await; + build_response(res) } async fn submit_blinded_block( + headers: HeaderMap, State(api_impl): State, - Json(block): Json>, + JsonOrSszWithFork(block): JsonOrSszWithFork>, ) -> Result, StatusCode> where E: EthSpec, I: AsRef + Send + Sync, A: Builder, { - let res = api_impl - .as_ref() - .submit_blinded_block(block) - .await - .map(|payload| { - let fork_name = match &payload { - FullPayloadContents::Payload(payload) => payload.fork_name(), - FullPayloadContents::PayloadAndBlobs(payload_and_blobs) => { - payload_and_blobs.execution_payload.fork_name() - } - }; - ForkVersionedResponse { - version: Some(fork_name), - metadata: EmptyMetadata {}, - data: payload, - } - }); - build_response(res).await + let content_type_header = headers.get(CONTENT_TYPE); + let content_type = content_type_header.and_then(|value| value.to_str().ok()); + let content_type = match content_type { + Some("application/octet-stream") => ContentType::Ssz, + _ => ContentType::Json, + }; + let slot = block.slot(); + + let res = api_impl.as_ref().submit_blinded_block(block).await; + + build_response_with_headers(res, content_type, api_impl.as_ref().fork_name_at_slot(slot)) } async fn get_status() -> StatusCode { @@ -85,6 +91,7 @@ async fn get_status() -> StatusCode { } async fn get_header( + headers: HeaderMap, State(api_impl): State, Path((slot, parent_hash, pubkey)): Path<(Slot, ExecutionBlockHash, PublicKeyBytes)>, ) -> Result, StatusCode> @@ -93,14 +100,302 @@ where I: AsRef + Send + Sync, A: Builder, { + let content_type_header = headers.get(ACCEPT); + let content_type_str = content_type_header + .and_then(|value| value.to_str().ok()) + .unwrap_or("application/json"); + let content_type = match Accept::from_str(content_type_str) { + Ok(Accept::Ssz) => ContentType::Ssz, + _ => ContentType::Json, + }; + let res = api_impl .as_ref() .get_header(slot, parent_hash, pubkey) - .await - .map(|signed_bid| ForkVersionedResponse { - version: Some(api_impl.as_ref().fork_name_at_slot(slot)), - metadata: EmptyMetadata {}, - data: signed_bid, + .await; + build_response_with_headers(res, content_type, api_impl.as_ref().fork_name_at_slot(slot)) +} + +#[cfg(test)] +mod tests { + use super::*; + use async_trait::async_trait; + use axum::{body::Body, http::Request}; + use builder_api_types::{ + builder_bid::{BuilderBid, BuilderBidDeneb, SignedBuilderBid}, + Address, BeaconBlock, BeaconBlockDeneb, Blob, BlobsBundle, EmptyBlock, ExecutionPayload, + ExecutionPayloadAndBlobs, ExecutionPayloadDeneb, ForkName, ForkVersionDecode, + ForkVersionedResponse, FullPayloadContents, KzgCommitment, KzgProof, MainnetEthSpec, + Signature, Uint256, ValidatorRegistrationData, + }; + use ethereum_apis_common::{ErrorResponse, CONSENSUS_VERSION_HEADER}; + use http::{HeaderValue, Response}; + use ssz::Encode; + use std::{marker::PhantomData, usize}; + use tower::ServiceExt; + + pub const PREFERENCE_ACCEPT_VALUE: &str = + "application/octet-stream;q=1.0,application/json;q=0.9"; + + #[derive(Clone)] + struct DummyBuilder { + _phantom: PhantomData, + } + + impl AsRef> for DummyBuilder { + fn as_ref(&self) -> &DummyBuilder { + self + } + } + + #[async_trait] + impl Builder for DummyBuilder { + fn fork_name_at_slot(&self, _slot: Slot) -> builder_api_types::ForkName { + ForkName::Deneb + } + + async fn get_header( + &self, + _slot: Slot, + _parent_hash: ExecutionBlockHash, + _pubkey: PublicKeyBytes, + ) -> Result, ErrorResponse> { + Ok(SignedBuilderBid { + message: BuilderBid::Deneb(BuilderBidDeneb { + value: Uint256::from(42), + pubkey: PublicKeyBytes::empty(), + blob_kzg_commitments: vec![KzgCommitment::empty_for_testing(); 5].into(), + header: Default::default(), + }), + signature: Signature::empty(), + }) + } + + async fn register_validators( + &self, + _registrations: Vec, + ) -> Result<(), ErrorResponse> { + Ok(()) + } + + async fn submit_blinded_block( + &self, + _block: SignedBlindedBeaconBlock, + ) -> Result, ErrorResponse> { + let payload_and_blobs: ExecutionPayloadAndBlobs = ExecutionPayloadAndBlobs { + blobs_bundle: BlobsBundle { + commitments: vec![KzgCommitment::empty_for_testing()].into(), + proofs: vec![KzgProof::empty()].into(), + blobs: vec![Blob::::new(vec![42; E::bytes_per_blob()]).unwrap()].into(), + }, + execution_payload: ExecutionPayload::Deneb(ExecutionPayloadDeneb::default()), + }; + let full_payload = FullPayloadContents::PayloadAndBlobs(payload_and_blobs); + Ok(full_payload) + } + } + + async fn send_request_and_assert_response( + request: http::request::Request, + expected_status: StatusCode, + check_headers: impl AsyncFn(Response), + ) { + let app = new(DummyBuilder:: { + _phantom: PhantomData, }); - build_response(res).await + + let response = app.oneshot(request).await.unwrap(); + // Assert status code + assert_eq!(response.status(), expected_status); + // Check headers + check_headers(response).await; + } + + #[tokio::test] + async fn test_registration() { + let dummy_registration: ValidatorRegistrations = VariableList::from(vec![ + SignedValidatorRegistrationData { + message: ValidatorRegistrationData { + fee_recipient: Address::random(), + gas_limit: 100000, + timestamp: 19939149139, + pubkey: PublicKeyBytes::empty(), + }, + signature: Signature::empty() + }; + 100 + ]); + + // Test ssz request + send_request_and_assert_response( + Request::builder() + .uri("/eth/v1/builder/validators") + .method("POST") + .header( + CONTENT_TYPE, + HeaderValue::from_static("application/octet-stream"), + ) + .body(Body::from(dummy_registration.as_ssz_bytes())) + .unwrap(), + StatusCode::OK, + async |_| {}, + ) + .await; + + // Test json request + send_request_and_assert_response( + Request::builder() + .uri("/eth/v1/builder/validators") + .method("POST") + .header(CONTENT_TYPE, HeaderValue::from_static("application/json")) + .body(Body::from(serde_json::to_vec(&dummy_registration).unwrap())) + .unwrap(), + StatusCode::OK, + async |_| {}, + ) + .await; + } + + #[tokio::test] + async fn test_get_header() { + // Test ssz request + send_request_and_assert_response( + Request::builder() + .uri(format!( + "/eth/v1/builder/header/{}/{}/{}", + Slot::new(42), + "0x379b447308533668e5323f45b7d5232259f508e8d61ff5b945c9b016792cd94c", + "0xafda62054797148859d1f277ad04e8129bc767c10dae0e2d116f03b87fe9c2a36093a93eab75b4b5bfd3fd0d48816396" + )) + .method("GET") + .header( + ACCEPT, + HeaderValue::from_static("application/octet-stream"), + ).body(Body::empty()) + .unwrap(), + StatusCode::OK, + async |response| { + let headers = response.headers(); + assert_eq!(headers.get(CONTENT_TYPE).unwrap(), HeaderValue::from_str(&ContentType::Ssz.to_string()).unwrap()); + assert_eq!(headers.get(ethereum_apis_common::CONSENSUS_VERSION_HEADER).unwrap(), HeaderValue::from_str(&ForkName::Deneb.to_string()).unwrap()); + } + ) + .await; + + // Test json request + send_request_and_assert_response( + Request::builder() + .uri(format!( + "/eth/v1/builder/header/{}/{}/{}", + Slot::new(42), + "0x379b447308533668e5323f45b7d5232259f508e8d61ff5b945c9b016792cd94c", + "0xafda62054797148859d1f277ad04e8129bc767c10dae0e2d116f03b87fe9c2a36093a93eab75b4b5bfd3fd0d48816396" + )) + .method("GET") + .header( + ACCEPT, + HeaderValue::from_static("application/json"), + ).body(Body::empty()) + .unwrap(), + StatusCode::OK, + async |response| { + let headers = response.headers(); + assert_eq!(headers.get(CONTENT_TYPE).unwrap(), HeaderValue::from_str(&ContentType::Json.to_string()).unwrap()); + assert_eq!(headers.get(ethereum_apis_common::CONSENSUS_VERSION_HEADER).unwrap(), HeaderValue::from_str(&ForkName::Deneb.to_string()).unwrap()); + } + ) + .await; + } + + #[tokio::test] + async fn test_submit_blinded_beacon_block() { + let spec = MainnetEthSpec::default_spec(); + let dummy_block = SignedBlindedBeaconBlock::::from_block( + BeaconBlock::Deneb(BeaconBlockDeneb::empty(&spec)), + Signature::empty(), + ); + // Test ssz request + send_request_and_assert_response( + Request::builder() + .uri("/eth/v1/builder/blinded_blocks") + .method("POST") + .header( + CONTENT_TYPE, + HeaderValue::from_static("application/octet-stream"), + ) + .header(ACCEPT, HeaderValue::from_static(&PREFERENCE_ACCEPT_VALUE)) + .header( + CONSENSUS_VERSION_HEADER, + HeaderValue::from_str(&ForkName::Deneb.to_string()).unwrap(), + ) + .body(Body::from(dummy_block.as_ssz_bytes())) + .unwrap(), + StatusCode::OK, + async |response: Response| { + let headers = response.headers(); + assert_eq!( + headers.get(CONTENT_TYPE).unwrap(), + HeaderValue::from_str(&ContentType::Ssz.to_string()).unwrap() + ); + assert_eq!( + headers + .get(ethereum_apis_common::CONSENSUS_VERSION_HEADER) + .unwrap(), + HeaderValue::from_str(&ForkName::Deneb.to_string()).unwrap() + ); + + // Get response body as bytes + let body = axum::body::to_bytes(response.into_body(), usize::MAX) + .await + .expect("should get bytes response"); + assert!( + FullPayloadContents::::from_ssz_bytes_by_fork( + &body, + ForkName::Deneb + ) + .is_ok() + ); + }, + ) + .await; + + // Test json request + send_request_and_assert_response( + Request::builder() + .uri("/eth/v1/builder/blinded_blocks") + .method("POST") + .header(CONTENT_TYPE, HeaderValue::from_static("application/json")) + .header(ACCEPT, HeaderValue::from_static("application/json")) + .header( + CONSENSUS_VERSION_HEADER, + HeaderValue::from_str(&ForkName::Deneb.to_string()).unwrap(), + ) + .body(Body::from(serde_json::to_vec(&dummy_block).unwrap())) + .unwrap(), + StatusCode::OK, + async |response: Response| { + let headers = response.headers(); + assert_eq!( + headers.get(CONTENT_TYPE).unwrap(), + HeaderValue::from_str(&ContentType::Json.to_string()).unwrap() + ); + assert_eq!( + headers + .get(ethereum_apis_common::CONSENSUS_VERSION_HEADER) + .unwrap(), + HeaderValue::from_str(&ForkName::Deneb.to_string()).unwrap() + ); + + // Get response body as bytes + let body = axum::body::to_bytes(response.into_body(), usize::MAX) + .await + .expect("should get bytes response"); + assert!(serde_json::from_slice::< + ForkVersionedResponse>, + >(&body) + .is_ok()); + }, + ) + .await; + } } diff --git a/common/Cargo.toml b/common/Cargo.toml index 9b7974e..2e81d0e 100644 --- a/common/Cargo.toml +++ b/common/Cargo.toml @@ -11,8 +11,8 @@ ethereum_ssz.workspace = true flate2.workspace = true futures.workspace = true http.workspace = true +mediatype.workspace = true serde.workspace = true serde_json.workspace = true -tokio.workspace = true tracing.workspace = true beacon-api-types = { path = "../beacon-api-types" } diff --git a/common/src/lib.rs b/common/src/lib.rs index 2e693fb..4f6cd2f 100644 --- a/common/src/lib.rs +++ b/common/src/lib.rs @@ -1,23 +1,104 @@ use axum::{ - async_trait, body::Body, extract::{FromRequest, Request}, response::{IntoResponse, Response}, }; -use beacon_api_types::ForkVersionDeserialize; +use beacon_api_types::{ + fork_versioned_response::EmptyMetadata, ForkName, ForkVersionDecode, ForkVersionDeserialize, + ForkVersionedResponse, +}; use bytes::Bytes; use flate2::read::GzDecoder; use http::header::CONTENT_ENCODING; use http::{header::CONTENT_TYPE, HeaderValue, StatusCode}; +use mediatype::{names, MediaType, MediaTypeList}; use serde::{Deserialize, Serialize}; -use std::io::Read; +use ssz::Encode; +use std::{fmt, io::Read, str::FromStr}; use tracing::error; -pub const CONSENSUS_VERSION_HEADER: &'static str = "Eth-Consensus-Version"; +pub const CONSENSUS_VERSION_HEADER: &str = "Eth-Consensus-Version"; -pub async fn build_response( +pub fn build_response_with_headers( result: Result, + content_type: ContentType, + fork_name: ForkName, ) -> Result, StatusCode> +where + T: Serialize + Encode + Send + 'static, +{ + let response_builder = Response::builder(); + + let resp = match result { + Ok(body) => { + let mut response = response_builder.status(200); + + if let Some(response_headers) = response.headers_mut() { + response_headers.insert( + CONSENSUS_VERSION_HEADER, + HeaderValue::from_str(&fork_name.to_string()).map_err(|e| { + error!(error = ?e); + StatusCode::INTERNAL_SERVER_ERROR + })?, + ); + + response_headers.insert( + CONTENT_TYPE, + HeaderValue::from_str(&content_type.to_string()).map_err(|e| { + error!(error = ?e); + StatusCode::INTERNAL_SERVER_ERROR + })?, + ); + } + let body_content = match content_type { + ContentType::Json => { + let body = ForkVersionedResponse { + version: Some(fork_name), + metadata: EmptyMetadata {}, + data: body, + }; + serde_json::to_vec(&body).map_err(|e| { + error!(error = ?e); + StatusCode::INTERNAL_SERVER_ERROR + })? + } + + ContentType::Ssz => T::as_ssz_bytes(&body), + }; + response.body(Body::from(body_content)).map_err(|e| { + error!(error = ?e); + StatusCode::INTERNAL_SERVER_ERROR + }) + } + Err(body) => { + let mut response = response_builder.status(body.code); + + if let Some(response_headers) = response.headers_mut() { + response_headers.insert( + CONTENT_TYPE, + HeaderValue::from_str(&content_type.to_string()).map_err(|e| { + error!(error = ?e); + StatusCode::INTERNAL_SERVER_ERROR + })?, + ); + } + + let body_content = serde_json::to_vec(&body).map_err(|e| { + error!(error = ?e); + StatusCode::INTERNAL_SERVER_ERROR + })?; + + response.body(Body::from(body_content)).map_err(|e| { + error!(error = ?e); + StatusCode::INTERNAL_SERVER_ERROR + }) + } + }; + + resp +} + +pub fn build_response(result: Result) -> Result, StatusCode> where T: Serialize + Send + 'static, { @@ -37,17 +118,10 @@ where ); } - let body_content = tokio::task::spawn_blocking(move || { - serde_json::to_vec(&body).map_err(|e| { - error!(error = ?e); - StatusCode::INTERNAL_SERVER_ERROR - }) - }) - .await - .map_err(|e| { + let body_content = serde_json::to_vec(&body).map_err(|e| { error!(error = ?e); StatusCode::INTERNAL_SERVER_ERROR - })??; + })?; response.body(Body::from(body_content)).map_err(|e| { error!(error = ?e); @@ -67,17 +141,10 @@ where ); } - let body_content = tokio::task::spawn_blocking(move || { - serde_json::to_vec(&body).map_err(|e| { - error!(error = ?e); - StatusCode::INTERNAL_SERVER_ERROR - }) - }) - .await - .map_err(|e| { + let body_content = serde_json::to_vec(&body).map_err(|e| { error!(error = ?e); StatusCode::INTERNAL_SERVER_ERROR - })??; + })?; response.body(Body::from(body_content)).map_err(|e| { error!(error = ?e); @@ -93,7 +160,6 @@ where #[derive(Debug, Clone, Copy, Default)] pub struct Ssz(pub T); -#[async_trait] impl FromRequest for Ssz where T: ssz::Decode, @@ -110,9 +176,51 @@ where let bytes = Bytes::from_request(req, state) .await .map_err(IntoResponse::into_response)?; - return Ok(T::from_ssz_bytes(&bytes) + return T::from_ssz_bytes(&bytes) .map(Ssz) - .map_err(|_| StatusCode::BAD_REQUEST.into_response())?); + .map_err(|_| StatusCode::BAD_REQUEST.into_response()); + } + } + + Err(StatusCode::UNSUPPORTED_MEDIA_TYPE.into_response()) + } +} + +#[must_use] +#[derive(Debug, Clone, Copy, Default)] +pub struct JsonOrSszWithFork(pub T); + +impl FromRequest for JsonOrSszWithFork +where + T: serde::de::DeserializeOwned + ForkVersionDecode + 'static, + S: Send + Sync, +{ + type Rejection = Response; + + async fn from_request(req: Request, _state: &S) -> Result { + let headers = req.headers().clone(); + let content_type = headers + .get(CONTENT_TYPE) + .and_then(|value| value.to_str().ok()); + let fork_name = headers + .get(CONSENSUS_VERSION_HEADER) + .and_then(|value| ForkName::from_str(value.to_str().unwrap()).ok()); + + let bytes = Bytes::from_request(req, _state) + .await + .map_err(IntoResponse::into_response)?; + + if let Some(content_type) = content_type { + if content_type.starts_with(&ContentType::Json.to_string()) { + let payload: T = serde_json::from_slice(&bytes) + .map_err(|_| StatusCode::BAD_REQUEST.into_response())?; + return Ok(Self(payload)); + } + + if content_type.starts_with(&ContentType::Ssz.to_string()) { + let payload = T::from_ssz_bytes_by_fork(&bytes, fork_name.unwrap()) + .map_err(|_| StatusCode::BAD_REQUEST.into_response())?; + return Ok(Self(payload)); } } @@ -124,7 +232,6 @@ where #[derive(Debug, Clone, Copy, Default)] pub struct JsonOrSsz(pub T); -#[async_trait] impl FromRequest for JsonOrSsz where T: serde::de::DeserializeOwned + ssz::Decode + 'static, @@ -164,7 +271,6 @@ where #[derive(Debug, Clone, Copy, Default)] pub struct JsonOrSszMaybeGzipped(pub T); -#[async_trait] impl FromRequest for JsonOrSszMaybeGzipped where T: serde::de::DeserializeOwned + ssz::Decode + 'static, @@ -215,7 +321,7 @@ where } // Headers -#[derive(Default, Clone, Copy)] +#[derive(Default, Clone, Copy, Debug)] pub enum ContentType { #[default] Json, @@ -288,7 +394,6 @@ pub fn custom_internal_err(message: String) -> ErrorResponse { #[derive(Debug, Clone, Copy, Default)] pub struct JsonConsensusVersionHeader(pub T); -#[async_trait] impl FromRequest for JsonConsensusVersionHeader where T: ForkVersionDeserialize + 'static, @@ -317,3 +422,78 @@ where Ok(Self(result)) } } + +#[derive(Debug, Clone, Copy, PartialEq)] +pub enum Accept { + Json, + Ssz, + Any, +} + +impl fmt::Display for Accept { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + Accept::Ssz => write!(f, "application/octet-stream"), + Accept::Json => write!(f, "application/json"), + Accept::Any => write!(f, "*/*"), + } + } +} + +impl FromStr for Accept { + type Err = String; + + fn from_str(s: &str) -> Result { + let media_type_list = MediaTypeList::new(s); + + // [q-factor weighting]: https://datatracker.ietf.org/doc/html/rfc7231#section-5.3.2 + // find the highest q-factor supported accept type + let mut highest_q = 0_u16; + let mut accept_type = None; + + const APPLICATION: &str = names::APPLICATION.as_str(); + const OCTET_STREAM: &str = names::OCTET_STREAM.as_str(); + const JSON: &str = names::JSON.as_str(); + const STAR: &str = names::_STAR.as_str(); + const Q: &str = names::Q.as_str(); + + media_type_list.into_iter().for_each(|item| { + if let Ok(MediaType { + ty, + subty, + suffix: _, + params, + }) = item + { + let q_accept = match (ty.as_str(), subty.as_str()) { + (APPLICATION, OCTET_STREAM) => Some(Accept::Ssz), + (APPLICATION, JSON) => Some(Accept::Json), + (STAR, STAR) => Some(Accept::Any), + _ => None, + } + .map(|item_accept_type| { + let q_val = params + .iter() + .find_map(|(n, v)| match n.as_str() { + Q => { + Some((v.as_str().parse::().unwrap_or(0_f32) * 1000_f32) as u16) + } + _ => None, + }) + .or(Some(1000_u16)); + + (q_val.unwrap(), item_accept_type) + }); + + match q_accept { + Some((q, accept)) if q > highest_q => { + highest_q = q; + accept_type = Some(accept); + } + _ => (), + } + } + }); + accept_type.ok_or_else(|| "accept header is not supported".to_string()) + } +} diff --git a/relay-server/src/server.rs b/relay-server/src/server.rs index 189ab49..3f7f9d5 100644 --- a/relay-server/src/server.rs +++ b/relay-server/src/server.rs @@ -71,7 +71,7 @@ where A: Builder, { let result = api_impl.as_ref().submit_block(query_params, body).await; - build_response(result).await + build_response(result) } /// SubmitBlockOptimisticV2 - POST /relay/v1/builder/blocks_optimistic_v2 @@ -90,7 +90,7 @@ where .as_ref() .submit_block_optimistic_v2(query_params, body) .await; - build_response(result).await + build_response(result) } /// SubmitHeader - POST /relay/v1/builder/headers @@ -106,7 +106,7 @@ where A: Builder, { let result = api_impl.as_ref().submit_header(query_params, body).await; - build_response(result).await + build_response(result) } /// SubmitCancellation - POST /relay/v1/builder/cancel_bid @@ -121,7 +121,7 @@ where A: Builder, { let result = api_impl.as_ref().submit_cancellation(body).await; - build_response(result).await + build_response(result) } /// GetValidators - GET /relay/v1/builder/validators @@ -133,7 +133,7 @@ where E: EthSpec, { let result = api_impl.as_ref().get_validators().await; - build_response(result).await + build_response(result) } /// GetTopBids - GET /relay/v1/builder/top_bids @@ -173,7 +173,7 @@ where while let Some(update) = stream.next().await { match serde_json::to_string(&update) { Ok(json) => { - if let Err(e) = sender.send(Message::Text(json)).await { + if let Err(e) = sender.send(Message::text(json)).await { tracing::error!("Error sending message: {:?}", e); break; } @@ -189,9 +189,8 @@ where let mut recv_task = tokio::spawn(async move { while let Some(Ok(message)) = receiver.next().await { - match message { - Message::Close(_) => break, - _ => {} + if let Message::Close(_) = message { + break; } } }); @@ -214,7 +213,7 @@ where A: Data, { let result = api_impl.as_ref().get_delivered_payloads(query_params).await; - build_response(result).await + build_response(result) } /// GetReceivedBids - GET /relay/v1/data/bidtraces/builder_blocks_received @@ -228,7 +227,7 @@ where A: Data, { let result = api_impl.as_ref().get_received_bids(query_params).await; - build_response(result).await + build_response(result) } /// GetValidatorRegistration - GET /relay/v1/data/validator_registration @@ -245,5 +244,5 @@ where .as_ref() .get_validator_registration(query_params) .await; - build_response(result).await + build_response(result) }