11package com .programming .videoService .service ;
22
3+ import com .fasterxml .jackson .databind .ObjectMapper ;
34import com .mongodb .BasicDBObject ;
45import com .mongodb .DBObject ;
56import com .mongodb .client .gridfs .model .GridFSFile ;
89import com .programming .videoService .model .Report ;
910import com .programming .videoService .model .Subscription ;
1011import com .programming .videoService .model .Video ;
12+ import com .programming .videoService .model .Recommender ;
1113import com .programming .videoService .repository .HistoryRepository ;
1214import com .programming .videoService .repository .LikeRepository ;
15+ import com .programming .videoService .repository .RecommenderRepository ;
1316import com .programming .videoService .repository .SubscriptionRepository ;
1417
1518import org .apache .kafka .clients .producer .KafkaProducer ;
2932import org .springframework .data .mongodb .core .query .Update ;
3033import org .springframework .data .mongodb .gridfs .GridFsOperations ;
3134import org .springframework .data .mongodb .gridfs .GridFsTemplate ;
35+ import org .springframework .kafka .annotation .KafkaListener ;
36+ import org .springframework .kafka .core .KafkaTemplate ;
37+ import org .springframework .messaging .handler .annotation .Payload ;
3238import org .springframework .stereotype .Service ;
3339import org .springframework .web .multipart .MultipartFile ;
3440
5056import org .springframework .data .domain .Sort ;
5157import java .util .Properties ;
5258import java .util .Set ;
59+ import java .util .concurrent .CompletableFuture ;
60+ import java .util .concurrent .ExecutorService ;
61+ import java .util .concurrent .Executors ;
5362
5463import org .apache .kafka .common .serialization .StringSerializer ;
5564import org .apache .kafka .common .serialization .StringDeserializer ;
5665import java .time .Duration ;
66+ import java .util .Properties ;
67+
5768
5869@ Service
5970public class VideoService {
@@ -70,6 +81,9 @@ public class VideoService {
7081 private KafkaProducer <String , String > kafkaProducer ;
7182 private KafkaConsumer <String , String > kafkaConsumer ;
7283
84+ @ Autowired
85+ private KafkaTemplate <String , String > kafkaTemplate ;
86+
7387 public VideoService () {
7488 // Kafka Producer Configuration
7589 Properties producerProps = new Properties ();
@@ -447,7 +461,60 @@ public List<String> getThumbnailIdsByUserGenres(String userId) {
447461 // Trả về danh sách thumbnailId duy nhất
448462 return new ArrayList <>(uniqueThumbnailIds );
449463 }
450-
464+
465+
466+ private static final String KAFKA_TOPIC = "synonyms_topic" ;
467+ public void processVideoData (String userId , Map <String , Integer > videoData ) {
468+ try {
469+ // Đóng gói dữ liệu thành Map
470+ Map <String , Object > message = new HashMap <>();
471+ message .put ("userId" , userId );
472+ message .put ("videoData" , videoData );
473+
474+ // Sử dụng Jackson để chuyển đổi thành JSON
475+ ObjectMapper objectMapper = new ObjectMapper ();
476+ String jsonMessage = objectMapper .writeValueAsString (message );
477+
478+ // Gửi JSON đến Kafka
479+ kafkaTemplate .send (KAFKA_TOPIC , jsonMessage );
480+
481+ System .out .println ("Processing video data: " + jsonMessage );
482+ } catch (Exception e ) {
483+ System .err .println ("Error converting to JSON: " + e .getMessage ());
484+ }
485+ }
486+
487+
488+ @ Autowired
489+ private RecommenderRepository recommenderRepository ;
490+ @ KafkaListener (topics = "recommendation_topic" )
491+ public void consumeMessage (String message ) {
492+ try {
493+ // Manually convert the string to Recommender object
494+ JSONObject json = new JSONObject (message );
495+ Recommender recommender = new ObjectMapper ().readValue (json .toString (), Recommender .class );
496+
497+ // Process the recommender object as needed (you can also add more logic here if required)
498+ System .out .println ("Processing recommendation message: " + recommender );
499+
500+ // Save the recommender object to the database
501+ saveToDatabase (recommender );
502+ } catch (Exception e ) {
503+ logger .error ("Error processing recommendation message" , e );
504+ }
505+ }
506+
507+ private void saveToDatabase (Recommender recommender ) {
508+ try {
509+ // Assuming recommenderRepository is autowired and set up for the Recommender entity
510+ recommenderRepository .save (recommender );
511+ System .out .println ("Saved to DB: " + recommender );
512+ } catch (Exception e ) {
513+ System .err .println ("Error saving to DB: " + e .getMessage ());
514+ }
515+ }
516+
517+
451518
452519
453520 //hanlde Report
0 commit comments