|
| 1 | +from __future__ import annotations |
| 2 | + |
| 3 | +import asyncio |
| 4 | +import os |
| 5 | +import time |
| 6 | +from dataclasses import dataclass |
| 7 | +from pathlib import Path |
| 8 | +from typing import Mapping |
| 9 | + |
| 10 | +from azure.core.exceptions import AzureError |
| 11 | +from azure.identity import CredentialUnavailableError, DefaultAzureCredential |
| 12 | +from azure.storage.blob import ContentSettings |
| 13 | +from azure.storage.blob.aio import BlobServiceClient, ContainerClient |
| 14 | + |
| 15 | +from src import logger |
| 16 | + |
| 17 | + |
| 18 | +class BlobUploadError(RuntimeError): |
| 19 | + """Raised when uploading audio to Azure Blob Storage fails.""" |
| 20 | + |
| 21 | + |
| 22 | +@dataclass(frozen=True) |
| 23 | +class BlobStorageConfig: |
| 24 | + """Configuration describing how to reach the target container.""" |
| 25 | + |
| 26 | + connection_string: str |
| 27 | + container_name: str |
| 28 | + prefix: str |
| 29 | + |
| 30 | + @classmethod |
| 31 | + def from_env(cls) -> "BlobStorageConfig": |
| 32 | + connection_string = ( |
| 33 | + os.getenv("AUDIO_BLOB_CONNECTION_STRING") |
| 34 | + or os.getenv("AZURE_BLOB_CONNECTION_STRING") |
| 35 | + or os.getenv("AZURE_STORAGE_CONNECTION_STRING") |
| 36 | + ) |
| 37 | + if not connection_string: |
| 38 | + raise BlobUploadError( |
| 39 | + "AUDIO_BLOB_CONNECTION_STRING environment variable must be defined " |
| 40 | + "with the Azure Storage connection string." |
| 41 | + ) |
| 42 | + |
| 43 | + container_name = ( |
| 44 | + os.getenv("AUDIO_BLOB_CONTAINER_NAME") |
| 45 | + or os.getenv("AZURE_BLOB_CONTAINER_NAME") |
| 46 | + ) |
| 47 | + |
| 48 | + if not container_name: |
| 49 | + raise BlobUploadError( |
| 50 | + "AUDIO_BLOB_CONTAINER_NAME environment variable must be defined " |
| 51 | + "with the target container name." |
| 52 | + ) |
| 53 | + |
| 54 | + raw_prefix = os.getenv("AUDIO_BLOB_PREFIX") |
| 55 | + prefix = raw_prefix.strip("/") if raw_prefix else "transcriptions" |
| 56 | + return cls(connection_string=connection_string, container_name=container_name, prefix=prefix) |
| 57 | + |
| 58 | + |
| 59 | +_config_lock = asyncio.Lock() |
| 60 | +_container_client: ContainerClient | None = None |
| 61 | +_service_client: BlobServiceClient | None = None |
| 62 | +_config: BlobStorageConfig | None = None |
| 63 | + |
| 64 | + |
| 65 | +async def _initialise_clients() -> None: |
| 66 | + global _container_client, _service_client, _config |
| 67 | + |
| 68 | + cfg = BlobStorageConfig.from_env() |
| 69 | + try: |
| 70 | + service_client = BlobServiceClient.from_connection_string(cfg.connection_string) |
| 71 | + except Exception as exc: |
| 72 | + logger.error("Failed to create BlobServiceClient or ContainerClient: %s. Trying connection from App Service", exc) |
| 73 | + service_client = BlobServiceClient( |
| 74 | + os.getenv("AZURE_STORAGEBLOB_RESOURCEENDPOINT", ""), |
| 75 | + credential=DefaultAzureCredential() #type: ignore |
| 76 | + ) |
| 77 | + finally: |
| 78 | + container_client = service_client.get_container_client(cfg.container_name) |
| 79 | + |
| 80 | + try: |
| 81 | + await container_client.get_container_properties() |
| 82 | + except AzureError as exc: # pragma: no cover - network assert |
| 83 | + await container_client.close() |
| 84 | + await service_client.close() |
| 85 | + raise BlobUploadError( |
| 86 | + f"Failed to connect to blob container '{cfg.container_name}': {exc!s}" |
| 87 | + ) from exc |
| 88 | + |
| 89 | + _service_client = service_client |
| 90 | + _container_client = container_client |
| 91 | + _config = cfg |
| 92 | + |
| 93 | + |
| 94 | +async def _get_container_client() -> ContainerClient: |
| 95 | + global _container_client |
| 96 | + |
| 97 | + if _container_client is None: |
| 98 | + async with _config_lock: |
| 99 | + if _container_client is None: |
| 100 | + await _initialise_clients() |
| 101 | + if _container_client is None: # pragma: no cover - defensive |
| 102 | + raise BlobUploadError("Blob container client is not initialised") |
| 103 | + return _container_client |
| 104 | + |
| 105 | + |
| 106 | +async def ensure_blob_storage_ready() -> None: |
| 107 | + """Establish the container connection to surface configuration issues early.""" |
| 108 | + |
| 109 | + await _get_container_client() |
| 110 | + |
| 111 | + |
| 112 | +def _normalise_segment(value: str | None, fallback: str) -> str: |
| 113 | + candidate = (value or fallback).strip().lower().replace(" ", "-") |
| 114 | + return candidate or fallback |
| 115 | + |
| 116 | + |
| 117 | +def _build_blob_name(filename: str | None, prefix: str, language: str | None) -> str: |
| 118 | + safe_name = Path(filename or "audio.bin").name |
| 119 | + timestamp = int(time.time() * 1000) |
| 120 | + lang_segment = _normalise_segment(language, "unknown") |
| 121 | + segments = [segment for segment in (prefix, lang_segment, f"{timestamp}_{safe_name}") if segment] |
| 122 | + return "/".join(segments) |
| 123 | + |
| 124 | + |
| 125 | +async def upload_transcription_audio( |
| 126 | + data: bytes, |
| 127 | + *, |
| 128 | + filename: str | None, |
| 129 | + content_type: str | None, |
| 130 | + language: str | None, |
| 131 | + metadata: Mapping[str, str] | None = None, |
| 132 | +) -> str: |
| 133 | + """Upload the raw audio payload prior to transcription.""" |
| 134 | + |
| 135 | + if not data: |
| 136 | + raise BlobUploadError("Cannot upload empty audio payload to blob storage.") |
| 137 | + |
| 138 | + container = await _get_container_client() |
| 139 | + cfg = _config or BlobStorageConfig.from_env() |
| 140 | + blob_name = _build_blob_name(filename, cfg.prefix, language) |
| 141 | + |
| 142 | + content_settings = ContentSettings(content_type=content_type) if content_type else None |
| 143 | + safe_metadata: dict[str, str] = dict(metadata or {}) |
| 144 | + |
| 145 | + try: |
| 146 | + await container.upload_blob( |
| 147 | + name=blob_name, |
| 148 | + data=data, |
| 149 | + overwrite=True, |
| 150 | + metadata=safe_metadata, |
| 151 | + content_settings=content_settings, |
| 152 | + ) |
| 153 | + except AzureError as exc: |
| 154 | + logger.exception("Failed to upload transcription audio to blob storage: %s", exc) |
| 155 | + raise BlobUploadError("Uploading audio to blob storage failed.") from exc |
| 156 | + |
| 157 | + logger.info("Uploaded transcription audio blob '%s'", blob_name) |
| 158 | + return blob_name |
| 159 | + |
| 160 | + |
| 161 | +async def close_blob_storage() -> None: |
| 162 | + """Release Azure clients to avoid resource leaks on shutdown.""" |
| 163 | + |
| 164 | + global _container_client, _service_client, _config |
| 165 | + |
| 166 | + if _container_client is not None: |
| 167 | + await _container_client.close() |
| 168 | + _container_client = None |
| 169 | + if _service_client is not None: |
| 170 | + await _service_client.close() |
| 171 | + _service_client = None |
| 172 | + _config = None |
0 commit comments