
Commit 644c14f

Merge branch 'master' into BAEL-7919
2 parents e4249a6 + a16d961

97 files changed: +3248 additions, -132 deletions


akka-modules/akka-http/src/test/java/com/baeldung/akkahttp/UserServerUnitTest.java

Lines changed: 0 additions & 1 deletion

@@ -19,7 +19,6 @@ public class UserServerUnitTest extends JUnitRouteTest {

    TestRoute appRoute = testRoute(new UserServer(userActorRef).routes());

-   @Ignore
    @Test
    public void whenRequest_thenActorResponds() {

apache-httpclient-2/pom.xml

Lines changed: 1 addition & 0 deletions

@@ -64,6 +64,7 @@
                <configuration>
                    <source>11</source>
                    <target>11</target>
+                   <release>11</release>
                </configuration>
            </plugin>
            <plugin>

apache-httpclient/pom.xml

Lines changed: 1 addition & 0 deletions

@@ -80,6 +80,7 @@
                <configuration>
                    <source>11</source>
                    <target>11</target>
+                   <release>11</release>
                </configuration>
            </plugin>
            <plugin>

apache-kafka-3/docker-compose.yml

Lines changed: 41 additions & 0 deletions

@@ -0,0 +1,41 @@
version: '3.8'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:7.4.0
    hostname: zookeeper
    container_name: zookeeper
    ports:
      - "2181:2181"
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000

  kafka:
    image: confluentinc/cp-kafka:7.4.0
    hostname: kafka
    container_name: kafka
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092,PLAINTEXT_HOST://localhost:9092
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
      KAFKA_AUTO_CREATE_TOPICS_ENABLE: 'true'
      KAFKA_NUM_PARTITIONS: 6

  kafka-ui:
    image: provectuslabs/kafka-ui:latest
    container_name: kafka-ui
    depends_on:
      - kafka
    ports:
      - "8080:8080"
    environment:
      KAFKA_CLUSTERS_0_NAME: local
      KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka:29092
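
The compose file advertises two listeners: kafka:29092 for clients inside the compose network and localhost:9092 for clients on the host. As a quick smoke test, a host-side client can connect through the external listener; the following minimal sketch (not part of this commit; the class name and topic settings are illustrative) uses Kafka's AdminClient to create the "orders" topic used by the demo code below and to list the broker's topics:

import java.util.Collections;
import java.util.Properties;
import java.util.concurrent.ExecutionException;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;

public class ComposeSmokeTest {

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        Properties props = new Properties();
        // external listener advertised by the compose file
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

        try (AdminClient admin = AdminClient.create(props)) {
            // 6 partitions to match KAFKA_NUM_PARTITIONS; replication factor 1 on a single broker
            admin.createTopics(Collections.singletonList(new NewTopic("orders", 6, (short) 1)))
                .all()
                .get();
            System.out.println("Topics: " + admin.listTopics()
                .names()
                .get());
        }
    }
}

Since KAFKA_AUTO_CREATE_TOPICS_ENABLE is 'true', the explicit createTopics call is optional: producing to a missing topic creates it with KAFKA_NUM_PARTITIONS (6) partitions.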

apache-kafka-3/pom.xml

Lines changed: 7 additions & 1 deletion

@@ -46,10 +46,16 @@
            <version>${testcontainers-jupiter.version}</version>
            <scope>test</scope>
        </dependency>
+       <dependency>
+           <groupId>org.slf4j</groupId>
+           <artifactId>slf4j-api</artifactId>
+           <version>${slf4j.version}</version>
+       </dependency>
    </dependencies>

    <properties>
-       <kafka.version>3.8.0</kafka.version>
+       <kafka.version>3.9.1</kafka.version>
+       <slf4j.version>2.0.9</slf4j.version>
        <jackson.databind.version>2.15.2</jackson.databind.version>
        <testcontainers-kafka.version>1.19.3</testcontainers-kafka.version>
        <testcontainers-jupiter.version>1.19.3</testcontainers-jupiter.version>

apache-kafka-3/src/main/java/com/baeldung/kafka/partitions/KafkaMultiplePartitionsDemo.java

Lines changed: 263 additions & 0 deletions

@@ -0,0 +1,263 @@
package com.baeldung.kafka.partitions;

import java.time.Duration;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class KafkaMultiplePartitionsDemo {

    private static final Logger logger = LoggerFactory.getLogger(KafkaMultiplePartitionsDemo.class);
    private final KafkaProducer<String, String> producer;
    private final String bootstrapServers;

    public KafkaMultiplePartitionsDemo(String bootstrapServers) {
        this.bootstrapServers = bootstrapServers;
        this.producer = createProducer();
    }

    private KafkaProducer<String, String> createProducer() {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.ACKS_CONFIG, "all");
        return new KafkaProducer<>(props);
    }

    public void sendMessagesWithKey() {
        String key = "user-123";

        for (int i = 0; i < 5; i++) {
            ProducerRecord<String, String> record = new ProducerRecord<>("user-events", key, "Event " + i);

            producer.send(record, (metadata, exception) -> {
                if (exception == null) {
                    logger.info("Key: {}, Partition: {}, Offset: {}", key, metadata.partition(), metadata.offset());
                }
            });
        }
        producer.flush();
    }

    public Map<Integer, Integer> sendMessagesWithoutKey() {
        Map<Integer, Integer> partitionCounts = new HashMap<>();

        for (int i = 0; i < 100; i++) {
            ProducerRecord<String, String> record = new ProducerRecord<>("events", null, // no key
                "Message " + i);

            producer.send(record, (metadata, exception) -> {
                if (exception == null) {
                    synchronized (partitionCounts) {
                        partitionCounts.merge(metadata.partition(), 1, Integer::sum);
                    }
                }
            });
        }
        producer.flush();
        logger.info("Distribution across partitions: {}", partitionCounts);
        return partitionCounts;
    }

    public void demonstratePartitionOrdering() throws InterruptedException {
        String orderId = "order-789";
        String[] events = { "created", "validated", "paid", "shipped", "delivered" };

        for (String event : events) {
            ProducerRecord<String, String> record = new ProducerRecord<>("orders", orderId, event);

            producer.send(record, (metadata, exception) -> {
                if (exception == null) {
                    logger.info("Event: {} -> Partition: {}, Offset: {}", event, metadata.partition(), metadata.offset());
                }
            });
            // small delay to demonstrate sequential processing
            Thread.sleep(100);
        }
        producer.flush();
    }

    public void demonstrateCrossPartitionBehavior() {
        long startTime = System.currentTimeMillis();

        // these will likely go to different partitions
        producer.send(new ProducerRecord<>("events", "key-A", "First at " + (System.currentTimeMillis() - startTime) + "ms"));
        producer.send(new ProducerRecord<>("events", "key-B", "Second at " + (System.currentTimeMillis() - startTime) + "ms"));
        producer.send(new ProducerRecord<>("events", "key-C", "Third at " + (System.currentTimeMillis() - startTime) + "ms"));

        producer.flush();
    }

    public void close() {
        if (producer != null) {
            producer.close();
        }
    }

    public void createConsumerGroup() {
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers);
        props.put("group.id", "order-processors");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("auto.offset.reset", "earliest");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Arrays.asList("orders"));

        int recordCount = 0;
        while (recordCount < 10) { // process limited records for demo
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));

            for (ConsumerRecord<String, String> record : records) {
                logger.info("Consumer: {}, Partition: {}, Offset: {}, Value: {}", Thread.currentThread()
                    .getName(), record.partition(), record.offset(), record.value());
                recordCount++;
            }
            consumer.commitSync();
        }
        consumer.close();
    }

    public void startMultipleGroups() {
        String[] groupIds = { "analytics-group", "audit-group", "notification-group" };
        CountDownLatch latch = new CountDownLatch(groupIds.length);
        for (String groupId : groupIds) {
            startConsumerGroup(groupId, latch);
        }

        try {
            latch.await(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread()
                .interrupt();
        }
    }

    private void startConsumerGroup(String groupId, CountDownLatch latch) {
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers);
        props.put("group.id", groupId);
        props.put("auto.offset.reset", "earliest");
        props.put("key.deserializer", StringDeserializer.class.getName());
        props.put("value.deserializer", StringDeserializer.class.getName());

        new Thread(() -> {
            try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
                consumer.subscribe(Arrays.asList("orders"));

                int recordCount = 0;
                while (recordCount < 5) {
                    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
                    recordCount += processRecordsForGroup(groupId, records);
                }
            } finally {
                latch.countDown();
            }
        }).start();
    }

    private int processRecordsForGroup(String groupId, ConsumerRecords<String, String> records) {
        int count = 0;
        for (ConsumerRecord<String, String> record : records) {
            logger.info("[{}] Processing: {}", groupId, record.value());
            count++;
        }
        return count;
    }

    public void configureCooperativeRebalancing() {
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers);
        props.put("group.id", "cooperative-group");
        props.put("partition.assignment.strategy", "org.apache.kafka.clients.consumer.CooperativeStickyAssignor");
        props.put("key.deserializer", StringDeserializer.class.getName());
        props.put("value.deserializer", StringDeserializer.class.getName());
        props.put("auto.offset.reset", "earliest");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);

        consumer.subscribe(Arrays.asList("orders"), new ConsumerRebalanceListener() {
            @Override
            public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
                logger.info("Revoked partitions: {}", partitions);
                // complete processing of current records
            }

            @Override
            public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
                logger.info("Assigned partitions: {}", partitions);
                // initialize any partition-specific state
            }
        });

        // process a few records to demonstrate
        int recordCount = 0;
        while (recordCount < 5) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
            recordCount += records.count();
        }

        consumer.close();
    }

    public void processWithManualCommit() {
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers);
        props.put("group.id", "manual-commit-group");
        props.put("enable.auto.commit", "false");
        props.put("max.poll.records", "10");
        props.put("key.deserializer", StringDeserializer.class.getName());
        props.put("value.deserializer", StringDeserializer.class.getName());
        props.put("auto.offset.reset", "earliest");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Arrays.asList("orders"));

        int totalProcessed = 0;
        while (totalProcessed < 10) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));

            for (ConsumerRecord<String, String> record : records) {
                try {
                    processOrder(record);
                    totalProcessed++;
                } catch (Exception e) {
                    logger.error("Processing failed for offset: {}", record.offset(), e);
                    break;
                }
            }

            if (!records.isEmpty()) {
                consumer.commitSync();
                logger.info("Committed {} records", records.count());
            }
        }

        consumer.close();
    }

    private void processOrder(ConsumerRecord<String, String> record) {
        // simulate order processing
        logger.info("Processing order: {}", record.value());
        // real order handling is application-specific and beyond the scope of the article
    }
}
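
A hypothetical driver (not part of this commit; the runner class name and bootstrap address are assumptions) shows one way to exercise the demo against the broker from docker-compose.yml:

public class KafkaMultiplePartitionsDemoRunner {

    public static void main(String[] args) throws InterruptedException {
        KafkaMultiplePartitionsDemo demo = new KafkaMultiplePartitionsDemo("localhost:9092");
        try {
            demo.sendMessagesWithKey();          // same key always hashes to the same partition
            demo.sendMessagesWithoutKey();       // keyless records spread across partitions
            demo.demonstratePartitionOrdering(); // ordering holds only within one partition
            demo.startMultipleGroups();          // each group independently reads "orders"
        } finally {
            demo.close();
        }
    }
}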

apache-libraries-2/pom.xml

Lines changed: 2 additions & 0 deletions

@@ -233,6 +233,7 @@
                <configuration>
                    <source>${maven.compiler.source}</source>
                    <target>${maven.compiler.target}</target>
+                   <release>${maven.compiler.release}</release>
                </configuration>
            </plugin>
            <plugin>
@@ -266,6 +267,7 @@
        <geode.core>1.15.1</geode.core>
        <maven.compiler.source>17</maven.compiler.source>
        <maven.compiler.target>17</maven.compiler.target>
+       <maven.compiler.release>17</maven.compiler.release>
    </properties>

</project>

apache-libraries/pom.xml

Lines changed: 1 addition & 0 deletions

@@ -113,6 +113,7 @@
                <configuration>
                    <source>15</source>
                    <target>15</target>
+                   <release>15</release>
                </configuration>
            </plugin>
        </plugins>

aws-modules/aws-dynamodb-v2/pom.xml

Lines changed: 1 addition & 0 deletions

@@ -91,6 +91,7 @@
                <configuration>
                    <source>9</source>
                    <target>9</target>
+                   <release>9</release>
                </configuration>
            </plugin>
        </plugins>

aws-modules/aws-lambda-modules/lambda-function/pom.xml

Lines changed: 1 addition & 0 deletions

@@ -106,6 +106,7 @@
                    <!-- Exclude module-info.class -->
                    <exclude>**/module-info.class</exclude>
                    <!-- Exclude overlapping META-INF files -->
+                   <exclude>META-INF/MANIFEST.MF</exclude>
                    <exclude>META-INF/LICENSE</exclude>
                    <exclude>META-INF/NOTICE</exclude>
                    <exclude>META-INF/LICENSE.txt</exclude>
