From 4ffdb414c817c356ed7a5b278178058281eba012 Mon Sep 17 00:00:00 2001 From: Jeffrey Huber Date: Thu, 27 Nov 2025 22:44:28 -0600 Subject: [PATCH] AI code - run Chroma as a server via Rust --- Cargo.lock | 4 +- rust/chroma/Cargo.toml | 5 + rust/chroma/examples/run_server.rs | 31 +++++ rust/chroma/src/lib.rs | 3 + rust/chroma/src/server.rs | 172 ++++++++++++++++++++++++ rust/chroma/tests/server_integration.rs | 144 ++++++++++++++++++++ rust/frontend/Cargo.toml | 1 - 7 files changed, 358 insertions(+), 2 deletions(-) create mode 100644 rust/chroma/examples/run_server.rs create mode 100644 rust/chroma/src/server.rs create mode 100644 rust/chroma/tests/server_integration.rs diff --git a/Cargo.lock b/Cargo.lock index bee45a4d18a..33b672bcb78 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1407,7 +1407,10 @@ dependencies = [ "backon", "bon", "chroma-api-types", + "chroma-config", "chroma-error", + "chroma-frontend", + "chroma-system", "chroma-types", "dotenvy", "futures-util", @@ -1602,7 +1605,6 @@ dependencies = [ "async-trait", "axum 0.8.1", "backon", - "chroma", "chroma-api-types", "chroma-cache", "chroma-config", diff --git a/rust/chroma/Cargo.toml b/rust/chroma/Cargo.toml index 59f5562e0fd..b61ca84754a 100644 --- a/rust/chroma/Cargo.toml +++ b/rust/chroma/Cargo.toml @@ -23,12 +23,17 @@ tracing.workspace = true chroma-api-types.workspace = true chroma-error.workspace = true chroma-types.workspace = true +chroma-frontend = { workspace = true, optional = true } +chroma-system = { workspace = true, optional = true } +chroma-config = { workspace = true, optional = true } +uuid = { workspace = true, optional = true } [features] default = ["rustls", "ollama"] native-tls = ["reqwest/default-tls", "reqwest/default"] rustls = ["reqwest/rustls-tls"] ollama = [] +server = ["chroma-frontend", "chroma-system", "chroma-config", "uuid"] [dev-dependencies] dotenvy = "0.15.7" diff --git a/rust/chroma/examples/run_server.rs b/rust/chroma/examples/run_server.rs new file mode 100644 index 00000000000..540e925f39d --- /dev/null +++ b/rust/chroma/examples/run_server.rs @@ -0,0 +1,31 @@ +//! Example: Run a local Chroma server from Rust +//! +//! Run with: cargo run -p chroma --features server --example run_server + +use chroma::server::{ChromaServer, FrontendServerConfig}; + +#[tokio::main] +async fn main() -> Result<(), Box> { + // Configure the server + let mut config = FrontendServerConfig::single_node_default(); + config.port = 8000; + config.listen_address = "127.0.0.1".to_string(); + config.persist_path = "./chroma_data".to_string(); + + println!("Starting Chroma server on http://127.0.0.1:8000 ..."); + println!("Data will be stored in: ./chroma_data"); + + // Start the server (this will block until ready) + let server = ChromaServer::with_config(config).await?; + + println!("Server is ready at: {}", server.endpoint()); + println!("Press Ctrl+C to stop"); + + // Keep running until interrupted + tokio::signal::ctrl_c().await?; + + println!("\nShutting down..."); + drop(server); + + Ok(()) +} diff --git a/rust/chroma/src/lib.rs b/rust/chroma/src/lib.rs index c4d9907fdc3..b8cb007ea23 100644 --- a/rust/chroma/src/lib.rs +++ b/rust/chroma/src/lib.rs @@ -97,12 +97,15 @@ //! - `native-tls` - Use native system TLS (OpenSSL on Linux, Secure Transport on macOS) //! - `rustls` - Use pure-Rust TLS implementation //! - `opentelemetry` - Enable metrics collection for request latency and retry counts +//! - `server` - Enable running a local Chroma server (adds significant dependencies) #![deny(missing_docs)] pub mod client; mod collection; pub mod embed; +#[cfg(feature = "server")] +pub mod server; pub mod types; pub use client::ChromaHttpClient; diff --git a/rust/chroma/src/server.rs b/rust/chroma/src/server.rs new file mode 100644 index 00000000000..c54c63c5059 --- /dev/null +++ b/rust/chroma/src/server.rs @@ -0,0 +1,172 @@ +//! Server functionality for running a local Chroma instance. +//! +//! This module provides the ability to run a single-node Chroma server directly from Rust, +//! which can then be accessed via the HTTP client. +//! +//! # Example +//! +//! ```no_run +//! use chroma::{ChromaHttpClient, client::ChromaHttpClientOptions, client::ChromaAuthMethod}; +//! use chroma::server::ChromaServer; +//! +//! # async fn example() -> Result<(), Box> { +//! // Start a local Chroma server +//! let server = ChromaServer::local().await?; +//! +//! // Connect to it with the HTTP client +//! let client = ChromaHttpClient::new(ChromaHttpClientOptions { +//! endpoint: server.endpoint().parse()?, +//! auth_method: ChromaAuthMethod::None, +//! ..Default::default() +//! }); +//! +//! // Use the client +//! let collections = client.list_collections(None, None).await?; +//! +//! // Server shuts down when dropped +//! drop(server); +//! # Ok(()) +//! # } +//! ``` + +use std::sync::Arc; +use thiserror::Error; +use tokio::task::JoinHandle; + +pub use chroma_frontend::config::FrontendServerConfig; + +/// Errors that can occur when starting or running a Chroma server. +#[derive(Error, Debug)] +pub enum ChromaServerError { + /// Failed to create a directory for persistence. + #[error("Failed to create persistence directory: {0}")] + PersistenceError(#[from] std::io::Error), + /// Server failed to start within the timeout period. + #[error("Server failed to start within timeout")] + StartupTimeout, + /// Failed to connect to the server. + #[error("Failed to connect to server: {0}")] + ConnectionError(String), +} + +/// A running Chroma server instance. +/// +/// The server runs in a background task and will be shut down when this handle is dropped. +/// Use [`endpoint()`](Self::endpoint) to get the URL to connect to with [`ChromaHttpClient`](crate::ChromaHttpClient). +pub struct ChromaServer { + port: u16, + persist_path: String, + _handle: JoinHandle<()>, +} + +impl ChromaServer { + /// Start a local Chroma server with default settings. + /// + /// Uses port 8766 and a temporary storage directory. + /// The server will be shut down when the returned handle is dropped. + pub async fn local() -> Result { + let mut config = FrontendServerConfig::single_node_default(); + config.port = 8766; // Use a fixed port to avoid conflicts with default 8000 + config.listen_address = "127.0.0.1".to_string(); + + // Create a temp directory for persistence + let temp_dir = std::env::temp_dir().join(format!("chroma_test_{}", uuid::Uuid::new_v4())); + std::fs::create_dir_all(&temp_dir)?; + config.persist_path = temp_dir.to_string_lossy().to_string(); + + Self::with_config(config).await + } + + /// Start a Chroma server with custom configuration. + /// + /// # Example + /// + /// ```no_run + /// use chroma::server::{ChromaServer, FrontendServerConfig}; + /// + /// # async fn example() -> Result<(), Box> { + /// let mut config = FrontendServerConfig::single_node_default(); + /// config.port = 8080; + /// config.persist_path = "./my_data".to_string(); + /// + /// let server = ChromaServer::with_config(config).await?; + /// # Ok(()) + /// # } + /// ``` + pub async fn with_config(config: FrontendServerConfig) -> Result { + let port = config.port; + let persist_path = config.persist_path.clone(); + let listen_address = config.listen_address.clone(); + + // Ensure persist directory exists + std::fs::create_dir_all(&persist_path)?; + + let handle = tokio::spawn(async move { + chroma_frontend::frontend_service_entrypoint_with_config( + Arc::new(()), + Arc::new(()), + &config, + false, // Don't initialize OTel tracing + ) + .await; + }); + + // Wait for server to be ready by polling the heartbeat endpoint + let endpoint = format!("http://{}:{}", listen_address, port); + Self::wait_for_ready(&endpoint).await?; + + Ok(Self { + port, + persist_path, + _handle: handle, + }) + } + + async fn wait_for_ready(endpoint: &str) -> Result<(), ChromaServerError> { + let client = reqwest::Client::new(); + let heartbeat_url = format!("{}/api/v2/heartbeat", endpoint); + + for _ in 0..60 { + // Wait up to 30 seconds (60 * 500ms) + match client.get(&heartbeat_url).send().await { + Ok(resp) if resp.status().is_success() => { + return Ok(()); + } + _ => { + tokio::time::sleep(std::time::Duration::from_millis(500)).await; + } + } + } + + Err(ChromaServerError::StartupTimeout) + } + + /// Get the HTTP endpoint URL for this server. + /// + /// Use this to configure a [`ChromaHttpClient`](crate::ChromaHttpClient) to connect to this server. + pub fn endpoint(&self) -> String { + format!("http://127.0.0.1:{}", self.port) + } + + /// Get the port the server is listening on. + pub fn port(&self) -> u16 { + self.port + } + + /// Get the persistence path where data is stored. + pub fn persist_path(&self) -> &str { + &self.persist_path + } +} + +impl Drop for ChromaServer { + fn drop(&mut self) { + // Abort the server task + self._handle.abort(); + + // Clean up temp directory if it looks like a test directory + if self.persist_path.contains("chroma_test_") { + let _ = std::fs::remove_dir_all(&self.persist_path); + } + } +} diff --git a/rust/chroma/tests/server_integration.rs b/rust/chroma/tests/server_integration.rs new file mode 100644 index 00000000000..2c84cf5e322 --- /dev/null +++ b/rust/chroma/tests/server_integration.rs @@ -0,0 +1,144 @@ +//! Integration tests for the embedded Chroma server. + +#![cfg(feature = "server")] + +use chroma::client::{ChromaAuthMethod, ChromaHttpClientOptions}; +use chroma::server::ChromaServer; +use chroma::ChromaHttpClient; + +#[tokio::test] +async fn test_local_server_starts_and_responds() { + // Start a local server + let server = ChromaServer::local().await.expect("Failed to start server"); + + // Connect with the HTTP client + let client = ChromaHttpClient::new(ChromaHttpClientOptions { + endpoint: server.endpoint().parse().unwrap(), + auth_method: ChromaAuthMethod::None, + ..Default::default() + }); + + // Test heartbeat + let heartbeat = client.heartbeat().await.expect("Heartbeat failed"); + assert!(heartbeat.nanosecond_heartbeat > 0); + + // Test listing collections (should be empty initially) + let collections = client + .list_collections(100, None) + .await + .expect("List collections failed"); + assert!(collections.is_empty()); +} + +#[tokio::test] +async fn test_create_and_delete_collection() { + let server = ChromaServer::local().await.expect("Failed to start server"); + + let client = ChromaHttpClient::new(ChromaHttpClientOptions { + endpoint: server.endpoint().parse().unwrap(), + auth_method: ChromaAuthMethod::None, + ..Default::default() + }); + + // Create a collection + let collection = client + .create_collection("test_collection".to_string(), None, None) + .await + .expect("Failed to create collection"); + + assert_eq!(collection.name(), "test_collection"); + + // List should show one collection + let collections = client + .list_collections(100, None) + .await + .expect("List collections failed"); + assert_eq!(collections.len(), 1); + + // Delete the collection + client + .delete_collection("test_collection".to_string()) + .await + .expect("Failed to delete collection"); + + // Verify it's gone + let collections = client + .list_collections(100, None) + .await + .expect("List collections failed"); + assert!(collections.is_empty()); +} + +#[tokio::test] +async fn test_add_and_query_documents() { + let server = ChromaServer::local().await.expect("Failed to start server"); + + let client = ChromaHttpClient::new(ChromaHttpClientOptions { + endpoint: server.endpoint().parse().unwrap(), + auth_method: ChromaAuthMethod::None, + ..Default::default() + }); + + // Create a collection + let collection = client + .create_collection("vector_test".to_string(), None, None) + .await + .expect("Failed to create collection"); + + // Add some documents with embeddings + let embeddings = vec![ + vec![1.0, 0.0, 0.0], + vec![0.0, 1.0, 0.0], + vec![0.0, 0.0, 1.0], + ]; + let ids = vec![ + "doc1".to_string(), + "doc2".to_string(), + "doc3".to_string(), + ]; + let documents = Some(vec![ + Some("First document".to_string()), + Some("Second document".to_string()), + Some("Third document".to_string()), + ]); + + collection + .add(ids, embeddings, documents, None, None) + .await + .expect("Failed to add documents"); + + // Count documents + let count = collection.count().await.expect("Failed to count"); + assert_eq!(count, 3); + + // Get a specific document + let results = collection + .get(Some(vec!["doc1".to_string()]), None, None, None, None) + .await + .expect("Failed to get documents"); + + assert_eq!(results.ids.len(), 1); + assert_eq!(results.ids[0], "doc1"); + + // Query by vector similarity + let query_results = collection + .query( + vec![vec![1.0, 0.1, 0.0]], // Query vector close to doc1 + Some(2), // n_results + None, // where + None, // where_document + None, // include + ) + .await + .expect("Failed to query"); + + // doc1 should be the closest match + assert!(!query_results.ids.is_empty()); + assert_eq!(query_results.ids[0][0], "doc1"); + + // Cleanup + client + .delete_collection("vector_test".to_string()) + .await + .expect("Failed to delete collection"); +} diff --git a/rust/frontend/Cargo.toml b/rust/frontend/Cargo.toml index 61445fe9f6e..cf1e08eae57 100644 --- a/rust/frontend/Cargo.toml +++ b/rust/frontend/Cargo.toml @@ -31,7 +31,6 @@ mdac = { workspace = true } opentelemetry.workspace = true validator = { workspace = true } rust-embed = { workspace = true } -chroma = { workspace = true } chroma-api-types = { workspace = true, features = ["utoipa"] } chroma-cache = { workspace = true } chroma-config = { workspace = true }