Skip to content

Commit 681fa28

Browse files
committed
Add Kafka integration and new repositories for video service
1 parent 5c72cf6 commit 681fa28

File tree

11 files changed

+377
-33
lines changed

11 files changed

+377
-33
lines changed

pom.xml

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -101,6 +101,14 @@
101101
</dependency>
102102

103103

104+
<!-- https://mvnrepository.com/artifact/org.springframework.kafka/spring-kafka -->
105+
<dependency>
106+
<groupId>org.springframework.kafka</groupId>
107+
<artifactId>spring-kafka</artifactId>
108+
<version>3.2.3</version>
109+
</dependency>
110+
111+
104112

105113

106114

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
package com.programming.videoService.config;
2+
3+
import org.apache.kafka.clients.consumer.ConsumerConfig;
4+
import org.apache.kafka.common.serialization.StringDeserializer;
5+
import org.springframework.context.annotation.Bean;
6+
import org.springframework.context.annotation.Configuration;
7+
import org.springframework.kafka.annotation.EnableKafka;
8+
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
9+
import org.springframework.kafka.core.ConsumerFactory;
10+
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
11+
12+
import java.util.HashMap;
13+
import java.util.Map;
14+
15+
@EnableKafka
16+
@Configuration
17+
public class KafkaConsumerConfig {
18+
19+
@Bean
20+
public Map<String, Object> consumerConfigs() {
21+
Map<String, Object> props = new HashMap<>();
22+
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
23+
props.put(ConsumerConfig.GROUP_ID_CONFIG, "video-genres-group"); // Tên group ID
24+
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
25+
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
26+
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); // Lấy dữ liệu từ offset đầu tiên nếu không tìm thấy offset hiện tại
27+
return props;
28+
}
29+
30+
@Bean
31+
public ConsumerFactory<String, String> consumerFactory() {
32+
return new DefaultKafkaConsumerFactory<>(consumerConfigs());
33+
}
34+
35+
@Bean
36+
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
37+
ConcurrentKafkaListenerContainerFactory<String, String> factory =
38+
new ConcurrentKafkaListenerContainerFactory<>();
39+
factory.setConsumerFactory(consumerFactory());
40+
return factory;
41+
}
42+
}
Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
package com.programming.videoService.config;
2+
3+
import org.apache.kafka.clients.producer.ProducerConfig;
4+
import org.apache.kafka.common.serialization.StringSerializer;
5+
import org.springframework.context.annotation.Bean;
6+
import org.springframework.context.annotation.Configuration;
7+
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
8+
import org.springframework.kafka.core.KafkaTemplate;
9+
import org.springframework.kafka.core.ProducerFactory;
10+
11+
import java.util.HashMap;
12+
import java.util.Map;
13+
@Configuration
14+
public class KafkaProducerConfig {
15+
16+
@Bean
17+
public Map<String, Object> producerConfigs() {
18+
Map<String, Object> props = new HashMap<>();
19+
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
20+
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
21+
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
22+
return props;
23+
}
24+
25+
@Bean
26+
public ProducerFactory<String, String> producerFactory() {
27+
return new DefaultKafkaProducerFactory<>(producerConfigs());
28+
}
29+
30+
@Bean
31+
public KafkaTemplate<String, String> kafkaTemplate() {
32+
return new KafkaTemplate<>(producerFactory());
33+
}
34+
}

src/main/java/com/programming/videoService/controller/VideoController.java

Lines changed: 38 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,11 @@
11
package com.programming.videoService.controller;
22

33
import com.mongodb.client.gridfs.model.GridFSFile;
4+
import com.programming.videoService.model.Video;
45
import com.programming.videoService.service.VideoService;
56
import com.programming.videoService.service.VideoService.VideoWithStream;
7+
8+
import org.slf4j.MDC;
69
import org.springframework.beans.factory.annotation.Autowired;
710
import org.springframework.core.io.InputStreamResource;
811
import org.springframework.http.ContentDisposition;
@@ -20,15 +23,21 @@
2023
import java.sql.Timestamp;
2124
import java.util.Map;
2225

26+
import org.slf4j.Logger;
27+
import org.slf4j.LoggerFactory;
28+
import org.slf4j.MDC;
29+
2330
@RestController
2431
@RequestMapping("/video")
2532
public class VideoController {
2633

2734
@Autowired
2835
private VideoService videoService;
29-
36+
private static final Logger logger = LoggerFactory.getLogger(VideoController.class);
3037
@GetMapping("/")
3138
public String getServiceName(){
39+
MDC.put("type", "videoservice");
40+
logger.info("Video Service Start");
3241
return "Video Service";
3342
}
3443

@@ -174,7 +183,7 @@ public int read(byte[] b, int off, int len) throws IOException {
174183
}
175184

176185

177-
//Hanlde Subcri
186+
//Hanlde Subcribe
178187
@PostMapping("/subscribe")
179188
public ResponseEntity<?> subscribe(@RequestParam("subscriberId") String subscriberId,
180189
@RequestParam("subscribedToId") String subscribedToId) {
@@ -236,5 +245,32 @@ public ResponseEntity<Long> getLikeCount(@RequestParam("videoId") String videoId
236245
public ResponseEntity<?> getIdFromLikerToId(@PathVariable String likerToId) {
237246
return new ResponseEntity<>(videoService.getLikedToIdsFromLikerToId(likerToId), HttpStatus.OK);
238247
}
248+
249+
// Hanlde History
250+
@PostMapping("/addHistory")
251+
public ResponseEntity<?> addHistory(@RequestParam("userId") String userId,
252+
@RequestParam("thumbId") String thumbId) {
253+
videoService.addHistory(userId, thumbId);
254+
return new ResponseEntity<>(HttpStatus.OK);
255+
}
256+
257+
@GetMapping("/getHistoryByUserId/{userId}")
258+
public ResponseEntity<?> getHistoryByUserId(@PathVariable String userId) {
259+
return new ResponseEntity<>(videoService.getHistoryByUserId(userId), HttpStatus.OK);
260+
}
261+
262+
//handle Report
263+
@PostMapping("/uploadReport")
264+
public ResponseEntity<?> uploadReport(@RequestParam("videoId") String videoId,
265+
@RequestParam("msg") String msg,
266+
@RequestParam("userId") String userId) {
267+
videoService.uploadReport(videoId, msg, userId);
268+
return new ResponseEntity<>(HttpStatus.OK);
269+
}
270+
271+
@GetMapping("/videoIdLargerThanFive")
272+
public ResponseEntity<?> videoIdLargerThanFive() {
273+
return new ResponseEntity<>(videoService.getReportsWithHighFrequencyVideoIds(), HttpStatus.OK);
274+
}
239275

240276
}
Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
package com.programming.videoService.model;
2+
3+
public class History {
4+
private String userId;
5+
private String thumbId;
6+
7+
public History() {
8+
}
9+
10+
public History(String userId, String thumbId) {
11+
this.userId = userId;
12+
this.thumbId = thumbId;
13+
}
14+
15+
public String getUserId() {
16+
return userId;
17+
}
18+
19+
public void setUserId(String userId) {
20+
this.userId = userId;
21+
}
22+
23+
public String getThumbId() {
24+
return thumbId;
25+
}
26+
27+
public void setThumbId(String thumbId) {
28+
this.thumbId = thumbId;
29+
}
30+
31+
}
Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
package com.programming.videoService.model;
2+
3+
4+
public class Report {
5+
private String videoId;
6+
private String msg;
7+
private String userId;
8+
9+
public Report() {
10+
11+
}
12+
13+
public Report(String videoId, String msg, String userId) {
14+
this.videoId = videoId;
15+
this.msg = msg;
16+
this.userId = userId;
17+
}
18+
19+
public String getVideoId() {
20+
return videoId;
21+
}
22+
23+
public String getMsg() {
24+
return msg;
25+
}
26+
27+
public String getUserId() {
28+
return userId;
29+
}
30+
31+
public void setVideoId(String videoId) {
32+
this.videoId = videoId;
33+
}
34+
35+
public void setMsg(String msg) {
36+
this.msg = msg;
37+
}
38+
public void setUserId(String userId) {
39+
this.userId = userId;
40+
}
41+
42+
}

src/main/java/com/programming/videoService/model/Video.java

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,7 @@
11
package com.programming.videoService.model;
2+
3+
import java.util.List;
4+
25
public class Video {
36

47
private String filename;
@@ -10,9 +13,15 @@ public class Video {
1013
private byte[] thumbnail;
1114
private String userName;
1215
private String videoName;
13-
// private int views;
14-
// private int likes;
15-
// private int dislikes;
16+
private List<String> genres;
17+
18+
public List<String> getGenres() {
19+
return genres;
20+
}
21+
22+
public void setGenres(List<String> genres) {
23+
this.genres = genres;
24+
}
1625

1726
public String getUserID() {
1827
return userID;
Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
package com.programming.videoService.repository;
2+
import com.programming.videoService.model.History;
3+
import org.springframework.data.mongodb.repository.MongoRepository;
4+
public interface HistoryRepository extends MongoRepository<History, String> {
5+
6+
7+
8+
9+
}
Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
package com.programming.videoService.repository;
2+
3+
import org.springframework.data.mongodb.repository.MongoRepository;
4+
import com.programming.videoService.model.Report;
5+
public interface ReportRepository extends MongoRepository <Report, String> {
6+
7+
}

0 commit comments

Comments
 (0)