|
| 1 | +"""Groq Whisper Integration Module |
| 2 | +
|
| 3 | +This module provides integration with Groq's Whisper ASR service for transcribing audio content |
| 4 | +in vCon recordings. It handles the transcription process, error retries, and updates vCon objects with |
| 5 | +transcription results. |
| 6 | +""" |
| 7 | + |
| 8 | +import base64 |
| 9 | +import hashlib |
| 10 | +import logging |
| 11 | +import tempfile |
| 12 | +import time |
| 13 | +import os |
| 14 | +from typing import Optional, Dict, Any, Union |
| 15 | + |
| 16 | +import requests |
| 17 | +from tenacity import ( |
| 18 | + RetryError, |
| 19 | + before_sleep_log, |
| 20 | + retry, |
| 21 | + stop_after_attempt, |
| 22 | + wait_exponential, |
| 23 | +) |
| 24 | +from groq import Groq |
| 25 | + |
| 26 | +from lib.error_tracking import init_error_tracker |
| 27 | +from lib.logging_utils import init_logger |
| 28 | +from lib.metrics import init_metrics, stats_gauge, stats_count |
| 29 | +from server.lib.vcon_redis import VconRedis |
| 30 | + |
# Bring up error tracking and metrics first, then create the module logger.
init_error_tracker()
init_metrics()
logger = init_logger(__name__)

# Baseline settings for this link; `run` overlays caller-supplied options on
# top of a copy of this mapping.
default_options = {
    # Recordings shorter than this many seconds are skipped.
    "minimum_duration": 30,
    # Taken from the environment. The literal fallback is only a placeholder
    # and must be replaced by a real key via GROQ_API_KEY.
    "API_KEY": os.getenv("GROQ_API_KEY", "YOUR_GROQ_API_KEY"),
    "Content-Type": "audio/flac",
}
| 42 | + |
| 43 | + |
def get_transcription(vcon: Any, index: int) -> Optional[dict]:
    """Return the first transcript analysis attached to the dialog at ``index``.

    Analysis entries that lack a ``dialog`` or ``type`` key are skipped rather
    than raising ``KeyError``, so unrelated analysis entries cannot abort the
    lookup. A missing (``None``) analysis collection is treated as empty.

    Args:
        vcon: Object exposing an ``analysis`` iterable of dict entries.
        index (int): Position of the dialog whose transcript is wanted.

    Returns:
        Optional[dict]: The matching analysis entry, or None when there is none.
    """
    for entry in vcon.analysis or ():
        if entry.get("dialog") == index and entry.get("type") == "transcript":
            return entry
    return None
| 58 | + |
| 59 | + |
def get_file_content(dialog: dict, timeout: float = 60.0) -> bytes:
    """Return the raw bytes of a dialog's media, inline or fetched by URL.

    Inline content is read from ``dialog["body"]``. When the dialog declares
    ``encoding == "base64url"`` the body is decoded with the URL-safe alphabet
    (re-adding any stripped ``=`` padding); otherwise it is decoded as
    standard base64, as before.

    External content is downloaded from ``dialog["url"]``. If both
    ``signature`` and ``alg`` are present, the SHA-512 digest of the download
    is compared against the signature, ignoring ``=`` padding differences.

    Args:
        dialog (dict): Dialog object containing file information.
        timeout (float): Seconds to wait for the download before giving up, so
            an unresponsive server cannot block the caller indefinitely.

    Returns:
        bytes: The file content.

    Raises:
        Exception: If the download does not return HTTP 200, the hash
            algorithm is unsupported, the signature does not match, or the
            dialog has neither ``body`` nor ``url``.
    """
    if "body" in dialog:
        body = dialog["body"]
        if dialog.get("encoding") == "base64url":
            # b64decode silently discards '-' and '_' (non-alphabet characters
            # under its default validate=False), which would corrupt
            # base64url payloads; decode them with the URL-safe alphabet.
            if isinstance(body, str):
                body = body.encode("ascii")
            return base64.urlsafe_b64decode(body + b"=" * (-len(body) % 4))
        return base64.b64decode(body)

    if "url" in dialog:
        response = requests.get(dialog["url"], verify=True, timeout=timeout)
        if response.status_code != 200:
            raise Exception(f"Failed to download file from {dialog['url']}")

        content = response.content

        # Verify file integrity only when both signature and algorithm are given.
        if "signature" in dialog and "alg" in dialog:
            if dialog["alg"] != "SHA-512":
                raise Exception(f"Unsupported hash algorithm: {dialog['alg']}")
            digest = base64.urlsafe_b64encode(
                hashlib.sha512(content).digest()
            ).decode("utf-8")
            # Compare without trailing '=' so padded and unpadded signatures
            # of the same digest both verify.
            if digest.rstrip("=") != str(dialog["signature"]).rstrip("="):
                raise Exception("File signature verification failed")

        return content

    raise Exception("Dialog contains neither inline body nor external URL")
| 97 | + |
| 98 | + |
@retry(
    wait=wait_exponential(multiplier=2, min=12, max=100),
    stop=stop_after_attempt(6),
    before_sleep=before_sleep_log(logger, logging.INFO),
)
def transcribe_groq_whisper(dialog: dict, opts: dict) -> Union[Dict[str, Any], Any]:
    """Send a dialog's audio to the Groq transcription API.

    The audio bytes are handed to the Groq client directly as a
    ``(filename, bytes)`` pair, so no temporary file is written and nothing
    depends on reopening an already-open temporary file by name (which is
    platform-dependent).

    Any exception raised here triggers the retry policy on the decorator: up
    to 6 attempts with exponential waits bounded to 12-100 seconds.

    Args:
        dialog (dict): Dialog object containing the audio file information.
        opts (dict): Configuration options. ``API_KEY`` is required; an
            optional ``model`` entry overrides the default model id.

    Returns:
        Union[Dict[str, Any], Any]: Whatever the Groq client returns for the
            transcription request (a dict or a library response object).

    Raises:
        RetryError: If all retry attempts fail.
    """
    # Handles both inline and externally referenced audio.
    content = get_file_content(dialog)

    client = Groq(api_key=opts["API_KEY"])

    # The ".flac" filename mirrors the suffix the audio was previously
    # uploaded under.
    return client.audio.transcriptions.create(
        file=("audio.flac", content),
        model=opts.get("model", "distil-whisper-large-v3-en"),
        response_format="verbose_json",
    )
| 140 | + |
| 141 | + |
def _as_transcription_body(result: Any) -> Any:
    """Turn a transcription response into a plain value for the analysis body."""
    if hasattr(result, "model_dump"):
        return result.model_dump()
    if isinstance(result, dict):
        return result
    # Unknown response type: keep its string form under both keys.
    text = str(result)
    return {"text": text, "raw_response": text}


def run(
    vcon_uuid: str,
    link_name: str,
    opts: dict = default_options,
) -> Optional[str]:
    """Transcribe the recording dialogs of a vCon and store the results.

    This function:
    1. Retrieves the vCon from Redis
    2. Skips dialogs that are not recordings, are shorter than
       ``minimum_duration``, or already have a transcript analysis
    3. Sends each remaining dialog to ``transcribe_groq_whisper``
    4. Adds each transcription as a ``transcript`` analysis entry
    5. Stores the vCon back in Redis

    Processing stops at the first dialog whose transcription fails or comes
    back empty; transcripts added before that point are still stored and
    the UUID is still returned.

    Args:
        vcon_uuid (str): UUID of the vCon to process.
        link_name (str): Name of the link (not used in this function).
        opts (dict): Option overrides applied on top of ``default_options``.
            ``None`` is treated as "no overrides".

    Returns:
        Optional[str]: The ``vcon_uuid`` that was passed in.
    """
    # Overlay caller options on a fresh copy of the defaults; an explicit
    # None must not crash the merge.
    opts = {**default_options, **(opts or {})}

    logger.info("Starting whisper plugin for vCon: %s", vcon_uuid)

    vcon_redis = VconRedis()
    vcon = vcon_redis.get_vcon(vcon_uuid)

    for index, dialog in enumerate(vcon.dialog):
        if dialog["type"] != "recording":
            logger.info(
                "whisper plugin: skipping non-recording dialog %s in vCon: %s",
                index,
                vcon.uuid,
            )
            continue

        if int(dialog["duration"]) < opts["minimum_duration"]:
            logger.info(
                "Skipping short recording dialog %s in vCon: %s", index, vcon.uuid
            )
            continue

        if get_transcription(vcon, index):
            logger.info(
                "Dialog %s already transcribed on vCon: %s", index, vcon.uuid
            )
            continue

        try:
            # Time the whole call, retries included, for the gauge below.
            start = time.time()
            logger.debug("Transcribing dialog %s in vCon: %s", index, vcon.uuid)
            result = transcribe_groq_whisper(dialog, opts)
            stats_gauge(
                "conserver.link.groq_whisper.transcription_time",
                time.time() - start,
            )
        except RetryError as re:
            logger.error(
                "Failed to transcribe vCon %s after multiple retry attempts: %s",
                vcon_uuid,
                re,
            )
            stats_count("conserver.link.groq_whisper.transcription_failures")
            break
        except Exception as e:
            # logger.exception keeps the traceback for unexpected failures.
            logger.exception(
                "Unexpected error transcribing vCon %s: %s", vcon_uuid, e
            )
            stats_count("conserver.link.groq_whisper.transcription_failures")
            break

        if not result:
            logger.warning("No transcription generated for vCon %s", vcon_uuid)
            stats_count("conserver.link.groq_whisper.transcription_failures")
            break

        logger.info("Transcribed vCon: %s", vcon.uuid)
        # The raw response contains the full transcript text; keep it out of
        # INFO-level logs.
        logger.debug(result)

        # Record the effective options, minus the credential.
        vendor_schema = {
            "opts": {k: v for k, v in opts.items() if k != "API_KEY"}
        }

        vcon.add_analysis(
            type="transcript",
            dialog=index,
            vendor="groq_whisper",
            body=_as_transcription_body(result),
            extra={
                "vendor_schema": vendor_schema,
            },
        )

    # Persist whatever was added, including after an early break.
    vcon_redis.store_vcon(vcon)

    logger.info("Finished groq_whisper plugin for vCon: %s", vcon_uuid)
    return vcon_uuid
0 commit comments