Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions build_tools/nix/flake.nix
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@
gst_all_1.gst-plugins-bad
gst_all_1.gst-plugins-ugly
gst_all_1.gst-libav
gst_all_1.gst-plugins-rs

# Node.js
nodejs_20
Expand All @@ -68,6 +69,7 @@
cargo-watch
cargo-nextest
rust-analyzer
streamlink

clang-tools
llvmPackages.bintools
Expand Down
3 changes: 3 additions & 0 deletions compositor_api/src/input.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ use serde::{Deserialize, Serialize};

mod decklink;
mod decklink_into;
mod hls;
mod hls_into;
mod mp4;
mod mp4_into;
mod rtp;
Expand All @@ -11,6 +13,7 @@ mod whip;
mod whip_into;

pub use decklink::*;
pub use hls::*;
pub use mp4::*;
pub use rtp::*;
pub use whip::*;
Expand Down
20 changes: 20 additions & 0 deletions compositor_api/src/input/hls.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
use core::f64;
use std::sync::Arc;

use schemars::JsonSchema;
use serde::{Deserialize, Serialize};

/// Input stream from MP4 file.
/// Exactly one of `url` and `path` has to be defined.
#[derive(Debug, Serialize, Deserialize, Clone, JsonSchema)]
#[serde(deny_unknown_fields)]
pub struct HlsInput {
/// URL of the MP4 file.
pub url: Arc<str>,
/// (**default=`false`**) If input is required and frames are not processed
/// on time, then Smelter will delay producing output frames.
pub required: Option<bool>,
/// Offset in milliseconds relative to the pipeline start (start request). If offset is
/// not defined then stream is synchronized based on the first frames delivery time.
pub offset_ms: Option<f64>,
}
31 changes: 31 additions & 0 deletions compositor_api/src/input/hls_into.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
use std::time::Duration;

use compositor_pipeline::{
pipeline::{self, input},
queue,
};

use crate::*;

impl TryFrom<HlsInput> for pipeline::RegisterInputOptions {
type Error = TypeError;

fn try_from(value: HlsInput) -> Result<Self, Self::Error> {
let HlsInput {
url,
required,
offset_ms,
} = value;

let queue_options = queue::QueueInputOptions {
required: required.unwrap_or(false),
offset: offset_ms.map(|offset_ms| Duration::from_secs_f64(offset_ms / 1000.0)),
buffer_duration: None,
};

Ok(pipeline::RegisterInputOptions {
input_options: input::InputOptions::Hls(input::hls::HlsInputOptions { url }),
queue_options,
})
}
}
2 changes: 1 addition & 1 deletion compositor_api/src/output/whip.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use crate::*;
#[serde(deny_unknown_fields)]
pub struct WhipOutput {
/// WHIP server endpoint
pub endpoint_url: String,
pub endpoint_url: Arc<str>,
// Bearer token
pub bearer_token: Option<Arc<str>>,
/// Video track configuration.
Expand Down
20 changes: 8 additions & 12 deletions compositor_pipeline/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use compositor_render::{
InputId, OutputId,
};

use crate::pipeline::{decoder::AacDecoderError, output::whip, VideoCodec};
use crate::pipeline::{decoder::AacDecoderError, output::whip, AudioCodec, VideoCodec};
use fdk_aac_sys as fdk;

#[derive(Debug, thiserror::Error)]
Expand Down Expand Up @@ -42,9 +42,6 @@ pub enum RegisterOutputError {
#[error("Failed to register output stream. Stream \"{0}\" is already registered.")]
AlreadyRegistered(OutputId),

#[error("Encoder error while registering output stream for stream \"{0}\".")]
EncoderError(OutputId, #[source] EncoderInitError),

#[error("Output initialization error while registering output for stream \"{0}\".")]
OutputError(OutputId, #[source] OutputInitError),

Expand Down Expand Up @@ -85,9 +82,15 @@ pub enum UnregisterOutputError {

#[derive(Debug, thiserror::Error)]
pub enum OutputInitError {
#[error("An unsupported codec was requested: {0:?}.")]
#[error("Failed to initialize encoder.")]
EncoderError(#[from] EncoderInitError),

#[error("An unsupported video codec was requested: {0:?}.")]
UnsupportedVideoCodec(VideoCodec),

#[error("An unsupported audio codec was requested: {0:?}.")]
UnsupportedAudioCodec(AudioCodec),

#[error(transparent)]
SocketError(#[from] std::io::Error),

Expand Down Expand Up @@ -204,7 +207,6 @@ impl From<&RegisterInputError> for PipelineErrorInfo {
}

const OUTPUT_STREAM_ALREADY_REGISTERED: &str = "OUTPUT_STREAM_ALREADY_REGISTERED";
const ENCODER_ERROR: &str = "OUTPUT_STREAM_ENCODER_ERROR";
const OUTPUT_ERROR: &str = "OUTPUT_STREAM_OUTPUT_ERROR";
const UNSUPPORTED_RESOLUTION: &str = "UNSUPPORTED_RESOLUTION";
const NO_VIDEO_OR_AUDIO_FOR_OUTPUT: &str = "NO_VIDEO_OR_AUDIO_FOR_OUTPUT";
Expand All @@ -216,15 +218,9 @@ impl From<&RegisterOutputError> for PipelineErrorInfo {
RegisterOutputError::AlreadyRegistered(_) => {
PipelineErrorInfo::new(OUTPUT_STREAM_ALREADY_REGISTERED, ErrorType::UserError)
}

RegisterOutputError::EncoderError(_, _) => {
PipelineErrorInfo::new(ENCODER_ERROR, ErrorType::ServerError)
}

RegisterOutputError::OutputError(_, _) => {
PipelineErrorInfo::new(OUTPUT_ERROR, ErrorType::ServerError)
}

RegisterOutputError::UnsupportedResolution(_) => {
PipelineErrorInfo::new(UNSUPPORTED_RESOLUTION, ErrorType::UserError)
}
Expand Down
41 changes: 29 additions & 12 deletions compositor_pipeline/src/pipeline.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,9 @@ use crossbeam_channel::{bounded, Receiver};
use glyphon::fontdb;
use input::InputInitInfo;
use input::RawDataInputOptions;
use output::encoded_data::EncodedDataOutput;
use output::new_external_output;
use output::raw_data::RawDataOutput;
use output::EncodedDataOutputOptions;
use output::OutputOptions;
use output::RawDataOutputOptions;
Expand Down Expand Up @@ -307,9 +310,9 @@ impl Pipeline {
register_pipeline_output(
pipeline,
output_id,
&register_options.output_options,
register_options.video,
register_options.audio,
|ctx, output_id| new_external_output(ctx, output_id, register_options.output_options),
)
}

Expand All @@ -321,9 +324,13 @@ impl Pipeline {
register_pipeline_output(
pipeline,
output_id,
&register_options.output_options,
register_options.video,
register_options.audio,
|ctx, output_id| {
let (output, result) =
EncodedDataOutput::new(output_id, ctx, register_options.output_options)?;
Ok((Box::new(output), result))
},
)
}

Expand All @@ -335,9 +342,12 @@ impl Pipeline {
register_pipeline_output(
pipeline,
output_id,
&register_options.output_options,
register_options.video,
register_options.audio,
|_ctx, _output_id| {
let (output, result) = RawDataOutput::new(register_options.output_options)?;
Ok((Box::new(output), result))
},
)
}

Expand Down Expand Up @@ -394,7 +404,13 @@ impl Pipeline {
return Err(RequestKeyframeError::OutputNotRegistered(output_id.clone()));
};

output.output.request_keyframe(output_id)
match output.output.video() {
Some(video) => video
.keyframe_request_sender
.send(())
.map_err(|_| RequestKeyframeError::KeyframesUnsupported(output_id.clone())),
None => Err(RequestKeyframeError::NoVideoOutput(output_id.clone())),
}
}

pub fn register_font(&self, font_source: fontdb::Source) {
Expand Down Expand Up @@ -439,17 +455,18 @@ impl Pipeline {
}
}

let (Some(resolution), Some(frame_format)) = (
output.output.resolution(),
output.output.output_frame_format(),
) else {
let Some(video_output) = output.output.video() else {
return Err(UpdateSceneError::AudioVideoNotMatching(output_id));
};

info!(?output_id, "Update scene {:#?}", scene_root);
info!(?output_id, "Update scene {:?}", scene_root);

self.renderer
.update_scene(output_id, resolution, frame_format, scene_root)
self.renderer.update_scene(
output_id,
video_output.resolution,
video_output.frame_format,
scene_root,
)
}

fn update_audio(
Expand All @@ -470,7 +487,7 @@ impl Pipeline {
}
}

info!(?output_id, "Update audio mixer {:#?}", audio);
info!(?output_id, "Update audio mixer {:?}", audio);
self.audio_mixer.update_output(output_id, audio)
}

Expand Down
75 changes: 22 additions & 53 deletions compositor_pipeline/src/pipeline/decoder/audio.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use compositor_render::InputId;
use crossbeam_channel::{bounded, Receiver, Sender};
use log::{debug, error};
use tracing::{span, trace, Level};
use tracing::{info, span, trace, Level};

extern crate opus as lib_opus;
use crate::{
Expand Down Expand Up @@ -31,18 +31,14 @@ pub enum DecodingError {
trait AudioDecoderExt {
fn decode(&mut self, encoded_chunk: EncodedChunk)
-> Result<Vec<DecodedSamples>, DecodingError>;

fn decoded_sample_rate(&self) -> u32;
}

pub fn start_audio_resampler_only_thread(
input_sample_rate: u32,
mixing_sample_rate: u32,
raw_samples_receiver: Receiver<PipelineEvent<DecodedSamples>>,
samples_sender: Sender<PipelineEvent<InputSamples>>,
input_id: InputId,
) -> Result<(), InputInitError> {
let (decoder_init_result_sender, decoder_init_result_receiver) = bounded(0);
) {
std::thread::Builder::new()
.name(format!("Decoder thread for input {}", input_id.clone()))
.spawn(move || {
Expand All @@ -53,44 +49,17 @@ pub fn start_audio_resampler_only_thread(
)
.entered();

run_resampler_only_thread(
input_sample_rate,
mixing_sample_rate,
raw_samples_receiver,
samples_sender,
decoder_init_result_sender,
);
run_resampler_only_thread(mixing_sample_rate, raw_samples_receiver, samples_sender);
})
.unwrap();

match decoder_init_result_receiver.recv() {
Ok(Ok(())) => Ok(()),
Ok(Err(init_err)) => Err(init_err),
Err(_recv_err) => Err(InputInitError::CannotReadInitResult),
}
}

fn run_resampler_only_thread(
input_sample_rate: u32,
mixing_sample_rate: u32,
raw_samples_receiver: Receiver<PipelineEvent<DecodedSamples>>,
samples_sender: Sender<PipelineEvent<InputSamples>>,
init_result_sender: Sender<Result<(), InputInitError>>,
) {
let mut resampler = match Resampler::new(input_sample_rate, mixing_sample_rate) {
Ok(resampler) => {
if init_result_sender.send(Ok(())).is_err() {
error!("Failed to send rescaler init result.");
}
resampler
}
Err(err) => {
if init_result_sender.send(Err(err)).is_err() {
error!("Failed to send rescaler init result.");
}
return;
}
};
let mut resampler = Resampler::new(mixing_sample_rate);

for event in raw_samples_receiver {
let PipelineEvent::Data(raw_sample) = event else {
Expand Down Expand Up @@ -213,7 +182,10 @@ fn run_decoding<F>(
// report a success or failure.
send_result(Ok(()));
let first_chunk = match chunks_receiver.recv() {
Ok(PipelineEvent::Data(first_chunk)) => first_chunk,
Ok(PipelineEvent::Data(first_chunk)) => {
info!("first audio chunk");
first_chunk
}
Ok(PipelineEvent::EOS) => {
return;
}
Expand All @@ -222,25 +194,22 @@ fn run_decoding<F>(
return;
}
};
let init_res = AacDecoder::new(aac_decoder_opts, &first_chunk)
.map(|decoder| {
let resampler =
Resampler::new(decoder.decoded_sample_rate(), mixing_sample_rate)?;
Ok((decoder, resampler))
})
.and_then(|res| res);

match init_res {
Ok((mut decoder, mut resampler)) => run_decoding_loop(
chunks_receiver,
&mut decoder,
&mut resampler,
samples_sender,
),
let mut decoder = match AacDecoder::new(aac_decoder_opts, first_chunk) {
Ok(decoder) => decoder,
Err(err) => {
error!("Fatal AAC decoder initialization error. {}", err);
return;
}
}
};

let mut resampler = Resampler::new(mixing_sample_rate);

run_decoding_loop(
chunks_receiver,
&mut decoder,
&mut resampler,
samples_sender,
)
}
}
}
Expand Down Expand Up @@ -281,6 +250,6 @@ fn init_opus_decoder(
mixing_sample_rate: u32,
) -> Result<(OpusDecoder, Resampler), InputInitError> {
let decoder = OpusDecoder::new(opus_decoder_opts, mixing_sample_rate)?;
let resampler = Resampler::new(decoder.decoded_sample_rate(), mixing_sample_rate)?;
let resampler = Resampler::new(mixing_sample_rate);
Ok((decoder, resampler))
}
Loading