diff --git a/apps/web/app/s/[videoId]/_components/AuthOverlay.tsx b/apps/web/app/s/[videoId]/_components/AuthOverlay.tsx index a0a8ed33fb..e621fe0012 100644 --- a/apps/web/app/s/[videoId]/_components/AuthOverlay.tsx +++ b/apps/web/app/s/[videoId]/_components/AuthOverlay.tsx @@ -10,8 +10,8 @@ import { signIn } from "next-auth/react"; import { useId, useState } from "react"; import { toast } from "sonner"; import { trackEvent } from "@/app/utils/analytics"; -import OtpForm from "./OtpForm"; import { usePublicEnv } from "@/utils/public-env"; +import OtpForm from "./OtpForm"; interface AuthOverlayProps { isOpen: boolean; diff --git a/apps/web/app/s/[videoId]/_components/Sidebar.tsx b/apps/web/app/s/[videoId]/_components/Sidebar.tsx index 131653f2e9..3310e56316 100644 --- a/apps/web/app/s/[videoId]/_components/Sidebar.tsx +++ b/apps/web/app/s/[videoId]/_components/Sidebar.tsx @@ -97,7 +97,7 @@ export const Sidebar = forwardRef<{ scrollToBottom: () => void }, SidebarProps>( : !( videoSettings?.disableTranscript ?? data.orgSettings?.disableTranscript - ) + ) ? "transcript" : "activity"; diff --git a/crates/media-info/src/lib.rs b/crates/media-info/src/lib.rs index 4c3f09a77c..290f986c97 100644 --- a/crates/media-info/src/lib.rs +++ b/crates/media-info/src/lib.rs @@ -24,7 +24,7 @@ pub enum AudioInfoError { } impl AudioInfo { - pub const MAX_AUDIO_CHANNELS: u16 = 8; + pub const MAX_AUDIO_CHANNELS: u16 = 16; pub const fn new( sample_format: Sample, @@ -133,29 +133,48 @@ impl AudioInfo { frame } - pub fn wrap_frame(&self, data: &[u8]) -> frame::Audio { - let sample_size = self.sample_size(); - let interleaved_chunk_size = sample_size * self.channels; - let samples = data.len() / interleaved_chunk_size; + /// Always expects packed input data + pub fn wrap_frame_with_max_channels( + &self, + packed_data: &[u8], + max_channels: usize, + ) -> frame::Audio { + let out_channels = self.channels.min(max_channels); - let mut frame = frame::Audio::new(self.sample_format, samples, self.channel_layout()); + let sample_size = self.sample_size(); + let packed_sample_size = sample_size * self.channels; + let samples = packed_data.len() / packed_sample_size; + + let mut frame = frame::Audio::new( + self.sample_format, + samples, + ChannelLayout::default(out_channels as i32), + ); frame.set_rate(self.sample_rate); if self.channels == 0 { unreachable!() - } else if self.channels == 1 || frame.is_packed() { - frame.data_mut(0)[0..data.len()].copy_from_slice(data) + } else if self.channels == 1 || (frame.is_packed() && self.channels <= max_channels) { + frame.data_mut(0)[0..packed_data.len()].copy_from_slice(packed_data) + } else if frame.is_packed() && self.channels > out_channels { + for (chunk_index, packed_chunk) in packed_data.chunks(packed_sample_size).enumerate() { + let start = chunk_index * sample_size * out_channels; + let end = start + sample_size * out_channels; + + frame.data_mut(0)[start..end].copy_from_slice(&packed_chunk[0..(end - start)]); + } } else { // cpal *always* returns interleaved data (i.e. the first sample from every channel, followed // by the second sample from every channel, et cetera). Many audio codecs work better/primarily // with planar data, so we de-interleave it here if there is more than one channel. - for (chunk_index, interleaved_chunk) in data.chunks(interleaved_chunk_size).enumerate() + for (chunk_index, interleaved_chunk) in + packed_data.chunks(packed_sample_size).enumerate() { let start = chunk_index * sample_size; let end = start + sample_size; - for channel in 0..self.channels { + for channel in 0..self.channels.min(max_channels) { let channel_start = channel * sample_size; let channel_end = channel_start + sample_size; frame.data_mut(channel)[start..end] @@ -166,6 +185,17 @@ impl AudioInfo { frame } + + /// Always expects packed input data + pub fn wrap_frame(&self, data: &[u8]) -> frame::Audio { + self.wrap_frame_with_max_channels(data, self.channels) + } + + pub fn with_max_channels(&self, channels: u16) -> Self { + let mut this = *self; + this.channels = this.channels.min(channels as usize); + this + } } pub enum RawVideoFormat { @@ -292,3 +322,58 @@ pub fn ffmpeg_sample_format_for(sample_format: SampleFormat) -> Option { _ => None, } } + +#[cfg(test)] +mod tests { + use super::*; + + mod audio_info { + use super::*; + + #[test] + fn wrap_packed_frame() { + let info = AudioInfo::new_raw(Sample::U8(Type::Packed), 2, 4); + + let input = &[1, 2, 3, 4, 1, 2, 3, 4]; + let frame = info.wrap_frame(input); + + assert_eq!(&frame.data(0)[0..input.len()], input); + } + + #[test] + fn wrap_planar_frame() { + let info = AudioInfo::new_raw(Sample::U8(Type::Planar), 2, 4); + + let input = &[1, 2, 3, 4, 1, 2, 3, 4]; + let frame = info.wrap_frame(input); + + assert_eq!(frame.planes(), 4); + assert_eq!(&frame.data(0)[0..2], &[1, 1]); + assert_eq!(&frame.data(1)[0..2], &[2, 2]); + assert_eq!(&frame.data(2)[0..2], &[3, 3]); + assert_eq!(&frame.data(3)[0..2], &[4, 4]); + } + + #[test] + fn wrap_packed_frame_max_channels() { + let info = AudioInfo::new_raw(Sample::U8(Type::Packed), 2, 4); + + let input = &[1, 2, 3, 4, 1, 2, 3, 4]; + let frame = info.wrap_frame_with_max_channels(input, 2); + + assert_eq!(&frame.data(0)[0..4], &[1, 2, 1, 2]); + } + + #[test] + fn wrap_planar_frame_max_channels() { + let info = AudioInfo::new_raw(Sample::U8(Type::Planar), 2, 4); + + let input = &[1, 2, 3, 4, 1, 2, 3, 4]; + let frame = info.wrap_frame_with_max_channels(input, 2); + + assert_eq!(frame.planes(), 2); + assert_eq!(&frame.data(0)[0..2], &[1, 1]); + assert_eq!(&frame.data(1)[0..2], &[2, 2]); + } + } +} diff --git a/crates/recording/examples/recording-cli.rs b/crates/recording/examples/recording-cli.rs index 3310e5e7bc..e2a9343b58 100644 --- a/crates/recording/examples/recording-cli.rs +++ b/crates/recording/examples/recording-cli.rs @@ -1,6 +1,7 @@ -use cap_recording::{screen_capture::ScreenCaptureTarget, *}; +use cap_recording::{feeds::*, screen_capture::ScreenCaptureTarget, *}; +use kameo::Actor as _; use scap_targets::Display; -use std::time::Duration; +use std::{sync::Arc, time::Duration}; use tracing::*; #[tokio::main] @@ -42,20 +43,14 @@ pub async fn main() { // mic_feed // .ask(microphone::SetInput { - // label: - // // MicrophoneFeed::list() - // // .into_iter() - // // .find(|(k, _)| k.contains("Focusrite")) - // MicrophoneFeed::default() - // .map(|v| v.0) - // .unwrap(), + // label: MicrophoneFeed::default_device().map(|v| v.0).unwrap(), // }) // .await // .unwrap() // .await // .unwrap(); - tokio::time::sleep(Duration::from_millis(10)).await; + // tokio::time::sleep(Duration::from_millis(10)).await; let handle = instant_recording::Actor::builder( dir.path().into(), @@ -63,10 +58,11 @@ pub async fn main() { id: Display::primary().id(), }, ) - // .with_system_audio(true) + .with_system_audio(true) // .with_camera_feed(std::sync::Arc::new( // camera_feed.ask(feeds::camera::Lock).await.unwrap(), // )) + // .with_mic_feed(Arc::new(mic_feed.ask(microphone::Lock).await.unwrap())) .build( #[cfg(target_os = "macos")] cidre::sc::ShareableContent::current().await.unwrap(), diff --git a/crates/recording/src/output_pipeline/core.rs b/crates/recording/src/output_pipeline/core.rs index 31bb91ecc4..b68eb5ad2b 100644 --- a/crates/recording/src/output_pipeline/core.rs +++ b/crates/recording/src/output_pipeline/core.rs @@ -46,6 +46,7 @@ impl OutputPipeline { } } +#[derive(Default)] pub struct SetupCtx { tasks: TaskPool, } @@ -114,6 +115,7 @@ impl OutputPipelineBuilder { } } +#[derive(Default)] pub struct TaskPool(Vec<(&'static str, JoinHandle>)>); impl TaskPool { @@ -176,7 +178,8 @@ impl OutputPipelineBuilder> { .. } = self; - let (mut setup_ctx, stop_token, done_tx, done_rx, pause_flag) = setup_build(); + let mut setup_ctx = SetupCtx::default(); + let build_ctx = BuildCtx::new(); let (video_source, video_rx) = setup_video_source::(video.config, &mut setup_ctx).await?; @@ -184,12 +187,17 @@ impl OutputPipelineBuilder> { let video_info = video_source.video_info(); let (first_tx, first_rx) = oneshot::channel(); + let audio = + setup_audio_sources(&mut setup_ctx, audio_sources, build_ctx.stop_token.clone()) + .await + .context("setup_audio_sources")?; + let muxer = setup_muxer::( muxer_config, &path, Some(video_info), - Some(AudioMixer::INFO), - &pause_flag, + audio.as_ref().map(|v| v.audio_info), + &build_ctx.pause_flag, &mut setup_ctx, ) .await?; @@ -199,18 +207,18 @@ impl OutputPipelineBuilder> { video_source, video_rx, first_tx, - stop_token.clone(), + build_ctx.stop_token.clone(), muxer.clone(), timestamps, ); finish_build( setup_ctx, - audio_sources, - stop_token.clone(), + audio, + build_ctx.stop_token.clone(), muxer, timestamps, - done_tx, + build_ctx.done_tx, None, &path, ) @@ -219,11 +227,11 @@ impl OutputPipelineBuilder> { Ok(OutputPipeline { path, first_timestamp_rx: first_rx, - stop_token: Some(stop_token.clone().drop_guard()), video_info: Some(video_info), - done_fut: done_rx, - pause_flag, - cancel_token: stop_token, + stop_token: Some(build_ctx.stop_token.clone().drop_guard()), + done_fut: build_ctx.done_rx, + pause_flag: build_ctx.pause_flag, + cancel_token: build_ctx.stop_token, }) } } @@ -244,27 +252,33 @@ impl OutputPipelineBuilder { return Err(anyhow!("Invariant: No audio sources")); } - let (mut setup_ctx, stop_token, done_tx, done_rx, pause_flag) = setup_build(); + let mut setup_ctx = SetupCtx::default(); + let build_ctx = BuildCtx::new(); let (first_tx, first_rx) = oneshot::channel(); + let audio = + setup_audio_sources(&mut setup_ctx, audio_sources, build_ctx.stop_token.clone()) + .await + .context("setup_audio_sources")?; + let muxer = setup_muxer::( muxer_config, &path, None, - Some(AudioMixer::INFO), - &pause_flag, + audio.as_ref().map(|v| v.audio_info), + &build_ctx.pause_flag, &mut setup_ctx, ) .await?; finish_build( setup_ctx, - audio_sources, - stop_token.clone(), + audio, + build_ctx.stop_token.clone(), muxer, timestamps, - done_tx, + build_ctx.done_tx, Some(first_tx), &path, ) @@ -273,48 +287,48 @@ impl OutputPipelineBuilder { Ok(OutputPipeline { path, first_timestamp_rx: first_rx, - stop_token: Some(stop_token.clone().drop_guard()), + stop_token: Some(build_ctx.stop_token.clone().drop_guard()), video_info: None, - done_fut: done_rx, - pause_flag, - cancel_token: stop_token, + done_fut: build_ctx.done_rx, + pause_flag: build_ctx.pause_flag, + cancel_token: build_ctx.stop_token, }) } } -fn setup_build() -> ( - SetupCtx, - CancellationToken, - oneshot::Sender>, - DoneFut, - Arc, -) { - let stop_token = CancellationToken::new(); - - let (done_tx, done_rx) = oneshot::channel(); - - ( - SetupCtx { - tasks: TaskPool(vec![]), - }, - stop_token, - done_tx, - done_rx - .map(|v| { - v.map_err(anyhow::Error::from) - .and_then(|v| v) - .map_err(|e| PipelineDoneError(Arc::new(e))) - }) - .boxed() - .shared(), - Arc::new(AtomicBool::new(false)), - ) +struct BuildCtx { + stop_token: CancellationToken, + done_tx: oneshot::Sender>, + done_rx: DoneFut, + pause_flag: Arc, +} + +impl BuildCtx { + pub fn new() -> Self { + let stop_token = CancellationToken::new(); + + let (done_tx, done_rx) = oneshot::channel(); + + Self { + stop_token, + done_tx, + done_rx: done_rx + .map(|v| { + v.map_err(anyhow::Error::from) + .and_then(|v| v) + .map_err(|e| PipelineDoneError(Arc::new(e))) + }) + .boxed() + .shared(), + pause_flag: Arc::new(AtomicBool::new(false)), + } + } } #[allow(clippy::too_many_arguments)] async fn finish_build( mut setup_ctx: SetupCtx, - audio_sources: Vec, + audio: Option, stop_token: CancellationToken, muxer: Arc>, timestamps: Timestamps, @@ -322,16 +336,15 @@ async fn finish_build( first_tx: Option>, path: &Path, ) -> anyhow::Result<()> { - configure_audio( - &mut setup_ctx, - audio_sources, - stop_token.clone(), - muxer.clone(), - timestamps, - first_tx, - ) - .await - .context("audio mixer setup")?; + if let Some(audio) = audio { + audio.configure( + &mut setup_ctx, + muxer.clone(), + stop_token.clone(), + timestamps, + first_tx, + ); + } tokio::spawn( async move { @@ -468,88 +481,114 @@ fn spawn_video_encoder, TVideo: V }); } -async fn configure_audio( +struct PreparedAudioSources { + audio_info: AudioInfo, + audio_rx: mpsc::Receiver, + erased_audio_sources: Vec, +} + +impl PreparedAudioSources { + pub fn configure( + mut self, + setup_ctx: &mut SetupCtx, + muxer: Arc>, + stop_token: CancellationToken, + timestamps: Timestamps, + mut first_tx: Option>, + ) { + setup_ctx.tasks().spawn("mux-audio", { + let stop_token = stop_token.child_token(); + let muxer = muxer.clone(); + async move { + stop_token + .run_until_cancelled(async { + while let Some(frame) = self.audio_rx.next().await { + if let Some(first_tx) = first_tx.take() { + let _ = first_tx.send(frame.timestamp); + } + + let timestamp = frame.timestamp.duration_since(timestamps); + if let Err(e) = muxer.lock().await.send_audio_frame(frame, timestamp) { + error!("Audio encoder: {e}"); + } + } + }) + .await; + + for source in &mut self.erased_audio_sources { + let _ = (source.stop_fn)(source.inner.as_mut()).await; + } + + muxer.lock().await.stop(); + + Ok(()) + } + }); + } +} + +async fn setup_audio_sources( setup_ctx: &mut SetupCtx, - audio_sources: Vec, + mut audio_sources: Vec, stop_token: CancellationToken, - muxer: Arc>, - timestamps: Timestamps, - mut first_tx: Option>, -) -> anyhow::Result<()> { +) -> anyhow::Result> { if audio_sources.is_empty() { - return Ok(()); + return Ok(None); } - let mut audio_mixer = AudioMixer::builder(); - let mut erased_audio_sources = vec![]; + let (audio_tx, audio_rx) = mpsc::channel(64); - for audio_source_setup in audio_sources { - let (tx, rx) = mpsc::channel(64); - let source = (audio_source_setup)(tx, setup_ctx).await?; - - audio_mixer.add_source(source.audio_info, rx); + let audio_info = if audio_sources.len() == 1 { + let source = (audio_sources.swap_remove(0))(audio_tx, setup_ctx).await?; + let info = source.audio_info; erased_audio_sources.push(source); - } + info + } else { + let mut audio_mixer = AudioMixer::builder(); + let stop_flag = Arc::new(AtomicBool::new(false)); + let (ready_tx, ready_rx) = oneshot::channel::>(); + + for audio_source_setup in audio_sources { + let (tx, rx) = mpsc::channel(64); + let source = (audio_source_setup)(tx, setup_ctx).await?; + + audio_mixer.add_source(source.audio_info, rx); + erased_audio_sources.push(source); + } - let (audio_tx, mut audio_rx) = mpsc::channel(64); - let (ready_tx, ready_rx) = oneshot::channel::>(); - let stop_flag = Arc::new(AtomicBool::new(false)); + setup_ctx.tasks().spawn_thread("audio-mixer", { + let stop_flag = stop_flag.clone(); + move || { + audio_mixer.run(audio_tx, ready_tx, stop_flag); + Ok(()) + } + }); - setup_ctx.tasks().spawn_thread("audio-mixer", { - let stop_flag = stop_flag.clone(); - move || { - audio_mixer.run(audio_tx, ready_tx, stop_flag); - Ok(()) - } - }); + ready_rx + .await + .map_err(|_| anyhow::format_err!("Audio mixer crashed"))??; - ready_rx - .await - .map_err(|_| anyhow::format_err!("Audio mixer crashed"))??; + setup_ctx.tasks().spawn( + "audio-mixer-stop", + stop_token.child_token().cancelled_owned().map(move |_| { + stop_flag.store(true, atomic::Ordering::Relaxed); + Ok(()) + }), + ); - setup_ctx.tasks().spawn( - "audio-mixer-stop", - stop_token.child_token().cancelled_owned().map(move |_| { - stop_flag.store(true, atomic::Ordering::Relaxed); - Ok(()) - }), - ); + AudioMixer::INFO + }; for source in &mut erased_audio_sources { (source.start_fn)(source.inner.as_mut()).await?; } - setup_ctx.tasks().spawn("mux-audio", { - let stop_token = stop_token.child_token(); - let muxer = muxer.clone(); - async move { - stop_token - .run_until_cancelled(async { - while let Some(frame) = audio_rx.next().await { - if let Some(first_tx) = first_tx.take() { - let _ = first_tx.send(frame.timestamp); - } - - let timestamp = frame.timestamp.duration_since(timestamps); - if let Err(e) = muxer.lock().await.send_audio_frame(frame, timestamp) { - error!("Audio encoder: {e}"); - } - } - }) - .await; - - for source in &mut erased_audio_sources { - let _ = (source.stop_fn)(source.inner.as_mut()).await; - } - - muxer.lock().await.stop(); - - Ok(()) - } - }); - - Ok(()) + Ok(Some(PreparedAudioSources { + audio_info, + audio_rx, + erased_audio_sources, + })) } pub type DoneFut = Shared>>; @@ -557,8 +596,8 @@ pub type DoneFut = Shared>>; pub struct OutputPipeline { path: PathBuf, pub first_timestamp_rx: oneshot::Receiver, - stop_token: Option, video_info: Option, + stop_token: Option, done_fut: DoneFut, pause_flag: Arc, cancel_token: CancellationToken, diff --git a/crates/recording/src/sources/microphone.rs b/crates/recording/src/sources/microphone.rs index f567e8cd2d..4d645737bb 100644 --- a/crates/recording/src/sources/microphone.rs +++ b/crates/recording/src/sources/microphone.rs @@ -37,7 +37,7 @@ impl AudioSource for Microphone { while let Ok(frame) = rx.recv_async().await { let _ = audio_tx .send(AudioFrame::new( - audio_info.wrap_frame(&frame.data), + audio_info.wrap_frame_with_max_channels(&frame.data, 2), frame.timestamp, )) .await; @@ -45,7 +45,7 @@ impl AudioSource for Microphone { }); Ok(Self { - info: audio_info, + info: audio_info.with_max_channels(2), _lock: feed_lock, }) } diff --git a/crates/recording/src/studio_recording.rs b/crates/recording/src/studio_recording.rs index 092b1cdc65..2730274274 100644 --- a/crates/recording/src/studio_recording.rs +++ b/crates/recording/src/studio_recording.rs @@ -155,8 +155,10 @@ impl Message for Actor { index, .. }) => { - let (cursors, next_cursor_id) = - self.stop_pipeline(pipeline, segment_start_time).await?; + let (cursors, next_cursor_id) = self + .stop_pipeline(pipeline, segment_start_time) + .await + .context("stop_pipeline")?; Some(ActorState::Paused { next_index: index + 1, @@ -265,10 +267,10 @@ impl Pipeline { Ok(FinishedPipeline { start_time: self.start_time, - screen: screen?, - microphone: microphone.transpose()?, - camera: camera.transpose()?, - system_audio: system_audio.transpose()?, + screen: screen.context("screen")?, + microphone: microphone.transpose().context("microphone")?, + camera: camera.transpose().context("camera")?, + system_audio: system_audio.transpose().context("system_audio")?, cursor: self.cursor, }) }