Skip to content

Commit d566d07

Browse files
committed
Add Kafka dependencies and configuration
1 parent 9b10af9 commit d566d07

File tree

5 files changed

+75
-0
lines changed

5 files changed

+75
-0
lines changed

pom.xml

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,16 @@
7070
<artifactId>mockito-junit-jupiter</artifactId>
7171
<scope>test</scope>
7272
</dependency>
73+
74+
<!-- https://mvnrepository.com/artifact/org.springframework.kafka/spring-kafka -->
75+
<dependency>
76+
<groupId>org.springframework.kafka</groupId>
77+
<artifactId>spring-kafka</artifactId>
78+
<version>3.2.3</version>
79+
</dependency>
80+
81+
82+
7383
</dependencies>
7484
<build>
7585
<plugins>
Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
package com.programming.commentService.config;
2+
3+
import org.apache.kafka.clients.producer.ProducerConfig;
4+
import org.apache.kafka.common.serialization.StringSerializer;
5+
import org.springframework.context.annotation.Bean;
6+
import org.springframework.context.annotation.Configuration;
7+
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
8+
import org.springframework.kafka.core.KafkaTemplate;
9+
import org.springframework.kafka.core.ProducerFactory;
10+
import org.springframework.kafka.core.ProducerFactory;
11+
import org.springframework.kafka.core.ProducerFactory;
12+
13+
import java.util.HashMap;
14+
import java.util.Map;
15+
@Configuration
16+
public class KafkaProducerConfig {
17+
18+
@Bean
19+
public Map<String, Object> producerConfigs() {
20+
Map<String, Object> props = new HashMap<>();
21+
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
22+
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
23+
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
24+
return props;
25+
}
26+
27+
@Bean
28+
public ProducerFactory<String, String> producerFactory() {
29+
return new DefaultKafkaProducerFactory<>(producerConfigs());
30+
}
31+
32+
@Bean
33+
public KafkaTemplate<String, String> kafkaTemplate() {
34+
return new KafkaTemplate<>(producerFactory());
35+
}
36+
}

src/main/java/com/programming/commentService/controller/CommentController.java

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,12 +5,14 @@
55
import org.springframework.beans.factory.annotation.Autowired;
66
import org.springframework.http.HttpStatus;
77
import org.springframework.http.ResponseEntity;
8+
import org.springframework.kafka.core.KafkaTemplate;
89
import org.springframework.web.bind.annotation.GetMapping;
910
import org.springframework.web.bind.annotation.PathVariable;
1011
import org.springframework.web.bind.annotation.RequestMapping;
1112
import org.springframework.web.bind.annotation.RequestParam;
1213
import org.springframework.web.bind.annotation.RestController;
1314

15+
import com.google.gson.Gson;
1416
import com.programming.commentService.model.Comment;
1517
import com.programming.commentService.repository.CommentRepository;
1618
import com.programming.commentService.service.CommentService;
@@ -24,6 +26,12 @@
2426
@RequestMapping("/comment")
2527
@AllArgsConstructor
2628
public class CommentController {
29+
30+
@Autowired
31+
private KafkaTemplate<String, String> kafkaTemplate;
32+
33+
private static final String TOPIC = "new-comments";
34+
2735
private final CommentRepository commentRepository;
2836

2937
@Autowired
@@ -38,7 +46,15 @@ public String getServiceName() {
3846
@PostMapping("/upload")
3947
public ResponseEntity<?> uploadComment(@RequestBody Comment comment) {
4048
try {
49+
String commentJson = new Gson().toJson(commentRepository.save(comment));
50+
kafkaTemplate.send(TOPIC, commentJson);
51+
52+
4153
Comment save = commentRepository.save(comment);
54+
55+
56+
//Produce message to kafka
57+
4258
return ResponseEntity.ok(HttpStatus.CREATED);
4359
} catch (Exception e) {
4460
return ResponseEntity.internalServerError().body(e.getMessage());
Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
spring.kafka.producer.properties.max.request.size=500000000 # Set the maximum request size for producer
2+
spring.kafka.producer.properties.buffer.memory=1024000000 # Set the buffer memory for producer

src/main/resources/application.yml

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,3 +7,14 @@ spring:
77
data:
88
mongodb:
99
uri: ${SPRING_DATA_MONGODB_URI:mongodb://localhost:27017/comment-service}
10+
# uri: ${SPRING_DATA_MONGODB_URI:mongodb://localhost:27777/comment-service}
11+
12+
13+
kafka:
14+
bootstrap-servers: localhost:9092 # Địa chỉ Kafka broker
15+
producer:
16+
key-serializer: org.apache.kafka.common.serialization.StringSerializer
17+
value-serializer: org.apache.kafka.common.serialization.StringSerializer
18+
consumer:
19+
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
20+
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer

0 commit comments

Comments
 (0)