diff --git a/desktop/.env.example b/desktop/.env.example index 87d0a94fb7..6c25ff1689 100644 --- a/desktop/.env.example +++ b/desktop/.env.example @@ -17,9 +17,6 @@ # Production: https://api.omi.me OMI_API_URL=http://localhost:8080 -# DeepGram API key — required for real-time transcription -DEEPGRAM_API_KEY= - # ─── AI (optional) ────────────────────────────────────────────────── # Gemini API key for proactive assistants and embeddings # Falls back to backend-side processing if not set diff --git a/desktop/CHANGELOG.json b/desktop/CHANGELOG.json index 614cda711d..e9ad191f41 100644 --- a/desktop/CHANGELOG.json +++ b/desktop/CHANGELOG.json @@ -1,5 +1,7 @@ { - "unreleased": [], + "unreleased": [ + "Removed client-side Deepgram API key — transcription now routes securely through the Omi backend" + ], "releases": [ { "version": "0.11.90", diff --git a/desktop/Desktop/Sources/AppState.swift b/desktop/Desktop/Sources/AppState.swift index 862bfaf070..474fec6ceb 100644 --- a/desktop/Desktop/Sources/AppState.swift +++ b/desktop/Desktop/Sources/AppState.swift @@ -135,13 +135,16 @@ class AppState: ObservableObject { // Transcription services private var audioCaptureService: AudioCaptureService? - private var transcriptionService: TranscriptionService? + private var transcriptionService: BackendTranscriptionService? private var systemAudioCaptureService: Any? // SystemAudioCaptureService (macOS 14.4+) private var audioMixer: AudioMixer? private var vadGateService: VADGateService? - // Batch transcription mode + // Batch transcription mode (disabled — backend handles everything via /v4/listen) private var useBatchTranscription: Bool = false + // When true, backend owns conversation creation via /v4/listen lifecycle manager. + // Desktop skips createConversationFromSegments() to avoid duplicates. + private var backendOwnsConversation: Bool = false private var recordingStartCATime: Double = 0 // CACurrentMediaTime at recording start // Speaker segments for diarized transcription (sliding window — older segments are in SQLite) @@ -430,12 +433,7 @@ class AppState: ObservableObject { } } - // Log final state of important keys - if getenv("DEEPGRAM_API_KEY") != nil { - log("DEEPGRAM_API_KEY is set") - } else { - log("WARNING: DEEPGRAM_API_KEY is NOT set") - } + // DEEPGRAM_API_KEY no longer needed — STT routed through backend /v4/listen } private func shouldSkipBundledAnthropicKey(key: String, sourcePath: String, bundledEnvPath: String?) -> Bool { @@ -1152,165 +1150,143 @@ class AppState: ObservableObject { } } - do { - // Get effective language from settings (handles auto-detect vs single language) - let effectiveLanguage = AssistantSettings.shared.effectiveTranscriptionLanguage - let vocabulary = AssistantSettings.shared.effectiveVocabulary - log("Transcription: Using language=\(effectiveLanguage) (autoDetect=\(AssistantSettings.shared.transcriptionAutoDetect), selected=\(AssistantSettings.shared.transcriptionLanguage))") - log("Transcription: Custom vocabulary: \(vocabulary.joined(separator: ", "))") - - // Determine transcription mode - useBatchTranscription = AssistantSettings.shared.batchTranscriptionEnabled && effectiveSource == .microphone - - if !useBatchTranscription { - // Streaming mode: initialize WebSocket transcription service - transcriptionService = try TranscriptionService(language: effectiveLanguage, vocabulary: vocabulary) + // Get effective language from settings (handles auto-detect vs single language) + let effectiveLanguage = AssistantSettings.shared.effectiveTranscriptionLanguage + let vocabulary = AssistantSettings.shared.effectiveVocabulary + log("Transcription: Using language=\(effectiveLanguage) (autoDetect=\(AssistantSettings.shared.transcriptionAutoDetect), selected=\(AssistantSettings.shared.transcriptionLanguage))") + log("Transcription: Custom vocabulary: \(vocabulary.joined(separator: ", "))") + + // Always use streaming mode through the backend — batch mode not needed + // (backend handles STT, diarization, and memory creation server-side) + useBatchTranscription = false + // Backend owns conversation creation via /v4/listen lifecycle manager + backendOwnsConversation = true + + // Set conversation source based on audio source + let sourceValue: String + if effectiveSource == .bleDevice, let device = DeviceProvider.shared.connectedDevice { + currentConversationSource = ConversationSource.from(deviceType: device.type) + recordingInputDeviceName = device.displayName + sourceValue = currentConversationSource.rawValue + } else { + currentConversationSource = .desktop + recordingInputDeviceName = AudioCaptureService.getCurrentMicrophoneName() + sourceValue = "desktop" + } + + transcriptionService = BackendTranscriptionService(language: effectiveLanguage, source: sourceValue) + + // Initialize audio services based on source + if effectiveSource == .microphone { + // Initialize audio capture service + audioCaptureService = AudioCaptureService() + + // Initialize audio mixer for combining mic and system audio + audioMixer = AudioMixer() + + // VAD gate is optional for streaming mode (silence gating) + if AssistantSettings.shared.vadGateEnabled { + let gate = VADGateService() + vadGateService = gate + log("Transcription: VAD gate enabled") } else { - log("Transcription: Batch mode enabled — skipping WebSocket") + vadGateService = nil + } + + // Initialize system audio capture if supported (macOS 14.4+) + // Can be disabled via: defaults write com.omi.desktop-dev disableSystemAudioCapture -bool true + // or: defaults write com.omi.computer-macos disableSystemAudioCapture -bool true + let systemAudioDisabled = UserDefaults.standard.bool(forKey: "disableSystemAudioCapture") + if systemAudioDisabled { + log("Transcription: System audio capture DISABLED by user preference (disableSystemAudioCapture)") + } else if #available(macOS 14.4, *) { + systemAudioCaptureService = SystemAudioCaptureService() + log("Transcription: System audio capture initialized (macOS 14.4+)") + } else { + log("Transcription: System audio capture not available (requires macOS 14.4+)") } + } + // For BLE device, BleAudioService will be used in startAudioCapture - // Set conversation source based on audio source - if effectiveSource == .bleDevice, let device = DeviceProvider.shared.connectedDevice { - currentConversationSource = ConversationSource.from(deviceType: device.type) - recordingInputDeviceName = device.displayName - } else { - currentConversationSource = .desktop - recordingInputDeviceName = AudioCaptureService.getCurrentMicrophoneName() - } - - // Initialize audio services based on source - if effectiveSource == .microphone { - // Initialize audio capture service - audioCaptureService = AudioCaptureService() - - // Initialize audio mixer for combining mic and system audio - audioMixer = AudioMixer() - - // VAD gate is always needed for batch mode (chunk boundaries), - // and optional for streaming mode (silence gating) - if useBatchTranscription || AssistantSettings.shared.vadGateEnabled { - let gate = VADGateService() - if useBatchTranscription && !gate.modelAvailable { - // Batch mode requires working VAD — fall back to streaming - log("Transcription: VAD models unavailable, falling back from batch to streaming mode") - useBatchTranscription = false - vadGateService = nil - transcriptionService = try TranscriptionService(language: effectiveLanguage, vocabulary: vocabulary) - } else { - vadGateService = gate - log("Transcription: VAD gate enabled\(useBatchTranscription ? " (batch mode)" : "")") - } - } else { - vadGateService = nil + // Start backend transcription service, then audio on connect + transcriptionService?.start( + onTranscript: { [weak self] segment in + Task { @MainActor in + self?.handleTranscriptSegment(segment) } - - // Initialize system audio capture if supported (macOS 14.4+) - // Can be disabled via: defaults write com.omi.desktop-dev disableSystemAudioCapture -bool true - // or: defaults write com.omi.computer-macos disableSystemAudioCapture -bool true - let systemAudioDisabled = UserDefaults.standard.bool(forKey: "disableSystemAudioCapture") - if systemAudioDisabled { - log("Transcription: System audio capture DISABLED by user preference (disableSystemAudioCapture)") - } else if #available(macOS 14.4, *) { - systemAudioCaptureService = SystemAudioCaptureService() - log("Transcription: System audio capture initialized (macOS 14.4+)") - } else { - log("Transcription: System audio capture not available (requires macOS 14.4+)") + }, + onError: { [weak self] error in + Task { @MainActor in + logError("Transcription error", error: error) + AnalyticsManager.shared.recordingError(error: error.localizedDescription) + self?.stopTranscription() } - } - // For BLE device, BleAudioService will be used in startAudioCapture - - if useBatchTranscription { - // Batch mode: start audio capture directly (no WebSocket to wait for) - recordingStartCATime = CACurrentMediaTime() - Task { @MainActor [weak self] in + }, + onConnected: { [weak self] in + Task { @MainActor in + log("Transcription: Connected to backend") + // Start audio capture once connected await self?.startAudioCapture(source: effectiveSource) } - } else { - // Streaming mode: start transcription service first, then audio on connect - transcriptionService?.start( - onTranscript: { [weak self] segment in - Task { @MainActor in - self?.handleTranscriptSegment(segment) - } - }, - onError: { [weak self] error in - Task { @MainActor in - logError("Transcription error", error: error) - AnalyticsManager.shared.recordingError(error: error.localizedDescription) - self?.stopTranscription() - } - }, - onConnected: { [weak self] in - Task { @MainActor in - log("Transcription: Connected to DeepGram") - // Start audio capture once connected - await self?.startAudioCapture(source: effectiveSource) - } - }, - onDisconnected: { - log("Transcription: Disconnected from DeepGram") - } - ) + }, + onDisconnected: { + log("Transcription: Disconnected from backend") } + ) - isTranscribing = true - AssistantSettings.shared.transcriptionEnabled = true - audioSource = effectiveSource - currentTranscript = "" - speakerSegments = [] - totalSegmentCount = 0 - totalWordCount = 0 - liveSpeakerPersonMap = [:] - LiveTranscriptMonitor.shared.clear() - recordingStartTime = Date() - AudioLevelMonitor.shared.reset() - RecordingTimer.shared.start() + isTranscribing = true + AssistantSettings.shared.transcriptionEnabled = true + audioSource = effectiveSource + currentTranscript = "" + speakerSegments = [] + totalSegmentCount = 0 + totalWordCount = 0 + liveSpeakerPersonMap = [:] + LiveTranscriptMonitor.shared.clear() + recordingStartTime = Date() + AudioLevelMonitor.shared.reset() + RecordingTimer.shared.start() - log("Transcription: Using source: \(effectiveSource.rawValue), device: \(recordingInputDeviceName ?? "Unknown")") + log("Transcription: Using source: \(effectiveSource.rawValue), device: \(recordingInputDeviceName ?? "Unknown")") - // Create crash-safe DB session for persistence - Task { - do { - let sessionId = try await TranscriptionStorage.shared.startSession( - source: currentConversationSource.rawValue, - language: effectiveLanguage, - timezone: TimeZone.current.identifier, - inputDeviceName: recordingInputDeviceName - ) - await MainActor.run { - self.currentSessionId = sessionId - // Start live notes session - LiveNotesMonitor.shared.startSession(sessionId: sessionId) - } - log("Transcription: Created DB session \(sessionId)") - } catch { - logError("Transcription: Failed to create DB session", error: error) - // Non-fatal - continue recording even if DB fails + // Create crash-safe DB session for persistence + Task { + do { + let sessionId = try await TranscriptionStorage.shared.startSession( + source: currentConversationSource.rawValue, + language: effectiveLanguage, + timezone: TimeZone.current.identifier, + inputDeviceName: recordingInputDeviceName + ) + await MainActor.run { + self.currentSessionId = sessionId + // Start live notes session + LiveNotesMonitor.shared.startSession(sessionId: sessionId) } + log("Transcription: Created DB session \(sessionId)") + } catch { + logError("Transcription: Failed to create DB session", error: error) + // Non-fatal - continue recording even if DB fails } + } - // Start 4-hour max recording timer - maxRecordingTimer = Timer.scheduledTimer(withTimeInterval: maxRecordingDuration, repeats: false) { [weak self] _ in - Task { @MainActor in - guard let self = self, self.isTranscribing else { return } - log("Transcription: 4-hour limit reached - finalizing conversation") - _ = await self.finalizeConversation() - // Start a new recording session automatically - self.stopAudioCapture() - self.clearTranscriptionState() - self.startTranscription() - } + // Start 4-hour max recording timer + maxRecordingTimer = Timer.scheduledTimer(withTimeInterval: maxRecordingDuration, repeats: false) { [weak self] _ in + Task { @MainActor in + guard let self = self, self.isTranscribing else { return } + log("Transcription: 4-hour limit reached - finalizing conversation") + _ = await self.finalizeConversation() + // Start a new recording session automatically + self.stopAudioCapture() + self.clearTranscriptionState() + self.startTranscription() } + } - // Track transcription started - AnalyticsManager.shared.transcriptionStarted() - - log("Transcription: Starting...") + // Track transcription started + AnalyticsManager.shared.transcriptionStarted() - } catch { - AnalyticsManager.shared.recordingError(error: error.localizedDescription) - showAlert(title: "Transcription Error", message: error.localizedDescription) - } + log("Transcription: Starting...") } /// Start audio capture and pipe to transcription service @@ -1330,23 +1306,12 @@ class AppState: ObservableObject { guard let audioCaptureService = audioCaptureService, let audioMixer = audioMixer else { return } - // Start the audio mixer - it will send stereo audio to transcription service - // Branch on batch vs streaming mode - audioMixer.start { [weak self] stereoData in + // Start the audio mixer in mono mode — backend handles diarization server-side + audioMixer.start(outputMode: .mono) { [weak self] monoData in guard let self = self else { return } - if self.useBatchTranscription { - // Batch mode: accumulate audio in VAD gate, transcribe on silence - guard let gate = self.vadGateService else { return } - let output = gate.processAudioBatch(stereoData) - if output.isComplete, let audioBuffer = output.audioBuffer { - let wallStartTime = output.speechStartWallTime - Task { @MainActor [weak self] in - await self?.batchTranscribeChunk(audioBuffer: audioBuffer, wallStartTime: wallStartTime) - } - } - } else if let gate = self.vadGateService { + if let gate = self.vadGateService { // Streaming mode with VAD gate - let output = gate.processAudio(stereoData) + let output = gate.processAudio(monoData) if !output.audioToSend.isEmpty { self.transcriptionService?.sendAudio(output.audioToSend) } else if gate.needsKeepalive() { @@ -1357,7 +1322,7 @@ class AppState: ObservableObject { } } else { // Streaming mode without VAD gate - self.transcriptionService?.sendAudio(stereoData) + self.transcriptionService?.sendAudio(monoData) } } @@ -1411,10 +1376,12 @@ class AppState: ObservableObject { return } - // Start BLE audio processing and pipe directly to transcription + // Start BLE audio processing and pipe mono PCM directly to backend transcription await BleAudioService.shared.startProcessing( from: connection, - transcriptionService: transcriptionService, + audioSink: { [weak transcriptionService] pcmData in + transcriptionService?.sendAudio(pcmData) + }, audioDataHandler: { _ in // Audio level is updated by BleAudioService Task { @MainActor in @@ -2095,6 +2062,19 @@ class AppState: ObservableObject { log("Transcription: Finalizing conversation with \(segmentsToUpload.count) segments") + // When backend owns conversation creation (via /v4/listen lifecycle manager), + // skip client-side createConversationFromSegments() to avoid duplicates. + // The backend already has all segments from the live stream and will process + // the conversation on timeout or next connection. + if backendOwnsConversation { + log("Transcription: Backend owns conversation — skipping client-side upload (\(segmentsToUpload.count) segments streamed)") + if let sessionId = sessionId { + // Mark session as completed — no retry needed since backend has the data + try? await TranscriptionStorage.shared.markSessionCompleted(id: sessionId, backendId: "backend-owned") + } + return .saved + } + // Convert SpeakerSegment to API request format (include person_id from live naming) let speakerPersonMap = liveSpeakerPersonMap let apiSegments = segmentsToUpload.map { segment in diff --git a/desktop/Desktop/Sources/Audio/AudioSourceManager.swift b/desktop/Desktop/Sources/Audio/AudioSourceManager.swift index 9444be7d99..3247adc62c 100644 --- a/desktop/Desktop/Sources/Audio/AudioSourceManager.swift +++ b/desktop/Desktop/Sources/Audio/AudioSourceManager.swift @@ -301,7 +301,6 @@ final class AudioSourceManager: ObservableObject { // Start BLE audio processing with direct audio callback and WAL recording await bleAudioService.startProcessing( from: connection, - transcriptionService: nil, // We'll handle routing ourselves audioDataHandler: { [weak self] pcmData in // Convert decoded PCM mono to stereo and forward self?.handleBleAudio(pcmData) diff --git a/desktop/Desktop/Sources/Audio/BleAudioService.swift b/desktop/Desktop/Sources/Audio/BleAudioService.swift index 0cb9bb527f..9078653668 100644 --- a/desktop/Desktop/Sources/Audio/BleAudioService.swift +++ b/desktop/Desktop/Sources/Audio/BleAudioService.swift @@ -27,7 +27,7 @@ final class BleAudioService: ObservableObject { private var cancellables = Set() // Audio delivery - private var transcriptionService: TranscriptionService? + private var audioSink: ((Data) -> Void)? private var audioDataHandler: ((Data) -> Void)? private var rawFrameHandler: ((Data) -> Void)? @@ -44,12 +44,12 @@ final class BleAudioService: ObservableObject { /// Start processing audio from a device connection /// - Parameters: /// - connection: The device connection to get audio from - /// - transcriptionService: Optional transcription service to send audio to + /// - audioSink: Optional closure to receive decoded mono PCM audio (e.g., send to transcription service) /// - audioDataHandler: Optional handler for decoded PCM data (alternative to transcription) /// - rawFrameHandler: Optional handler for raw encoded frames (for WAL recording) func startProcessing( from connection: DeviceConnection, - transcriptionService: TranscriptionService? = nil, + audioSink: ((Data) -> Void)? = nil, audioDataHandler: ((Data) -> Void)? = nil, rawFrameHandler: ((Data) -> Void)? = nil ) async { @@ -58,7 +58,7 @@ final class BleAudioService: ObservableObject { return } - self.transcriptionService = transcriptionService + self.audioSink = audioSink self.audioDataHandler = audioDataHandler self.rawFrameHandler = rawFrameHandler @@ -126,7 +126,7 @@ final class BleAudioService: ObservableObject { cancellables.removeAll() isProcessing = false - transcriptionService = nil + audioSink = nil audioDataHandler = nil rawFrameHandler = nil @@ -194,37 +194,13 @@ final class BleAudioService: ObservableObject { // Calculate audio level updateAudioLevel(from: pcmData) - // Send to transcription service (mono channel) - if let transcription = transcriptionService { - // TranscriptionService expects stereo (2 channels) for multichannel transcription - // For BLE device audio, we duplicate to both channels (device is the "user") - let stereoData = convertToStereo(pcmData) - transcription.sendAudio(stereoData) - } + // Send decoded mono PCM to audio sink (e.g., transcription service) + audioSink?(pcmData) // Send to custom handler audioDataHandler?(pcmData) } - /// Convert mono PCM to stereo (duplicate to both channels) - private func convertToStereo(_ monoData: Data) -> Data { - // Mono: [S0, S1, S2, ...] - // Stereo: [S0, S0, S1, S1, S2, S2, ...] (interleaved) - var stereoData = Data(capacity: monoData.count * 2) - - monoData.withUnsafeBytes { bytes in - let samples = bytes.bindMemory(to: Int16.self) - for i in 0.. Void // MARK: - Properties private var onStereoChunk: StereoAudioHandler? private var isRunning = false + private(set) var outputMode: OutputMode = .stereo // Audio buffers (16kHz mono Int16 PCM) private var micBuffer = Data() @@ -29,15 +36,18 @@ class AudioMixer { // MARK: - Public Methods /// Start the mixer - /// - Parameter onStereoChunk: Callback receiving interleaved stereo 16-bit PCM at 16kHz - func start(onStereoChunk: @escaping StereoAudioHandler) { + /// - Parameters: + /// - outputMode: `.stereo` for interleaved multichannel, `.mono` for averaged single-channel + /// - onStereoChunk: Callback receiving mixed 16-bit PCM at 16kHz + func start(outputMode: OutputMode = .stereo, onStereoChunk: @escaping StereoAudioHandler) { bufferLock.lock() + self.outputMode = outputMode self.onStereoChunk = onStereoChunk self.isRunning = true micBuffer = Data() systemBuffer = Data() bufferLock.unlock() - log("AudioMixer: Started") + log("AudioMixer: Started (output=\(outputMode))") } /// Stop the mixer and flush remaining audio @@ -105,12 +115,17 @@ class AudioMixer { if flush { // When flushing, process whatever is available bytesToProcess = max(micBuffer.count, systemBuffer.count) + } else if micBuffer.count >= minBufferBytes && systemBuffer.count >= minBufferBytes { + // Both buffers have data — use shorter to stay in sync + bytesToProcess = (min(micBuffer.count, systemBuffer.count) / 2) * 2 + } else if micBuffer.count >= minBufferBytes { + // Only mic has data (system audio disabled/unavailable) — pad system with silence + bytesToProcess = (micBuffer.count / 2) * 2 + } else if systemBuffer.count >= minBufferBytes { + // Only system has data — pad mic with silence + bytesToProcess = (systemBuffer.count / 2) * 2 } else { - // Normal operation: process when both have data - let minAvailable = min(micBuffer.count, systemBuffer.count) - guard minAvailable >= minBufferBytes else { return } - // Align to sample boundary (2 bytes per Int16 sample) - bytesToProcess = (minAvailable / 2) * 2 + return } guard bytesToProcess >= 2 else { return } @@ -137,11 +152,17 @@ class AudioMixer { systemBuffer = Data() } - // Interleave into stereo - let stereoData = interleave(mic: micData, system: sysData) + // Mix according to output mode + let mixedData: Data + switch outputMode { + case .stereo: + mixedData = interleave(mic: micData, system: sysData) + case .mono: + mixedData = mixToMono(mic: micData, system: sysData) + } // Send to callback - onStereoChunk?(stereoData) + onStereoChunk?(mixedData) } /// Interleave two mono Int16 streams into stereo @@ -174,4 +195,32 @@ class AudioMixer { Data(buffer: buffer) } } + + /// Average two mono Int16 streams into a single mono stream + /// Output format: [(mic0+sys0)/2, (mic1+sys1)/2, ...] + private func mixToMono(mic: Data, system: Data) -> Data { + let sampleCount = mic.count / 2 + + var monoSamples = [Int16]() + monoSamples.reserveCapacity(sampleCount) + + mic.withUnsafeBytes { micPtr in + system.withUnsafeBytes { sysPtr in + let micSamples = micPtr.bindMemory(to: Int16.self) + let sysSamples = sysPtr.bindMemory(to: Int16.self) + + for i in 0.. Void + typealias ErrorHandler = (Error) -> Void + typealias ConnectionHandler = () -> Void + + enum BackendTranscriptionError: LocalizedError { + case notSignedIn + case connectionFailed(Error) + case invalidResponse + case webSocketError(String) + + var errorDescription: String? { + switch self { + case .notSignedIn: + return "Not signed in — cannot connect to backend" + case .connectionFailed(let error): + return "Connection failed: \(error.localizedDescription)" + case .invalidResponse: + return "Invalid response from backend" + case .webSocketError(let message): + return "WebSocket error: \(message)" + } + } + } + + // MARK: - Properties + + private var webSocketTask: URLSessionWebSocketTask? + private var urlSession: URLSession? + private var isConnected = false + private var shouldReconnect = false + + // Callbacks + private var onTranscript: TranscriptHandler? + private var onError: ErrorHandler? + private var onConnected: ConnectionHandler? + private var onDisconnected: ConnectionHandler? + + // Configuration + private let language: String + private let sampleRate = 16000 + private let codec = "pcm16" + private let channels = 1 // Always mono — backend handles diarization + private let source: String + private let conversationTimeout: Int + + // Reconnection + private var reconnectAttempts = 0 + private let maxReconnectAttempts = 10 + private var reconnectTask: Task? + + // Keepalive — send empty data periodically to prevent timeout + private var keepaliveTask: Task? + private let keepaliveInterval: TimeInterval = 8.0 + + // Watchdog: detect stale connections where WebSocket dies silently + private var watchdogTask: Task? + private var lastDataReceivedAt: Date? + private var lastKeepaliveSuccessAt: Date? + private let watchdogInterval: TimeInterval = 30.0 + private let staleThreshold: TimeInterval = 60.0 + + // Audio buffering + private var audioBuffer = Data() + private let audioBufferSize = 3200 // ~100ms of 16kHz 16-bit mono (16000 * 2 * 0.1) + private let audioBufferLock = NSLock() + + // MARK: - Initialization + + /// Initialize the backend transcription service + /// - Parameters: + /// - language: Language code for transcription (e.g., "en", "multi") + /// - source: Audio source identifier for backend analytics (e.g., "desktop", "omi", "bee") + /// - conversationTimeout: Seconds of silence before the backend creates a memory + init(language: String = "en", source: String = "desktop", conversationTimeout: Int = 120) { + self.language = language + self.source = source + self.conversationTimeout = conversationTimeout + log("BackendTranscriptionService: Initialized with language=\(language), source=\(source)") + } + + // MARK: - Public Methods + + /// Start the transcription service + func start( + onTranscript: @escaping TranscriptHandler, + onError: ErrorHandler? = nil, + onConnected: ConnectionHandler? = nil, + onDisconnected: ConnectionHandler? = nil + ) { + self.onTranscript = onTranscript + self.onError = onError + self.onConnected = onConnected + self.onDisconnected = onDisconnected + self.shouldReconnect = true + self.reconnectAttempts = 0 + + connect() + } + + /// Stop the transcription service + func stop() { + shouldReconnect = false + reconnectTask?.cancel() + reconnectTask = nil + keepaliveTask?.cancel() + keepaliveTask = nil + watchdogTask?.cancel() + watchdogTask = nil + + flushAudioBuffer() + disconnect() + } + + /// Signal the backend that no more audio will be sent, but keep connection open + /// to receive final transcription results. Call stop() later to fully disconnect. + func finishStream() { + shouldReconnect = false + reconnectTask?.cancel() + reconnectTask = nil + keepaliveTask?.cancel() + keepaliveTask = nil + watchdogTask?.cancel() + watchdogTask = nil + + flushAudioBuffer() + + // Backend doesn't have a CloseStream message like Deepgram. + // The connection will be closed when stop() is called. + log("BackendTranscriptionService: finishStream called, waiting for final results") + } + + /// Send audio data to the backend (buffered for efficiency) + func sendAudio(_ data: Data) { + guard isConnected else { return } + + audioBufferLock.lock() + audioBuffer.append(data) + + if audioBuffer.count >= audioBufferSize { + let chunk = audioBuffer + audioBuffer = Data() + audioBufferLock.unlock() + sendAudioChunk(chunk) + } else { + audioBufferLock.unlock() + } + } + + /// Flush any remaining audio in the buffer + private func flushAudioBuffer() { + audioBufferLock.lock() + let chunk = audioBuffer + audioBuffer = Data() + audioBufferLock.unlock() + + if !chunk.isEmpty { + sendAudioChunk(chunk) + } + } + + /// Actually send an audio chunk over the WebSocket + private func sendAudioChunk(_ data: Data) { + guard isConnected, let webSocketTask = webSocketTask else { return } + + let message = URLSessionWebSocketTask.Message.data(data) + webSocketTask.send(message) { [weak self] error in + if let error = error { + logError("BackendTranscriptionService: Send error", error: error) + self?.handleDisconnection() + } + } + } + + /// No-op for backend (Deepgram-specific Finalize message not needed) + func sendFinalize() { + // Backend handles segmentation server-side + } + + /// Public keepalive for VAD gate to call during extended silence + func sendKeepalivePublic() { + sendKeepalive() + } + + /// Check if connected + var connected: Bool { + return isConnected + } + + // MARK: - Connection + + private func connect() { + Task { + do { + let token = try await AuthService.shared.getIdToken() + let baseURL = await APIClient.shared.baseURL + self.connectWithToken(token, baseURL: baseURL) + } catch { + logError("BackendTranscriptionService: Failed to get auth token", error: error) + self.onError?(BackendTranscriptionError.notSignedIn) + } + } + } + + private func connectWithToken(_ token: String, baseURL: String) { + + // Convert http(s) to ws(s) + let wsBaseURL: String + if baseURL.hasPrefix("https://") { + wsBaseURL = "wss://" + baseURL.dropFirst("https://".count) + } else if baseURL.hasPrefix("http://") { + wsBaseURL = "ws://" + baseURL.dropFirst("http://".count) + } else { + wsBaseURL = "wss://" + baseURL + } + + // Strip trailing slash before appending path + let cleanBase = wsBaseURL.hasSuffix("/") ? String(wsBaseURL.dropLast()) : wsBaseURL + + var components = URLComponents(string: cleanBase + "/v4/listen")! + components.queryItems = [ + URLQueryItem(name: "language", value: language), + URLQueryItem(name: "sample_rate", value: String(sampleRate)), + URLQueryItem(name: "codec", value: codec), + URLQueryItem(name: "channels", value: String(channels)), + URLQueryItem(name: "source", value: source), + URLQueryItem(name: "include_speech_profile", value: "true"), + URLQueryItem(name: "speaker_auto_assign", value: "enabled"), + URLQueryItem(name: "conversation_timeout", value: String(conversationTimeout)), + ] + + guard let url = components.url else { + onError?(BackendTranscriptionError.connectionFailed(NSError(domain: "Invalid URL", code: -1))) + return + } + + log("BackendTranscriptionService: Connecting to \(url.absoluteString)") + + // Create URL request with Bearer auth header (same as mobile app) + var request = URLRequest(url: url) + request.setValue("Bearer \(token)", forHTTPHeaderField: "Authorization") + + // Create URLSession and WebSocket task + let configuration = URLSessionConfiguration.default + configuration.timeoutIntervalForRequest = 30 + configuration.timeoutIntervalForResource = 0 // No resource timeout for long-lived WebSocket + urlSession = URLSession(configuration: configuration) + webSocketTask = urlSession?.webSocketTask(with: request) + + // Start the connection + webSocketTask?.resume() + + // Start receiving messages + receiveMessage() + + // Mark as connected after a short delay (backend doesn't send a connect confirmation) + DispatchQueue.main.asyncAfter(deadline: .now() + 0.5) { [weak self] in + guard let self = self, self.webSocketTask?.state == .running else { return } + self.isConnected = true + self.reconnectAttempts = 0 + self.lastDataReceivedAt = Date() + self.lastKeepaliveSuccessAt = Date() + log("BackendTranscriptionService: Connected") + self.startKeepalive() + self.startWatchdog() + self.onConnected?() + } + } + + // MARK: - Keepalive + + private func startKeepalive() { + keepaliveTask?.cancel() + keepaliveTask = Task { [weak self] in + while !Task.isCancelled { + try? await Task.sleep(nanoseconds: UInt64(self?.keepaliveInterval ?? 8.0) * 1_000_000_000) + guard !Task.isCancelled, let self = self, self.isConnected else { break } + self.sendKeepalive() + } + } + } + + private func sendKeepalive() { + guard isConnected, let webSocketTask = webSocketTask else { return } + + // Send a small chunk of silence as keepalive (2 bytes of zero = 1 silent sample) + let silence = Data(repeating: 0, count: 2) + let message = URLSessionWebSocketTask.Message.data(silence) + webSocketTask.send(message) { [weak self] error in + if let error = error { + logError("BackendTranscriptionService: Keepalive error", error: error) + self?.handleDisconnection() + } else { + self?.lastKeepaliveSuccessAt = Date() + } + } + } + + // MARK: - Watchdog + + private func startWatchdog() { + watchdogTask?.cancel() + watchdogTask = Task { [weak self] in + while !Task.isCancelled { + try? await Task.sleep(nanoseconds: UInt64(self?.watchdogInterval ?? 30.0) * 1_000_000_000) + guard !Task.isCancelled, let self = self, self.isConnected else { break } + + if let lastData = self.lastDataReceivedAt, + Date().timeIntervalSince(lastData) > self.staleThreshold { + if let lastKeepalive = self.lastKeepaliveSuccessAt, + Date().timeIntervalSince(lastKeepalive) < self.staleThreshold { + continue + } + log("BackendTranscriptionService: Watchdog detected stale connection — forcing reconnect") + self.handleDisconnection() + } + } + } + } + + // MARK: - Disconnect / Reconnect + + private func disconnect() { + isConnected = false + keepaliveTask?.cancel() + keepaliveTask = nil + watchdogTask?.cancel() + watchdogTask = nil + webSocketTask?.cancel(with: .normalClosure, reason: nil) + webSocketTask = nil + urlSession?.invalidateAndCancel() + urlSession = nil + log("BackendTranscriptionService: Disconnected") + onDisconnected?() + } + + private func handleDisconnection() { + guard isConnected else { return } + + isConnected = false + keepaliveTask?.cancel() + keepaliveTask = nil + watchdogTask?.cancel() + watchdogTask = nil + webSocketTask = nil + urlSession?.invalidateAndCancel() + urlSession = nil + onDisconnected?() + + if shouldReconnect && reconnectAttempts < maxReconnectAttempts { + reconnectAttempts += 1 + let delay = min(pow(2.0, Double(reconnectAttempts)), 32.0) + log("BackendTranscriptionService: Reconnecting in \(delay)s (attempt \(reconnectAttempts))") + + reconnectTask = Task { + try? await Task.sleep(nanoseconds: UInt64(delay * 1_000_000_000)) + guard !Task.isCancelled, self.shouldReconnect else { return } + self.connect() + } + } else if reconnectAttempts >= maxReconnectAttempts { + log("BackendTranscriptionService: Max reconnect attempts reached") + onError?(BackendTranscriptionError.webSocketError("Max reconnect attempts reached")) + } + } + + // MARK: - Message Handling + + private func receiveMessage() { + webSocketTask?.receive { [weak self] result in + guard let self = self else { return } + + switch result { + case .success(let message): + self.handleMessage(message) + self.receiveMessage() + + case .failure(let error): + guard self.isConnected else { return } + logError("BackendTranscriptionService: Receive error", error: error) + self.handleDisconnection() + } + } + } + + private func handleMessage(_ message: URLSessionWebSocketTask.Message) { + lastDataReceivedAt = Date() + + switch message { + case .string(let text): + parseResponse(text) + case .data(let data): + if let text = String(data: data, encoding: .utf8) { + parseResponse(text) + } + @unknown default: + break + } + } + + private func parseResponse(_ text: String) { + // Handle heartbeat ping from backend + if text == "ping" { + return + } + + guard let data = text.data(using: .utf8) else { return } + + // Try parsing as array of transcript segments (main response format) + if let segments = try? JSONDecoder().decode([BackendSegment].self, from: data) { + for segment in segments { + // Map backend is_user to channel index: + // is_user=true → channelIndex=0 (mic/user) + // is_user=false → channelIndex=1 (system/others) + let channelIndex = segment.is_user ? 0 : 1 + + let transcriptSegment = TranscriptSegment( + text: segment.text, + isFinal: true, + speechFinal: true, + confidence: 1.0, + words: [TranscriptSegment.Word( + word: segment.text, + start: segment.start, + end: segment.end, + confidence: 1.0, + speaker: segment.speaker_id, + punctuatedWord: segment.text + )], + channelIndex: channelIndex + ) + onTranscript?(transcriptSegment) + } + return + } + + // Try parsing as event object (memory_created, service_status, etc.) + if let event = try? JSONDecoder().decode(BackendEvent.self, from: data) { + switch event.type { + case "memory_created": + log("BackendTranscriptionService: Memory created") + case "service_status": + log("BackendTranscriptionService: Service status: \(event.status ?? "unknown")") + default: + log("BackendTranscriptionService: Event: \(event.type)") + } + return + } + + // Unknown message — log for debugging + log("BackendTranscriptionService: Unknown message: \(text.prefix(200))") + } +} + +// MARK: - Backend Response Models + +/// Transcript segment from the OMI backend +private struct BackendSegment: Decodable { + let text: String + let speaker: String? + let speaker_id: Int? + let is_user: Bool + let start: Double + let end: Double + let person_id: String? +} + +/// Event message from the OMI backend +private struct BackendEvent: Decodable { + let type: String + let status: String? + + enum CodingKeys: String, CodingKey { + case type + case status + } + + init(from decoder: Decoder) throws { + let container = try decoder.container(keyedBy: CodingKeys.self) + type = try container.decode(String.self, forKey: .type) + status = try container.decodeIfPresent(String.self, forKey: .status) + } +} diff --git a/desktop/Desktop/Sources/FloatingControlBar/PushToTalkManager.swift b/desktop/Desktop/Sources/FloatingControlBar/PushToTalkManager.swift index aee2f956d7..4156578928 100644 --- a/desktop/Desktop/Sources/FloatingControlBar/PushToTalkManager.swift +++ b/desktop/Desktop/Sources/FloatingControlBar/PushToTalkManager.swift @@ -34,7 +34,7 @@ class PushToTalkManager: ObservableObject { private let doubleTapThreshold: TimeInterval = 0.4 // Transcription - private var transcriptionService: TranscriptionService? + private var transcriptionService: BackendTranscriptionService? private var audioCaptureService: AudioCaptureService? private var transcriptSegments: [String] = [] private var lastInterimText: String = "" @@ -302,58 +302,20 @@ class PushToTalkManager: ObservableObject { sound?.play() } - let isBatchMode = ShortcutSettings.shared.pttTranscriptionMode == .batch + // Flush remaining audio and wait for final transcript from backend + transcriptionService?.finishStream() + log("PushToTalkManager: finalizing — mic stopped, waiting for final transcript") - if isBatchMode { - // Batch mode: send accumulated audio to pre-recorded API - log("PushToTalkManager: finalizing (batch) — mic stopped, transcribing recorded audio") - batchAudioLock.lock() - let audioData = batchAudioBuffer - batchAudioBuffer = Data() - batchAudioLock.unlock() - - // Stop streaming service (was not used in batch mode, but clean up) - stopAudioTranscription() - - guard !audioData.isEmpty else { - log("PushToTalkManager: batch mode — no audio recorded") - sendTranscript() - return - } - - barState?.voiceTranscript = "Transcribing..." - - Task { - do { - let language = AssistantSettings.shared.effectiveTranscriptionLanguage - let transcript = try await TranscriptionService.batchTranscribe( - audioData: audioData, - language: language - ) - if let transcript, !transcript.isEmpty { - self.transcriptSegments = [transcript] - } - } catch { - logError("PushToTalkManager: batch transcription failed", error: error) - } + // Safety timeout: if backend doesn't send a final segment within 3s, send what we have + let timeout = DispatchWorkItem { [weak self] in + Task { @MainActor in + guard let self, self.state == .finalizing else { return } + log("PushToTalkManager: finalization timeout — sending transcript") self.sendTranscript() } - } else { - // Live mode: flush remaining audio and wait for final transcript from Deepgram - transcriptionService?.finishStream() - log("PushToTalkManager: finalizing (live) — mic stopped, waiting for final transcript") - - // Safety timeout: if Deepgram doesn't send a final segment within 3s, send what we have - let timeout = DispatchWorkItem { [weak self] in - Task { @MainActor in - guard let self, self.state == .finalizing else { return } - log("PushToTalkManager: live finalization timeout — sending transcript") - self.sendTranscript() - } - } - liveFinalizationTimeout = timeout - DispatchQueue.main.asyncAfter(deadline: .now() + 3.0, execute: timeout) } + liveFinalizationTimeout = timeout + DispatchQueue.main.asyncAfter(deadline: .now() + 3.0, execute: timeout) } private func sendTranscript() { @@ -421,50 +383,34 @@ class PushToTalkManager: ObservableObject { return } - let isBatchMode = ShortcutSettings.shared.pttTranscriptionMode == .batch + // Always use live streaming through the backend (no client-side batch mode) + startMicCapture() - if isBatchMode { - // Batch mode: just capture audio into buffer, no streaming connection - batchAudioLock.lock() - batchAudioBuffer = Data() - batchAudioLock.unlock() - startMicCapture(batchMode: true) - log("PushToTalkManager: started audio capture (batch mode)") - } else { - // Live mode: start mic capture and stream to Deepgram - startMicCapture() + let language = AssistantSettings.shared.effectiveTranscriptionLanguage + let service = BackendTranscriptionService(language: language) + transcriptionService = service - do { - let language = AssistantSettings.shared.effectiveTranscriptionLanguage - let service = try TranscriptionService(language: language, channels: 1) - transcriptionService = service - - service.start( - onTranscript: { [weak self] segment in - Task { @MainActor in - self?.handleTranscript(segment) - } - }, - onError: { [weak self] error in - Task { @MainActor in - logError("PushToTalkManager: transcription error", error: error) - self?.stopListening() - } - }, - onConnected: { - Task { @MainActor in - log("PushToTalkManager: DeepGram connected") - } - } - ) - } catch { - logError("PushToTalkManager: failed to create TranscriptionService", error: error) - stopListening() + service.start( + onTranscript: { [weak self] segment in + Task { @MainActor in + self?.handleTranscript(segment) + } + }, + onError: { [weak self] error in + Task { @MainActor in + logError("PushToTalkManager: transcription error", error: error) + self?.stopListening() + } + }, + onConnected: { + Task { @MainActor in + log("PushToTalkManager: backend connected") + } } - } + ) } - private func startMicCapture(batchMode: Bool = false) { + private func startMicCapture() { if audioCaptureService == nil { audioCaptureService = AudioCaptureService() } @@ -475,20 +421,12 @@ class PushToTalkManager: ObservableObject { do { try await capture.startCapture( onAudioChunk: { [weak self] audioData in - guard let self else { return } - if batchMode { - // Batch mode: accumulate audio in buffer - self.batchAudioLock.lock() - self.batchAudioBuffer.append(audioData) - self.batchAudioLock.unlock() - } else { - // Live mode: stream to Deepgram - self.transcriptionService?.sendAudio(audioData) - } + // Stream mono audio to backend + self?.transcriptionService?.sendAudio(audioData) }, onAudioLevel: { _ in } ) - log("PushToTalkManager: mic capture started (batch=\(batchMode))") + log("PushToTalkManager: mic capture started") } catch { logError("PushToTalkManager: mic capture failed", error: error) self.stopListening()