Skip to content

Commit 04d4db5

Browse files
committed
Add Kafka producer interface to be used
1 parent 422095f commit 04d4db5

File tree

1 file changed

+64
-0
lines changed

1 file changed

+64
-0
lines changed
Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,64 @@
1+
/**
2+
* @license
3+
* Copyright CERN and copyright holders of ALICE O2. This software is
4+
* distributed under the terms of the GNU General Public License v3 (GPL
5+
* Version 3), copied verbatim in the file "COPYING".
6+
*
7+
* See http://alice-o2.web.cern.ch/license for full licensing information.
8+
*
9+
* In applying this license CERN does not waive the privileges and immunities
10+
* granted to it by virtue of its status as an Intergovernmental Organization
11+
* or submit itself to any jurisdiction.
12+
*/
13+
14+
package alice.dip.kafka;
15+
16+
import org.apache.kafka.clients.producer.ProducerConfig;
17+
import org.apache.kafka.clients.producer.KafkaProducer;
18+
import org.apache.kafka.clients.producer.ProducerRecord;
19+
import org.apache.kafka.common.serialization.Serializer;
20+
21+
import java.util.Properties;
22+
23+
/**
24+
* Generic Kafka Producer interface to send messages to a specified topic.
25+
* @param <K> - Type of the message key (to be used for partitioning)
26+
* @param <V> - Type of the message value (payload)
27+
*/
28+
public class KafkaProducerInterface<K, V> implements AutoCloseable {
29+
private final KafkaProducer<K, V> producer;
30+
private final String topic;
31+
32+
/**
33+
* Constructor to create a KafkaProducerInterface
34+
* @param bootstrapServers - Kafka bootstrap servers connection string in format of host:port
35+
* @param topic - Kafka topic to which messages will be sent
36+
* @param keySerializer - Kafka supported serializer for the message key
37+
* @param valueSerializer - Kafka supported serializer for the message value
38+
*/
39+
public KafkaProducerInterface(String bootstrapServers, String topic, Serializer<K> keySerializer, Serializer<V> valueSerializer) {
40+
this.topic = topic;
41+
Properties props = new Properties();
42+
props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
43+
props.setProperty(ProducerConfig.ACKS_CONFIG, "all");
44+
this.producer = new KafkaProducer<>(props, keySerializer, valueSerializer);
45+
}
46+
47+
/**
48+
* Send a message to the configured Kafka topic
49+
* @param key - message key for partitioning
50+
* @param value - message value (payload)
51+
*/
52+
public void send(K key, V value) {
53+
ProducerRecord<K, V> record = new ProducerRecord<>(topic, key, value);
54+
producer.send(record);
55+
}
56+
57+
/**
58+
* Method to close the Kafka producer instance
59+
*/
60+
@Override
61+
public void close() {
62+
producer.close();
63+
}
64+
}

0 commit comments

Comments
 (0)