Skip to content

Commit 76c5c47

Browse files
authored
Merge pull request #3 from sigp/gzip-and-ssz-relay
Gzip and ssz relay
2 parents e1f82a6 + 6c57d0c commit 76c5c47

File tree

5 files changed

+155
-21
lines changed

5 files changed

+155
-21
lines changed

Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ eth2 = { git = "https://github.com/realbigsean/lighthouse.git", rev = "10ce60633
1717
ethereum_serde_utils = "0.5.2"
1818
ethereum_ssz = "0.5.4"
1919
ethereum_ssz_derive = "0.5.4"
20+
flate2 = "1.0.34"
2021
futures = "0.3.30"
2122
http = "1"
2223
reqwest = { version = "0.12.5", features = ["json"] }

relay-api-types/src/lib.rs

Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -264,3 +264,56 @@ pub type GetValidatorsResponse = Vec<ValidatorsResponse>;
264264
pub type GetDeliveredPayloadsResponse = Vec<BidTraceV2>;
265265
pub type GetReceivedBidsResponse = Vec<BidTraceV2WithTimestamp>;
266266
pub type GetValidatorRegistrationResponse = SignedValidatorRegistrationData;
267+
268+
// Headers
269+
#[derive(Default)]
270+
pub enum ContentType {
271+
#[default]
272+
Json,
273+
Ssz,
274+
}
275+
276+
impl std::fmt::Display for ContentType {
277+
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
278+
match self {
279+
ContentType::Json => write!(f, "application/json"),
280+
ContentType::Ssz => write!(f, "application/octet-stream"),
281+
}
282+
}
283+
}
284+
285+
impl From<String> for ContentType {
286+
fn from(value: String) -> Self {
287+
match value.as_str() {
288+
"application/json" => ContentType::Json,
289+
"application/octet-stream" => ContentType::Ssz,
290+
_ => panic!("unknown content type: {}", value),
291+
}
292+
}
293+
}
294+
295+
#[derive(Default)]
296+
pub enum ContentEncoding {
297+
Gzip,
298+
#[default]
299+
None,
300+
}
301+
302+
impl std::fmt::Display for ContentEncoding {
303+
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
304+
match self {
305+
ContentEncoding::Gzip => write!(f, "gzip"),
306+
ContentEncoding::None => write!(f, ""),
307+
}
308+
}
309+
}
310+
311+
impl From<String> for ContentEncoding {
312+
fn from(value: String) -> Self {
313+
match value.as_ref() {
314+
"gzip" => ContentEncoding::Gzip,
315+
"" => ContentEncoding::None,
316+
_ => panic!("unknown content encoding: {}", value),
317+
}
318+
}
319+
}

relay-client/src/lib.rs

Lines changed: 59 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,8 @@
11
use futures::{Stream, StreamExt};
2+
use http::header::InvalidHeaderValue;
3+
use http::header::CONTENT_ENCODING;
4+
use http::header::CONTENT_TYPE;
5+
use http::HeaderValue;
26
pub use relay_api_types::*;
37
use reqwest::Client;
48
use reqwest::Url;
@@ -13,6 +17,7 @@ pub enum Error {
1317
StatusCode(http::StatusCode),
1418
InvalidUrl(Url),
1519
WebSocket(tokio_tungstenite::tungstenite::Error),
20+
InvalidHeader(InvalidHeaderValue),
1621
}
1722

1823
impl From<reqwest::Error> for Error {
@@ -27,6 +32,12 @@ impl From<tokio_tungstenite::tungstenite::Error> for Error {
2732
}
2833
}
2934

35+
impl From<InvalidHeaderValue> for Error {
36+
fn from(value: InvalidHeaderValue) -> Self {
37+
Error::InvalidHeader(value)
38+
}
39+
}
40+
3041
#[derive(Clone)]
3142
pub struct RelayClient {
3243
client: Client,
@@ -45,6 +56,34 @@ impl RelayClient {
4556
where
4657
T: for<'de> Deserialize<'de>,
4758
{
59+
self.build_response_with_headers(response, <_>::default(), <_>::default())
60+
.await
61+
}
62+
63+
async fn build_response_with_headers<T>(
64+
&self,
65+
mut response: reqwest::Response,
66+
content_type: ContentType,
67+
content_encoding: ContentEncoding,
68+
) -> Result<T, Error>
69+
where
70+
T: for<'de> Deserialize<'de>,
71+
{
72+
response.headers_mut().insert(
73+
CONTENT_TYPE,
74+
HeaderValue::from_str(content_type.to_string().as_str())?,
75+
);
76+
77+
match content_encoding {
78+
ContentEncoding::Gzip => {
79+
response.headers_mut().insert(
80+
CONTENT_ENCODING,
81+
HeaderValue::from_str(content_encoding.to_string().as_str())?,
82+
);
83+
}
84+
ContentEncoding::None => {}
85+
}
86+
4887
let status = response.status();
4988
let text = response.text().await;
5089

@@ -62,6 +101,8 @@ impl RelayClient {
62101
&self,
63102
query_params: SubmitBlockQueryParams,
64103
body: SubmitBlockRequest<E>,
104+
content_type: ContentType,
105+
content_encoding: ContentEncoding,
65106
) -> Result<(), Error>
66107
where
67108
E: EthSpec,
@@ -78,7 +119,8 @@ impl RelayClient {
78119
.send()
79120
.await?;
80121

81-
self.build_response(response).await
122+
self.build_response_with_headers(response, content_type, content_encoding)
123+
.await
82124
}
83125

84126
pub async fn get_validators<E>(&self) -> Result<GetValidatorsResponse, Error>
@@ -149,6 +191,8 @@ impl RelayClient {
149191
&self,
150192
query_params: SubmitBlockQueryParams,
151193
body: SignedHeaderSubmission<E>,
194+
content_type: ContentType,
195+
content_encoding: ContentEncoding,
152196
) -> Result<(), Error>
153197
where
154198
E: EthSpec,
@@ -165,13 +209,16 @@ impl RelayClient {
165209
.send()
166210
.await?;
167211

168-
self.build_response(response).await
212+
self.build_response_with_headers(response, content_type, content_encoding)
213+
.await
169214
}
170215

171216
pub async fn submit_block_optimistic_v2<E>(
172217
&self,
173218
query_params: SubmitBlockQueryParams,
174219
body: SubmitBlockRequest<E>,
220+
content_type: ContentType,
221+
content_encoding: ContentEncoding,
175222
) -> Result<(), Error>
176223
where
177224
E: EthSpec,
@@ -188,17 +235,24 @@ impl RelayClient {
188235
.send()
189236
.await?;
190237

191-
self.build_response(response).await
238+
self.build_response_with_headers(response, content_type, content_encoding)
239+
.await
192240
}
193241

194-
pub async fn submit_cancellation(&self, body: SignedCancellation) -> Result<(), Error> {
242+
pub async fn submit_cancellation(
243+
&self,
244+
body: SignedCancellation,
245+
content_type: ContentType,
246+
content_encoding: ContentEncoding,
247+
) -> Result<(), Error> {
195248
let mut url = self.base_url.clone();
196249
url.path_segments_mut()
197250
.map_err(|_| Error::InvalidUrl(self.base_url.clone()))?
198251
.extend(&["relay", "v1", "builder", "cancel_bid"]);
199252
let response = self.client.post(url).json(&body).send().await?;
200253

201-
self.build_response(response).await
254+
self.build_response_with_headers(response, content_type, content_encoding)
255+
.await
202256
}
203257

204258
pub async fn subscribe_top_bids(

relay-server/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ async-trait.workspace = true
88
axum.workspace = true
99
bytes.workspace = true
1010
ethereum_ssz.workspace = true
11+
flate2.workspace = true
1112
futures.workspace = true
1213
http.workspace = true
1314
relay-api-types = { path = "../relay-api-types" }

relay-server/src/server.rs

Lines changed: 41 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -7,17 +7,20 @@ use axum::{
77
extract::{FromRequest, Query, Request, State},
88
response::{IntoResponse, Response},
99
routing::{get, post},
10-
Json, RequestExt, Router,
10+
Router,
1111
};
1212
use bytes::Bytes;
13+
use flate2::read::GzDecoder;
1314
use futures::{sink::SinkExt, stream::StreamExt};
15+
use http::header::CONTENT_ENCODING;
1416
use http::{header::CONTENT_TYPE, HeaderValue, StatusCode};
1517
use relay_api_types::{
16-
ErrorResponse, GetDeliveredPayloadsQueryParams, GetReceivedBidsQueryParams,
17-
GetValidatorRegistrationQueryParams, SignedCancellation, SignedHeaderSubmission,
18-
SubmitBlockQueryParams, SubmitBlockRequest,
18+
ContentEncoding, ContentType, ErrorResponse, GetDeliveredPayloadsQueryParams,
19+
GetReceivedBidsQueryParams, GetValidatorRegistrationQueryParams, SignedCancellation,
20+
SignedHeaderSubmission, SubmitBlockQueryParams, SubmitBlockRequest,
1921
};
2022
use serde::Serialize;
23+
use std::io::Read;
2124
use std::net::SocketAddr;
2225
use tracing::error;
2326
use types::eth_spec::EthSpec;
@@ -139,7 +142,7 @@ where
139142
async fn submit_block<I, A, E>(
140143
Query(query_params): Query<SubmitBlockQueryParams>,
141144
State(api_impl): State<I>,
142-
JsonOrSsz(body): JsonOrSsz<SubmitBlockRequest<E>>,
145+
JsonOrSszMaybeGzipped(body): JsonOrSszMaybeGzipped<SubmitBlockRequest<E>>,
143146
) -> Result<Response<Body>, StatusCode>
144147
where
145148
E: EthSpec,
@@ -155,7 +158,7 @@ where
155158
async fn submit_block_optimistic_v2<I, A, E>(
156159
Query(query_params): Query<SubmitBlockQueryParams>,
157160
State(api_impl): State<I>,
158-
JsonOrSsz(body): JsonOrSsz<SubmitBlockRequest<E>>,
161+
JsonOrSszMaybeGzipped(body): JsonOrSszMaybeGzipped<SubmitBlockRequest<E>>,
159162
) -> Result<Response<Body>, StatusCode>
160163
where
161164
E: EthSpec,
@@ -174,7 +177,7 @@ where
174177
async fn submit_header<I, A, E>(
175178
Query(query_params): Query<SubmitBlockQueryParams>,
176179
State(api_impl): State<I>,
177-
JsonOrSsz(body): JsonOrSsz<SignedHeaderSubmission<E>>,
180+
JsonOrSszMaybeGzipped(body): JsonOrSszMaybeGzipped<SignedHeaderSubmission<E>>,
178181
) -> Result<Response<Body>, StatusCode>
179182
where
180183
E: EthSpec,
@@ -189,7 +192,7 @@ where
189192
#[tracing::instrument(skip_all)]
190193
async fn submit_cancellation<I, A, E>(
191194
State(api_impl): State<I>,
192-
JsonOrSsz(body): JsonOrSsz<SignedCancellation>,
195+
JsonOrSszMaybeGzipped(body): JsonOrSszMaybeGzipped<SignedCancellation>,
193196
) -> Result<Response<Body>, StatusCode>
194197
where
195198
E: EthSpec,
@@ -357,28 +360,50 @@ where
357360

358361
#[must_use]
359362
#[derive(Debug, Clone, Copy, Default)]
360-
struct JsonOrSsz<T>(T);
363+
struct JsonOrSszMaybeGzipped<T>(T);
361364

362365
#[async_trait]
363-
impl<T, S> FromRequest<S> for JsonOrSsz<T>
366+
impl<T, S> FromRequest<S> for JsonOrSszMaybeGzipped<T>
364367
where
365368
T: serde::de::DeserializeOwned + ssz::Decode + 'static,
366369
S: Send + Sync,
367370
{
368371
type Rejection = Response;
369372

370373
async fn from_request(req: Request, _state: &S) -> Result<Self, Self::Rejection> {
371-
let content_type_header = req.headers().get(CONTENT_TYPE);
372-
let content_type = content_type_header.and_then(|value| value.to_str().ok());
374+
let headers = req.headers().clone();
375+
let content_type = headers
376+
.get(CONTENT_TYPE)
377+
.and_then(|value| value.to_str().ok());
378+
let content_encoding = headers
379+
.get(CONTENT_ENCODING)
380+
.and_then(|value| value.to_str().ok());
381+
382+
let bytes = Bytes::from_request(req, _state)
383+
.await
384+
.map_err(IntoResponse::into_response)?;
385+
386+
let decoded_bytes = if content_encoding == Some(&ContentEncoding::Gzip.to_string()) {
387+
let mut decoder = GzDecoder::new(&bytes[..]);
388+
let mut decoded = Vec::new();
389+
decoder
390+
.read_to_end(&mut decoded)
391+
.map_err(|_| StatusCode::BAD_REQUEST.into_response())?;
392+
decoded
393+
} else {
394+
bytes.to_vec()
395+
};
373396

374397
if let Some(content_type) = content_type {
375-
if content_type.starts_with("application/json") {
376-
let Json(payload) = req.extract().await.map_err(IntoResponse::into_response)?;
398+
if content_type.starts_with(&ContentType::Json.to_string()) {
399+
let payload: T = serde_json::from_slice(&decoded_bytes)
400+
.map_err(|_| StatusCode::BAD_REQUEST.into_response())?;
377401
return Ok(Self(payload));
378402
}
379403

380-
if content_type.starts_with("application/octet-stream") {
381-
let Ssz(payload) = req.extract().await.map_err(IntoResponse::into_response)?;
404+
if content_type.starts_with(&ContentType::Ssz.to_string()) {
405+
let payload = T::from_ssz_bytes(&decoded_bytes)
406+
.map_err(|_| StatusCode::BAD_REQUEST.into_response())?;
382407
return Ok(Self(payload));
383408
}
384409
}

0 commit comments

Comments
 (0)