Skip to content

Commit 7f71cc2

Browse files
authored
feat: add support for Beacon API Event Stream (#1848)
1 parent 9f32985 commit 7f71cc2

File tree

6 files changed

+198
-33
lines changed

6 files changed

+198
-33
lines changed

Cargo.lock

Lines changed: 56 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ rust-version = "1.87.0"
3838
version = "0.3.2"
3939

4040
[workspace.dependencies]
41-
alloy = { version = "1.0", default-features = false, features = ["std", "serde", "getrandom"] }
41+
alloy = { version = "1.0", default-features = false, features = ["std", "serde", "getrandom", "rpc-types-beacon"] }
4242
alloy-chains = "0.2"
4343
alloy-hardforks = "0.2.0"
4444
alloy-rlp = { version = "0.3.8", default-features = false, features = ["derive"] }
@@ -56,6 +56,7 @@ ethereum_hashing = "0.7.0"
5656
ethereum_serde_utils = "0.8"
5757
ethereum_ssz = "0.9"
5858
ethereum_ssz_derive = "0.9"
59+
eventsource-client = "0.15.0"
5960
futures = "0.3.23"
6061
futures-util = "0.3.23"
6162
hex = "0.4.3"

crates/ethereum-rpc-client/Cargo.toml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,8 @@ anyhow.workspace = true
1818
ethereum_ssz.workspace = true
1919
ethereum_ssz_derive.workspace = true
2020
ethportal-api.workspace = true
21+
eventsource-client.workspace = true
22+
futures.workspace = true
2123
reqwest.workspace = true
2224
reqwest-middleware = { version = "0.4", features = ["json"] }
2325
reqwest-retry = "0.7"
Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
use alloy::rpc::types::beacon::events::{
2+
BeaconNodeEventTopic, ChainReorgEvent, FinalizedCheckpointEvent, HeadEvent,
3+
LightClientOptimisticUpdateEvent,
4+
};
5+
use eventsource_client::Event;
6+
use serde::de::{DeserializeOwned, Error};
7+
8+
pub enum BeaconEvent {
9+
ChainReorg(ChainReorgEvent),
10+
Head(HeadEvent),
11+
LightClientOptimisticUpdate(LightClientOptimisticUpdateEvent),
12+
FinalizedCheckpoint(FinalizedCheckpointEvent),
13+
}
14+
15+
impl BeaconEvent {
16+
fn from_json<T: DeserializeOwned>(
17+
json: &str,
18+
constructor: impl FnOnce(T) -> Self,
19+
) -> Result<Self, serde_json::Error> {
20+
serde_json::from_str(json).map(constructor)
21+
}
22+
}
23+
24+
impl TryFrom<Event> for BeaconEvent {
25+
type Error = serde_json::Error;
26+
27+
fn try_from(event: Event) -> Result<Self, Self::Error> {
28+
if event.event_type == BeaconNodeEventTopic::ChainReorg.query_value() {
29+
Self::from_json(&event.data, Self::ChainReorg)
30+
} else if event.event_type == BeaconNodeEventTopic::Head.query_value() {
31+
Self::from_json(&event.data, Self::Head)
32+
} else if event.event_type
33+
== BeaconNodeEventTopic::LightClientOptimisticUpdate.query_value()
34+
{
35+
Self::from_json(&event.data, Self::LightClientOptimisticUpdate)
36+
} else if event.event_type == BeaconNodeEventTopic::FinalizedCheckpoint.query_value() {
37+
Self::from_json(&event.data, Self::FinalizedCheckpoint)
38+
} else {
39+
Err(Self::Error::custom(format!(
40+
"Can't create BeaconEvent: unexpected event type: {}",
41+
event.event_type,
42+
)))
43+
}
44+
}
45+
}

crates/ethereum-rpc-client/src/consensus/mod.rs

Lines changed: 57 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,10 @@
11
pub mod constants;
2+
pub mod event;
23
pub mod rpc_types;
34

4-
use std::{fmt::Display, time::Duration};
5+
use std::{fmt::Display, pin::Pin, time::Duration};
56

6-
use alloy::primitives::B256;
7+
use alloy::{primitives::B256, rpc::types::beacon::events::BeaconNodeEventTopic};
78
use anyhow::{anyhow, bail, ensure};
89
use constants::DEFAULT_BEACON_STATE_REQUEST_TIMEOUT;
910
use ethportal_api::{
@@ -17,19 +18,23 @@ use ethportal_api::{
1718
optimistic_update::LightClientOptimisticUpdateElectra, update::LightClientUpdateElectra,
1819
},
1920
};
21+
use event::BeaconEvent;
22+
use eventsource_client::{Client, ClientBuilder, SSE};
23+
use futures::{Stream, StreamExt};
2024
use reqwest::{
2125
header::{HeaderMap, HeaderValue, ACCEPT, CONTENT_TYPE},
2226
Response,
2327
};
2428
use rpc_types::{RootResponse, VersionResponse, VersionedDataResponse, VersionedDataResult};
2529
use serde::de::DeserializeOwned;
2630
use ssz::Decode;
27-
use tracing::{debug, warn};
31+
use tracing::{debug, error, info, warn};
2832
use url::Url;
2933

3034
use super::http_client::{
3135
ClientWithBaseUrl, ContentType, JSON_ACCEPT_PRIORITY, JSON_CONTENT_TYPE, SSZ_CONTENT_TYPE,
3236
};
37+
use crate::http_client::get_authorization_headers;
3338

3439
/// Implements endpoints from the Beacon API to access data from the consensus layer.
3540
#[derive(Clone, Debug)]
@@ -235,6 +240,55 @@ impl ConsensusApi {
235240
.message)
236241
}
237242

243+
pub fn get_events_stream(
244+
&self,
245+
topics: &[BeaconNodeEventTopic],
246+
stream_tag: &'static str,
247+
) -> anyhow::Result<Pin<Box<dyn Stream<Item = BeaconEvent> + Send>>> {
248+
let endpoint = self.primary.base_url().join(&format!(
249+
"/eth/v1/events?topics={}",
250+
topics
251+
.iter()
252+
.map(|topic| topic.query_value())
253+
.collect::<Vec<_>>()
254+
.join(",")
255+
))?;
256+
257+
let mut client_builder = ClientBuilder::for_url(endpoint.as_str())?;
258+
for (key, value) in get_authorization_headers(self.primary.base_url().clone())? {
259+
client_builder = client_builder.header(key.as_ref(), value.to_str()?)?;
260+
}
261+
262+
Ok(client_builder
263+
.build()
264+
.stream()
265+
.filter_map(move |event| async move {
266+
let event = match event {
267+
Ok(SSE::Event(event)) => event,
268+
Ok(SSE::Connected(connection_details)) => {
269+
info!("{stream_tag}: Connected to SSE stream: {connection_details:?}");
270+
return None;
271+
}
272+
Ok(SSE::Comment(comment)) => {
273+
info!("{stream_tag}: Received comment: {comment:?}");
274+
return None;
275+
}
276+
Err(err) => {
277+
error!("{stream_tag}: Error receiving event: {err:?}");
278+
return None;
279+
}
280+
};
281+
match BeaconEvent::try_from(event) {
282+
Ok(event) => Some(event),
283+
Err(err) => {
284+
error!("{stream_tag}: Failed to decode event: {err:?}");
285+
None
286+
}
287+
}
288+
})
289+
.boxed())
290+
}
291+
238292
/// Make a request to the cl provider. If the primary provider fails, it will retry with the
239293
/// fallback.
240294
async fn request<T>(

crates/ethereum-rpc-client/src/http_client.rs

Lines changed: 36 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,8 @@
11
use std::{env, time::Duration};
22

3+
use anyhow::{anyhow, bail};
34
use reqwest::{
4-
header::{HeaderMap, HeaderValue, ACCEPT, CONTENT_TYPE},
5+
header::{HeaderMap, HeaderName, HeaderValue, ACCEPT, CONTENT_TYPE},
56
Client, IntoUrl, Request, Response,
67
};
78
use reqwest_middleware::{ClientBuilder, ClientWithMiddleware, RequestBuilder};
@@ -31,7 +32,7 @@ impl ClientWithBaseUrl {
3132
url: Url,
3233
request_timeout: u64,
3334
content_type: ContentType,
34-
) -> Result<ClientWithBaseUrl, String> {
35+
) -> anyhow::Result<ClientWithBaseUrl> {
3536
let mut headers = HeaderMap::new();
3637
match content_type {
3738
ContentType::Json => {
@@ -43,40 +44,17 @@ impl ClientWithBaseUrl {
4344
}
4445
}
4546

46-
if let Some(host) = url.host_str() {
47-
if host.contains("pandaops.io") {
48-
let client_id = env::var("PANDAOPS_CLIENT_ID").unwrap_or_else(|_| {
49-
error!("Pandaops provider detected without PANDAOPS_CLIENT_ID set");
50-
"null".to_string()
51-
});
52-
53-
let client_secret = env::var("PANDAOPS_CLIENT_SECRET").unwrap_or_else(|_| {
54-
error!("Pandaops provider detected without PANDAOPS_CLIENT_SECRET set");
55-
"null".to_string()
56-
});
57-
58-
headers.insert(
59-
"CF-Access-Client-Id",
60-
HeaderValue::from_str(&client_id)
61-
.map_err(|_| "Invalid client id header value")?,
62-
);
63-
64-
headers.insert(
65-
"CF-Access-Client-Secret",
66-
HeaderValue::from_str(&client_secret)
67-
.map_err(|_| "Invalid client secret header value")?,
68-
);
69-
}
70-
} else {
71-
return Err("Failed to find host string".into());
47+
let authentication_headers = get_authorization_headers(url.clone())?;
48+
for (key, value) in authentication_headers {
49+
headers.insert(key, value);
7250
}
7351

7452
// Add retry middleware
7553
let reqwest_client = Client::builder()
7654
.default_headers(headers)
7755
.timeout(Duration::from_secs(request_timeout))
7856
.build()
79-
.map_err(|_| "Failed to build HTTP client")?;
57+
.map_err(|err| anyhow!("Failed to build HTTP client {err:?}"))?;
8058
let client = ClientBuilder::new(reqwest_client)
8159
.with(RetryTransientMiddleware::new_with_policy(
8260
ExponentialBackoff::builder().build_with_max_retries(3),
@@ -111,3 +89,32 @@ impl ClientWithBaseUrl {
11189
self.client.execute(request).await
11290
}
11391
}
92+
93+
pub fn get_authorization_headers(url: Url) -> anyhow::Result<Vec<(HeaderName, HeaderValue)>> {
94+
let mut headers = vec![];
95+
if let Some(host) = url.host_str() {
96+
if host.contains("pandaops.io") {
97+
let client_id = env::var("PANDAOPS_CLIENT_ID").unwrap_or_else(|_| {
98+
error!("Pandaops provider detected without PANDAOPS_CLIENT_ID set");
99+
"null".to_string()
100+
});
101+
102+
let client_secret = env::var("PANDAOPS_CLIENT_SECRET").unwrap_or_else(|_| {
103+
error!("Pandaops provider detected without PANDAOPS_CLIENT_SECRET set");
104+
"null".to_string()
105+
});
106+
107+
headers.push((
108+
HeaderName::from_static("cf-access-client-id"),
109+
HeaderValue::from_str(&client_id)?,
110+
));
111+
headers.push((
112+
HeaderName::from_static("cf-access-client-secret"),
113+
HeaderValue::from_str(&client_secret)?,
114+
));
115+
}
116+
} else {
117+
bail!("Failed to find host string");
118+
}
119+
Ok(headers)
120+
}

0 commit comments

Comments
 (0)