11import os
2- from schemas .emotion_schema import GetEmotionPercentagesResponse
2+ from collections import defaultdict
3+ from schemas .emotion_schema import GetEmotionPercentagesResponse , TimelineEntry , EmotionAnalysisResponse
34from services .emotion_analysis .emotion_analysis_service import EmotionsAnalysisService
45import logging
56import coloredlogs
67from utils .utils import load_model , load_face_cascade , extract_features , predict_emotion , getPercentages
78import cv2
9+ import numpy as np
10+
11+ EMOTION_LABELS = {0 : 'Angry' , 1 : 'Disgusted' , 2 : 'Fearful' , 3 : 'Happy' , 4 : 'Neutral' , 5 : 'Sad' , 6 : 'Surprised' }
12+
13+
14+ def _format_time (seconds ):
15+ return f"{ int (seconds )// 60 :02d} :{ int (seconds )% 60 :02d} "
16+
17+
18+ def _get_dominant (preds ):
19+ """Return (dominant_label, avg_confidence_%) from list of (label, confidence) tuples."""
20+ if not preds :
21+ return None , 0.0
22+ counts , confs = defaultdict (int ), defaultdict (list )
23+ for label , conf in preds :
24+ counts [label ] += 1
25+ confs [label ].append (conf )
26+ dominant = max (counts , key = counts .get )
27+ return dominant , round (np .mean (confs [dominant ]) * 100 , 2 )
28+
29+
30+ def _build_timeline_fixed (timed_preds , interval_s , duration ):
31+ """Group predictions into fixed time windows, return dominant emotion per window."""
32+ if not timed_preds or duration <= 0 :
33+ return []
34+ num_windows = max (1 , int (duration / interval_s ) + (1 if duration % interval_s else 0 ))
35+ windows = defaultdict (list )
36+ for ts , label , conf in timed_preds :
37+ windows [min (int (ts / interval_s ), num_windows - 1 )].append ((label , conf ))
38+
39+ timeline = []
40+ for i in range (num_windows ):
41+ if i not in windows :
42+ continue
43+ dominant , avg_conf = _get_dominant (windows [i ])
44+ timeline .append (TimelineEntry (
45+ id = len (timeline ) + 1 ,
46+ starting_time = _format_time (i * interval_s ),
47+ ending_time = _format_time (min ((i + 1 ) * interval_s , duration )),
48+ emotion = dominant .upper (), value = avg_conf ,
49+ ))
50+ return timeline
51+
52+
53+ def _build_timeline_dynamic (timed_preds , duration ):
54+ """Segment timeline whenever the dominant emotion changes."""
55+ if not timed_preds or duration <= 0 :
56+ return []
57+ timeline , seg_preds = [], []
58+ cur_label , seg_start = timed_preds [0 ][1 ], timed_preds [0 ][0 ]
59+
60+ for ts , label , conf in timed_preds :
61+ if label != cur_label :
62+ _ , avg_conf = _get_dominant (seg_preds )
63+ timeline .append (TimelineEntry (
64+ id = len (timeline ) + 1 , starting_time = _format_time (seg_start ),
65+ ending_time = _format_time (ts ), emotion = cur_label .upper (), value = avg_conf ,
66+ ))
67+ cur_label , seg_start , seg_preds = label , ts , []
68+ seg_preds .append ((label , conf ))
69+
70+ if seg_preds :
71+ _ , avg_conf = _get_dominant (seg_preds )
72+ timeline .append (TimelineEntry (
73+ id = len (timeline ) + 1 , starting_time = _format_time (seg_start ),
74+ ending_time = _format_time (duration ), emotion = cur_label .upper (), value = avg_conf ,
75+ ))
76+ return timeline
77+
878
979class EmotionsAnalysisImp (EmotionsAnalysisService ):
1080 def __init__ (self , model_path : str ):
@@ -13,82 +83,71 @@ def __init__(self, model_path: str):
1383 coloredlogs .install (level = "INFO" , fmt = "%(asctime)s - %(name)s - %(levelname)s - %(message)s" )
1484 self .logger = logging .getLogger (__name__ )
1585
16- def get_emotion_percentages (self , video_path : str ) -> GetEmotionPercentagesResponse :
17- predictions = []
18- labels = {0 : 'Angry' , 1 : 'Disgusted' , 2 : 'Fearful' , 3 : 'Happy' , 4 : 'Neutral' , 5 : 'Sad' , 6 : 'Surprised' }
86+ def get_emotion_percentages (self , video_path : str , interval_s : int = 10 ) -> EmotionAnalysisResponse :
87+ predictions , timed_predictions = [], []
1988 self .logger .info (f"Loading video from path: { video_path } " )
2089
90+ empty = EmotionAnalysisResponse (
91+ emotions = GetEmotionPercentagesResponse (
92+ Angry = 0 , Disgusted = 0 , Fearful = 0 , Happy = 0 , Neutral = 0 , Sad = 0 , Surprised = 0 ),
93+ timeline = [],
94+ )
95+
2196 if not os .path .exists (video_path ):
2297 self .logger .error (f"Video file does not exist: { video_path } " )
2398 directory = os .path .dirname (video_path )
2499 if os .path .exists (directory ):
25- self .logger .info (f"Contents of the directory { directory } :" )
26- for item in os .listdir (directory ):
27- self .logger .info (f" - { item } " )
28- else :
29- self .logger .error (f"Directory does not exist: { directory } " )
30- return GetEmotionPercentagesResponse (Angry = 0 , Disgusted = 0 , Fearful = 0 , Happy = 0 , Neutral = 0 , Sad = 0 , Surprised = 0 )
100+ self .logger .info (f"Contents of { directory } : { os .listdir (directory )} " )
101+ return empty
31102
32103 video = cv2 .VideoCapture (video_path )
33104 if not video .isOpened ():
34105 self .logger .error (f"Failed to open video file: { video_path } " )
35- return GetEmotionPercentagesResponse ( Angry = 0 , Disgusted = 0 , Fearful = 0 , Happy = 0 , Neutral = 0 , Sad = 0 , Surprised = 0 )
106+ return empty
36107
108+ fps = video .get (cv2 .CAP_PROP_FPS )
109+ video_duration = int (video .get (cv2 .CAP_PROP_FRAME_COUNT )) / fps if fps > 0 else 0
37110 last_processed_second = - 1
38-
39-
40- frame_count = 0
41- processed_frames = 0
42- face_count = 0
111+ frame_count , face_count = 0 , 0
43112
44113 while True :
45114 ret , im = video .read ()
46115 if not ret :
47116 break
48-
49117 timestamp_ms = video .get (cv2 .CAP_PROP_POS_MSEC )
50- current_second = int (timestamp_ms / 500 ) # 2 frame per second
51-
118+ current_second = int (timestamp_ms / 500 ) # 2 frames per second
52119 if current_second == last_processed_second :
53120 continue
54121 last_processed_second = current_second
55-
56122 frame_count += 1
57-
58- processed_frames += 1
59123 gray = cv2 .cvtColor (im , cv2 .COLOR_BGR2GRAY )
60124 faces = self .face_cascade .detectMultiScale (gray , 1.3 , 5 )
61125 try :
62126 for (p , q , r , s ) in faces :
63127 face_count += 1
64- image = gray [q :q + s , p :p + r ]
65- image = cv2 . resize ( image , ( 48 , 48 ))
66- img = extract_features ( image )
67- pred = predict_emotion ( self . model , img )
68- prediction_label = labels [ pred . argmax ()]
69- self . logger . info ( f"Prediction for frame { frame_count } : { prediction_label } " )
70- predictions .append (prediction_label )
128+ image = cv2 . resize ( gray [q :q + s , p :p + r ], ( 48 , 48 ))
129+ pred = predict_emotion ( self . model , extract_features ( image ))
130+ pred_idx = pred . argmax ( )
131+ label = EMOTION_LABELS [ pred_idx ]
132+ conf = float ( pred [ 0 ][ pred_idx ])
133+ predictions . append ( label )
134+ timed_predictions .append (( timestamp_ms / 1000.0 , label , conf ) )
71135 except cv2 .error as e :
72136 self .logger .error (f"OpenCV error: { e } " )
73- pass
74137
75138 video .release ()
76-
77- self .logger .info (f"Total frames in video: { frame_count } " )
78- self .logger .info (f"Frames actually processed: { processed_frames } " )
79- self .logger .info (f"Total faces detected: { face_count } " )
139+ self .logger .info (f"Processed { frame_count } frames, { face_count } faces detected" )
80140
81141 if not predictions :
82142 self .logger .warning ("No faces detected or no predictions made." )
83143
84144 percentages = getPercentages (predictions )
85- self .logger .info (f"Percentages of emotions detected: { percentages } " )
86- return GetEmotionPercentagesResponse (
87- Angry = percentages ['Angry' ],
88- Disgusted = percentages ['Disgusted' ],
89- Fearful = percentages ['Fearful' ],
90- Happy = percentages ['Happy' ],
91- Neutral = percentages ['Neutral' ],
92- Sad = percentages ['Sad' ],
93- Surprised = percentages ['Surprised' ]
94- )
145+ emotions = GetEmotionPercentagesResponse (** percentages )
146+
147+ if interval_s == 0 :
148+ timeline = _build_timeline_dynamic (timed_predictions , video_duration )
149+ else :
150+ timeline = _build_timeline_fixed (timed_predictions , interval_s , video_duration )
151+ self .logger .info (f"Timeline: { len (timeline )} entries" )
152+
153+ return EmotionAnalysisResponse (emotions = emotions , timeline = timeline )
0 commit comments