package com.baeldung.kafka.partitions;

import java.time.Duration;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

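/**
 * Demonstrates how Kafka distributes records across multiple partitions, and how
 * consumer groups, rebalancing strategies, and commit modes interact with them.
 */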
public class KafkaMultiplePartitionsDemo {

    private static final Logger logger = LoggerFactory.getLogger(KafkaMultiplePartitionsDemo.class);
    private final KafkaProducer<String, String> producer;
    private final String bootstrapServers;

    public KafkaMultiplePartitionsDemo(String bootstrapServers) {
        this.bootstrapServers = bootstrapServers;
        this.producer = createProducer();
    }

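    // acks=all makes the broker acknowledge a record only after all in-sync
    // replicas have it, trading a little latency for stronger durability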
    private KafkaProducer<String, String> createProducer() {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.ACKS_CONFIG, "all");
        return new KafkaProducer<>(props);
    }

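    /**
     * Sends several records that share a single key. The default partitioner hashes
     * the key (murmur2), so all of them land on the same partition and keep their order.
     */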
    public void sendMessagesWithKey() {
        String key = "user-123";

        for (int i = 0; i < 5; i++) {
            ProducerRecord<String, String> record = new ProducerRecord<>("user-events", key, "Event " + i);

            producer.send(record, (metadata, exception) -> {
                if (exception == null) {
                    logger.info("Key: {}, Partition: {}, Offset: {}", key, metadata.partition(), metadata.offset());
                }
            });
        }
        producer.flush();
    }

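    /**
     * Sends records without a key. The producer spreads them across partitions
     * (round-robin before Kafka 2.4, sticky batching since), so the counts below
     * show a roughly even distribution.
     */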
    public Map<Integer, Integer> sendMessagesWithoutKey() {
        Map<Integer, Integer> partitionCounts = new HashMap<>();

        for (int i = 0; i < 100; i++) {
            ProducerRecord<String, String> record = new ProducerRecord<>("events", null, "Message " + i); // null key

            producer.send(record, (metadata, exception) -> {
                if (exception == null) {
                    // callbacks run on the producer's I/O thread, so guard the shared map
                    synchronized (partitionCounts) {
                        partitionCounts.merge(metadata.partition(), 1, Integer::sum);
                    }
                }
            });
        }
        producer.flush(); // blocks until all in-flight sends complete, so the callbacks have run
        logger.info("Distribution across partitions: {}", partitionCounts);
        return partitionCounts;
    }

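    /**
     * Publishes an order's lifecycle events under one key. Because they all map to
     * the same partition, consumers see them in exactly the order they were sent.
     */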
    public void demonstratePartitionOrdering() throws InterruptedException {
        String orderId = "order-789";
        String[] events = { "created", "validated", "paid", "shipped", "delivered" };

        for (String event : events) {
            ProducerRecord<String, String> record = new ProducerRecord<>("orders", orderId, event);

            producer.send(record, (metadata, exception) -> {
                if (exception == null) {
                    logger.info("Event: {} -> Partition: {}, Offset: {}", event, metadata.partition(), metadata.offset());
                }
            });
            // small delay to demonstrate sequential processing
            Thread.sleep(100);
        }
        producer.flush();
    }

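    /**
     * Sends records with different keys. They typically land on different partitions,
     * and Kafka makes no ordering guarantee across partitions, only within each one.
     */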
    public void demonstrateCrossPartitionBehavior() {
        long startTime = System.currentTimeMillis();

        // these will likely go to different partitions
        producer.send(new ProducerRecord<>("events", "key-A", "First at " + (System.currentTimeMillis() - startTime) + "ms"));
        producer.send(new ProducerRecord<>("events", "key-B", "Second at " + (System.currentTimeMillis() - startTime) + "ms"));
        producer.send(new ProducerRecord<>("events", "key-C", "Third at " + (System.currentTimeMillis() - startTime) + "ms"));

        producer.flush();
    }

    public void close() {
        if (producer != null) {
            producer.close();
        }
    }

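    /**
     * Runs a single consumer in the "order-processors" group. Kafka assigns each
     * partition of the topic to exactly one consumer in a group, so starting more
     * instances with the same group.id spreads the partitions among them.
     */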
    public void createConsumerGroup() {
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers);
        props.put("group.id", "order-processors");
        props.put("key.deserializer", StringDeserializer.class.getName());
        props.put("value.deserializer", StringDeserializer.class.getName());
        props.put("auto.offset.reset", "earliest");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Arrays.asList("orders"));

        int recordCount = 0;
        while (recordCount < 10) { // process a limited number of records for the demo
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));

            for (ConsumerRecord<String, String> record : records) {
                logger.info("Consumer: {}, Partition: {}, Offset: {}, Value: {}", Thread.currentThread().getName(),
                    record.partition(), record.offset(), record.value());
                recordCount++;
            }
            consumer.commitSync();
        }
        consumer.close();
    }

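    /**
     * Starts three consumer groups on the same topic. Unlike consumers within a
     * single group, separate groups are independent: each group receives every record.
     */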
    public void startMultipleGroups() {
        String[] groupIds = { "analytics-group", "audit-group", "notification-group" };
        CountDownLatch latch = new CountDownLatch(groupIds.length);
        for (String groupId : groupIds) {
            startConsumerGroup(groupId, latch);
        }

        try {
            latch.await(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

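    // starts one consumer with the given group.id on its own thread; the latch
    // counts down once it has processed a handful of records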
    private void startConsumerGroup(String groupId, CountDownLatch latch) {
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers);
        props.put("group.id", groupId);
        props.put("auto.offset.reset", "earliest");
        props.put("key.deserializer", StringDeserializer.class.getName());
        props.put("value.deserializer", StringDeserializer.class.getName());

        new Thread(() -> {
            try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
                consumer.subscribe(Arrays.asList("orders"));

                int recordCount = 0;
                while (recordCount < 5) {
                    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
                    recordCount += processRecordsForGroup(groupId, records);
                }
            } finally {
                latch.countDown();
            }
        }).start();
    }

    private int processRecordsForGroup(String groupId, ConsumerRecords<String, String> records) {
        int count = 0;
        for (ConsumerRecord<String, String> record : records) {
            logger.info("[{}] Processing: {}", groupId, record.value());
            count++;
        }
        return count;
    }

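    /**
     * Subscribes with the CooperativeStickyAssignor, which rebalances incrementally:
     * only the partitions that actually move are revoked, instead of the eager
     * stop-the-world revocation of the default strategy. The listener callbacks are
     * the hooks for flushing state on revocation and initializing it on assignment.
     */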
    public void configureCooperativeRebalancing() {
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers);
        props.put("group.id", "cooperative-group");
        props.put("partition.assignment.strategy", "org.apache.kafka.clients.consumer.CooperativeStickyAssignor");
        props.put("key.deserializer", StringDeserializer.class.getName());
        props.put("value.deserializer", StringDeserializer.class.getName());
        props.put("auto.offset.reset", "earliest");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);

        consumer.subscribe(Arrays.asList("orders"), new ConsumerRebalanceListener() {
            @Override
            public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
                logger.info("Revoked partitions: {}", partitions);
                // complete processing of current records
            }

            @Override
            public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
                logger.info("Assigned partitions: {}", partitions);
                // initialize any partition-specific state
            }
        });

        // process a few records to demonstrate
        int recordCount = 0;
        while (recordCount < 5) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
            recordCount += records.count();
        }

        consumer.close();
    }

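    /**
     * Disables auto-commit and calls commitSync() only after a batch has been fully
     * processed, giving at-least-once semantics: on failure the offsets stay
     * uncommitted, so the records are redelivered when the consumer restarts.
     */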
    public void processWithManualCommit() {
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers);
        props.put("group.id", "manual-commit-group");
        props.put("enable.auto.commit", "false");
        props.put("max.poll.records", "10");
        props.put("key.deserializer", StringDeserializer.class.getName());
        props.put("value.deserializer", StringDeserializer.class.getName());
        props.put("auto.offset.reset", "earliest");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Arrays.asList("orders"));

        int totalProcessed = 0;
        while (totalProcessed < 10) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));

            boolean batchFailed = false;
            for (ConsumerRecord<String, String> record : records) {
                try {
                    processOrder(record);
                    totalProcessed++;
                } catch (Exception e) {
                    logger.error("Processing failed for offset: {}", record.offset(), e);
                    batchFailed = true;
                    break;
                }
            }

            // commit only fully processed batches; committing after a failure would
            // mark the failed record as consumed and silently skip it
            if (!records.isEmpty() && !batchFailed) {
                consumer.commitSync();
                logger.info("Committed {} records", records.count());
            }
        }

        consumer.close();
    }

    private void processOrder(ConsumerRecord<String, String> record) {
        // simulate order processing; a real implementation is beyond the scope of this example
        logger.info("Processing order: {}", record.value());
    }
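
    // Minimal usage sketch, assuming a local broker at localhost:9092 and that the
    // topics used above ("user-events", "events", "orders") already exist
    public static void main(String[] args) throws InterruptedException {
        KafkaMultiplePartitionsDemo demo = new KafkaMultiplePartitionsDemo("localhost:9092");
        try {
            demo.sendMessagesWithKey();
            demo.sendMessagesWithoutKey();
            demo.demonstratePartitionOrdering();
            demo.demonstrateCrossPartitionBehavior();
        } finally {
            demo.close();
        }
    }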
}