diff --git a/crates/store/re_datafusion/src/layer_table.rs b/crates/store/re_datafusion/src/layer_table.rs
new file mode 100644
index 000000000000..9d0d0a82faaa
--- /dev/null
+++ b/crates/store/re_datafusion/src/layer_table.rs
@@ -0,0 +1,104 @@
+use std::sync::Arc;
+
+use arrow::{array::RecordBatch, datatypes::SchemaRef};
+use async_trait::async_trait;
+use datafusion::{
+    catalog::TableProvider,
+    error::{DataFusionError, Result as DataFusionResult},
+};
+use tracing::instrument;
+
+use re_log_encoding::codec::wire::decoder::Decode as _;
+use re_log_types::EntryId;
+use re_protos::{
+    cloud::v1alpha1::{ScanLayerTableRequest, ScanLayerTableResponse},
+    headers::RerunHeadersInjectorExt as _,
+};
+use re_redap_client::ConnectionClient;
+
+use crate::grpc_streaming_provider::{GrpcStreamProvider, GrpcStreamToTable};
+use crate::wasm_compat::make_future_send;
+
+//TODO(ab): deduplicate from PartitionTableProvider
+#[derive(Clone)]
+pub struct LayerTableProvider {
+    client: ConnectionClient,
+    dataset_id: EntryId,
+}
+
+impl std::fmt::Debug for LayerTableProvider {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        f.debug_struct("LayerTableProvider")
+            .field("dataset_id", &self.dataset_id)
+            .finish()
+    }
+}
+
+impl LayerTableProvider {
+    pub fn new(client: ConnectionClient, dataset_id: EntryId) -> Self {
+        Self { client, dataset_id }
+    }
+
+    /// This is a convenience function.
+    pub async fn into_provider(self) -> DataFusionResult<Arc<dyn TableProvider>> {
+        Ok(GrpcStreamProvider::prepare(self).await?)
+    }
+}
+
+#[async_trait]
+impl GrpcStreamToTable for LayerTableProvider {
+    type GrpcStreamData = ScanLayerTableResponse;
+
+    #[instrument(skip(self), err)]
+    async fn fetch_schema(&mut self) -> DataFusionResult<SchemaRef> {
+        let mut client = self.client.clone();
+
+        let dataset_id = self.dataset_id;
+
+        Ok(Arc::new(
+            make_future_send(async move {
+                client
+                    .get_layer_table_schema(dataset_id)
+                    .await
+                    .map_err(|err| {
+                        DataFusionError::External(
+                            format!("Couldn't get layer table schema: {err}").into(),
+                        )
+                    })
+            })
+            .await?,
+        ))
+    }
+
+    // TODO(ab): what `GrpcStreamToTable` attempts to simplify should probably be handled by
+    // `ConnectionClient`
+    #[instrument(skip(self), err)]
+    async fn send_streaming_request(
+        &mut self,
+    ) -> DataFusionResult<tonic::Response<tonic::Streaming<Self::GrpcStreamData>>> {
+        let request = tonic::Request::new(ScanLayerTableRequest {
+            columns: vec![], // all of them
+        })
+        .with_entry_id(self.dataset_id)
+        .map_err(|err| DataFusionError::External(Box::new(err)))?;
+
+        let mut client = self.client.clone();
+
+        make_future_send(async move { Ok(client.inner().scan_layer_table(request).await) })
+            .await?
+            .map_err(|err| DataFusionError::External(Box::new(err)))
+    }
+
+    fn process_response(
+        &mut self,
+        response: Self::GrpcStreamData,
+    ) -> DataFusionResult<RecordBatch> {
+        response
+            .data
+            .ok_or(DataFusionError::Execution(
+                "DataFrame missing from ScanLayerTable response".to_owned(),
+            ))?
+            .decode()
+            .map_err(|err| DataFusionError::External(Box::new(err)))
+    }
+}
diff --git a/crates/store/re_datafusion/src/lib.rs b/crates/store/re_datafusion/src/lib.rs
index 3f50baef6d9a..7f617cd147c6 100644
--- a/crates/store/re_datafusion/src/lib.rs
+++ b/crates/store/re_datafusion/src/lib.rs
@@ -7,6 +7,7 @@ mod dataframe_query_provider;
 #[cfg(target_arch = "wasm32")]
 mod dataframe_query_provider_wasm;
 mod grpc_streaming_provider;
+mod layer_table;
 mod partition_table;
 mod search_provider;
 mod table_entry_provider;
@@ -18,6 +19,7 @@ pub use dataframe_query_common::{DataframeQueryTableProvider, query_from_query_e
 pub(crate) use dataframe_query_provider::PartitionStreamExec;
 #[cfg(target_arch = "wasm32")]
 pub(crate) use dataframe_query_provider_wasm::PartitionStreamExec;
+pub use layer_table::LayerTableProvider;
 pub use partition_table::PartitionTableProvider;
 pub use search_provider::SearchResultsTableProvider;
 pub use table_entry_provider::TableEntryTableProvider;
diff --git a/crates/store/re_datafusion/src/partition_table.rs b/crates/store/re_datafusion/src/partition_table.rs
index d2401d1e3a8f..d7a26a05370b 100644
--- a/crates/store/re_datafusion/src/partition_table.rs
+++ b/crates/store/re_datafusion/src/partition_table.rs
@@ -19,6 +19,7 @@ use re_redap_client::ConnectionClient;
 use crate::grpc_streaming_provider::{GrpcStreamProvider, GrpcStreamToTable};
 use crate::wasm_compat::make_future_send;
 
+//TODO(ab): deduplicate from LayerTableProvider
 #[derive(Clone)]
 pub struct PartitionTableProvider {
     client: ConnectionClient,
diff --git a/crates/store/re_protos/proto/rerun/v1alpha1/cloud.proto b/crates/store/re_protos/proto/rerun/v1alpha1/cloud.proto
index 763f61b9163d..c64617e3fc40 100644
--- a/crates/store/re_protos/proto/rerun/v1alpha1/cloud.proto
+++ b/crates/store/re_protos/proto/rerun/v1alpha1/cloud.proto
@@ -77,6 +77,21 @@ service RerunCloudService {
   // This endpoint requires the standard dataset headers.
   rpc ScanPartitionTable(ScanPartitionTableRequest) returns (stream ScanPartitionTableResponse) {}
 
+  // Returns the schema of the layer table.
+  //
+  // To inspect the data of the layer table, which is guaranteed to match the schema returned by
+  // this endpoint, check out `ScanLayerTable`.
+  //
+  // This endpoint requires the standard dataset headers.
+  rpc GetLayerTableSchema(GetLayerTableSchemaRequest) returns (GetLayerTableSchemaResponse) {}
+
+  // Inspect the contents of the layer table.
+  //
+  // The data will follow the schema returned by `GetLayerTableSchema`.
+  //
+  // This endpoint requires the standard dataset headers.
+  rpc ScanLayerTable(ScanLayerTableRequest) returns (stream ScanLayerTableResponse) {}
+
   // Returns the schema of the dataset.
   //
   // This is the union of all the schemas from all the underlying partitions. It will contain all the indexes,
@@ -271,6 +286,29 @@ message ScanPartitionTableResponse {
   rerun.common.v1alpha1.DataframePart data = 1;
 }
 
+message GetLayerTableSchemaRequest {
+  reserved "dataset_id";
+}
+
+message GetLayerTableSchemaResponse {
+  rerun.common.v1alpha1.Schema schema = 1;
+}
+
+message ScanLayerTableRequest {
+  // A list of column names to be projected server-side.
+  //
+  // All of them if left empty.
+  repeated string columns = 3;
+
+  reserved "dataset_id";
+  reserved "scan_parameters";
+}
+
+message ScanLayerTableResponse {
+  // Layer metadata as arrow RecordBatch
+  rerun.common.v1alpha1.DataframePart data = 1;
+}
+
 message GetDatasetSchemaRequest {
   reserved 1;
   reserved "dataset_id";
diff --git a/crates/store/re_protos/src/v1alpha1/rerun.cloud.v1alpha1.ext.rs b/crates/store/re_protos/src/v1alpha1/rerun.cloud.v1alpha1.ext.rs
index 1068c1a1470f..c2fb672aba13 100644
--- a/crates/store/re_protos/src/v1alpha1/rerun.cloud.v1alpha1.ext.rs
+++ b/crates/store/re_protos/src/v1alpha1/rerun.cloud.v1alpha1.ext.rs
@@ -1,6 +1,8 @@
 use std::sync::Arc;
 
-use arrow::array::RecordBatchOptions;
+use arrow::array::{
+    FixedSizeBinaryArray, ListBuilder, RecordBatchOptions, StringBuilder, UInt64Array,
+};
 use arrow::{
     array::{Array, ArrayRef, RecordBatch, StringArray, TimestampNanosecondArray},
     datatypes::{DataType, Field, Schema, TimeUnit},
@@ -11,10 +13,9 @@ use re_chunk::TimelineName;
 use re_log_types::{EntityPath, EntryId, TimeInt};
 use re_sorbet::ComponentColumnDescriptor;
 
-use crate::cloud::v1alpha1::{EntryKind, QueryTasksResponse};
 use crate::cloud::v1alpha1::{
-    GetDatasetSchemaResponse, RegisterWithDatasetResponse, ScanPartitionTableResponse,
-    VectorDistanceMetric,
+    EntryKind, GetDatasetSchemaResponse, QueryTasksResponse, RegisterWithDatasetResponse,
+    ScanLayerTableResponse, ScanPartitionTableResponse, VectorDistanceMetric,
 };
 use crate::common::v1alpha1::ext::{DatasetHandle, IfDuplicateBehavior, PartitionId};
 use crate::common::v1alpha1::{ComponentDescriptor, DataframePart, TaskId};
@@ -1178,51 +1179,150 @@ pub struct RegisterWithDatasetTaskDescriptor {
 
 impl ScanPartitionTableResponse {
     pub const PARTITION_ID: &str = "rerun_partition_id";
-    pub const PARTITION_TYPE: &str = "rerun_partition_type";
+    pub const LAYERS: &str = "rerun_layers";
+    pub const STORAGE_URLS: &str = "rerun_storage_urls";
+    pub const LAST_UPDATED_AT: &str = "rerun_last_updated_at";
+    pub const NUM_CHUNKS: &str = "rerun_num_chunks";
+    pub const SIZE_BYTES: &str = "rerun_size_bytes";
+
+    pub fn schema() -> Schema {
+        Schema::new(vec![
+            Field::new(Self::PARTITION_ID, DataType::Utf8, false),
+            Field::new(
+                Self::LAYERS,
+                DataType::List(Arc::new(Field::new(Self::LAYERS, DataType::Utf8, false))),
+                false,
+            ),
+            Field::new(
+                Self::STORAGE_URLS,
+                DataType::List(Arc::new(Field::new(
+                    Self::STORAGE_URLS,
+                    DataType::Utf8,
+                    false,
+                ))),
+                false,
+            ),
+            Field::new(
+                Self::LAST_UPDATED_AT,
+                DataType::Timestamp(TimeUnit::Nanosecond, None),
+                false,
+            ),
+            Field::new(Self::NUM_CHUNKS, DataType::UInt64, false),
+            Field::new(Self::SIZE_BYTES, DataType::UInt64, false),
+        ])
+    }
+
+    /// Helper to simplify instantiation of the dataframe in [`Self::data`].
+    pub fn create_dataframe(
+        partition_ids: Vec<String>,
+        layers: Vec<Vec<String>>,
+        storage_urls: Vec<Vec<String>>,
+        last_updated_at: Vec<i64>,
+        num_chunks: Vec<u64>,
+        size_bytes: Vec<u64>,
+    ) -> arrow::error::Result<RecordBatch> {
+        let row_count = partition_ids.len();
+        let schema = Arc::new(Self::schema());
+
+        let mut layers_builder = ListBuilder::new(StringBuilder::new())
+            .with_field(Arc::new(Field::new(Self::LAYERS, DataType::Utf8, false)));
+
+        for mut inner_vec in layers {
+            for layer_name in inner_vec.drain(..) {
+                layers_builder.values().append_value(layer_name)
+            }
+            layers_builder.append(true);
+        }
+
+        let mut urls_builder = ListBuilder::new(StringBuilder::new()).with_field(Arc::new(
+            Field::new(Self::STORAGE_URLS, DataType::Utf8, false),
+        ));
+
+        for mut inner_vec in storage_urls {
+            for layer_name in inner_vec.drain(..) {
+                urls_builder.values().append_value(layer_name)
+            }
+            urls_builder.append(true);
+        }
+
+        let columns: Vec<ArrayRef> = vec![
+            Arc::new(StringArray::from(partition_ids)),
+            Arc::new(layers_builder.finish()),
+            Arc::new(urls_builder.finish()),
+            Arc::new(TimestampNanosecondArray::from(last_updated_at)),
+            Arc::new(UInt64Array::from(num_chunks)),
+            Arc::new(UInt64Array::from(size_bytes)),
+        ];
+
+        RecordBatch::try_new_with_options(
+            schema,
+            columns,
+            &RecordBatchOptions::default().with_row_count(Some(row_count)),
+        )
+    }
+
+    pub fn data(&self) -> Result<&DataframePart, TypeConversionError> {
+        Ok(self.data.as_ref().ok_or_else(|| {
+            missing_field!(crate::cloud::v1alpha1::ScanPartitionTableResponse, "data")
+        })?)
+    }
+}
+
+// --- ScanLayerTableResponse ---
+
+impl ScanLayerTableResponse {
+    pub const LAYER_NAME: &str = "rerun_layer_name";
+    pub const PARTITION_ID: &str = "rerun_partition_id";
     pub const STORAGE_URL: &str = "rerun_storage_url";
+    pub const LAYER_TYPE: &str = "rerun_layer_type";
     pub const REGISTRATION_TIME: &str = "rerun_registration_time";
-    pub const PARTITION_MANIFEST_UPDATED_AT: &str = "rerun_partition_manifest_updated_at";
-    pub const PARTITION_MANIFEST_URL: &str = "rerun_partition_manifest_url";
+    pub const NUM_CHUNKS: &str = "rerun_num_chunks";
+    pub const SIZE_BYTES: &str = "rerun_size_bytes";
+    pub const SCHEMA_SHA256: &str = "rerun_schema_sha256";
 
     pub fn schema() -> Schema {
         Schema::new(vec![
+            Field::new(Self::LAYER_NAME, DataType::Utf8, false),
             Field::new(Self::PARTITION_ID, DataType::Utf8, false),
-            Field::new(Self::PARTITION_TYPE, DataType::Utf8, false),
             Field::new(Self::STORAGE_URL, DataType::Utf8, false),
+            Field::new(Self::LAYER_TYPE, DataType::Utf8, false),
             Field::new(
                 Self::REGISTRATION_TIME,
                 DataType::Timestamp(TimeUnit::Nanosecond, None),
                 false,
             ),
-            Field::new(
-                Self::PARTITION_MANIFEST_UPDATED_AT,
-                DataType::Timestamp(TimeUnit::Nanosecond, None),
-                true,
-            ),
-            Field::new(Self::PARTITION_MANIFEST_URL, DataType::Utf8, true),
+            Field::new(Self::NUM_CHUNKS, DataType::UInt64, false),
+            Field::new(Self::SIZE_BYTES, DataType::UInt64, false),
+            Field::new(Self::SCHEMA_SHA256, DataType::FixedSizeBinary(32), false),
         ])
     }
 
     /// Helper to simplify instantiation of the dataframe in [`Self::data`].
     pub fn create_dataframe(
+        layer_names: Vec<String>,
         partition_ids: Vec<String>,
-        partition_types: Vec<String>,
         storage_urls: Vec<String>,
+        layer_types: Vec<String>,
         registration_times: Vec<i64>,
-        partition_manifest_updated_ats: Vec<Option<i64>>,
-        partition_manifest_urls: Vec<Option<String>>,
+        num_chunks: Vec<u64>,
+        size_bytes: Vec<u64>,
+        schema_sha256s: Vec<[u8; 32]>,
     ) -> arrow::error::Result<RecordBatch> {
         let row_count = partition_ids.len();
         let schema = Arc::new(Self::schema());
+        let columns: Vec<ArrayRef> = vec![
+            Arc::new(StringArray::from(layer_names)),
             Arc::new(StringArray::from(partition_ids)),
-            Arc::new(StringArray::from(partition_types)),
             Arc::new(StringArray::from(storage_urls)),
+            Arc::new(StringArray::from(layer_types)),
             Arc::new(TimestampNanosecondArray::from(registration_times)),
-            Arc::new(TimestampNanosecondArray::from(
-                partition_manifest_updated_ats,
-            )),
-            Arc::new(StringArray::from(partition_manifest_urls)),
+            Arc::new(UInt64Array::from(num_chunks)),
+            Arc::new(UInt64Array::from(size_bytes)),
+            Arc::new(
+                FixedSizeBinaryArray::try_from_iter(schema_sha256s.into_iter())
+                    .expect("sizes of nested slices are guaranteed to match"),
+            ),
         ];
 
         RecordBatch::try_new_with_options(
diff --git a/crates/store/re_protos/src/v1alpha1/rerun.cloud.v1alpha1.rs b/crates/store/re_protos/src/v1alpha1/rerun.cloud.v1alpha1.rs
index 15c2c38d0557..6dbf278f29d3 100644
--- a/crates/store/re_protos/src/v1alpha1/rerun.cloud.v1alpha1.rs
+++ b/crates/store/re_protos/src/v1alpha1/rerun.cloud.v1alpha1.rs
@@ -197,6 +197,67 @@ impl ::prost::Name for ScanPartitionTableResponse {
     }
 }
 #[derive(Clone, Copy, PartialEq, ::prost::Message)]
+pub struct GetLayerTableSchemaRequest {}
+impl ::prost::Name for GetLayerTableSchemaRequest {
+    const NAME: &'static str = "GetLayerTableSchemaRequest";
+    const PACKAGE: &'static str = "rerun.cloud.v1alpha1";
+    fn full_name() -> ::prost::alloc::string::String {
+        "rerun.cloud.v1alpha1.GetLayerTableSchemaRequest".into()
+    }
+    fn type_url() -> ::prost::alloc::string::String {
+        "/rerun.cloud.v1alpha1.GetLayerTableSchemaRequest".into()
+    }
+}
+#[derive(Clone, PartialEq, ::prost::Message)]
+pub struct GetLayerTableSchemaResponse {
+    #[prost(message, optional, tag = "1")]
+    pub schema: ::core::option::Option<super::super::common::v1alpha1::Schema>,
+}
+impl ::prost::Name for GetLayerTableSchemaResponse {
+    const NAME: &'static str = "GetLayerTableSchemaResponse";
+    const PACKAGE: &'static str = "rerun.cloud.v1alpha1";
+    fn full_name() -> ::prost::alloc::string::String {
+        "rerun.cloud.v1alpha1.GetLayerTableSchemaResponse".into()
+    }
+    fn type_url() -> ::prost::alloc::string::String {
+        "/rerun.cloud.v1alpha1.GetLayerTableSchemaResponse".into()
+    }
+}
+#[derive(Clone, PartialEq, ::prost::Message)]
+pub struct ScanLayerTableRequest {
+    /// A list of column names to be projected server-side.
+    ///
+    /// All of them if left empty.
+    #[prost(string, repeated, tag = "3")]
+    pub columns: ::prost::alloc::vec::Vec<::prost::alloc::string::String>,
+}
+impl ::prost::Name for ScanLayerTableRequest {
+    const NAME: &'static str = "ScanLayerTableRequest";
+    const PACKAGE: &'static str = "rerun.cloud.v1alpha1";
+    fn full_name() -> ::prost::alloc::string::String {
+        "rerun.cloud.v1alpha1.ScanLayerTableRequest".into()
+    }
+    fn type_url() -> ::prost::alloc::string::String {
+        "/rerun.cloud.v1alpha1.ScanLayerTableRequest".into()
+    }
+}
+#[derive(Clone, PartialEq, ::prost::Message)]
+pub struct ScanLayerTableResponse {
+    /// Layer metadata as arrow RecordBatch
+    #[prost(message, optional, tag = "1")]
+    pub data: ::core::option::Option<super::super::common::v1alpha1::DataframePart>,
+}
+impl ::prost::Name for ScanLayerTableResponse {
+    const NAME: &'static str = "ScanLayerTableResponse";
+    const PACKAGE: &'static str = "rerun.cloud.v1alpha1";
+    fn full_name() -> ::prost::alloc::string::String {
+        "rerun.cloud.v1alpha1.ScanLayerTableResponse".into()
+    }
+    fn type_url() -> ::prost::alloc::string::String {
+        "/rerun.cloud.v1alpha1.ScanLayerTableResponse".into()
+    }
+}
+#[derive(Clone, Copy, PartialEq, ::prost::Message)]
 pub struct GetDatasetSchemaRequest {}
 impl ::prost::Name for GetDatasetSchemaRequest {
     const NAME: &'static str = "GetDatasetSchemaRequest";
@@ -2031,6 +2092,57 @@ pub mod rerun_cloud_service_client {
                 ));
             self.inner.server_streaming(req, path, codec).await
         }
+        /// Returns the schema of the layer table.
+        ///
+        /// To inspect the data of the layer table, which is guaranteed to match the schema returned by
+        /// this endpoint, check out `ScanLayerTable`.
+        ///
+        /// This endpoint requires the standard dataset headers.
+        pub async fn get_layer_table_schema(
+            &mut self,
+            request: impl tonic::IntoRequest<super::GetLayerTableSchemaRequest>,
+        ) -> std::result::Result<tonic::Response<super::GetLayerTableSchemaResponse>, tonic::Status>
+        {
+            self.inner.ready().await.map_err(|e| {
+                tonic::Status::unknown(format!("Service was not ready: {}", e.into()))
+            })?;
+            let codec = tonic::codec::ProstCodec::default();
+            let path = http::uri::PathAndQuery::from_static(
+                "/rerun.cloud.v1alpha1.RerunCloudService/GetLayerTableSchema",
+            );
+            let mut req = request.into_request();
+            req.extensions_mut().insert(GrpcMethod::new(
+                "rerun.cloud.v1alpha1.RerunCloudService",
+                "GetLayerTableSchema",
+            ));
+            self.inner.unary(req, path, codec).await
+        }
+        /// Inspect the contents of the layer table.
+        ///
+        /// The data will follow the schema returned by `GetLayerTableSchema`.
+        ///
+        /// This endpoint requires the standard dataset headers.
+        pub async fn scan_layer_table(
+            &mut self,
+            request: impl tonic::IntoRequest<super::ScanLayerTableRequest>,
+        ) -> std::result::Result<
+            tonic::Response<tonic::codec::Streaming<super::ScanLayerTableResponse>>,
+            tonic::Status,
+        > {
+            self.inner.ready().await.map_err(|e| {
+                tonic::Status::unknown(format!("Service was not ready: {}", e.into()))
+            })?;
+            let codec = tonic::codec::ProstCodec::default();
+            let path = http::uri::PathAndQuery::from_static(
+                "/rerun.cloud.v1alpha1.RerunCloudService/ScanLayerTable",
+            );
+            let mut req = request.into_request();
+            req.extensions_mut().insert(GrpcMethod::new(
+                "rerun.cloud.v1alpha1.RerunCloudService",
+                "ScanLayerTable",
+            ));
+            self.inner.server_streaming(req, path, codec).await
+        }
         /// Returns the schema of the dataset.
         ///
         /// This is the union of all the schemas from all the underlying partitions. It will contain all the indexes,
@@ -2456,6 +2568,30 @@ pub mod rerun_cloud_service_server {
             &self,
             request: tonic::Request<super::ScanPartitionTableRequest>,
         ) -> std::result::Result<tonic::Response<Self::ScanPartitionTableStream>, tonic::Status>;
+        /// Returns the schema of the layer table.
+        ///
+        /// To inspect the data of the layer table, which is guaranteed to match the schema returned by
+        /// this endpoint, check out `ScanLayerTable`.
+        ///
+        /// This endpoint requires the standard dataset headers.
+        async fn get_layer_table_schema(
+            &self,
+            request: tonic::Request<super::GetLayerTableSchemaRequest>,
+        ) -> std::result::Result<tonic::Response<super::GetLayerTableSchemaResponse>, tonic::Status>;
+        /// Server streaming response type for the ScanLayerTable method.
+        type ScanLayerTableStream: tonic::codegen::tokio_stream::Stream<
+                Item = std::result::Result<super::ScanLayerTableResponse, tonic::Status>,
+            > + std::marker::Send
+            + 'static;
+        /// Inspect the contents of the layer table.
+        ///
+        /// The data will follow the schema returned by `GetLayerTableSchema`.
+        ///
+        /// This endpoint requires the standard dataset headers.
+        async fn scan_layer_table(
+            &self,
+            request: tonic::Request<super::ScanLayerTableRequest>,
+        ) -> std::result::Result<tonic::Response<Self::ScanLayerTableStream>, tonic::Status>;
         /// Returns the schema of the dataset.
         ///
         /// This is the union of all the schemas from all the underlying partitions. It will contain all the indexes,
@@ -3190,6 +3326,93 @@ pub mod rerun_cloud_service_server {
                 };
                 Box::pin(fut)
             }
+            "/rerun.cloud.v1alpha1.RerunCloudService/GetLayerTableSchema" => {
+                #[allow(non_camel_case_types)]
+                struct GetLayerTableSchemaSvc<T: RerunCloudService>(pub Arc<T>);
+                impl<T: RerunCloudService>
+                    tonic::server::UnaryService<super::GetLayerTableSchemaRequest>
+                    for GetLayerTableSchemaSvc<T>
+                {
+                    type Response = super::GetLayerTableSchemaResponse;
+                    type Future = BoxFuture<tonic::Response<Self::Response>, tonic::Status>;
+                    fn call(
+                        &mut self,
+                        request: tonic::Request<super::GetLayerTableSchemaRequest>,
+                    ) -> Self::Future {
+                        let inner = Arc::clone(&self.0);
+                        let fut = async move {
+                            <T as RerunCloudService>::get_layer_table_schema(&inner, request)
+                                .await
+                        };
+                        Box::pin(fut)
+                    }
+                }
+                let accept_compression_encodings = self.accept_compression_encodings;
+                let send_compression_encodings = self.send_compression_encodings;
+                let max_decoding_message_size = self.max_decoding_message_size;
+                let max_encoding_message_size = self.max_encoding_message_size;
+                let inner = self.inner.clone();
+                let fut = async move {
+                    let method = GetLayerTableSchemaSvc(inner);
+                    let codec = tonic::codec::ProstCodec::default();
+                    let mut grpc = tonic::server::Grpc::new(codec)
+                        .apply_compression_config(
+                            accept_compression_encodings,
+                            send_compression_encodings,
+                        )
+                        .apply_max_message_size_config(
+                            max_decoding_message_size,
+                            max_encoding_message_size,
+                        );
+                    let res = grpc.unary(method, req).await;
+                    Ok(res)
+                };
+                Box::pin(fut)
+            }
+            "/rerun.cloud.v1alpha1.RerunCloudService/ScanLayerTable" => {
+                #[allow(non_camel_case_types)]
+                struct ScanLayerTableSvc<T: RerunCloudService>(pub Arc<T>);
+                impl<T: RerunCloudService>
+                    tonic::server::ServerStreamingService<super::ScanLayerTableRequest>
+                    for ScanLayerTableSvc<T>
+                {
+                    type Response = super::ScanLayerTableResponse;
+                    type ResponseStream = T::ScanLayerTableStream;
+                    type Future =
+                        BoxFuture<tonic::Response<Self::ResponseStream>, tonic::Status>;
+                    fn call(
+                        &mut self,
+                        request: tonic::Request<super::ScanLayerTableRequest>,
+                    ) -> Self::Future {
+                        let inner = Arc::clone(&self.0);
+                        let fut = async move {
+                            <T as RerunCloudService>::scan_layer_table(&inner, request).await
+                        };
+                        Box::pin(fut)
+                    }
+                }
+                let accept_compression_encodings = self.accept_compression_encodings;
+                let send_compression_encodings = self.send_compression_encodings;
+                let max_decoding_message_size = self.max_decoding_message_size;
+                let max_encoding_message_size = self.max_encoding_message_size;
+                let inner = self.inner.clone();
+                let fut = async move {
+                    let method = ScanLayerTableSvc(inner);
+                    let codec = tonic::codec::ProstCodec::default();
+                    let mut grpc = tonic::server::Grpc::new(codec)
+                        .apply_compression_config(
+                            accept_compression_encodings,
+                            send_compression_encodings,
+                        )
+                        .apply_max_message_size_config(
+                            max_decoding_message_size,
+                            max_encoding_message_size,
+                        );
+                    let res = grpc.server_streaming(method, req).await;
+                    Ok(res)
+                };
+                Box::pin(fut)
+            }
             "/rerun.cloud.v1alpha1.RerunCloudService/GetDatasetSchema" => {
                 #[allow(non_camel_case_types)]
                 struct GetDatasetSchemaSvc<T: RerunCloudService>(pub Arc<T>);
diff --git a/crates/store/re_redap_client/src/connection_client.rs b/crates/store/re_redap_client/src/connection_client.rs
index 20dad6d0976f..8e84c95b221e 100644
--- a/crates/store/re_redap_client/src/connection_client.rs
+++ b/crates/store/re_redap_client/src/connection_client.rs
@@ -9,9 +9,9 @@ use re_protos::{
     TypeConversionError,
     cloud::v1alpha1::{
         CreateDatasetEntryRequest, DeleteEntryRequest, EntryFilter, EntryKind, FindEntriesRequest,
-        GetPartitionTableSchemaRequest, GetPartitionTableSchemaResponse, ReadDatasetEntryRequest,
-        ReadTableEntryRequest, RegisterWithDatasetResponse, ScanPartitionTableRequest,
-        ScanPartitionTableResponse,
+        GetLayerTableSchemaRequest, GetLayerTableSchemaResponse, GetPartitionTableSchemaRequest,
+        GetPartitionTableSchemaResponse, ReadDatasetEntryRequest, ReadTableEntryRequest,
+        RegisterWithDatasetResponse, ScanPartitionTableRequest, ScanPartitionTableResponse,
         ext::{
             CreateDatasetEntryResponse, DataSource, DataSourceKind, DatasetDetails, DatasetEntry,
             EntryDetails, EntryDetailsUpdate, LanceTable, ProviderDetails as _,
@@ -269,6 +269,26 @@ where
         Ok(partition_ids)
     }
 
+    //TODO(ab): accept entry name
+    pub async fn get_layer_table_schema(
+        &mut self,
+        entry_id: EntryId,
+    ) -> Result<arrow::datatypes::Schema, StreamEntryError> {
+        Ok(self
+            .inner()
+            .get_layer_table_schema(
+                tonic::Request::new(GetLayerTableSchemaRequest {})
+                    .with_entry_id(entry_id)
+                    .map_err(|err| StreamEntryError::InvalidId(err.into()))?,
+            )
+            .await
+            .map_err(|err| StreamEntryError::GetLayerTableSchema(err.into()))?
+            .into_inner()
+            .schema
+            .ok_or_else(|| missing_field!(GetLayerTableSchemaResponse, "schema"))?
+            .try_into()?)
+    }
+
     /// Initiate registration of the provided recording URIs with a dataset and return the
     /// corresponding task descriptors.
     ///
diff --git a/crates/store/re_redap_client/src/lib.rs b/crates/store/re_redap_client/src/lib.rs
index 528ef55f76bf..94ab6cbf4422 100644
--- a/crates/store/re_redap_client/src/lib.rs
+++ b/crates/store/re_redap_client/src/lib.rs
@@ -96,6 +96,9 @@ pub enum StreamEntryError {
     #[error("Failed reading partition table scheme\nDetails:{0}")]
     GetPartitionTableSchema(TonicStatusError),
 
+    #[error("Failed reading layer table schema\nDetails:{0}")]
+    GetLayerTableSchema(TonicStatusError),
+
     #[error("Failed scanning the partition table \nDetails:{0}")]
     ScanPartitionTable(TonicStatusError),
 
diff --git a/crates/store/re_server/src/rerun_cloud.rs b/crates/store/re_server/src/rerun_cloud.rs
index 3ac52a1bcbc7..7b94927c2759 100644
--- a/crates/store/re_server/src/rerun_cloud.rs
+++ b/crates/store/re_server/src/rerun_cloud.rs
@@ -10,12 +10,15 @@ use arrow::datatypes::{DataType, Field, Schema, TimeUnit};
 use datafusion::prelude::SessionContext;
 use nohash_hasher::IntSet;
 use tokio_stream::StreamExt as _;
-use tonic::{Code, Status};
+use tonic::{Code, Request, Response, Status};
 
 use re_byte_size::SizeBytes as _;
 use re_chunk_store::{Chunk, ChunkStore, ChunkStoreConfig, ChunkStoreHandle};
 use re_log_encoding::codec::wire::{decoder::Decode as _, encoder::Encode as _};
 use re_log_types::{EntityPath, EntryId, StoreId, StoreKind};
+use re_protos::cloud::v1alpha1::{
+    GetLayerTableSchemaRequest, GetLayerTableSchemaResponse, ScanLayerTableRequest,
+};
 use re_protos::{
     cloud::v1alpha1::{
         DeleteEntryResponse, EntryDetails, EntryKind, FetchTaskOutputRequest,
@@ -166,6 +169,7 @@ decl_stream!(FetchChunksResponseStream);
 decl_stream!(GetChunksResponseStream);
 decl_stream!(QueryDatasetResponseStream);
 decl_stream!(ScanPartitionTableResponseStream);
+decl_stream!(ScanLayerTableResponseStream);
 decl_stream!(SearchDatasetResponseStream);
 decl_stream!(ScanTableResponseStream);
 decl_stream!(QueryTasksOnCompletionResponseStream);
@@ -667,6 +671,28 @@ impl RerunCloudService for RerunCloudHandler {
         ))
     }
 
+    type ScanLayerTableStream = ScanLayerTableResponseStream;
+
+    async fn get_layer_table_schema(
+        &self,
+        _request: Request<GetLayerTableSchemaRequest>,
+    ) -> Result<Response<GetLayerTableSchemaResponse>, Status> {
+        //TODO(RR-2482)
+        Err(tonic::Status::unimplemented(
+            "get_layer_table_schema not implemented",
+        ))
+    }
+
+    async fn scan_layer_table(
+        &self,
+        _request: Request<ScanLayerTableRequest>,
+    ) -> Result<Response<Self::ScanLayerTableStream>, Status> {
+        //TODO(RR-2482)
+        Err(tonic::Status::unimplemented(
+            "scan_layer_table not implemented",
+        ))
+    }
+
     async fn get_dataset_schema(
         &self,
         request: tonic::Request<GetDatasetSchemaRequest>,
diff --git a/crates/store/re_server/src/store.rs b/crates/store/re_server/src/store.rs
index a104f1c0583c..a8bafe4e0483 100644
--- a/crates/store/re_server/src/store.rs
+++ b/crates/store/re_server/src/store.rs
@@ -16,6 +16,7 @@ use itertools::Itertools as _;
 use jiff::Timestamp;
 use lance::datafusion::LanceTableProvider;
 
+use re_byte_size::SizeBytes as _;
 use re_chunk_store::{ChunkStore, ChunkStoreConfig, ChunkStoreHandle};
 use re_log_types::{EntryId, StoreKind};
 use re_protos::{
@@ -133,34 +134,40 @@ impl Dataset {
     }
 
     pub fn partition_table(&self) -> arrow::error::Result<RecordBatch> {
-        let (partition_ids, registration_times): (Vec<_>, Vec<_>) = self
-            .partitions
-            .iter()
-            .map(|(store_id, partition)| {
-                (
-                    store_id.to_string(),
-                    partition.registration_time.as_nanosecond() as i64,
-                )
-            })
-            .unzip();
+        let (partition_ids, last_updated_at, num_chunks, size_bytes): (
+            Vec<_>,
+            Vec<_>,
+            Vec<_>,
+            Vec<_>,
+        ) = itertools::multiunzip(self.partitions.iter().map(|(store_id, partition)| {
+            let store = partition.store_handle.read();
+            let size_bytes: u64 = store
+                .iter_chunks()
+                .map(|chunk| chunk.heap_size_bytes())
+                .sum();
-        let partition_types = vec!["rrd".to_owned(); partition_ids.len()];
+            (
+                store_id.to_string(),
+                partition.registration_time.as_nanosecond() as i64,
+                store.num_chunks() as u64,
+                size_bytes,
+            )
+        }));
+
+        let layers = vec![vec!["base".to_owned()]; partition_ids.len()];
 
         let storage_urls = partition_ids
             .iter()
-            .map(|partition_id| format!("memory:///{}/{partition_id}", self.id))
+            .map(|partition_id| vec![format!("memory:///{}/{partition_id}", self.id)])
             .collect();
 
-        let partition_manifest_updated_ats = vec![None; partition_ids.len()];
-        let partition_manifest_urls = vec![None; partition_ids.len()];
-
         ScanPartitionTableResponse::create_dataframe(
             partition_ids,
-            partition_types,
+            layers,
             storage_urls,
-            registration_times,
+            last_updated_at,
+            num_chunks,
+            size_bytes,
         )
     }
diff --git a/rerun_py/rerun_bindings/rerun_bindings.pyi b/rerun_py/rerun_bindings/rerun_bindings.pyi
index efd49408b802..a1275d0f291b 100644
--- a/rerun_py/rerun_bindings/rerun_bindings.pyi
+++ b/rerun_py/rerun_bindings/rerun_bindings.pyi
@@ -1345,6 +1345,9 @@ class DatasetEntry(Entry):
     def partition_table(self) -> DataFusionTable:
         """Return the partition table as a Datafusion table provider."""
 
+    def layer_table(self) -> DataFusionTable:
+        """Return the layer table as a Datafusion table provider."""
+
     def partition_url(
         self,
         partition_id: str,
diff --git a/rerun_py/src/catalog/dataset_entry.rs b/rerun_py/src/catalog/dataset_entry.rs
index 602488bf5563..d2b229f6cec8 100644
--- a/rerun_py/src/catalog/dataset_entry.rs
+++ b/rerun_py/src/catalog/dataset_entry.rs
@@ -13,7 +13,7 @@ use tokio_stream::StreamExt as _;
 use tracing::instrument;
 
 use re_chunk_store::{ChunkStore, ChunkStoreHandle};
-use re_datafusion::{PartitionTableProvider, SearchResultsTableProvider};
+use re_datafusion::{LayerTableProvider, PartitionTableProvider, SearchResultsTableProvider};
 use re_log_encoding::codec::wire::encoder::Encode as _;
 use re_log_types::{StoreId, StoreKind};
 use re_protos::cloud::v1alpha1::ext::DatasetDetails;
@@ -165,6 +165,28 @@ impl PyDatasetEntry {
         })
     }
 
+    /// Return the layer table as a Datafusion table provider.
+    #[instrument(skip_all)]
+    fn layer_table(self_: PyRef<'_, Self>) -> PyResult<PyDataFusionTable> {
+        let super_ = self_.as_super();
+        let connection = super_.client.borrow(self_.py()).connection().clone();
+        let dataset_id = super_.details.id;
+
+        let provider = wait_for_future(self_.py(), async move {
+            LayerTableProvider::new(connection.client().await?, dataset_id)
+                .into_provider()
+                .await
+                .map_err(to_py_err)
+        })?;
+
+        #[expect(clippy::string_add)]
+        Ok(PyDataFusionTable {
+            client: super_.client.clone_ref(self_.py()),
+            name: super_.name() + "_layer_table",
+            provider,
+        })
+    }
+
     /// Return the URL for the given partition.
     ///
     /// Parameters
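
For orientation, here is a minimal sketch of how the new layer table might be consumed from Rust via the provider added in `crates/store/re_datafusion/src/layer_table.rs`. The helper function, its name, and the way a `ConnectionClient` and dataset `EntryId` are obtained are assumptions for illustration; the column names come from the `ScanLayerTableResponse` constants above.

```rust
use datafusion::prelude::SessionContext;
use re_datafusion::LayerTableProvider;
use re_log_types::EntryId;
use re_redap_client::ConnectionClient;

// Hypothetical helper: obtaining a connected client and the dataset id is out of scope here.
async fn show_layer_table(
    client: ConnectionClient,
    dataset_id: EntryId,
) -> datafusion::error::Result<()> {
    // Wrap the `ScanLayerTable` gRPC stream in a DataFusion `TableProvider`.
    let provider = LayerTableProvider::new(client, dataset_id)
        .into_provider()
        .await?;

    // Register it in a session and query it like any other table.
    let ctx = SessionContext::new();
    ctx.register_table("layer_table", provider)?;

    ctx.table("layer_table")
        .await?
        .select_columns(&["rerun_partition_id", "rerun_layer_name", "rerun_size_bytes"])?
        .show()
        .await?;

    Ok(())
}
```

The Python binding added in `dataset_entry.rs` exposes the same provider through `DatasetEntry.layer_table()`, mirroring the existing `partition_table()`.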
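The in-memory server leaves `get_layer_table_schema` and `scan_layer_table` unimplemented for now (see the TODO(RR-2482) stubs), but responses are presumably expected to carry record batches matching `ScanLayerTableResponse::schema()`. A hypothetical single-row batch built with the new `create_dataframe` helper, with every value made up for illustration:

```rust
use re_protos::cloud::v1alpha1::ScanLayerTableResponse;

/// Hypothetical example; the argument order mirrors the column order of `ScanLayerTableResponse::schema()`.
fn example_layer_table_row() -> arrow::error::Result<arrow::array::RecordBatch> {
    ScanLayerTableResponse::create_dataframe(
        vec!["base".to_owned()],                              // layer_names
        vec!["my_partition".to_owned()],                      // partition_ids (made up)
        vec!["memory:///my_dataset/my_partition".to_owned()], // storage_urls (made up)
        vec!["rrd".to_owned()],                               // layer_types (assumed value)
        vec![0],                                              // registration_times (ns since epoch)
        vec![42],                                             // num_chunks
        vec![1_024],                                          // size_bytes
        vec![[0_u8; 32]],                                     // schema_sha256s
    )
}
```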