from fastapi import FastAPI, UploadFile, File, HTTPException, Form
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import JSONResponse
from typing import List, Dict, Optional, Union, Any
import numpy as np
import librosa
import soundfile as sf
import io
import tempfile
import os
import uuid
from pydantic import BaseModel
import random

app = FastAPI(title="Audio Emotion Recognition API")

# Configure CORS
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

# Models for responses
class EmotionScores(BaseModel):
    angry: float
    sad: float
    fear: float
    happy: float
    neutral: float

class EmotionSegment(BaseModel):
    start: float
    end: float
    mainEmotion: str
    emotions: EmotionScores

class PredictionResponse(BaseModel):
    segments: List[EmotionSegment]
    overallEmotion: str
    audioLength: float

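# Illustrative response shape for /predict, matching the models above (the
# values are made up and shown only to document the schema):
#
#   {
#     "segments": [
#       {"start": 0.0, "end": 42.5, "mainEmotion": "happy",
#        "emotions": {"angry": 5, "sad": 10, "fear": 5, "happy": 60, "neutral": 20}}
#     ],
#     "overallEmotion": "happy",
#     "audioLength": 42.5
#   }
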
# Audio chunking utilities
def find_silence_points(audio_data: np.ndarray, sr: int,
                        min_silence_len: float = 0.3,
                        silence_thresh: float = -40) -> List[float]:
    """
    Find potential chunking points at silence regions.

    Args:
        audio_data: Audio signal
        sr: Sample rate
        min_silence_len: Minimum silence length in seconds
        silence_thresh: Threshold in dB for silence detection

    Returns:
        List of timestamps (in seconds) for silence regions
    """
    # Convert to dB
    audio_db = librosa.amplitude_to_db(np.abs(audio_data), ref=np.max)

    # Mark samples below the silence threshold
    silence_mask = audio_db < silence_thresh

    # Keep only silence regions of sufficient length
    min_samples = int(min_silence_len * sr)
    silence_regions = []

    in_silence = False
    silence_start = 0

    for i, is_silent in enumerate(silence_mask):
        if is_silent and not in_silence:
            in_silence = True
            silence_start = i
        elif not is_silent and in_silence:
            in_silence = False
            silence_duration = i - silence_start
            if silence_duration >= min_samples:
                # Use the middle of the silence as a possible chunk boundary
                silence_regions.append((silence_start + silence_duration // 2) / sr)

    return silence_regions

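# Example (sketch, not part of the API): sanity-checking the detected split
# points on a local file. The file path here is purely illustrative.
#
#   y, sr = librosa.load("sample_speech.wav", sr=None)
#   points = find_silence_points(y, sr, min_silence_len=0.3, silence_thresh=-40)
#   print(f"{len(points)} candidate split points:", [round(p, 2) for p in points])
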
def chunk_audio(audio_data: np.ndarray, sr: int, max_chunk_len: int = 60,
                overlap: float = 0.5) -> List[Dict[str, Any]]:
    """
    Chunk audio into segments, preferring to split at silence.

    Args:
        audio_data: Audio signal
        sr: Sample rate
        max_chunk_len: Maximum chunk length in seconds
        overlap: Overlap between chunks in seconds

    Returns:
        List of chunks with start and end times
    """
    audio_length = len(audio_data) / sr

    # If the audio is already shorter than max_chunk_len, return it as a single chunk
    if audio_length <= max_chunk_len:
        return [{"start": 0, "end": audio_length, "audio": audio_data}]

    # Find silence points to use as natural chunk boundaries
    silence_points = find_silence_points(audio_data, sr)

    chunks = []
    current_pos = 0

    while current_pos < audio_length:
        # Calculate the ideal end position for this chunk
        ideal_end = min(current_pos + max_chunk_len, audio_length)

        # Look for a silence point near the ideal end
        best_end = ideal_end

        # Only consider silence points far enough ahead of the current position
        # that the window always moves forward
        candidates = [p for p in silence_points if p > current_pos + overlap]
        if candidates:
            # Find the silence point closest to the ideal end
            closest_silence = min(candidates, key=lambda x: abs(x - ideal_end))

            # Use the silence point if it's reasonably close (within 20% of the chunk length)
            if abs(closest_silence - ideal_end) < max_chunk_len * 0.2:
                best_end = closest_silence

        # Extract the chunk
        chunk_start_samples = int(current_pos * sr)
        chunk_end_samples = int(best_end * sr)
        chunk_data = audio_data[chunk_start_samples:chunk_end_samples]

        chunks.append({
            "start": current_pos,
            "end": best_end,
            "audio": chunk_data
        })

        # Move to the next chunk, keeping the specified overlap
        current_pos = best_end - overlap if best_end < audio_length else audio_length

    return chunks

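# Example (sketch): chunking two minutes of synthetic audio at 16 kHz. Each
# returned item is {"start": float, "end": float, "audio": np.ndarray}.
#
#   sr = 16000
#   y = np.random.uniform(-0.1, 0.1, size=sr * 120)
#   for c in chunk_audio(y, sr, max_chunk_len=60, overlap=0.5):
#       print(f"{c['start']:.2f}s -> {c['end']:.2f}s ({len(c['audio'])} samples)")
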
# ToDo: Yubee
def predict_from_model(audio_chunk: np.ndarray, sr: int) -> Dict[str, Any]:
    """
    Mock function that simulates emotion predictions from a model.
    Will be replaced with actual model predictions.

    Args:
        audio_chunk: Audio chunk data
        sr: Sample rate

    Returns:
        Dictionary with emotion predictions
    """
    # Generate random emotion scores
    emotions = {
        "angry": random.randint(5, 60),
        "sad": random.randint(5, 60),
        "fear": random.randint(5, 60),
        "happy": random.randint(5, 60),
        "neutral": random.randint(5, 70)
    }

    # Normalize so the scores sum to 100
    total = sum(emotions.values())
    for emotion in emotions:
        emotions[emotion] = round((emotions[emotion] / total) * 100)

    # Compensate for rounding so the total is exactly 100
    adjustment = 100 - sum(emotions.values())
    emotions["neutral"] += adjustment

    # Determine the dominant emotion
    main_emotion = max(emotions, key=emotions.get)

    return {
        "mainEmotion": main_emotion,
        "emotions": emotions
    }

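# Example (sketch) of the mock output for a single chunk; the numbers are
# random until the real model is wired in:
#
#   {"mainEmotion": "neutral",
#    "emotions": {"angry": 12, "sad": 18, "fear": 9, "happy": 21, "neutral": 40}}
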
@app.post("/predict", response_model=PredictionResponse)
async def predict(file: Optional[UploadFile] = File(None),
                  recorded_audio: Optional[str] = Form(None)):
    """
    Endpoint to predict emotions from uploaded or recorded audio.

    Args:
        file: Uploaded audio file
        recorded_audio: Base64-encoded audio data from a frontend recording

    Returns:
        JSON with emotion predictions for each audio segment
    """
    if not file and not recorded_audio:
        raise HTTPException(status_code=400, detail="No audio provided")

    try:
        # Process an uploaded file
        if file:
            audio_data, sr = librosa.load(io.BytesIO(await file.read()), sr=None)
        # Process recorded audio sent by the frontend
        else:
            import base64
            # Strip a possible "data:audio/...;base64," prefix before decoding
            audio_bytes = base64.b64decode(recorded_audio.split(',')[1] if ',' in recorded_audio else recorded_audio)

            # Save to a temporary file and read it with librosa
            with tempfile.NamedTemporaryFile(suffix='.wav', delete=False) as temp_file:
                temp_file.write(audio_bytes)
                temp_file_path = temp_file.name

            try:
                audio_data, sr = librosa.load(temp_file_path, sr=None)
            finally:
                os.unlink(temp_file_path)  # Clean up the temp file even if loading fails

        # Get the audio length in seconds
        audio_length = len(audio_data) / sr

        # Chunk the audio
        chunks = chunk_audio(audio_data, sr)

        # Process each chunk
        segments = []
        for chunk in chunks:
            # Get predictions for this chunk
            prediction = predict_from_model(chunk["audio"], sr)

            # Create a segment with start/end times and predictions
            segment = {
                "start": chunk["start"],
                "end": chunk["end"],
                "mainEmotion": prediction["mainEmotion"],
                "emotions": prediction["emotions"]
            }
            segments.append(segment)

        # Determine the overall emotion, weighting each emotion by segment duration
        emotion_counts = {}
        for segment in segments:
            emotion = segment["mainEmotion"]
            duration = segment["end"] - segment["start"]
            emotion_counts[emotion] = emotion_counts.get(emotion, 0) + duration

        overall_emotion = max(emotion_counts, key=emotion_counts.get)

        return {
            "segments": segments,
            "overallEmotion": overall_emotion,
            "audioLength": audio_length
        }

    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Error processing audio: {str(e)}")

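# Example request (sketch), assuming the server is running locally on port 8767
# and "speech_sample.wav" is any short audio file:
#
#   curl -X POST http://localhost:8767/predict -F "file=@speech_sample.wav"
#
# Recorded audio can be sent instead as a base64 string in the "recorded_audio"
# form field.
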
if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=8767)