Skip to content

Commit 822ac41

Browse files
authored
Add ChunkStoreDiff::SchemaAddition and use it for heuristics
### Related * Closes RR-4028. ### What Our heuristics and recommendations are purely additive, i.e. once something has been recommended, it stays recommended. An additional assumption is that we never look at the actual values of chunks to make decisions, but rather infer everything from the per-column information. Before this PR, we were recomputing heuristics for every chunk that arrived. This was particularly expensive for chunks with _any scalar_ components. This PR introduces a new `ChunkStoreDiff::SchemaAddition` event to chunk stores, which gets emitted only when a new column is added to the chunk store's schema (or when an existing component's `is_static` flag transitions from `false` to `true`). This drastically cuts down on how often we need to recompute heuristics and significantly speeds up ingestion. Source-Ref: e89fd4af59885940dd14edc7fca55e1b305ac1c3
1 parent 9355dd8 commit 822ac41

File tree

16 files changed

+734
-354
lines changed

16 files changed

+734
-354
lines changed

crates/store/re_chunk_store/src/events.rs

Lines changed: 109 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ use crate::{ChunkId, ChunkStore, ChunkStoreSubscriber, RowId};
1414
/// Per-component information for chunks.
1515
///
1616
/// Created from either a physical chunk or virtual manifest metadata.
17-
#[derive(Clone)]
17+
#[derive(Debug, Clone, PartialEq, Eq)]
1818
pub struct ChunkComponentMeta {
1919
pub descriptor: re_sdk_types::ComponentDescriptor,
2020

@@ -28,15 +28,18 @@ pub struct ChunkComponentMeta {
2828
/// For virtual this means `row_count > 0`.
2929
pub has_data: bool,
3030

31-
/// Whether this component only has static data.
32-
pub is_static_only: bool,
31+
/// Whether this component has ever been written as static data.
32+
///
33+
/// Once a component is static, it stays static. This flag is monotonic
34+
/// and never transitions back to `false`.
35+
pub is_static: bool,
3336
}
3437

3538
/// Chunk meta originating from either a virtual or physical chunk.
3639
///
3740
/// Useful for chunk store subscribers that do the same logic
3841
/// for physical and virtual additions.
39-
#[derive(Clone)]
42+
#[derive(Debug, Clone, PartialEq, Eq)]
4043
pub struct ChunkMeta {
4144
pub entity_path: re_chunk::EntityPath,
4245
pub components: Vec<ChunkComponentMeta>,
@@ -52,7 +55,7 @@ impl ChunkMeta {
5255
descriptor: column.descriptor.clone(),
5356
inner_arrow_datatype: Some(column.list_array.value_type()),
5457
has_data: !column.list_array.values().is_empty(),
55-
is_static_only: chunk.is_static(),
58+
is_static: chunk.is_static(),
5659
})
5760
.collect();
5861

@@ -138,6 +141,22 @@ pub enum ChunkStoreDiff {
138141

139142
/// When a physical chunk has been evicted.
140143
Deletion(ChunkStoreDiffDeletion),
144+
145+
/// Newly discovered entity/component columns in the schema.
146+
///
147+
/// Also emitted when a component's `is_static` flag transitions from `false` to `true`.
148+
/// Note: `has_data` does not influence the emission of `SchemaAddition` events.
149+
SchemaAddition(ChunkStoreDiffSchemaAddition),
150+
}
151+
152+
/// Describes newly added columns to the store schema.
153+
///
154+
/// This event is emitted when previously unseen entity/component pairs are
155+
/// discovered, either from a physical chunk addition or from an RRD manifest.
156+
#[derive(Debug, Clone, PartialEq, Eq)]
157+
pub struct ChunkStoreDiffSchemaAddition {
158+
/// Newly discovered entity/component pairs, grouped by entity.
159+
pub new_columns: Vec<ChunkMeta>,
141160
}
142161

143162
impl From<ChunkStoreDiffAddition> for ChunkStoreDiff {
@@ -158,6 +177,12 @@ impl From<ChunkStoreDiffDeletion> for ChunkStoreDiff {
158177
}
159178
}
160179

180+
impl From<ChunkStoreDiffSchemaAddition> for ChunkStoreDiff {
181+
fn from(value: ChunkStoreDiffSchemaAddition) -> Self {
182+
Self::SchemaAddition(value)
183+
}
184+
}
185+
161186
impl ChunkStoreDiff {
162187
pub fn addition(
163188
chunk_before_processing: Arc<Chunk>,
@@ -191,6 +216,10 @@ impl ChunkStoreDiff {
191216
matches!(self, Self::Deletion(_))
192217
}
193218

219+
pub fn is_schema_addition(&self) -> bool {
220+
matches!(self, Self::SchemaAddition(_))
221+
}
222+
194223
pub fn into_addition(self) -> Option<ChunkStoreDiffAddition> {
195224
match self {
196225
Self::Addition(addition) => Some(addition),
@@ -238,7 +267,7 @@ impl ChunkStoreDiff {
238267
pub fn delta(&self) -> i64 {
239268
match self {
240269
Self::Addition(_) => 1,
241-
Self::VirtualAddition(_) => 0,
270+
Self::VirtualAddition(_) | Self::SchemaAddition(_) => 0,
242271
Self::Deletion(_) => -1,
243272
}
244273
}
@@ -256,7 +285,7 @@ impl ChunkStoreDiff {
256285
pub fn delta_chunk(&self) -> Option<&Arc<Chunk>> {
257286
match self {
258287
Self::Addition(addition) => Some(addition.delta_chunk()),
259-
Self::VirtualAddition(_) => None,
288+
Self::VirtualAddition(_) | Self::SchemaAddition(_) => None,
260289
Self::Deletion(deletion) => Some(&deletion.chunk),
261290
}
262291
}
@@ -376,7 +405,7 @@ impl ChunkStoreDiffAddition {
376405
descriptor: column.descriptor.clone(),
377406
inner_arrow_datatype: Some(column.list_array.value_type()),
378407
has_data: !column.list_array.values().is_empty(),
379-
is_static_only: delta_chunk.is_static(),
408+
is_static: delta_chunk.is_static(),
380409
})
381410
.collect();
382411

@@ -435,16 +464,16 @@ impl ChunkStoreDiffVirtualAddition {
435464
inner_arrow_datatype: Some(inner_arrow_datatype),
436465
// These fields are filled in later in this function
437466
has_data: false,
438-
is_static_only: false,
467+
is_static: false,
439468
},
440469
)
441470
})
442471
.collect();
443472

444-
/// Helper to track what's know about a component from the manifest's static/temporal maps.
473+
/// Helper to track what's known about a component from the manifest's static/temporal maps.
445474
#[derive(Default)]
446475
struct VirtualComponentInfo {
447-
has_temporal: bool,
476+
is_static: bool,
448477

449478
has_rows: bool,
450479
}
@@ -462,7 +491,7 @@ impl ChunkStoreDiffVirtualAddition {
462491
entry.insert(
463492
component,
464493
VirtualComponentInfo {
465-
has_temporal: false,
494+
is_static: true,
466495
has_rows: true,
467496
},
468497
);
@@ -480,7 +509,6 @@ impl ChunkStoreDiffVirtualAddition {
480509
let has_rows = per_chunk.values().any(|e| e.num_rows > 0);
481510

482511
let existing = entry.entry(component).or_default();
483-
existing.has_temporal = true;
484512
existing.has_rows |= has_rows;
485513
}
486514
}
@@ -494,17 +522,17 @@ impl ChunkStoreDiffVirtualAddition {
494522
.into_iter()
495523
.map(|(component, info)| {
496524
let has_data = info.has_rows;
497-
let is_static_only = !info.has_temporal;
525+
let is_static = info.is_static;
498526
if let Some(meta) = component_schema_info.get(&component) {
499527
ChunkComponentMeta {
500528
has_data,
501-
is_static_only,
529+
is_static,
502530
..meta.clone()
503531
}
504532
} else {
505533
ChunkComponentMeta {
506534
has_data,
507-
is_static_only,
535+
is_static,
508536
descriptor: re_sdk_types::ComponentDescriptor::partial(component),
509537
inner_arrow_datatype: None,
510538
}
@@ -562,7 +590,7 @@ impl ChunkStoreDiffDeletion {
562590

563591
#[cfg(test)]
564592
mod tests {
565-
use std::collections::BTreeMap;
593+
use std::collections::{BTreeMap, BTreeSet};
566594

567595
use re_chunk::{RowId, TimelineName};
568596
use re_log_types::example_components::{MyColor, MyIndex, MyPoint, MyPoints};
@@ -612,7 +640,9 @@ mod tests {
612640

613641
for event in events {
614642
let delta = event.delta();
615-
let delta_chunk = event.delta_chunk().unwrap();
643+
let Some(delta_chunk) = event.delta_chunk() else {
644+
continue;
645+
};
616646
let delta_rows = delta * delta_chunk.num_rows() as i64;
617647

618648
for row_id in delta_chunk.row_ids() {
@@ -645,6 +675,18 @@ mod tests {
645675
}
646676
}
647677

678+
/// Helper to extract the set of new component descriptors from a [`ChunkStoreDiff::SchemaAddition`].
679+
fn schema_addition_descriptors(event: &ChunkStoreEvent) -> BTreeSet<ComponentDescriptor> {
680+
match &event.diff {
681+
ChunkStoreDiff::SchemaAddition(sa) => sa
682+
.new_columns
683+
.iter()
684+
.flat_map(|m| m.components.iter().map(|c| c.descriptor.clone()))
685+
.collect(),
686+
other => panic!("expected SchemaAddition, got {other:?}"),
687+
}
688+
}
689+
648690
#[test]
649691
fn store_events() -> anyhow::Result<()> {
650692
let mut store = ChunkStore::new(
@@ -673,7 +715,18 @@ mod tests {
673715
)
674716
.build()?;
675717

676-
view.on_events(&store.insert_chunk(&Arc::new(chunk1))?);
718+
let events = store.insert_chunk(&Arc::new(chunk1))?;
719+
720+
// chunk1 introduces entity_a with MyIndex — expect Addition + SchemaAddition.
721+
assert_eq!(events.len(), 2);
722+
assert!(events[0].is_addition());
723+
assert!(events[1].is_schema_addition());
724+
assert_eq!(
725+
schema_addition_descriptors(&events[1]),
726+
BTreeSet::from([MyIndex::partial_descriptor()]),
727+
);
728+
729+
view.on_events(&events);
677730

678731
similar_asserts::assert_eq!(
679732
GlobalCounts::new(
@@ -725,7 +778,18 @@ mod tests {
725778
.build()?
726779
};
727780

728-
view.on_events(&store.insert_chunk(&Arc::new(chunk2))?);
781+
let events = store.insert_chunk(&Arc::new(chunk2))?;
782+
783+
// chunk2 introduces entity_b with Points + Colors — expect Addition + SchemaAddition.
784+
assert_eq!(events.len(), 2);
785+
assert!(events[0].is_addition());
786+
assert!(events[1].is_schema_addition());
787+
assert_eq!(
788+
schema_addition_descriptors(&events[1]),
789+
BTreeSet::from([MyPoints::descriptor_points(), MyPoints::descriptor_colors(),]),
790+
);
791+
792+
view.on_events(&events);
729793

730794
similar_asserts::assert_eq!(
731795
GlobalCounts::new(
@@ -777,7 +841,21 @@ mod tests {
777841
.build()?
778842
};
779843

780-
view.on_events(&store.insert_chunk(&Arc::new(chunk3))?);
844+
let events = store.insert_chunk(&Arc::new(chunk3))?;
845+
846+
// chunk3 adds MyIndex to entity_b (new!) and re-uses Colors (not new, but transitions to static).
847+
// Colors already existed on entity_b, but this is a static chunk so Colors gets an
848+
// is_static transition (false → true). MyIndex is new on entity_b.
849+
assert_eq!(events.len(), 2);
850+
assert!(events[0].is_addition());
851+
assert!(events[1].is_schema_addition());
852+
assert_eq!(
853+
schema_addition_descriptors(&events[1]),
854+
BTreeSet::from([MyIndex::partial_descriptor(), MyPoints::descriptor_colors(),]),
855+
"MyIndex is new on entity_b; Colors gets is_static transition"
856+
);
857+
858+
view.on_events(&events);
781859

782860
similar_asserts::assert_eq!(
783861
GlobalCounts::new(
@@ -811,6 +889,16 @@ mod tests {
811889
);
812890

813891
let events = store.gc(&GarbageCollectionOptions::gc_everything()).0;
892+
893+
// GC should only produce Deletion events, never SchemaAddition.
894+
for event in &events {
895+
assert!(
896+
event.is_deletion(),
897+
"GC should only produce deletions, got: {:?}",
898+
event.diff
899+
);
900+
}
901+
814902
view.on_events(&events);
815903

816904
similar_asserts::assert_eq!(

crates/store/re_chunk_store/src/lib.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@ pub use self::dataframe::{
4545
};
4646
pub use self::events::{
4747
ChunkComponentMeta, ChunkMeta, ChunkStoreDiff, ChunkStoreDiffAddition, ChunkStoreDiffDeletion,
48-
ChunkStoreDiffVirtualAddition, ChunkStoreEvent,
48+
ChunkStoreDiffSchemaAddition, ChunkStoreDiffVirtualAddition, ChunkStoreEvent,
4949
};
5050
pub use self::gc::{GarbageCollectionOptions, GarbageCollectionTarget};
5151
pub use self::lineage::{ChunkDirectLineage, ChunkDirectLineageReport};

0 commit comments

Comments
 (0)