Skip to content

Commit 9c41fc9

Browse files
pedrohba1gusinacio
authored andcommitted
feat: gRPC service implementation
1 parent 5f84e20 commit 9c41fc9

File tree

9 files changed

+368
-94
lines changed

9 files changed

+368
-94
lines changed

tap_aggregator/Cargo.toml

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,14 @@ axum = { version = "0.7.5", features = [
3535
futures-util = "0.3.28"
3636
lazy_static = "1.4.0"
3737
ruint = "1.10.1"
38-
tower = { version = "0.4", features = ["util"] }
38+
tower = { version = "0.4", features = ["util", "steer"] }
39+
tonic = { version = "0.12.3", features = ["transport", "zstd"] }
40+
prost = "0.13.3"
41+
hyper = { version = "1", features = ["full"] }
42+
43+
[build-dependencies]
44+
tonic-build = "0.12.3"
45+
3946

4047
[dev-dependencies]
4148
jsonrpsee = { workspace = true, features = ["http-client", "jsonrpsee-core"] }

tap_aggregator/build.rs

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
// Copyright 2023-, Semiotic AI, Inc.
2+
// SPDX-License-Identifier: Apache-2.0
3+
4+
fn main() -> Result<(), Box<dyn std::error::Error>> {
5+
println!("Running build.rs...");
6+
let out_dir = std::env::var("OUT_DIR").expect("OUT_DIR not set by Cargo");
7+
println!("OUT_DIR: {}", out_dir); // This should print the output directory
8+
9+
tonic_build::compile_protos("./proto/tap_aggregator.proto")?;
10+
Ok(())
11+
}

tap_aggregator/proto/tap_aggregator.proto

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,6 @@
1+
// Copyright 2023-, Semiotic AI, Inc.
2+
// SPDX-License-Identifier: Apache-2.0
3+
14
syntax = "proto3";
25
package tap_aggregator.v1;
36

@@ -42,4 +45,4 @@ message Uint128 {
4245
uint64 high = 1;
4346
// Lowest 64 bits of a 128 bit number.
4447
uint64 low = 2;
45-
}
48+
}

tap_aggregator/src/tap_aggregator.rs renamed to tap_aggregator/src/grpc.rs

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,6 @@
1+
// Copyright 2023-, Semiotic AI, Inc.
2+
// SPDX-License-Identifier: Apache-2.0
3+
14
use anyhow::anyhow;
25
use tap_core::signed_message::EIP712SignedMessage;
36

@@ -28,6 +31,26 @@ impl TryFrom<SignedReceipt> for tap_core::receipt::SignedReceipt {
2831
}
2932
}
3033

34+
impl From<tap_core::receipt::Receipt> for Receipt {
35+
fn from(value: tap_core::receipt::Receipt) -> Self {
36+
Self {
37+
allocation_id: value.allocation_id.as_slice().to_vec(),
38+
timestamp_ns: value.timestamp_ns,
39+
nonce: value.nonce,
40+
value: Some(value.value.into()),
41+
}
42+
}
43+
}
44+
45+
impl From<tap_core::receipt::SignedReceipt> for SignedReceipt {
46+
fn from(value: tap_core::receipt::SignedReceipt) -> Self {
47+
Self {
48+
message: Some(value.message.into()),
49+
signature: value.signature.as_bytes().to_vec(),
50+
}
51+
}
52+
}
53+
3154
impl TryFrom<SignedRav> for EIP712SignedMessage<tap_core::rav::ReceiptAggregateVoucher> {
3255
type Error = anyhow::Error;
3356
fn try_from(voucher: SignedRav) -> Result<Self, Self::Error> {
@@ -87,3 +110,26 @@ impl From<u128> for Uint128 {
87110
Self { high, low }
88111
}
89112
}
113+
114+
impl RavRequest {
115+
pub fn new(
116+
receipts: Vec<tap_core::receipt::SignedReceipt>,
117+
previous_rav: Option<tap_core::rav::SignedRAV>,
118+
) -> Self {
119+
Self {
120+
receipts: receipts.into_iter().map(Into::into).collect(),
121+
previous_rav: previous_rav.map(Into::into),
122+
}
123+
}
124+
}
125+
126+
impl RavResponse {
127+
pub fn signed_rav(mut self) -> anyhow::Result<tap_core::rav::SignedRAV> {
128+
let signed_rav: tap_core::rav::SignedRAV = self
129+
.rav
130+
.take()
131+
.ok_or(anyhow!("Couldn't find rav"))?
132+
.try_into()?;
133+
Ok(signed_rav)
134+
}
135+
}

tap_aggregator/src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
pub mod aggregator;
55
pub mod api_versioning;
66
pub mod error_codes;
7+
pub mod grpc;
78
pub mod jsonrpsee_helpers;
89
pub mod metrics;
910
pub mod server;

tap_aggregator/src/main.rs

Lines changed: 11 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -3,22 +3,15 @@
33

44
#![doc = include_str!("../README.md")]
55

6-
use std::borrow::Cow;
7-
use std::collections::HashSet;
8-
use std::str::FromStr;
9-
10-
use alloy::dyn_abi::Eip712Domain;
11-
use alloy::primitives::Address;
12-
use alloy::primitives::FixedBytes;
13-
use alloy::signers::local::PrivateKeySigner;
6+
use std::{collections::HashSet, str::FromStr};
7+
8+
use alloy::{dyn_abi::Eip712Domain, primitives::Address, signers::local::PrivateKeySigner};
149
use anyhow::Result;
1510
use clap::Parser;
16-
use ruint::aliases::U256;
17-
use tokio::signal::unix::{signal, SignalKind};
18-
1911
use log::{debug, info};
20-
use tap_aggregator::metrics;
21-
use tap_aggregator::server;
12+
use tap_core::tap_eip712_domain;
13+
14+
use tap_aggregator::{metrics, server};
2215

2316
#[derive(Parser, Debug)]
2417
#[command(author, version, about, long_about = None)]
@@ -126,37 +119,22 @@ async fn main() -> Result<()> {
126119
.await?;
127120
info!("Server started. Listening on port {}.", args.port);
128121

129-
// Have tokio wait for SIGTERM or SIGINT.
130-
let mut signal_sigint = signal(SignalKind::interrupt())?;
131-
let mut signal_sigterm = signal(SignalKind::terminate())?;
132-
tokio::select! {
133-
_ = signal_sigint.recv() => debug!("Received SIGINT."),
134-
_ = signal_sigterm.recv() => debug!("Received SIGTERM."),
135-
}
122+
let _ = handle.await;
136123

137124
// If we're here, we've received a signal to exit.
138125
info!("Shutting down...");
139-
140-
// Stop the server and wait for it to finish gracefully.
141-
handle.stop()?;
142-
handle.stopped().await;
143-
144-
debug!("Goodbye!");
145126
Ok(())
146127
}
147128

148129
fn create_eip712_domain(args: &Args) -> Result<Eip712Domain> {
149130
// Transfrom the args into the types expected by Eip712Domain::new().
150131

151132
// Transform optional strings into optional Cow<str>.
152-
let name = args.domain_name.clone().map(Cow::Owned);
153-
let version = args.domain_version.clone().map(Cow::Owned);
154-
155133
// Transform optional strings into optional U256.
156134
if args.domain_chain_id.is_some() {
157135
debug!("Parsing domain chain ID...");
158136
}
159-
let chain_id: Option<U256> = args
137+
let chain_id: Option<u64> = args
160138
.domain_chain_id
161139
.as_ref()
162140
.map(|s| s.parse())
@@ -165,17 +143,13 @@ fn create_eip712_domain(args: &Args) -> Result<Eip712Domain> {
165143
if args.domain_salt.is_some() {
166144
debug!("Parsing domain salt...");
167145
}
168-
let salt: Option<FixedBytes<32>> = args.domain_salt.as_ref().map(|s| s.parse()).transpose()?;
169146

170147
// Transform optional strings into optional Address.
171148
let verifying_contract: Option<Address> = args.domain_verifying_contract;
172149

173150
// Create the EIP-712 domain separator.
174-
Ok(Eip712Domain::new(
175-
name,
176-
version,
177-
chain_id,
178-
verifying_contract,
179-
salt,
151+
Ok(tap_eip712_domain(
152+
chain_id.unwrap_or(1),
153+
verifying_contract.unwrap_or_default(),
180154
))
181155
}

0 commit comments

Comments
 (0)