Skip to content

Commit 7f92186

Browse files
chore(spring-kafka-test): general improvements in the project
1 parent e8fefad commit 7f92186

File tree

13 files changed

+133
-44
lines changed

13 files changed

+133
-44
lines changed

.github/workflows/spring-kafka-example.yml

Lines changed: 14 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -68,11 +68,19 @@ jobs:
6868
echo "Checking the health of the containers..."
6969
7070
SERVICES="${{ steps.services.outputs.services }}"
71+
7172
for service in $SERVICES; do
72-
status=$(docker inspect -f '{{.State.Running}}' "$service" || echo "false")
73-
echo "$service status: $status"
74-
if [ "$status" != "true" ]; then
75-
echo "::error ::Service $service is not running."
76-
exit 1
73+
running=$(docker inspect -f '{{.State.Running}}' "$service" 2>/dev/null || echo "false")
74+
75+
if [ "$running" = "true" ]; then
76+
echo "$service is running ✔️"
77+
else
78+
exit_code=$(docker inspect -f '{{.State.ExitCode}}' "$service" 2>/dev/null || echo "1")
79+
if [ "$exit_code" = "0" ]; then
80+
echo "$service is stopped but exited with code 0 ✔️"
81+
else
82+
echo "::error ::$service is stopped and exited with code $exit_code ❌"
83+
exit 1
84+
fi
7785
fi
78-
done
86+
done

spring-kafka-example/compose.yaml

Lines changed: 41 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -9,25 +9,59 @@ services:
99
environment:
1010
SERVER_PORT: "80"
1111
SPRING_PROFILES_ACTIVE: "default"
12-
KAFKA_SERVER_PORT: kafka:9092
12+
KAFKA_TOPIC_1: spring-kafka-example-simple-topic
13+
KAFKA_BOOTSTRAP_SERVERS: kafka:9092
14+
KAFKA_GROUP_ID: spring-kafka-example-group
15+
KAFKA_AUTO_OFFSET_RESET: earliest
16+
KAFKA_KEY_DESERIALIZER: org.apache.kafka.common.serialization.StringDeserializer
17+
KAFKA_VALUE_DESERIALIZER: org.apache.kafka.common.serialization.StringDeserializer
18+
KAFKA_MISSING_TOPICS_FATAL: false
1319
depends_on:
1420
- kafka
21+
networks:
22+
- kafka-net
1523

1624
kafka:
1725
image: apache/kafka:4.0.0
1826
container_name: kafka
1927
ports:
2028
- "9092:9092"
29+
- "9093:9093"
2130
environment:
2231
KAFKA_NODE_ID: 1
2332
KAFKA_PROCESS_ROLES: broker,controller
24-
KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092,CONTROLLER://0.0.0.0:9093
25-
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092
26-
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,CONTROLLER:PLAINTEXT
27-
KAFKA_CONTROLLER_QUORUM_VOTERS: 1@localhost:9093
33+
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092,PLAINTEXT_HOST://localhost:9093
34+
KAFKA_LISTENERS: CONTROLLER://0.0.0.0:9094,PLAINTEXT://0.0.0.0:9092,PLAINTEXT_HOST://0.0.0.0:9093
35+
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
2836
KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
2937
KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
38+
KAFKA_CONTROLLER_QUORUM_VOTERS: 1@kafka:9094
3039
KAFKA_LOG_DIRS: /tmp/kraft-combined-logs
31-
CLUSTER_ID: qGyb4Z0XQpeoKgUXYfCCLw
3240
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
33-
KAFKA_AUTO_CREATE_TOPICS_ENABLE: 'true'
41+
KAFKA_AUTO_CREATE_TOPICS_ENABLE: 'false'
42+
networks:
43+
- kafka-net
44+
45+
kafka-init:
46+
container_name: kafka-init
47+
image: apache/kafka:4.0.0
48+
depends_on:
49+
kafka:
50+
condition: service_started
51+
command: [ "/bin/bash", "-c", "/create_topic.sh" ]
52+
environment:
53+
KAFKA_HOST: kafka
54+
KAFKA_PORT: 9092
55+
KAFKA_TOPIC_CREATE: spring-kafka-example-simple-topic
56+
volumes:
57+
- type: bind
58+
source: ./docker/create_topic.sh
59+
target: /create_topic.sh
60+
init: true
61+
networks:
62+
- kafka-net
63+
64+
networks:
65+
kafka-net:
66+
driver: bridge
67+
Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
#!/bin/bash
2+
3+
echo "Waiting for Kafka on port $KAFKA_PORT..."
4+
while ! nc -z $KAFKA_HOST $KAFKA_PORT; do
5+
sleep 2
6+
done
7+
8+
/opt/kafka/bin/kafka-topics.sh \
9+
--create \
10+
--topic "$KAFKA_TOPIC_CREATE" \
11+
--bootstrap-server $KAFKA_HOST:$KAFKA_PORT \
12+
--partitions 1 \
13+
--replication-factor 1
14+
15+
echo "Topic $KAFKA_TOPIC_CREATE created successfully!"
Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
package com.io.example.config;
2+
3+
import com.fasterxml.jackson.annotation.JsonInclude;
4+
import com.fasterxml.jackson.databind.ObjectMapper;
5+
import com.fasterxml.jackson.databind.SerializationFeature;
6+
import org.springframework.context.annotation.Bean;
7+
import org.springframework.context.annotation.Configuration;
8+
9+
@Configuration
10+
public class JacksonConfig {
11+
12+
@Bean
13+
public ObjectMapper objectMapper() {
14+
ObjectMapper mapper = new ObjectMapper();
15+
mapper.configure(SerializationFeature.FAIL_ON_EMPTY_BEANS, false);
16+
mapper.disable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS);
17+
mapper.setSerializationInclusion(JsonInclude.Include.NON_NULL);
18+
return mapper;
19+
}
20+
}

spring-kafka-example/src/main/java/com/io/example/consumer/KafkaConsumerService.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,5 +2,5 @@
22

33
@SuppressWarnings("unused")
44
public interface KafkaConsumerService {
5-
void consume(String mensagem);
5+
void consume(String message);
66
}
Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,24 @@
11
package com.io.example.consumer;
22

3+
import com.io.example.controller.dto.request.MessageRequestDtoRequest;
4+
import com.io.example.mapper.JsonMapper;
5+
import lombok.RequiredArgsConstructor;
36
import lombok.extern.slf4j.Slf4j;
47
import org.springframework.kafka.annotation.KafkaListener;
58
import org.springframework.stereotype.Service;
69

710
@Slf4j
811
@Service
12+
@RequiredArgsConstructor
913
@SuppressWarnings("unused")
1014
public class KafkaConsumerServiceImpl implements KafkaConsumerService {
1115

16+
private final JsonMapper mapper;
17+
1218
@KafkaListener(topics = "${spring.kafka.topics.topic-1}" , groupId = "${spring.kafka.consumer.group-id}")
1319
public void consume(String message) {
14-
log.info("Message receive: {} ", message);
20+
MessageRequestDtoRequest messageRequestDtoRequest = mapper.toObject(message, MessageRequestDtoRequest.class);
21+
log.info("Message receive: {} ", messageRequestDtoRequest.toString());
1522
}
1623

1724
}

spring-kafka-example/src/main/java/com/io/example/controller/ExampleController.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
package com.io.example.controller;
22

33
import com.io.example.controller.dto.request.MessageRequestDtoRequest;
4-
import com.io.example.controller.dto.response.GetDtoResponse;
4+
import com.io.example.controller.dto.response.SimpleTopicDtoResponse;
55
import com.io.example.producer.KafkaProducerService;
66
import lombok.RequiredArgsConstructor;
77
import org.springframework.http.HttpStatus;
@@ -15,10 +15,10 @@ public class ExampleController {
1515

1616
private final KafkaProducerService kafkaProducerService;
1717

18-
@PostMapping("/send")
19-
public ResponseEntity<GetDtoResponse> sendMessage(@RequestBody MessageRequestDtoRequest dto){
20-
this.kafkaProducerService.sendMessage(dto.message());
21-
return ResponseEntity.status(HttpStatus.OK).body(new GetDtoResponse("Message sent successfully!"));
18+
@PostMapping("/send/simple-topic")
19+
public ResponseEntity<SimpleTopicDtoResponse> sendMessageToSimpleTopic(@RequestBody MessageRequestDtoRequest dto){
20+
this.kafkaProducerService.sendMessage(dto);
21+
return ResponseEntity.status(HttpStatus.OK).body(new SimpleTopicDtoResponse("Message sent successfully!"));
2222
}
2323

2424
}

spring-kafka-example/src/main/java/com/io/example/controller/dto/response/GetDtoResponse.java

Lines changed: 0 additions & 3 deletions
This file was deleted.
Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
package com.io.example.controller.dto.response;
2+
3+
public record SimpleTopicDtoResponse(String message) {}
Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
package com.io.example.producer;
22

3+
import com.io.example.controller.dto.request.MessageRequestDtoRequest;
4+
35
@FunctionalInterface
46
public interface KafkaProducerService {
5-
void sendMessage(String mensagem);
7+
void sendMessage(MessageRequestDtoRequest mensagem);
68
}

0 commit comments

Comments
 (0)