
Commit 87722aa

feat: aggregator: add kafka producer support
Signed-off-by: Theo Butler <[email protected]>
1 parent 4c173b5 commit 87722aa

6 files changed, +72 -1 lines changed

tap_aggregator/Cargo.toml
Lines changed: 1 addition & 0 deletions

@@ -40,6 +40,7 @@ tonic = { version = "0.12.3", features = ["transport", "zstd"] }
 tower = { version = "0.5.2", features = ["util", "steer"] }
 tracing-subscriber = "0.3.17"
 tap_graph = { version = "0.2.0", path = "../tap_graph", features = ["v2"] }
+rdkafka = { version = "0.37.0", features = ["tokio", "sasl"] }

 [build-dependencies]
 tonic-build = "0.12.3"

tap_aggregator/src/main.rs
Lines changed: 15 additions & 0 deletions

@@ -71,6 +71,9 @@ struct Args {
     /// Domain salt to be used for the EIP-712 domain separator.
     #[arg(long, env = "TAP_DOMAIN_SALT")]
     domain_salt: Option<String>,
+
+    #[arg(long, env = "TAP_KAFKA_CONFIG")]
+    kafka_config: Option<String>,
 }

 #[tokio::main]
@@ -104,6 +107,17 @@ async fn main() -> Result<()> {
         accepted_addresses.extend(public_keys.iter().cloned());
     }

+    let kafka = match args.kafka_config {
+        None => None,
+        Some(config) => {
+            let mut client = rdkafka::ClientConfig::new();
+            for (key, value) in config.split(';').filter_map(|s| s.split_once('=')) {
+                client.set(key, value);
+            }
+            Some(client.create()?)
+        }
+    };
+
     // Start the JSON-RPC server.
     // This await is non-blocking
     let (handle, _) = server::run_server(
@@ -114,6 +128,7 @@ async fn main() -> Result<()> {
         args.max_request_body_size,
         args.max_response_body_size,
         args.max_connections,
+        kafka,
     )
     .await?;
     info!("Server started. Listening on port {}.", args.port);

tap_aggregator/src/server.rs
Lines changed: 53 additions & 1 deletion

@@ -12,7 +12,7 @@ use jsonrpsee::{
     server::{ServerBuilder, ServerHandle, TowerService},
 };
 use lazy_static::lazy_static;
-use log::info;
+use log::{error, info};
 use prometheus::{register_counter, register_int_counter, Counter, IntCounter};
 use tap_core::signed_message::Eip712SignedMessage;
 use tap_graph::{Receipt, ReceiptAggregateVoucher, SignedReceipt};
@@ -94,6 +94,7 @@ struct RpcImpl {
     wallet: PrivateKeySigner,
     accepted_addresses: HashSet<Address>,
     domain_separator: Eip712Domain,
+    kafka: Option<rdkafka::producer::ThreadedProducer<rdkafka::producer::DefaultProducerContext>>,
 }

 /// Helper method that checks if the given API version is supported.
@@ -205,6 +206,14 @@ impl v1::tap_aggregator_server::TapAggregator for RpcImpl {
         TOTAL_GRT_AGGREGATED.inc_by(receipts_grt as f64);
         TOTAL_AGGREGATED_RECEIPTS.inc_by(receipts_count);
         AGGREGATION_SUCCESS_COUNTER.inc();
+        if let Some(kafka) = &self.kafka {
+            produce_kafka_records(
+                kafka,
+                &self.wallet.address(),
+                &res.message.allocationId,
+                res.message.valueAggregate,
+            );
+        }

         let response = v1::RavResponse {
             rav: Some(res.into()),
@@ -253,6 +262,14 @@ impl v2::tap_aggregator_server::TapAggregator for RpcImpl {
         TOTAL_GRT_AGGREGATED.inc_by(receipts_grt as f64);
         TOTAL_AGGREGATED_RECEIPTS.inc_by(receipts_count);
         AGGREGATION_SUCCESS_COUNTER.inc();
+        if let Some(kafka) = &self.kafka {
+            produce_kafka_records(
+                kafka,
+                &res.message.payer,
+                &res.message.allocationId,
+                res.message.valueAggregate,
+            );
+        }

         let response = v2::RavResponse {
             rav: Some(res.into()),
@@ -294,6 +311,14 @@ impl RpcServer for RpcImpl {
             TOTAL_GRT_AGGREGATED.inc_by(receipts_grt as f64);
             TOTAL_AGGREGATED_RECEIPTS.inc_by(receipts_count);
             AGGREGATION_SUCCESS_COUNTER.inc();
+            if let Some(kafka) = &self.kafka {
+                produce_kafka_records(
+                    kafka,
+                    &self.wallet.address(),
+                    &res.data.message.allocationId,
+                    res.data.message.valueAggregate,
+                );
+            }
             Ok(res)
         }
         Err(e) => {
@@ -304,6 +329,7 @@ impl RpcServer for RpcImpl {
     }
 }

+#[allow(clippy::too_many_arguments)]
 pub async fn run_server(
     port: u16,
     wallet: PrivateKeySigner,
@@ -312,12 +338,14 @@ pub async fn run_server(
     max_request_body_size: u32,
     max_response_body_size: u32,
     max_concurrent_connections: u32,
+    kafka: Option<rdkafka::producer::ThreadedProducer<rdkafka::producer::DefaultProducerContext>>,
 ) -> Result<(JoinHandle<()>, std::net::SocketAddr)> {
     // Setting up the JSON RPC server
     let rpc_impl = RpcImpl {
         wallet,
         accepted_addresses,
         domain_separator,
+        kafka,
     };
     let (json_rpc_service, _) = create_json_rpc_service(
         rpc_impl.clone(),
@@ -432,6 +460,25 @@ fn create_json_rpc_service(
     Ok((handle, server_handle))
 }

+fn produce_kafka_records(
+    kafka: &rdkafka::producer::ThreadedProducer<rdkafka::producer::DefaultProducerContext>,
+    sender: &Address,
+    allocation: &Address,
+    aggregated_value: u128,
+) {
+    let topic = "gateway_ravs";
+    let key = format!("{sender:?}:{allocation:?}");
+    let payload = aggregated_value.to_string();
+    let result = kafka.send(
+        rdkafka::producer::BaseRecord::to(topic)
+            .key(&key)
+            .payload(&payload),
+    );
+    if let Err((err, _)) = result {
+        error!("error producing to {topic}: {err}");
+    }
+}
+
 #[cfg(test)]
 #[allow(clippy::too_many_arguments)]
 mod tests {
@@ -508,6 +555,7 @@ mod tests {
         http_request_size_limit,
         http_response_size_limit,
         http_max_concurrent_connections,
+        None,
     )
     .await
     .unwrap();
@@ -557,6 +605,7 @@ mod tests {
         http_request_size_limit,
         http_response_size_limit,
         http_max_concurrent_connections,
+        None,
     )
     .await
     .unwrap();
@@ -637,6 +686,7 @@ mod tests {
         http_request_size_limit,
         http_response_size_limit,
         http_max_concurrent_connections,
+        None,
     )
     .await
     .unwrap();
@@ -714,6 +764,7 @@ mod tests {
         http_request_size_limit,
         http_response_size_limit,
         http_max_concurrent_connections,
+        None,
     )
     .await
     .unwrap();
@@ -805,6 +856,7 @@ mod tests {
         http_request_size_limit,
         http_response_size_limit,
         http_max_concurrent_connections,
+        None,
     )
     .await
     .unwrap();
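
For reference, produce_kafka_records writes to the hard-coded "gateway_ravs" topic with the key "{sender:?}:{allocation:?}" (two Debug-formatted addresses joined by a colon) and the aggregate value formatted as a decimal string in the payload. The following consumer is a sketch under those assumptions only and is not part of this commit; the broker address and consumer group id are placeholders.

use rdkafka::{
    consumer::{Consumer, StreamConsumer},
    ClientConfig, Message,
};

// Sketch only: reads records in the format produced by produce_kafka_records.
async fn read_gateway_ravs() -> Result<(), Box<dyn std::error::Error>> {
    let consumer: StreamConsumer = ClientConfig::new()
        .set("bootstrap.servers", "localhost:9092") // placeholder broker
        .set("group.id", "rav-reader")              // placeholder group id
        .create()?;
    consumer.subscribe(&["gateway_ravs"])?;
    loop {
        let msg = consumer.recv().await?;
        // Key is "<sender>:<allocation>"; payload is the u128 value aggregate as text.
        let key = msg.key().and_then(|k| std::str::from_utf8(k).ok()).unwrap_or_default();
        let value: Option<u128> = msg
            .payload()
            .and_then(|p| std::str::from_utf8(p).ok())
            .and_then(|s| s.parse().ok());
        println!("{key} -> {value:?}");
    }
}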

tap_aggregator/tests/aggregate_test.rs
Lines changed: 1 addition & 0 deletions

@@ -34,6 +34,7 @@ async fn aggregation_test() {
         max_request_body_size,
         max_response_body_size,
         max_concurrent_connections,
+        None,
     )
     .await
     .unwrap();

tap_aggregator/tests/aggregate_v1_and_v2.rs
Lines changed: 1 addition & 0 deletions

@@ -38,6 +38,7 @@ async fn aggregation_test() {
         max_request_body_size,
         max_response_body_size,
         max_concurrent_connections,
+        None,
     )
     .await
     .unwrap();

tap_integration_tests/tests/showcase.rs
Lines changed: 1 addition & 0 deletions

@@ -846,6 +846,7 @@ async fn start_sender_aggregator(
         http_request_size_limit,
         http_response_size_limit,
         http_max_concurrent_connections,
+        None,
     )
     .await?;
