Skip to content

Commit dca1b82

Browse files
committed
Implement Kafka integration for comment sanitization and add sanitizedText field
1 parent 6f1a52d commit dca1b82

File tree

3 files changed

+55
-9
lines changed

3 files changed

+55
-9
lines changed

src/main/java/com/programming/commentService/config/KafkaProducerConfig.java

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,8 +7,6 @@
77
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
88
import org.springframework.kafka.core.KafkaTemplate;
99
import org.springframework.kafka.core.ProducerFactory;
10-
import org.springframework.kafka.core.ProducerFactory;
11-
import org.springframework.kafka.core.ProducerFactory;
1210

1311
import java.util.HashMap;
1412
import java.util.Map;

src/main/java/com/programming/commentService/controller/CommentController.java

Lines changed: 54 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,15 @@
11
package com.programming.commentService.controller;
22

33
import java.util.List;
4+
import java.util.concurrent.CompletableFuture;
5+
import java.util.concurrent.ConcurrentHashMap;
6+
import java.util.concurrent.TimeUnit;
7+
import java.util.UUID;
48

59
import org.springframework.beans.factory.annotation.Autowired;
610
import org.springframework.http.HttpStatus;
711
import org.springframework.http.ResponseEntity;
12+
import org.springframework.kafka.annotation.KafkaListener;
813
import org.springframework.kafka.core.KafkaTemplate;
914
import org.springframework.web.bind.annotation.GetMapping;
1015
import org.springframework.web.bind.annotation.PathVariable;
@@ -42,26 +47,68 @@ public String getServiceName() {
4247
return "Comment Service";
4348
}
4449

50+
private final Gson gson = new Gson();
51+
52+
private final ConcurrentHashMap<String, CompletableFuture<Comment>> futures = new ConcurrentHashMap<>();
53+
private static final String SANITIZE_TOPIC = "sanitize-comments";
54+
private static final String RESULT_TOPIC = "sanitized-comments";
4555

46-
@PostMapping("/upload")
47-
public ResponseEntity<?> uploadComment(@RequestBody Comment comment) {
56+
@KafkaListener(topics = RESULT_TOPIC, groupId = "comment-group")
57+
public void listenSanitizedComments(String message) {
4858
try {
49-
String commentJson = new Gson().toJson(commentRepository.save(comment));
50-
kafkaTemplate.send(TOPIC, commentJson);
59+
// Chuyển đổi JSON từ Kafka thành Comment
60+
Comment sanitizedComment = gson.fromJson(message, Comment.class);
61+
62+
// Nếu comment không có ID, tạo một ID mới
63+
if (sanitizedComment.getId() == null) {
64+
sanitizedComment.setId(UUID.randomUUID().toString());
65+
}
66+
67+
// Lấy CompletableFuture từ futures map và hoàn thành nó
68+
CompletableFuture<Comment> future = futures.get(sanitizedComment.getId());
69+
if (future != null) {
70+
future.complete(sanitizedComment); // Hoàn thành CompletableFuture với comment đã xử lý
71+
} else {
72+
// Nếu không tìm thấy CompletableFuture, log thông báo lỗi
73+
System.err.println("No future found for comment ID: " + sanitizedComment.getId());
74+
}
75+
} catch (Exception e) {
76+
// Ghi lại thông báo lỗi nếu có bất kỳ ngoại lệ nào xảy ra
77+
e.printStackTrace();
78+
}
79+
}
5180

81+
// Nhận comment từ client và gửi tới Kafka để xử lý
82+
@PostMapping("/upload")
83+
public ResponseEntity<?> uploadComment(@RequestBody Comment comment) {
84+
// Nếu comment không có ID, tạo một ID mới
85+
if (comment.getId() == null) {
86+
comment.setId(UUID.randomUUID().toString());
87+
}
5288

53-
Comment save = commentRepository.save(comment);
89+
try {
90+
// Chuyển comment sang JSON và gửi qua Kafka tới Flask (topic: sanitize-comments)
91+
String commentJson = gson.toJson(comment);
92+
kafkaTemplate.send(SANITIZE_TOPIC, commentJson);
5493

94+
// Khởi tạo CompletableFuture để chờ phản hồi từ Flask qua Kafka (topic: sanitized-comments)
95+
CompletableFuture<Comment> future = new CompletableFuture<>();
96+
futures.put(comment.getId(), future); // Lưu CompletableFuture vào futures map
5597

56-
//Produce message to kafka
98+
// Đợi phản hồi từ Kafka, timeout sau 30 giây
99+
Comment sanitizedComment = future.get(30, TimeUnit.SECONDS); // Timeout 30 giây
57100

58-
return ResponseEntity.ok(HttpStatus.CREATED);
101+
// Lưu comment đã xử lý vào MongoDB
102+
Comment savedComment = commentRepository.save(sanitizedComment);
103+
return ResponseEntity.ok(savedComment);
59104
} catch (Exception e) {
60105
return ResponseEntity.internalServerError().body(e.getMessage());
61106
}
62107
}
63108

64109

110+
111+
65112
@GetMapping("/get/{id}")
66113
public ResponseEntity<?> getComment(@PathVariable String id) {
67114
try {

src/main/java/com/programming/commentService/model/Comment.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,4 +23,5 @@ public class Comment {
2323
private int likes;
2424
private int dislikes;
2525
private boolean isRely;
26+
private String sanitizedText;
2627
}

0 commit comments

Comments
 (0)