diff --git a/lib/bindings/python/rust/lib.rs b/lib/bindings/python/rust/lib.rs index f1faf54670..67bca285a3 100644 --- a/lib/bindings/python/rust/lib.rs +++ b/lib/bindings/python/rust/lib.rs @@ -39,6 +39,7 @@ use dynamo_llm::{self as llm_rs}; use dynamo_llm::{entrypoint::RouterConfig, kv_router::KvRouterConfig}; use crate::llm::local_model::ModelRuntimeConfig; +use crate::llm::preprocessor::{MediaDecoder, MediaFetcher}; #[pyclass(eq, eq_int)] #[derive(Clone, Debug, PartialEq)] @@ -161,6 +162,8 @@ fn _core(m: &Bound<'_, PyModule>) -> PyResult<()> { m.add_class::()?; m.add_class::()?; m.add_class::()?; + m.add_class::()?; + m.add_class::()?; m.add_class::()?; m.add_class::()?; m.add_class::()?; @@ -217,7 +220,7 @@ fn log_message(level: &str, message: &str, module: &str, file: &str, line: u32) /// Create an engine and attach it to an endpoint to make it visible to the frontend. /// This is the main way you create a Dynamo worker / backend. #[pyfunction] -#[pyo3(signature = (model_input, model_type, endpoint, model_path, model_name=None, context_length=None, kv_cache_block_size=None, router_mode=None, migration_limit=0, runtime_config=None, user_data=None, custom_template_path=None))] +#[pyo3(signature = (model_input, model_type, endpoint, model_path, model_name=None, context_length=None, kv_cache_block_size=None, router_mode=None, migration_limit=0, runtime_config=None, user_data=None, custom_template_path=None, media_decoder=None, media_fetcher=None))] #[allow(clippy::too_many_arguments)] fn register_llm<'p>( py: Python<'p>, @@ -233,6 +236,8 @@ fn register_llm<'p>( runtime_config: Option, user_data: Option<&Bound<'p, PyDict>>, custom_template_path: Option<&str>, + media_decoder: Option, + media_fetcher: Option, ) -> PyResult> { // Validate Prefill model type requirements if model_type.inner == llm_rs::model_type::ModelType::Prefill { @@ -305,7 +310,9 @@ fn register_llm<'p>( .migration_limit(Some(migration_limit)) .runtime_config(runtime_config.unwrap_or_default().inner) .user_data(user_data_json) - .custom_template_path(custom_template_path_owned); + .custom_template_path(custom_template_path_owned) + .media_decoder(media_decoder.map(|m| m.inner)) + .media_fetcher(media_fetcher.map(|m| m.inner)); // Load the ModelDeploymentCard let mut local_model = builder.build().await.map_err(to_pyerr)?; // Advertise ourself on etcd so ingress can find us diff --git a/lib/bindings/python/rust/llm/preprocessor.rs b/lib/bindings/python/rust/llm/preprocessor.rs index 0504635377..0fc8686f60 100644 --- a/lib/bindings/python/rust/llm/preprocessor.rs +++ b/lib/bindings/python/rust/llm/preprocessor.rs @@ -3,9 +3,11 @@ use super::*; use crate::llm::model_card::ModelDeploymentCard; +use std::time::Duration; use llm_rs::{ preprocessor::OpenAIPreprocessor, + preprocessor::media::{MediaDecoder as RsMediaDecoder, MediaFetcher as RsMediaFetcher}, protocols::common::llm_backend::{BackendOutput, PreprocessedRequest}, types::{ Annotated, @@ -74,3 +76,62 @@ impl OAIChatPreprocessor { }) } } + +#[pyclass] +#[derive(Clone)] +pub struct MediaDecoder { + pub(crate) inner: RsMediaDecoder, +} + +#[pymethods] +impl MediaDecoder { + #[new] + fn new() -> Self { + Self { + inner: RsMediaDecoder::default(), + } + } + + fn image_decoder(&mut self, image_decoder: &Bound<'_, PyDict>) -> PyResult<()> { + let image_decoder = pythonize::depythonize(image_decoder).map_err(|err| { + PyErr::new::(format!("Failed to parse image_decoder: {}", err)) + })?; + self.inner.image_decoder = image_decoder; + Ok(()) + } +} + +#[pyclass] +#[derive(Clone)] +pub struct MediaFetcher { + pub(crate) inner: RsMediaFetcher, +} + +#[pymethods] +impl MediaFetcher { + #[new] + fn new() -> Self { + Self { + inner: RsMediaFetcher::default(), + } + } + fn user_agent(&mut self, user_agent: String) { + self.inner.user_agent = user_agent; + } + + fn allow_direct_ip(&mut self, allow: bool) { + self.inner.allow_direct_ip = allow; + } + + fn allow_direct_port(&mut self, allow: bool) { + self.inner.allow_direct_port = allow; + } + + fn allowed_media_domains(&mut self, domains: Vec) { + self.inner.allowed_media_domains = Some(domains.into_iter().collect()); + } + + fn timeout_ms(&mut self, timeout_ms: u64) { + self.inner.timeout = Some(Duration::from_millis(timeout_ms)); + } +} diff --git a/lib/bindings/python/src/dynamo/llm/__init__.py b/lib/bindings/python/src/dynamo/llm/__init__.py index 8c406ba6bc..3d812b311e 100644 --- a/lib/bindings/python/src/dynamo/llm/__init__.py +++ b/lib/bindings/python/src/dynamo/llm/__init__.py @@ -18,6 +18,8 @@ from dynamo._core import KvRecorder as KvRecorder from dynamo._core import KvRouterConfig as KvRouterConfig from dynamo._core import KvStats as KvStats +from dynamo._core import MediaDecoder as MediaDecoder +from dynamo._core import MediaFetcher as MediaFetcher from dynamo._core import ModelInput as ModelInput from dynamo._core import ModelRuntimeConfig as ModelRuntimeConfig from dynamo._core import ModelType as ModelType diff --git a/lib/llm/src/local_model.rs b/lib/llm/src/local_model.rs index de869047c5..5c734abbd2 100644 --- a/lib/llm/src/local_model.rs +++ b/lib/llm/src/local_model.rs @@ -14,6 +14,7 @@ use crate::entrypoint::RouterConfig; use crate::mocker::protocols::MockEngineArgs; use crate::model_card::{self, ModelDeploymentCard}; use crate::model_type::{ModelInput, ModelType}; +use crate::preprocessor::media::{MediaDecoder, MediaFetcher}; use crate::request_template::RequestTemplate; pub mod runtime_config; @@ -52,6 +53,8 @@ pub struct LocalModelBuilder { namespace: Option, custom_backend_metrics_endpoint: Option, custom_backend_metrics_polling_interval: Option, + media_decoder: Option, + media_fetcher: Option, } impl Default for LocalModelBuilder { @@ -77,6 +80,8 @@ impl Default for LocalModelBuilder { namespace: Default::default(), custom_backend_metrics_endpoint: Default::default(), custom_backend_metrics_polling_interval: Default::default(), + media_decoder: Default::default(), + media_fetcher: Default::default(), } } } @@ -184,6 +189,16 @@ impl LocalModelBuilder { self } + pub fn media_decoder(&mut self, media_decoder: Option) -> &mut Self { + self.media_decoder = media_decoder; + self + } + + pub fn media_fetcher(&mut self, media_fetcher: Option) -> &mut Self { + self.media_fetcher = media_fetcher; + self + } + /// Make an LLM ready for use: /// - Download it from Hugging Face (and NGC in future) if necessary /// - Resolve the path @@ -219,6 +234,8 @@ impl LocalModelBuilder { self.runtime_config.max_num_batched_tokens = mocker_engine_args.max_num_batched_tokens.map(|v| v as u64); self.runtime_config.data_parallel_size = mocker_engine_args.dp_size; + self.media_decoder = Some(MediaDecoder::default()); + self.media_fetcher = Some(MediaFetcher::default()); } // frontend and echo engine don't need a path. @@ -230,6 +247,8 @@ impl LocalModelBuilder { card.migration_limit = self.migration_limit; card.user_data = self.user_data.take(); card.runtime_config = self.runtime_config.clone(); + card.media_decoder = self.media_decoder.clone(); + card.media_fetcher = self.media_fetcher.clone(); return Ok(LocalModel { card, @@ -280,6 +299,8 @@ impl LocalModelBuilder { card.migration_limit = self.migration_limit; card.user_data = self.user_data.take(); card.runtime_config = self.runtime_config.clone(); + card.media_decoder = self.media_decoder.clone(); + card.media_fetcher = self.media_fetcher.clone(); Ok(LocalModel { card, diff --git a/lib/llm/src/model_card.rs b/lib/llm/src/model_card.rs index f19c938cd2..0674e1c0a1 100644 --- a/lib/llm/src/model_card.rs +++ b/lib/llm/src/model_card.rs @@ -25,6 +25,7 @@ use dynamo_runtime::{slug::Slug, storage::key_value_store::Versioned}; use serde::{Deserialize, Serialize}; use tokenizers::Tokenizer as HfTokenizer; +use crate::preprocessor::media::{MediaDecoder, MediaFetcher}; use crate::protocols::TokenIdType; /// Identify model deployment cards in the key-value store @@ -217,6 +218,14 @@ pub struct ModelDeploymentCard { #[serde(default)] pub runtime_config: ModelRuntimeConfig, + /// Media decoding configuration + #[serde(default)] + pub media_decoder: Option, + + /// Media fetching configuration + #[serde(default)] + pub media_fetcher: Option, + #[serde(skip, default)] checksum: OnceLock, } @@ -520,6 +529,8 @@ impl ModelDeploymentCard { model_input: Default::default(), // set later user_data: None, runtime_config: ModelRuntimeConfig::default(), + media_decoder: None, + media_fetcher: None, checksum: OnceLock::new(), }) } diff --git a/lib/llm/src/preprocessor/media.rs b/lib/llm/src/preprocessor/media.rs index 5104af8e21..0c0e3e6b12 100644 --- a/lib/llm/src/preprocessor/media.rs +++ b/lib/llm/src/preprocessor/media.rs @@ -7,4 +7,4 @@ mod loader; pub use common::EncodedMediaData; pub use decoders::{Decoder, ImageDecoder, MediaDecoder}; -pub use loader::MediaLoader; +pub use loader::{MediaFetcher, MediaLoader};