Skip to content

Commit 8664582

Browse files
authored
[BAEL 9326] Understanding Message Delivery in Kafka with Multiple Partitions (#18676)
* [BAEL-8394] added junit-jupiter dependency * [BAEL-8394] defined protobuf schema * [BAEL-8394] defined class to manage serialization and deserialization of the generated protobuf map * [BAEL-8394] test for FoodDelivery class * [BAEL-8394] modified imports to include generated sources * [BAEL-8394] added protoc generated source file * [BAEL-8394] modified package in generated source * [BAEL-8394] updated sources with protobuf version * [BAEL-8394] package updated * [BAEL-8394] added properties for versioning * [BAEL-8394] refactored package name and logging * [BAEL-8394] rewritten tests using logger-aware to support verification of logs * [BAEL-8394] fixed formatting for code * [BAEL-8394] formatting changes * [BAEL-8394] removed cleanup from test * [BAEL-9326] code commits for article issue bael-9326 * [BAEL-9326] refactored the article with code changes and minimized example sizes
1 parent 37a1715 commit 8664582

File tree

3 files changed

+311
-1
lines changed

3 files changed

+311
-1
lines changed

apache-kafka-3/docker-compose.yml

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
version: '3.8'
2+
services:
3+
zookeeper:
4+
image: confluentinc/cp-zookeeper:7.4.0
5+
hostname: zookeeper
6+
container_name: zookeeper
7+
ports:
8+
- "2181:2181"
9+
environment:
10+
ZOOKEEPER_CLIENT_PORT: 2181
11+
ZOOKEEPER_TICK_TIME: 2000
12+
13+
kafka:
14+
image: confluentinc/cp-kafka:7.4.0
15+
hostname: kafka
16+
container_name: kafka
17+
depends_on:
18+
- zookeeper
19+
ports:
20+
- "9092:9092"
21+
environment:
22+
KAFKA_BROKER_ID: 1
23+
KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
24+
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
25+
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092,PLAINTEXT_HOST://localhost:9092
26+
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
27+
KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
28+
KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
29+
KAFKA_AUTO_CREATE_TOPICS_ENABLE: 'true'
30+
KAFKA_NUM_PARTITIONS: 6
31+
32+
kafka-ui:
33+
image: provectuslabs/kafka-ui:latest
34+
container_name: kafka-ui
35+
depends_on:
36+
- kafka
37+
ports:
38+
- "8080:8080"
39+
environment:
40+
KAFKA_CLUSTERS_0_NAME: local
41+
KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka:29092

apache-kafka-3/pom.xml

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,10 +46,16 @@
4646
<version>${testcontainers-jupiter.version}</version>
4747
<scope>test</scope>
4848
</dependency>
49+
<dependency>
50+
<groupId>org.slf4j</groupId>
51+
<artifactId>slf4j-api</artifactId>
52+
<version>${slf4j.version}</version>
53+
</dependency>
4954
</dependencies>
5055

5156
<properties>
52-
<kafka.version>3.8.0</kafka.version>
57+
<kafka.version>3.9.1</kafka.version>
58+
<slf4j.version>2.0.9</slf4j.version>
5359
<jackson.databind.version>2.15.2</jackson.databind.version>
5460
<testcontainers-kafka.version>1.19.3</testcontainers-kafka.version>
5561
<testcontainers-jupiter.version>1.19.3</testcontainers-jupiter.version>
Lines changed: 263 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,263 @@
1+
package com.baeldung.kafka.partitions;
2+
3+
import java.time.Duration;
4+
import java.util.Arrays;
5+
import java.util.Collection;
6+
import java.util.HashMap;
7+
import java.util.Map;
8+
import java.util.Properties;
9+
import java.util.concurrent.CountDownLatch;
10+
import java.util.concurrent.TimeUnit;
11+
12+
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
13+
import org.apache.kafka.clients.consumer.ConsumerRecord;
14+
import org.apache.kafka.clients.consumer.ConsumerRecords;
15+
import org.apache.kafka.clients.consumer.KafkaConsumer;
16+
import org.apache.kafka.clients.producer.KafkaProducer;
17+
import org.apache.kafka.clients.producer.ProducerConfig;
18+
import org.apache.kafka.clients.producer.ProducerRecord;
19+
import org.apache.kafka.common.TopicPartition;
20+
import org.apache.kafka.common.serialization.StringDeserializer;
21+
import org.apache.kafka.common.serialization.StringSerializer;
22+
import org.slf4j.Logger;
23+
import org.slf4j.LoggerFactory;
24+
25+
public class KafkaMultiplePartitionsDemo {
26+
27+
private static final Logger logger = LoggerFactory.getLogger(KafkaMultiplePartitionsDemo.class);
28+
private final KafkaProducer<String, String> producer;
29+
private final String bootstrapServers;
30+
31+
public KafkaMultiplePartitionsDemo(String bootstrapServers) {
32+
this.bootstrapServers = bootstrapServers;
33+
this.producer = createProducer();
34+
}
35+
36+
private KafkaProducer<String, String> createProducer() {
37+
Properties props = new Properties();
38+
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
39+
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
40+
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
41+
props.put(ProducerConfig.ACKS_CONFIG, "all");
42+
return new KafkaProducer<>(props);
43+
}
44+
45+
public void sendMessagesWithKey() {
46+
String key = "user-123";
47+
48+
for (int i = 0; i < 5; i++) {
49+
ProducerRecord<String, String> record = new ProducerRecord<>("user-events", key, "Event " + i);
50+
51+
producer.send(record, (metadata, exception) -> {
52+
if (exception == null) {
53+
logger.info("Key: {}, Partition: {}, Offset: {}", key, metadata.partition(), metadata.offset());
54+
}
55+
});
56+
}
57+
producer.flush();
58+
}
59+
60+
public Map<Integer, Integer> sendMessagesWithoutKey() {
61+
Map<Integer, Integer> partitionCounts = new HashMap<>();
62+
63+
for (int i = 0; i < 100; i++) {
64+
ProducerRecord<String, String> record = new ProducerRecord<>("events", null, // no key
65+
"Message " + i);
66+
67+
producer.send(record, (metadata, exception) -> {
68+
if (exception == null) {
69+
synchronized (partitionCounts) {
70+
partitionCounts.merge(metadata.partition(), 1, Integer::sum);
71+
}
72+
}
73+
});
74+
}
75+
producer.flush();
76+
logger.info("Distribution across partitions: {}", partitionCounts);
77+
return partitionCounts;
78+
}
79+
80+
public void demonstratePartitionOrdering() throws InterruptedException {
81+
String orderId = "order-789";
82+
String[] events = { "created", "validated", "paid", "shipped", "delivered" };
83+
84+
for (String event : events) {
85+
ProducerRecord<String, String> record = new ProducerRecord<>("orders", orderId, event);
86+
87+
producer.send(record, (metadata, exception) -> {
88+
if (exception == null) {
89+
logger.info("Event: {} -> Partition: {}, Offset: {}", event, metadata.partition(), metadata.offset());
90+
}
91+
});
92+
// small delay to demonstrate sequential processing
93+
Thread.sleep(100);
94+
}
95+
producer.flush();
96+
}
97+
98+
public void demonstrateCrossPartitionBehavior() {
99+
long startTime = System.currentTimeMillis();
100+
101+
// these will likely go to different partitions
102+
producer.send(new ProducerRecord<>("events", "key-A", "First at " + (System.currentTimeMillis() - startTime) + "ms"));
103+
producer.send(new ProducerRecord<>("events", "key-B", "Second at " + (System.currentTimeMillis() - startTime) + "ms"));
104+
producer.send(new ProducerRecord<>("events", "key-C", "Third at " + (System.currentTimeMillis() - startTime) + "ms"));
105+
106+
producer.flush();
107+
}
108+
109+
public void close() {
110+
if (producer != null) {
111+
producer.close();
112+
}
113+
}
114+
115+
public void createConsumerGroup() {
116+
Properties props = new Properties();
117+
props.put("bootstrap.servers", bootstrapServers);
118+
props.put("group.id", "order-processors");
119+
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
120+
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
121+
props.put("auto.offset.reset", "earliest");
122+
123+
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
124+
consumer.subscribe(Arrays.asList("orders"));
125+
126+
int recordCount = 0;
127+
while (recordCount < 10) { // process limited records for demo
128+
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
129+
130+
for (ConsumerRecord<String, String> record : records) {
131+
logger.info("Consumer: {}, Partition: {}, Offset: {}, Value: {}", Thread.currentThread()
132+
.getName(), record.partition(), record.offset(), record.value());
133+
recordCount++;
134+
}
135+
consumer.commitSync();
136+
}
137+
consumer.close();
138+
}
139+
140+
public void startMultipleGroups() {
141+
String[] groupIds = { "analytics-group", "audit-group", "notification-group" };
142+
CountDownLatch latch = new CountDownLatch(groupIds.length);
143+
for (String groupId : groupIds) {
144+
startConsumerGroup(groupId, latch);
145+
}
146+
147+
try {
148+
latch.await(10, TimeUnit.SECONDS);
149+
} catch (InterruptedException e) {
150+
Thread.currentThread()
151+
.interrupt();
152+
}
153+
}
154+
155+
private void startConsumerGroup(String groupId, CountDownLatch latch) {
156+
Properties props = new Properties();
157+
props.put("bootstrap.servers", bootstrapServers);
158+
props.put("group.id", groupId);
159+
props.put("auto.offset.reset", "earliest");
160+
props.put("key.deserializer", StringDeserializer.class.getName());
161+
props.put("value.deserializer", StringDeserializer.class.getName());
162+
163+
new Thread(() -> {
164+
try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
165+
consumer.subscribe(Arrays.asList("orders"));
166+
167+
int recordCount = 0;
168+
while (recordCount < 5) {
169+
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
170+
recordCount += processRecordsForGroup(groupId, records);
171+
}
172+
} finally {
173+
latch.countDown();
174+
}
175+
}).start();
176+
}
177+
178+
private int processRecordsForGroup(String groupId, ConsumerRecords<String, String> records) {
179+
int count = 0;
180+
for (ConsumerRecord<String, String> record : records) {
181+
logger.info("[{}] Processing: {}", groupId, record.value());
182+
count++;
183+
}
184+
return count;
185+
}
186+
187+
public void configureCooperativeRebalancing() {
188+
Properties props = new Properties();
189+
props.put("bootstrap.servers", bootstrapServers);
190+
props.put("group.id", "cooperative-group");
191+
props.put("partition.assignment.strategy", "org.apache.kafka.clients.consumer.CooperativeStickyAssignor");
192+
props.put("key.deserializer", StringDeserializer.class.getName());
193+
props.put("value.deserializer", StringDeserializer.class.getName());
194+
props.put("auto.offset.reset", "earliest");
195+
196+
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
197+
198+
consumer.subscribe(Arrays.asList("orders"), new ConsumerRebalanceListener() {
199+
@Override
200+
public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
201+
logger.info("Revoked partitions: {}", partitions);
202+
// complete processing of current records
203+
}
204+
205+
@Override
206+
public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
207+
logger.info("Assigned partitions: {}", partitions);
208+
// initialize any partition-specific state
209+
}
210+
});
211+
212+
// process a few records to demonstrate
213+
int recordCount = 0;
214+
while (recordCount < 5) {
215+
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
216+
recordCount += records.count();
217+
}
218+
219+
consumer.close();
220+
}
221+
222+
public void processWithManualCommit() {
223+
Properties props = new Properties();
224+
props.put("bootstrap.servers", bootstrapServers);
225+
props.put("group.id", "manual-commit-group");
226+
props.put("enable.auto.commit", "false");
227+
props.put("max.poll.records", "10");
228+
props.put("key.deserializer", StringDeserializer.class.getName());
229+
props.put("value.deserializer", StringDeserializer.class.getName());
230+
props.put("auto.offset.reset", "earliest");
231+
232+
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
233+
consumer.subscribe(Arrays.asList("orders"));
234+
235+
int totalProcessed = 0;
236+
while (totalProcessed < 10) {
237+
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
238+
239+
for (ConsumerRecord<String, String> record : records) {
240+
try {
241+
processOrder(record);
242+
totalProcessed++;
243+
} catch (Exception e) {
244+
logger.error("Processing failed for offset: {}", record.offset(), e);
245+
break;
246+
}
247+
}
248+
249+
if (!records.isEmpty()) {
250+
consumer.commitSync();
251+
logger.info("Committed {} records", records.count());
252+
}
253+
}
254+
255+
consumer.close();
256+
}
257+
258+
private void processOrder(ConsumerRecord<String, String> record) {
259+
// simulate order processing
260+
logger.info("Processing order: {}", record.value());
261+
// this section is mostly your part of implementation, which is out of bounds of the article topic coverage
262+
}
263+
}

0 commit comments

Comments
 (0)