From 222412722fd61290c7a43ba2962dca3cfb953c76 Mon Sep 17 00:00:00 2001 From: Robert Escriva Date: Thu, 9 Oct 2025 15:07:36 -0700 Subject: [PATCH 01/13] [ENH] s3heap, meet the compactor If you create a task, add data, watch, the compactor will print a SCHEDULED trace. --- Cargo.lock | 3 +- Cargo.toml | 1 + go/pkg/sysdb/coordinator/task.go | 10 +- rust/s3heap-service/Cargo.toml | 1 + rust/s3heap-service/src/lib.rs | 69 +++++++++-- rust/s3heap-service/src/scheduler.rs | 77 ++++++++++++ .../test_k8s_integration_00_heap_tender.rs | 63 +++++++--- rust/s3heap/Cargo.toml | 1 - rust/s3heap/src/dummy.rs | 19 --- rust/s3heap/src/internal.rs | 18 ++- rust/s3heap/src/lib.rs | 114 ++++++++++++------ rust/s3heap/tests/common.rs | 12 +- .../test_k8s_integration_01_empty_heap.rs | 11 ++ .../test_k8s_integration_02_basic_push.rs | 4 + .../test_k8s_integration_03_merge_buckets.rs | 4 + ...test_k8s_integration_04_prune_completed.rs | 6 + .../test_k8s_integration_05_peek_filtering.rs | 8 ++ .../test_k8s_integration_06_retry_logic.rs | 3 + ...t_k8s_integration_07_bucket_computation.rs | 24 +++- ...8s_integration_08_concurrent_operations.rs | 9 ++ rust/s3heap/tests/test_unit_tests.rs | 105 ++++++++++++---- rust/worker/Cargo.toml | 2 + .../src/compactor/compaction_manager.rs | 2 + rust/worker/src/compactor/mod.rs | 1 + rust/worker/src/compactor/scheduler.rs | 25 ++++ rust/worker/src/compactor/tasks.rs | 88 ++++++++++++++ 26 files changed, 560 insertions(+), 120 deletions(-) create mode 100644 rust/s3heap-service/src/scheduler.rs delete mode 100644 rust/s3heap/src/dummy.rs create mode 100644 rust/worker/src/compactor/tasks.rs diff --git a/Cargo.lock b/Cargo.lock index f95fedd49be..978f9367f5d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -7141,7 +7141,6 @@ dependencies = [ "bytes", "chroma-error", "chroma-storage", - "chroma-sysdb", "chrono", "futures", "guacamole", @@ -10001,6 +10000,8 @@ dependencies = [ "random-port", "regex", "roaring", + "s3heap", + "s3heap-service", "serde", "serde_json", "serial_test", diff --git a/Cargo.toml b/Cargo.toml index 70b37e8c932..8b493162f4c 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -95,6 +95,7 @@ chroma-cli = { path = "rust/cli" } chroma-jemalloc-pprof-server = { path = "rust/jemalloc-pprof-server" } mdac = { path = "rust/mdac" } s3heap = { path = "rust/s3heap" } +s3heap-service = { path = "rust/s3heap-service" } wal3 = { path = "rust/wal3" } worker = { path = "rust/worker" } diff --git a/go/pkg/sysdb/coordinator/task.go b/go/pkg/sysdb/coordinator/task.go index 35a8205434f..eb6586a93fb 100644 --- a/go/pkg/sysdb/coordinator/task.go +++ b/go/pkg/sysdb/coordinator/task.go @@ -123,7 +123,7 @@ func (s *Coordinator) CreateTask(ctx context.Context, req *coordinatorpb.CreateT OperatorParams: paramsJSON, CompletionOffset: 0, LastRun: nil, - NextRun: nil, // Will be set to zero initially, scheduled by task scheduler + NextRun: &now, MinRecordsForTask: int64(req.MinRecordsForTask), CurrentAttempts: 0, CreatedAt: now, @@ -335,10 +335,10 @@ func (s *Coordinator) PeekScheduleByCollectionId(ctx context.Context, req *coord for _, task := range tasks { task_id := task.ID.String() entry := &coordinatorpb.ScheduleEntry{ - CollectionId: &task.InputCollectionID, - TaskId: &task_id, - TaskRunNonce: proto.String(task.NextNonce.String()), - WhenToRun: nil, + CollectionId: &task.InputCollectionID, + TaskId: &task_id, + TaskRunNonce: proto.String(task.NextNonce.String()), + WhenToRun: nil, } if task.NextRun != nil { whenToRun := uint64(task.NextRun.UnixMilli()) diff --git a/rust/s3heap-service/Cargo.toml 
b/rust/s3heap-service/Cargo.toml
index c2244a6d2ac..d67f59bcf29 100644
--- a/rust/s3heap-service/Cargo.toml
+++ b/rust/s3heap-service/Cargo.toml
@@ -22,6 +22,7 @@ chroma-sysdb = { workspace = true }
 chroma-tracing = { workspace = true, features = ["grpc"] }
 chroma-types = { workspace = true }
 s3heap = { workspace = true }
+uuid = { workspace = true }
 wal3 = { workspace = true }

 [dev-dependencies]
diff --git a/rust/s3heap-service/src/lib.rs b/rust/s3heap-service/src/lib.rs
index f0767f73ee2..5ebaba74187 100644
--- a/rust/s3heap-service/src/lib.rs
+++ b/rust/s3heap-service/src/lib.rs
@@ -20,23 +20,77 @@ use chroma_types::chroma_proto::heap_tender_service_server::{
 };
 use chroma_types::chroma_proto::{HeapSummaryRequest, HeapSummaryResponse};
 use chroma_types::{dirty_log_path_from_hostname, CollectionUuid, DirtyMarker, ScheduleEntry};
-use s3heap::{Configuration, DummyScheduler, Error, HeapWriter, Schedule, Triggerable};
+use s3heap::{heap_path_from_hostname, Configuration, HeapWriter, Schedule, Triggerable};
 use wal3::{
     Cursor, CursorName, CursorStore, CursorStoreOptions, LogPosition, LogReader, LogReaderOptions,
     Witness,
 };

+mod scheduler;
+
+pub use scheduler::SysDbScheduler;
+
+/////////////////////////////////////////////// Error //////////////////////////////////////////////
+
+/// Custom error type that can handle errors from multiple sources.
+#[derive(Debug)]
+pub enum Error {
+    /// Error from s3heap operations.
+    S3Heap(s3heap::Error),
+    /// Error from wal3 operations.
+    Wal3(wal3::Error),
+    /// Error from sysdb operations.
+    SysDb(chroma_sysdb::PeekScheduleError),
+    /// Error from JSON serialization/deserialization.
+    Json(serde_json::Error),
+    /// Internal error with a message.
+    Internal(String),
+}
+
+impl From<s3heap::Error> for Error {
+    fn from(e: s3heap::Error) -> Self {
+        Error::S3Heap(e)
+    }
+}
+
+impl From<wal3::Error> for Error {
+    fn from(e: wal3::Error) -> Self {
+        Error::Wal3(e)
+    }
+}
+
+impl From<chroma_sysdb::PeekScheduleError> for Error {
+    fn from(e: chroma_sysdb::PeekScheduleError) -> Self {
+        Error::SysDb(e)
+    }
+}
+
+impl From<serde_json::Error> for Error {
+    fn from(e: serde_json::Error) -> Self {
+        Error::Json(e)
+    }
+}
+
+impl std::fmt::Display for Error {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        match self {
+            Error::S3Heap(e) => write!(f, "s3heap error: {}", e),
+            Error::Wal3(e) => write!(f, "wal3 error: {}", e),
+            Error::SysDb(e) => write!(f, "sysdb error: {}", e),
+            Error::Json(e) => write!(f, "json error: {}", e),
+            Error::Internal(msg) => write!(f, "internal error: {}", msg),
+        }
+    }
+}
+
+impl std::error::Error for Error {}
+
 ///////////////////////////////////////////// constants ////////////////////////////////////////////

 const DEFAULT_CONFIG_PATH: &str = "./chroma_config.yaml";
 const CONFIG_PATH_ENV_VAR: &str = "CONFIG_PATH";

-/// The path for the heap tended to on behalf of this hostname.
-pub fn heap_path_from_hostname(hostname: &str) -> String {
-    format!("heap/{}", hostname)
-}
-
 /// The cursor name used by HeapTender to track its position in the dirty log.
pub static HEAP_TENDER_CURSOR_NAME: CursorName =
     unsafe { CursorName::from_string_unchecked("heap_tender") };
@@ -295,8 +349,9 @@ impl Configurable for HeapTenderServer {
             "s3heap-tender".to_string(),
         );
         let heap_prefix = heap_path_from_hostname(&config.my_member_id);
-        let scheduler = Arc::new(DummyScheduler) as _;
+        let scheduler = Arc::new(SysDbScheduler::new(sysdb.clone())) as _;
         let writer = HeapWriter::new(storage, heap_prefix, Arc::clone(&scheduler))
+            .await
             .map_err(|e| -> Box<dyn ChromaError> { Box::new(e) })?;
         let tender = Arc::new(HeapTender {
             sysdb,
diff --git a/rust/s3heap-service/src/scheduler.rs b/rust/s3heap-service/src/scheduler.rs
new file mode 100644
index 00000000000..f65824082ea
--- /dev/null
+++ b/rust/s3heap-service/src/scheduler.rs
@@ -0,0 +1,77 @@
+use std::collections::HashMap;
+
+use chroma_sysdb::SysDb;
+use chroma_types::{CollectionUuid, ScheduleEntry};
+use s3heap::{HeapScheduler, Schedule, Triggerable};
+use uuid::Uuid;
+
+/// Scheduler that integrates with SysDb to manage task scheduling.
+pub struct SysDbScheduler {
+    sysdb: SysDb,
+}
+
+impl SysDbScheduler {
+    pub fn new(sysdb: SysDb) -> SysDbScheduler {
+        Self { sysdb }
+    }
+}
+
+#[async_trait::async_trait]
+impl HeapScheduler for SysDbScheduler {
+    async fn are_done(&self, items: &[(Triggerable, Uuid)]) -> Result<Vec<bool>, s3heap::Error> {
+        let collection_ids = items
+            .iter()
+            .map(|item| CollectionUuid(*item.0.partitioning.as_uuid()))
+            .collect::<Vec<_>>();
+        let schedules = self
+            .sysdb
+            .clone()
+            .peek_schedule_by_collection_id(&collection_ids)
+            .await
+            .map_err(|e| s3heap::Error::Internal(format!("sysdb error: {}", e)))?;
+        let mut by_triggerable: HashMap<Triggerable, ScheduleEntry> = HashMap::default();
+        for schedule in schedules.into_iter() {
+            by_triggerable.insert(
+                Triggerable {
+                    partitioning: schedule.collection_id.0.into(),
+                    scheduling: schedule.task_id.into(),
+                },
+                schedule,
+            );
+        }
+        let mut results = Vec::with_capacity(items.len());
+        for (triggerable, nonce) in items.iter() {
+            let Some(schedule) = by_triggerable.get(triggerable) else {
+                results.push(true);
+                continue;
+            };
+            results.push(schedule.when_to_run.is_some() && schedule.task_run_nonce != *nonce);
+        }
+        Ok(results)
+    }
+
+    async fn get_schedules(&self, ids: &[Uuid]) -> Result<Vec<Schedule>, s3heap::Error> {
+        let collection_ids = ids.iter().cloned().map(CollectionUuid).collect::<Vec<_>>();
+        let schedules = self
+            .sysdb
+            .clone()
+            .peek_schedule_by_collection_id(&collection_ids)
+            .await
+            .map_err(|e| s3heap::Error::Internal(format!("sysdb error: {}", e)))?;
+        let mut results = Vec::new();
+        tracing::info!("schedules {schedules:?}");
+        for schedule in schedules.into_iter() {
+            if let Some(when_to_run) = schedule.when_to_run {
+                results.push(Schedule {
+                    triggerable: Triggerable {
+                        partitioning: schedule.collection_id.0.into(),
+                        scheduling: schedule.task_id.into(),
+                    },
+                    nonce: schedule.task_run_nonce,
+                    next_scheduled: when_to_run,
+                });
+            }
+        }
+        Ok(results)
+    }
+}
diff --git a/rust/s3heap-service/tests/test_k8s_integration_00_heap_tender.rs b/rust/s3heap-service/tests/test_k8s_integration_00_heap_tender.rs
index 9c77e889d6f..ab1586c334c 100644
--- a/rust/s3heap-service/tests/test_k8s_integration_00_heap_tender.rs
+++ b/rust/s3heap-service/tests/test_k8s_integration_00_heap_tender.rs
@@ -5,16 +5,40 @@ use chroma_sysdb::{SysDb, TestSysDb};
 use chroma_types::{CollectionUuid, DirtyMarker};
 use wal3::{CursorStore, CursorStoreOptions, LogPosition, LogReader, LogReaderOptions};

-use s3heap::{DummyScheduler, Error, HeapWriter};
+use s3heap::HeapWriter;
 use s3heap_service::{HeapTender, HEAP_TENDER_CURSOR_NAME};

-fn test_heap_tender(storage: Storage, test_id: &str) -> HeapTender {
+// Dummy scheduler for testing purposes
+struct DummyScheduler;
+
+#[async_trait::async_trait]
+impl s3heap::HeapScheduler for DummyScheduler {
+    async fn are_done(
+        &self,
+        items: &[(s3heap::Triggerable, uuid::Uuid)],
+    ) -> Result<Vec<bool>, s3heap::Error> {
+        Ok(vec![false; items.len()])
+    }
+
+    async fn get_schedules(
+        &self,
+        _ids: &[uuid::Uuid],
+    ) -> Result<Vec<s3heap::Schedule>, s3heap::Error> {
+        Ok(vec![])
+    }
+}
+
+async fn test_heap_tender(storage: Storage, test_id: &str) -> HeapTender {
     let dirty_log_prefix = format!("test-dirty-log-{}", test_id);
     let heap_prefix = format!("test-heap-{}", test_id);
-    create_heap_tender(storage, &dirty_log_prefix, &heap_prefix)
+    create_heap_tender(storage, &dirty_log_prefix, &heap_prefix).await
 }

-fn create_heap_tender(storage: Storage, dirty_log_prefix: &str, heap_prefix: &str) -> HeapTender {
+async fn create_heap_tender(
+    storage: Storage,
+    dirty_log_prefix: &str,
+    heap_prefix: &str,
+) -> HeapTender {
     let sysdb = SysDb::Test(TestSysDb::new());
     let reader = LogReader::new(
         LogReaderOptions::default(),
@@ -28,14 +52,16 @@ fn create_heap_tender(storage: Storage, dirty_log_prefix: &str, heap_prefix: &st
         "test-tender".to_string(),
     );
     let scheduler = Arc::new(DummyScheduler) as _;
-    let writer = HeapWriter::new(storage, heap_prefix.to_string(), Arc::clone(&scheduler)).unwrap();
+    let writer = HeapWriter::new(storage, heap_prefix.to_string(), Arc::clone(&scheduler))
+        .await
+        .unwrap();
     HeapTender::new(sysdb, reader, cursor, writer)
 }

 #[tokio::test]
 async fn test_k8s_integration_empty_dirty_log_returns_empty_list() {
     let storage = chroma_storage::s3_client_for_test_with_new_bucket().await;
-    let tender = test_heap_tender(storage, "empty");
+    let tender = test_heap_tender(storage, "empty").await;

     let result = tender.read_and_coalesce_dirty_log().await;
     if let Err(ref e) = result {
@@ -75,7 +101,7 @@ async fn test_k8s_integration_single_mark_dirty_returns_collection() {
     let marker_bytes = serde_json::to_vec(&marker).unwrap();
     log_writer.append(marker_bytes).await.unwrap();

-    let tender = create_heap_tender(storage, &dirty_log_prefix, &heap_prefix);
+    let tender = create_heap_tender(storage, &dirty_log_prefix, &heap_prefix).await;

     let result = tender.read_and_coalesce_dirty_log().await;
     assert!(result.is_ok());
@@ -131,7 +157,7 @@ async fn test_k8s_integration_multiple_markers_same_collection_keeps_max() {
         log_writer.append(marker_bytes).await.unwrap();
     }

-    let tender = create_heap_tender(storage, &dirty_log_prefix, &heap_prefix);
+    let tender = create_heap_tender(storage, &dirty_log_prefix, &heap_prefix).await;

     let result = tender.read_and_coalesce_dirty_log().await;
     assert!(result.is_ok());
@@ -181,7 +207,7 @@ async fn test_k8s_integration_reinsert_count_nonzero_filters_marker() {
         log_writer.append(marker_bytes).await.unwrap();
     }

-    let tender = create_heap_tender(storage, &dirty_log_prefix, &heap_prefix);
+    let tender = create_heap_tender(storage, &dirty_log_prefix, &heap_prefix).await;

     let result = tender.read_and_coalesce_dirty_log().await;
     assert!(result.is_ok());
@@ -235,7 +261,7 @@ async fn test_k8s_integration_purge_and_cleared_markers_ignored() {
         log_writer.append(marker_bytes).await.unwrap();
     }

-    let tender = create_heap_tender(storage, &dirty_log_prefix, &heap_prefix);
+    let tender = create_heap_tender(storage, &dirty_log_prefix, &heap_prefix).await;

     let result = tender.read_and_coalesce_dirty_log().await;
     assert!(result.is_ok());
@@ -280,7 +306,7 @@ async fn
test_k8s_integration_multiple_collections_all_processed() { log_writer.append(marker_bytes).await.unwrap(); } - let tender = create_heap_tender(storage, &dirty_log_prefix, &heap_prefix); + let tender = create_heap_tender(storage, &dirty_log_prefix, &heap_prefix).await; let result = tender.read_and_coalesce_dirty_log().await; assert!(result.is_ok()); @@ -320,7 +346,7 @@ async fn test_k8s_integration_cursor_initialized_on_first_run() { let marker_bytes = serde_json::to_vec(&marker).unwrap(); log_writer.append(marker_bytes).await.unwrap(); - let tender = create_heap_tender(storage.clone(), &dirty_log_prefix, &heap_prefix); + let tender = create_heap_tender(storage.clone(), &dirty_log_prefix, &heap_prefix).await; let result = tender.tend_to_heap().await; assert!(result.is_ok()); @@ -366,7 +392,7 @@ async fn test_k8s_integration_cursor_advances_on_subsequent_runs() { .await .unwrap(); - let tender = create_heap_tender(storage.clone(), &dirty_log_prefix, &heap_prefix); + let tender = create_heap_tender(storage.clone(), &dirty_log_prefix, &heap_prefix).await; tender.tend_to_heap().await.unwrap(); @@ -430,7 +456,7 @@ async fn test_k8s_integration_cursor_not_updated_when_no_new_data() { .await .unwrap(); - let tender = create_heap_tender(storage.clone(), &dirty_log_prefix, &heap_prefix); + let tender = create_heap_tender(storage.clone(), &dirty_log_prefix, &heap_prefix).await; tender.tend_to_heap().await.unwrap(); @@ -470,11 +496,14 @@ async fn test_k8s_integration_invalid_json_in_dirty_log_fails() { let invalid_json = b"not valid json at all".to_vec(); log_writer.append(invalid_json).await.unwrap(); - let tender = create_heap_tender(storage, &dirty_log_prefix, &heap_prefix); + let tender = create_heap_tender(storage, &dirty_log_prefix, &heap_prefix).await; let result = tender.read_and_coalesce_dirty_log().await; assert!(result.is_err()); - assert!(matches!(result.unwrap_err(), Error::Json(_))); + assert!(matches!( + result.unwrap_err(), + s3heap_service::Error::Json(_) + )); } #[tokio::test] @@ -514,7 +543,7 @@ async fn test_k8s_integration_handles_empty_markers_after_filtering() { .unwrap(); } - let tender = create_heap_tender(storage, &dirty_log_prefix, &heap_prefix); + let tender = create_heap_tender(storage, &dirty_log_prefix, &heap_prefix).await; let result = tender.read_and_coalesce_dirty_log().await; assert!(result.is_ok()); diff --git a/rust/s3heap/Cargo.toml b/rust/s3heap/Cargo.toml index a8f18c2d83a..7eef8ee6624 100644 --- a/rust/s3heap/Cargo.toml +++ b/rust/s3heap/Cargo.toml @@ -18,7 +18,6 @@ uuid = { workspace = true } chroma-error = { workspace = true } chroma-storage = { workspace = true } -chroma-sysdb = { workspace = true } wal3 = { workspace = true } [dev-dependencies] diff --git a/rust/s3heap/src/dummy.rs b/rust/s3heap/src/dummy.rs deleted file mode 100644 index 31f9701b679..00000000000 --- a/rust/s3heap/src/dummy.rs +++ /dev/null @@ -1,19 +0,0 @@ -use uuid::Uuid; - -use crate::{Error, HeapScheduler, Schedule, Triggerable}; - -/// A dummy scheduler implementation for testing purposes. -/// -/// This scheduler always reports that items are not done and have no scheduled times. 
-pub struct DummyScheduler;
-
-#[async_trait::async_trait]
-impl HeapScheduler for DummyScheduler {
-    async fn are_done(&self, items: &[(Triggerable, Uuid)]) -> Result<Vec<bool>, Error> {
-        Ok(vec![false; items.len()])
-    }
-
-    async fn get_schedules(&self, ids: &[Uuid]) -> Result<Vec<Option<Schedule>>, Error> {
-        Ok(vec![None; ids.len()])
-    }
-}
diff --git a/rust/s3heap/src/internal.rs b/rust/s3heap/src/internal.rs
index a8a7f82b965..e858e7985bb 100644
--- a/rust/s3heap/src/internal.rs
+++ b/rust/s3heap/src/internal.rs
@@ -200,6 +200,7 @@ impl Internal {
             .await?;
         first_1k
             .into_iter()
+            .filter(|x| !x.ends_with("INIT"))
             .map(|p| -> Result<_, Error> {
                 let Some(dt) = p
                     .strip_prefix(&self.prefix)
@@ -563,10 +564,25 @@ fn construct_parquet(items: &[HeapItem]) -> Result<Vec<u8>, Error> {
 #[cfg(test)]
 mod tests {
     use super::*;
+    use crate::Schedule;
     use chrono::TimeZone;
     use std::time::Duration;

-    use crate::DummyScheduler;
+    /// A dummy scheduler implementation for testing purposes.
+    ///
+    /// This scheduler always reports that items are not done and have no scheduled times.
+    pub struct DummyScheduler;
+
+    #[async_trait::async_trait]
+    impl HeapScheduler for DummyScheduler {
+        async fn are_done(&self, items: &[(Triggerable, Uuid)]) -> Result<Vec<bool>, Error> {
+            Ok(vec![false; items.len()])
+        }
+
+        async fn get_schedules(&self, _ids: &[Uuid]) -> Result<Vec<Schedule>, Error> {
+            Ok(vec![])
+        }
+    }

     // HeapItem tests
     #[test]
diff --git a/rust/s3heap/src/lib.rs b/rust/s3heap/src/lib.rs
index 7ca20badb6b..65e36f20998 100644
--- a/rust/s3heap/src/lib.rs
+++ b/rust/s3heap/src/lib.rs
@@ -39,7 +39,7 @@
 //!
 //! // Create heap components
 //! let scheduler = Arc::new(MyScheduler);
-//! let writer = HeapWriter::new("my-heap".to_string(), storage, scheduler);
+//! let writer = HeapWriter::new(storage, "my-heap".to_string(), scheduler).await?;
 //!
 //! // Schedule tasks
 //! let schedules = vec![
@@ -82,14 +82,35 @@ use chroma_storage::Storage;
 use chrono::{DateTime, Utc};
 use uuid::Uuid;

-// TODO(rescrv): Clean this up once the real pieces are doable.
-mod dummy;
 mod internal;

 use internal::Internal;

-pub use dummy::DummyScheduler;
 pub use internal::HeapItem;

+////////////////////////////////////////////// heap_path ///////////////////////////////////////////
+
+/// Compute the heap path from a hostname.
+///
+/// This function generates the S3 prefix for a heap based on the hostname
+/// of the service instance managing it. The format is `heap/{hostname}`.
+///
+/// # Arguments
+/// * `hostname` - The hostname of the service instance
+///
+/// # Returns
+/// The S3 prefix path for the heap
+///
+/// # Examples
+/// ```
+/// use s3heap::heap_path_from_hostname;
+///
+/// let path = heap_path_from_hostname("rust-log-service-0");
+/// assert_eq!(path, "heap/rust-log-service-0");
+/// ```
+pub fn heap_path_from_hostname(hostname: &str) -> String {
+    format!("heap/{}", hostname)
+}
+
 /////////////////////////////////////////////// Error //////////////////////////////////////////////

 /// Errors that can occur during heap operations.
@@ -117,12 +138,12 @@ pub enum Error {
     /// Invalid prefix format
     #[error("invalid prefix: {0}")]
     InvalidPrefix(String),
+    /// Uninitialized heap
+    #[error("uninitialized heap: {0}")]
+    UninitializedHeap(String),
     /// Storage backend error
     #[error("storage error: {0}")]
     Storage(#[from] chroma_storage::StorageError),
-    /// wal3 error
-    #[error("wal3 error: {0}")]
-    Wal3(#[from] wal3::Error),
     /// UUID parsing error
     #[error("uuid error: {0}")]
     Uuid(#[from] uuid::Error),
@@ -142,9 +163,6 @@ pub enum Error {
     /// Date rounding error
     #[error("could not round date: {0}")]
     RoundError(#[from] chrono::RoundingError),
-    /// SysDb error
-    #[error("sysdb error: {0}")]
-    SysDb(#[from] chroma_sysdb::PeekScheduleError),
 }

 impl chroma_error::ChromaError for Error {
@@ -157,15 +175,14 @@ impl chroma_error::ChromaError for Error {
             Error::InvalidBucket(_) => ErrorCodes::InvalidArgument,
             Error::PartialLoadFailure(..) => ErrorCodes::Internal,
             Error::InvalidPrefix(_) => ErrorCodes::InvalidArgument,
+            Error::UninitializedHeap(_) => ErrorCodes::FailedPrecondition,
             Error::Storage(e) => e.code(),
-            Error::Wal3(e) => e.code(),
             Error::Uuid(_) => ErrorCodes::InvalidArgument,
             Error::Parquet(_) => ErrorCodes::Internal,
             Error::Json(_) => ErrorCodes::Internal,
             Error::Arrow(_) => ErrorCodes::Internal,
             Error::ParseDate(_) => ErrorCodes::InvalidArgument,
             Error::RoundError(_) => ErrorCodes::Internal,
-            Error::SysDb(e) => e.code(),
         }
     }
 }
@@ -293,7 +310,7 @@ impl RetryConfig {
 /// // Create custom limits
 /// let custom_limits = Limits::default().with_buckets(100);
 /// ```
-#[derive(Clone, Debug, Default, PartialEq, Eq, Hash)]
+#[derive(Clone, Copy, Debug, Default, PartialEq, Eq, Hash)]
 pub struct Limits {
     /// Maximum number of buckets to read during a scan operation.
     /// If None, defaults to 1000 buckets.
@@ -506,11 +523,11 @@ pub struct Schedule {
 ///     async fn get_schedules(
 ///         &self,
 ///         ids: &[Uuid],
-///     ) -> Result<Vec<Option<Schedule>>, Error> {
+///     ) -> Result<Vec<Schedule>, Error> {
 ///         // Retrieve scheduled tasks from your system
 ///         let schedules = self.schedules.lock();
 ///         Ok(ids.iter()
-///             .map(|id| schedules.get(id).cloned())
+///             .filter_map(|id| schedules.get(id).cloned())
 ///             .collect())
 ///     }
 /// }
@@ -558,19 +575,19 @@ pub trait HeapScheduler: Send + Sync {
     /// * `id` - The unique identifier of the scheduled task
     ///
     /// # Returns
-    /// * `Ok(Some((Triggerable, DateTime<Utc>, Uuid)))` if the task exists
-    /// * `Ok(None)` if the task does not exist
-    /// * `Err` if there was an error retrieving the schedule
+    /// * `Ok(Some(Schedule))` if exactly one schedule exists for the task
+    /// * `Ok(None)` if no schedules exist for the task
+    /// * `Err` if there was an error retrieving the schedule or if multiple schedules exist
     async fn get_schedule(&self, id: Uuid) -> Result<Option<Schedule>, Error> {
-        let mut results = self.get_schedules(&[id]).await?;
-        if results.len() != 1 {
-            return Err(Error::Internal(format!(
-                "get_schedules returned {} results for 1 item",
-                results.len()
-            )));
+        let results = self.get_schedules(&[id]).await?;
+        match results.len() {
+            0 => Ok(None),
+            1 => Ok(Some(results.into_iter().next().unwrap())),
+            n => Err(Error::Internal(format!(
+                "get_schedules returned {} schedules for 1 id (expected 0 or 1)",
+                n
+            ))),
         }
-        // SAFETY(rescrv): result.len() == 1
-        Ok(results.pop().unwrap())
     }

     /// Get the schedules for multiple tasks by their IDs.
@@ -579,13 +596,13 @@ pub trait HeapScheduler: Send + Sync {
     /// * `ids` - The unique identifiers of the scheduled tasks
     ///
     /// # Returns
-    /// * `Ok(Vec<Option<(Triggerable, DateTime<Utc>, Uuid)>>)` with one entry per ID
+    /// * `Ok(Vec<Schedule>)` containing all schedules for the given IDs
     /// * `Err` if there was an error retrieving the schedules
     ///
-    /// # Implementation Requirements
-    /// The returned vector must have exactly the same length as the input slice.
-    /// result[i] = get_schedule(ids[i])
-    async fn get_schedules(&self, ids: &[Uuid]) -> Result<Vec<Option<Schedule>>, Error>;
+    /// # Implementation Notes
+    /// The returned vector may contain zero, one, or many schedules per ID.
+    /// The length of the returned vector is not required to match the input slice length.
+    async fn get_schedules(&self, ids: &[Uuid]) -> Result<Vec<Schedule>, Error>;
 }

 //////////////////////////////////////////// HeapWriter ////////////////////////////////////////////
@@ -615,10 +632,10 @@ pub trait HeapScheduler: Send + Sync {
 /// use uuid::Uuid;
 ///
 /// let writer = HeapWriter::new(
-///     "my-heap".to_string(),
 ///     storage,
+///     "my-heap".to_string(),
 ///     scheduler,
-/// );
+/// ).await?;
 ///
 /// // Schedule a batch of tasks
 /// let schedules = vec![
@@ -671,15 +688,21 @@ impl HeapWriter {
     ///     "production/task-queue".to_string(),
     ///     storage,
     ///     Arc::new(scheduler),
-    /// )?;
+    /// ).await?;
     /// ```
-    pub fn new(
+    pub async fn new(
         storage: Storage,
         prefix: String,
         heap_scheduler: Arc<dyn HeapScheduler>,
     ) -> Result<Self, Error> {
         let config = Configuration::default();
         validate_prefix(&prefix)?;
+
+        let init_path = format!("{}/INIT", prefix);
+        storage
+            .put_bytes(&init_path, vec![], chroma_storage::PutOptions::default())
+            .await?;
+
         Ok(Self {
             internal: Internal::new(storage, prefix, heap_scheduler, config.backoff.clone()),
             config,
@@ -1057,6 +1080,7 @@ impl HeapReader {
     ///
     /// - Returns [`Error::InvalidPrefix`] if `prefix` is empty
    /// - Returns [`Error::InvalidPrefix`] if `prefix` contains "//" (double slashes)
+    /// - Returns [`Error::UninitializedHeap`] if the heap has not been initialized with a HeapWriter
     ///
     /// # Examples
     ///
@@ -1068,15 +1092,31 @@ impl HeapReader {
     ///     "production/task-queue".to_string(),
     ///     storage,
     ///     Arc::new(scheduler),
-    /// )?;
+    /// ).await?;
     /// ```
-    pub fn new(
+    pub async fn new(
         storage: Storage,
         prefix: String,
         heap_scheduler: Arc<dyn HeapScheduler>,
     ) -> Result<Self, Error> {
         let config = Configuration::default();
         validate_prefix(&prefix)?;
+
+        let init_path = format!("{}/INIT", prefix);
+        match storage
+            .get(&init_path, chroma_storage::GetOptions::default())
+            .await
+        {
+            Ok(_) => {}
+            Err(chroma_storage::StorageError::NotFound { .. }) => {
+                return Err(Error::UninitializedHeap(format!(
+                    "heap at prefix '{}' has not been initialized",
+                    prefix
+                )));
+            }
+            Err(e) => return Err(Error::Storage(e)),
+        }
+
         Ok(Self {
             internal: Internal::new(storage, prefix, heap_scheduler, config.backoff),
         })
diff --git a/rust/s3heap/tests/common.rs b/rust/s3heap/tests/common.rs
index 25e8c00cf48..6b22cd84a88 100644
--- a/rust/s3heap/tests/common.rs
+++ b/rust/s3heap/tests/common.rs
@@ -90,9 +90,12 @@ impl HeapScheduler for MockHeapScheduler {
             .collect())
     }

-    async fn get_schedules(&self, ids: &[Uuid]) -> Result<Vec<Option<Schedule>>, Error> {
+    async fn get_schedules(&self, ids: &[Uuid]) -> Result<Vec<Schedule>, Error> {
         let schedules = self.schedules.lock();
-        Ok(ids.iter().map(|id| schedules.get(id).cloned()).collect())
+        Ok(ids
+            .iter()
+            .filter_map(|id| schedules.get(id).cloned())
+            .collect())
     }
 }
@@ -201,7 +204,10 @@ pub async fn verify_bucket_count(
     let buckets = storage
         .list_prefix(prefix, GetOptions::default())
         .await
-        .unwrap();
+        .unwrap()
+        .into_iter()
+        .filter(|x| !x.ends_with("/INIT"))
+        .collect::<Vec<_>>();
     assert_eq!(buckets.len(), expected_count, "{}", message);
     buckets
 }
diff --git a/rust/s3heap/tests/test_k8s_integration_01_empty_heap.rs b/rust/s3heap/tests/test_k8s_integration_01_empty_heap.rs
index f091a624421..3440f5c222b 100644
--- a/rust/s3heap/tests/test_k8s_integration_01_empty_heap.rs
+++ b/rust/s3heap/tests/test_k8s_integration_01_empty_heap.rs
@@ -9,12 +9,22 @@ async fn test_k8s_integration_01_empty_heap() {
     let prefix = "test_k8s_integration_01_empty_heap";
     let (storage, scheduler) = setup_test_environment().await;

+    // Initialize heap with writer first
+    let _writer = HeapWriter::new(
+        storage.clone(),
+        prefix.to_string().clone(),
+        scheduler.clone(),
+    )
+    .await
+    .unwrap();
+
     // Create reader and verify empty heap
     let reader = HeapReader::new(
         storage.clone(),
         prefix.to_string().clone(),
         scheduler.clone(),
     )
+    .await
     .unwrap();

     // Peek should return empty results
@@ -36,6 +46,7 @@ async fn test_k8s_integration_01_empty_writer() {
         prefix.to_string().clone(),
         scheduler.clone(),
     )
+    .await
     .unwrap();

     // Push empty list should succeed
diff --git a/rust/s3heap/tests/test_k8s_integration_02_basic_push.rs b/rust/s3heap/tests/test_k8s_integration_02_basic_push.rs
index 8ce215828f8..cded1fa4b02 100644
--- a/rust/s3heap/tests/test_k8s_integration_02_basic_push.rs
+++ b/rust/s3heap/tests/test_k8s_integration_02_basic_push.rs
@@ -28,6 +28,7 @@ async fn test_k8s_integration_02_basic_push() {
         prefix.to_string().clone(),
         scheduler.clone(),
     )
+    .await
     .unwrap();
     writer
         .push(&[schedule1.clone(), schedule2.clone()])
@@ -49,6 +50,7 @@ async fn test_k8s_integration_02_basic_push() {
         prefix.to_string().clone(),
         scheduler.clone(),
     )
+    .await
     .unwrap();
     let items = reader.peek(|_| true, Limits::default()).await.unwrap();
     assert_eq!(items.len(), 2, "Should read 2 items back");
@@ -86,6 +88,7 @@ async fn test_k8s_integration_02_push_with_no_schedule() {
         prefix.to_string().clone(),
         scheduler.clone(),
     )
+    .await
     .unwrap();
     writer.push(&[schedule2.clone()]).await.unwrap();

@@ -104,6 +107,7 @@ async fn test_k8s_integration_02_push_with_no_schedule() {
         prefix.to_string().clone(),
         scheduler.clone(),
     )
+    .await
     .unwrap();
     let items = reader.peek(|_| true, Limits::default()).await.unwrap();
     assert_eq!(items.len(), 1, "Should have only 1 scheduled item");
diff --git a/rust/s3heap/tests/test_k8s_integration_03_merge_buckets.rs b/rust/s3heap/tests/test_k8s_integration_03_merge_buckets.rs
index 60e95d71dc4..cd6271054a5 100644
---
a/rust/s3heap/tests/test_k8s_integration_03_merge_buckets.rs +++ b/rust/s3heap/tests/test_k8s_integration_03_merge_buckets.rs @@ -46,6 +46,7 @@ async fn test_k8s_integration_03_merge_same_bucket() { prefix.to_string().clone(), scheduler.clone(), ) + .await .unwrap(); writer .push(&[schedule1.clone(), schedule2.clone(), schedule3.clone()]) @@ -67,6 +68,7 @@ async fn test_k8s_integration_03_merge_same_bucket() { prefix.to_string().clone(), scheduler.clone(), ) + .await .unwrap(); let items = reader.peek(|_| true, Limits::default()).await.unwrap(); assert_eq!(items.len(), 3, "Should read all 3 items from single bucket"); @@ -83,6 +85,7 @@ async fn test_k8s_integration_03_merge_multiple_pushes() { prefix.to_string().clone(), scheduler.clone(), ) + .await .unwrap(); // First push - 2 items to same bucket @@ -146,6 +149,7 @@ async fn test_k8s_integration_03_merge_multiple_pushes() { prefix.to_string().clone(), scheduler.clone(), ) + .await .unwrap(); let items = reader.peek(|_| true, Limits::default()).await.unwrap(); assert_eq!(items.len(), 4, "Should have all 4 items after merging"); diff --git a/rust/s3heap/tests/test_k8s_integration_04_prune_completed.rs b/rust/s3heap/tests/test_k8s_integration_04_prune_completed.rs index def928cfa6c..6f4dabe0a34 100644 --- a/rust/s3heap/tests/test_k8s_integration_04_prune_completed.rs +++ b/rust/s3heap/tests/test_k8s_integration_04_prune_completed.rs @@ -34,6 +34,7 @@ async fn test_k8s_integration_04_prune_completed_items() { prefix.to_string().clone(), scheduler.clone(), ) + .await .unwrap(); writer .push(&[schedule1.clone(), schedule2.clone(), schedule3.clone()]) @@ -55,6 +56,7 @@ async fn test_k8s_integration_04_prune_completed_items() { prefix.to_string().clone(), scheduler.clone(), ) + .await .unwrap(); let items = reader.peek(|_| true, Limits::default()).await.unwrap(); assert_eq!( @@ -93,6 +95,7 @@ async fn test_k8s_integration_04_prune_empty_bucket() { prefix.to_string().clone(), scheduler.clone(), ) + .await .unwrap(); writer .push(&[schedule1.clone(), schedule2.clone()]) @@ -117,6 +120,7 @@ async fn test_k8s_integration_04_prune_empty_bucket() { prefix.to_string().clone(), scheduler.clone(), ) + .await .unwrap(); let items = reader.peek(|_| true, Limits::default()).await.unwrap(); assert_eq!( @@ -160,6 +164,7 @@ async fn test_k8s_integration_04_prune_multiple_buckets() { prefix.to_string().clone(), scheduler.clone(), ) + .await .unwrap(); writer .push(&[ @@ -186,6 +191,7 @@ async fn test_k8s_integration_04_prune_multiple_buckets() { prefix.to_string().clone(), scheduler.clone(), ) + .await .unwrap(); let items = reader.peek(|_| true, Limits::default()).await.unwrap(); assert_eq!(items.len(), 2, "Two incomplete items should remain"); diff --git a/rust/s3heap/tests/test_k8s_integration_05_peek_filtering.rs b/rust/s3heap/tests/test_k8s_integration_05_peek_filtering.rs index bc5f03027ff..dcd095b6f41 100644 --- a/rust/s3heap/tests/test_k8s_integration_05_peek_filtering.rs +++ b/rust/s3heap/tests/test_k8s_integration_05_peek_filtering.rs @@ -61,6 +61,7 @@ async fn test_k8s_integration_05_peek_all_items() { prefix.to_string().clone(), scheduler.clone(), ) + .await .unwrap(); writer .push(&[ @@ -78,6 +79,7 @@ async fn test_k8s_integration_05_peek_all_items() { prefix.to_string().clone(), scheduler.clone(), ) + .await .unwrap(); // Verify all items are present @@ -138,6 +140,7 @@ async fn test_k8s_integration_05_peek_with_filter() { prefix.to_string().clone(), scheduler.clone(), ) + .await .unwrap(); writer .push(&[ @@ -155,6 +158,7 @@ async fn 
test_k8s_integration_05_peek_with_filter() { prefix.to_string().clone(), scheduler.clone(), ) + .await .unwrap(); // Filter to only get items 2 and 4 by their scheduling UUIDs @@ -238,6 +242,7 @@ async fn test_k8s_integration_05_peek_filters_completed() { prefix.to_string().clone(), scheduler.clone(), ) + .await .unwrap(); writer .push(&[schedule1.clone(), schedule2.clone(), schedule3.clone()]) @@ -250,6 +255,7 @@ async fn test_k8s_integration_05_peek_filters_completed() { prefix.to_string().clone(), scheduler.clone(), ) + .await .unwrap(); let items = reader.peek(|_| true, Limits::default()).await.unwrap(); assert_eq!(items.len(), 1, "Should only return incomplete items"); @@ -308,6 +314,7 @@ async fn test_k8s_integration_05_peek_across_buckets() { prefix.to_string().clone(), scheduler.clone(), ) + .await .unwrap(); writer .push(&[ @@ -324,6 +331,7 @@ async fn test_k8s_integration_05_peek_across_buckets() { prefix.to_string().clone(), scheduler.clone(), ) + .await .unwrap(); // Verify all items across buckets diff --git a/rust/s3heap/tests/test_k8s_integration_06_retry_logic.rs b/rust/s3heap/tests/test_k8s_integration_06_retry_logic.rs index 7db11f0c56f..2d5221621ac 100644 --- a/rust/s3heap/tests/test_k8s_integration_06_retry_logic.rs +++ b/rust/s3heap/tests/test_k8s_integration_06_retry_logic.rs @@ -24,12 +24,14 @@ async fn test_k8s_integration_06_concurrent_writes_with_retry() { prefix.to_string().clone(), scheduler.clone(), ) + .await .unwrap(); let writer2 = HeapWriter::new( storage.clone(), prefix.to_string().clone(), scheduler.clone(), ) + .await .unwrap(); // Create items that go to same bucket @@ -85,6 +87,7 @@ async fn test_k8s_integration_06_prune_with_retry() { prefix.to_string().clone(), scheduler.clone(), ) + .await .unwrap(); writer.push(&[schedule]).await.unwrap(); diff --git a/rust/s3heap/tests/test_k8s_integration_07_bucket_computation.rs b/rust/s3heap/tests/test_k8s_integration_07_bucket_computation.rs index b4301f60d74..8a5c8abf1a4 100644 --- a/rust/s3heap/tests/test_k8s_integration_07_bucket_computation.rs +++ b/rust/s3heap/tests/test_k8s_integration_07_bucket_computation.rs @@ -55,6 +55,7 @@ async fn test_k8s_integration_07_bucket_rounding() { prefix.to_string().clone(), scheduler.clone(), ) + .await .unwrap(); writer .push(&[schedule1, schedule2, schedule3, schedule4]) @@ -65,7 +66,10 @@ async fn test_k8s_integration_07_bucket_rounding() { let buckets = storage .list_prefix(prefix, GetOptions::default()) .await - .unwrap(); + .unwrap() + .into_iter() + .filter(|x| !x.ends_with("INIT")) + .collect::>(); assert_eq!( buckets.len(), 1, @@ -106,6 +110,7 @@ async fn test_k8s_integration_07_bucket_boundaries() { prefix.to_string().clone(), scheduler.clone(), ) + .await .unwrap(); writer.push(&[schedule1, schedule2]).await.unwrap(); @@ -113,7 +118,10 @@ async fn test_k8s_integration_07_bucket_boundaries() { let buckets = storage .list_prefix(prefix, GetOptions::default()) .await - .unwrap(); + .unwrap() + .into_iter() + .filter(|x| !x.ends_with("INIT")) + .collect::>(); assert_eq!( buckets.len(), 2, @@ -146,6 +154,7 @@ async fn test_k8s_integration_07_bucket_path_format() { prefix.to_string().clone(), scheduler.clone(), ) + .await .unwrap(); writer.push(&[schedule]).await.unwrap(); @@ -153,7 +162,10 @@ async fn test_k8s_integration_07_bucket_path_format() { let buckets = storage .list_prefix(prefix, GetOptions::default()) .await - .unwrap(); + .unwrap() + .into_iter() + .filter(|x| !x.ends_with("INIT")) + .collect::>(); assert_eq!(buckets.len(), 1); let bucket_path = 
&buckets[0]; @@ -195,6 +207,7 @@ async fn test_k8s_integration_07_multiple_buckets_ordering() { prefix.to_string().clone(), scheduler.clone(), ) + .await .unwrap(); writer.push(&schedules).await.unwrap(); @@ -202,7 +215,10 @@ async fn test_k8s_integration_07_multiple_buckets_ordering() { let buckets = storage .list_prefix(prefix, GetOptions::default()) .await - .unwrap(); + .unwrap() + .into_iter() + .filter(|x| !x.ends_with("INIT")) + .collect::>(); assert_eq!( buckets.len(), 5, diff --git a/rust/s3heap/tests/test_k8s_integration_08_concurrent_operations.rs b/rust/s3heap/tests/test_k8s_integration_08_concurrent_operations.rs index f512cd82a54..e76a2dd4c36 100644 --- a/rust/s3heap/tests/test_k8s_integration_08_concurrent_operations.rs +++ b/rust/s3heap/tests/test_k8s_integration_08_concurrent_operations.rs @@ -41,6 +41,7 @@ async fn test_k8s_integration_08_concurrent_pushes() { prefix.to_string().clone(), scheduler.clone(), ) + .await .unwrap(); let schedules: Vec<_> = (0..items_per_writer) .map(|j| { @@ -68,6 +69,7 @@ async fn test_k8s_integration_08_concurrent_pushes() { prefix.to_string().clone(), scheduler.clone(), ) + .await .unwrap(); let items = reader.peek(|_| true, Limits::default()).await.unwrap(); assert_eq!( @@ -105,6 +107,7 @@ async fn test_k8s_integration_08_concurrent_read_write() { prefix.to_string().clone(), scheduler.clone(), ) + .await .unwrap(); writer.push(&initial_schedules).await.unwrap(); @@ -119,6 +122,7 @@ async fn test_k8s_integration_08_concurrent_read_write() { prefix.to_string().clone(), scheduler.clone(), ) + .await .unwrap(); let scheduler_clone = scheduler.clone(); @@ -148,6 +152,7 @@ async fn test_k8s_integration_08_concurrent_read_write() { prefix.to_string().clone(), scheduler.clone(), ) + .await .unwrap(); read_handles.push(tokio::spawn(async move { @@ -172,6 +177,7 @@ async fn test_k8s_integration_08_concurrent_read_write() { prefix.to_string().clone(), scheduler.clone(), ) + .await .unwrap(); let final_items = reader.peek(|_| true, Limits::default()).await.unwrap(); assert_eq!( @@ -214,6 +220,7 @@ async fn test_k8s_integration_08_concurrent_prune_push() { prefix.to_string().clone(), scheduler.clone(), ) + .await .unwrap(); writer.push(&initial_schedules).await.unwrap(); @@ -233,6 +240,7 @@ async fn test_k8s_integration_08_concurrent_prune_push() { prefix.to_string().clone(), scheduler.clone(), ) + .await .unwrap(); let scheduler_clone = scheduler.clone(); let write_handle = tokio::spawn(async move { @@ -261,6 +269,7 @@ async fn test_k8s_integration_08_concurrent_prune_push() { prefix.to_string().clone(), scheduler.clone(), ) + .await .unwrap(); let final_items = reader.peek(|_| true, Limits::default()).await.unwrap(); diff --git a/rust/s3heap/tests/test_unit_tests.rs b/rust/s3heap/tests/test_unit_tests.rs index e0f2cd041c8..3b308d22c62 100644 --- a/rust/s3heap/tests/test_unit_tests.rs +++ b/rust/s3heap/tests/test_unit_tests.rs @@ -1,7 +1,7 @@ use parking_lot::Mutex; use s3heap::{ - DummyScheduler, Error, HeapPruner, HeapReader, HeapScheduler, HeapWriter, Limits, PruneStats, - RetryConfig, Schedule, Triggerable, + Error, HeapPruner, HeapReader, HeapScheduler, HeapWriter, Limits, PruneStats, RetryConfig, + Schedule, Triggerable, }; use std::collections::HashMap; use std::str::FromStr; @@ -9,6 +9,20 @@ use std::sync::Arc; use std::time::Duration; use uuid::Uuid; +// Dummy scheduler for testing purposes +struct DummyScheduler; + +#[async_trait::async_trait] +impl HeapScheduler for DummyScheduler { + async fn are_done(&self, items: &[(Triggerable, Uuid)]) 
-> Result<Vec<bool>, Error> {
+        Ok(vec![false; items.len()])
+    }
+
+    async fn get_schedules(&self, _ids: &[Uuid]) -> Result<Vec<Schedule>, Error> {
+        Ok(vec![])
+    }
+}
+
 // More sophisticated test scheduler for comprehensive testing
 struct ConfigurableScheduler {
     #[allow(clippy::type_complexity)]
@@ -49,22 +63,22 @@ impl HeapScheduler for ConfigurableScheduler {
             .collect())
     }

-    async fn get_schedules(&self, ids: &[Uuid]) -> Result<Vec<Option<Schedule>>, Error> {
+    async fn get_schedules(&self, ids: &[Uuid]) -> Result<Vec<Schedule>, Error> {
         let scheduled_items = self.scheduled_items.lock();
         Ok(ids
             .iter()
-            .map(|id| scheduled_items.get(id).cloned().flatten())
+            .filter_map(|id| scheduled_items.get(id).cloned().flatten())
             .collect())
     }
 }

 // Tests for prefix validation
-#[test]
-fn heap_components_error_on_empty_prefix() {
+#[tokio::test]
+async fn heap_components_error_on_empty_prefix() {
     let (_temp_dir, storage) = chroma_storage::test_storage();
     let scheduler = Arc::new(DummyScheduler);

-    let writer_result = HeapWriter::new(storage.clone(), String::new(), scheduler.clone());
+    let writer_result = HeapWriter::new(storage.clone(), String::new(), scheduler.clone()).await;
     assert!(writer_result.is_err());
     match writer_result {
         Err(Error::InvalidPrefix(msg)) => assert!(msg.contains("empty")),
@@ -78,7 +92,7 @@ fn heap_components_error_on_empty_prefix() {
         _ => panic!("Expected InvalidPrefix error for HeapPruner"),
     }

-    let reader_result = HeapReader::new(storage, String::new(), scheduler);
+    let reader_result = HeapReader::new(storage, String::new(), scheduler).await;
     assert!(reader_result.is_err());
     match reader_result {
         Err(Error::InvalidPrefix(msg)) => assert!(msg.contains("empty")),
@@ -86,8 +100,8 @@ fn heap_components_error_on_empty_prefix() {
     }
 }

-#[test]
-fn heap_components_accept_valid_prefix() {
+#[tokio::test]
+async fn heap_components_accept_valid_prefix() {
     let (_temp_dir, storage) = chroma_storage::test_storage();
     let scheduler = Arc::new(DummyScheduler);

@@ -97,6 +111,7 @@ fn heap_components_accept_valid_prefix() {
         "valid-prefix".to_string(),
         scheduler.clone(),
     )
+    .await
     .unwrap();
     let _pruner = HeapPruner::new(
         storage.clone(),
@@ -104,11 +119,13 @@ fn heap_components_accept_valid_prefix() {
         scheduler.clone(),
     )
     .unwrap();
-    let _reader = HeapReader::new(storage, "valid-prefix".to_string(), scheduler).unwrap();
+    let _reader = HeapReader::new(storage, "valid-prefix".to_string(), scheduler)
+        .await
+        .unwrap();
 }

-#[test]
-fn heap_components_error_on_double_slash() {
+#[tokio::test]
+async fn heap_components_error_on_double_slash() {
     let (_temp_dir, storage) = chroma_storage::test_storage();
     let scheduler = Arc::new(DummyScheduler);

@@ -116,7 +133,8 @@ fn heap_components_error_on_double_slash() {
         storage.clone(),
         "prefix//with//slashes".to_string(),
         scheduler.clone(),
-    );
+    )
+    .await;
     assert!(writer_result.is_err());
     match writer_result {
         Err(Error::InvalidPrefix(msg)) => assert!(msg.contains("double slashes")),
@@ -134,7 +152,8 @@ fn heap_components_error_on_double_slash() {
         _ => panic!("Expected InvalidPrefix error for HeapPruner"),
     }

-    let reader_result = HeapReader::new(storage, "prefix//with//slashes".to_string(), scheduler);
+    let reader_result =
+        HeapReader::new(storage, "prefix//with//slashes".to_string(), scheduler).await;
    assert!(reader_result.is_err());
     match reader_result {
         Err(Error::InvalidPrefix(msg)) => assert!(msg.contains("double slashes")),
@@ -185,14 +204,14 @@ fn limits_equality() {
 }

 #[test]
-fn limits_clone() {
+fn limits_copy() {
     let original = Limits {
         buckets_to_read: Some(500),
         max_items: None,
     };
-    let
cloned = original.clone(); - assert_eq!(original, cloned); - assert_eq!(cloned.buckets_to_read, Some(500)); + let copied = original; + assert_eq!(original, copied); + assert_eq!(copied.buckets_to_read, Some(500)); } // Tests for Triggerable @@ -273,7 +292,9 @@ fn error_display() { async fn writer_push_empty_items() { let (_temp_dir, storage) = chroma_storage::test_storage(); let scheduler = Arc::new(DummyScheduler); - let writer = HeapWriter::new(storage, "test-prefix".to_string(), scheduler).unwrap(); + let writer = HeapWriter::new(storage, "test-prefix".to_string(), scheduler) + .await + .unwrap(); // Pushing empty items should succeed without doing anything let result = writer.push(&[]).await; @@ -284,7 +305,9 @@ async fn writer_push_empty_items() { async fn writer_push_with_no_scheduled_items() { let (_temp_dir, storage) = chroma_storage::test_storage(); let scheduler = Arc::new(ConfigurableScheduler::new()); - let writer = HeapWriter::new(storage, "test-no-schedule".to_string(), scheduler).unwrap(); + let writer = HeapWriter::new(storage, "test-no-schedule".to_string(), scheduler) + .await + .unwrap(); // Push empty schedules should succeed but not create any buckets let result = writer.push(&[]).await; @@ -296,7 +319,9 @@ async fn writer_push_empty_schedules_succeeds() { let (_temp_dir, storage) = chroma_storage::test_storage(); let scheduler = Arc::new(ConfigurableScheduler::new()); - let writer = HeapWriter::new(storage, "test-empty".to_string(), scheduler).unwrap(); + let writer = HeapWriter::new(storage, "test-empty".to_string(), scheduler) + .await + .unwrap(); let result = writer.push(&[]).await; assert!(result.is_ok()); @@ -335,7 +360,17 @@ async fn pruner_respects_limits() { async fn reader_peek_empty_heap() { let (_temp_dir, storage) = chroma_storage::test_storage(); let scheduler = Arc::new(DummyScheduler); - let reader = HeapReader::new(storage, "empty-reader".to_string(), scheduler).unwrap(); + // Initialize heap first + let _writer = HeapWriter::new( + storage.clone(), + "empty-reader".to_string(), + scheduler.clone(), + ) + .await + .unwrap(); + let reader = HeapReader::new(storage, "empty-reader".to_string(), scheduler) + .await + .unwrap(); let items = reader.peek(|_| true, Limits::default()).await; assert!(items.is_ok()); @@ -346,7 +381,17 @@ async fn reader_peek_empty_heap() { async fn reader_peek_with_filter() { let (_temp_dir, storage) = chroma_storage::test_storage(); let scheduler = Arc::new(DummyScheduler); - let reader = HeapReader::new(storage, "filtered-reader".to_string(), scheduler).unwrap(); + // Initialize heap first + let _writer = HeapWriter::new( + storage.clone(), + "filtered-reader".to_string(), + scheduler.clone(), + ) + .await + .unwrap(); + let reader = HeapReader::new(storage, "filtered-reader".to_string(), scheduler) + .await + .unwrap(); // Filter that rejects everything let items = reader.peek(|_| false, Limits::default()).await; @@ -358,7 +403,17 @@ async fn reader_peek_with_filter() { async fn reader_respects_limits() { let (_temp_dir, storage) = chroma_storage::test_storage(); let scheduler = Arc::new(DummyScheduler); - let reader = HeapReader::new(storage, "limited-reader".to_string(), scheduler).unwrap(); + // Initialize heap first + let _writer = HeapWriter::new( + storage.clone(), + "limited-reader".to_string(), + scheduler.clone(), + ) + .await + .unwrap(); + let reader = HeapReader::new(storage, "limited-reader".to_string(), scheduler) + .await + .unwrap(); let limits = Limits { buckets_to_read: Some(3), diff --git 
a/rust/worker/Cargo.toml b/rust/worker/Cargo.toml
index aec35b8da9b..c6c82ef09d6 100644
--- a/rust/worker/Cargo.toml
+++ b/rust/worker/Cargo.toml
@@ -61,6 +61,8 @@ chroma-system = { workspace = true }
 chroma-sysdb = { workspace = true }
 chroma-types = { workspace = true }
 chroma-jemalloc-pprof-server = { workspace = true }
+s3heap = { workspace = true }
+s3heap-service = { workspace = true }
 fastrace = "0.7"
 fastrace-opentelemetry = "0.8"
diff --git a/rust/worker/src/compactor/compaction_manager.rs b/rust/worker/src/compactor/compaction_manager.rs
index c5bd5404419..1cff1b647ca 100644
--- a/rust/worker/src/compactor/compaction_manager.rs
+++ b/rust/worker/src/compactor/compaction_manager.rs
@@ -432,6 +432,7 @@ impl Configurable<(CompactionServiceConfig, System)> for CompactionManager {
             my_ip,
             log.clone(),
             sysdb.clone(),
+            storage.clone(),
             policy,
             max_concurrent_jobs,
             min_compaction_size,
@@ -901,6 +902,7 @@ mod tests {
             my_member.member_id.clone(),
             log.clone(),
             sysdb.clone(),
+            storage.clone(),
             Box::new(LasCompactionTimeSchedulerPolicy {}),
             max_concurrent_jobs,
             min_compaction_size,
diff --git a/rust/worker/src/compactor/mod.rs b/rust/worker/src/compactor/mod.rs
index 1c07272a7bb..7b7ff1191d2 100644
--- a/rust/worker/src/compactor/mod.rs
+++ b/rust/worker/src/compactor/mod.rs
@@ -2,6 +2,7 @@ mod compaction_manager;
 pub(crate) mod config;
 mod scheduler;
 mod scheduler_policy;
+mod tasks;
 mod types;

 pub(crate) use compaction_manager::*;
diff --git a/rust/worker/src/compactor/scheduler.rs b/rust/worker/src/compactor/scheduler.rs
index 9cf183a0322..428046dfa0c 100644
--- a/rust/worker/src/compactor/scheduler.rs
+++ b/rust/worker/src/compactor/scheduler.rs
@@ -1,18 +1,22 @@
 use std::collections::{HashMap, HashSet};
 use std::str::FromStr;
+use std::sync::Arc;
 use std::time::{Duration, SystemTime};

 use chroma_config::assignment::assignment_policy::AssignmentPolicy;
 use chroma_log::{CollectionInfo, CollectionRecord, Log};
 use chroma_memberlist::memberlist_provider::Memberlist;
+use chroma_storage::Storage;
 use chroma_sysdb::{GetCollectionsOptions, SysDb};
 use chroma_types::CollectionUuid;
 use figment::providers::Env;
 use figment::Figment;
+use s3heap_service::SysDbScheduler;
 use serde::Deserialize;
 use uuid::Uuid;

 use crate::compactor::scheduler_policy::SchedulerPolicy;
+use crate::compactor::tasks::TaskHeapReader;
 use crate::compactor::types::CompactionJob;

 #[derive(Debug, Clone)]
@@ -97,6 +101,7 @@ pub(crate) struct Scheduler {
     dead_jobs: HashSet<Uuid>,
     max_failure_count: u8,
     metrics: SchedulerMetrics,
+    tasks: TaskHeapReader,
 }

 #[derive(Deserialize, Debug)]
@@ -110,6 +115,7 @@ impl Scheduler {
         my_ip: String,
         log: Log,
         sysdb: SysDb,
+        storage: Storage,
         policy: Box<dyn SchedulerPolicy>,
         max_concurrent_jobs: usize,
         min_compaction_size: usize,
@@ -118,6 +124,10 @@ impl Scheduler {
         job_expiry_seconds: u64,
         max_failure_count: u8,
     ) -> Scheduler {
+        let heap_scheduler =
+            Arc::new(SysDbScheduler::new(sysdb.clone())) as Arc<dyn s3heap::HeapScheduler>;
+        let tasks = TaskHeapReader::new(storage, heap_scheduler);
+
         Scheduler {
             my_member_id: my_ip,
             log,
@@ -138,6 +148,7 @@ impl Scheduler {
             max_failure_count,
             dead_jobs: HashSet::new(),
             metrics: SchedulerMetrics::default(),
+            tasks,
         }
     }

@@ -476,6 +487,13 @@ impl Scheduler {
         // Recompute disabled list.
self.recompute_disabled_collections();
         let collections = self.get_collections_with_new_data().await;
+        let tasks = self
+            .tasks
+            .get_tasks_scheduled_for_execution(
+                s3heap::Limits::default().with_items(self.max_concurrent_jobs),
+            )
+            .await;
+        tracing::info!("SCHEDULING TASKS FOR {tasks:?}");
         if collections.is_empty() {
             return;
         }
@@ -505,11 +523,14 @@ mod tests {
     use chroma_config::assignment::assignment_policy::RendezvousHashingAssignmentPolicy;
     use chroma_log::in_memory_log::{InMemoryLog, InternalLogRecord};
     use chroma_memberlist::memberlist_provider::Member;
+    use chroma_storage::s3_client_for_test_with_new_bucket;
     use chroma_sysdb::TestSysDb;
     use chroma_types::{Collection, LogRecord, Operation, OperationRecord};

     #[tokio::test]
     async fn test_scheduler() {
+        let storage = s3_client_for_test_with_new_bucket().await;
+
         let mut log = Log::InMemory(InMemoryLog::new());
         let in_memory_log = match log {
             Log::InMemory(ref mut in_memory_log) => in_memory_log,
@@ -609,6 +630,7 @@ mod tests {
             my_member.member_id.clone(),
             log,
             sysdb.clone(),
+            storage,
             scheduler_policy,
             max_concurrent_jobs,
             1,
@@ -738,6 +760,8 @@ mod tests {
     #[tokio::test]
     #[should_panic(expected = "is less than offset")]
     async fn test_scheduler_panic() {
+        let storage = s3_client_for_test_with_new_bucket().await;
+
         let mut log = Log::InMemory(InMemoryLog::new());
         let in_memory_log = match log {
             Log::InMemory(ref mut in_memory_log) => in_memory_log,
@@ -862,6 +886,7 @@ mod tests {
             my_member.member_id.clone(),
             log,
             sysdb.clone(),
+            storage,
             scheduler_policy,
             max_concurrent_jobs,
             1,
diff --git a/rust/worker/src/compactor/tasks.rs b/rust/worker/src/compactor/tasks.rs
new file mode 100644
index 00000000000..7c4497a2813
--- /dev/null
+++ b/rust/worker/src/compactor/tasks.rs
@@ -0,0 +1,88 @@
+use std::sync::Arc;
+
+use chroma_storage::Storage;
+use chroma_types::CollectionUuid;
+use s3heap::{heap_path_from_hostname, Error, HeapReader, HeapScheduler, Limits};
+
+/// A task that has been scheduled for execution.
+#[derive(Clone, Debug)]
+pub struct SchedulableTask {
+    pub collection: CollectionUuid,
+    pub scheduling: uuid::Uuid,
+    pub nonce: uuid::Uuid,
+}
+
+/// Reader for fetching scheduled tasks from multiple heap instances.
+pub struct TaskHeapReader {
+    storage: Storage,
+    heap_scheduler: Arc<dyn HeapScheduler>,
+}
+
+impl TaskHeapReader {
+    /// Create a new TaskHeapReader with the given dependencies.
+    pub fn new(storage: Storage, heap_scheduler: Arc<dyn HeapScheduler>) -> Self {
+        Self {
+            storage,
+            heap_scheduler,
+        }
+    }
+
+    /// Get tasks scheduled for execution across all rust-log-service heaps.
+    ///
+    /// This method queries heap/rust-log-service-0, heap/rust-log-service-1, etc.,
+    /// until it encounters an uninitialized heap, collecting up to `limits.max_items` tasks.
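+    ///
+    /// A minimal usage sketch; the dispatch step is hypothetical and not part of this change:
+    ///
+    /// ```ignore
+    /// let tasks = task_heap_reader
+    ///     .get_tasks_scheduled_for_execution(Limits::default().with_items(100))
+    ///     .await;
+    /// for task in tasks {
+    ///     // dispatch a run of `task.scheduling` for `task.collection`,
+    ///     // using `task.nonce` to deduplicate against the sysdb schedule
+    /// }
+    /// ```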
+    pub async fn get_tasks_scheduled_for_execution(&self, limits: Limits) -> Vec<SchedulableTask> {
+        let mut all_tasks = Vec::new();
+        let mut service_index = 0;
+        let max_items = limits.max_items.unwrap_or(1000);
+
+        loop {
+            if all_tasks.len() >= max_items {
+                break;
+            }
+
+            let heap_prefix =
+                heap_path_from_hostname(&format!("rust-log-service-{}", service_index));
+
+            let reader_result = HeapReader::new(
+                self.storage.clone(),
+                heap_prefix.clone(),
+                Arc::clone(&self.heap_scheduler),
+            )
+            .await;
+
+            let reader = match reader_result {
+                Ok(r) => r,
+                Err(Error::UninitializedHeap(_)) => {
+                    break;
+                }
+                Err(e) => {
+                    tracing::error!("Error creating heap reader for {}: {:?}", heap_prefix, e);
+                    service_index += 1;
+                    continue;
+                }
+            };
+
+            match reader.peek(|_| true, limits).await {
+                Ok(items) => {
+                    tracing::trace!("Found {} tasks in {}", items.len(), heap_prefix);
+                    for item in items {
+                        all_tasks.push(SchedulableTask {
+                            collection: CollectionUuid(*item.trigger.partitioning.as_uuid()),
+                            scheduling: *item.trigger.scheduling.as_uuid(),
+                            nonce: item.nonce,
+                        });
+                    }
+                }
+                Err(e) => {
+                    tracing::trace!("Error reading from {}: {:?}", heap_prefix, e);
+                }
+            }
+
+            service_index += 1;
+        }
+
+        all_tasks.truncate(max_items);
+        all_tasks
+    }
+}

From da2371f298315395cc4bec61c00c378b7e0db0cc Mon Sep 17 00:00:00 2001
From: Robert Escriva
Date: Mon, 13 Oct 2025 13:41:20 -0700
Subject: [PATCH 02/13] state space diagram for reasoning through what happens
 when.

---
 rust/s3heap-service/README.md | 73 +++++++++++++++++++++++++++++++++++
 1 file changed, 73 insertions(+)
 create mode 100644 rust/s3heap-service/README.md

diff --git a/rust/s3heap-service/README.md b/rust/s3heap-service/README.md
new file mode 100644
index 00000000000..f1018281eaa
--- /dev/null
+++ b/rust/s3heap-service/README.md
@@ -0,0 +1,73 @@
+# s3heap-service

+The s3heap-service integrates with the task manager to trigger tasks no faster than a particular
+cadence, with reasonable guarantees that writing data will cause a task to run.
+
+This document lays out and refines the design of the heap-tender and heap service until it can be
+implemented safely.
+
+## Abstract: A heap and a sysdb.
+
+At the most abstract level, we have a heap and the sysdb. An item is either in the heap or not in
+the heap. For the sysdb, an item is in one of three states: not in the sysdb, in the sysdb and
+should be scheduled, or in the sysdb and waiting for writes to trigger the next scheduled run.
+
+That gives this chart:
+
+| Heap State | Sysdb State |
+|------------|-------------|
+| Not in heap | Not in sysdb |
+| Not in heap | In sysdb, should be scheduled |
+| Not in heap | In sysdb, waiting for writes |
+| In heap | Not in sysdb |
+| In heap | In sysdb, should be scheduled |
+| In heap | In sysdb, waiting for writes |
+
+More abstractly, view it like this:
+
+                    | On Heap    | Not On Heap |
+--------------------|------------|-------------|
+Not in sysdb        | A_1        | A_2         |
+In sysdb, scheduled | B_1        | B_2         |
+In sysdb, waiting   | C_1        | C_2         |
+
+When viewed like this, we can establish rules for state transitions in our system. Each operation
+operates on either the sysdb or the heap, never both, because there is no transactionality between S3
+and databases. Thus, we can reason that we can jump to any row within the same column, or to
+another column within the same row.
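+
+As a concrete illustration (a sketch for reasoning only, not code this service ships), the state
+space can be modeled as a pair of enums, and the "same row or same column" rule says a single
+operation may change exactly one half of the pair:
+
+```rust
+#[derive(Clone, Copy, Debug, PartialEq, Eq)]
+enum HeapState {
+    OnHeap,    // column 1
+    NotOnHeap, // column 2
+}
+
+#[derive(Clone, Copy, Debug, PartialEq, Eq)]
+enum SysDbState {
+    NotInSysDb, // row A
+    Scheduled,  // row B
+    Waiting,    // row C
+}
+
+/// One step touches S3 or the sysdb, never both atomically:
+/// exactly one coordinate of the pair may change per operation.
+fn is_single_step(from: (HeapState, SysDbState), to: (HeapState, SysDbState)) -> bool {
+    (from.0 == to.0) ^ (from.1 == to.1)
+}
+```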
+
+## State space diagram
+
+        From
+     |      | A_1  | A_2  | B_1  | B_2  | C_1  | C_2  |
+     |------|------|------|------|------|------|------|
+     | A_1  | -    | IMP1 | YES1 | X    | YES1 | X    |
+     | A_2  | GC1  | -    | X    | GC2  | X    | YES1 |
+To   | B_1  | IMP2 | X    |-     | NEW2 | YES3 | X    |
+     | B_2  | X    | NEW1 | IMP3 | -    | X    | YES3 |
+     | C_1  | IMP2 | X    | YES2 | X    | -    | IMP4 |
+     | C_2  | X    | NO1  | X    | YES2 | IMP3 | -    |
+
+GC1: Item gets a perpetual "is-done" from the sysdb and transitions to A_2.
+GC2: Garbage collection.
+
+NEW1: Create a new task in the sysdb.
+NEW2: Finish the new operation by priming the task and putting it on the heap.
+
+YES1: Task gets deleted from sysdb.
+YES2: This implies that we move from scheduled to waiting while the task is on heap. This happens
+      when a job completes and reads all data from the log.
+YES3: There was a write, the heap needed to schedule, so it picked a time and updated sysdb.
+
+NO1: This implies that the state transitioned from being not-in-sysdb to in-sysdb. A new task
+     will always run straight away, so it should not be put into waiting state.
+
+IMP1: The item is not on heap or in the database. First transition is to B_2 or C_2.
+IMP2: Task UUIDs are not re-used. Starting from A_1 implies the task was created and then put on
+      the heap and subsequently removed from sysdb. There should be no means by which it reappears
+      in the sysdb. Therefore this path is impossible.
+IMP3: We never take something off the heap until the sysdb is updated to reflect the job being
+      done. Therefore we don't take this transition.
+IMP4: We don't add something to the heap until it has been scheduled.
+
+X: Impossible.

From ddec8ce268ed2c3e7b1796955a3ba572072d2dcf Mon Sep 17 00:00:00 2001
From: Robert Escriva
Date: Mon, 13 Oct 2025 13:44:51 -0700
Subject: [PATCH 03/13] fixtable

---
 rust/s3heap-service/README.md | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/rust/s3heap-service/README.md b/rust/s3heap-service/README.md
index f1018281eaa..730a1adf753 100644
--- a/rust/s3heap-service/README.md
+++ b/rust/s3heap-service/README.md
@@ -40,7 +40,7 @@ another column within the same row.

         From
      |      | A_1  | A_2  | B_1  | B_2  | C_1  | C_2  |
-     |------|------|------|------|------|------|------|
+-----|------|------|------|------|------|------|------|
      | A_1  | -    | IMP1 | YES1 | X    | YES1 | X    |
      | A_2  | GC1  | -    | X    | GC2  | X    | YES1 |
 To   | B_1  | IMP2 | X    |-     | NEW2 | YES3 | X    |

From c844996225ad6b0d5654cf510637a535f2046c7e Mon Sep 17 00:00:00 2001
From: Robert Escriva
Date: Mon, 13 Oct 2025 13:48:31 -0700
Subject: [PATCH 04/13] fixtable2

---
 rust/s3heap-service/README.md | 16 ++++++++--------
 1 file changed, 8 insertions(+), 8 deletions(-)

diff --git a/rust/s3heap-service/README.md b/rust/s3heap-service/README.md
index 730a1adf753..a621b37194f 100644
--- a/rust/s3heap-service/README.md
+++ b/rust/s3heap-service/README.md
@@ -39,14 +39,14 @@ another column within the same row.
## State space diagram From - | | A_1 | A_2 | B_1 | B_2 | C_1 | C_2 | ------|------|------|------|------|------|------|------| - | A_1 | - | IMP1 | YES1 | X | YES1 | X | - | A_2 | GC1 | - | X | GC2 | X | YES1 | -To | B_1 | IMP2 | X |- | NEW2 | YES3 | X | - | B_2 | X | NEW1 | IMP3 | - | X | YES3 | - | C_1 | IMP2 | X | YES2 | X | - | IMP4 | - | C_2 | X | NO1 | X | YES2 | IMP3 | - | +| | | A_1 | A_2 | B_1 | B_2 | C_1 | C_2 | +|-----|------|------|------|------|------|------|------| +| | A_1 | - | IMP1 | YES1 | X | YES1 | X | +| | A_2 | GC1 | - | X | GC2 | X | YES1 | +| To | B_1 | IMP2 | X |- | NEW2 | YES3 | X | +| | B_2 | X | NEW1 | IMP3 | - | X | YES3 | +| | C_1 | IMP2 | X | YES2 | X | - | IMP4 | +| | C_2 | X | NO1 | X | YES2 | IMP3 | - | GC1: Item gets a perpetual "is-done" from the sysdb and transitions to A_2. GC2: Garbage collection. From c3f061a0153b3af9b0eb8b4d28410320573c5293 Mon Sep 17 00:00:00 2001 From: Robert Escriva Date: Mon, 13 Oct 2025 13:51:00 -0700 Subject: [PATCH 05/13] another table and bullets --- rust/s3heap-service/README.md | 46 ++++++++++++++--------------------- 1 file changed, 18 insertions(+), 28 deletions(-) diff --git a/rust/s3heap-service/README.md b/rust/s3heap-service/README.md index a621b37194f..6731e53ea10 100644 --- a/rust/s3heap-service/README.md +++ b/rust/s3heap-service/README.md @@ -25,11 +25,11 @@ That gives this chart More abstractly, view it like this: - | On Heap | Not On Heap | ---------------------|------------|-------------| -Not in sysdb | A_1 | A_2 | -In sysdb, scheduled | B_1 | B_2 | -In sysdb, waiting | C_1 | C_2 | +| | On Heap | Not On Heap | +|---------------------|------------|-------------| +| Not in sysdb | A_1 | A_2 | +| In sysdb, scheduled | B_1 | B_2 | +| In sysdb, waiting | C_1 | C_2 | When viewed like this, we can establish rules for state transitions in our system. Each operation operates on either the sysdb or the heap, never both because there is no transactionality between S3 @@ -48,26 +48,16 @@ another column within the same row. | | C_1 | IMP2 | X | YES2 | X | - | IMP4 | | | C_2 | X | NO1 | X | YES2 | IMP3 | - | -GC1: Item gets a perpetual "is-done" from the sysdb and transitions to A_2. -GC2: Garbage collection. - -NEW1: Create a new task in the sysdb. -NEW2: Finish the new operation by primiing the task and putting it on the heap. - -YES1: Task gets deleted from sysdb. -YES2: This implies that we move from scheduled to waiting while the task is on heap. This happens - when a job completes and reads all data from the log. -YES3: There was a write, the heap needed to schedule, so it picked a time and updated sysdb. - -NO1: This implies that the state transitioned from being not-in-sysdb to in-sysdb. A new task - will always run straight away, so it should not be put into waiting state. - -IMP1: The item is not on heap or in the database. First transition is to B_2 or C_2. -IMP2: Task UUIDs are not re-used. Starting from A_1 implies the task was created and then put on - the heap and subsequently removed from sysdb. There should be no means by which it reappears - in the sysdb. Therefore this path is impossible. -IMP3: We never take something off the heap until the sysdb is updated to reflect the job being - done. Therefore we don't take this transition. -IMP4: We don't add something to the heap until it has been scheduled. - -X: Impossible. +- GC1: Item gets a perpetual "is-done" from the sysdb and transitions to A_2. +- GC2: Garbage collection. +- NEW1: Create a new task in the sysdb. 
+- NEW2: Finish the new operation by primiing the task and putting it on the heap. +- YES1: Task gets deleted from sysdb. +- YES2: This implies that we move from scheduled to waiting while the task is on heap. This happens when a job completes and reads all data from the log. +- YES3: There was a write, the heap needed to schedule, so it picked a time and updated sysdb. +- NO1: This implies that the state transitioned from being not-in-sysdb to in-sysdb. A new task will always run straight away, so it should not be put into waiting state. +- IMP1: The item is not on heap or in the database. First transition is to B_2 or C_2. +- IMP2: Task UUIDs are not re-used. Starting from A_1 implies the task was created and then put on the heap and subsequently removed from sysdb. There should be no means by which it reappears in the sysdb. Therefore this path is impossible. +- IMP3: We never take something off the heap until the sysdb is updated to reflect the job being done. Therefore we don't take this transition. +- IMP4: We don't add something to the heap until it has been scheduled. +- X: Impossible. From 648a9f2f89007a955fc60059fbac12003b464336 Mon Sep 17 00:00:00 2001 From: Robert Escriva Date: Wed, 15 Oct 2025 10:02:03 -0700 Subject: [PATCH 06/13] Make peek take a bucket param --- Cargo.lock | 1 + rust/s3heap/src/lib.rs | 17 ++++++++++------- .../tests/test_k8s_integration_01_empty_heap.rs | 2 +- .../tests/test_k8s_integration_02_basic_push.rs | 8 ++++---- .../test_k8s_integration_03_merge_buckets.rs | 6 +++--- .../test_k8s_integration_04_prune_completed.rs | 10 +++++----- .../test_k8s_integration_05_peek_filtering.rs | 12 ++++++------ ..._k8s_integration_08_concurrent_operations.rs | 8 ++++---- rust/s3heap/tests/test_unit_tests.rs | 6 +++--- rust/worker/Cargo.toml | 1 + rust/worker/src/compactor/compaction_manager.rs | 4 ++++ rust/worker/src/compactor/scheduler.rs | 10 +++++++++- rust/worker/src/compactor/tasks.rs | 16 ++++++++++------ 13 files changed, 61 insertions(+), 40 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 978f9367f5d..290ae4d6be0 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -9980,6 +9980,7 @@ dependencies = [ "chroma-system", "chroma-tracing", "chroma-types", + "chrono", "clap", "criterion", "fastrace", diff --git a/rust/s3heap/src/lib.rs b/rust/s3heap/src/lib.rs index 65e36f20998..741c00d38c6 100644 --- a/rust/s3heap/src/lib.rs +++ b/rust/s3heap/src/lib.rs @@ -1176,9 +1176,9 @@ impl HeapReader { /// ``` pub async fn peek( &self, - should_return: impl for<'a> Fn(&'a Triggerable) -> bool + Send + Sync, + should_return: impl for<'a> Fn(&'a Triggerable, DateTime<Utc>) -> bool + Send + Sync, limits: Limits, - ) -> Result<Vec<HeapItem>, Error> { + ) -> Result<Vec<(DateTime<Utc>, HeapItem)>, Error> { let heap_scheduler = self.internal.heap_scheduler(); let buckets = self.internal.list_approx_first_1k_buckets().await?; let mut returns = vec![]; @@ -1188,7 +1188,7 @@ let (entries, _) = self.internal.load_bucket_or_empty(bucket).await?; let triggerable_and_nonce = entries .iter() - .filter(|hi| should_return(&hi.trigger)) + .filter(|hi| should_return(&hi.trigger, bucket)) .map(|hi| (hi.trigger, hi.nonce)) .collect::<Vec<_>>(); let are_done = heap_scheduler.are_done(&triggerable_and_nonce).await?; @@ -1201,10 +1201,13 @@ } for ((triggerable, uuid), is_done) in triggerable_and_nonce.iter().zip(are_done) { if !is_done { - returns.push(HeapItem { - trigger: *triggerable, - nonce: *uuid, - }); + returns.push(( + bucket, + HeapItem { + trigger: *triggerable, + nonce: *uuid, + }, + )); if
returns.len() >= max_items { break 'outer; } diff --git a/rust/s3heap/tests/test_k8s_integration_01_empty_heap.rs b/rust/s3heap/tests/test_k8s_integration_01_empty_heap.rs index 3440f5c222b..8122330957c 100644 --- a/rust/s3heap/tests/test_k8s_integration_01_empty_heap.rs +++ b/rust/s3heap/tests/test_k8s_integration_01_empty_heap.rs @@ -28,7 +28,7 @@ async fn test_k8s_integration_01_empty_heap() { .unwrap(); // Peek should return empty results - let items = reader.peek(|_| true, Limits::default()).await.unwrap(); + let items = reader.peek(|_, _| true, Limits::default()).await.unwrap(); assert_eq!(items.len(), 0, "Empty heap should return no items"); // Verify no buckets exist diff --git a/rust/s3heap/tests/test_k8s_integration_02_basic_push.rs b/rust/s3heap/tests/test_k8s_integration_02_basic_push.rs index cded1fa4b02..9acaf2b5040 100644 --- a/rust/s3heap/tests/test_k8s_integration_02_basic_push.rs +++ b/rust/s3heap/tests/test_k8s_integration_02_basic_push.rs @@ -52,13 +52,13 @@ async fn test_k8s_integration_02_basic_push() { ) .await .unwrap(); - let items = reader.peek(|_| true, Limits::default()).await.unwrap(); + let items = reader.peek(|_, _| true, Limits::default()).await.unwrap(); assert_eq!(items.len(), 2, "Should read 2 items back"); // Verify items have correct data let partitioning_uuids: Vec<Uuid> = items .iter() - .map(|i| *i.trigger.partitioning.as_uuid()) + .map(|(_bucket, item)| *item.trigger.partitioning.as_uuid()) .collect(); assert!( partitioning_uuids.contains(schedule1.triggerable.partitioning.as_uuid()), @@ -109,10 +109,10 @@ async fn test_k8s_integration_02_push_with_no_schedule() { ) .await .unwrap(); - let items = reader.peek(|_| true, Limits::default()).await.unwrap(); + let items = reader.peek(|_, _| true, Limits::default()).await.unwrap(); assert_eq!(items.len(), 1, "Should have only 1 scheduled item"); assert_eq!( - items[0].trigger.partitioning.as_uuid(), + items[0].1.trigger.partitioning.as_uuid(), schedule2.triggerable.partitioning.as_uuid(), "Should be the scheduled item" ); diff --git a/rust/s3heap/tests/test_k8s_integration_03_merge_buckets.rs b/rust/s3heap/tests/test_k8s_integration_03_merge_buckets.rs index cd6271054a5..65aaad852eb 100644 --- a/rust/s3heap/tests/test_k8s_integration_03_merge_buckets.rs +++ b/rust/s3heap/tests/test_k8s_integration_03_merge_buckets.rs @@ -70,7 +70,7 @@ async fn test_k8s_integration_03_merge_same_bucket() { ) .await .unwrap(); - let items = reader.peek(|_| true, Limits::default()).await.unwrap(); + let items = reader.peek(|_, _| true, Limits::default()).await.unwrap(); assert_eq!(items.len(), 3, "Should read all 3 items from single bucket"); } @@ -151,13 +151,13 @@ async fn test_k8s_integration_03_merge_multiple_pushes() { ) .await .unwrap(); - let items = reader.peek(|_| true, Limits::default()).await.unwrap(); + let items = reader.peek(|_, _| true, Limits::default()).await.unwrap(); assert_eq!(items.len(), 4, "Should have all 4 items after merging"); // Verify all items are present let uuids: Vec<_> = items .iter() - .map(|i| *i.trigger.scheduling.as_uuid()) + .map(|(_bucket, item)| *item.trigger.scheduling.as_uuid()) .collect(); assert!(uuids.contains(item1.scheduling.as_uuid())); assert!(uuids.contains(item2.scheduling.as_uuid())); diff --git a/rust/s3heap/tests/test_k8s_integration_04_prune_completed.rs b/rust/s3heap/tests/test_k8s_integration_04_prune_completed.rs index 6f4dabe0a34..ed494904e1d 100644 --- a/rust/s3heap/tests/test_k8s_integration_04_prune_completed.rs +++
b/rust/s3heap/tests/test_k8s_integration_04_prune_completed.rs @@ -58,14 +58,14 @@ async fn test_k8s_integration_04_prune_completed_items() { ) .await .unwrap(); - let items = reader.peek(|_| true, Limits::default()).await.unwrap(); + let items = reader.peek(|_, _| true, Limits::default()).await.unwrap(); assert_eq!( items.len(), 1, "Only incomplete item should remain after pruning" ); assert_eq!( - items[0].trigger.scheduling.as_uuid(), + items[0].1.trigger.scheduling.as_uuid(), schedule2.triggerable.scheduling.as_uuid(), "Should be the incomplete item" ); @@ -122,7 +122,7 @@ async fn test_k8s_integration_04_prune_empty_bucket() { ) .await .unwrap(); - let items = reader.peek(|_| true, Limits::default()).await.unwrap(); + let items = reader.peek(|_, _| true, Limits::default()).await.unwrap(); assert_eq!( items.len(), 0, @@ -193,12 +193,12 @@ async fn test_k8s_integration_04_prune_multiple_buckets() { ) .await .unwrap(); - let items = reader.peek(|_| true, Limits::default()).await.unwrap(); + let items = reader.peek(|_, _| true, Limits::default()).await.unwrap(); assert_eq!(items.len(), 2, "Two incomplete items should remain"); let uuids: Vec<_> = items .iter() - .map(|i| *i.trigger.scheduling.as_uuid()) + .map(|(_bucket, item)| *item.trigger.scheduling.as_uuid()) .collect(); assert!( uuids.contains(schedule2.triggerable.scheduling.as_uuid()), diff --git a/rust/s3heap/tests/test_k8s_integration_05_peek_filtering.rs b/rust/s3heap/tests/test_k8s_integration_05_peek_filtering.rs index dcd095b6f41..a89e5f665e9 100644 --- a/rust/s3heap/tests/test_k8s_integration_05_peek_filtering.rs +++ b/rust/s3heap/tests/test_k8s_integration_05_peek_filtering.rs @@ -83,7 +83,7 @@ async fn test_k8s_integration_05_peek_all_items() { .unwrap(); // Verify all items are present - let all_items = reader.peek(|_| true, Limits::default()).await.unwrap(); + let all_items = reader.peek(|_, _| true, Limits::default()).await.unwrap(); assert_eq!(all_items.len(), 5, "Should have all 5 items"); } @@ -166,7 +166,7 @@ async fn test_k8s_integration_05_peek_with_filter() { let target_uuid4 = *item4.scheduling.as_uuid(); let filtered_items = reader .peek( - |triggerable| { + |triggerable, _| { let uuid = *triggerable.scheduling.as_uuid(); uuid == target_uuid2 || uuid == target_uuid4 }, @@ -182,7 +182,7 @@ async fn test_k8s_integration_05_peek_with_filter() { ); let returned_uuids: Vec<_> = filtered_items .iter() - .map(|item| *item.trigger.scheduling.as_uuid()) + .map(|(_bucket, item)| *item.trigger.scheduling.as_uuid()) .collect(); assert!( returned_uuids.contains(&target_uuid2), @@ -257,10 +257,10 @@ async fn test_k8s_integration_05_peek_filters_completed() { ) .await .unwrap(); - let items = reader.peek(|_| true, Limits::default()).await.unwrap(); + let items = reader.peek(|_, _| true, Limits::default()).await.unwrap(); assert_eq!(items.len(), 1, "Should only return incomplete items"); assert_eq!( - items[0].trigger.scheduling.as_uuid(), + items[0].1.trigger.scheduling.as_uuid(), item2.scheduling.as_uuid(), "Should be the pending task" ); @@ -335,6 +335,6 @@ async fn test_k8s_integration_05_peek_across_buckets() { .unwrap(); // Verify all items across buckets - let all_items = reader.peek(|_| true, Limits::default()).await.unwrap(); + let all_items = reader.peek(|_, _| true, Limits::default()).await.unwrap(); assert_eq!(all_items.len(), 4, "Should find all items across buckets"); } diff --git a/rust/s3heap/tests/test_k8s_integration_08_concurrent_operations.rs 
b/rust/s3heap/tests/test_k8s_integration_08_concurrent_operations.rs index e76a2dd4c36..0ffc816a4a1 100644 --- a/rust/s3heap/tests/test_k8s_integration_08_concurrent_operations.rs +++ b/rust/s3heap/tests/test_k8s_integration_08_concurrent_operations.rs @@ -71,7 +71,7 @@ async fn test_k8s_integration_08_concurrent_pushes() { ) .await .unwrap(); - let items = reader.peek(|_| true, Limits::default()).await.unwrap(); + let items = reader.peek(|_, _| true, Limits::default()).await.unwrap(); assert_eq!( items.len(), (num_writers * items_per_writer) as usize, @@ -156,7 +156,7 @@ async fn test_k8s_integration_08_concurrent_read_write() { .unwrap(); read_handles.push(tokio::spawn(async move { - let items = reader.peek(|_| true, Limits::default()).await?; + let items = reader.peek(|_, _| true, Limits::default()).await?; // Items count will vary as writes complete assert!(items.len() >= 5, "Should have at least initial items"); Ok::<_, s3heap::Error>(items.len()) @@ -179,7 +179,7 @@ async fn test_k8s_integration_08_concurrent_read_write() { ) .await .unwrap(); - let final_items = reader.peek(|_| true, Limits::default()).await.unwrap(); + let final_items = reader.peek(|_, _| true, Limits::default()).await.unwrap(); assert_eq!( final_items.len(), 20, @@ -271,7 +271,7 @@ async fn test_k8s_integration_08_concurrent_prune_push() { ) .await .unwrap(); - let final_items = reader.peek(|_| true, Limits::default()).await.unwrap(); + let final_items = reader.peek(|_, _| true, Limits::default()).await.unwrap(); // Should have: 5 incomplete initial items (odds) + 5 new items assert!( diff --git a/rust/s3heap/tests/test_unit_tests.rs b/rust/s3heap/tests/test_unit_tests.rs index 3b308d22c62..f0634f1f740 100644 --- a/rust/s3heap/tests/test_unit_tests.rs +++ b/rust/s3heap/tests/test_unit_tests.rs @@ -372,7 +372,7 @@ async fn reader_peek_empty_heap() { .await .unwrap(); - let items = reader.peek(|_| true, Limits::default()).await; + let items = reader.peek(|_, _| true, Limits::default()).await; assert!(items.is_ok()); assert_eq!(items.unwrap().len(), 0); } @@ -394,7 +394,7 @@ async fn reader_peek_with_filter() { .unwrap(); // Filter that rejects everything - let items = reader.peek(|_| false, Limits::default()).await; + let items = reader.peek(|_, _| false, Limits::default()).await; assert!(items.is_ok()); assert_eq!(items.unwrap().len(), 0); } @@ -421,7 +421,7 @@ async fn reader_respects_limits() { }; // Should respect the bucket limit - let items = reader.peek(|_| true, limits).await; + let items = reader.peek(|_, _| true, limits).await; assert!(items.is_ok()); } diff --git a/rust/worker/Cargo.toml b/rust/worker/Cargo.toml index c6c82ef09d6..a578d12de30 100644 --- a/rust/worker/Cargo.toml +++ b/rust/worker/Cargo.toml @@ -28,6 +28,7 @@ arrow = { workspace = true } thiserror = { workspace = true } uuid = { workspace = true } async-trait = { workspace = true } +chrono = { workspace = true } roaring = { workspace = true } figment = { workspace = true } futures = { workspace = true } diff --git a/rust/worker/src/compactor/compaction_manager.rs b/rust/worker/src/compactor/compaction_manager.rs index 1cff1b647ca..11bc49d5722 100644 --- a/rust/worker/src/compactor/compaction_manager.rs +++ b/rust/worker/src/compactor/compaction_manager.rs @@ -713,6 +713,7 @@ mod tests { use chroma_log::in_memory_log::{InMemoryLog, InternalLogRecord}; use chroma_memberlist::memberlist_provider::Member; use chroma_storage::local::LocalStorage; + use chroma_storage::s3_client_for_test_with_new_bucket; use chroma_sysdb::TestSysDb; use 
chroma_system::{Dispatcher, DispatcherConfig}; use chroma_types::SegmentUuid; @@ -1024,6 +1025,8 @@ mod tests { let dispatcher = Dispatcher::new(DispatcherConfig::default()); let _dispatcher_handle = system.start_component(dispatcher); + let storage = s3_client_for_test_with_new_bucket().await; + // Create test scheduler with dead jobs let mut assignment_policy = Box::new(RendezvousHashingAssignmentPolicy::default()); assignment_policy.set_members(vec!["test-member".to_string()]); @@ -1032,6 +1035,7 @@ mod tests { "test-member".to_string(), Log::InMemory(InMemoryLog::new()), SysDb::Test(TestSysDb::new()), + storage, Box::new(LasCompactionTimeSchedulerPolicy {}), 10, 100, diff --git a/rust/worker/src/compactor/scheduler.rs b/rust/worker/src/compactor/scheduler.rs index 428046dfa0c..7f3febb7503 100644 --- a/rust/worker/src/compactor/scheduler.rs +++ b/rust/worker/src/compactor/scheduler.rs @@ -493,7 +493,15 @@ impl Scheduler { s3heap::Limits::default().with_items(self.max_concurrent_jobs), ) .await; - tracing::info!("SCHEDULING TASKS FOR {tasks:?}"); + for task in tasks { + tracing::info!( + "SCHEDULING TASKS FOR {:?} {:?} {:?} {:?}", + task.bucket, + task.collection_id, + task.task_id, + task.nonce, + ); + } if collections.is_empty() { return; } diff --git a/rust/worker/src/compactor/tasks.rs b/rust/worker/src/compactor/tasks.rs index 7c4497a2813..ecf4f48beac 100644 --- a/rust/worker/src/compactor/tasks.rs +++ b/rust/worker/src/compactor/tasks.rs @@ -1,5 +1,7 @@ use std::sync::Arc; +use chrono::{DateTime, Utc}; + use chroma_storage::Storage; use chroma_types::CollectionUuid; use s3heap::{heap_path_from_hostname, Error, HeapReader, HeapScheduler, Limits}; @@ -7,9 +9,10 @@ use s3heap::{heap_path_from_hostname, Error, HeapReader, HeapScheduler, Limits}; /// A task that has been scheduled for execution. #[derive(Clone, Debug)] pub struct SchedulableTask { - pub collection: CollectionUuid, - pub scheduling: uuid::Uuid, + pub collection_id: CollectionUuid, + pub task_id: uuid::Uuid, pub nonce: uuid::Uuid, + pub bucket: DateTime<Utc>, } /// Reader for fetching scheduled tasks from multiple heap instances. @@ -63,14 +66,15 @@ impl TaskHeapReader { } }; - match reader.peek(|_| true, limits).await { + match reader.peek(|_, _| true, limits).await { Ok(items) => { tracing::trace!("Found {} tasks in {}", items.len(), heap_prefix); - for item in items { + for (bucket, item) in items { all_tasks.push(SchedulableTask { - collection: CollectionUuid(*item.trigger.partitioning.as_uuid()), - scheduling: *item.trigger.scheduling.as_uuid(), + collection_id: CollectionUuid(*item.trigger.partitioning.as_uuid()), + task_id: *item.trigger.scheduling.as_uuid(), nonce: item.nonce, + bucket, }); } } From 6c7e23c2332de7aea4fbb9800bea0255411254fc Mon Sep 17 00:00:00 2001 From: Robert Escriva Date: Wed, 15 Oct 2025 10:04:05 -0700 Subject: [PATCH 07/13] s/primiing/priming/ --- rust/s3heap-service/README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rust/s3heap-service/README.md b/rust/s3heap-service/README.md index 6731e53ea10..e823571f2b1 100644 --- a/rust/s3heap-service/README.md +++ b/rust/s3heap-service/README.md @@ -51,7 +51,7 @@ another column within the same row. - GC1: Item gets a perpetual "is-done" from the sysdb and transitions to A_2. - GC2: Garbage collection. - NEW1: Create a new task in the sysdb. -- NEW2: Finish the new operation by primiing the task and putting it on the heap. +- NEW2: Finish the new operation by priming the task and putting it on the heap.
- YES1: Task gets deleted from sysdb. - YES2: This implies that we move from scheduled to waiting while the task is on heap. This happens when a job completes and reads all data from the log. - YES3: There was a write, the heap needed to schedule, so it picked a time and updated sysdb. From cf7022a31ae35ccc0bbad34206a46da76f5f827b Mon Sep 17 00:00:00 2001 From: Robert Escriva Date: Wed, 15 Oct 2025 10:04:44 -0700 Subject: [PATCH 08/13] refines. --- rust/s3heap-service/README.md | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/rust/s3heap-service/README.md b/rust/s3heap-service/README.md index e823571f2b1..9bd088e4741 100644 --- a/rust/s3heap-service/README.md +++ b/rust/s3heap-service/README.md @@ -3,8 +3,8 @@ The s3heap-service integrates with the task manager to trigger tasks at no-faster than a particular cadence, with reasonable guarantees that writing data will cause a task to run. -This document lays refines the design of the heap-tender and heap service until it can be -implemented safely. +This document refines the design of the heap-tender and heap service until it can be implemented +safely. ## Abstract: A heap and a sysdb. From 1d9c7a50d8ad4bd591201130d2ea6b8a17b0d665 Mon Sep 17 00:00:00 2001 From: Robert Escriva Date: Wed, 15 Oct 2025 10:05:09 -0700 Subject: [PATCH 09/13] s/no-faster/no faster/ --- rust/s3heap-service/README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rust/s3heap-service/README.md b/rust/s3heap-service/README.md index 9bd088e4741..c8e398d7031 100644 --- a/rust/s3heap-service/README.md +++ b/rust/s3heap-service/README.md @@ -1,6 +1,6 @@ # s3heap-service -The s3heap-service integrates with the task manager to trigger tasks at no-faster than a particular +The s3heap-service integrates with the task manager to trigger tasks at no faster than a particular cadence, with reasonable guarantees that writing data will cause a task to run. 
This document refines the design of the heap-tender and heap service until it can be implemented From 6e1c380660d7baeddb75aa064d31e02734da693d Mon Sep 17 00:00:00 2001 From: Robert Escriva Date: Wed, 15 Oct 2025 10:42:08 -0700 Subject: [PATCH 10/13] Most review comments --- rust/s3heap-service/src/scheduler.rs | 2 +- rust/s3heap/src/lib.rs | 22 +++++++++------------- 2 files changed, 10 insertions(+), 14 deletions(-) diff --git a/rust/s3heap-service/src/scheduler.rs b/rust/s3heap-service/src/scheduler.rs index f65824082ea..044282fd3de 100644 --- a/rust/s3heap-service/src/scheduler.rs +++ b/rust/s3heap-service/src/scheduler.rs @@ -45,7 +45,7 @@ impl HeapScheduler for SysDbScheduler { results.push(true); continue; }; - results.push(schedule.when_to_run.is_some() && schedule.task_run_nonce != *nonce); + results.push(schedule.task_run_nonce != *nonce); } Ok(results) } diff --git a/rust/s3heap/src/lib.rs b/rust/s3heap/src/lib.rs index 741c00d38c6..5e394e76e61 100644 --- a/rust/s3heap/src/lib.rs +++ b/rust/s3heap/src/lib.rs @@ -579,15 +579,8 @@ pub trait HeapScheduler: Send + Sync { /// * `Ok(None)` if no schedules exist for the task /// * `Err` if there was an error retrieving the schedule or if multiple schedules exist async fn get_schedule(&self, id: Uuid) -> Result<Option<Schedule>, Error> { - let results = self.get_schedules(&[id]).await?; - match results.len() { - 0 => Ok(None), - 1 => Ok(Some(results.into_iter().next().unwrap())), - n => Err(Error::Internal(format!( - "get_schedules returned {} schedules for 1 id (expected 0 or 1)", - n - ))), - } + let mut results = self.get_schedules(&[id]).await?; + Ok(results.pop()) } /// Get the schedules for multiple tasks by their IDs. @@ -699,14 +692,17 @@ impl HeapWriter { validate_prefix(&prefix)?; let init_path = format!("{}/INIT", prefix); + let internal = Internal::new( + storage.clone(), + prefix, + heap_scheduler, + config.backoff.clone(), + ); storage .put_bytes(&init_path, vec![], chroma_storage::PutOptions::default()) .await?; - Ok(Self { - internal: Internal::new(storage, prefix, heap_scheduler, config.backoff.clone()), - config, - }) + Ok(Self { config, internal }) } /// Schedule a batch of tasks in the heap.
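The `HeapScheduler` surface above is batch-first: `are_done` answers for a whole slice of (task, nonce) pairs at once, and `get_schedule` is a thin convenience over `get_schedules` that assumes at most one schedule per id. A minimal sketch of the batched form follows; the `retain_pending` helper is invented here for illustration and mirrors the filtering that `HeapReader::peek` performs internally.

```rust
use uuid::Uuid;

use s3heap::{Error, HeapScheduler, Triggerable};

/// Hypothetical helper: keep only the entries the scheduler still
/// considers live, using one batched are_done call instead of a
/// get_schedule round trip per item.
async fn retain_pending(
    scheduler: &dyn HeapScheduler,
    items: Vec<(Triggerable, Uuid)>,
) -> Result<Vec<(Triggerable, Uuid)>, Error> {
    let done = scheduler.are_done(&items).await?;
    Ok(items
        .into_iter()
        .zip(done)
        .filter(|(_, is_done)| !*is_done)
        .map(|(item, _)| item)
        .collect())
}
```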
From 4b3c1ba11a12f4b94108ddd46425c801155bf529 Mon Sep 17 00:00:00 2001 From: Robert Escriva Date: Wed, 15 Oct 2025 10:53:10 -0700 Subject: [PATCH 11/13] fix example --- rust/s3heap/examples/s3heap-benchmark.rs | 31 ++++++++++++++++++------ 1 file changed, 23 insertions(+), 8 deletions(-) diff --git a/rust/s3heap/examples/s3heap-benchmark.rs b/rust/s3heap/examples/s3heap-benchmark.rs index 9ff4bb3e53e..cc56d94101c 100644 --- a/rust/s3heap/examples/s3heap-benchmark.rs +++ b/rust/s3heap/examples/s3heap-benchmark.rs @@ -10,7 +10,25 @@ use guacamole::Guacamole; use chroma_storage::s3::s3_client_for_test_with_bucket_name; use uuid::Uuid; -use s3heap::{HeapWriter, Schedule, Triggerable, UnitOfPartitioningUuid, UnitOfSchedulingUuid}; +use s3heap::{ + Error, HeapScheduler, HeapWriter, Schedule, Triggerable, UnitOfPartitioningUuid, + UnitOfSchedulingUuid, +}; + +///////////////////////////////////////////// DummyScheduler /////////////////////////////////////// + +struct DummyScheduler; + +#[async_trait::async_trait] +impl HeapScheduler for DummyScheduler { + async fn are_done(&self, items: &[(Triggerable, uuid::Uuid)]) -> Result<Vec<bool>, Error> { + Ok(vec![false; items.len()]) + } + + async fn get_schedules(&self, _ids: &[uuid::Uuid]) -> Result<Vec<Schedule>, Error> { + Ok(vec![]) + } +} ///////////////////////////////////////////// benchmark //////////////////////////////////////////// @@ -36,12 +54,9 @@ async fn main() { let options = Options::default(); let storage = s3_client_for_test_with_bucket_name("s3heap-testing").await; let heap = Arc::new( - HeapWriter::new( - storage, - "s3heapbench".to_string(), - Arc::new(s3heap::DummyScheduler), - ) - .unwrap(), + HeapWriter::new(storage, "s3heapbench".to_string(), Arc::new(DummyScheduler)) + .await + .unwrap(), ); let (tx, mut rx) = tokio::sync::mpsc::channel::<Schedule>(options.target_throughput + options.max_tokio_tasks); let count = Arc::new(AtomicU64::new(0)); @@ -49,7 +64,7 @@ async fn main() { let sum = Arc::new(AtomicU64::new(0)); let heap_count = Arc::clone(&count); let heap_sum = Arc::clone(&sum); - let heap_runner = Arc::clone(&heap); + let heap_runner: Arc<HeapWriter> = Arc::clone(&heap); let runner = tokio::task::spawn(async move { let mut buffer = vec![]; loop { From a140142c88bc61651a671e64b723006621d972ff Mon Sep 17 00:00:00 2001 From: Robert Escriva Date: Wed, 15 Oct 2025 12:50:05 -0700 Subject: [PATCH 12/13] s/test_/test_k8s_integration_/where necessary --- rust/worker/src/compactor/compaction_manager.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rust/worker/src/compactor/compaction_manager.rs b/rust/worker/src/compactor/compaction_manager.rs index 11bc49d5722..7fa236e2dea 100644 --- a/rust/worker/src/compactor/compaction_manager.rs +++ b/rust/worker/src/compactor/compaction_manager.rs @@ -1019,7 +1019,7 @@ mod tests { } #[tokio::test] - async fn test_list_dead_jobs() { + async fn test_k8s_integration_list_dead_jobs() { // Create a simple system for testing let system = System::new(); let dispatcher = Dispatcher::new(DispatcherConfig::default()); From 8bf9197a7a5b072bd981b7daace2cbbe273b0a96 Mon Sep 17 00:00:00 2001 From: Robert Escriva Date: Wed, 15 Oct 2025 12:56:11 -0700 Subject: [PATCH 13/13] test_k8s_integration --- rust/worker/src/compactor/scheduler.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/rust/worker/src/compactor/scheduler.rs b/rust/worker/src/compactor/scheduler.rs index 7f3febb7503..31c2b3c42e8 100644 --- a/rust/worker/src/compactor/scheduler.rs +++ b/rust/worker/src/compactor/scheduler.rs @@ -536,7 +536,7 @@ mod tests { use chroma_types::{Collection,
LogRecord, Operation, OperationRecord}; #[tokio::test] - async fn test_scheduler() { + async fn test_k8s_integration_scheduler() { let storage = s3_client_for_test_with_new_bucket().await; let mut log = Log::InMemory(InMemoryLog::new()); @@ -767,7 +767,7 @@ mod tests { #[tokio::test] #[should_panic(expected = "is less than offset")] - async fn test_scheduler_panic() { + async fn test_k8s_integration_scheduler_panic() { let storage = s3_client_for_test_with_new_bucket().await; let mut log = Log::InMemory(InMemoryLog::new());