
Commit 76ead5d

abey79 and teh-cmc authored
Use ChunkStoreHandle in the OSS server instead of EntityDb (#11422)
Co-authored-by: Clement Rey <[email protected]>
1 parent 4590f7f commit 76ead5d

File tree: 4 files changed, +78 −95 lines
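In broad strokes: the OSS server previously kept a full EntityDb per partition (the viewer-oriented wrapper that bundles a chunk store with query caches), and handlers had to extract the raw store through `unsafe { partition.storage_engine_raw() }`. After this commit, a partition holds a `ChunkStoreHandle`: a shared, lock-guarded handle to a plain `ChunkStore` that clones cheaply and is read directly. A minimal sketch of the handle pattern, using simplified stand-in types (the real definitions live in `re_chunk_store`):

    use std::sync::Arc;
    use parking_lot::{RwLock, RwLockReadGuard};

    // Stand-in for re_chunk_store::ChunkStore.
    struct ChunkStore {/* chunks, indices, config, ... */}

    // Sketch of the handle: a ref-counted, lock-guarded store.
    // Cloning only bumps the reference count; no `unsafe` involved.
    #[derive(Clone)]
    struct ChunkStoreHandle(Arc<RwLock<ChunkStore>>);

    impl ChunkStoreHandle {
        fn new(store: ChunkStore) -> Self {
            Self(Arc::new(RwLock::new(store)))
        }

        // One synchronous lock yields direct access to the store,
        // replacing the old two-step `storage_engine.read().store()`.
        fn read(&self) -> RwLockReadGuard<'_, ChunkStore> {
            self.0.read()
        }
    }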

Cargo.lock

Lines changed: 3 additions & 0 deletions

@@ -9384,13 +9384,16 @@ dependencies = [
  "nohash-hasher",
  "re_build_info",
  "re_build_tools",
+ "re_byte_size",
  "re_chunk_store",
  "re_entity_db",
  "re_grpc_server",
  "re_log",
  "re_log_encoding",
  "re_log_types",
  "re_protos",
+ "re_tuid",
+ "re_types_core",
  "thiserror 1.0.69",
  "tokio",
  "tokio-stream",

crates/store/re_server/Cargo.toml

Lines changed: 3 additions & 0 deletions

@@ -29,12 +29,15 @@ default = []
 # Rerun
 re_chunk_store.workspace = true
 re_build_info.workspace = true
+re_byte_size.workspace = true
 re_entity_db.workspace = true
 re_grpc_server.workspace = true
 re_log = { workspace = true, features = ["setup"] }
 re_log_encoding.workspace = true
 re_log_types.workspace = true
 re_protos.workspace = true
+re_tuid.workspace = true
+re_types_core.workspace = true
 
 # External
 anyhow.workspace = true
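The three new dependencies are not new code paths: the server already used these crates, but reached them through nested re-exports. With the move away from EntityDb it now imports them at the top level. For example (paraphrasing the import changes in rerun_cloud.rs and store.rs below):

    // Before: reached through nested re-exports.
    use re_chunk_store::external::re_chunk::external::re_byte_size::SizeBytes as _;
    use re_log_types::external::re_tuid::Tuid;

    // After: direct dependencies, direct imports.
    use re_byte_size::SizeBytes as _;
    use re_tuid::Tuid;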

crates/store/re_server/src/rerun_cloud.rs

Lines changed: 44 additions & 70 deletions

@@ -9,41 +9,30 @@ use arrow::array::{
 use arrow::datatypes::{DataType, Field, Schema, TimeUnit};
 use datafusion::prelude::SessionContext;
 use nohash_hasher::IntSet;
-use re_chunk_store::Chunk;
-use re_chunk_store::external::re_chunk::external::re_byte_size::SizeBytes as _;
-use re_entity_db::EntityDb;
-use re_entity_db::external::re_query::StorageEngine;
+use tokio_stream::StreamExt as _;
+use tonic::{Code, Status};
+
+use re_byte_size::SizeBytes as _;
+use re_chunk_store::{Chunk, ChunkStore, ChunkStoreConfig, ChunkStoreHandle};
 use re_log_encoding::codec::wire::{decoder::Decode as _, encoder::Encode as _};
-use re_log_types::external::re_types_core::{ChunkId, Loggable as _};
 use re_log_types::{EntityPath, EntryId, StoreId, StoreKind};
-use re_protos::cloud::v1alpha1::ext::GetChunksRequest;
-use re_protos::cloud::v1alpha1::{
-    EntryDetails, GetChunksResponse, GetDatasetSchemaResponse, GetPartitionTableSchemaResponse,
-    QueryDatasetResponse, ScanPartitionTableResponse, ScanTableResponse,
-};
-use re_protos::headers::RerunHeadersExtractorExt as _;
-use re_protos::{cloud::v1alpha1::RegisterWithDatasetResponse, common::v1alpha1::ext::PartitionId};
 use re_protos::{
-    cloud::v1alpha1::ext,
-    cloud::v1alpha1::ext::{
-        CreateDatasetEntryResponse, ReadDatasetEntryResponse, ReadTableEntryResponse,
-    },
-};
-use re_protos::{
-    cloud::v1alpha1::rerun_cloud_service_server::RerunCloudService,
     cloud::v1alpha1::{
-        FetchTaskOutputRequest, FetchTaskOutputResponse, QueryTasksOnCompletionRequest,
-        QueryTasksRequest, QueryTasksResponse,
+        DeleteEntryResponse, EntryDetails, EntryKind, FetchTaskOutputRequest,
+        FetchTaskOutputResponse, GetChunksResponse, GetDatasetSchemaResponse,
+        GetPartitionTableSchemaResponse, QueryDatasetResponse, QueryTasksOnCompletionRequest,
+        QueryTasksRequest, QueryTasksResponse, RegisterTableRequest, RegisterTableResponse,
+        RegisterWithDatasetResponse, ScanPartitionTableResponse, ScanTableResponse,
+        ext::{
+            self, CreateDatasetEntryResponse, GetChunksRequest, ReadDatasetEntryResponse,
+            ReadTableEntryResponse,
+        },
+        rerun_cloud_service_server::RerunCloudService,
     },
+    common::v1alpha1::ext::{IfDuplicateBehavior, PartitionId},
+    headers::RerunHeadersExtractorExt as _,
 };
-use re_protos::{
-    cloud::v1alpha1::{
-        DeleteEntryResponse, EntryKind, RegisterTableRequest, RegisterTableResponse,
-    },
-    common::v1alpha1::ext::IfDuplicateBehavior,
-};
-use tokio_stream::StreamExt as _;
-use tonic::{Code, Status};
+use re_types_core::{ChunkId, Loggable as _};
 
 use crate::store::{Dataset, InMemoryStore, Table};
 
@@ -111,7 +100,7 @@ impl RerunCloudHandler {
         &self,
         dataset_id: EntryId,
         mut partition_ids: Vec<PartitionId>,
-    ) -> Result<Vec<(PartitionId, StorageEngine)>, tonic::Status> {
+    ) -> Result<Vec<(PartitionId, ChunkStoreHandle)>, tonic::Status> {
         let store = self.store.read().await;
         let dataset = store.dataset(dataset_id).ok_or_else(|| {
             tonic::Status::not_found(format!("Entry with ID {dataset_id} not found"))
@@ -125,19 +114,13 @@ impl RerunCloudHandler {
             .into_iter()
             .map(|partition_id| {
                 dataset
-                    .partition(&partition_id)
+                    .partition_store_handle(&partition_id)
                     .ok_or_else(|| {
                         tonic::Status::not_found(format!(
                             "Partition with ID {partition_id} not found"
                         ))
                     })
-                    .map(|partition| {
-                        #[expect(unsafe_code)]
-                        // Safety: no viewer is running, and we've locked the store for the duration
-                        // of the handler already.
-                        unsafe { partition.storage_engine_raw() }.clone()
-                    })
-                    .map(|storage_engine| (partition_id, storage_engine))
+                    .map(|store_handle| (partition_id, store_handle.clone()))
             })
             .collect::<Result<Vec<_>, _>>()
     }
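This hunk is the heart of the change. The old path had to clone a StorageEngine out of the EntityDb via `storage_engine_raw()`, an `unsafe` API whose safety argument depended on no viewer running against the same store. `partition_store_handle()` returns a `ChunkStoreHandle`, so the clone is an ordinary ref-count bump and both the `unsafe` block and its safety comment disappear. A hypothetical call site, sketched from the surrounding handlers:

    // `get_storage_engines` (name kept, return type changed) now yields
    // plain values that can move into a stream with no unsafe.
    let handles = self.get_storage_engines(dataset_id, partition_ids).await?;
    for (partition_id, store_handle) in handles {
        let num_chunks = store_handle.read().num_chunks();
        re_log::debug!(?partition_id, num_chunks, "partition resolved");
    }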
@@ -560,7 +543,7 @@ impl RerunCloudService for RerunCloudHandler {
 
         let mut request = request.into_inner();
 
-        let mut entity_dbs = HashMap::new();
+        let mut chunk_stores = HashMap::new();
 
         while let Some(chunk_msg) = request.next().await {
             let chunk_msg = chunk_msg?;
@@ -589,16 +572,15 @@ impl RerunCloudService for RerunCloudHandler {
                 tonic::Status::internal(format!("error decoding chunk from record batch: {err:#}"))
             })?);
 
-            entity_dbs
+            chunk_stores
                 .entry(partition_id.clone())
                 .or_insert_with(|| {
-                    EntityDb::new(StoreId::new(
-                        StoreKind::Recording,
-                        entry_id.to_string(),
-                        partition_id.id,
-                    ))
+                    ChunkStore::new(
+                        StoreId::new(StoreKind::Recording, entry_id.to_string(), partition_id.id),
+                        ChunkStoreConfig::CHANGELOG_DISABLED,
+                    )
                 })
-                .add_chunk(&chunk)
+                .insert_chunk(&chunk)
                 .map_err(|err| {
                     tonic::Status::internal(format!("error adding chunk to store: {err:#}"))
                 })?;
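Two details worth noting here: `add_chunk` on EntityDb becomes `insert_chunk` directly on ChunkStore, and the store is built with `ChunkStoreConfig::CHANGELOG_DISABLED`. The changelog exists to emit store events to subscribers (the viewer's query caches, for instance); a headless server has no subscribers, so recording events on every insertion would be pure overhead. A small sketch of the construction, assuming a hypothetical helper:

    use re_chunk_store::{ChunkStore, ChunkStoreConfig};
    use re_log_types::{StoreId, StoreKind};

    // Hypothetical helper mirroring the `or_insert_with` closure above.
    fn new_partition_store(entry_id: &str, partition_id: String) -> ChunkStore {
        ChunkStore::new(
            StoreId::new(StoreKind::Recording, entry_id.to_owned(), partition_id),
            // No subscribers on the server: skip per-insertion store events.
            ChunkStoreConfig::CHANGELOG_DISABLED,
        )
    }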
@@ -610,8 +592,8 @@ impl RerunCloudService for RerunCloudHandler {
         };
 
         #[expect(clippy::iter_over_hash_type)]
-        for (entity_path, entity_db) in entity_dbs {
-            dataset.add_partition(entity_path, entity_db);
+        for (entity_path, chunk_store) in chunk_stores {
+            dataset.add_partition(entity_path, ChunkStoreHandle::new(chunk_store));
         }
 
         Ok(tonic::Response::new(
@@ -767,9 +749,8 @@ impl RerunCloudService for RerunCloudHandler {
         let storage_engines = self.get_storage_engines(entry_id, partition_ids).await?;
 
         let stream = futures::stream::iter(storage_engines.into_iter().map(
-            move |(partition_id, storage_engine)| {
-                let storage_read = storage_engine.read();
-                let chunk_store = storage_read.store();
+            move |(partition_id, store_handle)| {
+                let chunk_store = store_handle.read();
                 let num_rows = chunk_store.num_chunks();
 
                 let mut chunk_partition_id = Vec::with_capacity(num_rows);
@@ -940,21 +921,20 @@ impl RerunCloudService for RerunCloudHandler {
         let storage_engines = self.get_storage_engines(entry_id, partition_ids).await?;
 
         let stream = futures::stream::iter(storage_engines.into_iter().map(
-            move |(partition_id, storage_engine)| {
+            move |(partition_id, store_handle)| {
                 let compression = re_log_encoding::Compression::Off;
                 let store_id = StoreId::new(
                     StoreKind::Recording,
                     entry_id.to_string(),
                     partition_id.id.as_str(),
                 );
 
-                let arrow_msgs: Result<Vec<_>, _> = storage_engine
+                let arrow_msgs: Result<Vec<_>, _> = store_handle
                     // NOTE: ⚠️ This is super cursed ⚠️ The underlying lock is synchronous: the only
                     // reason this doesn't deadlock is because we collect() at the end of this mapping,
                     // before the overarching stream ever gets a chance to yield.
                     // Make sure it stays that way.
                     .read()
-                    .store()
                     .iter_chunks()
                     .filter(|chunk| {
                         entity_paths.is_empty() || entity_paths.contains(chunk.entity_path())
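The pre-existing "super cursed" NOTE is still load-bearing after the refactor, so it is worth unpacking: `read()` takes a synchronous lock, and holding its guard across a stream yield point could block the async executor or deadlock against a writer. The eager `collect()` bounds the guard's lifetime to this one expression, before the stream is ever polled. A self-contained analogue of the pattern (std locks and toy types, not the real API):

    use std::sync::{Arc, RwLock};

    fn encode(chunk: &str) -> String {
        format!("encoded({chunk})")
    }

    async fn stream_chunks(store: Arc<RwLock<Vec<String>>>) -> Vec<String> {
        // OK: the read guard is created and dropped within this one
        // expression, strictly before any `.await`/yield point.
        let msgs: Vec<String> = store.read().unwrap().iter().map(|c| encode(c)).collect();

        // By contrast, holding the guard across an await:
        //     let guard = store.read().unwrap();
        //     some_async_op().await; // guard still alive: blocks the
        //                            // executor and can deadlock writers.

        msgs
    }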
@@ -1036,7 +1016,7 @@ impl RerunCloudService for RerunCloudHandler {
 
         // get storage engines only for the partitions we actually need
         let store = self.store.read().await;
-        let storage_engines: std::collections::HashMap<_, _> = store
+        let store_handles: std::collections::HashMap<_, _> = store
             .iter_datasets()
             .flat_map(|dataset| {
                 let dataset_id = dataset.id();
@@ -1046,13 +1026,9 @@ impl RerunCloudService for RerunCloudHandler {
                     .iter()
                     .any(|(_, pid)| pid == &partition_id)
                 {
-                    dataset.partition(&partition_id).map(|partition| {
-                        #[expect(unsafe_code)]
-                        // Safety: no viewer is running, and we've locked the store for the duration
-                        // of the handler already.
-                        let storage_engine = unsafe { partition.storage_engine_raw() }.clone();
-                        (partition_id, (dataset_id, storage_engine))
-                    })
+                    dataset
+                        .partition_store_handle(&partition_id)
+                        .map(|store_handle| (partition_id, (dataset_id, store_handle.clone())))
                 } else {
                     None
                 }
@@ -1065,15 +1041,13 @@ impl RerunCloudService for RerunCloudHandler {
         let compression = re_log_encoding::Compression::Off;
 
         for (chunk_id, partition_id) in chunk_partition_pairs {
-            let (dataset_id, storage_engine) =
-                storage_engines.get(&partition_id).ok_or_else(|| {
-                    tonic::Status::internal(format!(
-                        "Storage engine not found for partition {partition_id}"
-                    ))
-                })?;
+            let (dataset_id, store_handle) = store_handles.get(&partition_id).ok_or_else(|| {
+                tonic::Status::internal(format!(
+                    "Storage engine not found for partition {partition_id}"
+                ))
+            })?;
 
-            let storage_read = storage_engine.read();
-            let chunk_store = storage_read.store();
+            let chunk_store = store_handle.read();
 
             if let Some(chunk) = chunk_store.chunk(&chunk_id) {
                 let store_id = StoreId::new(

crates/store/re_server/src/store.rs

Lines changed: 28 additions & 25 deletions

@@ -1,3 +1,9 @@
+use std::{
+    collections::{BTreeSet, HashMap, hash_map::Entry},
+    path::Path,
+    sync::Arc,
+};
+
 use arrow::array::{
     ArrayRef, Int32Array, RecordBatchOptions, StringArray, TimestampNanosecondArray,
 };
@@ -9,26 +15,18 @@ use datafusion::error::DataFusionError;
 use itertools::Itertools as _;
 use jiff::Timestamp;
 use lance::datafusion::LanceTableProvider;
-use re_entity_db::{EntityDb, StoreBundle};
-use re_log_types::external::re_tuid::Tuid;
-use re_log_types::external::re_types_core::{ComponentBatch as _, Loggable as _};
+
+use re_chunk_store::{ChunkStore, ChunkStoreConfig, ChunkStoreHandle};
 use re_log_types::{EntryId, StoreKind};
-use re_protos::cloud::v1alpha1::SystemTableKind;
-use re_protos::cloud::v1alpha1::ext::{ProviderDetails as _, SystemTable};
 use re_protos::{
-    cloud::v1alpha1::ScanPartitionTableResponse,
     cloud::v1alpha1::{
-        EntryKind,
-        ext::{DatasetEntry, EntryDetails, TableEntry},
+        EntryKind, ScanPartitionTableResponse, SystemTableKind,
+        ext::{DatasetEntry, EntryDetails, ProviderDetails as _, SystemTable, TableEntry},
     },
     common::v1alpha1::ext::{DatasetHandle, IfDuplicateBehavior, PartitionId},
 };
-use std::sync::Arc;
-use std::{
-    collections::{BTreeSet, HashMap, hash_map::Entry},
-    fs::File,
-    path::Path,
-};
+use re_tuid::Tuid;
+use re_types_core::{ComponentBatch as _, Loggable as _};
 
 const ENTRIES_TABLE_NAME: &str = "__entries";
 
@@ -49,6 +47,9 @@ pub enum Error {
 
     #[error(transparent)]
     DataFusionError(#[from] datafusion::error::DataFusionError),
+
+    #[error("Error loading RRD: {0}")]
+    RrdLoadingError(anyhow::Error),
 }
 
 impl From<Error> for tonic::Status {
@@ -61,12 +62,13 @@ impl From<Error> for tonic::Status {
             }
             Error::EntryIdNotFound(id) => Self::not_found(format!("Entry ID not found: {id}")),
             Error::DataFusionError(err) => Self::internal(format!("DataFusion error: {err:#}")),
+            Error::RrdLoadingError(err) => Self::internal(format!("{err:#}")),
         }
     }
 }
 
 pub struct Partition {
-    entity_db: EntityDb,
+    store_handle: ChunkStoreHandle,
     registration_time: jiff::Timestamp,
 }
 
@@ -116,7 +118,7 @@ impl Dataset {
 
     pub fn schema(&self) -> arrow::error::Result<Schema> {
         let schemas = self.partitions.values().map(|partition| {
-            let columns = partition.entity_db.storage_engine().store().schema();
+            let columns = partition.store_handle.read().schema();
             let fields = columns.arrow_fields();
             Schema::new_with_metadata(fields, HashMap::default())
         });
@@ -160,16 +162,16 @@ impl Dataset {
         )
     }
 
-    pub fn partition(&self, partition_id: &PartitionId) -> Option<&EntityDb> {
-        self.partitions.get(partition_id).map(|p| &p.entity_db)
+    pub fn partition_store_handle(&self, partition_id: &PartitionId) -> Option<&ChunkStoreHandle> {
+        self.partitions.get(partition_id).map(|p| &p.store_handle)
     }
 
-    pub fn add_partition(&mut self, partition_id: PartitionId, entity_db: EntityDb) {
+    pub fn add_partition(&mut self, partition_id: PartitionId, store_handle: ChunkStoreHandle) {
         re_log::debug!(?partition_id, "add_partition");
         self.partitions.insert(
            partition_id,
            Partition {
-                entity_db,
+                store_handle,
                registration_time: jiff::Timestamp::now(),
            },
        );
@@ -182,12 +184,13 @@ impl Dataset {
         on_duplicate: IfDuplicateBehavior,
     ) -> Result<BTreeSet<PartitionId>, Error> {
         re_log::info!("Loading RRD: {}", path.display());
-        let mut contents = StoreBundle::from_rrd(File::open(path)?)?;
+        let contents =
+            ChunkStore::handle_from_rrd_filepath(&ChunkStoreConfig::CHANGELOG_DISABLED, path)
+                .map_err(Error::RrdLoadingError)?;
 
         let mut new_partition_ids = BTreeSet::default();
 
-        for entity_db in contents.drain_entity_dbs() {
-            let store_id = entity_db.store_id();
+        for (store_id, chunk_store) in contents {
             if !store_id.is_recording() {
                 continue;
             }
@@ -198,15 +201,15 @@ impl Dataset {
                 Entry::Vacant(entry) => {
                     new_partition_ids.insert(partition_id);
                     entry.insert(Partition {
-                        entity_db,
+                        store_handle: chunk_store,
                        registration_time: jiff::Timestamp::now(),
                    });
                }
                Entry::Occupied(mut entry) => match on_duplicate {
                    IfDuplicateBehavior::Overwrite => {
                        re_log::info!("Overwriting {partition_id}");
                        entry.insert(Partition {
-                            entity_db,
+                            store_handle: chunk_store,
                            registration_time: jiff::Timestamp::now(),
                        });
                    }

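The RRD loading path changes shape as well: `StoreBundle::from_rrd(File::open(path)?)` produced full EntityDbs to drain, whereas `ChunkStore::handle_from_rrd_filepath` (as used above) yields one store handle per store found in the file, keyed by StoreId. A sketch of the consuming loop under that assumption, with a hypothetical `register` step standing in for the Entry-based insertion:

    use std::path::Path;
    use re_chunk_store::{ChunkStore, ChunkStoreConfig, ChunkStoreHandle};
    use re_log_types::StoreId;

    fn load_recordings(path: &Path) -> Result<(), Error> {
        let contents =
            ChunkStore::handle_from_rrd_filepath(&ChunkStoreConfig::CHANGELOG_DISABLED, path)
                .map_err(Error::RrdLoadingError)?;

        for (store_id, store_handle) in contents {
            // Blueprints and other non-recording stores are not partitions.
            if !store_id.is_recording() {
                continue;
            }
            register(store_id, store_handle); // hypothetical
        }
        Ok(())
    }

    fn register(store_id: StoreId, store_handle: ChunkStoreHandle) {
        re_log::info!("registered {store_id} ({} chunks)", store_handle.read().num_chunks());
    }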