Skip to content

Commit 3c56018

Browse files
gusinaciopedrohba1
andauthored
feat: accept grpc requests in tap-aggregator (#253)
* feat: add proto definitions Signed-off-by: Gustavo Inacio <[email protected]> * feat: gRPC service implementation Signed-off-by: Gustavo Inacio <[email protected]> Signed-off-by: pedro bufulin <[email protected]> * ci: add protobuf compiler Signed-off-by: Gustavo Inacio <[email protected]> * build: add protobuf compiler to dockerfile Signed-off-by: Gustavo Inacio <[email protected]> --------- Signed-off-by: Gustavo Inacio <[email protected]> Signed-off-by: pedro bufulin <[email protected]> Co-authored-by: pedro bufulin <[email protected]>
1 parent a6c5193 commit 3c56018

File tree

11 files changed

+512
-93
lines changed

11 files changed

+512
-93
lines changed

.github/workflows/tests.yml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,8 @@ jobs:
2626
image: rust:1.83-bookworm
2727
steps:
2828
- uses: actions/checkout@v3
29+
- name: Install protobuf compiler
30+
run: apt-get update && apt-get install protobuf-compiler -y
2931
- uses: actions/cache@v3
3032
with:
3133
path: |
@@ -50,6 +52,8 @@ jobs:
5052
image: rust:1.83-bookworm
5153
steps:
5254
- uses: actions/checkout@v3
55+
- name: Install protobuf compiler
56+
run: apt-get update && apt-get install protobuf-compiler -y
5357
- uses: actions/cache@v3
5458
with:
5559
path: |
@@ -75,6 +79,8 @@ jobs:
7579
image: rust:1.83-bookworm
7680
steps:
7781
- uses: actions/checkout@v3
82+
- name: Install protobuf compiler
83+
run: apt-get update && apt-get install protobuf-compiler -y
7884
- uses: actions/cache@v3
7985
with:
8086
path: |

Dockerfile.tap_aggregator

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,11 @@
11
FROM rust:1.83-bookworm as build
22

33
WORKDIR /root
4+
5+
RUN apt-get update && apt-get install -y --no-install-recommends \
6+
protobuf-compiler \
7+
&& rm -rf /var/lib/apt/lists/*
8+
49
COPY . .
510

611
RUN cargo build --release --bin tap_aggregator

tap_aggregator/Cargo.toml

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,14 @@ axum = { version = "0.7.5", features = [
3535
futures-util = "0.3.28"
3636
lazy_static = "1.4.0"
3737
ruint = "1.10.1"
38-
tower = { version = "0.4", features = ["util"] }
38+
tower = { version = "0.4", features = ["util", "steer"] }
39+
tonic = { version = "0.12.3", features = ["transport", "zstd"] }
40+
prost = "0.13.3"
41+
hyper = { version = "1", features = ["full"] }
42+
43+
[build-dependencies]
44+
tonic-build = "0.12.3"
45+
3946

4047
[dev-dependencies]
4148
jsonrpsee = { workspace = true, features = ["http-client", "jsonrpsee-core"] }

tap_aggregator/build.rs

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
// Copyright 2023-, Semiotic AI, Inc.
2+
// SPDX-License-Identifier: Apache-2.0
3+
4+
fn main() -> Result<(), Box<dyn std::error::Error>> {
5+
println!("Running build.rs...");
6+
let out_dir = std::env::var("OUT_DIR").expect("OUT_DIR not set by Cargo");
7+
println!("OUT_DIR: {}", out_dir); // This should print the output directory
8+
9+
tonic_build::compile_protos("./proto/tap_aggregator.proto")?;
10+
Ok(())
11+
}
Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
// Copyright 2023-, Semiotic AI, Inc.
2+
// SPDX-License-Identifier: Apache-2.0
3+
4+
syntax = "proto3";
5+
package tap_aggregator.v1;
6+
7+
message Receipt {
8+
bytes allocation_id = 1;
9+
uint64 timestamp_ns = 2;
10+
uint64 nonce = 3;
11+
Uint128 value = 4;
12+
}
13+
14+
message SignedReceipt {
15+
Receipt message = 1;
16+
bytes signature = 2;
17+
}
18+
19+
message ReceiptAggregateVoucher {
20+
bytes allocation_id = 1;
21+
uint64 timestamp_ns = 2;
22+
Uint128 value_aggregate = 3;
23+
}
24+
25+
message SignedRav {
26+
ReceiptAggregateVoucher message = 1;
27+
bytes signature = 2;
28+
}
29+
30+
message RavRequest {
31+
repeated SignedReceipt receipts = 1;
32+
optional SignedRav previous_rav = 2;
33+
}
34+
35+
message RavResponse {
36+
SignedRav rav = 1;
37+
}
38+
39+
service TapAggregator {
40+
rpc AggregateReceipts(RavRequest) returns (RavResponse);
41+
}
42+
43+
message Uint128 {
44+
// Highest 64 bits of a 128 bit number.
45+
uint64 high = 1;
46+
// Lowest 64 bits of a 128 bit number.
47+
uint64 low = 2;
48+
}

tap_aggregator/src/grpc.rs

Lines changed: 135 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,135 @@
1+
// Copyright 2023-, Semiotic AI, Inc.
2+
// SPDX-License-Identifier: Apache-2.0
3+
4+
use anyhow::anyhow;
5+
use tap_core::signed_message::EIP712SignedMessage;
6+
7+
tonic::include_proto!("tap_aggregator.v1");
8+
9+
impl TryFrom<Receipt> for tap_core::receipt::Receipt {
10+
type Error = anyhow::Error;
11+
fn try_from(receipt: Receipt) -> Result<Self, Self::Error> {
12+
Ok(Self {
13+
allocation_id: receipt.allocation_id.as_slice().try_into()?,
14+
timestamp_ns: receipt.timestamp_ns,
15+
value: receipt.value.ok_or(anyhow!("Missing value"))?.into(),
16+
nonce: receipt.nonce,
17+
})
18+
}
19+
}
20+
21+
impl TryFrom<SignedReceipt> for tap_core::receipt::SignedReceipt {
22+
type Error = anyhow::Error;
23+
fn try_from(receipt: SignedReceipt) -> Result<Self, Self::Error> {
24+
Ok(Self {
25+
signature: receipt.signature.as_slice().try_into()?,
26+
message: receipt
27+
.message
28+
.ok_or(anyhow!("Missing message"))?
29+
.try_into()?,
30+
})
31+
}
32+
}
33+
34+
impl From<tap_core::receipt::Receipt> for Receipt {
35+
fn from(value: tap_core::receipt::Receipt) -> Self {
36+
Self {
37+
allocation_id: value.allocation_id.as_slice().to_vec(),
38+
timestamp_ns: value.timestamp_ns,
39+
nonce: value.nonce,
40+
value: Some(value.value.into()),
41+
}
42+
}
43+
}
44+
45+
impl From<tap_core::receipt::SignedReceipt> for SignedReceipt {
46+
fn from(value: tap_core::receipt::SignedReceipt) -> Self {
47+
Self {
48+
message: Some(value.message.into()),
49+
signature: value.signature.as_bytes().to_vec(),
50+
}
51+
}
52+
}
53+
54+
impl TryFrom<SignedRav> for EIP712SignedMessage<tap_core::rav::ReceiptAggregateVoucher> {
55+
type Error = anyhow::Error;
56+
fn try_from(voucher: SignedRav) -> Result<Self, Self::Error> {
57+
Ok(Self {
58+
signature: voucher.signature.as_slice().try_into()?,
59+
message: voucher
60+
.message
61+
.ok_or(anyhow!("Missing message"))?
62+
.try_into()?,
63+
})
64+
}
65+
}
66+
67+
impl From<EIP712SignedMessage<tap_core::rav::ReceiptAggregateVoucher>> for SignedRav {
68+
fn from(voucher: EIP712SignedMessage<tap_core::rav::ReceiptAggregateVoucher>) -> Self {
69+
Self {
70+
signature: voucher.signature.as_bytes().to_vec(),
71+
message: Some(voucher.message.into()),
72+
}
73+
}
74+
}
75+
76+
impl TryFrom<ReceiptAggregateVoucher> for tap_core::rav::ReceiptAggregateVoucher {
77+
type Error = anyhow::Error;
78+
fn try_from(voucher: ReceiptAggregateVoucher) -> Result<Self, Self::Error> {
79+
Ok(Self {
80+
allocationId: voucher.allocation_id.as_slice().try_into()?,
81+
timestampNs: voucher.timestamp_ns,
82+
valueAggregate: voucher
83+
.value_aggregate
84+
.ok_or(anyhow!("Missing Value Aggregate"))?
85+
.into(),
86+
})
87+
}
88+
}
89+
90+
impl From<tap_core::rav::ReceiptAggregateVoucher> for ReceiptAggregateVoucher {
91+
fn from(voucher: tap_core::rav::ReceiptAggregateVoucher) -> Self {
92+
Self {
93+
allocation_id: voucher.allocationId.to_vec(),
94+
timestamp_ns: voucher.timestampNs,
95+
value_aggregate: Some(voucher.valueAggregate.into()),
96+
}
97+
}
98+
}
99+
100+
impl From<Uint128> for u128 {
101+
fn from(Uint128 { high, low }: Uint128) -> Self {
102+
((high as u128) << 64) | low as u128
103+
}
104+
}
105+
106+
impl From<u128> for Uint128 {
107+
fn from(value: u128) -> Self {
108+
let high = (value >> 64) as u64;
109+
let low = value as u64;
110+
Self { high, low }
111+
}
112+
}
113+
114+
impl RavRequest {
115+
pub fn new(
116+
receipts: Vec<tap_core::receipt::SignedReceipt>,
117+
previous_rav: Option<tap_core::rav::SignedRAV>,
118+
) -> Self {
119+
Self {
120+
receipts: receipts.into_iter().map(Into::into).collect(),
121+
previous_rav: previous_rav.map(Into::into),
122+
}
123+
}
124+
}
125+
126+
impl RavResponse {
127+
pub fn signed_rav(mut self) -> anyhow::Result<tap_core::rav::SignedRAV> {
128+
let signed_rav: tap_core::rav::SignedRAV = self
129+
.rav
130+
.take()
131+
.ok_or(anyhow!("Couldn't find rav"))?
132+
.try_into()?;
133+
Ok(signed_rav)
134+
}
135+
}

tap_aggregator/src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
pub mod aggregator;
55
pub mod api_versioning;
66
pub mod error_codes;
7+
pub mod grpc;
78
pub mod jsonrpsee_helpers;
89
pub mod metrics;
910
pub mod server;

tap_aggregator/src/main.rs

Lines changed: 11 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -3,22 +3,15 @@
33

44
#![doc = include_str!("../README.md")]
55

6-
use std::borrow::Cow;
7-
use std::collections::HashSet;
8-
use std::str::FromStr;
9-
10-
use alloy::dyn_abi::Eip712Domain;
11-
use alloy::primitives::Address;
12-
use alloy::primitives::FixedBytes;
13-
use alloy::signers::local::PrivateKeySigner;
6+
use std::{collections::HashSet, str::FromStr};
7+
8+
use alloy::{dyn_abi::Eip712Domain, primitives::Address, signers::local::PrivateKeySigner};
149
use anyhow::Result;
1510
use clap::Parser;
16-
use ruint::aliases::U256;
17-
use tokio::signal::unix::{signal, SignalKind};
18-
1911
use log::{debug, info};
20-
use tap_aggregator::metrics;
21-
use tap_aggregator::server;
12+
use tap_core::tap_eip712_domain;
13+
14+
use tap_aggregator::{metrics, server};
2215

2316
#[derive(Parser, Debug)]
2417
#[command(author, version, about, long_about = None)]
@@ -126,37 +119,22 @@ async fn main() -> Result<()> {
126119
.await?;
127120
info!("Server started. Listening on port {}.", args.port);
128121

129-
// Have tokio wait for SIGTERM or SIGINT.
130-
let mut signal_sigint = signal(SignalKind::interrupt())?;
131-
let mut signal_sigterm = signal(SignalKind::terminate())?;
132-
tokio::select! {
133-
_ = signal_sigint.recv() => debug!("Received SIGINT."),
134-
_ = signal_sigterm.recv() => debug!("Received SIGTERM."),
135-
}
122+
let _ = handle.await;
136123

137124
// If we're here, we've received a signal to exit.
138125
info!("Shutting down...");
139-
140-
// Stop the server and wait for it to finish gracefully.
141-
handle.stop()?;
142-
handle.stopped().await;
143-
144-
debug!("Goodbye!");
145126
Ok(())
146127
}
147128

148129
fn create_eip712_domain(args: &Args) -> Result<Eip712Domain> {
149130
// Transfrom the args into the types expected by Eip712Domain::new().
150131

151132
// Transform optional strings into optional Cow<str>.
152-
let name = args.domain_name.clone().map(Cow::Owned);
153-
let version = args.domain_version.clone().map(Cow::Owned);
154-
155133
// Transform optional strings into optional U256.
156134
if args.domain_chain_id.is_some() {
157135
debug!("Parsing domain chain ID...");
158136
}
159-
let chain_id: Option<U256> = args
137+
let chain_id: Option<u64> = args
160138
.domain_chain_id
161139
.as_ref()
162140
.map(|s| s.parse())
@@ -165,17 +143,13 @@ fn create_eip712_domain(args: &Args) -> Result<Eip712Domain> {
165143
if args.domain_salt.is_some() {
166144
debug!("Parsing domain salt...");
167145
}
168-
let salt: Option<FixedBytes<32>> = args.domain_salt.as_ref().map(|s| s.parse()).transpose()?;
169146

170147
// Transform optional strings into optional Address.
171148
let verifying_contract: Option<Address> = args.domain_verifying_contract;
172149

173150
// Create the EIP-712 domain separator.
174-
Ok(Eip712Domain::new(
175-
name,
176-
version,
177-
chain_id,
178-
verifying_contract,
179-
salt,
151+
Ok(tap_eip712_domain(
152+
chain_id.unwrap_or(1),
153+
verifying_contract.unwrap_or_default(),
180154
))
181155
}

0 commit comments

Comments
 (0)