Skip to content
Merged
Show file tree
Hide file tree
Changes from 21 commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
6850986
Add grpc endpoint for layer table and cleanup helper objects
abey79 Oct 3, 2025
1cd8a69
add table provider for layer table
abey79 Oct 6, 2025
1ce0fc5
add `DatasetEntry.layer_table` to Python SDK
abey79 Oct 6, 2025
33f7409
reintroduce storage_urls in partition table
abey79 Oct 6, 2025
abc37b0
Fix schema mismatch
abey79 Oct 6, 2025
feb1b31
Rename everything to "DatasetManifest"
abey79 Oct 7, 2025
b070cbe
Fix name + update proto docstring
abey79 Oct 7, 2025
5d43d14
Apply suggestion from @Copilot
abey79 Oct 7, 2025
6bba8ab
Apply suggestion from @Copilot
abey79 Oct 7, 2025
a82dbb4
Minor fix
abey79 Oct 7, 2025
fcd9b23
Merge branch 'main' into antoine/layer-table
abey79 Oct 7, 2025
648d46a
Minor minor fix
abey79 Oct 7, 2025
af5c3df
Add explicit `fields()` method
abey79 Oct 7, 2025
8778501
Add explicit `xxx_inner_field()` methods
abey79 Oct 7, 2025
3e5baa0
Remove utterly deprecated constants
abey79 Oct 7, 2025
ff20d4e
More docstring and rename to `LAYER_NAMES`
abey79 Oct 7, 2025
4e5f440
add unit test
abey79 Oct 7, 2025
f594a75
update migration guide
abey79 Oct 7, 2025
1ddbab3
fix wasm build
abey79 Oct 7, 2025
d27a747
Merge branch 'main' into antoine/layer-table
abey79 Oct 7, 2025
439b734
lint
abey79 Oct 7, 2025
45dd37c
Update crates/store/re_protos/src/v1alpha1/rerun.cloud.v1alpha1.ext.rs
abey79 Oct 8, 2025
b824ded
Update docs/content/reference/migration/migration-0-26.md
abey79 Oct 8, 2025
d6edf6e
Review comments
abey79 Oct 8, 2025
138245d
Merge branch 'main' into antoine/layer-table
abey79 Oct 8, 2025
7df55bf
fix imports
abey79 Oct 8, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 9 additions & 6 deletions crates/store/re_datafusion/src/dataframe_query_common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,14 @@ use re_dataframe::external::re_chunk_store::ChunkStore;
use re_dataframe::{Index, QueryExpression};
use re_log_encoding::codec::wire::decoder::Decode as _;
use re_log_types::EntryId;
use re_protos::cloud::v1alpha1::DATASET_MANIFEST_ID_FIELD_NAME;
use re_protos::cloud::v1alpha1::ext::{Query, QueryLatestAt, QueryRange};
use re_protos::cloud::v1alpha1::{GetDatasetSchemaRequest, QueryDatasetRequest};
use re_protos::common::v1alpha1::ext::ScanParameters;
use re_protos::headers::RerunHeadersInjectorExt as _;
use re_protos::{
cloud::v1alpha1::{
GetDatasetSchemaRequest, QueryDatasetRequest, ScanPartitionTableResponse,
ext::{Query, QueryLatestAt, QueryRange},
},
common::v1alpha1::ext::ScanParameters,
headers::RerunHeadersInjectorExt as _,
};
use re_redap_client::{ConnectionClient, ConnectionRegistryHandle};
use re_sorbet::{BatchType, ChunkColumnDescriptors, ColumnKind, ComponentColumnSelector};
use re_uri::Origin;
Expand Down Expand Up @@ -168,7 +171,7 @@ impl DataframeQueryTableProvider {

let schema = Arc::new(prepend_string_column_schema(
&schema,
DATASET_MANIFEST_ID_FIELD_NAME,
ScanPartitionTableResponse::PARTITION_ID,
));

Ok(Self {
Expand Down
15 changes: 9 additions & 6 deletions crates/store/re_datafusion/src/dataframe_query_provider.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ use re_dataframe::{
ChunkStoreHandle, Index, QueryCache, QueryEngine, QueryExpression, QueryHandle, StorageEngine,
};
use re_log_types::{ApplicationId, StoreId, StoreKind};
use re_protos::cloud::v1alpha1::{DATASET_MANIFEST_ID_FIELD_NAME, FetchChunksRequest};
use re_protos::cloud::v1alpha1::{FetchChunksRequest, ScanPartitionTableResponse};
use re_redap_client::ConnectionClient;
use re_sorbet::{ColumnDescriptor, ColumnSelector};

Expand Down Expand Up @@ -201,10 +201,10 @@ impl PartitionStreamExec {
let orderings = if projected_schema
.fields()
.iter()
.any(|f| f.name().as_str() == DATASET_MANIFEST_ID_FIELD_NAME)
.any(|f| f.name().as_str() == ScanPartitionTableResponse::PARTITION_ID)
{
let partition_col =
Arc::new(Column::new(DATASET_MANIFEST_ID_FIELD_NAME, 0)) as Arc<dyn PhysicalExpr>;
let partition_col = Arc::new(Column::new(ScanPartitionTableResponse::PARTITION_ID, 0))
as Arc<dyn PhysicalExpr>;
let order_col = sort_index
.and_then(|index| {
let index_name = index.as_str();
Expand Down Expand Up @@ -242,7 +242,10 @@ impl PartitionStreamExec {

let output_partitioning = if partition_in_output_schema {
Partitioning::Hash(
vec![Arc::new(Column::new(DATASET_MANIFEST_ID_FIELD_NAME, 0))],
vec![Arc::new(Column::new(
ScanPartitionTableResponse::PARTITION_ID,
0,
))],
num_partitions,
)
} else {
Expand Down Expand Up @@ -303,7 +306,7 @@ async fn send_next_row(

let batch_schema = Arc::new(prepend_string_column_schema(
&query_schema,
DATASET_MANIFEST_ID_FIELD_NAME,
ScanPartitionTableResponse::PARTITION_ID,
));

let batch = RecordBatch::try_new_with_options(
Expand Down
14 changes: 8 additions & 6 deletions crates/store/re_datafusion/src/dataframe_query_provider_wasm.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,7 @@ use re_dataframe::{
};
use re_log_encoding::codec::wire::encoder::Encode as _;
use re_log_types::{StoreId, StoreKind};
use re_protos::cloud::v1alpha1::DATASET_MANIFEST_ID_FIELD_NAME;
use re_protos::cloud::v1alpha1::FetchChunksRequest;
use re_protos::cloud::v1alpha1::{FetchChunksRequest, ScanPartitionTableResponse};
use re_redap_client::ConnectionClient;

use crate::dataframe_query_common::{
Expand Down Expand Up @@ -199,8 +198,8 @@ impl PartitionStreamExec {
None => Arc::clone(table_schema),
};

let partition_col =
Arc::new(Column::new(DATASET_MANIFEST_ID_FIELD_NAME, 0)) as Arc<dyn PhysicalExpr>;
let partition_col = Arc::new(Column::new(ScanPartitionTableResponse::PARTITION_ID, 0))
as Arc<dyn PhysicalExpr>;
let order_col = sort_index
.and_then(|index| {
let index_name = index.as_str();
Expand Down Expand Up @@ -236,7 +235,10 @@ impl PartitionStreamExec {

let output_partitioning = if partition_in_output_schema {
Partitioning::Hash(
vec![Arc::new(Column::new(DATASET_MANIFEST_ID_FIELD_NAME, 0))],
vec![Arc::new(Column::new(
ScanPartitionTableResponse::PARTITION_ID,
0,
))],
num_partitions,
)
} else {
Expand Down Expand Up @@ -295,7 +297,7 @@ fn create_next_row(

let batch_schema = Arc::new(prepend_string_column_schema(
&query_schema,
DATASET_MANIFEST_ID_FIELD_NAME,
ScanPartitionTableResponse::PARTITION_ID,
));

let batch = RecordBatch::try_new_with_options(
Expand Down
104 changes: 104 additions & 0 deletions crates/store/re_datafusion/src/dataset_manifest.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
use std::sync::Arc;

use arrow::{array::RecordBatch, datatypes::SchemaRef};
use async_trait::async_trait;
use datafusion::{
catalog::TableProvider,
error::{DataFusionError, Result as DataFusionResult},
};
use tracing::instrument;

use re_log_encoding::codec::wire::decoder::Decode as _;
use re_log_types::EntryId;
use re_protos::{
cloud::v1alpha1::{ScanDatasetManifestRequest, ScanDatasetManifestResponse},
headers::RerunHeadersInjectorExt as _,
};
use re_redap_client::ConnectionClient;

use crate::grpc_streaming_provider::{GrpcStreamProvider, GrpcStreamToTable};
use crate::wasm_compat::make_future_send;

//TODO(ab): deduplicate from PartitionTableProvider
#[derive(Clone)]
pub struct DatasetManifestProvider {
client: ConnectionClient,
dataset_id: EntryId,
}

impl std::fmt::Debug for DatasetManifestProvider {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("DatasetManifestProvider")
.field("dataset_id", &self.dataset_id)
.finish()
}
}

impl DatasetManifestProvider {
pub fn new(client: ConnectionClient, dataset_id: EntryId) -> Self {
Self { client, dataset_id }
}

/// This is a convenience function
pub async fn into_provider(self) -> DataFusionResult<Arc<dyn TableProvider>> {
Ok(GrpcStreamProvider::prepare(self).await?)
}
}

#[async_trait]
impl GrpcStreamToTable for DatasetManifestProvider {
type GrpcStreamData = ScanDatasetManifestResponse;

#[instrument(skip(self), err)]
async fn fetch_schema(&mut self) -> DataFusionResult<SchemaRef> {
let mut client = self.client.clone();

let dataset_id = self.dataset_id;

Ok(Arc::new(
make_future_send(async move {
client
.get_dataset_manifest_schema(dataset_id)
.await
.map_err(|err| {
DataFusionError::External(
format!("Couldn't get dataset manifest schema: {err}").into(),
)
})
})
.await?,
))
}

// TODO(ab): what `GrpcStreamToTable` attempts to simplify should probably be handled by
// `ConnectionClient`
#[instrument(skip(self), err)]
async fn send_streaming_request(
&mut self,
) -> DataFusionResult<tonic::Response<tonic::Streaming<Self::GrpcStreamData>>> {
let request = tonic::Request::new(ScanDatasetManifestRequest {
columns: vec![], // all of them
})
.with_entry_id(self.dataset_id)
.map_err(|err| DataFusionError::External(Box::new(err)))?;

let mut client = self.client.clone();

make_future_send(async move { Ok(client.inner().scan_dataset_manifest(request).await) })
.await?
.map_err(|err| DataFusionError::External(Box::new(err)))
}

fn process_response(
&mut self,
response: Self::GrpcStreamData,
) -> DataFusionResult<RecordBatch> {
response
.data
.ok_or(DataFusionError::Execution(
"DataFrame missing from DatasetManifest response".to_owned(),
))?
.decode()
.map_err(|err| DataFusionError::External(Box::new(err)))
}
}
2 changes: 2 additions & 0 deletions crates/store/re_datafusion/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ mod dataframe_query_common;
mod dataframe_query_provider;
#[cfg(target_arch = "wasm32")]
mod dataframe_query_provider_wasm;
mod dataset_manifest;
mod grpc_streaming_provider;
mod partition_table;
mod search_provider;
Expand All @@ -18,6 +19,7 @@ pub use dataframe_query_common::{DataframeQueryTableProvider, query_from_query_e
pub(crate) use dataframe_query_provider::PartitionStreamExec;
#[cfg(target_arch = "wasm32")]
pub(crate) use dataframe_query_provider_wasm::PartitionStreamExec;
pub use dataset_manifest::DatasetManifestProvider;
pub use partition_table::PartitionTableProvider;
pub use search_provider::SearchResultsTableProvider;
pub use table_entry_provider::TableEntryTableProvider;
1 change: 1 addition & 0 deletions crates/store/re_datafusion/src/partition_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ use re_redap_client::ConnectionClient;
use crate::grpc_streaming_provider::{GrpcStreamProvider, GrpcStreamToTable};
use crate::wasm_compat::make_future_send;

//TODO(ab): deduplicate from DatasetManifestProvider
#[derive(Clone)]
pub struct PartitionTableProvider {
client: ConnectionClient,
Expand Down
42 changes: 40 additions & 2 deletions crates/store/re_protos/proto/rerun/v1alpha1/cloud.proto
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,21 @@ service RerunCloudService {
// This endpoint requires the standard dataset headers.
rpc ScanPartitionTable(ScanPartitionTableRequest) returns (stream ScanPartitionTableResponse) {}

// Returns the schema of the dataset manifest.
//
// To inspect the data of the dataset manifest, which is guaranteed to match the schema returned by
// this endpoint, check out `ScanDatasetManifest`.
//
// This endpoint requires the standard dataset headers.
rpc GetDatasetManifestSchema(GetDatasetManifestSchemaRequest) returns (GetDatasetManifestSchemaResponse) {}

// Inspect the contents of the dataset manifest.
//
// The data will follow the schema returned by `GetDatasetManifestSchema`.
//
// This endpoint requires the standard dataset headers.
rpc ScanDatasetManifest(ScanDatasetManifestRequest) returns (stream ScanDatasetManifestResponse) {}

Comment on lines +80 to +94
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is the main point of this PR.

// Returns the schema of the dataset.
//
// This is the union of all the schemas from all the underlying partitions. It will contain all the indexes,
Expand Down Expand Up @@ -271,6 +286,29 @@ message ScanPartitionTableResponse {
rerun.common.v1alpha1.DataframePart data = 1;
}

message GetDatasetManifestSchemaRequest {
reserved "dataset_id";
}

message GetDatasetManifestSchemaResponse {
rerun.common.v1alpha1.Schema schema = 1;
}

message ScanDatasetManifestRequest {
// A list of column names to be projected server-side.
//
// All of them if left empty.
repeated string columns = 3;

reserved "dataset_id";
reserved "scan_parameters";
}

message ScanDatasetManifestResponse {
// Layer metadata as arrow RecordBatch
rerun.common.v1alpha1.DataframePart data = 1;
}

message GetDatasetSchemaRequest {
reserved 1;
reserved "dataset_id";
Expand Down Expand Up @@ -778,7 +816,7 @@ message CreateDatasetEntryRequest {
// Name of the dataset entry to create.
//
// The name should be a short human-readable string. It must be unique within all entries in the catalog. If an entry
// with the same name already exists, the request will fail.
// with the same name already exists, the request will fail. Entry names ending with `__manifest` are reserved.
optional string name = 1;

// If specified, create the entry using this specific ID. Use at your own risk.
Expand Down Expand Up @@ -823,7 +861,7 @@ message RegisterTableRequest {
// Name of the table entry to create.
//
// The name should be a short human-readable string. It must be unique within all entries in the catalog. If an entry
// with the same name already exists, the request will fail.
// with the same name already exists, the request will fail. Entry names ending with `__manifest` are reserved.
string name = 1;

// Information about the table to register.
Expand Down
9 changes: 0 additions & 9 deletions crates/store/re_protos/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,15 +69,6 @@ pub mod cloud {
pub mod ext {
pub use crate::v1alpha1::rerun_cloud_v1alpha1_ext::*;
}

/// `DatasetManifest` mandatory field names. All mandatory metadata fields are prefixed
/// with "rerun_" to avoid conflicts with user-defined fields.
pub const DATASET_MANIFEST_ID_FIELD_NAME: &str = "rerun_partition_id";
pub const DATASET_MANIFEST_PARTITION_MANIFEST_UPDATED_AT_FIELD_NAME: &str = "rerun_partition_manifest_updated_at";
pub const DATASET_MANIFEST_RECORDING_TYPE_FIELD_NAME: &str = "rerun_partition_type";
pub const DATASET_MANIFEST_REGISTRATION_TIME_FIELD_NAME: &str = "rerun_registration_time";
pub const DATASET_MANIFEST_START_TIME_FIELD_NAME: &str = "rerun_start_time";
pub const DATASET_MANIFEST_STORAGE_URL_FIELD_NAME: &str = "rerun_storage_url";
}
}

Expand Down
Loading
Loading