Commit 0630865

Merge branch 'eugenp:master' into master
2 parents: 59e4649 + 042c685

87 files changed: +1432, −279 lines

Some content is hidden: large commits have content hidden by default, so only a subset of the 87 changed files appears below.


.gitignore

Lines changed: 12 additions & 6 deletions
@@ -61,9 +61,9 @@ dependency-reduced-pom.xml
 *.dylib
 *.dll
 
-xml/src/test/resources/example_dom4j_new.xml
-xml/src/test/resources/example_dom4j_updated.xml
-xml/src/test/resources/example_jaxb_new.xml
+xml-modules/xml/src/test/resources/example_dom4j_new.xml
+xml-modules/xml/src/test/resources/example_dom4j_updated.xml
+xml-modules/xml/src/test/resources/example_jaxb_new.xml
 core-java-io/hard_link.txt
 core-java-io/target_link.txt
 core-java/src/main/java/com/baeldung/manifest/MANIFEST.MF
@@ -99,7 +99,7 @@ customers.xml
 apache-cxf/cxf-aegis/baeldung.xml
 testing-modules/report-*.json
 
-libraries-2/*.db
+libraries-4/*.db
 
 apache-spark/data/output
 logs/
@@ -140,6 +140,7 @@ persistence-modules/neo4j/data/**
 /microservices-modules/micronaut-reactive/.micronaut/test-resources/test-resources.properties
 /libraries-security/src/main/resources/home/upload/test_file_SCP.txt
 /libraries-security/src/main/resources/home/upload/test_file_SFTP.txt
+/libraries-security/decryptedFile
 /libraries-data-io/src/test/resources/protocols/gson_user.json
 /libraries-io/src/main/resources/application.csv
 /libraries-io/src/main/resources/application2.csv
@@ -149,7 +150,7 @@ persistence-modules/neo4j/data/**
 /core-java-modules/core-java-io-conversions-3/src/test/resources/xlsxToCsv_output.csv
 /core-java-modules/core-java-io-5/output.txt
 /core-java-modules/core-java-io-apis-2/sample.txt
-/persistence-modules/core-java-persistence-3/test.mv.db
+/persistence-modules/core-java-persistence/test.mv.db
 /apache-libraries/src/main/java/com/baeldung/apache/avro/
 /apache-libraries-2/cars.avro
 
@@ -160,4 +161,9 @@ persistence-modules/neo4j/data/**
 /spring-cloud-modules/spring-cloud-bootstrap/gateway/src/main/resources/static/home/server/*
 
 /web-modules/linkrest/src/main/java/com/baeldung/cayenne/auto/_Department.java
-/web-modules/linkrest/src/main/java/com/baeldung/cayenne/auto/_Employee.java
+/web-modules/linkrest/src/main/java/com/baeldung/cayenne/auto/_Employee.java
+
+#log4j
+logging-modules/log4j/app-dynamic-log.log
+logging-modules/logback/conditional.log
+logging-modules/logback/filtered.log
CustomProcessingExceptionHandler.java (new file)

Lines changed: 26 additions & 0 deletions

package com.baeldung.kafkastreams;

import org.apache.kafka.streams.errors.ErrorHandlerContext;
import org.apache.kafka.streams.errors.ProcessingExceptionHandler;
import org.apache.kafka.streams.processor.api.Record;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Map;

public class CustomProcessingExceptionHandler implements ProcessingExceptionHandler {

    private static final Logger log = LoggerFactory.getLogger(CustomProcessingExceptionHandler.class);

    @Override
    public ProcessingHandlerResponse handle(ErrorHandlerContext errorHandlerContext, Record<?, ?> record, Exception ex) {
        log.error("ProcessingExceptionHandler Error for record NodeId: {} | TaskId: {} | Key: {} | Value: {} | Exception: {}",
            errorHandlerContext.processorNodeId(), errorHandlerContext.taskId(), record.key(), record.value(), ex.getMessage(), ex);

        return ProcessingHandlerResponse.CONTINUE;
    }

    @Override
    public void configure(Map<String, ?> configs) {
    }
}
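Returning ProcessingHandlerResponse.CONTINUE tells Kafka Streams to log the failure and move on to the next record instead of failing the stream thread (FAIL is the alternative). The handler is registered through StreamsConfig.PROCESSING_EXCEPTION_HANDLER_CLASS_CONFIG in UserStreamService further down.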
CustomProductionExceptionHandler.java (new file)

Lines changed: 26 additions & 0 deletions

package com.baeldung.kafkastreams;

import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.streams.errors.ErrorHandlerContext;
import org.apache.kafka.streams.errors.ProductionExceptionHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Map;

public class CustomProductionExceptionHandler implements ProductionExceptionHandler {

    private static final Logger log = LoggerFactory.getLogger(CustomProductionExceptionHandler.class);

    @Override
    public ProductionExceptionHandlerResponse handle(ErrorHandlerContext context, ProducerRecord<byte[], byte[]> record, Exception exception) {
        log.error("ProductionExceptionHandler Error producing record NodeId: {} | TaskId: {} | Topic: {} | Partition: {} | Exception: {}",
            context.processorNodeId(), context.taskId(), record.topic(), record.partition(), exception.getMessage(), exception);

        return ProductionExceptionHandlerResponse.CONTINUE;
    }

    @Override
    public void configure(Map<String, ?> configs) {
    }
}
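As with the processing handler, returning CONTINUE skips the record that could not be produced and keeps the topology running. It is registered via StreamsConfig.DEFAULT_PRODUCTION_EXCEPTION_HANDLER_CLASS_CONFIG in UserStreamService further down.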
StreamExceptionHandler.java (new file)

Lines changed: 16 additions & 0 deletions

package com.baeldung.kafkastreams;

import org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class StreamExceptionHandler implements StreamsUncaughtExceptionHandler {

    private static final Logger log = LoggerFactory.getLogger(StreamExceptionHandler.class);

    @Override
    public StreamThreadExceptionResponse handle(Throwable exception) {
        log.error("Stream encountered fatal exception: {}", exception.getMessage(), exception);
        return StreamThreadExceptionResponse.REPLACE_THREAD;
    }
}
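StreamThreadExceptionResponse.REPLACE_THREAD replaces the failed stream thread with a fresh one, so a single uncaught exception does not take the whole application down; SHUTDOWN_CLIENT and SHUTDOWN_APPLICATION are the stricter alternatives. UserStreamService installs this handler with kafkaStreams.setUncaughtExceptionHandler(...) further down.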
User.java (new file)

Lines changed: 4 additions & 0 deletions

package com.baeldung.kafkastreams;

public record User(String id, String name, String country) {
}
UserDeserializer.java (new file)

Lines changed: 28 additions & 0 deletions

package com.baeldung.kafkastreams;

import com.fasterxml.jackson.databind.ObjectMapper;

import org.apache.kafka.common.serialization.Deserializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;

public class UserDeserializer implements Deserializer<User> {
    private static final Logger log = LoggerFactory.getLogger(UserDeserializer.class);
    private final ObjectMapper mapper = new ObjectMapper();

    @Override
    public User deserialize(String topic, byte[] bytes) {
        if (bytes == null || bytes.length == 0) {
            return null;
        }
        try {
            return mapper.readValue(bytes, User.class);
        } catch (IOException ex) {
            log.error("Error deserializing the message {} for topic {}: {}", bytes, topic, ex.getMessage(), ex);
            throw new RuntimeException(ex);
        }
    }
}
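When this deserializer throws, Kafka Streams routes the failure to the configured deserialization exception handler; with the LogAndContinueExceptionHandler set in UserStreamService, the poison record is logged and skipped rather than halting the topology.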
UserSerde.java (new file)

Lines changed: 10 additions & 0 deletions

package com.baeldung.kafkastreams;

import org.apache.kafka.common.serialization.Serdes;

public class UserSerde extends Serdes.WrapperSerde<User> {
    public UserSerde() {
        super(new UserSerializer(), new UserDeserializer());
    }
}
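For reference, a minimal round-trip sketch of what this serde does (the topic name matches the "user-topic" used in UserStreamService; the sample values are illustrative):

UserSerde serde = new UserSerde();
User user = new User("42", "Alice", "Norway");
// serialize to JSON bytes via Jackson, then back to an equal User record
byte[] bytes = serde.serializer().serialize("user-topic", user);
User roundTripped = serde.deserializer().deserialize("user-topic", bytes);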
UserSerializer.java (new file)

Lines changed: 27 additions & 0 deletions

package com.baeldung.kafkastreams;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.kafka.common.serialization.Serializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class UserSerializer implements Serializer<User> {
    private static final Logger log = LoggerFactory.getLogger(UserSerializer.class);
    private final ObjectMapper mapper = new ObjectMapper();

    @Override
    public byte[] serialize(String topic, User user) {
        if (user == null) {
            return null;
        }

        try {
            return mapper.writeValueAsBytes(user);
        } catch (JsonProcessingException ex) {
            log.error("Error serializing the user {} with exception {}", user, ex.getMessage(), ex);
            throw new RuntimeException(ex);
        }
    }
}
UserStreamService.java (new file)

Lines changed: 73 additions & 0 deletions

package com.baeldung.kafkastreams;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.*;
import org.apache.kafka.streams.errors.LogAndContinueExceptionHandler;
import org.apache.kafka.streams.kstream.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Objects;
import java.util.Properties;
import java.util.UUID;

public class UserStreamService {
    private static final Logger log = LoggerFactory.getLogger(UserStreamService.class);
    private KafkaStreams kafkaStreams;

    public void start(String bootstrapServer) {
        StreamsBuilder builder = new StreamsBuilder();
        KStream<String, User> userStream = builder.stream(
            "user-topic",
            Consumed.with(Serdes.String(), new UserSerde())
        );

        KTable<String, Long> usersPerCountry = userStream
            .filter((key, user) -> Objects.nonNull(user) && user.country() != null && !user.country().isEmpty())
            .groupBy((key, user) -> user.country(), Grouped.with(Serdes.String(), new UserSerde()))
            .count(Materialized.as("users_per_country_store"));

        usersPerCountry.toStream()
            .peek((country, count) -> log.info("Aggregated for country {} with count {}", country, count))
            .to("users_per_country", Produced.with(Serdes.String(), Serdes.Long()));

        Properties props = getStreamProperties(bootstrapServer);
        kafkaStreams = new KafkaStreams(builder.build(), props);
        kafkaStreams.setUncaughtExceptionHandler(new StreamExceptionHandler());

        Runtime.getRuntime().addShutdownHook(new Thread(kafkaStreams::close));

        kafkaStreams.start();
    }

    public void stop() {
        if (kafkaStreams != null) {
            kafkaStreams.close();
        }
    }

    public void cleanUp() {
        if (kafkaStreams != null) {
            kafkaStreams.cleanUp();
        }
    }

    private static Properties getStreamProperties(String bootstrapServer) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "user-country-aggregator" + UUID.randomUUID());
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer);
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG, LogAndContinueExceptionHandler.class);
        props.put(StreamsConfig.PROCESSING_EXCEPTION_HANDLER_CLASS_CONFIG, CustomProcessingExceptionHandler.class);
        props.put(StreamsConfig.DEFAULT_PRODUCTION_EXCEPTION_HANDLER_CLASS_CONFIG, CustomProductionExceptionHandler.class);

        return props;
    }
}
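A minimal sketch of driving this service from a main method (the broker address and class name are assumptions for illustration, not part of the commit):

package com.baeldung.kafkastreams;

public class UserStreamApp {
    public static void main(String[] args) throws InterruptedException {
        UserStreamService service = new UserStreamService();
        service.start("localhost:9092"); // assumed local broker
        // block the main thread; the shutdown hook registered in start() closes the streams on exit
        Thread.currentThread().join();
    }
}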

apache-kafka-2/src/test/java/com/baeldung/kafkastreams/KafkaStreamsLiveTest.java

Lines changed: 2 additions & 2 deletions
@@ -40,7 +40,7 @@ public void shouldTestKafkaStreams() throws InterruptedException {
         .getName());
     streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000);
     streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
-
+
     // Use a temporary directory for storing state, which will be automatically removed after the test.
     try {
         Path stateDirectory = Files.createTempDirectory("kafka-streams");
@@ -75,4 +75,4 @@ public void shouldTestKafkaStreams() throws InterruptedException {
     Thread.sleep(30000);
     streams.close();
   }
-}
+}

(Whitespace-only changes: trailing whitespace removed on line 43 and a newline added at end of file.)
