Skip to content

Commit e1554a2

Browse files
committed
more wip
Signed-off-by: Gustavo Inacio <[email protected]>
1 parent 16de227 commit e1554a2

File tree

6 files changed

+326
-30
lines changed

6 files changed

+326
-30
lines changed

tap_aggregator/Cargo.toml

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,9 @@ description = "A JSON-RPC service for the Timeline Aggregation Protocol that let
1111
name = "tap_aggregator"
1212
path = "src/main.rs"
1313

14+
[[bin]]
15+
name = "client"
16+
1417
[dependencies]
1518
tap_core = { path = "../tap_core", version = "2.0.0" }
1619
serde.workspace = true
@@ -22,6 +25,9 @@ jsonrpsee = { workspace = true, features = ["server", "macros"] }
2225
clap = { version = "4.5.15", features = ["derive", "env"] }
2326
strum = { version = "0.26.3", features = ["derive"] }
2427
tracing-subscriber = "0.3.17"
28+
pin-project = "1.1.7"
29+
hyper = { version = "1", features = ["full"] }
30+
hyper-util = "0.1.10"
2531
log = "0.4.19"
2632
prometheus = "0.13.3"
2733
axum = { version = "0.7.5", features = [
@@ -40,7 +46,7 @@ tonic = { version = "0.12.3", features = ["transport"] }
4046
prost = "0.13.3"
4147

4248
[build-dependencies]
43-
tonic-build = "0.12.3"
49+
tonic-build = "0.12.3"
4450

4551

4652
[dev-dependencies]

tap_aggregator/src/bin/client.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
2+
fn main() {
3+
println!("Hello world");
4+
}

tap_aggregator/src/hybrid.rs

Lines changed: 148 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,148 @@
1+
use std::{future::Future, marker::PhantomData};
2+
3+
use hyper::body::Frame;
4+
use std::pin::Pin;
5+
use std::task::Poll;
6+
use tower::Service;
7+
8+
use hyper::{
9+
body::{Body, Incoming},
10+
Request, Response,
11+
};
12+
use pin_project::pin_project;
13+
14+
pub fn hybrid_once<Web, Grpc, WebBody, GrpcBody>(
15+
web: Web,
16+
grpc: Grpc,
17+
) -> HybridService<Web, Grpc, WebBody, GrpcBody>
18+
where
19+
Web: Service<Request<Incoming>, Response = Response<WebBody>>,
20+
Grpc: Service<Request<Incoming>, Response = Response<GrpcBody>>,
21+
Web::Error: Into<Box<dyn std::error::Error + Send + Sync + 'static>>,
22+
Grpc::Error: Into<Box<dyn std::error::Error + Send + Sync + 'static>>,
23+
{
24+
HybridService {
25+
web,
26+
grpc,
27+
_phantom: PhantomData,
28+
}
29+
}
30+
31+
#[derive(Clone)]
32+
pub struct HybridService<Web, Grpc, WebBody, GrpcBody>
33+
where
34+
Web: Service<Request<Incoming>, Response = Response<WebBody>>,
35+
Grpc: Service<Request<Incoming>, Response = Response<GrpcBody>>,
36+
Web::Error: Into<Box<dyn std::error::Error + Send + Sync + 'static>>,
37+
Grpc::Error: Into<Box<dyn std::error::Error + Send + Sync + 'static>>,
38+
{
39+
web: Web,
40+
grpc: Grpc,
41+
_phantom: PhantomData<(WebBody, GrpcBody)>,
42+
}
43+
44+
impl<Web, Grpc, WebBody, GrpcBody> Service<Request<Incoming>>
45+
for HybridService<Web, Grpc, WebBody, GrpcBody>
46+
where
47+
Web: Service<Request<Incoming>, Response = Response<WebBody>>,
48+
Grpc: Service<Request<Incoming>, Response = Response<GrpcBody>>,
49+
Web::Error: Into<Box<dyn std::error::Error + Send + Sync + 'static>>,
50+
Grpc::Error: Into<Box<dyn std::error::Error + Send + Sync + 'static>>,
51+
{
52+
type Response = Response<HybridBody<WebBody, GrpcBody>>;
53+
type Error = Box<dyn std::error::Error + Send + Sync + 'static>;
54+
type Future = HybridFuture<Web::Future, Grpc::Future>;
55+
56+
fn poll_ready(
57+
&mut self,
58+
cx: &mut std::task::Context<'_>,
59+
) -> std::task::Poll<Result<(), Self::Error>> {
60+
match self.web.poll_ready(cx) {
61+
Poll::Ready(Ok(())) => match self.grpc.poll_ready(cx) {
62+
Poll::Ready(Ok(())) => Poll::Ready(Ok(())),
63+
Poll::Ready(Err(e)) => Poll::Ready(Err(e.into())),
64+
Poll::Pending => Poll::Pending,
65+
},
66+
Poll::Ready(Err(e)) => Poll::Ready(Err(e.into())),
67+
Poll::Pending => Poll::Pending,
68+
}
69+
}
70+
71+
fn call(&mut self, req: Request<Incoming>) -> Self::Future {
72+
if req.headers().get("content-type").map(|x| x.as_bytes()) == Some(b"application/grpc") {
73+
HybridFuture::Grpc(self.grpc.call(req))
74+
} else {
75+
HybridFuture::Web(self.web.call(req))
76+
}
77+
}
78+
}
79+
80+
#[derive(Clone)]
81+
#[pin_project(project = HybridBodyProj)]
82+
pub enum HybridBody<WebBody, GrpcBody> {
83+
Web(#[pin] WebBody),
84+
Grpc(#[pin] GrpcBody),
85+
}
86+
87+
impl<WebBody, GrpcBody> Body for HybridBody<WebBody, GrpcBody>
88+
where
89+
WebBody: Body + Send + Unpin,
90+
GrpcBody: Body<Data = WebBody::Data> + Send + Unpin,
91+
WebBody::Error: std::error::Error + Send + Sync + 'static,
92+
GrpcBody::Error: std::error::Error + Send + Sync + 'static,
93+
{
94+
type Data = WebBody::Data;
95+
type Error = Box<dyn std::error::Error + Send + Sync + 'static>;
96+
97+
fn is_end_stream(&self) -> bool {
98+
match self {
99+
HybridBody::Web(b) => b.is_end_stream(),
100+
HybridBody::Grpc(b) => b.is_end_stream(),
101+
}
102+
}
103+
104+
fn poll_frame(
105+
self: Pin<&mut Self>,
106+
cx: &mut std::task::Context,
107+
) -> Poll<Option<Result<Frame<Self::Data>, Self::Error>>> {
108+
match self.project() {
109+
HybridBodyProj::Web(b) => b.poll_frame(cx).map_err(|e| e.into()),
110+
HybridBodyProj::Grpc(b) => b.poll_frame(cx).map_err(|e| e.into()),
111+
}
112+
}
113+
}
114+
115+
#[pin_project(project = HybridFutureProj)]
116+
pub enum HybridFuture<WebFuture, GrpcFuture> {
117+
Web(#[pin] WebFuture),
118+
Grpc(#[pin] GrpcFuture),
119+
}
120+
121+
impl<WebFuture, GrpcFuture, WebBody, GrpcBody, WebError, GrpcError> Future
122+
for HybridFuture<WebFuture, GrpcFuture>
123+
where
124+
WebFuture: Future<Output = Result<Response<WebBody>, WebError>>,
125+
GrpcFuture: Future<Output = Result<Response<GrpcBody>, GrpcError>>,
126+
WebError: Into<Box<dyn std::error::Error + Send + Sync + 'static>>,
127+
GrpcError: Into<Box<dyn std::error::Error + Send + Sync + 'static>>,
128+
{
129+
type Output = Result<
130+
Response<HybridBody<WebBody, GrpcBody>>,
131+
Box<dyn std::error::Error + Send + Sync + 'static>,
132+
>;
133+
134+
fn poll(self: Pin<&mut Self>, cx: &mut std::task::Context) -> Poll<Self::Output> {
135+
match self.project() {
136+
HybridFutureProj::Web(a) => match a.poll(cx) {
137+
Poll::Ready(Ok(res)) => Poll::Ready(Ok(res.map(HybridBody::Web))),
138+
Poll::Ready(Err(e)) => Poll::Ready(Err(e.into())),
139+
Poll::Pending => Poll::Pending,
140+
},
141+
HybridFutureProj::Grpc(b) => match b.poll(cx) {
142+
Poll::Ready(Ok(res)) => Poll::Ready(Ok(res.map(HybridBody::Grpc))),
143+
Poll::Ready(Err(e)) => Poll::Ready(Err(e.into())),
144+
Poll::Pending => Poll::Pending,
145+
},
146+
}
147+
}
148+
}

tap_aggregator/src/lib.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,8 @@
44
pub mod aggregator;
55
pub mod api_versioning;
66
pub mod error_codes;
7+
// pub mod hybrid;
78
pub mod jsonrpsee_helpers;
89
pub mod metrics;
910
pub mod server;
11+
pub mod tap_aggregator;

tap_aggregator/src/main.rs

Lines changed: 80 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -12,13 +12,20 @@ use alloy::primitives::Address;
1212
use alloy::primitives::FixedBytes;
1313
use alloy::signers::local::PrivateKeySigner;
1414
use anyhow::Result;
15+
use axum::error_handling::HandleError;
16+
use axum::routing::post_service;
17+
use axum::BoxError;
18+
use axum::Router;
1519
use clap::Parser;
20+
use hyper::StatusCode;
1621
use ruint::aliases::U256;
17-
use tokio::signal::unix::{signal, SignalKind};
22+
use tokio::net::TcpListener;
1823

1924
use log::{debug, info};
2025
use tap_aggregator::metrics;
2126
use tap_aggregator::server;
27+
use tokio::signal;
28+
use tower::make::Shared;
2229

2330
#[derive(Parser, Debug)]
2431
#[command(author, version, about, long_about = None)]
@@ -114,24 +121,60 @@ async fn main() -> Result<()> {
114121

115122
// Start the JSON-RPC server.
116123
// This await is non-blocking
117-
let (handle, _) = server::run_server(
118-
args.port,
119-
wallet,
120-
accepted_addresses,
121-
domain_separator,
124+
let (service, handle) = server::create_rpc_tower_service(
125+
wallet.clone(),
126+
accepted_addresses.clone(),
127+
domain_separator.clone(),
122128
args.max_request_body_size,
123129
args.max_response_body_size,
124130
args.max_connections,
125-
)
126-
.await?;
131+
)?;
132+
127133
info!("Server started. Listening on port {}.", args.port);
128134

129-
// Have tokio wait for SIGTERM or SIGINT.
130-
let mut signal_sigint = signal(SignalKind::interrupt())?;
131-
let mut signal_sigterm = signal(SignalKind::terminate())?;
132-
tokio::select! {
133-
_ = signal_sigint.recv() => debug!("Received SIGINT."),
134-
_ = signal_sigterm.recv() => debug!("Received SIGTERM."),
135+
async fn handle_anyhow_error(err: BoxError) -> (StatusCode, String) {
136+
(
137+
StatusCode::INTERNAL_SERVER_ERROR,
138+
format!("Something went wrong: {err}"),
139+
)
140+
}
141+
let router = Router::new().route_service(
142+
"/",
143+
HandleError::new(post_service(service), handle_anyhow_error),
144+
);
145+
146+
let grpc_service = server::create_grpc_service(wallet, accepted_addresses, domain_separator)?;
147+
148+
let service = tower::steer::Steer::new(
149+
[router, grpc_service.into_axum_router()],
150+
|req: &hyper::Request<_>, _services: &[_]| {
151+
if req
152+
.headers()
153+
.get(hyper::header::CONTENT_TYPE)
154+
.map(|content_type| content_type.as_bytes())
155+
.filter(|content_type| content_type.starts_with(b"application/grpc"))
156+
.is_some()
157+
{
158+
// route to the gRPC service (second service element) when the
159+
// header is set
160+
1
161+
} else {
162+
// otherwise route to the REST service
163+
0
164+
}
165+
},
166+
);
167+
168+
// Create a `TcpListener` using tokio.
169+
let listener = TcpListener::bind(&format!("0.0.0.0:{}", args.port))
170+
.await
171+
.expect("Failed to bind to indexer-service port");
172+
173+
if let Err(e) = axum::serve(listener, Shared::new(service))
174+
.with_graceful_shutdown(shutdown_handler())
175+
.await
176+
{
177+
anyhow::bail!("Indexer service error: {e}");
135178
}
136179

137180
// If we're here, we've received a signal to exit.
@@ -145,6 +188,29 @@ async fn main() -> Result<()> {
145188
Ok(())
146189
}
147190

191+
/// Graceful shutdown handler
192+
async fn shutdown_handler() {
193+
let ctrl_c = async {
194+
signal::ctrl_c()
195+
.await
196+
.expect("Failed to install Ctrl+C handler");
197+
};
198+
199+
let terminate = async {
200+
signal::unix::signal(signal::unix::SignalKind::terminate())
201+
.expect("Failed to install signal handler")
202+
.recv()
203+
.await;
204+
};
205+
206+
tokio::select! {
207+
_ = ctrl_c => {},
208+
_ = terminate => {},
209+
}
210+
211+
info!("Signal received, starting graceful shutdown");
212+
}
213+
148214
fn create_eip712_domain(args: &Args) -> Result<Eip712Domain> {
149215
// Transfrom the args into the types expected by Eip712Domain::new().
150216

0 commit comments

Comments
 (0)