Skip to content

Commit 7855fc6

Browse files
authored
Merge pull request #277 from Theodus/theodus/aggregator
feat: aggregator: add kafka producer support
2 parents c2dc065 + d15dc11 commit 7855fc6

File tree

11 files changed

+78
-9
lines changed

11 files changed

+78
-9
lines changed

.github/workflows/tests.yml

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ jobs:
2929
steps:
3030
- uses: actions/checkout@v3
3131
- name: Install protobuf compiler
32-
run: apt-get update && apt-get install protobuf-compiler -y
32+
run: apt-get update && apt-get install libsasl2-dev protobuf-compiler -y
3333
- uses: actions/cache@v3
3434
with:
3535
path: |
@@ -55,7 +55,7 @@ jobs:
5555
steps:
5656
- uses: actions/checkout@v3
5757
- name: Install protobuf compiler
58-
run: apt-get update && apt-get install protobuf-compiler -y
58+
run: apt-get update && apt-get install libsasl2-dev protobuf-compiler -y
5959
- uses: actions/cache@v3
6060
with:
6161
path: |
@@ -82,7 +82,7 @@ jobs:
8282
steps:
8383
- uses: actions/checkout@v3
8484
- name: Install protobuf compiler
85-
run: apt-get update && apt-get install protobuf-compiler -y
85+
run: apt-get update && apt-get install libsasl2-dev protobuf-compiler -y
8686
- uses: actions/cache@v3
8787
with:
8888
path: |

Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ license = "Apache-2.0"
1616
repository = "https://github.com/semiotic-ai/timeline-aggregation-protocol"
1717

1818
[workspace.dependencies]
19-
alloy = { version = "0.9.2", features = ["full"] }
19+
alloy = { version = "0.11.0", features = ["full"] }
2020
serde = { version = "1.0.217", features = ["derive"] }
2121
rstest = "0.24.0"
2222
anyhow = { version = "1.0.95" }

Dockerfile.tap_aggregator

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ FROM rust:1.83-bookworm as build
33
WORKDIR /root
44

55
RUN apt-get update && apt-get install -y --no-install-recommends \
6-
protobuf-compiler \
6+
libsasl2-dev protobuf-compiler \
77
&& rm -rf /var/lib/apt/lists/*
88

99
COPY . .
@@ -15,7 +15,7 @@ RUN cargo build --release --bin tap_aggregator
1515
FROM debian:bookworm-slim
1616

1717
RUN apt-get update && apt-get install -y --no-install-recommends \
18-
openssl ca-certificates \
18+
ca-certificates libsasl2-dev openssl \
1919
&& rm -rf /var/lib/apt/lists/*
2020
COPY --from=build /root/target/release/tap_aggregator /usr/local/bin/tap_aggregator
2121

tap_aggregator/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@ tonic = { version = "0.12.3", features = ["transport", "zstd"] }
4040
tower = { version = "0.5.2", features = ["util", "steer"] }
4141
tracing-subscriber = "0.3.17"
4242
tap_graph = { version = "0.2.0", path = "../tap_graph", features = ["v2"] }
43+
rdkafka = { version = "0.37.0", features = ["tokio", "sasl"] }
4344

4445
[build-dependencies]
4546
tonic-build = "0.12.3"

tap_aggregator/src/main.rs

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,9 @@ struct Args {
7171
/// Domain salt to be used for the EIP-712 domain separator.
7272
#[arg(long, env = "TAP_DOMAIN_SALT")]
7373
domain_salt: Option<String>,
74+
75+
#[arg(long, env = "TAP_KAFKA_CONFIG")]
76+
kafka_config: Option<String>,
7477
}
7578

7679
#[tokio::main]
@@ -104,6 +107,17 @@ async fn main() -> Result<()> {
104107
accepted_addresses.extend(public_keys.iter().cloned());
105108
}
106109

110+
let kafka = match args.kafka_config {
111+
None => None,
112+
Some(config) => {
113+
let mut client = rdkafka::ClientConfig::new();
114+
for (key, value) in config.split(';').filter_map(|s| s.split_once('=')) {
115+
client.set(key, value);
116+
}
117+
Some(client.create()?)
118+
}
119+
};
120+
107121
// Start the JSON-RPC server.
108122
// This await is non-blocking
109123
let (handle, _) = server::run_server(
@@ -114,6 +128,7 @@ async fn main() -> Result<()> {
114128
args.max_request_body_size,
115129
args.max_response_body_size,
116130
args.max_connections,
131+
kafka,
117132
)
118133
.await?;
119134
info!("Server started. Listening on port {}.", args.port);

tap_aggregator/src/server.rs

Lines changed: 53 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ use jsonrpsee::{
1212
server::{ServerBuilder, ServerHandle, TowerService},
1313
};
1414
use lazy_static::lazy_static;
15-
use log::info;
15+
use log::{error, info};
1616
use prometheus::{register_counter, register_int_counter, Counter, IntCounter};
1717
use tap_core::signed_message::Eip712SignedMessage;
1818
use tap_graph::{Receipt, ReceiptAggregateVoucher, SignedReceipt};
@@ -94,6 +94,7 @@ struct RpcImpl {
9494
wallet: PrivateKeySigner,
9595
accepted_addresses: HashSet<Address>,
9696
domain_separator: Eip712Domain,
97+
kafka: Option<rdkafka::producer::ThreadedProducer<rdkafka::producer::DefaultProducerContext>>,
9798
}
9899

99100
/// Helper method that checks if the given API version is supported.
@@ -205,6 +206,14 @@ impl v1::tap_aggregator_server::TapAggregator for RpcImpl {
205206
TOTAL_GRT_AGGREGATED.inc_by(receipts_grt as f64);
206207
TOTAL_AGGREGATED_RECEIPTS.inc_by(receipts_count);
207208
AGGREGATION_SUCCESS_COUNTER.inc();
209+
if let Some(kafka) = &self.kafka {
210+
produce_kafka_records(
211+
kafka,
212+
&self.wallet.address(),
213+
&res.message.allocationId,
214+
res.message.valueAggregate,
215+
);
216+
}
208217

209218
let response = v1::RavResponse {
210219
rav: Some(res.into()),
@@ -253,6 +262,14 @@ impl v2::tap_aggregator_server::TapAggregator for RpcImpl {
253262
TOTAL_GRT_AGGREGATED.inc_by(receipts_grt as f64);
254263
TOTAL_AGGREGATED_RECEIPTS.inc_by(receipts_count);
255264
AGGREGATION_SUCCESS_COUNTER.inc();
265+
if let Some(kafka) = &self.kafka {
266+
produce_kafka_records(
267+
kafka,
268+
&res.message.payer,
269+
&res.message.allocationId,
270+
res.message.valueAggregate,
271+
);
272+
}
256273

257274
let response = v2::RavResponse {
258275
rav: Some(res.into()),
@@ -294,6 +311,14 @@ impl RpcServer for RpcImpl {
294311
TOTAL_GRT_AGGREGATED.inc_by(receipts_grt as f64);
295312
TOTAL_AGGREGATED_RECEIPTS.inc_by(receipts_count);
296313
AGGREGATION_SUCCESS_COUNTER.inc();
314+
if let Some(kafka) = &self.kafka {
315+
produce_kafka_records(
316+
kafka,
317+
&self.wallet.address(),
318+
&res.data.message.allocationId,
319+
res.data.message.valueAggregate,
320+
);
321+
}
297322
Ok(res)
298323
}
299324
Err(e) => {
@@ -304,6 +329,7 @@ impl RpcServer for RpcImpl {
304329
}
305330
}
306331

332+
#[allow(clippy::too_many_arguments)]
307333
pub async fn run_server(
308334
port: u16,
309335
wallet: PrivateKeySigner,
@@ -312,12 +338,14 @@ pub async fn run_server(
312338
max_request_body_size: u32,
313339
max_response_body_size: u32,
314340
max_concurrent_connections: u32,
341+
kafka: Option<rdkafka::producer::ThreadedProducer<rdkafka::producer::DefaultProducerContext>>,
315342
) -> Result<(JoinHandle<()>, std::net::SocketAddr)> {
316343
// Setting up the JSON RPC server
317344
let rpc_impl = RpcImpl {
318345
wallet,
319346
accepted_addresses,
320347
domain_separator,
348+
kafka,
321349
};
322350
let (json_rpc_service, _) = create_json_rpc_service(
323351
rpc_impl.clone(),
@@ -432,6 +460,25 @@ fn create_json_rpc_service(
432460
Ok((handle, server_handle))
433461
}
434462

463+
fn produce_kafka_records(
464+
kafka: &rdkafka::producer::ThreadedProducer<rdkafka::producer::DefaultProducerContext>,
465+
sender: &Address,
466+
allocation: &Address,
467+
aggregated_value: u128,
468+
) {
469+
let topic = "gateway_ravs";
470+
let key = format!("{sender:?}:{allocation:?}");
471+
let payload = aggregated_value.to_string();
472+
let result = kafka.send(
473+
rdkafka::producer::BaseRecord::to(topic)
474+
.key(&key)
475+
.payload(&payload),
476+
);
477+
if let Err((err, _)) = result {
478+
error!("error producing to {topic}: {err}");
479+
}
480+
}
481+
435482
#[cfg(test)]
436483
#[allow(clippy::too_many_arguments)]
437484
mod tests {
@@ -508,6 +555,7 @@ mod tests {
508555
http_request_size_limit,
509556
http_response_size_limit,
510557
http_max_concurrent_connections,
558+
None,
511559
)
512560
.await
513561
.unwrap();
@@ -557,6 +605,7 @@ mod tests {
557605
http_request_size_limit,
558606
http_response_size_limit,
559607
http_max_concurrent_connections,
608+
None,
560609
)
561610
.await
562611
.unwrap();
@@ -637,6 +686,7 @@ mod tests {
637686
http_request_size_limit,
638687
http_response_size_limit,
639688
http_max_concurrent_connections,
689+
None,
640690
)
641691
.await
642692
.unwrap();
@@ -714,6 +764,7 @@ mod tests {
714764
http_request_size_limit,
715765
http_response_size_limit,
716766
http_max_concurrent_connections,
767+
None,
717768
)
718769
.await
719770
.unwrap();
@@ -805,6 +856,7 @@ mod tests {
805856
http_request_size_limit,
806857
http_response_size_limit,
807858
http_max_concurrent_connections,
859+
None,
808860
)
809861
.await
810862
.unwrap();

tap_aggregator/tests/aggregate_test.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ async fn aggregation_test() {
3434
max_request_body_size,
3535
max_response_body_size,
3636
max_concurrent_connections,
37+
None,
3738
)
3839
.await
3940
.unwrap();

tap_aggregator/tests/aggregate_v1_and_v2.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@ async fn aggregation_test() {
3838
max_request_body_size,
3939
max_response_body_size,
4040
max_concurrent_connections,
41+
None,
4142
)
4243
.await
4344
.unwrap();

tap_core/Cargo.toml

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,6 @@ alloy.workspace = true
1212
anyhow.workspace = true
1313
async-trait = "0.1.85"
1414
rand.workspace = true
15-
serde.workspace = true
1615
thiserror.workspace = true
1716
tokio.workspace = true
1817
tap_receipt = { version = "0.1.0", path = "../tap_receipt" }

tap_graph/Cargo.toml

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,6 @@ description = "The Graph TAP receipt structs"
1010
alloy.workspace = true
1111
serde.workspace = true
1212
rand.workspace = true
13-
thiserror.workspace = true
1413
tap_eip712_message = { version = "0.1.0", path = "../tap_eip712_message" }
1514
tap_receipt = { version = "0.1.0", path = "../tap_receipt" }
1615

0 commit comments

Comments
 (0)