Skip to content

Commit 4cf7b59

Browse files
committed
almost done
Signed-off-by: Gustavo Inacio <[email protected]>
1 parent 3497509 commit 4cf7b59

File tree

9 files changed

+202
-392
lines changed

9 files changed

+202
-392
lines changed

tap_aggregator/Cargo.toml

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -11,9 +11,6 @@ description = "A JSON-RPC service for the Timeline Aggregation Protocol that let
1111
name = "tap_aggregator"
1212
path = "src/main.rs"
1313

14-
[[bin]]
15-
name = "client"
16-
1714
[dependencies]
1815
tap_core = { path = "../tap_core", version = "2.0.0" }
1916
serde.workspace = true
@@ -42,7 +39,7 @@ futures-util = "0.3.28"
4239
lazy_static = "1.4.0"
4340
ruint = "1.10.1"
4441
tower = { version = "0.4", features = ["util", "steer"] }
45-
tonic = { version = "0.12.3", features = ["transport"] }
42+
tonic = { version = "0.12.3", features = ["transport", "zstd"] }
4643
prost = "0.13.3"
4744

4845
[build-dependencies]

tap_aggregator/proto/tap_aggregator.proto

Lines changed: 1 addition & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -33,20 +33,7 @@ message RavResponse {
3333
SignedRav rav = 1;
3434
}
3535

36-
message TapRpcApiVersion {
37-
string version = 1;
38-
}
39-
40-
message TapRpcApiVersionsInfo {
41-
repeated TapRpcApiVersion versions_supported = 1;
42-
repeated TapRpcApiVersion versions_deprecated = 2;
43-
}
44-
45-
// Optional request message for ApiVersions (TODO: should we use use google.protobuf.Empty?)
46-
message ApiVersionsRequest {}
47-
4836
service TapAggregator {
49-
rpc ApiVersions(ApiVersionsRequest) returns (TapRpcApiVersionsInfo);
5037
rpc AggregateReceipts(RavRequest) returns (RavResponse);
5138
}
5239

@@ -55,4 +42,4 @@ message Uint128 {
5542
uint64 high = 1;
5643
// Lowest 64 bits of a 128 bit number.
5744
uint64 low = 2;
58-
}
45+
}

tap_aggregator/src/tap_aggregator.rs renamed to tap_aggregator/src/grpc.rs

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -107,3 +107,26 @@ impl From<u128> for Uint128 {
107107
Self { high, low }
108108
}
109109
}
110+
111+
impl RavRequest {
112+
pub fn new(
113+
receipts: Vec<tap_core::receipt::SignedReceipt>,
114+
previous_rav: Option<tap_core::rav::SignedRAV>,
115+
) -> Self {
116+
Self {
117+
receipts: receipts.into_iter().map(Into::into).collect(),
118+
previous_rav: previous_rav.map(Into::into),
119+
}
120+
}
121+
}
122+
123+
impl RavResponse {
124+
pub fn signed_rav(mut self) -> anyhow::Result<tap_core::rav::SignedRAV> {
125+
let signed_rav: tap_core::rav::SignedRAV = self
126+
.rav
127+
.take()
128+
.ok_or(anyhow!("Couldn't find rav"))?
129+
.try_into()?;
130+
Ok(signed_rav)
131+
}
132+
}

tap_aggregator/src/hybrid.rs

Lines changed: 0 additions & 148 deletions
This file was deleted.

tap_aggregator/src/lib.rs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,8 +4,7 @@
44
pub mod aggregator;
55
pub mod api_versioning;
66
pub mod error_codes;
7-
// pub mod hybrid;
7+
pub mod grpc;
88
pub mod jsonrpsee_helpers;
99
pub mod metrics;
1010
pub mod server;
11-
pub mod tap_aggregator;

tap_aggregator/src/main.rs

Lines changed: 12 additions & 95 deletions
Original file line numberDiff line numberDiff line change
@@ -3,27 +3,15 @@
33

44
#![doc = include_str!("../README.md")]
55

6-
use std::collections::HashSet;
7-
use std::str::FromStr;
6+
use std::{collections::HashSet, str::FromStr};
87

9-
use alloy::dyn_abi::Eip712Domain;
10-
use alloy::primitives::Address;
11-
use alloy::signers::local::PrivateKeySigner;
8+
use alloy::{dyn_abi::Eip712Domain, primitives::Address, signers::local::PrivateKeySigner};
129
use anyhow::Result;
13-
use axum::error_handling::HandleError;
14-
use axum::routing::post_service;
15-
use axum::BoxError;
16-
use axum::Router;
1710
use clap::Parser;
18-
use hyper::StatusCode;
11+
use log::{debug, info};
1912
use tap_core::tap_eip712_domain;
20-
use tokio::net::TcpListener;
2113

22-
use log::{debug, info};
23-
use tap_aggregator::metrics;
24-
use tap_aggregator::server;
25-
use tokio::signal;
26-
use tower::make::Shared;
14+
use tap_aggregator::{metrics, server};
2715

2816
#[derive(Parser, Debug)]
2917
#[command(author, version, about, long_about = None)]
@@ -119,96 +107,25 @@ async fn main() -> Result<()> {
119107

120108
// Start the JSON-RPC server.
121109
// This await is non-blocking
122-
let (service, handle) = server::create_rpc_tower_service(
123-
wallet.clone(),
124-
accepted_addresses.clone(),
125-
domain_separator.clone(),
110+
let (handle, _) = server::run_server(
111+
args.port,
112+
wallet,
113+
accepted_addresses,
114+
domain_separator,
126115
args.max_request_body_size,
127116
args.max_response_body_size,
128117
args.max_connections,
129-
)?;
130-
118+
)
119+
.await?;
131120
info!("Server started. Listening on port {}.", args.port);
132121

133-
async fn handle_anyhow_error(err: BoxError) -> (StatusCode, String) {
134-
(
135-
StatusCode::INTERNAL_SERVER_ERROR,
136-
format!("Something went wrong: {err}"),
137-
)
138-
}
139-
let router = Router::new().route_service(
140-
"/",
141-
HandleError::new(post_service(service), handle_anyhow_error),
142-
);
143-
144-
let grpc_service = server::create_grpc_service(wallet, accepted_addresses, domain_separator)?;
145-
146-
let service = tower::steer::Steer::new(
147-
[router, grpc_service.into_axum_router()],
148-
|req: &hyper::Request<_>, _services: &[_]| {
149-
if req
150-
.headers()
151-
.get(hyper::header::CONTENT_TYPE)
152-
.map(|content_type| content_type.as_bytes())
153-
.filter(|content_type| content_type.starts_with(b"application/grpc"))
154-
.is_some()
155-
{
156-
// route to the gRPC service (second service element) when the
157-
// header is set
158-
1
159-
} else {
160-
// otherwise route to the REST service
161-
0
162-
}
163-
},
164-
);
165-
166-
// Create a `TcpListener` using tokio.
167-
let listener = TcpListener::bind(&format!("0.0.0.0:{}", args.port))
168-
.await
169-
.expect("Failed to bind to indexer-service port");
170-
171-
if let Err(e) = axum::serve(listener, Shared::new(service))
172-
.with_graceful_shutdown(shutdown_handler())
173-
.await
174-
{
175-
anyhow::bail!("Indexer service error: {e}");
176-
}
122+
let _ = handle.await;
177123

178124
// If we're here, we've received a signal to exit.
179125
info!("Shutting down...");
180-
181-
// Stop the server and wait for it to finish gracefully.
182-
let _ = handle.stop();
183-
handle.stopped().await;
184-
185-
debug!("Goodbye!");
186126
Ok(())
187127
}
188128

189-
/// Graceful shutdown handler
190-
async fn shutdown_handler() {
191-
let ctrl_c = async {
192-
signal::ctrl_c()
193-
.await
194-
.expect("Failed to install Ctrl+C handler");
195-
};
196-
197-
let terminate = async {
198-
signal::unix::signal(signal::unix::SignalKind::terminate())
199-
.expect("Failed to install signal handler")
200-
.recv()
201-
.await;
202-
};
203-
204-
tokio::select! {
205-
_ = ctrl_c => {},
206-
_ = terminate => {},
207-
}
208-
209-
info!("Signal received, starting graceful shutdown");
210-
}
211-
212129
fn create_eip712_domain(args: &Args) -> Result<Eip712Domain> {
213130
// Transfrom the args into the types expected by Eip712Domain::new().
214131

0 commit comments

Comments
 (0)