From 449c80d5693f701af5869a620dea90153059c4c5 Mon Sep 17 00:00:00 2001
From: Robert Escriva
Date: Tue, 11 Nov 2025 16:18:04 -0800
Subject: [PATCH 1/5] [ENH] Make a DirtyMarker have a backfill bit.

We want the ability to run a task for the first time on a collection.
Backfills coalesce.
---
 idl/chromadb/proto/logservice.proto          |  13 +
 rust/log-service/src/lib.rs                  | 269 +++++++++++++++---
 rust/log/src/grpc_log.rs                     |   2 +
 .../test_k8s_integration_00_heap_tender.rs   |  14 +
 rust/types/src/log.rs                        |   9 +-
 5 files changed, 268 insertions(+), 39 deletions(-)

diff --git a/idl/chromadb/proto/logservice.proto b/idl/chromadb/proto/logservice.proto
index b0ee70a3685..ff06be25ee3 100644
--- a/idl/chromadb/proto/logservice.proto
+++ b/idl/chromadb/proto/logservice.proto
@@ -67,6 +67,9 @@ message CollectionInfo {
   int64 first_log_offset = 2;
   // The timestamp of the first log entry of the collection that needs to be compacted
   int64 first_log_ts = 3;
+  // Set if this entry resulted from the Backfill call; the operator's intention is left to the
+  // spur of the moment.
+  bool backfill = 4;
 }
 
 message GetAllCollectionInfoToCompactRequest {
@@ -174,6 +177,14 @@ message FragmentToEvict {
 message PurgeFromCacheResponse {
 }
 
+message BackfillRequest {
+  string collection_id = 1;
+  uint64 initial_insertion_epoch_us = 2;
+}
+
+message BackfillResponse {
+}
+
 service LogService {
   rpc PushLogs(PushLogsRequest) returns (PushLogsResponse) {}
   rpc ScoutLogs(ScoutLogsRequest) returns (ScoutLogsResponse) {}
@@ -200,4 +211,6 @@ service LogService {
   // Similar to UpdateCollectionLogOffset, but allows the offset to go back in time.
   // Uses the exact same request/response types as UpdateCollectionLogOffset by design.
   rpc RollbackCollectionLogOffset(UpdateCollectionLogOffsetRequest) returns (UpdateCollectionLogOffsetResponse) {}
+  // Manually mark a collection as dirty.
+ rpc Backfill(BackfillRequest) returns (BackfillResponse) {} } diff --git a/rust/log-service/src/lib.rs b/rust/log-service/src/lib.rs index ebe29d19ac1..9029efb3750 100644 --- a/rust/log-service/src/lib.rs +++ b/rust/log-service/src/lib.rs @@ -20,15 +20,16 @@ use chroma_tracing::OtelFilter; use chroma_tracing::OtelFilterLevel; use chroma_types::chroma_proto::{ garbage_collect_phase2_request::LogToCollect, log_service_server::LogService, - purge_from_cache_request::EntryToEvict, CollectionInfo, GarbageCollectPhase2Request, - GarbageCollectPhase2Response, GetAllCollectionInfoToCompactRequest, - GetAllCollectionInfoToCompactResponse, InspectDirtyLogRequest, InspectDirtyLogResponse, - InspectLogStateRequest, InspectLogStateResponse, LogRecord, MigrateLogRequest, - MigrateLogResponse, OperationRecord, PullLogsRequest, PullLogsResponse, - PurgeDirtyForCollectionRequest, PurgeDirtyForCollectionResponse, PurgeFromCacheRequest, - PurgeFromCacheResponse, PushLogsRequest, PushLogsResponse, ScoutLogsRequest, ScoutLogsResponse, - ScrubLogRequest, ScrubLogResponse, SealLogRequest, SealLogResponse, - UpdateCollectionLogOffsetRequest, UpdateCollectionLogOffsetResponse, + purge_from_cache_request::EntryToEvict, BackfillRequest, BackfillResponse, CollectionInfo, + GarbageCollectPhase2Request, GarbageCollectPhase2Response, + GetAllCollectionInfoToCompactRequest, GetAllCollectionInfoToCompactResponse, + InspectDirtyLogRequest, InspectDirtyLogResponse, InspectLogStateRequest, + InspectLogStateResponse, LogRecord, MigrateLogRequest, MigrateLogResponse, OperationRecord, + PullLogsRequest, PullLogsResponse, PurgeDirtyForCollectionRequest, + PurgeDirtyForCollectionResponse, PurgeFromCacheRequest, PurgeFromCacheResponse, + PushLogsRequest, PushLogsResponse, ScoutLogsRequest, ScoutLogsResponse, ScrubLogRequest, + ScrubLogResponse, SealLogRequest, SealLogResponse, UpdateCollectionLogOffsetRequest, + UpdateCollectionLogOffsetResponse, }; use chroma_types::chroma_proto::{ForkLogsRequest, ForkLogsResponse}; use chroma_types::dirty_log_path_from_hostname; @@ -346,6 +347,7 @@ struct RollupPerCollection { limit_log_position: LogPosition, reinsert_count: u64, initial_insertion_epoch_us: u64, + backfill: bool, } impl RollupPerCollection { @@ -353,6 +355,7 @@ impl RollupPerCollection { first_observation: LogPosition, num_records: u64, initial_insertion_epoch_us: u64, + backfill: bool, ) -> Self { Self { start_log_position: first_observation, @@ -361,6 +364,7 @@ impl RollupPerCollection { ), reinsert_count: 0, initial_insertion_epoch_us, + backfill, } } @@ -370,6 +374,7 @@ impl RollupPerCollection { num_records: u64, reinsert_count: u64, initial_insertion_epoch_us: u64, + backfill: bool, ) { if log_position < self.start_log_position { self.start_log_position = log_position; @@ -383,6 +388,7 @@ impl RollupPerCollection { // Consider the most recent initial insertion time so if we've compacted earlier we drop. 
self.initial_insertion_epoch_us = std::cmp::min(self.initial_insertion_epoch_us, initial_insertion_epoch_us); + self.backfill = self.backfill || backfill; } fn witness_cursor(&mut self, witness: Option<&Witness>) { @@ -421,6 +427,7 @@ impl RollupPerCollection { .saturating_sub(self.start_log_position.offset()), reinsert_count: self.reinsert_count.saturating_add(1), initial_insertion_epoch_us: self.initial_insertion_epoch_us, + backfill: self.backfill, } } @@ -456,12 +463,14 @@ fn coalesce_markers( num_records, reinsert_count, initial_insertion_epoch_us, + backfill, } => { let position = rollups.entry(*collection_id).or_insert_with(|| { RollupPerCollection::new( LogPosition::from_offset(*log_position), *num_records, *initial_insertion_epoch_us, + false, ) }); position.observe_dirty_marker( @@ -469,6 +478,7 @@ fn coalesce_markers( *num_records, *reinsert_count, *initial_insertion_epoch_us, + *backfill, ); } DirtyMarker::Purge { collection_id } => { @@ -495,27 +505,22 @@ impl MarkDirty { pub fn path_for_hostname(hostname: &str) -> String { dirty_log_path_from_hostname(hostname) } -} -#[async_trait::async_trait] -impl wal3::MarkDirty for MarkDirty { - async fn mark_dirty( + pub async fn mark_dirty_with_epoch_us( &self, log_position: LogPosition, num_records: usize, + initial_insertion_epoch_us: u64, ) -> Result<(), wal3::Error> { if let Some(dirty_log) = self.dirty_log.as_ref() { let num_records = num_records as u64; - let initial_insertion_epoch_us = SystemTime::now() - .duration_since(SystemTime::UNIX_EPOCH) - .map_err(|_| wal3::Error::Internal)? - .as_micros() as u64; let dirty_marker = DirtyMarker::MarkDirty { collection_id: self.collection_id, log_position: log_position.offset(), num_records, reinsert_count: 0, initial_insertion_epoch_us, + backfill: false, }; let dirty_marker_json = serde_json::to_string(&dirty_marker).map_err(|err| { tracing::error!("Failed to serialize dirty marker: {}", err); @@ -530,6 +535,22 @@ impl wal3::MarkDirty for MarkDirty { } } +#[async_trait::async_trait] +impl wal3::MarkDirty for MarkDirty { + async fn mark_dirty( + &self, + log_position: LogPosition, + num_records: usize, + ) -> Result<(), wal3::Error> { + let initial_insertion_epoch_us = SystemTime::now() + .duration_since(SystemTime::UNIX_EPOCH) + .map_err(|_| wal3::Error::Internal)? 
+ .as_micros() as u64; + self.mark_dirty_with_epoch_us(log_position, num_records, initial_insertion_epoch_us) + .await + } +} + ///////////////////////////////////////////// LogServer //////////////////////////////////////////// #[derive(Default)] @@ -771,6 +792,7 @@ impl LogServer { collection_id: collection_id.to_string(), first_log_offset: rollup.start_log_position.offset() as i64, first_log_ts: rollup.start_log_position.offset() as i64, + backfill: rollup.backfill, }); } Ok(Response::new(GetAllCollectionInfoToCompactResponse { @@ -1823,6 +1845,41 @@ impl LogServer { } Ok(Response::new(PurgeFromCacheResponse {})) } + + async fn backfill( + &self, + request: Request, + ) -> Result, Status> { + let request = request.into_inner(); + let collection_id = Uuid::parse_str(&request.collection_id) + .map(CollectionUuid) + .map_err(|_| Status::invalid_argument("Failed to parse collection id"))?; + tracing::info!( + "backfill for {collection_id} at {}", + request.initial_insertion_epoch_us + ); + let storage_prefix = collection_id.storage_prefix_for_log(); + let log_reader = LogReader::new( + self.config.reader.clone(), + Arc::clone(&self.storage), + storage_prefix.clone(), + ); + let mani = log_reader.manifest().await; + let mani = mani + .map_err(|err| Status::unknown(err.to_string()))? + .ok_or_else(|| Status::unknown("cannot read manifest from initialized log"))?; + let offset = + LogPosition::from_offset(mani.next_write_timestamp().offset().saturating_sub(1)); + let mark_dirty = MarkDirty { + collection_id, + dirty_log: self.dirty_log.clone(), + }; + mark_dirty + .mark_dirty_with_epoch_us(offset, 1usize, request.initial_insertion_epoch_us) + .await + .map_err(|err| Status::unknown(err.to_string()))?; + Ok(Response::new(BackfillResponse {})) + } } struct LogServerWrapper { @@ -1939,6 +1996,13 @@ impl LogService for LogServerWrapper { ) -> Result, Status> { self.log_server.purge_from_cache(request).await } + + async fn backfill( + &self, + request: Request, + ) -> Result, Status> { + self.log_server.backfill(request).await + } } fn parquet_to_records(parquet: Arc>) -> Result)>, Status> { @@ -2411,6 +2475,7 @@ mod tests { num_records: 1, reinsert_count: 0, initial_insertion_epoch_us: now, + backfill: false, }, ), ( @@ -2421,6 +2486,7 @@ mod tests { num_records: 1, reinsert_count: 2, initial_insertion_epoch_us: now, + backfill: false, }, ), ]; @@ -2455,6 +2521,7 @@ mod tests { num_records: 1, reinsert_count: 0, initial_insertion_epoch_us: now, + backfill: false, }, ), ( @@ -2465,6 +2532,7 @@ mod tests { num_records: 100, reinsert_count: 1, initial_insertion_epoch_us: now, + backfill: false, }, ), ]; @@ -2512,6 +2580,7 @@ mod tests { num_records: 100, reinsert_count: 5, initial_insertion_epoch_us: now, + backfill: false, }; let serialized = serde_json::to_string(&mark_dirty).unwrap(); @@ -2539,6 +2608,7 @@ mod tests { num_records: 1, reinsert_count: 0, initial_insertion_epoch_us: now, + backfill: false, }; assert_eq!(collection_id, mark_dirty.collection_id()); @@ -2560,6 +2630,7 @@ mod tests { num_records: 1, reinsert_count: 0, initial_insertion_epoch_us: now, + backfill: false, }; // Test incrementing reinsert count @@ -2592,6 +2663,7 @@ mod tests { num_records: 10, reinsert_count: 0, initial_insertion_epoch_us: now, + backfill: false, }, ), ( @@ -2606,6 +2678,7 @@ mod tests { num_records: 5, reinsert_count: 1, initial_insertion_epoch_us: now + 1000, + backfill: false, }, ), ]; @@ -2641,6 +2714,7 @@ mod tests { num_records: 10, reinsert_count: 0, initial_insertion_epoch_us: now, + backfill: 
false, }, ), ( @@ -2651,6 +2725,7 @@ mod tests { num_records: 5, reinsert_count: 1, initial_insertion_epoch_us: now, + backfill: false, }, ), ( @@ -2667,6 +2742,7 @@ mod tests { num_records: 3, reinsert_count: 2, initial_insertion_epoch_us: now + 1000, + backfill: false, }, ), ]; @@ -2694,7 +2770,7 @@ mod tests { fn rollup_per_collection_new() { let start_position = LogPosition::from_offset(10); let num_records = 5; - let rollup = RollupPerCollection::new(start_position, num_records, 0); + let rollup = RollupPerCollection::new(start_position, num_records, 0, false); assert_eq!(start_position, rollup.start_log_position); assert_eq!(LogPosition::from_offset(15), rollup.limit_log_position); @@ -2708,17 +2784,17 @@ mod tests { .duration_since(SystemTime::UNIX_EPOCH) .unwrap() .as_micros() as u64; - let mut rollup = RollupPerCollection::new(start_position, 5, now); + let mut rollup = RollupPerCollection::new(start_position, 5, now, false); // Observe a marker that extends the range - rollup.observe_dirty_marker(LogPosition::from_offset(20), 10, 3, now); + rollup.observe_dirty_marker(LogPosition::from_offset(20), 10, 3, now, false); assert_eq!(LogPosition::from_offset(10), rollup.start_log_position); assert_eq!(LogPosition::from_offset(30), rollup.limit_log_position); assert_eq!(3, rollup.reinsert_count); assert_eq!(now, rollup.initial_insertion_epoch_us); // Observe a marker that comes before the start - rollup.observe_dirty_marker(LogPosition::from_offset(5), 2, 1, now - 1000); + rollup.observe_dirty_marker(LogPosition::from_offset(5), 2, 1, now - 1000, false); assert_eq!(LogPosition::from_offset(5), rollup.start_log_position); assert_eq!(LogPosition::from_offset(30), rollup.limit_log_position); assert_eq!(3, rollup.reinsert_count); // Same @@ -2727,16 +2803,16 @@ mod tests { #[test] fn rollup_per_collection_is_empty() { - let rollup = RollupPerCollection::new(LogPosition::from_offset(10), 0, 42); + let rollup = RollupPerCollection::new(LogPosition::from_offset(10), 0, 42, false); assert!(rollup.is_empty()); - let rollup = RollupPerCollection::new(LogPosition::from_offset(10), 5, 42); + let rollup = RollupPerCollection::new(LogPosition::from_offset(10), 5, 42, false); assert!(!rollup.is_empty()); } #[test] fn rollup_per_collection_requires_backpressure() { - let rollup = RollupPerCollection::new(LogPosition::from_offset(10), 100, 42); + let rollup = RollupPerCollection::new(LogPosition::from_offset(10), 100, 42, false); assert!(rollup.requires_backpressure(50)); assert!(!rollup.requires_backpressure(150)); assert!(rollup.requires_backpressure(100)); // Equal case @@ -2750,8 +2826,8 @@ mod tests { .unwrap() .as_micros() as u64; - let mut rollup = RollupPerCollection::new(LogPosition::from_offset(10), 5, now); - rollup.observe_dirty_marker(LogPosition::from_offset(10), 5, 2, now); + let mut rollup = RollupPerCollection::new(LogPosition::from_offset(10), 5, now, false); + rollup.observe_dirty_marker(LogPosition::from_offset(10), 5, 2, now, false); let marker = rollup.dirty_marker(collection_id); match marker { @@ -2761,6 +2837,7 @@ mod tests { num_records, reinsert_count, initial_insertion_epoch_us, + backfill: _, } => { assert_eq!(collection_id, cid); assert_eq!(10, log_position); @@ -2822,6 +2899,7 @@ mod tests { .duration_since(SystemTime::UNIX_EPOCH) .unwrap() .as_micros() as u64, + backfill: false, }; // Verify the marker can be serialized (this is what MarkDirty::mark_dirty does) @@ -2877,6 +2955,7 @@ mod tests { num_records: 5, reinsert_count: 1, initial_insertion_epoch_us: now, + 
backfill: false, }, ), ( @@ -2887,6 +2966,7 @@ mod tests { num_records: 10, reinsert_count: 2, initial_insertion_epoch_us: now + 1000, + backfill: false, }, ), ( @@ -2897,6 +2977,7 @@ mod tests { num_records: 3, reinsert_count: 0, initial_insertion_epoch_us: now - 1000, + backfill: false, }, ), ]; @@ -2980,6 +3061,7 @@ mod tests { num_records: 100, reinsert_count: 0, initial_insertion_epoch_us: now, + backfill: false, }, )]; @@ -3014,6 +3096,7 @@ mod tests { num_records: 0, reinsert_count: 0, initial_insertion_epoch_us: now, + backfill: false, }, )]; @@ -3050,6 +3133,7 @@ mod tests { num_records: 1, reinsert_count: u64::MAX, initial_insertion_epoch_us: now, + backfill: false, }, ), ( @@ -3060,6 +3144,7 @@ mod tests { num_records: 1, reinsert_count: 5, initial_insertion_epoch_us: now, + backfill: false, }, ), ]; @@ -3074,7 +3159,7 @@ mod tests { #[test] fn rollup_per_collection_witness_functionality() { - let rollup = RollupPerCollection::new(LogPosition::from_offset(10), 5, 42); + let rollup = RollupPerCollection::new(LogPosition::from_offset(10), 5, 42, false); // Test that the rollup can handle boundary conditions assert_eq!(LogPosition::from_offset(10), rollup.start_log_position); @@ -3084,11 +3169,12 @@ mod tests { #[test] fn rollup_per_collection_backpressure_boundary_conditions() { - let rollup = RollupPerCollection::new(LogPosition::from_offset(0), u64::MAX, 42); + let rollup = RollupPerCollection::new(LogPosition::from_offset(0), u64::MAX, 42, false); assert!(rollup.requires_backpressure(u64::MAX - 1)); assert!(rollup.requires_backpressure(u64::MAX)); - let rollup = RollupPerCollection::new(LogPosition::from_offset(u64::MAX - 100), 50, 42); + let rollup = + RollupPerCollection::new(LogPosition::from_offset(u64::MAX - 100), 50, 42, false); assert!(!rollup.requires_backpressure(100)); assert!(rollup.requires_backpressure(25)); } @@ -3180,6 +3266,7 @@ mod tests { num_records: 10, reinsert_count: 0, initial_insertion_epoch_us: now, + backfill: false, }, ), ( @@ -3190,6 +3277,7 @@ mod tests { num_records: 10, reinsert_count: 1, initial_insertion_epoch_us: now + 1000, + backfill: false, }, ), ( @@ -3200,6 +3288,7 @@ mod tests { num_records: 10, reinsert_count: 2, initial_insertion_epoch_us: now + 2000, + backfill: false, }, ), ( @@ -3233,6 +3322,7 @@ mod tests { num_records: 1, reinsert_count: u64::MAX - 1, initial_insertion_epoch_us: now, + backfill: false, }; mark_dirty.reinsert(); @@ -3256,9 +3346,9 @@ mod tests { .duration_since(SystemTime::UNIX_EPOCH) .unwrap() .as_micros() as u64; - let mut rollup = RollupPerCollection::new(LogPosition::from_offset(10), 5, now + 1); + let mut rollup = RollupPerCollection::new(LogPosition::from_offset(10), 5, now + 1, false); - rollup.observe_dirty_marker(LogPosition::from_offset(20), 5, 1, now); + rollup.observe_dirty_marker(LogPosition::from_offset(20), 5, 1, now, false); assert_eq!(LogPosition::from_offset(10), rollup.start_log_position); assert_eq!(LogPosition::from_offset(25), rollup.limit_log_position); @@ -3298,6 +3388,7 @@ mod tests { num_records: 1, reinsert_count: i % 100, initial_insertion_epoch_us: now + i, + backfill: false, }, )); } @@ -3337,6 +3428,7 @@ mod tests { num_records: 10, reinsert_count: 0, initial_insertion_epoch_us: now, + backfill: false, }, ), ( @@ -3351,6 +3443,7 @@ mod tests { num_records: 5, reinsert_count: 1, initial_insertion_epoch_us: now + 1000, + backfill: false, }, ), ( @@ -3365,6 +3458,7 @@ mod tests { num_records: 3, reinsert_count: 2, initial_insertion_epoch_us: now + 2000, + backfill: false, }, ), ]; @@ 
-3383,7 +3477,7 @@ mod tests { #[test] fn rollup_per_collection_extreme_positions() { let start_position = LogPosition::from_offset(u64::MAX - 10); - let rollup = RollupPerCollection::new(start_position, 5, 42); + let rollup = RollupPerCollection::new(start_position, 5, 42, false); assert_eq!(start_position, rollup.start_log_position); assert!(!rollup.is_empty()); @@ -3392,9 +3486,9 @@ mod tests { #[test] fn rollup_per_collection_zero_epoch() { - let mut rollup = RollupPerCollection::new(LogPosition::from_offset(10), 5, u64::MAX); + let mut rollup = RollupPerCollection::new(LogPosition::from_offset(10), 5, u64::MAX, false); - rollup.observe_dirty_marker(LogPosition::from_offset(15), 5, 1, 0); + rollup.observe_dirty_marker(LogPosition::from_offset(15), 5, 1, 0, false); assert_eq!(0, rollup.initial_insertion_epoch_us); } @@ -3458,9 +3552,9 @@ mod tests { #[test] fn rollup_per_collection_edge_case_positions() { - let mut rollup = RollupPerCollection::new(LogPosition::from_offset(100), 0, 1042); + let mut rollup = RollupPerCollection::new(LogPosition::from_offset(100), 0, 1042, false); - rollup.observe_dirty_marker(LogPosition::from_offset(50), 25, 1, 1000); + rollup.observe_dirty_marker(LogPosition::from_offset(50), 25, 1, 1000, false); assert_eq!(LogPosition::from_offset(50), rollup.start_log_position); assert_eq!(LogPosition::from_offset(100), rollup.limit_log_position); @@ -3469,13 +3563,13 @@ mod tests { #[test] fn backpressure_threshold_verification() { - let rollup = RollupPerCollection::new(LogPosition::from_offset(0), 100, 42); + let rollup = RollupPerCollection::new(LogPosition::from_offset(0), 100, 42, false); assert!(rollup.requires_backpressure(99)); assert!(rollup.requires_backpressure(100)); assert!(!rollup.requires_backpressure(101)); - let zero_rollup = RollupPerCollection::new(LogPosition::from_offset(10), 0, 42); + let zero_rollup = RollupPerCollection::new(LogPosition::from_offset(10), 0, 42, false); assert!(!zero_rollup.requires_backpressure(1)); assert!(zero_rollup.requires_backpressure(0)); } @@ -4176,4 +4270,103 @@ mod tests { "Forward movement after backward attempt should succeed" ); } + + #[tokio::test] + async fn test_k8s_integration_backfill_basic() { + let log_server = setup_log_server().await; + let collection_id = CollectionUuid::new(); + + let operation_record = OperationRecord { + id: "test-id-1".to_string(), + embedding: None, + encoding: None, + metadata: None, + document: None, + operation: Operation::Add, + }; + push_log_to_server(&log_server, collection_id, &[operation_record]).await; + + let custom_epoch_us = 1234567890000000u64; + let request = BackfillRequest { + collection_id: collection_id.to_string(), + initial_insertion_epoch_us: custom_epoch_us, + }; + + let response = log_server.backfill(Request::new(request)).await; + assert!( + response.is_ok(), + "backfill should succeed: {:?}", + response.err() + ); + + validate_dirty_log_on_server(&log_server, &[collection_id]).await; + } + + #[tokio::test] + async fn test_k8s_integration_backfill_invalid_collection_id() { + let log_server = setup_log_server().await; + + let request = BackfillRequest { + collection_id: "invalid-uuid".to_string(), + initial_insertion_epoch_us: 1000000u64, + }; + + let response = log_server.backfill(Request::new(request)).await; + assert!( + response.is_err(), + "backfill should fail with invalid collection_id" + ); + assert_eq!(response.unwrap_err().code(), Code::InvalidArgument); + } + + #[tokio::test] + async fn test_k8s_integration_backfill_nonexistent_collection() { + let 
log_server = setup_log_server().await; + let collection_id = CollectionUuid::new(); + + let request = BackfillRequest { + collection_id: collection_id.to_string(), + initial_insertion_epoch_us: 1000000u64, + }; + + let response = log_server.backfill(Request::new(request)).await; + assert!( + response.is_err(), + "backfill should fail for nonexistent collection" + ); + assert_eq!(response.unwrap_err().code(), Code::Unknown); + } + + #[test] + fn test_k8s_integration_mark_dirty_with_epoch_us() { + let runtime = Runtime::new().unwrap(); + runtime.block_on(async { + let log_server = setup_log_server().await; + let collection_id = CollectionUuid::new(); + + let operation_record = OperationRecord { + id: "test-id-2".to_string(), + embedding: None, + encoding: None, + metadata: None, + document: None, + operation: Operation::Add, + }; + push_log_to_server(&log_server, collection_id, &[operation_record]).await; + + let mark_dirty = MarkDirty { + collection_id, + dirty_log: log_server.dirty_log.clone(), + }; + + let custom_epoch_us = 5555555555000000u64; + let result = mark_dirty + .mark_dirty_with_epoch_us(LogPosition::from_offset(0), 1, custom_epoch_us) + .await; + + assert!(result.is_ok(), "mark_dirty_with_epoch_us should succeed"); + + validate_dirty_log_on_server(&log_server, &[collection_id]).await; + }); + } } diff --git a/rust/log/src/grpc_log.rs b/rust/log/src/grpc_log.rs index 9213238cb55..64f86b7a813 100644 --- a/rust/log/src/grpc_log.rs +++ b/rust/log/src/grpc_log.rs @@ -692,6 +692,7 @@ mod tests { collection_id: collection_id.to_string(), first_log_offset: 100, first_log_ts: 1000, + backfill: false, }], }; @@ -700,6 +701,7 @@ mod tests { collection_id: collection_id.to_string(), first_log_offset: 50, first_log_ts: 2000, + backfill: false, }], }; diff --git a/rust/s3heap-service/tests/test_k8s_integration_00_heap_tender.rs b/rust/s3heap-service/tests/test_k8s_integration_00_heap_tender.rs index 4cbf26c7a54..b95f25d54b2 100644 --- a/rust/s3heap-service/tests/test_k8s_integration_00_heap_tender.rs +++ b/rust/s3heap-service/tests/test_k8s_integration_00_heap_tender.rs @@ -100,6 +100,7 @@ async fn test_k8s_integration_single_mark_dirty_returns_collection() { num_records: 10, reinsert_count: 0, initial_insertion_epoch_us: 1234567890, + backfill: false, }; let log_writer = wal3::LogWriter::open_or_initialize( @@ -139,6 +140,7 @@ async fn test_k8s_integration_multiple_markers_same_collection_keeps_max() { num_records: 10, reinsert_count: 0, initial_insertion_epoch_us: 1234567890, + backfill: false, }, DirtyMarker::MarkDirty { collection_id, @@ -146,6 +148,7 @@ async fn test_k8s_integration_multiple_markers_same_collection_keeps_max() { num_records: 5, reinsert_count: 0, initial_insertion_epoch_us: 1234567890, + backfill: false, }, DirtyMarker::MarkDirty { collection_id, @@ -153,6 +156,7 @@ async fn test_k8s_integration_multiple_markers_same_collection_keeps_max() { num_records: 3, reinsert_count: 0, initial_insertion_epoch_us: 1234567890, + backfill: false, }, ]; @@ -196,6 +200,7 @@ async fn test_k8s_integration_reinsert_count_nonzero_filters_marker() { num_records: 10, reinsert_count: 0, initial_insertion_epoch_us: 1234567890, + backfill: false, }, DirtyMarker::MarkDirty { collection_id: collection_id2, @@ -203,6 +208,7 @@ async fn test_k8s_integration_reinsert_count_nonzero_filters_marker() { num_records: 5, reinsert_count: 1, initial_insertion_epoch_us: 1234567890, + backfill: false, }, ]; @@ -246,6 +252,7 @@ async fn test_k8s_integration_purge_and_cleared_markers_ignored() { num_records: 
10, reinsert_count: 0, initial_insertion_epoch_us: 1234567890, + backfill: false, }, DirtyMarker::Purge { collection_id: collection_id2, @@ -257,6 +264,7 @@ async fn test_k8s_integration_purge_and_cleared_markers_ignored() { num_records: 5, reinsert_count: 0, initial_insertion_epoch_us: 1234567890, + backfill: false, }, ]; @@ -302,6 +310,7 @@ async fn test_k8s_integration_multiple_collections_all_processed() { num_records: 10, reinsert_count: 0, initial_insertion_epoch_us: 1234567890, + backfill: false, }) .collect(); @@ -345,6 +354,7 @@ async fn test_k8s_integration_cursor_initialized_on_first_run() { num_records: 10, reinsert_count: 0, initial_insertion_epoch_us: 1234567890, + backfill: false, }; let log_writer = wal3::LogWriter::open_or_initialize( @@ -399,6 +409,7 @@ async fn test_k8s_integration_cursor_advances_on_subsequent_runs() { num_records: 10, reinsert_count: 0, initial_insertion_epoch_us: 1234567890, + backfill: false, }; log_writer .append(serde_json::to_vec(&marker1).unwrap()) @@ -426,6 +437,7 @@ async fn test_k8s_integration_cursor_advances_on_subsequent_runs() { num_records: 5, reinsert_count: 0, initial_insertion_epoch_us: 1234567890, + backfill: false, }; log_writer .append(serde_json::to_vec(&marker2).unwrap()) @@ -453,6 +465,7 @@ async fn test_k8s_integration_cursor_not_updated_when_no_new_data() { num_records: 10, reinsert_count: 0, initial_insertion_epoch_us: 1234567890, + backfill: false, }; let log_writer = wal3::LogWriter::open_or_initialize( @@ -533,6 +546,7 @@ async fn test_k8s_integration_handles_empty_markers_after_filtering() { num_records: 10, reinsert_count: 5, initial_insertion_epoch_us: 1234567890, + backfill: false, }, DirtyMarker::Purge { collection_id: CollectionUuid::new(), diff --git a/rust/types/src/log.rs b/rust/types/src/log.rs index 14cc802df32..da1ccce75b8 100644 --- a/rust/types/src/log.rs +++ b/rust/types/src/log.rs @@ -16,7 +16,6 @@ pub fn dirty_log_path_from_hostname(hostname: &str) -> String { #[derive(Clone, Debug, Eq, PartialEq, serde::Serialize, serde::Deserialize)] // NOTE(rescrv): This is intentionally an enum for easy forwards/backwards compatibility. Add a // new variant, handle both variants, cycle logs, stop handling old variant. -// TODO(rescrv): Dedupe with log-service crate. pub enum DirtyMarker { /// Marks a collection as needing compaction due to new records. #[serde(rename = "mark_dirty")] @@ -31,6 +30,9 @@ pub enum DirtyMarker { reinsert_count: u64, /// The epoch time in microseconds when this collection was first marked dirty. initial_insertion_epoch_us: u64, + /// Is this marker from a backfill? + #[serde(skip_serializing_if = "is_false", default)] + backfill: bool, }, /// Removes all compaction scheduling for a collection. 
#[serde(rename = "purge")] @@ -66,9 +68,14 @@ impl DirtyMarker { num_records: _, reinsert_count, initial_insertion_epoch_us: _, + backfill: _, } = self { *reinsert_count = reinsert_count.saturating_add(1); } } } + +fn is_false(x: &bool) -> bool { + !x +} From 36feebbf981768568f8002d3bcfdf9fc562ec91f Mon Sep 17 00:00:00 2001 From: Robert Escriva Date: Tue, 11 Nov 2025 16:47:31 -0800 Subject: [PATCH 2/5] propel review --- rust/log-service/src/lib.rs | 17 +++++++++++++++++ 1 file changed, 17 insertions(+) diff --git a/rust/log-service/src/lib.rs b/rust/log-service/src/lib.rs index 9029efb3750..f4b43261039 100644 --- a/rust/log-service/src/lib.rs +++ b/rust/log-service/src/lib.rs @@ -4300,6 +4300,23 @@ mod tests { ); validate_dirty_log_on_server(&log_server, &[collection_id]).await; + + let dirty_collections = log_server + .cached_get_all_collection_info_to_compact(GetAllCollectionInfoToCompactRequest { + min_compaction_size: 0, + }) + .await + .unwrap() + .into_inner() + .all_collection_info; + let collection_info = dirty_collections + .iter() + .find(|c| c.collection_id == collection_id.to_string()) + .expect("collection should be in dirty list"); + assert!( + collection_info.backfill, + "The backfill flag should be set for the collection" + ); } #[tokio::test] From 8f5a60dd3a3e502f620a9f9b8a61b737741e58ad Mon Sep 17 00:00:00 2001 From: Robert Escriva Date: Tue, 11 Nov 2025 17:15:11 -0800 Subject: [PATCH 3/5] fix tests --- rust/log-service/src/lib.rs | 33 +++++++++++++++++++++++++++------ 1 file changed, 27 insertions(+), 6 deletions(-) diff --git a/rust/log-service/src/lib.rs b/rust/log-service/src/lib.rs index f4b43261039..ebe18aa99ad 100644 --- a/rust/log-service/src/lib.rs +++ b/rust/log-service/src/lib.rs @@ -506,11 +506,12 @@ impl MarkDirty { dirty_log_path_from_hostname(hostname) } - pub async fn mark_dirty_with_epoch_us( + pub async fn mark_dirty_with_epoch_us_and_backfill( &self, log_position: LogPosition, num_records: usize, initial_insertion_epoch_us: u64, + backfill: bool, ) -> Result<(), wal3::Error> { if let Some(dirty_log) = self.dirty_log.as_ref() { let num_records = num_records as u64; @@ -520,7 +521,7 @@ impl MarkDirty { num_records, reinsert_count: 0, initial_insertion_epoch_us, - backfill: false, + backfill, }; let dirty_marker_json = serde_json::to_string(&dirty_marker).map_err(|err| { tracing::error!("Failed to serialize dirty marker: {}", err); @@ -546,8 +547,13 @@ impl wal3::MarkDirty for MarkDirty { .duration_since(SystemTime::UNIX_EPOCH) .map_err(|_| wal3::Error::Internal)? 
.as_micros() as u64; - self.mark_dirty_with_epoch_us(log_position, num_records, initial_insertion_epoch_us) - .await + self.mark_dirty_with_epoch_us_and_backfill( + log_position, + num_records, + initial_insertion_epoch_us, + false, + ) + .await } } @@ -1875,7 +1881,12 @@ impl LogServer { dirty_log: self.dirty_log.clone(), }; mark_dirty - .mark_dirty_with_epoch_us(offset, 1usize, request.initial_insertion_epoch_us) + .mark_dirty_with_epoch_us_and_backfill( + offset, + 1usize, + request.initial_insertion_epoch_us, + true, + ) .await .map_err(|err| Status::unknown(err.to_string()))?; Ok(Response::new(BackfillResponse {})) @@ -4301,6 +4312,11 @@ mod tests { validate_dirty_log_on_server(&log_server, &[collection_id]).await; + log_server + .roll_dirty_log() + .await + .expect("Roll Dirty Logs should not fail"); + let dirty_collections = log_server .cached_get_all_collection_info_to_compact(GetAllCollectionInfoToCompactRequest { min_compaction_size: 0, @@ -4378,7 +4394,12 @@ mod tests { let custom_epoch_us = 5555555555000000u64; let result = mark_dirty - .mark_dirty_with_epoch_us(LogPosition::from_offset(0), 1, custom_epoch_us) + .mark_dirty_with_epoch_us_and_backfill( + LogPosition::from_offset(0), + 1, + custom_epoch_us, + false, + ) .await; assert!(result.is_ok(), "mark_dirty_with_epoch_us should succeed"); From f02f3b8aa30143919af2783966c15794f4ad3de2 Mon Sep 17 00:00:00 2001 From: Robert Escriva Date: Fri, 14 Nov 2025 17:15:51 -0800 Subject: [PATCH 4/5] Adjust downward by the timeout the epoch passed into the backfill --- rust/log-service/src/lib.rs | 13 +++---------- 1 file changed, 3 insertions(+), 10 deletions(-) diff --git a/rust/log-service/src/lib.rs b/rust/log-service/src/lib.rs index ebe18aa99ad..82465d942a1 100644 --- a/rust/log-service/src/lib.rs +++ b/rust/log-service/src/lib.rs @@ -1860,10 +1860,8 @@ impl LogServer { let collection_id = Uuid::parse_str(&request.collection_id) .map(CollectionUuid) .map_err(|_| Status::invalid_argument("Failed to parse collection id"))?; - tracing::info!( - "backfill for {collection_id} at {}", - request.initial_insertion_epoch_us - ); + let target_epoch_us = request.initial_insertion_epoch_us - self.config.timeout_us; + tracing::info!("backfill for {collection_id} at {}", target_epoch_us); let storage_prefix = collection_id.storage_prefix_for_log(); let log_reader = LogReader::new( self.config.reader.clone(), @@ -1881,12 +1879,7 @@ impl LogServer { dirty_log: self.dirty_log.clone(), }; mark_dirty - .mark_dirty_with_epoch_us_and_backfill( - offset, - 1usize, - request.initial_insertion_epoch_us, - true, - ) + .mark_dirty_with_epoch_us_and_backfill(offset, 1usize, target_epoch_us, true) .await .map_err(|err| Status::unknown(err.to_string()))?; Ok(Response::new(BackfillResponse {})) From 6ba6a78e276972b6ede0e06300469ea9a228fb68 Mon Sep 17 00:00:00 2001 From: Robert Escriva Date: Sat, 15 Nov 2025 08:18:00 -0800 Subject: [PATCH 5/5] saturating_sub --- rust/log-service/src/lib.rs | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/rust/log-service/src/lib.rs b/rust/log-service/src/lib.rs index 82465d942a1..ba8dee42baa 100644 --- a/rust/log-service/src/lib.rs +++ b/rust/log-service/src/lib.rs @@ -1860,7 +1860,9 @@ impl LogServer { let collection_id = Uuid::parse_str(&request.collection_id) .map(CollectionUuid) .map_err(|_| Status::invalid_argument("Failed to parse collection id"))?; - let target_epoch_us = request.initial_insertion_epoch_us - self.config.timeout_us; + let target_epoch_us = request + .initial_insertion_epoch_us + 
.saturating_sub(self.config.timeout_us); tracing::info!("backfill for {collection_id} at {}", target_epoch_us); let storage_prefix = collection_id.storage_prefix_for_log(); let log_reader = LogReader::new(