Commit c1dfb27

Refactor server implementation and introduce Layer structure (#11468)
1 parent 4689518 commit c1dfb27

File tree

8 files changed: +474, -306 lines

crates/store/re_server/src/rerun_cloud.rs

Lines changed: 15 additions & 7 deletions
@@ -16,6 +16,7 @@ use re_byte_size::SizeBytes as _;
 use re_chunk_store::{Chunk, ChunkStore, ChunkStoreConfig, ChunkStoreHandle};
 use re_log_encoding::codec::wire::{decoder::Decode as _, encoder::Encode as _};
 use re_log_types::{EntityPath, EntryId, StoreId, StoreKind};
+use re_protos::cloud::v1alpha1::ext::DataSource;
 use re_protos::{
     cloud::v1alpha1::{
         DeleteEntryResponse, EntryDetails, EntryKind, FetchTaskOutputRequest,
@@ -95,7 +96,7 @@ impl RerunCloudHandler {
         }
     }
 
-    async fn get_storage_engines(
+    async fn get_chunk_stores(
         &self,
         dataset_id: EntryId,
         mut partition_ids: Vec<PartitionId>,
@@ -113,7 +114,8 @@ impl RerunCloudHandler {
             .into_iter()
             .map(|partition_id| {
                 dataset
-                    .partition_store_handle(&partition_id)
+                    //TODO(RR-2482)
+                    .layer_store_handle(&partition_id, DataSource::DEFAULT_LAYER)
                     .ok_or_else(|| {
                         tonic::Status::not_found(format!(
                             "Partition with ID {partition_id} not found"
@@ -491,7 +493,7 @@ impl RerunCloudService for RerunCloudHandler {
             kind,
         } = source;
 
-        if layer != "base" {
+        if layer != DataSource::DEFAULT_LAYER {
             return Err(tonic::Status::unimplemented(format!(
                 "register_with_dataset: only 'base' layer is implemented, got {layer:?}"
             )));
@@ -504,7 +506,7 @@ impl RerunCloudService for RerunCloudHandler {
         }
 
         if let Ok(rrd_path) = storage_url.to_file_path() {
-            let new_partition_ids = dataset.load_rrd(&rrd_path, on_duplicate)?;
+            let new_partition_ids = dataset.load_rrd(&rrd_path, None, on_duplicate)?;
 
             for partition_id in new_partition_ids {
                 partition_ids.push(partition_id.to_string());
@@ -592,7 +594,12 @@ impl RerunCloudService for RerunCloudHandler {
 
         #[expect(clippy::iter_over_hash_type)]
         for (entity_path, chunk_store) in chunk_stores {
-            dataset.add_partition(entity_path, ChunkStoreHandle::new(chunk_store));
+            //TODO(RR-2482)
+            dataset.add_layer(
+                entity_path,
+                DataSource::DEFAULT_LAYER.to_owned(),
+                ChunkStoreHandle::new(chunk_store),
+            );
         }
 
         Ok(tonic::Response::new(
@@ -767,7 +774,7 @@ impl RerunCloudService for RerunCloudHandler {
             .map(PartitionId::try_from)
             .collect::<Result<Vec<_>, _>>()?;
 
-        let storage_engines = self.get_storage_engines(entry_id, partition_ids).await?;
+        let storage_engines = self.get_chunk_stores(entry_id, partition_ids).await?;
 
         let stream = futures::stream::iter(storage_engines.into_iter().map(
             move |(partition_id, store_handle)| {
@@ -972,7 +979,8 @@ impl RerunCloudService for RerunCloudHandler {
                 .any(|(_, pid)| pid == &partition_id)
             {
                 dataset
-                    .partition_store_handle(&partition_id)
+                    //TODO(RR-2482)
+                    .layer_store_handle(&partition_id, DataSource::DEFAULT_LAYER)
                     .map(|store_handle| (partition_id, (dataset_id, store_handle.clone())))
             } else {
                 None
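
The recurring change in this file is that per-partition store lookups are now routed through a named layer; the old partition_store_handle call maps onto the default ("base") layer until multi-layer support lands (RR-2482). A minimal sketch of the new call shape, assuming dataset and partition_id are in scope:

// Sketch only, not part of the commit: resolve the chunk store behind a
// partition's default layer.
let store_handle = dataset
    .layer_store_handle(&partition_id, DataSource::DEFAULT_LAYER)
    .ok_or_else(|| {
        tonic::Status::not_found(format!("Partition with ID {partition_id} not found"))
    })?;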
Lines changed: 203 additions & 0 deletions (new file)
@@ -0,0 +1,203 @@
+use std::collections::{BTreeSet, HashMap, hash_map::Entry};
+use std::path::Path;
+
+use arrow::array::RecordBatch;
+use arrow::datatypes::Schema;
+
+use re_chunk_store::{ChunkStore, ChunkStoreConfig, ChunkStoreHandle};
+use re_log_types::{EntryId, StoreKind};
+use re_protos::{
+    cloud::v1alpha1::{
+        EntryKind, ScanPartitionTableResponse,
+        ext::{DataSource, DatasetEntry, EntryDetails},
+    },
+    common::v1alpha1::ext::{DatasetHandle, IfDuplicateBehavior, PartitionId},
+};
+
+use crate::store::{Error, Layer, Partition};
+
+pub struct Dataset {
+    id: EntryId,
+    name: String,
+    partitions: HashMap<PartitionId, Partition>,
+
+    created_at: jiff::Timestamp,
+    updated_at: jiff::Timestamp,
+}
+
+impl Dataset {
+    pub fn new(id: EntryId, name: String) -> Self {
+        Self {
+            id,
+            name,
+            partitions: HashMap::default(),
+            created_at: jiff::Timestamp::now(),
+            updated_at: jiff::Timestamp::now(),
+        }
+    }
+
+    pub fn id(&self) -> EntryId {
+        self.id
+    }
+
+    pub fn name(&self) -> &str {
+        &self.name
+    }
+
+    pub fn as_entry_details(&self) -> EntryDetails {
+        EntryDetails {
+            id: self.id,
+            name: self.name.clone(),
+            kind: EntryKind::Dataset,
+            created_at: self.created_at,
+            updated_at: self.updated_at,
+        }
+    }
+
+    pub fn as_dataset_entry(&self) -> DatasetEntry {
+        DatasetEntry {
+            details: EntryDetails {
+                id: self.id,
+                name: self.name.clone(),
+                kind: EntryKind::Dataset,
+                created_at: self.created_at,
+                updated_at: self.updated_at,
+            },
+
+            dataset_details: Default::default(),
+
+            handle: DatasetHandle {
+                id: Some(self.id),
+                store_kind: StoreKind::Recording,
+                url: url::Url::parse(&format!("memory:///{}", self.id)).expect("valid url"),
+            },
+        }
+    }
+
+    pub fn iter_store_handles(&self) -> impl Iterator<Item = &ChunkStoreHandle> {
+        self.partitions
+            .values()
+            .flat_map(|partition| partition.iter_store_handles())
+    }
+
+    pub fn schema(&self) -> arrow::error::Result<Schema> {
+        let schemas = self.iter_store_handles().map(|store_handle| {
+            let fields = store_handle.read().schema().arrow_fields();
+
+            //TODO(ab): why is that needed again?
+            Schema::new_with_metadata(fields, HashMap::default())
+        });
+
+        Schema::try_merge(schemas)
+    }
+
+    pub fn partition_ids(&self) -> impl Iterator<Item = PartitionId> {
+        self.partitions.keys().cloned()
+    }
+
+    pub fn partition_table(&self) -> arrow::error::Result<RecordBatch> {
+        let (partition_ids, last_updated_at, num_chunks, size_bytes): (
+            Vec<_>,
+            Vec<_>,
+            Vec<_>,
+            Vec<_>,
+        ) = itertools::multiunzip(self.partitions.iter().map(|(partition_id, partition)| {
+            (
+                partition_id.to_string(),
+                partition.last_updated_at().as_nanosecond() as i64,
+                partition.num_chunks(),
+                partition.size_bytes(),
+            )
+        }));
+
+        let layers = vec![vec![DataSource::DEFAULT_LAYER.to_owned()]; partition_ids.len()];
+
+        let storage_urls = partition_ids
+            .iter()
+            .map(|partition_id| vec![format!("memory:///{}/{partition_id}", self.id)])
+            .collect();
+
+        ScanPartitionTableResponse::create_dataframe(
+            partition_ids,
+            layers,
+            storage_urls,
+            last_updated_at,
+            num_chunks,
+            size_bytes,
+        )
+    }
+
+    pub fn layer_store_handle(
+        &self,
+        partition_id: &PartitionId,
+        layer_name: &str,
+    ) -> Option<&ChunkStoreHandle> {
+        self.partitions
+            .get(partition_id)
+            .and_then(|partition| partition.layer(layer_name))
+            .map(|layer| layer.store_handle())
+    }
+
+    pub fn add_layer(
+        &mut self,
+        partition_id: PartitionId,
+        layer_name: String,
+        store_handle: ChunkStoreHandle,
+    ) {
+        re_log::debug!(?partition_id, ?layer_name, "add_layer");
+
+        self.partitions
+            .entry(partition_id)
+            .or_default()
+            .insert_layer(layer_name, Layer::new(store_handle));
+
+        self.updated_at = jiff::Timestamp::now();
+    }
+
+    /// Load a RRD using its recording id as partition id.
+    pub fn load_rrd(
+        &mut self,
+        path: &Path,
+        layer_name: Option<&str>,
+        on_duplicate: IfDuplicateBehavior,
+    ) -> Result<BTreeSet<PartitionId>, Error> {
+        re_log::info!("Loading RRD: {}", path.display());
+        let contents =
+            ChunkStore::handle_from_rrd_filepath(&ChunkStoreConfig::CHANGELOG_DISABLED, path)
+                .map_err(Error::RrdLoadingError)?;
+
+        let layer_name = layer_name.unwrap_or(DataSource::DEFAULT_LAYER);
+
+        let mut new_partition_ids = BTreeSet::default();
+
+        for (store_id, chunk_store) in contents {
+            if !store_id.is_recording() {
+                continue;
+            }
+
+            let partition_id = PartitionId::new(store_id.recording_id().to_string());
+
+            match self.partitions.entry(partition_id.clone()) {
+                Entry::Vacant(entry) => {
+                    new_partition_ids.insert(partition_id);
+
+                    entry.insert(Partition::from_layer_data(layer_name, chunk_store));
+                }
+                Entry::Occupied(mut entry) => match on_duplicate {
+                    IfDuplicateBehavior::Overwrite => {
+                        re_log::info!("Overwriting {partition_id}");
+                        entry.insert(Partition::from_layer_data(layer_name, chunk_store));
+                    }
+                    IfDuplicateBehavior::Skip => {
+                        re_log::info!("Ignoring {partition_id}: it already exists");
+                    }
+                    IfDuplicateBehavior::Error => {
+                        return Err(Error::DuplicateEntryNameError(partition_id.to_string()));
+                    }
+                },
+            }
+        }
+
+        Ok(new_partition_ids)
+    }
+}
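
The new Dataset type addresses chunk stores per partition and per named layer. A minimal usage sketch under assumed inputs (entry_id and rrd_path are illustrative, not from the commit):

// Sketch only: drive the layer-aware Dataset API end to end.
let mut dataset = Dataset::new(entry_id, "example".to_owned());

// Every recording in the .rrd becomes a partition; passing None for the
// layer falls back to DataSource::DEFAULT_LAYER, and duplicate partitions
// are skipped rather than overwritten or rejected.
let new_partition_ids = dataset.load_rrd(rrd_path, None, IfDuplicateBehavior::Skip)?;

for partition_id in &new_partition_ids {
    // Lookups are now layer-qualified; the old partition-level lookup
    // corresponds to the default ("base") layer.
    let _store_handle = dataset.layer_store_handle(partition_id, DataSource::DEFAULT_LAYER);
}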
Lines changed: 38 additions & 0 deletions (new file)
@@ -0,0 +1,38 @@
+use re_log_types::EntryId;
+
+#[derive(thiserror::Error, Debug)]
+#[expect(clippy::enum_variant_names)]
+pub enum Error {
+    #[error(transparent)]
+    IoError(#[from] std::io::Error),
+
+    #[error(transparent)]
+    StoreLoadError(#[from] re_entity_db::StoreLoadError),
+
+    #[error("Entry name '{0}' already exists")]
+    DuplicateEntryNameError(String),
+
+    #[error("Entry id '{0}' not found")]
+    EntryIdNotFound(EntryId),
+
+    #[error(transparent)]
+    DataFusionError(#[from] datafusion::error::DataFusionError),
+
+    #[error("Error loading RRD: {0}")]
+    RrdLoadingError(anyhow::Error),
+}
+
+impl From<Error> for tonic::Status {
+    fn from(value: Error) -> Self {
+        match value {
+            Error::IoError(err) => Self::internal(format!("IO error: {err:#}")),
+            Error::StoreLoadError(err) => Self::internal(format!("Store load error: {err:#}")),
+            Error::DuplicateEntryNameError(name) => {
+                Self::already_exists(format!("Entry name already exists: {name}"))
+            }
+            Error::EntryIdNotFound(id) => Self::not_found(format!("Entry ID not found: {id}")),
+            Error::DataFusionError(err) => Self::internal(format!("DataFusion error: {err:#}")),
+            Error::RrdLoadingError(err) => Self::internal(format!("{err:#}")),
+        }
+    }
+}
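
Because Error converts into tonic::Status, handler code can surface store failures as gRPC status codes with .into() or the ? operator. A hypothetical helper sketch (find_dataset and its arguments are illustrative, not from the commit):

use std::collections::HashMap;

// Illustrative only: a missing entry becomes tonic::Status::not_found via
// the From<Error> for tonic::Status impl above.
fn find_dataset(
    datasets: &HashMap<EntryId, Dataset>,
    id: EntryId,
) -> Result<&Dataset, tonic::Status> {
    datasets.get(&id).ok_or_else(|| Error::EntryIdNotFound(id).into())
}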
