|
| 1 | +use std::collections::hash_map::Entry; |
| 2 | + |
| 3 | +use ahash::{HashMap, HashSet}; |
| 4 | +use itertools::Itertools as _; |
| 5 | + |
| 6 | +use re_chunk::ChunkId; |
| 7 | +use re_chunk_store::ChunkStoreEvent; |
| 8 | +use re_types_core::ChunkIndexMessage; |
| 9 | + |
/// Info about a single chunk that we know ahead of loading it.
#[derive(Clone, Debug, Default)]
pub struct ChunkInfo {
    /// Do we have the whole chunk in memory?
    ///
    /// `false` both for chunks not yet loaded and for chunks that were
    /// invalidated again after a deletion (see [`ChunkIndex`]).
    pub fully_loaded: bool,
}
| 16 | + |
/// A secondary index that keeps track of which chunks have been loaded into memory.
///
/// This is currently used to show a progress bar.
///
/// This is constructed from one or more [`ChunkIndexMessage`], which is what
/// the server sends to the client/viewer.
/// TODO(RR-2999): use this for larger-than-RAM.
#[derive(Default, Debug, Clone)]
pub struct ChunkIndex {
    /// These are the chunks known to exist in the data source (e.g. remote server).
    ///
    /// The chunk store may split large chunks and merge (compact) small ones,
    /// so what's in the chunk store can differ significantly.
    remote_chunks: HashMap<ChunkId, ChunkInfo>,

    /// The chunk store may split large chunks and merge (compact) small ones.
    /// When we later drop a chunk, we need to know which other chunks to invalidate.
    ///
    /// Maps a (locally created) child chunk to the remote chunk(s) it was split from.
    parents: HashMap<ChunkId, HashSet<ChunkId>>,

    /// Have we ever deleted a chunk?
    ///
    /// If so, we have run some GC and should not show progress bar.
    has_deleted: bool,
}
| 41 | + |
| 42 | +impl ChunkIndex { |
| 43 | + #[expect(clippy::needless_pass_by_value)] // In the future we may want to store them as record batches |
| 44 | + pub fn append(&mut self, msg: ChunkIndexMessage) { |
| 45 | + re_tracing::profile_function!(); |
| 46 | + for chunk_id in msg.chunk_ids() { |
| 47 | + match self.remote_chunks.entry(*chunk_id) { |
| 48 | + Entry::Occupied(_occupied_entry) => { |
| 49 | + // TODO(RR-2999): update time range index for the chunk |
| 50 | + } |
| 51 | + Entry::Vacant(vacant_entry) => { |
| 52 | + vacant_entry.insert(ChunkInfo { |
| 53 | + fully_loaded: false, |
| 54 | + }); |
| 55 | + } |
| 56 | + } |
| 57 | + } |
| 58 | + } |
| 59 | + |
| 60 | + /// How many chunks are in the index? |
| 61 | + /// |
| 62 | + /// Not all of them are necessarily loaded. |
| 63 | + pub fn num_chunks(&self) -> usize { |
| 64 | + self.remote_chunks.len() |
| 65 | + } |
| 66 | + |
| 67 | + /// [0, 1], how many chunks have been loaded? |
| 68 | + /// |
| 69 | + /// Returns `None` if we have already started garbage-collecting some chunks. |
| 70 | + pub fn progress(&self) -> Option<f32> { |
| 71 | + if self.has_deleted { |
| 72 | + None |
| 73 | + } else if self.num_chunks() == 0 { |
| 74 | + Some(1.0) |
| 75 | + } else { |
| 76 | + let num_loaded = self |
| 77 | + .remote_chunks |
| 78 | + .values() |
| 79 | + .filter(|c| c.fully_loaded) |
| 80 | + .count(); |
| 81 | + Some(num_loaded as f32 / self.num_chunks() as f32) |
| 82 | + } |
| 83 | + } |
| 84 | + |
| 85 | + pub fn mark_as_loaded(&mut self, chunk_id: ChunkId) { |
| 86 | + let chunk_info = self.remote_chunks.entry(chunk_id).or_default(); |
| 87 | + chunk_info.fully_loaded = true; |
| 88 | + } |
| 89 | + |
| 90 | + pub fn on_events(&mut self, store_events: &[ChunkStoreEvent]) { |
| 91 | + re_tracing::profile_function!(); |
| 92 | + |
| 93 | + for event in store_events { |
| 94 | + let chunk_id = event.chunk.id(); |
| 95 | + match event.kind { |
| 96 | + re_chunk_store::ChunkStoreDiffKind::Addition => { |
| 97 | + if let Some(chunk_info) = self.remote_chunks.get_mut(&chunk_id) { |
| 98 | + chunk_info.fully_loaded = true; |
| 99 | + } else if let Some(source) = event.split_source { |
| 100 | + // The added chunk was the result of splitting another chunk: |
| 101 | + self.parents.entry(chunk_id).or_default().insert(source); |
| 102 | + } else { |
| 103 | + re_log::warn!("Added chunk that was not part of the index"); |
| 104 | + } |
| 105 | + } |
| 106 | + re_chunk_store::ChunkStoreDiffKind::Deletion => { |
| 107 | + self.mark_deleted(&chunk_id); |
| 108 | + } |
| 109 | + } |
| 110 | + } |
| 111 | + } |
| 112 | + |
| 113 | + fn mark_deleted(&mut self, chunk_id: &ChunkId) { |
| 114 | + self.has_deleted = true; |
| 115 | + |
| 116 | + if let Some(chunk_info) = self.remote_chunks.get_mut(chunk_id) { |
| 117 | + chunk_info.fully_loaded = false; |
| 118 | + } else if let Some(parents) = self.parents.remove(chunk_id) { |
| 119 | + // Mark all ancestors as not being fully loaded: |
| 120 | + |
| 121 | + let mut ancestors = parents.into_iter().collect_vec(); |
| 122 | + while let Some(chunk_id) = ancestors.pop() { |
| 123 | + if let Some(chunk_info) = self.remote_chunks.get_mut(&chunk_id) { |
| 124 | + chunk_info.fully_loaded = false; |
| 125 | + } else if let Some(grandparents) = self.parents.get(&chunk_id) { |
| 126 | + ancestors.extend(grandparents); |
| 127 | + } else { |
| 128 | + re_log::warn!("Removed chunk that was not part of the index"); |
| 129 | + } |
| 130 | + } |
| 131 | + } else { |
| 132 | + re_log::warn!("Removed chunk that was not part of the index"); |
| 133 | + } |
| 134 | + } |
| 135 | +} |
0 commit comments