From 6850986b3c4a45657af98ac21c4d1cf2aaf488e9 Mon Sep 17 00:00:00 2001 From: Antoine Beyeler Date: Fri, 3 Oct 2025 17:50:10 +0200 Subject: [PATCH 01/15] Add grpc endpoint for layer table and cleanup helper objects --- .../proto/rerun/v1alpha1/cloud.proto | 38 +++ .../src/v1alpha1/rerun.cloud.v1alpha1.ext.rs | 121 ++++++++-- .../src/v1alpha1/rerun.cloud.v1alpha1.rs | 223 ++++++++++++++++++ crates/store/re_server/src/rerun_cloud.rs | 28 ++- crates/store/re_server/src/store.rs | 51 ++-- 5 files changed, 412 insertions(+), 49 deletions(-) diff --git a/crates/store/re_protos/proto/rerun/v1alpha1/cloud.proto b/crates/store/re_protos/proto/rerun/v1alpha1/cloud.proto index 763f61b9163d..c64617e3fc40 100644 --- a/crates/store/re_protos/proto/rerun/v1alpha1/cloud.proto +++ b/crates/store/re_protos/proto/rerun/v1alpha1/cloud.proto @@ -77,6 +77,21 @@ service RerunCloudService { // This endpoint requires the standard dataset headers. rpc ScanPartitionTable(ScanPartitionTableRequest) returns (stream ScanPartitionTableResponse) {} + // Returns the schema of the layer table. + // + // To inspect the data of the partition table, which is guaranteed to match the schema returned by + // this endpoint, check out `ScanLayerTable`. + // + // This endpoint requires the standard dataset headers. + rpc GetLayerTableSchema(GetLayerTableSchemaRequest) returns (GetLayerTableSchemaResponse) {} + + // Inspect the contents of the layer table. + // + // The data will follow the schema returned by `GetLayerTableSchema`. + // + // This endpoint requires the standard dataset headers. + rpc ScanLayerTable(ScanLayerTableRequest) returns (stream ScanLayerTableResponse) {} + // Returns the schema of the dataset. // // This is the union of all the schemas from all the underlying partitions. It will contain all the indexes, @@ -271,6 +286,29 @@ message ScanPartitionTableResponse { rerun.common.v1alpha1.DataframePart data = 1; } +message GetLayerTableSchemaRequest { + reserved "dataset_id"; +} + +message GetLayerTableSchemaResponse { + rerun.common.v1alpha1.Schema schema = 1; +} + +message ScanLayerTableRequest { + // A list of column names to be projected server-side. + // + // All of them if left empty. 
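+  //
+  // Example (illustrative): `["rerun_layer_name", "rerun_storage_url"]`.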
+  repeated string columns = 3;
+
+  reserved "dataset_id";
+  reserved "scan_parameters";
+}
+
+message ScanLayerTableResponse {
+  // Layer metadata as arrow RecordBatch
+  rerun.common.v1alpha1.DataframePart data = 1;
+}
+
 message GetDatasetSchemaRequest {
   reserved 1;
   reserved "dataset_id";
diff --git a/crates/store/re_protos/src/v1alpha1/rerun.cloud.v1alpha1.ext.rs b/crates/store/re_protos/src/v1alpha1/rerun.cloud.v1alpha1.ext.rs
index 1068c1a1470f..a95359f4ec40 100644
--- a/crates/store/re_protos/src/v1alpha1/rerun.cloud.v1alpha1.ext.rs
+++ b/crates/store/re_protos/src/v1alpha1/rerun.cloud.v1alpha1.ext.rs
@@ -1,6 +1,8 @@
 use std::sync::Arc;
 
-use arrow::array::RecordBatchOptions;
+use arrow::array::{
+    FixedSizeBinaryArray, ListBuilder, RecordBatchOptions, StringBuilder, UInt64Array,
+};
 use arrow::{
     array::{Array, ArrayRef, RecordBatch, StringArray, TimestampNanosecondArray},
     datatypes::{DataType, Field, Schema, TimeUnit},
@@ -11,10 +13,9 @@ use re_chunk::TimelineName;
 use re_log_types::{EntityPath, EntryId, TimeInt};
 use re_sorbet::ComponentColumnDescriptor;
 
-use crate::cloud::v1alpha1::{EntryKind, QueryTasksResponse};
 use crate::cloud::v1alpha1::{
-    GetDatasetSchemaResponse, RegisterWithDatasetResponse, ScanPartitionTableResponse,
-    VectorDistanceMetric,
+    EntryKind, GetDatasetSchemaResponse, QueryTasksResponse, RegisterWithDatasetResponse,
+    ScanLayerTableResponse, ScanPartitionTableResponse, VectorDistanceMetric,
 };
 use crate::common::v1alpha1::ext::{DatasetHandle, IfDuplicateBehavior, PartitionId};
 use crate::common::v1alpha1::{ComponentDescriptor, DataframePart, TaskId};
@@ -1178,51 +1179,125 @@ pub struct RegisterWithDatasetTaskDescriptor {
 impl ScanPartitionTableResponse {
     pub const PARTITION_ID: &str = "rerun_partition_id";
-    pub const PARTITION_TYPE: &str = "rerun_partition_type";
+    pub const LAYERS: &str = "rerun_layers";
+    pub const LAST_UPDATED_AT: &str = "rerun_last_updated_at";
+    pub const NUM_CHUNKS: &str = "rerun_num_chunks";
+    pub const SIZE_BYTES: &str = "rerun_size_bytes";
+
+    pub fn schema() -> Schema {
+        Schema::new(vec![
+            Field::new(Self::PARTITION_ID, DataType::Utf8, false),
+            Field::new(
+                Self::LAYERS,
+                DataType::List(Arc::new(Field::new(Self::LAYERS, DataType::Utf8, false))),
+                false,
+            ),
+            Field::new(
+                Self::LAST_UPDATED_AT,
+                DataType::Timestamp(TimeUnit::Nanosecond, Some("utc".into())),
+                false,
+            ),
+            Field::new(Self::NUM_CHUNKS, DataType::UInt64, false),
+            Field::new(Self::SIZE_BYTES, DataType::UInt64, false),
+        ])
+    }
+
+    /// Helper to simplify instantiation of the dataframe in [`Self::data`].
+    pub fn create_dataframe(
+        partition_ids: Vec<String>,
+        layers: Vec<Vec<String>>,
+        last_updated_at: Vec<i64>,
+        num_chunks: Vec<u64>,
+        size_bytes: Vec<u64>,
+    ) -> arrow::error::Result<RecordBatch> {
+        let row_count = partition_ids.len();
+        let schema = Arc::new(Self::schema());
+
+        let mut layers_builder = ListBuilder::new(StringBuilder::new());
+        for mut inner_vec in layers {
+            for layer_name in inner_vec.drain(..) {
+                layers_builder.values().append_value(layer_name)
+            }
+            layers_builder.append(true);
+        }
+
+        let columns: Vec<ArrayRef> = vec![
+            Arc::new(StringArray::from(partition_ids)),
+            Arc::new(layers_builder.finish()),
+            Arc::new(TimestampNanosecondArray::from(last_updated_at)),
+            Arc::new(UInt64Array::from(num_chunks)),
+            Arc::new(UInt64Array::from(size_bytes)),
+        ];
+
+        RecordBatch::try_new_with_options(
+            schema,
+            columns,
+            &RecordBatchOptions::default().with_row_count(Some(row_count)),
+        )
+    }
+
+    pub fn data(&self) -> Result<&DataframePart, TypeConversionError> {
+        Ok(self.data.as_ref().ok_or_else(|| {
+            missing_field!(crate::cloud::v1alpha1::ScanPartitionTableResponse, "data")
+        })?)
+    }
+}
+
+// --- ScanLayerTableResponse --
+
+impl ScanLayerTableResponse {
+    pub const LAYER_NAME: &str = "rerun_layer_name";
+    pub const PARTITION_ID: &str = "rerun_partition_id";
     pub const STORAGE_URL: &str = "rerun_storage_url";
+    pub const LAYER_TYPE: &str = "rerun_layer_type";
     pub const REGISTRATION_TIME: &str = "rerun_registration_time";
-    pub const PARTITION_MANIFEST_UPDATED_AT: &str = "rerun_partition_manifest_updated_at";
-    pub const PARTITION_MANIFEST_URL: &str = "rerun_partition_manifest_url";
+    pub const NUM_CHUNKS: &str = "rerun_num_chunks";
+    pub const SIZE_BYTES: &str = "rerun_size_bytes";
+    pub const SCHEMA_SHA256: &str = "rerun_schema_sha256";
 
     pub fn schema() -> Schema {
         Schema::new(vec![
+            Field::new(Self::LAYER_NAME, DataType::Utf8, false),
             Field::new(Self::PARTITION_ID, DataType::Utf8, false),
-            Field::new(Self::PARTITION_TYPE, DataType::Utf8, false),
             Field::new(Self::STORAGE_URL, DataType::Utf8, false),
+            Field::new(Self::LAYER_TYPE, DataType::Utf8, false),
             Field::new(
                 Self::REGISTRATION_TIME,
-                DataType::Timestamp(TimeUnit::Nanosecond, None),
+                DataType::Timestamp(TimeUnit::Nanosecond, Some("utc".into())),
                 false,
             ),
-            Field::new(
-                Self::PARTITION_MANIFEST_UPDATED_AT,
-                DataType::Timestamp(TimeUnit::Nanosecond, None),
-                true,
-            ),
-            Field::new(Self::PARTITION_MANIFEST_URL, DataType::Utf8, true),
+            Field::new(Self::NUM_CHUNKS, DataType::UInt64, false),
+            Field::new(Self::SIZE_BYTES, DataType::UInt64, false),
+            Field::new(Self::SCHEMA_SHA256, DataType::FixedSizeBinary(32), false),
         ])
     }
 
     /// Helper to simplify instantiation of the dataframe in [`Self::data`].
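+    ///
+    /// Illustrative sketch with made-up values (a single partition carrying one
+    /// hypothetical "base" layer):
+    ///
+    /// ```ignore
+    /// let batch = ScanLayerTableResponse::create_dataframe(
+    ///     vec!["base".to_owned()],                              // layer_names
+    ///     vec!["my_partition".to_owned()],                      // partition_ids
+    ///     vec!["memory:///my_dataset/my_partition".to_owned()], // storage_urls
+    ///     vec!["rrd".to_owned()],                               // layer_types
+    ///     vec![1_700_000_000_000_000_000],                      // registration_times (ns)
+    ///     vec![42],                                             // num_chunks
+    ///     vec![1024],                                           // size_bytes
+    ///     vec![[0_u8; 32]],                                     // schema_sha256s
+    /// )?;
+    /// ```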
     pub fn create_dataframe(
+        layer_names: Vec<String>,
         partition_ids: Vec<String>,
-        partition_types: Vec<String>,
         storage_urls: Vec<String>,
+        layer_types: Vec<String>,
         registration_times: Vec<i64>,
-        partition_manifest_updated_ats: Vec<Option<i64>>,
-        partition_manifest_urls: Vec<Option<String>>,
+        num_chunks: Vec<u64>,
+        size_bytes: Vec<u64>,
+        schema_sha256s: Vec<[u8; 32]>,
     ) -> arrow::error::Result<RecordBatch> {
         let row_count = partition_ids.len();
         let schema = Arc::new(Self::schema());
+
         let columns: Vec<ArrayRef> = vec![
+            Arc::new(StringArray::from(layer_names)),
             Arc::new(StringArray::from(partition_ids)),
-            Arc::new(StringArray::from(partition_types)),
             Arc::new(StringArray::from(storage_urls)),
+            Arc::new(StringArray::from(layer_types)),
             Arc::new(TimestampNanosecondArray::from(registration_times)),
-            Arc::new(TimestampNanosecondArray::from(
-                partition_manifest_updated_ats,
-            )),
-            Arc::new(StringArray::from(partition_manifest_urls)),
+            Arc::new(UInt64Array::from(num_chunks)),
+            Arc::new(UInt64Array::from(size_bytes)),
+            Arc::new(
+                FixedSizeBinaryArray::try_from_iter(schema_sha256s.into_iter())
+                    .expect("sizes of nested slices are guaranteed to match"),
+            ),
         ];
 
         RecordBatch::try_new_with_options(
diff --git a/crates/store/re_protos/src/v1alpha1/rerun.cloud.v1alpha1.rs b/crates/store/re_protos/src/v1alpha1/rerun.cloud.v1alpha1.rs
index 15c2c38d0557..6dbf278f29d3 100644
--- a/crates/store/re_protos/src/v1alpha1/rerun.cloud.v1alpha1.rs
+++ b/crates/store/re_protos/src/v1alpha1/rerun.cloud.v1alpha1.rs
@@ -197,6 +197,67 @@ impl ::prost::Name for ScanPartitionTableResponse {
     }
 }
 #[derive(Clone, Copy, PartialEq, ::prost::Message)]
+pub struct GetLayerTableSchemaRequest {}
+impl ::prost::Name for GetLayerTableSchemaRequest {
+    const NAME: &'static str = "GetLayerTableSchemaRequest";
+    const PACKAGE: &'static str = "rerun.cloud.v1alpha1";
+    fn full_name() -> ::prost::alloc::string::String {
+        "rerun.cloud.v1alpha1.GetLayerTableSchemaRequest".into()
+    }
+    fn type_url() -> ::prost::alloc::string::String {
+        "/rerun.cloud.v1alpha1.GetLayerTableSchemaRequest".into()
+    }
+}
+#[derive(Clone, PartialEq, ::prost::Message)]
+pub struct GetLayerTableSchemaResponse {
+    #[prost(message, optional, tag = "1")]
+    pub schema: ::core::option::Option<super::super::common::v1alpha1::Schema>,
+}
+impl ::prost::Name for GetLayerTableSchemaResponse {
+    const NAME: &'static str = "GetLayerTableSchemaResponse";
+    const PACKAGE: &'static str = "rerun.cloud.v1alpha1";
+    fn full_name() -> ::prost::alloc::string::String {
+        "rerun.cloud.v1alpha1.GetLayerTableSchemaResponse".into()
+    }
+    fn type_url() -> ::prost::alloc::string::String {
+        "/rerun.cloud.v1alpha1.GetLayerTableSchemaResponse".into()
+    }
+}
+#[derive(Clone, PartialEq, ::prost::Message)]
+pub struct ScanLayerTableRequest {
+    /// A list of column names to be projected server-side.
+    ///
+    /// All of them if left empty.
+ #[prost(string, repeated, tag = "3")] + pub columns: ::prost::alloc::vec::Vec<::prost::alloc::string::String>, +} +impl ::prost::Name for ScanLayerTableRequest { + const NAME: &'static str = "ScanLayerTableRequest"; + const PACKAGE: &'static str = "rerun.cloud.v1alpha1"; + fn full_name() -> ::prost::alloc::string::String { + "rerun.cloud.v1alpha1.ScanLayerTableRequest".into() + } + fn type_url() -> ::prost::alloc::string::String { + "/rerun.cloud.v1alpha1.ScanLayerTableRequest".into() + } +} +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct ScanLayerTableResponse { + /// Layer metadata as arrow RecordBatch + #[prost(message, optional, tag = "1")] + pub data: ::core::option::Option, +} +impl ::prost::Name for ScanLayerTableResponse { + const NAME: &'static str = "ScanLayerTableResponse"; + const PACKAGE: &'static str = "rerun.cloud.v1alpha1"; + fn full_name() -> ::prost::alloc::string::String { + "rerun.cloud.v1alpha1.ScanLayerTableResponse".into() + } + fn type_url() -> ::prost::alloc::string::String { + "/rerun.cloud.v1alpha1.ScanLayerTableResponse".into() + } +} +#[derive(Clone, Copy, PartialEq, ::prost::Message)] pub struct GetDatasetSchemaRequest {} impl ::prost::Name for GetDatasetSchemaRequest { const NAME: &'static str = "GetDatasetSchemaRequest"; @@ -2031,6 +2092,57 @@ pub mod rerun_cloud_service_client { )); self.inner.server_streaming(req, path, codec).await } + /// Returns the schema of the layer table. + /// + /// To inspect the data of the partition table, which is guaranteed to match the schema returned by + /// this endpoint, check out `ScanLayerTable`. + /// + /// This endpoint requires the standard dataset headers. + pub async fn get_layer_table_schema( + &mut self, + request: impl tonic::IntoRequest, + ) -> std::result::Result, tonic::Status> + { + self.inner.ready().await.map_err(|e| { + tonic::Status::unknown(format!("Service was not ready: {}", e.into())) + })?; + let codec = tonic::codec::ProstCodec::default(); + let path = http::uri::PathAndQuery::from_static( + "/rerun.cloud.v1alpha1.RerunCloudService/GetLayerTableSchema", + ); + let mut req = request.into_request(); + req.extensions_mut().insert(GrpcMethod::new( + "rerun.cloud.v1alpha1.RerunCloudService", + "GetLayerTableSchema", + )); + self.inner.unary(req, path, codec).await + } + /// Inspect the contents of the layer table. + /// + /// The data will follow the schema returned by `GetLayerTableSchema`. + /// + /// This endpoint requires the standard dataset headers. + pub async fn scan_layer_table( + &mut self, + request: impl tonic::IntoRequest, + ) -> std::result::Result< + tonic::Response>, + tonic::Status, + > { + self.inner.ready().await.map_err(|e| { + tonic::Status::unknown(format!("Service was not ready: {}", e.into())) + })?; + let codec = tonic::codec::ProstCodec::default(); + let path = http::uri::PathAndQuery::from_static( + "/rerun.cloud.v1alpha1.RerunCloudService/ScanLayerTable", + ); + let mut req = request.into_request(); + req.extensions_mut().insert(GrpcMethod::new( + "rerun.cloud.v1alpha1.RerunCloudService", + "ScanLayerTable", + )); + self.inner.server_streaming(req, path, codec).await + } /// Returns the schema of the dataset. /// /// This is the union of all the schemas from all the underlying partitions. It will contain all the indexes, @@ -2456,6 +2568,30 @@ pub mod rerun_cloud_service_server { &self, request: tonic::Request, ) -> std::result::Result, tonic::Status>; + /// Returns the schema of the layer table. 
+ /// + /// To inspect the data of the partition table, which is guaranteed to match the schema returned by + /// this endpoint, check out `ScanLayerTable`. + /// + /// This endpoint requires the standard dataset headers. + async fn get_layer_table_schema( + &self, + request: tonic::Request, + ) -> std::result::Result, tonic::Status>; + /// Server streaming response type for the ScanLayerTable method. + type ScanLayerTableStream: tonic::codegen::tokio_stream::Stream< + Item = std::result::Result, + > + std::marker::Send + + 'static; + /// Inspect the contents of the layer table. + /// + /// The data will follow the schema returned by `GetLayerTableSchema`. + /// + /// This endpoint requires the standard dataset headers. + async fn scan_layer_table( + &self, + request: tonic::Request, + ) -> std::result::Result, tonic::Status>; /// Returns the schema of the dataset. /// /// This is the union of all the schemas from all the underlying partitions. It will contain all the indexes, @@ -3190,6 +3326,93 @@ pub mod rerun_cloud_service_server { }; Box::pin(fut) } + "/rerun.cloud.v1alpha1.RerunCloudService/GetLayerTableSchema" => { + #[allow(non_camel_case_types)] + struct GetLayerTableSchemaSvc(pub Arc); + impl + tonic::server::UnaryService + for GetLayerTableSchemaSvc + { + type Response = super::GetLayerTableSchemaResponse; + type Future = BoxFuture, tonic::Status>; + fn call( + &mut self, + request: tonic::Request, + ) -> Self::Future { + let inner = Arc::clone(&self.0); + let fut = async move { + ::get_layer_table_schema(&inner, request) + .await + }; + Box::pin(fut) + } + } + let accept_compression_encodings = self.accept_compression_encodings; + let send_compression_encodings = self.send_compression_encodings; + let max_decoding_message_size = self.max_decoding_message_size; + let max_encoding_message_size = self.max_encoding_message_size; + let inner = self.inner.clone(); + let fut = async move { + let method = GetLayerTableSchemaSvc(inner); + let codec = tonic::codec::ProstCodec::default(); + let mut grpc = tonic::server::Grpc::new(codec) + .apply_compression_config( + accept_compression_encodings, + send_compression_encodings, + ) + .apply_max_message_size_config( + max_decoding_message_size, + max_encoding_message_size, + ); + let res = grpc.unary(method, req).await; + Ok(res) + }; + Box::pin(fut) + } + "/rerun.cloud.v1alpha1.RerunCloudService/ScanLayerTable" => { + #[allow(non_camel_case_types)] + struct ScanLayerTableSvc(pub Arc); + impl + tonic::server::ServerStreamingService + for ScanLayerTableSvc + { + type Response = super::ScanLayerTableResponse; + type ResponseStream = T::ScanLayerTableStream; + type Future = + BoxFuture, tonic::Status>; + fn call( + &mut self, + request: tonic::Request, + ) -> Self::Future { + let inner = Arc::clone(&self.0); + let fut = async move { + ::scan_layer_table(&inner, request).await + }; + Box::pin(fut) + } + } + let accept_compression_encodings = self.accept_compression_encodings; + let send_compression_encodings = self.send_compression_encodings; + let max_decoding_message_size = self.max_decoding_message_size; + let max_encoding_message_size = self.max_encoding_message_size; + let inner = self.inner.clone(); + let fut = async move { + let method = ScanLayerTableSvc(inner); + let codec = tonic::codec::ProstCodec::default(); + let mut grpc = tonic::server::Grpc::new(codec) + .apply_compression_config( + accept_compression_encodings, + send_compression_encodings, + ) + .apply_max_message_size_config( + max_decoding_message_size, + 
max_encoding_message_size, + ); + let res = grpc.server_streaming(method, req).await; + Ok(res) + }; + Box::pin(fut) + } "/rerun.cloud.v1alpha1.RerunCloudService/GetDatasetSchema" => { #[allow(non_camel_case_types)] struct GetDatasetSchemaSvc(pub Arc); diff --git a/crates/store/re_server/src/rerun_cloud.rs b/crates/store/re_server/src/rerun_cloud.rs index 3ac52a1bcbc7..7b94927c2759 100644 --- a/crates/store/re_server/src/rerun_cloud.rs +++ b/crates/store/re_server/src/rerun_cloud.rs @@ -10,12 +10,15 @@ use arrow::datatypes::{DataType, Field, Schema, TimeUnit}; use datafusion::prelude::SessionContext; use nohash_hasher::IntSet; use tokio_stream::StreamExt as _; -use tonic::{Code, Status}; +use tonic::{Code, Request, Response, Status}; use re_byte_size::SizeBytes as _; use re_chunk_store::{Chunk, ChunkStore, ChunkStoreConfig, ChunkStoreHandle}; use re_log_encoding::codec::wire::{decoder::Decode as _, encoder::Encode as _}; use re_log_types::{EntityPath, EntryId, StoreId, StoreKind}; +use re_protos::cloud::v1alpha1::{ + GetLayerTableSchemaRequest, GetLayerTableSchemaResponse, ScanLayerTableRequest, +}; use re_protos::{ cloud::v1alpha1::{ DeleteEntryResponse, EntryDetails, EntryKind, FetchTaskOutputRequest, @@ -166,6 +169,7 @@ decl_stream!(FetchChunksResponseStream); decl_stream!(GetChunksResponseStream); decl_stream!(QueryDatasetResponseStream); decl_stream!(ScanPartitionTableResponseStream); +decl_stream!(ScanLayerTableResponseStream); decl_stream!(SearchDatasetResponseStream); decl_stream!(ScanTableResponseStream); decl_stream!(QueryTasksOnCompletionResponseStream); @@ -667,6 +671,28 @@ impl RerunCloudService for RerunCloudHandler { )) } + type ScanLayerTableStream = ScanLayerTableResponseStream; + + async fn get_layer_table_schema( + &self, + _request: Request, + ) -> Result, Status> { + //TODO(RR-2482) + Err(tonic::Status::unimplemented( + "get_layer_table_schema not implemented", + )) + } + + async fn scan_layer_table( + &self, + _request: Request, + ) -> Result, Status> { + //TODO(RR-2482) + Err(tonic::Status::unimplemented( + "scan_layer_table not implemented", + )) + } + async fn get_dataset_schema( &self, request: tonic::Request, diff --git a/crates/store/re_server/src/store.rs b/crates/store/re_server/src/store.rs index a104f1c0583c..d704ae54cc7c 100644 --- a/crates/store/re_server/src/store.rs +++ b/crates/store/re_server/src/store.rs @@ -16,6 +16,7 @@ use itertools::Itertools as _; use jiff::Timestamp; use lance::datafusion::LanceTableProvider; +use re_byte_size::SizeBytes as _; use re_chunk_store::{ChunkStore, ChunkStoreConfig, ChunkStoreHandle}; use re_log_types::{EntryId, StoreKind}; use re_protos::{ @@ -133,34 +134,34 @@ impl Dataset { } pub fn partition_table(&self) -> arrow::error::Result { - let (partition_ids, registration_times): (Vec<_>, Vec<_>) = self - .partitions - .iter() - .map(|(store_id, partition)| { - ( - store_id.to_string(), - partition.registration_time.as_nanosecond() as i64, - ) - }) - .unzip(); - - let partition_types = vec!["rrd".to_owned(); partition_ids.len()]; - - let storage_urls = partition_ids - .iter() - .map(|partition_id| format!("memory:///{}/{partition_id}", self.id)) - .collect(); - - let partition_manifest_updated_ats = vec![None; partition_ids.len()]; - let partition_manifest_urls = vec![None; partition_ids.len()]; + let (partition_ids, last_updated_at, num_chunks, size_bytes): ( + Vec<_>, + Vec<_>, + Vec<_>, + Vec<_>, + ) = itertools::multiunzip(self.partitions.iter().map(|(store_id, partition)| { + let store = 
partition.store_handle.read();
+            let size_bytes: u64 = store
+                .iter_chunks()
+                .map(|chunk| chunk.heap_size_bytes())
+                .sum();
+
+            (
+                store_id.to_string(),
+                partition.registration_time.as_nanosecond() as i64,
+                store.num_chunks() as u64,
+                size_bytes,
+            )
+        }));
+
+        let layers = vec![vec!["base".to_owned()]; partition_ids.len()];
 
         ScanPartitionTableResponse::create_dataframe(
             partition_ids,
-            partition_types,
-            storage_urls,
-            registration_times,
-            partition_manifest_updated_ats,
-            partition_manifest_urls,
+            layers,
+            last_updated_at,
+            num_chunks,
+            size_bytes,
         )
     }

From 1cd8a6946e6864fed64e76495799f998674da057 Mon Sep 17 00:00:00 2001
From: Antoine Beyeler
Date: Mon, 6 Oct 2025 16:21:33 +0200
Subject: [PATCH 02/15] add table provider for layer table

---
 crates/store/re_datafusion/src/layer_table.rs | 104 ++++++++++++++++++
 crates/store/re_datafusion/src/lib.rs         |   2 +
 .../re_datafusion/src/partition_table.rs      |   1 +
 .../re_redap_client/src/connection_client.rs  |  26 ++++-
 crates/store/re_redap_client/src/lib.rs       |   3 +
 5 files changed, 133 insertions(+), 3 deletions(-)
 create mode 100644 crates/store/re_datafusion/src/layer_table.rs

diff --git a/crates/store/re_datafusion/src/layer_table.rs b/crates/store/re_datafusion/src/layer_table.rs
new file mode 100644
index 000000000000..9d0d0a82faaa
--- /dev/null
+++ b/crates/store/re_datafusion/src/layer_table.rs
@@ -0,0 +1,104 @@
+use std::sync::Arc;
+
+use arrow::{array::RecordBatch, datatypes::SchemaRef};
+use async_trait::async_trait;
+use datafusion::{
+    catalog::TableProvider,
+    error::{DataFusionError, Result as DataFusionResult},
+};
+use tracing::instrument;
+
+use re_log_encoding::codec::wire::decoder::Decode as _;
+use re_log_types::EntryId;
+use re_protos::{
+    cloud::v1alpha1::{ScanLayerTableRequest, ScanLayerTableResponse},
+    headers::RerunHeadersInjectorExt as _,
+};
+use re_redap_client::ConnectionClient;
+
+use crate::grpc_streaming_provider::{GrpcStreamProvider, GrpcStreamToTable};
+use crate::wasm_compat::make_future_send;
+
+//TODO(ab): deduplicate from PartitionTableProvider
+#[derive(Clone)]
+pub struct LayerTableProvider {
+    client: ConnectionClient,
+    dataset_id: EntryId,
+}
+
+impl std::fmt::Debug for LayerTableProvider {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        f.debug_struct("LayerTableProvider")
+            .field("dataset_id", &self.dataset_id)
+            .finish()
+    }
+}
+
+impl LayerTableProvider {
+    pub fn new(client: ConnectionClient, dataset_id: EntryId) -> Self {
+        Self { client, dataset_id }
+    }
+
+    /// This is a convenience function
+    pub async fn into_provider(self) -> DataFusionResult<Arc<dyn TableProvider>> {
+        Ok(GrpcStreamProvider::prepare(self).await?)
+    }
+}
+
+#[async_trait]
+impl GrpcStreamToTable for LayerTableProvider {
+    type GrpcStreamData = ScanLayerTableResponse;
+
+    #[instrument(skip(self), err)]
+    async fn fetch_schema(&mut self) -> DataFusionResult<SchemaRef> {
+        let mut client = self.client.clone();
+
+        let dataset_id = self.dataset_id;
+
+        Ok(Arc::new(
+            make_future_send(async move {
+                client
+                    .get_layer_table_schema(dataset_id)
+                    .await
+                    .map_err(|err| {
+                        DataFusionError::External(
+                            format!("Couldn't get partition table schema: {err}").into(),
+                        )
+                    })
+            })
+            .await?,
+        ))
+    }
+
+    // TODO(ab): what `GrpcStreamToTable` attempts to simplify should probably be handled by
+    // `ConnectionClient`
+    #[instrument(skip(self), err)]
+    async fn send_streaming_request(
+        &mut self,
+    ) -> DataFusionResult<tonic::Response<tonic::Streaming<Self::GrpcStreamData>>> {
+        let request = tonic::Request::new(ScanLayerTableRequest {
+            columns: vec![], // all of them
+        })
+        .with_entry_id(self.dataset_id)
+        .map_err(|err| DataFusionError::External(Box::new(err)))?;
+
+        let mut client = self.client.clone();
+
+        make_future_send(async move { Ok(client.inner().scan_layer_table(request).await) })
+            .await?
+            .map_err(|err| DataFusionError::External(Box::new(err)))
+    }
+
+    fn process_response(
+        &mut self,
+        response: Self::GrpcStreamData,
+    ) -> DataFusionResult<RecordBatch> {
+        response
+            .data
+            .ok_or(DataFusionError::Execution(
+                "DataFrame missing from ScanLayerTable response".to_owned(),
+            ))?
+            .decode()
+            .map_err(|err| DataFusionError::External(Box::new(err)))
+    }
+}
diff --git a/crates/store/re_datafusion/src/lib.rs b/crates/store/re_datafusion/src/lib.rs
index 3f50baef6d9a..7f617cd147c6 100644
--- a/crates/store/re_datafusion/src/lib.rs
+++ b/crates/store/re_datafusion/src/lib.rs
@@ -7,6 +7,7 @@ mod dataframe_query_provider;
 #[cfg(target_arch = "wasm32")]
 mod dataframe_query_provider_wasm;
 mod grpc_streaming_provider;
+mod layer_table;
 mod partition_table;
 mod search_provider;
 mod table_entry_provider;
@@ -18,6 +19,7 @@ pub use dataframe_query_common::{DataframeQueryTableProvider, query_from_query_e
 pub(crate) use dataframe_query_provider::PartitionStreamExec;
 #[cfg(target_arch = "wasm32")]
 pub(crate) use dataframe_query_provider_wasm::PartitionStreamExec;
+pub use layer_table::LayerTableProvider;
 pub use partition_table::PartitionTableProvider;
 pub use search_provider::SearchResultsTableProvider;
 pub use table_entry_provider::TableEntryTableProvider;
diff --git a/crates/store/re_datafusion/src/partition_table.rs b/crates/store/re_datafusion/src/partition_table.rs
index d2401d1e3a8f..d7a26a05370b 100644
--- a/crates/store/re_datafusion/src/partition_table.rs
+++ b/crates/store/re_datafusion/src/partition_table.rs
@@ -19,6 +19,7 @@ use re_redap_client::ConnectionClient;
 use crate::grpc_streaming_provider::{GrpcStreamProvider, GrpcStreamToTable};
 use crate::wasm_compat::make_future_send;
 
+//TODO(ab): deduplicate from LayerTableProvider
 #[derive(Clone)]
 pub struct PartitionTableProvider {
     client: ConnectionClient,
diff --git a/crates/store/re_redap_client/src/connection_client.rs b/crates/store/re_redap_client/src/connection_client.rs
index 20dad6d0976f..8e84c95b221e 100644
--- a/crates/store/re_redap_client/src/connection_client.rs
+++ b/crates/store/re_redap_client/src/connection_client.rs
@@ -9,9 +9,9 @@ use re_protos::{
     TypeConversionError,
     cloud::v1alpha1::{
         CreateDatasetEntryRequest, DeleteEntryRequest, EntryFilter, EntryKind, FindEntriesRequest,
-        GetPartitionTableSchemaRequest, GetPartitionTableSchemaResponse, ReadDatasetEntryRequest,
-        ReadTableEntryRequest, RegisterWithDatasetResponse,
ScanPartitionTableRequest, - ScanPartitionTableResponse, + GetLayerTableSchemaRequest, GetLayerTableSchemaResponse, GetPartitionTableSchemaRequest, + GetPartitionTableSchemaResponse, ReadDatasetEntryRequest, ReadTableEntryRequest, + RegisterWithDatasetResponse, ScanPartitionTableRequest, ScanPartitionTableResponse, ext::{ CreateDatasetEntryResponse, DataSource, DataSourceKind, DatasetDetails, DatasetEntry, EntryDetails, EntryDetailsUpdate, LanceTable, ProviderDetails as _, @@ -269,6 +269,26 @@ where Ok(partition_ids) } + //TODO(ab): accept entry name + pub async fn get_layer_table_schema( + &mut self, + entry_id: EntryId, + ) -> Result { + Ok(self + .inner() + .get_layer_table_schema( + tonic::Request::new(GetLayerTableSchemaRequest {}) + .with_entry_id(entry_id) + .map_err(|err| StreamEntryError::InvalidId(err.into()))?, + ) + .await + .map_err(|err| StreamEntryError::GetLayerTableSchema(err.into()))? + .into_inner() + .schema + .ok_or_else(|| missing_field!(GetLayerTableSchemaResponse, "schema"))? + .try_into()?) + } + /// Initiate registration of the provided recording URIs with a dataset and return the /// corresponding task descriptors. /// diff --git a/crates/store/re_redap_client/src/lib.rs b/crates/store/re_redap_client/src/lib.rs index 528ef55f76bf..94ab6cbf4422 100644 --- a/crates/store/re_redap_client/src/lib.rs +++ b/crates/store/re_redap_client/src/lib.rs @@ -96,6 +96,9 @@ pub enum StreamEntryError { #[error("Failed reading partition table scheme\nDetails:{0}")] GetPartitionTableSchema(TonicStatusError), + #[error("Failed reading layer table scheme\nDetails:{0}")] + GetLayerTableSchema(TonicStatusError), + #[error("Failed scanning the partition table \nDetails:{0}")] ScanPartitionTable(TonicStatusError), From 1ce0fc57cc9b9bca95dd44e3bd5ed244e192ca95 Mon Sep 17 00:00:00 2001 From: Antoine Beyeler Date: Mon, 6 Oct 2025 16:42:08 +0200 Subject: [PATCH 03/15] add `DatasetEntry.layer_table` to Python SDK --- rerun_py/rerun_bindings/rerun_bindings.pyi | 3 +++ rerun_py/src/catalog/dataset_entry.rs | 24 +++++++++++++++++++++- 2 files changed, 26 insertions(+), 1 deletion(-) diff --git a/rerun_py/rerun_bindings/rerun_bindings.pyi b/rerun_py/rerun_bindings/rerun_bindings.pyi index efd49408b802..a1275d0f291b 100644 --- a/rerun_py/rerun_bindings/rerun_bindings.pyi +++ b/rerun_py/rerun_bindings/rerun_bindings.pyi @@ -1345,6 +1345,9 @@ class DatasetEntry(Entry): def partition_table(self) -> DataFusionTable: """Return the partition table as a Datafusion table provider.""" + def layer_table(self) -> DataFusionTable: + """Return the layer table as a Datafusion table provider.""" + def partition_url( self, partition_id: str, diff --git a/rerun_py/src/catalog/dataset_entry.rs b/rerun_py/src/catalog/dataset_entry.rs index 602488bf5563..d2b229f6cec8 100644 --- a/rerun_py/src/catalog/dataset_entry.rs +++ b/rerun_py/src/catalog/dataset_entry.rs @@ -13,7 +13,7 @@ use tokio_stream::StreamExt as _; use tracing::instrument; use re_chunk_store::{ChunkStore, ChunkStoreHandle}; -use re_datafusion::{PartitionTableProvider, SearchResultsTableProvider}; +use re_datafusion::{LayerTableProvider, PartitionTableProvider, SearchResultsTableProvider}; use re_log_encoding::codec::wire::encoder::Encode as _; use re_log_types::{StoreId, StoreKind}; use re_protos::cloud::v1alpha1::ext::DatasetDetails; @@ -165,6 +165,28 @@ impl PyDatasetEntry { }) } + /// Return the layer table as a Datafusion table provider. 
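+    ///
+    /// Hypothetical Python usage, via the matching `layer_table` stub added to
+    /// `rerun_bindings.pyi` in this patch: `dataset.layer_table()`.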
+ #[instrument(skip_all)] + fn layer_table(self_: PyRef<'_, Self>) -> PyResult { + let super_ = self_.as_super(); + let connection = super_.client.borrow(self_.py()).connection().clone(); + let dataset_id = super_.details.id; + + let provider = wait_for_future(self_.py(), async move { + LayerTableProvider::new(connection.client().await?, dataset_id) + .into_provider() + .await + .map_err(to_py_err) + })?; + + #[expect(clippy::string_add)] + Ok(PyDataFusionTable { + client: super_.client.clone_ref(self_.py()), + name: super_.name() + "_layer_table", + provider, + }) + } + /// Return the URL for the given partition. /// /// Parameters From 33f74091aa7ff641e23b66a0469b23075199eeba Mon Sep 17 00:00:00 2001 From: Antoine Beyeler Date: Mon, 6 Oct 2025 17:57:46 +0200 Subject: [PATCH 04/15] reintroduce storage_urls in partition table --- .../src/v1alpha1/rerun.cloud.v1alpha1.ext.rs | 20 +++++++++++++++++++ crates/store/re_server/src/store.rs | 6 ++++++ 2 files changed, 26 insertions(+) diff --git a/crates/store/re_protos/src/v1alpha1/rerun.cloud.v1alpha1.ext.rs b/crates/store/re_protos/src/v1alpha1/rerun.cloud.v1alpha1.ext.rs index a95359f4ec40..ddc9e41f8439 100644 --- a/crates/store/re_protos/src/v1alpha1/rerun.cloud.v1alpha1.ext.rs +++ b/crates/store/re_protos/src/v1alpha1/rerun.cloud.v1alpha1.ext.rs @@ -1180,6 +1180,7 @@ pub struct RegisterWithDatasetTaskDescriptor { impl ScanPartitionTableResponse { pub const PARTITION_ID: &str = "rerun_partition_id"; pub const LAYERS: &str = "rerun_layers"; + pub const STORAGE_URLS: &str = "rerun_storage_urls"; pub const LAST_UPDATED_AT: &str = "rerun_last_updated_at"; pub const NUM_CHUNKS: &str = "rerun_num_chunks"; pub const SIZE_BYTES: &str = "rerun_size_bytes"; @@ -1192,6 +1193,15 @@ impl ScanPartitionTableResponse { DataType::List(Arc::new(Field::new(Self::LAYERS, DataType::Utf8, false))), false, ), + Field::new( + Self::STORAGE_URLS, + DataType::List(Arc::new(Field::new( + Self::STORAGE_URLS, + DataType::Utf8, + false, + ))), + false, + ), Field::new( Self::LAST_UPDATED_AT, DataType::Timestamp(TimeUnit::Nanosecond, Some("utc".into())), @@ -1206,6 +1216,7 @@ impl ScanPartitionTableResponse { pub fn create_dataframe( partition_ids: Vec, layers: Vec>, + storage_urls: Vec>, last_updated_at: Vec, num_chunks: Vec, size_bytes: Vec, @@ -1221,9 +1232,18 @@ impl ScanPartitionTableResponse { layers_builder.append(true); } + let mut urls_builder = ListBuilder::new(StringBuilder::new()); + for mut inner_vec in storage_urls { + for layer_name in inner_vec.drain(..) 
{ + urls_builder.values().append_value(layer_name) + } + urls_builder.append(true); + } + let columns: Vec = vec![ Arc::new(StringArray::from(partition_ids)), Arc::new(layers_builder.finish()), + Arc::new(urls_builder.finish()), Arc::new(TimestampNanosecondArray::from(last_updated_at)), Arc::new(UInt64Array::from(num_chunks)), Arc::new(UInt64Array::from(size_bytes)), diff --git a/crates/store/re_server/src/store.rs b/crates/store/re_server/src/store.rs index d704ae54cc7c..a8bafe4e0483 100644 --- a/crates/store/re_server/src/store.rs +++ b/crates/store/re_server/src/store.rs @@ -156,9 +156,15 @@ impl Dataset { let layers = vec![vec!["base".to_owned()]; partition_ids.len()]; + let storage_urls = partition_ids + .iter() + .map(|partition_id| vec![format!("memory:///{}/{partition_id}", self.id)]) + .collect(); + ScanPartitionTableResponse::create_dataframe( partition_ids, layers, + storage_urls, last_updated_at, num_chunks, size_bytes, From abc37b00bc43f27ff368059599dcdc06777f3693 Mon Sep 17 00:00:00 2001 From: Antoine Beyeler Date: Mon, 6 Oct 2025 18:26:22 +0200 Subject: [PATCH 05/15] Fix schema mismatch --- .../src/v1alpha1/rerun.cloud.v1alpha1.ext.rs | 13 +++++++++---- 1 file changed, 9 insertions(+), 4 deletions(-) diff --git a/crates/store/re_protos/src/v1alpha1/rerun.cloud.v1alpha1.ext.rs b/crates/store/re_protos/src/v1alpha1/rerun.cloud.v1alpha1.ext.rs index ddc9e41f8439..c2fb672aba13 100644 --- a/crates/store/re_protos/src/v1alpha1/rerun.cloud.v1alpha1.ext.rs +++ b/crates/store/re_protos/src/v1alpha1/rerun.cloud.v1alpha1.ext.rs @@ -1204,7 +1204,7 @@ impl ScanPartitionTableResponse { ), Field::new( Self::LAST_UPDATED_AT, - DataType::Timestamp(TimeUnit::Nanosecond, Some("utc".into())), + DataType::Timestamp(TimeUnit::Nanosecond, None), false, ), Field::new(Self::NUM_CHUNKS, DataType::UInt64, false), @@ -1224,7 +1224,9 @@ impl ScanPartitionTableResponse { let row_count = partition_ids.len(); let schema = Arc::new(Self::schema()); - let mut layers_builder = ListBuilder::new(StringBuilder::new()); + let mut layers_builder = ListBuilder::new(StringBuilder::new()) + .with_field(Arc::new(Field::new(Self::LAYERS, DataType::Utf8, false))); + for mut inner_vec in layers { for layer_name in inner_vec.drain(..) { layers_builder.values().append_value(layer_name) @@ -1232,7 +1234,10 @@ impl ScanPartitionTableResponse { layers_builder.append(true); } - let mut urls_builder = ListBuilder::new(StringBuilder::new()); + let mut urls_builder = ListBuilder::new(StringBuilder::new()).with_field(Arc::new( + Field::new(Self::STORAGE_URLS, DataType::Utf8, false), + )); + for mut inner_vec in storage_urls { for layer_name in inner_vec.drain(..) 
{ urls_builder.values().append_value(layer_name) @@ -1283,7 +1288,7 @@ impl ScanLayerTableResponse { Field::new(Self::LAYER_TYPE, DataType::Utf8, false), Field::new( Self::REGISTRATION_TIME, - DataType::Timestamp(TimeUnit::Nanosecond, Some("utc".into())), + DataType::Timestamp(TimeUnit::Nanosecond, None), false, ), Field::new(Self::NUM_CHUNKS, DataType::UInt64, false), From feb1b31404b1b7f49cc3a03de60fc18735725a09 Mon Sep 17 00:00:00 2001 From: Antoine Beyeler Date: Tue, 7 Oct 2025 09:21:00 +0200 Subject: [PATCH 06/15] Rename everything to "DatasetManifest" --- .../{layer_table.rs => dataset_manifest.rs} | 20 +-- crates/store/re_datafusion/src/lib.rs | 4 +- .../re_datafusion/src/partition_table.rs | 2 +- .../proto/rerun/v1alpha1/cloud.proto | 22 +-- .../src/v1alpha1/rerun.cloud.v1alpha1.ext.rs | 12 +- .../src/v1alpha1/rerun.cloud.v1alpha1.rs | 144 +++++++++--------- .../re_redap_client/src/connection_client.rs | 21 +-- crates/store/re_redap_client/src/lib.rs | 2 +- crates/store/re_server/src/rerun_cloud.rs | 29 ++-- rerun_py/rerun_bindings/rerun_bindings.pyi | 4 +- rerun_py/src/catalog/dataset_entry.rs | 27 ++-- 11 files changed, 149 insertions(+), 138 deletions(-) rename crates/store/re_datafusion/src/{layer_table.rs => dataset_manifest.rs} (82%) diff --git a/crates/store/re_datafusion/src/layer_table.rs b/crates/store/re_datafusion/src/dataset_manifest.rs similarity index 82% rename from crates/store/re_datafusion/src/layer_table.rs rename to crates/store/re_datafusion/src/dataset_manifest.rs index 9d0d0a82faaa..eba468321f25 100644 --- a/crates/store/re_datafusion/src/layer_table.rs +++ b/crates/store/re_datafusion/src/dataset_manifest.rs @@ -11,7 +11,7 @@ use tracing::instrument; use re_log_encoding::codec::wire::decoder::Decode as _; use re_log_types::EntryId; use re_protos::{ - cloud::v1alpha1::{ScanLayerTableRequest, ScanLayerTableResponse}, + cloud::v1alpha1::{ScanDatasetManifestRequest, ScanDatasetManifestResponse}, headers::RerunHeadersInjectorExt as _, }; use re_redap_client::ConnectionClient; @@ -21,20 +21,20 @@ use crate::wasm_compat::make_future_send; //TODO(ab): deduplicate from PartitionTableProvider #[derive(Clone)] -pub struct LayerTableProvider { +pub struct DatsetManifestProvider { client: ConnectionClient, dataset_id: EntryId, } -impl std::fmt::Debug for LayerTableProvider { +impl std::fmt::Debug for DatsetManifestProvider { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - f.debug_struct("LayerTableProvider") + f.debug_struct("DatasetManifestProvider") .field("dataset_id", &self.dataset_id) .finish() } } -impl LayerTableProvider { +impl DatsetManifestProvider { pub fn new(client: ConnectionClient, dataset_id: EntryId) -> Self { Self { client, dataset_id } } @@ -46,8 +46,8 @@ impl LayerTableProvider { } #[async_trait] -impl GrpcStreamToTable for LayerTableProvider { - type GrpcStreamData = ScanLayerTableResponse; +impl GrpcStreamToTable for DatsetManifestProvider { + type GrpcStreamData = ScanDatasetManifestResponse; #[instrument(skip(self), err)] async fn fetch_schema(&mut self) -> DataFusionResult { @@ -58,7 +58,7 @@ impl GrpcStreamToTable for LayerTableProvider { Ok(Arc::new( make_future_send(async move { client - .get_layer_table_schema(dataset_id) + .get_dataset_manifest_schema(dataset_id) .await .map_err(|err| { DataFusionError::External( @@ -76,7 +76,7 @@ impl GrpcStreamToTable for LayerTableProvider { async fn send_streaming_request( &mut self, ) -> DataFusionResult>> { - let request = tonic::Request::new(ScanLayerTableRequest { + let 
request = tonic::Request::new(ScanDatasetManifestRequest { columns: vec![], // all of them }) .with_entry_id(self.dataset_id) @@ -84,7 +84,7 @@ impl GrpcStreamToTable for LayerTableProvider { let mut client = self.client.clone(); - make_future_send(async move { Ok(client.inner().scan_layer_table(request).await) }) + make_future_send(async move { Ok(client.inner().scan_dataset_manifest(request).await) }) .await? .map_err(|err| DataFusionError::External(Box::new(err))) } diff --git a/crates/store/re_datafusion/src/lib.rs b/crates/store/re_datafusion/src/lib.rs index 7f617cd147c6..5deff064c2fb 100644 --- a/crates/store/re_datafusion/src/lib.rs +++ b/crates/store/re_datafusion/src/lib.rs @@ -6,8 +6,8 @@ mod dataframe_query_common; mod dataframe_query_provider; #[cfg(target_arch = "wasm32")] mod dataframe_query_provider_wasm; +mod dataset_manifest; mod grpc_streaming_provider; -mod layer_table; mod partition_table; mod search_provider; mod table_entry_provider; @@ -19,7 +19,7 @@ pub use dataframe_query_common::{DataframeQueryTableProvider, query_from_query_e pub(crate) use dataframe_query_provider::PartitionStreamExec; #[cfg(target_arch = "wasm32")] pub(crate) use dataframe_query_provider_wasm::PartitionStreamExec; -pub use layer_table::LayerTableProvider; +pub use dataset_manifest::DatsetManifestProvider; pub use partition_table::PartitionTableProvider; pub use search_provider::SearchResultsTableProvider; pub use table_entry_provider::TableEntryTableProvider; diff --git a/crates/store/re_datafusion/src/partition_table.rs b/crates/store/re_datafusion/src/partition_table.rs index d7a26a05370b..0b2f5443f068 100644 --- a/crates/store/re_datafusion/src/partition_table.rs +++ b/crates/store/re_datafusion/src/partition_table.rs @@ -19,7 +19,7 @@ use re_redap_client::ConnectionClient; use crate::grpc_streaming_provider::{GrpcStreamProvider, GrpcStreamToTable}; use crate::wasm_compat::make_future_send; -//TODO(ab): deduplicate from LayerTableProvider +//TODO(ab): deduplicate from DatasetManifestProvider #[derive(Clone)] pub struct PartitionTableProvider { client: ConnectionClient, diff --git a/crates/store/re_protos/proto/rerun/v1alpha1/cloud.proto b/crates/store/re_protos/proto/rerun/v1alpha1/cloud.proto index c64617e3fc40..c0c7a25b00c2 100644 --- a/crates/store/re_protos/proto/rerun/v1alpha1/cloud.proto +++ b/crates/store/re_protos/proto/rerun/v1alpha1/cloud.proto @@ -77,20 +77,20 @@ service RerunCloudService { // This endpoint requires the standard dataset headers. rpc ScanPartitionTable(ScanPartitionTableRequest) returns (stream ScanPartitionTableResponse) {} - // Returns the schema of the layer table. + // Returns the schema of the dataset manifest. // - // To inspect the data of the partition table, which is guaranteed to match the schema returned by - // this endpoint, check out `ScanLayerTable`. + // To inspect the data of the dataset manifest, which is guaranteed to match the schema returned by + // this endpoint, check out `ScanDatasetManifest`. // // This endpoint requires the standard dataset headers. - rpc GetLayerTableSchema(GetLayerTableSchemaRequest) returns (GetLayerTableSchemaResponse) {} + rpc GetDatasetManifestSchema(GetDatasetManifestSchemaRequest) returns (GetDatasetManifestSchemaResponse) {} - // Inspect the contents of the layer table. + // Inspect the contents of the dataset manifest. // - // The data will follow the schema returned by `GetLayerTableSchema`. + // The data will follow the schema returned by `GetDatasetManifestSchema`. 
// // This endpoint requires the standard dataset headers. - rpc ScanLayerTable(ScanLayerTableRequest) returns (stream ScanLayerTableResponse) {} + rpc ScanDatasetManifest(ScanDatasetManifestRequest) returns (stream ScanDatasetManifestResponse) {} // Returns the schema of the dataset. // @@ -286,15 +286,15 @@ message ScanPartitionTableResponse { rerun.common.v1alpha1.DataframePart data = 1; } -message GetLayerTableSchemaRequest { +message GetDatasetManifestSchemaRequest { reserved "dataset_id"; } -message GetLayerTableSchemaResponse { +message GetDatasetManifestSchemaResponse { rerun.common.v1alpha1.Schema schema = 1; } -message ScanLayerTableRequest { +message ScanDatasetManifestRequest { // A list of column names to be projected server-side. // // All of them if left empty. @@ -304,7 +304,7 @@ message ScanLayerTableRequest { reserved "scan_parameters"; } -message ScanLayerTableResponse { +message ScanDatasetManifestResponse { // Layer metadata as arrow RecordBatch rerun.common.v1alpha1.DataframePart data = 1; } diff --git a/crates/store/re_protos/src/v1alpha1/rerun.cloud.v1alpha1.ext.rs b/crates/store/re_protos/src/v1alpha1/rerun.cloud.v1alpha1.ext.rs index c2fb672aba13..51d85111411d 100644 --- a/crates/store/re_protos/src/v1alpha1/rerun.cloud.v1alpha1.ext.rs +++ b/crates/store/re_protos/src/v1alpha1/rerun.cloud.v1alpha1.ext.rs @@ -15,10 +15,12 @@ use re_sorbet::ComponentColumnDescriptor; use crate::cloud::v1alpha1::{ EntryKind, GetDatasetSchemaResponse, QueryTasksResponse, RegisterWithDatasetResponse, - ScanLayerTableResponse, ScanPartitionTableResponse, VectorDistanceMetric, + ScanDatasetManifestResponse, ScanPartitionTableResponse, VectorDistanceMetric, +}; +use crate::common::v1alpha1::{ + ComponentDescriptor, DataframePart, TaskId, + ext::{DatasetHandle, IfDuplicateBehavior, PartitionId}, }; -use crate::common::v1alpha1::ext::{DatasetHandle, IfDuplicateBehavior, PartitionId}; -use crate::common::v1alpha1::{ComponentDescriptor, DataframePart, TaskId}; use crate::{TypeConversionError, missing_field}; // --- RegisterWithDatasetRequest --- @@ -1268,9 +1270,9 @@ impl ScanPartitionTableResponse { } } -// --- ScanLayerTableResponse -- +// --- ScanDatasetManifestResponse -- -impl ScanLayerTableResponse { +impl ScanDatasetManifestResponse { pub const LAYER_NAME: &str = "rerun_layer_name"; pub const PARTITION_ID: &str = "rerun_partition_id"; pub const STORAGE_URL: &str = "rerun_storage_url"; diff --git a/crates/store/re_protos/src/v1alpha1/rerun.cloud.v1alpha1.rs b/crates/store/re_protos/src/v1alpha1/rerun.cloud.v1alpha1.rs index 6dbf278f29d3..b10a53bc774d 100644 --- a/crates/store/re_protos/src/v1alpha1/rerun.cloud.v1alpha1.rs +++ b/crates/store/re_protos/src/v1alpha1/rerun.cloud.v1alpha1.rs @@ -197,64 +197,64 @@ impl ::prost::Name for ScanPartitionTableResponse { } } #[derive(Clone, Copy, PartialEq, ::prost::Message)] -pub struct GetLayerTableSchemaRequest {} -impl ::prost::Name for GetLayerTableSchemaRequest { - const NAME: &'static str = "GetLayerTableSchemaRequest"; +pub struct GetDatasetManifestSchemaRequest {} +impl ::prost::Name for GetDatasetManifestSchemaRequest { + const NAME: &'static str = "GetDatasetManifestSchemaRequest"; const PACKAGE: &'static str = "rerun.cloud.v1alpha1"; fn full_name() -> ::prost::alloc::string::String { - "rerun.cloud.v1alpha1.GetLayerTableSchemaRequest".into() + "rerun.cloud.v1alpha1.GetDatasetManifestSchemaRequest".into() } fn type_url() -> ::prost::alloc::string::String { - "/rerun.cloud.v1alpha1.GetLayerTableSchemaRequest".into() + 
"/rerun.cloud.v1alpha1.GetDatasetManifestSchemaRequest".into() } } #[derive(Clone, PartialEq, ::prost::Message)] -pub struct GetLayerTableSchemaResponse { +pub struct GetDatasetManifestSchemaResponse { #[prost(message, optional, tag = "1")] pub schema: ::core::option::Option, } -impl ::prost::Name for GetLayerTableSchemaResponse { - const NAME: &'static str = "GetLayerTableSchemaResponse"; +impl ::prost::Name for GetDatasetManifestSchemaResponse { + const NAME: &'static str = "GetDatasetManifestSchemaResponse"; const PACKAGE: &'static str = "rerun.cloud.v1alpha1"; fn full_name() -> ::prost::alloc::string::String { - "rerun.cloud.v1alpha1.GetLayerTableSchemaResponse".into() + "rerun.cloud.v1alpha1.GetDatasetManifestSchemaResponse".into() } fn type_url() -> ::prost::alloc::string::String { - "/rerun.cloud.v1alpha1.GetLayerTableSchemaResponse".into() + "/rerun.cloud.v1alpha1.GetDatasetManifestSchemaResponse".into() } } #[derive(Clone, PartialEq, ::prost::Message)] -pub struct ScanLayerTableRequest { +pub struct ScanDatasetManifestRequest { /// A list of column names to be projected server-side. /// /// All of them if left empty. #[prost(string, repeated, tag = "3")] pub columns: ::prost::alloc::vec::Vec<::prost::alloc::string::String>, } -impl ::prost::Name for ScanLayerTableRequest { - const NAME: &'static str = "ScanLayerTableRequest"; +impl ::prost::Name for ScanDatasetManifestRequest { + const NAME: &'static str = "ScanDatasetManifestRequest"; const PACKAGE: &'static str = "rerun.cloud.v1alpha1"; fn full_name() -> ::prost::alloc::string::String { - "rerun.cloud.v1alpha1.ScanLayerTableRequest".into() + "rerun.cloud.v1alpha1.ScanDatasetManifestRequest".into() } fn type_url() -> ::prost::alloc::string::String { - "/rerun.cloud.v1alpha1.ScanLayerTableRequest".into() + "/rerun.cloud.v1alpha1.ScanDatasetManifestRequest".into() } } #[derive(Clone, PartialEq, ::prost::Message)] -pub struct ScanLayerTableResponse { +pub struct ScanDatasetManifestResponse { /// Layer metadata as arrow RecordBatch #[prost(message, optional, tag = "1")] pub data: ::core::option::Option, } -impl ::prost::Name for ScanLayerTableResponse { - const NAME: &'static str = "ScanLayerTableResponse"; +impl ::prost::Name for ScanDatasetManifestResponse { + const NAME: &'static str = "ScanDatasetManifestResponse"; const PACKAGE: &'static str = "rerun.cloud.v1alpha1"; fn full_name() -> ::prost::alloc::string::String { - "rerun.cloud.v1alpha1.ScanLayerTableResponse".into() + "rerun.cloud.v1alpha1.ScanDatasetManifestResponse".into() } fn type_url() -> ::prost::alloc::string::String { - "/rerun.cloud.v1alpha1.ScanLayerTableResponse".into() + "/rerun.cloud.v1alpha1.ScanDatasetManifestResponse".into() } } #[derive(Clone, Copy, PartialEq, ::prost::Message)] @@ -2092,41 +2092,43 @@ pub mod rerun_cloud_service_client { )); self.inner.server_streaming(req, path, codec).await } - /// Returns the schema of the layer table. + /// Returns the schema of the dataset manifest. /// - /// To inspect the data of the partition table, which is guaranteed to match the schema returned by - /// this endpoint, check out `ScanLayerTable`. + /// To inspect the data of the dataset manifest, which is guaranteed to match the schema returned by + /// this endpoint, check out `ScanDatasetManifest`. /// /// This endpoint requires the standard dataset headers. 
- pub async fn get_layer_table_schema( + pub async fn get_dataset_manifest_schema( &mut self, - request: impl tonic::IntoRequest, - ) -> std::result::Result, tonic::Status> - { + request: impl tonic::IntoRequest, + ) -> std::result::Result< + tonic::Response, + tonic::Status, + > { self.inner.ready().await.map_err(|e| { tonic::Status::unknown(format!("Service was not ready: {}", e.into())) })?; let codec = tonic::codec::ProstCodec::default(); let path = http::uri::PathAndQuery::from_static( - "/rerun.cloud.v1alpha1.RerunCloudService/GetLayerTableSchema", + "/rerun.cloud.v1alpha1.RerunCloudService/GetDatasetManifestSchema", ); let mut req = request.into_request(); req.extensions_mut().insert(GrpcMethod::new( "rerun.cloud.v1alpha1.RerunCloudService", - "GetLayerTableSchema", + "GetDatasetManifestSchema", )); self.inner.unary(req, path, codec).await } - /// Inspect the contents of the layer table. + /// Inspect the contents of the dataset manifest. /// - /// The data will follow the schema returned by `GetLayerTableSchema`. + /// The data will follow the schema returned by `GetDatasetManifestSchema`. /// /// This endpoint requires the standard dataset headers. - pub async fn scan_layer_table( + pub async fn scan_dataset_manifest( &mut self, - request: impl tonic::IntoRequest, + request: impl tonic::IntoRequest, ) -> std::result::Result< - tonic::Response>, + tonic::Response>, tonic::Status, > { self.inner.ready().await.map_err(|e| { @@ -2134,12 +2136,12 @@ pub mod rerun_cloud_service_client { })?; let codec = tonic::codec::ProstCodec::default(); let path = http::uri::PathAndQuery::from_static( - "/rerun.cloud.v1alpha1.RerunCloudService/ScanLayerTable", + "/rerun.cloud.v1alpha1.RerunCloudService/ScanDatasetManifest", ); let mut req = request.into_request(); req.extensions_mut().insert(GrpcMethod::new( "rerun.cloud.v1alpha1.RerunCloudService", - "ScanLayerTable", + "ScanDatasetManifest", )); self.inner.server_streaming(req, path, codec).await } @@ -2568,30 +2570,33 @@ pub mod rerun_cloud_service_server { &self, request: tonic::Request, ) -> std::result::Result, tonic::Status>; - /// Returns the schema of the layer table. + /// Returns the schema of the dataset manifest. /// - /// To inspect the data of the partition table, which is guaranteed to match the schema returned by - /// this endpoint, check out `ScanLayerTable`. + /// To inspect the data of the dataset manifest, which is guaranteed to match the schema returned by + /// this endpoint, check out `ScanDatasetManifest`. /// /// This endpoint requires the standard dataset headers. - async fn get_layer_table_schema( + async fn get_dataset_manifest_schema( &self, - request: tonic::Request, - ) -> std::result::Result, tonic::Status>; - /// Server streaming response type for the ScanLayerTable method. - type ScanLayerTableStream: tonic::codegen::tokio_stream::Stream< - Item = std::result::Result, + request: tonic::Request, + ) -> std::result::Result< + tonic::Response, + tonic::Status, + >; + /// Server streaming response type for the ScanDatasetManifest method. + type ScanDatasetManifestStream: tonic::codegen::tokio_stream::Stream< + Item = std::result::Result, > + std::marker::Send + 'static; - /// Inspect the contents of the layer table. + /// Inspect the contents of the dataset manifest. /// - /// The data will follow the schema returned by `GetLayerTableSchema`. + /// The data will follow the schema returned by `GetDatasetManifestSchema`. /// /// This endpoint requires the standard dataset headers. 
- async fn scan_layer_table( + async fn scan_dataset_manifest( &self, - request: tonic::Request, - ) -> std::result::Result, tonic::Status>; + request: tonic::Request, + ) -> std::result::Result, tonic::Status>; /// Returns the schema of the dataset. /// /// This is the union of all the schemas from all the underlying partitions. It will contain all the indexes, @@ -3326,23 +3331,25 @@ pub mod rerun_cloud_service_server { }; Box::pin(fut) } - "/rerun.cloud.v1alpha1.RerunCloudService/GetLayerTableSchema" => { + "/rerun.cloud.v1alpha1.RerunCloudService/GetDatasetManifestSchema" => { #[allow(non_camel_case_types)] - struct GetLayerTableSchemaSvc(pub Arc); + struct GetDatasetManifestSchemaSvc(pub Arc); impl - tonic::server::UnaryService - for GetLayerTableSchemaSvc + tonic::server::UnaryService + for GetDatasetManifestSchemaSvc { - type Response = super::GetLayerTableSchemaResponse; + type Response = super::GetDatasetManifestSchemaResponse; type Future = BoxFuture, tonic::Status>; fn call( &mut self, - request: tonic::Request, + request: tonic::Request, ) -> Self::Future { let inner = Arc::clone(&self.0); let fut = async move { - ::get_layer_table_schema(&inner, request) - .await + ::get_dataset_manifest_schema( + &inner, request, + ) + .await }; Box::pin(fut) } @@ -3353,7 +3360,7 @@ pub mod rerun_cloud_service_server { let max_encoding_message_size = self.max_encoding_message_size; let inner = self.inner.clone(); let fut = async move { - let method = GetLayerTableSchemaSvc(inner); + let method = GetDatasetManifestSchemaSvc(inner); let codec = tonic::codec::ProstCodec::default(); let mut grpc = tonic::server::Grpc::new(codec) .apply_compression_config( @@ -3369,24 +3376,25 @@ pub mod rerun_cloud_service_server { }; Box::pin(fut) } - "/rerun.cloud.v1alpha1.RerunCloudService/ScanLayerTable" => { + "/rerun.cloud.v1alpha1.RerunCloudService/ScanDatasetManifest" => { #[allow(non_camel_case_types)] - struct ScanLayerTableSvc(pub Arc); + struct ScanDatasetManifestSvc(pub Arc); impl - tonic::server::ServerStreamingService - for ScanLayerTableSvc + tonic::server::ServerStreamingService + for ScanDatasetManifestSvc { - type Response = super::ScanLayerTableResponse; - type ResponseStream = T::ScanLayerTableStream; + type Response = super::ScanDatasetManifestResponse; + type ResponseStream = T::ScanDatasetManifestStream; type Future = BoxFuture, tonic::Status>; fn call( &mut self, - request: tonic::Request, + request: tonic::Request, ) -> Self::Future { let inner = Arc::clone(&self.0); let fut = async move { - ::scan_layer_table(&inner, request).await + ::scan_dataset_manifest(&inner, request) + .await }; Box::pin(fut) } @@ -3397,7 +3405,7 @@ pub mod rerun_cloud_service_server { let max_encoding_message_size = self.max_encoding_message_size; let inner = self.inner.clone(); let fut = async move { - let method = ScanLayerTableSvc(inner); + let method = ScanDatasetManifestSvc(inner); let codec = tonic::codec::ProstCodec::default(); let mut grpc = tonic::server::Grpc::new(codec) .apply_compression_config( diff --git a/crates/store/re_redap_client/src/connection_client.rs b/crates/store/re_redap_client/src/connection_client.rs index 8e84c95b221e..c1f8290410e4 100644 --- a/crates/store/re_redap_client/src/connection_client.rs +++ b/crates/store/re_redap_client/src/connection_client.rs @@ -2,16 +2,19 @@ use arrow::datatypes::Schema as ArrowSchema; use tokio_stream::StreamExt as _; use tonic::codegen::{Body, StdError}; +use crate::{StreamEntryError, StreamError}; use re_arrow_util::ArrowArrayDowncastRef as _; use 
re_log_encoding::codec::wire::decoder::Decode as _; use re_log_types::EntryId; + use re_protos::{ TypeConversionError, cloud::v1alpha1::{ CreateDatasetEntryRequest, DeleteEntryRequest, EntryFilter, EntryKind, FindEntriesRequest, - GetLayerTableSchemaRequest, GetLayerTableSchemaResponse, GetPartitionTableSchemaRequest, - GetPartitionTableSchemaResponse, ReadDatasetEntryRequest, ReadTableEntryRequest, - RegisterWithDatasetResponse, ScanPartitionTableRequest, ScanPartitionTableResponse, + GetDatasetManifestSchemaRequest, GetDatasetManifestSchemaResponse, + GetPartitionTableSchemaRequest, GetPartitionTableSchemaResponse, ReadDatasetEntryRequest, + ReadTableEntryRequest, RegisterWithDatasetResponse, ScanPartitionTableRequest, + ScanPartitionTableResponse, ext::{ CreateDatasetEntryResponse, DataSource, DataSourceKind, DatasetDetails, DatasetEntry, EntryDetails, EntryDetailsUpdate, LanceTable, ProviderDetails as _, @@ -31,8 +34,6 @@ use re_protos::{ missing_field, }; -use crate::{StreamEntryError, StreamError}; - /// Expose an ergonomic API over the gRPC redap client. /// /// Implementation note: this type is generic so that it can be used with several client types. This @@ -270,22 +271,22 @@ where } //TODO(ab): accept entry name - pub async fn get_layer_table_schema( + pub async fn get_dataset_manifest_schema( &mut self, entry_id: EntryId, ) -> Result { Ok(self .inner() - .get_layer_table_schema( - tonic::Request::new(GetLayerTableSchemaRequest {}) + .get_dataset_manifest_schema( + tonic::Request::new(GetDatasetManifestSchemaRequest {}) .with_entry_id(entry_id) .map_err(|err| StreamEntryError::InvalidId(err.into()))?, ) .await - .map_err(|err| StreamEntryError::GetLayerTableSchema(err.into()))? + .map_err(|err| StreamEntryError::GetDatasetManifestSchema(err.into()))? .into_inner() .schema - .ok_or_else(|| missing_field!(GetLayerTableSchemaResponse, "schema"))? + .ok_or_else(|| missing_field!(GetDatasetManifestSchemaResponse, "schema"))? .try_into()?) 
     }
 
diff --git a/crates/store/re_redap_client/src/lib.rs b/crates/store/re_redap_client/src/lib.rs
index 94ab6cbf4422..7579edeba07d 100644
--- a/crates/store/re_redap_client/src/lib.rs
+++ b/crates/store/re_redap_client/src/lib.rs
@@ -97,7 +97,7 @@ pub enum StreamEntryError {
     GetPartitionTableSchema(TonicStatusError),
 
-    #[error("Failed reading layer table scheme\nDetails:{0}")]
-    GetLayerTableSchema(TonicStatusError),
+    #[error("Failed reading the dataset manifest schema\nDetails:{0}")]
+    GetDatasetManifestSchema(TonicStatusError),
 
     #[error("Failed scanning the partition table \nDetails:{0}")]
     ScanPartitionTable(TonicStatusError),
diff --git a/crates/store/re_server/src/rerun_cloud.rs b/crates/store/re_server/src/rerun_cloud.rs
index 7b94927c2759..884b99d1986f 100644
--- a/crates/store/re_server/src/rerun_cloud.rs
+++ b/crates/store/re_server/src/rerun_cloud.rs
@@ -16,16 +16,15 @@ use re_byte_size::SizeBytes as _;
 use re_chunk_store::{Chunk, ChunkStore, ChunkStoreConfig, ChunkStoreHandle};
 use re_log_encoding::codec::wire::{decoder::Decode as _, encoder::Encode as _};
 use re_log_types::{EntityPath, EntryId, StoreId, StoreKind};
-use re_protos::cloud::v1alpha1::{
-    GetLayerTableSchemaRequest, GetLayerTableSchemaResponse, ScanLayerTableRequest,
-};
 use re_protos::{
     cloud::v1alpha1::{
         DeleteEntryResponse, EntryDetails, EntryKind, FetchTaskOutputRequest,
-        FetchTaskOutputResponse, GetChunksResponse, GetDatasetSchemaResponse,
+        FetchTaskOutputResponse, GetChunksResponse, GetDatasetManifestSchemaRequest,
+        GetDatasetManifestSchemaResponse, GetDatasetSchemaResponse,
         GetPartitionTableSchemaResponse, QueryDatasetResponse, QueryTasksOnCompletionRequest,
         QueryTasksRequest, QueryTasksResponse, RegisterTableRequest, RegisterTableResponse,
-        RegisterWithDatasetResponse, ScanPartitionTableResponse, ScanTableResponse,
+        RegisterWithDatasetResponse, ScanDatasetManifestRequest, ScanPartitionTableResponse,
+        ScanTableResponse,
         ext::{
             self, CreateDatasetEntryResponse, GetChunksRequest, ReadDatasetEntryResponse,
             ReadTableEntryResponse,
@@ -169,7 +168,7 @@ decl_stream!(FetchChunksResponseStream);
 decl_stream!(GetChunksResponseStream);
 decl_stream!(QueryDatasetResponseStream);
 decl_stream!(ScanPartitionTableResponseStream);
-decl_stream!(ScanLayerTableResponseStream);
+decl_stream!(ScanDatasetManifestResponseStream);
 decl_stream!(SearchDatasetResponseStream);
 decl_stream!(ScanTableResponseStream);
 decl_stream!(QueryTasksOnCompletionResponseStream);
@@ -671,25 +670,25 @@ impl RerunCloudService for RerunCloudHandler {
         ))
     }
 
-    type ScanLayerTableStream = ScanLayerTableResponseStream;
+    type ScanDatasetManifestStream = ScanDatasetManifestResponseStream;
 
-    async fn get_layer_table_schema(
+    async fn get_dataset_manifest_schema(
         &self,
-        _request: Request<GetLayerTableSchemaRequest>,
-    ) -> Result<Response<GetLayerTableSchemaResponse>, Status> {
+        _request: Request<GetDatasetManifestSchemaRequest>,
+    ) -> Result<Response<GetDatasetManifestSchemaResponse>, Status> {
         //TODO(RR-2482)
         Err(tonic::Status::unimplemented(
-            "get_layer_table_schema not implemented",
+            "get_dataset_manifest_schema not implemented",
         ))
     }
 
-    async fn scan_layer_table(
+    async fn scan_dataset_manifest(
         &self,
-        _request: Request<ScanLayerTableRequest>,
-    ) -> Result<Response<Self::ScanLayerTableStream>, Status> {
+        _request: Request<ScanDatasetManifestRequest>,
+    ) -> Result<Response<Self::ScanDatasetManifestStream>, Status> {
         //TODO(RR-2482)
         Err(tonic::Status::unimplemented(
-            "scan_layer_table not implemented",
+            "scan_dataset_manifest not implemented",
         ))
     }
 
diff --git a/rerun_py/rerun_bindings/rerun_bindings.pyi b/rerun_py/rerun_bindings/rerun_bindings.pyi
index a1275d0f291b..73227bcac711 100644
--- a/rerun_py/rerun_bindings/rerun_bindings.pyi
+++ b/rerun_py/rerun_bindings/rerun_bindings.pyi
@@ -1345,8 +1345,8 @@ class DatasetEntry(Entry):
     def partition_table(self) -> DataFusionTable:
"""Return the partition table as a Datafusion table provider.""" - def layer_table(self) -> DataFusionTable: - """Return the layer table as a Datafusion table provider.""" + def manifest(self) -> DataFusionTable: + """Return the dataset manifest as a Datafusion table provider.""" def partition_url( self, diff --git a/rerun_py/src/catalog/dataset_entry.rs b/rerun_py/src/catalog/dataset_entry.rs index d2b229f6cec8..13cab5eb9e9a 100644 --- a/rerun_py/src/catalog/dataset_entry.rs +++ b/rerun_py/src/catalog/dataset_entry.rs @@ -13,18 +13,19 @@ use tokio_stream::StreamExt as _; use tracing::instrument; use re_chunk_store::{ChunkStore, ChunkStoreHandle}; -use re_datafusion::{LayerTableProvider, PartitionTableProvider, SearchResultsTableProvider}; +use re_datafusion::{DatsetManifestProvider, PartitionTableProvider, SearchResultsTableProvider}; use re_log_encoding::codec::wire::encoder::Encode as _; use re_log_types::{StoreId, StoreKind}; -use re_protos::cloud::v1alpha1::ext::DatasetDetails; -use re_protos::cloud::v1alpha1::ext::IndexProperties; -use re_protos::cloud::v1alpha1::{CreateIndexRequest, GetChunksRequest, SearchDatasetRequest}; -use re_protos::cloud::v1alpha1::{ - IndexConfig, IndexQueryProperties, InvertedIndexQuery, VectorIndexQuery, index_query_properties, +use re_protos::{ + cloud::v1alpha1::{ + CreateIndexRequest, GetChunksRequest, IndexConfig, IndexQueryProperties, + InvertedIndexQuery, SearchDatasetRequest, VectorIndexQuery, + ext::{DatasetDetails, IndexProperties}, + index_query_properties, + }, + common::v1alpha1::{IfDuplicateBehavior, ext::DatasetHandle}, + headers::RerunHeadersInjectorExt as _, }; -use re_protos::common::v1alpha1::IfDuplicateBehavior; -use re_protos::common::v1alpha1::ext::DatasetHandle; -use re_protos::headers::RerunHeadersInjectorExt as _; use re_redap_client::get_chunks_response_to_chunk_and_partition_id; use re_sorbet::{SorbetColumnDescriptors, TimeColumnSelector}; @@ -165,15 +166,15 @@ impl PyDatasetEntry { }) } - /// Return the layer table as a Datafusion table provider. + /// Return the dataset manifest as a Datafusion table provider. 
     #[instrument(skip_all)]
-    fn layer_table(self_: PyRef<'_, Self>) -> PyResult<PyDataFusionTable> {
+    fn manifest(self_: PyRef<'_, Self>) -> PyResult<PyDataFusionTable> {
         let super_ = self_.as_super();
         let connection = super_.client.borrow(self_.py()).connection().clone();
         let dataset_id = super_.details.id;
 
         let provider = wait_for_future(self_.py(), async move {
-            LayerTableProvider::new(connection.client().await?, dataset_id)
+            DatsetManifestProvider::new(connection.client().await?, dataset_id)
                 .into_provider()
                 .await
                 .map_err(to_py_err)
@@ -182,7 +183,7 @@ impl PyDatasetEntry {
         #[expect(clippy::string_add)]
         Ok(PyDataFusionTable {
             client: super_.client.clone_ref(self_.py()),
-            name: super_.name() + "_layer_table",
+            name: super_.name() + "__manifest",
             provider,
         })
     }

From b070cbe8f6424a00ad12e52c3c25713a37a62462 Mon Sep 17 00:00:00 2001
From: Antoine Beyeler
Date: Tue, 7 Oct 2025 09:29:54 +0200
Subject: [PATCH 07/15] Fix name + update proto docstring

---
 crates/store/re_datafusion/src/dataset_manifest.rs        | 8 ++++----
 crates/store/re_datafusion/src/lib.rs                     | 2 +-
 crates/store/re_protos/proto/rerun/v1alpha1/cloud.proto   | 6 +++---
 .../store/re_protos/src/v1alpha1/rerun.cloud.v1alpha1.rs  | 6 +++---
 rerun_py/src/catalog/dataset_entry.rs                     | 4 ++--
 5 files changed, 13 insertions(+), 13 deletions(-)

diff --git a/crates/store/re_datafusion/src/dataset_manifest.rs b/crates/store/re_datafusion/src/dataset_manifest.rs
index eba468321f25..c07127b5291d 100644
--- a/crates/store/re_datafusion/src/dataset_manifest.rs
+++ b/crates/store/re_datafusion/src/dataset_manifest.rs
@@ -21,12 +21,12 @@ use crate::wasm_compat::make_future_send;
 
 //TODO(ab): deduplicate from PartitionTableProvider
 #[derive(Clone)]
-pub struct DatsetManifestProvider {
+pub struct DatasetManifestProvider {
     client: ConnectionClient,
     dataset_id: EntryId,
 }
 
-impl std::fmt::Debug for DatsetManifestProvider {
+impl std::fmt::Debug for DatasetManifestProvider {
     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
         f.debug_struct("DatasetManifestProvider")
             .field("dataset_id", &self.dataset_id)
@@ -34,7 +34,7 @@ impl std::fmt::Debug for DatsetManifestProvider {
     }
 }
 
-impl DatsetManifestProvider {
+impl DatasetManifestProvider {
     pub fn new(client: ConnectionClient, dataset_id: EntryId) -> Self {
         Self { client, dataset_id }
     }
@@ -46,7 +46,7 @@ impl DatsetManifestProvider {
 }
 
 #[async_trait]
-impl GrpcStreamToTable for DatsetManifestProvider {
+impl GrpcStreamToTable for DatasetManifestProvider {
     type GrpcStreamData = ScanDatasetManifestResponse;
 
     #[instrument(skip(self), err)]
diff --git a/crates/store/re_datafusion/src/lib.rs b/crates/store/re_datafusion/src/lib.rs
index 5deff064c2fb..53c373090b18 100644
--- a/crates/store/re_datafusion/src/lib.rs
+++ b/crates/store/re_datafusion/src/lib.rs
@@ -19,7 +19,7 @@ pub use dataframe_query_common::{DataframeQueryTableProvider, query_from_query_e
 pub(crate) use dataframe_query_provider::PartitionStreamExec;
 #[cfg(target_arch = "wasm32")]
 pub(crate) use dataframe_query_provider_wasm::PartitionStreamExec;
-pub use dataset_manifest::DatsetManifestProvider;
+pub use dataset_manifest::DatasetManifestProvider;
 pub use partition_table::PartitionTableProvider;
 pub use search_provider::SearchResultsTableProvider;
 pub use table_entry_provider::TableEntryTableProvider;
diff --git a/crates/store/re_protos/proto/rerun/v1alpha1/cloud.proto b/crates/store/re_protos/proto/rerun/v1alpha1/cloud.proto
index c0c7a25b00c2..82572fe7b0db 100644
--- a/crates/store/re_protos/proto/rerun/v1alpha1/cloud.proto
+++ b/crates/store/re_protos/proto/rerun/v1alpha1/cloud.proto
@@ -816,7 +816,7 @@ message CreateDatasetEntryRequest {
   // Name of the dataset entry to create.
   //
   // The name should be a short human-readable string. It must be unique within all entries in the catalog. If an entry
-  // with the same name already exists, the request will fail.
+  // with the same name already exists, the request will fail. Entry names ending with `__manifest` are reserved.
   optional string name = 1;
 
   // If specified, create the entry using this specific ID. Use at your own risk.
@@ -858,10 +858,10 @@ message UpdateDatasetEntryResponse {
 
 // RegisterTable
 message RegisterTableRequest {
-  // Name of the table entry to create.
+  // Name of the dataset entry to create.
   //
   // The name should be a short human-readable string. It must be unique within all entries in the catalog. If an entry
-  // with the same name already exists, the request will fail.
+  // with the same name already exists, the request will fail. Entry names ending with `__manifest` are reserved.
   string name = 1;
 
   // Information about the table to register.
diff --git a/crates/store/re_protos/src/v1alpha1/rerun.cloud.v1alpha1.rs b/crates/store/re_protos/src/v1alpha1/rerun.cloud.v1alpha1.rs
index b10a53bc774d..a99faadc04d3 100644
--- a/crates/store/re_protos/src/v1alpha1/rerun.cloud.v1alpha1.rs
+++ b/crates/store/re_protos/src/v1alpha1/rerun.cloud.v1alpha1.rs
@@ -1257,7 +1257,7 @@ pub struct CreateDatasetEntryRequest {
     /// Name of the dataset entry to create.
     ///
     /// The name should be a short human-readable string. It must be unique within all entries in the catalog. If an entry
-    /// with the same name already exists, the request will fail.
+    /// with the same name already exists, the request will fail. Entry names ending with `__manifest` are reserved.
     #[prost(string, optional, tag = "1")]
     pub name: ::core::option::Option<::prost::alloc::string::String>,
     /// If specified, create the entry using this specific ID. Use at your own risk.
@@ -1353,10 +1353,10 @@ impl ::prost::Name for UpdateDatasetEntryResponse {
 }
 #[derive(Clone, PartialEq, ::prost::Message)]
 pub struct RegisterTableRequest {
-    /// Name of the table entry to create.
+    /// Name of the dataset entry to create.
     ///
     /// The name should be a short human-readable string. It must be unique within all entries in the catalog. If an entry
-    /// with the same name already exists, the request will fail.
+    /// with the same name already exists, the request will fail. Entry names ending with `__manifest` are reserved.
     #[prost(string, tag = "1")]
     pub name: ::prost::alloc::string::String,
     /// Information about the table to register.
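
An aside, not part of the patch itself: once renamed, the provider is driven the same way as its `PartitionTableProvider` sibling. Below is a minimal usage sketch under assumed plumbing; the client handle, the dataset id, and the exact return type of `into_provider` are assumptions rather than verified signatures.

    // Sketch only: obtain a DataFusion table provider for the dataset manifest.
    use std::sync::Arc;

    use datafusion::{datasource::TableProvider, error::Result as DataFusionResult};
    use re_datafusion::DatasetManifestProvider;
    use re_log_types::EntryId;
    use re_redap_client::ConnectionClient;

    async fn manifest_provider(
        client: ConnectionClient,
        dataset_id: EntryId,
    ) -> DataFusionResult<Arc<dyn TableProvider>> {
        // `into_provider` fetches the schema via `GetDatasetManifestSchema` and
        // then serves rows from the streaming `ScanDatasetManifest` endpoint.
        DatasetManifestProvider::new(client, dataset_id)
            .into_provider()
            .await
    }

The Python `manifest()` binding shown earlier is a thin wrapper around exactly this call chain.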
diff --git a/rerun_py/src/catalog/dataset_entry.rs b/rerun_py/src/catalog/dataset_entry.rs
index 13cab5eb9e9a..ae79ac260f69 100644
--- a/rerun_py/src/catalog/dataset_entry.rs
+++ b/rerun_py/src/catalog/dataset_entry.rs
@@ -13,7 +13,7 @@ use tokio_stream::StreamExt as _;
 use tracing::instrument;
 
 use re_chunk_store::{ChunkStore, ChunkStoreHandle};
-use re_datafusion::{DatsetManifestProvider, PartitionTableProvider, SearchResultsTableProvider};
+use re_datafusion::{DatasetManifestProvider, PartitionTableProvider, SearchResultsTableProvider};
 use re_log_encoding::codec::wire::encoder::Encode as _;
 use re_log_types::{StoreId, StoreKind};
 use re_protos::{
@@ -174,7 +174,7 @@ impl PyDatasetEntry {
         let dataset_id = super_.details.id;
 
         let provider = wait_for_future(self_.py(), async move {
-            DatsetManifestProvider::new(connection.client().await?, dataset_id)
+            DatasetManifestProvider::new(connection.client().await?, dataset_id)
                 .into_provider()
                 .await
                 .map_err(to_py_err)

From 5d43d14e78888125629273a1c238cac974f9c358 Mon Sep 17 00:00:00 2001
From: Antoine Beyeler <49431240+abey79@users.noreply.github.com>
Date: Tue, 7 Oct 2025 09:34:22 +0200
Subject: [PATCH 08/15] Apply suggestion from @Copilot

Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
---
 crates/store/re_datafusion/src/dataset_manifest.rs | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/crates/store/re_datafusion/src/dataset_manifest.rs b/crates/store/re_datafusion/src/dataset_manifest.rs
index c07127b5291d..f7bc7f9d2dfe 100644
--- a/crates/store/re_datafusion/src/dataset_manifest.rs
+++ b/crates/store/re_datafusion/src/dataset_manifest.rs
@@ -96,7 +96,7 @@ impl GrpcStreamToTable for DatasetManifestProvider {
             response
                 .data
                 .ok_or(DataFusionError::Execution(
-                    "DataFrame missing from PartitionList response".to_owned(),
+                    "DataFrame missing from DatasetManifest response".to_owned(),
                 ))?
                 .decode()
                 .map_err(|err| DataFusionError::External(Box::new(err)))

From 6bba8ab8651dd8e824149fd02773b56c28c77715 Mon Sep 17 00:00:00 2001
From: Antoine Beyeler <49431240+abey79@users.noreply.github.com>
Date: Tue, 7 Oct 2025 09:34:33 +0200
Subject: [PATCH 09/15] Apply suggestion from @Copilot

Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
---
 crates/store/re_datafusion/src/dataset_manifest.rs | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/crates/store/re_datafusion/src/dataset_manifest.rs b/crates/store/re_datafusion/src/dataset_manifest.rs
index f7bc7f9d2dfe..4baf482f5cab 100644
--- a/crates/store/re_datafusion/src/dataset_manifest.rs
+++ b/crates/store/re_datafusion/src/dataset_manifest.rs
@@ -62,7 +62,7 @@ impl GrpcStreamToTable for DatasetManifestProvider {
                 .await
                 .map_err(|err| {
                     DataFusionError::External(
-                        format!("Couldn't get partition table schema: {err}").into(),
+                        format!("Couldn't get dataset manifest schema: {err}").into(),
                     )
                 })
         })

From a82dbb4805160609b0d64f3e02390b9afcbca854 Mon Sep 17 00:00:00 2001
From: Antoine Beyeler
Date: Tue, 7 Oct 2025 09:37:28 +0200
Subject: [PATCH 10/15] Minor fix

---
 .../src/v1alpha1/rerun.cloud.v1alpha1.ext.rs | 14 ++++++++------
 1 file changed, 8 insertions(+), 6 deletions(-)

diff --git a/crates/store/re_protos/src/v1alpha1/rerun.cloud.v1alpha1.ext.rs b/crates/store/re_protos/src/v1alpha1/rerun.cloud.v1alpha1.ext.rs
index 51d85111411d..63f97d0f9cf9 100644
--- a/crates/store/re_protos/src/v1alpha1/rerun.cloud.v1alpha1.ext.rs
+++ b/crates/store/re_protos/src/v1alpha1/rerun.cloud.v1alpha1.ext.rs
@@ -1264,9 +1264,10 @@ impl ScanPartitionTableResponse {
     }
 
     pub fn data(&self) -> Result<&DataframePart, TypeConversionError> {
-        Ok(self.data.as_ref().ok_or_else(|| {
-            missing_field!(crate::cloud::v1alpha1::ScanPartitionTableResponse, "data")
-        })?)
+        Ok(self
+            .data
+            .as_ref()
+            .ok_or_else(|| missing_field!(Self, "data"))?)
     }
 }
 
@@ -1335,9 +1336,10 @@ impl ScanDatasetManifestResponse {
     }
 
     pub fn data(&self) -> Result<&DataframePart, TypeConversionError> {
-        Ok(self.data.as_ref().ok_or_else(|| {
-            missing_field!(crate::cloud::v1alpha1::ScanPartitionTableResponse, "data")
-        })?)
+        Ok(self
+            .data
+            .as_ref()
+            .ok_or_else(|| missing_field!(Self, "data"))?)
     }
 }
 

From 648d46a850e0980db6bd373c8b872589e10b6c29 Mon Sep 17 00:00:00 2001
From: Antoine Beyeler
Date: Tue, 7 Oct 2025 09:50:36 +0200
Subject: [PATCH 11/15] Minor minor fix

---
 crates/store/re_protos/proto/rerun/v1alpha1/cloud.proto     | 2 +-
 crates/store/re_protos/src/v1alpha1/rerun.cloud.v1alpha1.rs | 2 +-
 2 files changed, 2 insertions(+), 2 deletions(-)

diff --git a/crates/store/re_protos/proto/rerun/v1alpha1/cloud.proto b/crates/store/re_protos/proto/rerun/v1alpha1/cloud.proto
index 82572fe7b0db..e94a48b63cab 100644
--- a/crates/store/re_protos/proto/rerun/v1alpha1/cloud.proto
+++ b/crates/store/re_protos/proto/rerun/v1alpha1/cloud.proto
@@ -858,7 +858,7 @@ message UpdateDatasetEntryResponse {
 
 // RegisterTable
 message RegisterTableRequest {
-  // Name of the dataset entry to create.
+  // Name of the table entry to create.
   //
   // The name should be a short human-readable string. It must be unique within all entries in the catalog. If an entry
   // with the same name already exists, the request will fail. Entry names ending with `__manifest` are reserved.
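
As an aside, not part of the patch series: the `data()` accessors cleaned up in PATCH 10 above pair naturally with the wire codec's `Decode` trait, so a caller draining a `ScanDatasetManifest` stream can go from response to Arrow record batch in one step. A minimal sketch; the error boxing and the helper name are assumptions for illustration.

    // Hypothetical helper: turn one streamed response into an Arrow RecordBatch.
    use re_log_encoding::codec::wire::decoder::Decode as _;
    use re_protos::cloud::v1alpha1::ScanDatasetManifestResponse;

    fn manifest_batch(
        response: ScanDatasetManifestResponse,
    ) -> Result<arrow::array::RecordBatch, Box<dyn std::error::Error>> {
        // The `data` field is mandatory; `decode()` then deserializes the
        // embedded dataframe part into a RecordBatch.
        Ok(response
            .data
            .ok_or("missing `data` field in ScanDatasetManifestResponse")?
            .decode()?)
    }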
diff --git a/crates/store/re_protos/src/v1alpha1/rerun.cloud.v1alpha1.rs b/crates/store/re_protos/src/v1alpha1/rerun.cloud.v1alpha1.rs
index a99faadc04d3..4bcd89714536 100644
--- a/crates/store/re_protos/src/v1alpha1/rerun.cloud.v1alpha1.rs
+++ b/crates/store/re_protos/src/v1alpha1/rerun.cloud.v1alpha1.rs
@@ -1353,7 +1353,7 @@ impl ::prost::Name for UpdateDatasetEntryResponse {
 }
 #[derive(Clone, PartialEq, ::prost::Message)]
 pub struct RegisterTableRequest {
-    /// Name of the dataset entry to create.
+    /// Name of the table entry to create.
     ///
     /// The name should be a short human-readable string. It must be unique within all entries in the catalog. If an entry
     /// with the same name already exists, the request will fail. Entry names ending with `__manifest` are reserved.

From af5c3df4b116757f71324035267ca93a612dd05a Mon Sep 17 00:00:00 2001
From: Antoine Beyeler
Date: Tue, 7 Oct 2025 10:11:47 +0200
Subject: [PATCH 12/15] Add explicit `fields()` method

---
 .../src/v1alpha1/rerun.cloud.v1alpha1.ext.rs | 20 +++++++++++++------
 1 file changed, 14 insertions(+), 6 deletions(-)

diff --git a/crates/store/re_protos/src/v1alpha1/rerun.cloud.v1alpha1.ext.rs b/crates/store/re_protos/src/v1alpha1/rerun.cloud.v1alpha1.ext.rs
index 63f97d0f9cf9..21da2e54b4f4 100644
--- a/crates/store/re_protos/src/v1alpha1/rerun.cloud.v1alpha1.ext.rs
+++ b/crates/store/re_protos/src/v1alpha1/rerun.cloud.v1alpha1.ext.rs
@@ -1187,8 +1187,8 @@ impl ScanPartitionTableResponse {
     pub const NUM_CHUNKS: &str = "rerun_num_chunks";
     pub const SIZE_BYTES: &str = "rerun_size_bytes";
 
-    pub fn schema() -> Schema {
-        Schema::new(vec![
+    pub fn fields() -> Vec<Field> {
+        vec![
             Field::new(Self::PARTITION_ID, DataType::Utf8, false),
             Field::new(
                 Self::LAYERS,
@@ -1211,7 +1211,11 @@ impl ScanPartitionTableResponse {
             ),
             Field::new(Self::NUM_CHUNKS, DataType::UInt64, false),
             Field::new(Self::SIZE_BYTES, DataType::UInt64, false),
-        ])
+        ]
+    }
+
+    pub fn schema() -> Schema {
+        Schema::new(Self::fields())
     }
 
     /// Helper to simplify instantiation of the dataframe in [`Self::data`].
@@ -1283,8 +1287,8 @@ impl ScanDatasetManifestResponse {
     pub const SIZE_BYTES: &str = "rerun_size_bytes";
     pub const SCHEMA_SHA256: &str = "rerun_schema_sha256";
 
-    pub fn schema() -> Schema {
-        Schema::new(vec![
+    pub fn fields() -> Vec<Field> {
+        vec![
             Field::new(Self::LAYER_NAME, DataType::Utf8, false),
             Field::new(Self::PARTITION_ID, DataType::Utf8, false),
             Field::new(Self::STORAGE_URL, DataType::Utf8, false),
@@ -1297,7 +1301,11 @@ impl ScanDatasetManifestResponse {
             Field::new(Self::NUM_CHUNKS, DataType::UInt64, false),
             Field::new(Self::SIZE_BYTES, DataType::UInt64, false),
             Field::new(Self::SCHEMA_SHA256, DataType::FixedSizeBinary(32), false),
-        ])
+        ]
+    }
+
+    pub fn schema() -> Schema {
+        Schema::new(Self::fields())
     }
 
     /// Helper to simplify instantiation of the dataframe in [`Self::data`].
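
Not part of the patch, but to illustrate why exposing `fields()` next to `schema()` is useful: downstream code can splice the well-known manifest columns into a wider schema instead of copy-pasting the field definitions. A small sketch; the extra user column is invented for the example.

    use arrow::datatypes::{DataType, Field, Schema};
    use re_protos::cloud::v1alpha1::ScanPartitionTableResponse;

    // Build the partition-table schema plus one application-specific column.
    fn schema_with_label() -> Schema {
        let mut fields = ScanPartitionTableResponse::fields();
        fields.push(Field::new("user_label", DataType::Utf8, true));
        Schema::new(fields)
    }

The same pattern applies to `ScanDatasetManifestResponse::fields()`.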
From 87785010a4a93cde8f60927004ed8ad57bd11e80 Mon Sep 17 00:00:00 2001
From: Antoine Beyeler
Date: Tue, 7 Oct 2025 10:16:09 +0200
Subject: [PATCH 13/15] Add explicit `xxx_inner_field()` methods

---
 .../src/v1alpha1/rerun.cloud.v1alpha1.ext.rs | 26 +++++++++++++--------
 1 file changed, 15 insertions(+), 11 deletions(-)

diff --git a/crates/store/re_protos/src/v1alpha1/rerun.cloud.v1alpha1.ext.rs b/crates/store/re_protos/src/v1alpha1/rerun.cloud.v1alpha1.ext.rs
index 21da2e54b4f4..13d9c77ba675 100644
--- a/crates/store/re_protos/src/v1alpha1/rerun.cloud.v1alpha1.ext.rs
+++ b/crates/store/re_protos/src/v1alpha1/rerun.cloud.v1alpha1.ext.rs
@@ -3,6 +3,7 @@ use std::sync::Arc;
 use arrow::array::{
     FixedSizeBinaryArray, ListBuilder, RecordBatchOptions, StringBuilder, UInt64Array,
 };
+use arrow::datatypes::FieldRef;
 use arrow::{
     array::{Array, ArrayRef, RecordBatch, StringArray, TimestampNanosecondArray},
     datatypes::{DataType, Field, Schema, TimeUnit},
@@ -1187,21 +1188,25 @@ impl ScanPartitionTableResponse {
     pub const NUM_CHUNKS: &str = "rerun_num_chunks";
     pub const SIZE_BYTES: &str = "rerun_size_bytes";
 
+    pub fn layers_inner_field() -> FieldRef {
+        Arc::new(Field::new(Self::LAYERS, DataType::Utf8, false))
+    }
+
+    pub fn storage_urls_inner_field() -> FieldRef {
+        Arc::new(Field::new(Self::STORAGE_URLS, DataType::Utf8, false))
+    }
+
     pub fn fields() -> Vec<Field> {
         vec![
             Field::new(Self::PARTITION_ID, DataType::Utf8, false),
             Field::new(
                 Self::LAYERS,
-                DataType::List(Arc::new(Field::new(Self::LAYERS, DataType::Utf8, false))),
+                DataType::List(Self::layers_inner_field()),
                 false,
             ),
             Field::new(
                 Self::STORAGE_URLS,
-                DataType::List(Arc::new(Field::new(
-                    Self::STORAGE_URLS,
-                    DataType::Utf8,
-                    false,
-                ))),
+                DataType::List(Self::storage_urls_inner_field()),
                 false,
             ),
             Field::new(
@@ -1230,19 +1235,18 @@ impl ScanPartitionTableResponse {
         let row_count = partition_ids.len();
         let schema = Arc::new(Self::schema());
 
-        let mut layers_builder = ListBuilder::new(StringBuilder::new())
-            .with_field(Arc::new(Field::new(Self::LAYERS, DataType::Utf8, false)));
+        let mut layers_builder =
+            ListBuilder::new(StringBuilder::new()).with_field(Self::layers_inner_field());
 
         for mut inner_vec in layers {
             for layer_name in inner_vec.drain(..) {
                 layers_builder.values().append_value(layer_name)
             }
             layers_builder.append(true);
         }
 
-        let mut urls_builder = ListBuilder::new(StringBuilder::new()).with_field(Arc::new(
-            Field::new(Self::STORAGE_URLS, DataType::Utf8, false),
-        ));
+        let mut urls_builder =
+            ListBuilder::new(StringBuilder::new()).with_field(Self::storage_urls_inner_field());
 
         for mut inner_vec in storage_urls {
             for layer_name in inner_vec.drain(..) {
From 3e5baa0e84ba637a2014e3f2348e72d1558ebb37 Mon Sep 17 00:00:00 2001
From: Antoine Beyeler
Date: Tue, 7 Oct 2025 10:32:14 +0200
Subject: [PATCH 14/15] Remove utterly deprecated constants

---
 .../re_datafusion/src/dataframe_query_common.rs   | 15 +++++++++------
 .../re_datafusion/src/dataframe_query_provider.rs | 15 +++++++++------
 crates/store/re_protos/src/lib.rs                 |  9 ---------
 crates/viewer/re_redap_browser/src/servers.rs     |  9 ++++-----
 4 files changed, 22 insertions(+), 26 deletions(-)

diff --git a/crates/store/re_datafusion/src/dataframe_query_common.rs b/crates/store/re_datafusion/src/dataframe_query_common.rs
index ff6d570276e3..92ad27f6b55b 100644
--- a/crates/store/re_datafusion/src/dataframe_query_common.rs
+++ b/crates/store/re_datafusion/src/dataframe_query_common.rs
@@ -23,11 +23,14 @@ use re_dataframe::external::re_chunk_store::ChunkStore;
 use re_dataframe::{Index, QueryExpression};
 use re_log_encoding::codec::wire::decoder::Decode as _;
 use re_log_types::EntryId;
-use re_protos::cloud::v1alpha1::DATASET_MANIFEST_ID_FIELD_NAME;
-use re_protos::cloud::v1alpha1::ext::{Query, QueryLatestAt, QueryRange};
-use re_protos::cloud::v1alpha1::{GetDatasetSchemaRequest, QueryDatasetRequest};
-use re_protos::common::v1alpha1::ext::ScanParameters;
-use re_protos::headers::RerunHeadersInjectorExt as _;
+use re_protos::{
+    cloud::v1alpha1::{
+        GetDatasetSchemaRequest, QueryDatasetRequest, ScanPartitionTableResponse,
+        ext::{Query, QueryLatestAt, QueryRange},
+    },
+    common::v1alpha1::ext::ScanParameters,
+    headers::RerunHeadersInjectorExt as _,
+};
 use re_redap_client::{ConnectionClient, ConnectionRegistryHandle};
 use re_sorbet::{BatchType, ChunkColumnDescriptors, ColumnKind, ComponentColumnSelector};
 use re_uri::Origin;
@@ -168,7 +171,7 @@ impl DataframeQueryTableProvider {
 
         let schema = Arc::new(prepend_string_column_schema(
             &schema,
-            DATASET_MANIFEST_ID_FIELD_NAME,
+            ScanPartitionTableResponse::PARTITION_ID,
         ));
 
         Ok(Self {
diff --git a/crates/store/re_datafusion/src/dataframe_query_provider.rs b/crates/store/re_datafusion/src/dataframe_query_provider.rs
index f10ee14e2ed7..4dddcc9b6a08 100644
--- a/crates/store/re_datafusion/src/dataframe_query_provider.rs
+++ b/crates/store/re_datafusion/src/dataframe_query_provider.rs
@@ -33,7 +33,7 @@ use re_dataframe::{
     ChunkStoreHandle, Index, QueryCache, QueryEngine, QueryExpression, QueryHandle, StorageEngine,
 };
 use re_log_types::{ApplicationId, StoreId, StoreKind};
-use re_protos::cloud::v1alpha1::{DATASET_MANIFEST_ID_FIELD_NAME, FetchChunksRequest};
+use re_protos::cloud::v1alpha1::{FetchChunksRequest, ScanPartitionTableResponse};
 use re_redap_client::ConnectionClient;
 use re_sorbet::{ColumnDescriptor, ColumnSelector};
 
@@ -201,10 +201,10 @@ impl PartitionStreamExec {
         let orderings = if projected_schema
             .fields()
             .iter()
-            .any(|f| f.name().as_str() == DATASET_MANIFEST_ID_FIELD_NAME)
+            .any(|f| f.name().as_str() == ScanPartitionTableResponse::PARTITION_ID)
         {
-            let partition_col =
-                Arc::new(Column::new(DATASET_MANIFEST_ID_FIELD_NAME, 0)) as Arc<dyn PhysicalExpr>;
+            let partition_col = Arc::new(Column::new(ScanPartitionTableResponse::PARTITION_ID, 0))
+                as Arc<dyn PhysicalExpr>;
             let order_col = sort_index
                 .and_then(|index| {
                     let index_name = index.as_str();
@@ -242,7 +242,10 @@ impl PartitionStreamExec {
 
         let output_partitioning = if partition_in_output_schema {
             Partitioning::Hash(
-                vec![Arc::new(Column::new(DATASET_MANIFEST_ID_FIELD_NAME, 0))],
+                vec![Arc::new(Column::new(
+                    ScanPartitionTableResponse::PARTITION_ID,
+                    0,
+                ))],
                 num_partitions,
             )
         } else {
@@ -303,7 +306,7 @@ async fn send_next_row(
 
         let batch_schema = Arc::new(prepend_string_column_schema(
             &query_schema,
-            DATASET_MANIFEST_ID_FIELD_NAME,
+            ScanPartitionTableResponse::PARTITION_ID,
         ));
 
         let batch = RecordBatch::try_new_with_options(
diff --git a/crates/store/re_protos/src/lib.rs b/crates/store/re_protos/src/lib.rs
index eb8c629dab7e..41ce371ff956 100644
--- a/crates/store/re_protos/src/lib.rs
+++ b/crates/store/re_protos/src/lib.rs
@@ -69,15 +69,6 @@ pub mod cloud {
         pub mod ext {
             pub use crate::v1alpha1::rerun_cloud_v1alpha1_ext::*;
         }
-
-        /// `DatasetManifest` mandatory field names. All mandatory metadata fields are prefixed
-        /// with "rerun_" to avoid conflicts with user-defined fields.
-        pub const DATASET_MANIFEST_ID_FIELD_NAME: &str = "rerun_partition_id";
-        pub const DATASET_MANIFEST_PARTITION_MANIFEST_UPDATED_AT_FIELD_NAME: &str = "rerun_partition_manifest_updated_at";
-        pub const DATASET_MANIFEST_RECORDING_TYPE_FIELD_NAME: &str = "rerun_partition_type";
-        pub const DATASET_MANIFEST_REGISTRATION_TIME_FIELD_NAME: &str = "rerun_registration_time";
-        pub const DATASET_MANIFEST_START_TIME_FIELD_NAME: &str = "rerun_start_time";
-        pub const DATASET_MANIFEST_STORAGE_URL_FIELD_NAME: &str = "rerun_storage_url";
     }
 }
diff --git a/crates/viewer/re_redap_browser/src/servers.rs b/crates/viewer/re_redap_browser/src/servers.rs
index 3ac6514a6dcb..e6d36198ffbd 100644
--- a/crates/viewer/re_redap_browser/src/servers.rs
+++ b/crates/viewer/re_redap_browser/src/servers.rs
@@ -9,8 +9,7 @@ use egui::{Frame, Margin, RichText};
 use re_auth::Jwt;
 use re_dataframe_ui::{ColumnBlueprint, default_display_name_for_column};
 use re_log_types::{EntityPathPart, EntryId};
-use re_protos::cloud::v1alpha1::DATASET_MANIFEST_ID_FIELD_NAME;
-use re_protos::cloud::v1alpha1::EntryKind;
+use re_protos::cloud::v1alpha1::{EntryKind, ScanPartitionTableResponse};
 use re_redap_client::ConnectionRegistryHandle;
 use re_sorbet::{BatchType, ColumnDescriptorRef};
 use re_ui::alert::Alert;
@@ -232,12 +231,12 @@ impl Server {
             } else {
                 matches!(
                     desc.display_name().as_str(),
-                    RECORDING_LINK_COLUMN_NAME | DATASET_MANIFEST_ID_FIELD_NAME
+                    RECORDING_LINK_COLUMN_NAME | ScanPartitionTableResponse::PARTITION_ID
                 )
             };
 
             let column_sort_key = match desc.display_name().as_str() {
-                DATASET_MANIFEST_ID_FIELD_NAME => 0,
+                ScanPartitionTableResponse::PARTITION_ID => 0,
                 RECORDING_LINK_COLUMN_NAME => 1,
                 _ => 2,
             };
@@ -255,7 +254,7 @@ impl Server {
             })
             .generate_partition_links(
                 RECORDING_LINK_COLUMN_NAME,
-                DATASET_MANIFEST_ID_FIELD_NAME,
+                ScanPartitionTableResponse::PARTITION_ID,
                 self.origin.clone(),
                 dataset.id(),
             )

From ff20d4eb5e407a7f0d09cbf7ed5bba450b32196a Mon Sep 17 00:00:00 2001
From: Antoine Beyeler
Date: Tue, 7 Oct 2025 11:55:18 +0200
Subject: [PATCH 15/15] More docstring and rename to `LAYER_NAMES`

---
 .../src/v1alpha1/rerun.cloud.v1alpha1.ext.rs | 48 ++++++++++++++-----
 1 file changed, 36 insertions(+), 12 deletions(-)

diff --git a/crates/store/re_protos/src/v1alpha1/rerun.cloud.v1alpha1.ext.rs b/crates/store/re_protos/src/v1alpha1/rerun.cloud.v1alpha1.ext.rs
index 13d9c77ba675..23647997eacd 100644
--- a/crates/store/re_protos/src/v1alpha1/rerun.cloud.v1alpha1.ext.rs
+++ b/crates/store/re_protos/src/v1alpha1/rerun.cloud.v1alpha1.ext.rs
@@ -1182,14 +1182,26 @@ pub struct RegisterWithDatasetTaskDescriptor {
 
 impl ScanPartitionTableResponse {
     pub const PARTITION_ID: &str = "rerun_partition_id";
-    pub const LAYERS: &str = "rerun_layers";
+
+    /// Layer names for this partition, one per layer.
+    ///
+    /// Should have the same length as [`Self::STORAGE_URLS`].
+    pub const LAYER_NAMES: &str = "rerun_layer_names";
+
+    /// Storage URLs for this partition, one per layer.
+    ///
+    /// Should have the same length as [`Self::LAYER_NAMES`].
     pub const STORAGE_URLS: &str = "rerun_storage_urls";
     pub const LAST_UPDATED_AT: &str = "rerun_last_updated_at";
+
+    /// Total number of chunks for this partition.
     pub const NUM_CHUNKS: &str = "rerun_num_chunks";
+
+    /// Total size in bytes for this partition.
     pub const SIZE_BYTES: &str = "rerun_size_bytes";
 
-    pub fn layers_inner_field() -> FieldRef {
-        Arc::new(Field::new(Self::LAYERS, DataType::Utf8, false))
+    pub fn layer_names_inner_field() -> FieldRef {
+        Arc::new(Field::new(Self::LAYER_NAMES, DataType::Utf8, false))
     }
 
     pub fn storage_urls_inner_field() -> FieldRef {
@@ -1200,8 +1212,8 @@ impl ScanPartitionTableResponse {
         vec![
             Field::new(Self::PARTITION_ID, DataType::Utf8, false),
             Field::new(
-                Self::LAYERS,
-                DataType::List(Self::layers_inner_field()),
+                Self::LAYER_NAMES,
+                DataType::List(Self::layer_names_inner_field()),
                 false,
             ),
             Field::new(
@@ -1226,7 +1238,7 @@ impl ScanPartitionTableResponse {
     /// Helper to simplify instantiation of the dataframe in [`Self::data`].
     pub fn create_dataframe(
         partition_ids: Vec<String>,
-        layers: Vec<Vec<String>>,
+        layer_names: Vec<Vec<String>>,
         storage_urls: Vec<Vec<String>>,
         last_updated_at: Vec<i64>,
         num_chunks: Vec<u64>,
@@ -1235,14 +1247,14 @@ impl ScanPartitionTableResponse {
         let row_count = partition_ids.len();
         let schema = Arc::new(Self::schema());
 
-        let mut layers_builder =
-            ListBuilder::new(StringBuilder::new()).with_field(Self::layers_inner_field());
+        let mut layer_names_builder =
+            ListBuilder::new(StringBuilder::new()).with_field(Self::layer_names_inner_field());
 
-        for mut inner_vec in layers {
+        for mut inner_vec in layer_names {
             for layer_name in inner_vec.drain(..) {
-                layers_builder.values().append_value(layer_name)
+                layer_names_builder.values().append_value(layer_name)
             }
-            layers_builder.append(true);
+            layer_names_builder.append(true);
         }
 
         let mut urls_builder =
@@ -1257,7 +1269,7 @@ impl ScanPartitionTableResponse {
 
         let columns: Vec<ArrayRef> = vec![
             Arc::new(StringArray::from(partition_ids)),
-            Arc::new(layers_builder.finish()),
+            Arc::new(layer_names_builder.finish()),
             Arc::new(urls_builder.finish()),
             Arc::new(TimestampNanosecondArray::from(last_updated_at)),
             Arc::new(UInt64Array::from(num_chunks)),
@@ -1286,7 +1298,12 @@ impl ScanDatasetManifestResponse {
     pub const PARTITION_ID: &str = "rerun_partition_id";
     pub const STORAGE_URL: &str = "rerun_storage_url";
     pub const LAYER_TYPE: &str = "rerun_layer_type";
+
+    /// Time at which the layer was initially registered.
     pub const REGISTRATION_TIME: &str = "rerun_registration_time";
+
+    /// When was this row of the manifest modified last?
+    pub const LAST_UPDATED_AT: &str = "rerun_last_updated_at";
     pub const NUM_CHUNKS: &str = "rerun_num_chunks";
     pub const SIZE_BYTES: &str = "rerun_size_bytes";
     pub const SCHEMA_SHA256: &str = "rerun_schema_sha256";
@@ -1302,6 +1319,11 @@ impl ScanDatasetManifestResponse {
                 DataType::Timestamp(TimeUnit::Nanosecond, None),
                 false,
             ),
+            Field::new(
+                Self::LAST_UPDATED_AT,
+                DataType::Timestamp(TimeUnit::Nanosecond, None),
+                false,
+            ),
             Field::new(Self::NUM_CHUNKS, DataType::UInt64, false),
             Field::new(Self::SIZE_BYTES, DataType::UInt64, false),
             Field::new(Self::SCHEMA_SHA256, DataType::FixedSizeBinary(32), false),
@@ -1319,6 +1341,7 @@ impl ScanDatasetManifestResponse {
         storage_urls: Vec<String>,
         layer_types: Vec<String>,
         registration_times: Vec<i64>,
+        last_updated_at_times: Vec<i64>,
        num_chunks: Vec<u64>,
        size_bytes: Vec<u64>,
        schema_sha256s: Vec<[u8; 32]>,
@@ -1332,6 +1355,7 @@ impl ScanDatasetManifestResponse {
             Arc::new(StringArray::from(storage_urls)),
             Arc::new(StringArray::from(layer_types)),
             Arc::new(TimestampNanosecondArray::from(registration_times)),
+            Arc::new(TimestampNanosecondArray::from(last_updated_at_times)),
             Arc::new(UInt64Array::from(num_chunks)),
             Arc::new(UInt64Array::from(size_bytes)),
             Arc::new(