Skip to content

Commit 13d438b

Browse files
committed
feat: aggregator: add kafka producer support
1 parent 41fb094 commit 13d438b

File tree

6 files changed

+48
-1
lines changed

6 files changed

+48
-1
lines changed

tap_aggregator/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@ tonic = { version = "0.12.3", features = ["transport", "zstd"] }
4040
tower = { version = "0.5.2", features = ["util", "steer"] }
4141
tracing-subscriber = "0.3.17"
4242
tap_graph = { version = "0.2.0", path = "../tap_graph", features = ["v2"] }
43+
rdkafka = { version = "0.37.0", features = ["tokio", "sasl"] }
4344

4445
[build-dependencies]
4546
tonic-build = "0.12.3"

tap_aggregator/src/main.rs

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,9 @@ struct Args {
7171
/// Domain salt to be used for the EIP-712 domain separator.
7272
#[arg(long, env = "TAP_DOMAIN_SALT")]
7373
domain_salt: Option<String>,
74+
75+
#[arg(long, env = "TAP_KAFKA_CONFIG")]
76+
kafka_config: Option<String>,
7477
}
7578

7679
#[tokio::main]
@@ -104,6 +107,17 @@ async fn main() -> Result<()> {
104107
accepted_addresses.extend(public_keys.iter().cloned());
105108
}
106109

110+
let kafka = match args.kafka_config {
111+
None => None,
112+
Some(config) => {
113+
let mut client = rdkafka::ClientConfig::new();
114+
for (key, value) in config.split(';').filter_map(|s| s.split_once('=')) {
115+
client.set(key, value);
116+
}
117+
Some(client.create()?)
118+
}
119+
};
120+
107121
// Start the JSON-RPC server.
108122
// This await is non-blocking
109123
let (handle, _) = server::run_server(
@@ -114,6 +128,7 @@ async fn main() -> Result<()> {
114128
args.max_request_body_size,
115129
args.max_response_body_size,
116130
args.max_connections,
131+
kafka,
117132
)
118133
.await?;
119134
info!("Server started. Listening on port {}.", args.port);

tap_aggregator/src/server.rs

Lines changed: 29 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ use jsonrpsee::{
1212
server::{ServerBuilder, ServerHandle, TowerService},
1313
};
1414
use lazy_static::lazy_static;
15-
use log::info;
15+
use log::{error, info};
1616
use prometheus::{register_counter, register_int_counter, Counter, IntCounter};
1717
use tap_core::signed_message::Eip712SignedMessage;
1818
use tap_graph::{Receipt, ReceiptAggregateVoucher, SignedReceipt};
@@ -94,6 +94,7 @@ struct RpcImpl {
9494
wallet: PrivateKeySigner,
9595
accepted_addresses: HashSet<Address>,
9696
domain_separator: Eip712Domain,
97+
kafka: Option<rdkafka::producer::ThreadedProducer<rdkafka::producer::DefaultProducerContext>>,
9798
}
9899

99100
/// Helper method that checks if the given API version is supported.
@@ -254,6 +255,25 @@ impl v2::tap_aggregator_server::TapAggregator for RpcImpl {
254255
TOTAL_AGGREGATED_RECEIPTS.inc_by(receipts_count);
255256
AGGREGATION_SUCCESS_COUNTER.inc();
256257

258+
if let Some(kafka) = &self.kafka {
259+
let topic = "gateway_ravs";
260+
let key = format!(
261+
"{:?}:{:?}:{:?}",
262+
self.wallet.address(),
263+
res.message.payer,
264+
res.message.allocationId,
265+
);
266+
let payload = res.message.valueAggregate.to_string();
267+
let result = kafka.send(
268+
rdkafka::producer::BaseRecord::to(topic)
269+
.key(&key)
270+
.payload(&payload),
271+
);
272+
if let Err((err, _)) = result {
273+
error!("error producing to {topic}: {err}");
274+
}
275+
}
276+
257277
let response = v2::RavResponse {
258278
rav: Some(res.into()),
259279
};
@@ -304,6 +324,7 @@ impl RpcServer for RpcImpl {
304324
}
305325
}
306326

327+
#[allow(clippy::too_many_arguments)]
307328
pub async fn run_server(
308329
port: u16,
309330
wallet: PrivateKeySigner,
@@ -312,12 +333,14 @@ pub async fn run_server(
312333
max_request_body_size: u32,
313334
max_response_body_size: u32,
314335
max_concurrent_connections: u32,
336+
kafka: Option<rdkafka::producer::ThreadedProducer<rdkafka::producer::DefaultProducerContext>>,
315337
) -> Result<(JoinHandle<()>, std::net::SocketAddr)> {
316338
// Setting up the JSON RPC server
317339
let rpc_impl = RpcImpl {
318340
wallet,
319341
accepted_addresses,
320342
domain_separator,
343+
kafka,
321344
};
322345
let (json_rpc_service, _) = create_json_rpc_service(
323346
rpc_impl.clone(),
@@ -508,6 +531,7 @@ mod tests {
508531
http_request_size_limit,
509532
http_response_size_limit,
510533
http_max_concurrent_connections,
534+
None,
511535
)
512536
.await
513537
.unwrap();
@@ -557,6 +581,7 @@ mod tests {
557581
http_request_size_limit,
558582
http_response_size_limit,
559583
http_max_concurrent_connections,
584+
None,
560585
)
561586
.await
562587
.unwrap();
@@ -637,6 +662,7 @@ mod tests {
637662
http_request_size_limit,
638663
http_response_size_limit,
639664
http_max_concurrent_connections,
665+
None,
640666
)
641667
.await
642668
.unwrap();
@@ -714,6 +740,7 @@ mod tests {
714740
http_request_size_limit,
715741
http_response_size_limit,
716742
http_max_concurrent_connections,
743+
None,
717744
)
718745
.await
719746
.unwrap();
@@ -805,6 +832,7 @@ mod tests {
805832
http_request_size_limit,
806833
http_response_size_limit,
807834
http_max_concurrent_connections,
835+
None,
808836
)
809837
.await
810838
.unwrap();

tap_aggregator/tests/aggregate_test.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ async fn aggregation_test() {
3434
max_request_body_size,
3535
max_response_body_size,
3636
max_concurrent_connections,
37+
None,
3738
)
3839
.await
3940
.unwrap();

tap_aggregator/tests/aggregate_v1_and_v2.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@ async fn aggregation_test() {
3838
max_request_body_size,
3939
max_response_body_size,
4040
max_concurrent_connections,
41+
None,
4142
)
4243
.await
4344
.unwrap();

tap_integration_tests/tests/showcase.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -846,6 +846,7 @@ async fn start_sender_aggregator(
846846
http_request_size_limit,
847847
http_response_size_limit,
848848
http_max_concurrent_connections,
849+
None,
849850
)
850851
.await?;
851852

0 commit comments

Comments
 (0)