Skip to content
Draft
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
104 changes: 104 additions & 0 deletions crates/store/re_datafusion/src/dataset_manifest.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
use std::sync::Arc;

use arrow::{array::RecordBatch, datatypes::SchemaRef};
use async_trait::async_trait;
use datafusion::{
catalog::TableProvider,
error::{DataFusionError, Result as DataFusionResult},
};
use tracing::instrument;

use re_log_encoding::codec::wire::decoder::Decode as _;
use re_log_types::EntryId;
use re_protos::{
cloud::v1alpha1::{ScanDatasetManifestRequest, ScanDatasetManifestResponse},
headers::RerunHeadersInjectorExt as _,
};
use re_redap_client::ConnectionClient;

use crate::grpc_streaming_provider::{GrpcStreamProvider, GrpcStreamToTable};
use crate::wasm_compat::make_future_send;

//TODO(ab): deduplicate from PartitionTableProvider
/// DataFusion [`TableProvider`] over the manifest of a dataset, backed by the
/// `ScanDatasetManifest` gRPC endpoint (see [`GrpcStreamToTable`] impl below).
#[derive(Clone)]
pub struct DatasetManifestProvider {
    // Connection to the Rerun Cloud service; cloned per request.
    client: ConnectionClient,

    // Id of the dataset whose manifest is exposed.
    dataset_id: EntryId,
}

// Manual `Debug` impl: only `dataset_id` is shown; `client` is deliberately omitted
// (presumably `ConnectionClient` does not implement `Debug`, or would be too noisy —
// TODO confirm).
impl std::fmt::Debug for DatasetManifestProvider {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("DatasetManifestProvider")
            .field("dataset_id", &self.dataset_id)
            .finish()
    }
}

impl DatasetManifestProvider {
    /// Creates a provider that will read the manifest of `dataset_id` through `client`.
    pub fn new(client: ConnectionClient, dataset_id: EntryId) -> Self {
        Self { client, dataset_id }
    }

    /// Convenience helper: turns `self` into a ready-to-register DataFusion
    /// [`TableProvider`].
    pub async fn into_provider(self) -> DataFusionResult<Arc<dyn TableProvider>> {
        let provider = GrpcStreamProvider::prepare(self).await?;
        // `Arc<GrpcStreamProvider<_>>` unsize-coerces to `Arc<dyn TableProvider>` here.
        Ok(provider)
    }
}

#[async_trait]
impl GrpcStreamToTable for DatasetManifestProvider {
    type GrpcStreamData = ScanDatasetManifestResponse;

    /// Fetches the Arrow schema of the dataset manifest from the server.
    ///
    /// Uses `ConnectionClient::get_dataset_manifest_schema`; the streamed record batches
    /// from [`Self::send_streaming_request`] are expected to follow this schema.
    #[instrument(skip(self), err)]
    async fn fetch_schema(&mut self) -> DataFusionResult<SchemaRef> {
        // Clone the client and copy the id so the future handed to `make_future_send`
        // is self-contained and does not borrow from `self`.
        let mut client = self.client.clone();

        let dataset_id = self.dataset_id;

        Ok(Arc::new(
            make_future_send(async move {
                client
                    .get_dataset_manifest_schema(dataset_id)
                    .await
                    .map_err(|err| {
                        // Fixed: this used to say "partition table schema" — a copy-paste
                        // leftover from `PartitionTableProvider`.
                        DataFusionError::External(
                            format!("Couldn't get dataset manifest schema: {err}").into(),
                        )
                    })
            })
            .await?,
        ))
    }

    // TODO(ab): what `GrpcStreamToTable` attempts to simplify should probably be handled by
    // `ConnectionClient`
    /// Opens the server-side stream over the dataset manifest contents.
    ///
    /// The target dataset travels in the request *headers* (injected by `with_entry_id`),
    /// not in the payload; an empty `columns` list means no server-side projection.
    #[instrument(skip(self), err)]
    async fn send_streaming_request(
        &mut self,
    ) -> DataFusionResult<tonic::Response<tonic::Streaming<Self::GrpcStreamData>>> {
        let request = tonic::Request::new(ScanDatasetManifestRequest {
            columns: vec![], // all of them
        })
        .with_entry_id(self.dataset_id)
        .map_err(|err| DataFusionError::External(Box::new(err)))?;

        let mut client = self.client.clone();

        make_future_send(async move { Ok(client.inner().scan_dataset_manifest(request).await) })
            .await?
            .map_err(|err| DataFusionError::External(Box::new(err)))
    }

    /// Decodes one streamed response chunk into a `RecordBatch`.
    fn process_response(
        &mut self,
        response: Self::GrpcStreamData,
    ) -> DataFusionResult<RecordBatch> {
        response
            .data
            // `ok_or_else` avoids allocating the error string on the happy path.
            // Fixed: this used to say "PartitionList response" — a copy-paste leftover.
            .ok_or_else(|| {
                DataFusionError::Execution(
                    "DataFrame missing from ScanDatasetManifest response".to_owned(),
                )
            })?
            .decode()
            .map_err(|err| DataFusionError::External(Box::new(err)))
    }
}
2 changes: 2 additions & 0 deletions crates/store/re_datafusion/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ mod dataframe_query_common;
mod dataframe_query_provider;
#[cfg(target_arch = "wasm32")]
mod dataframe_query_provider_wasm;
mod dataset_manifest;
mod grpc_streaming_provider;
mod partition_table;
mod search_provider;
Expand All @@ -18,6 +19,7 @@ pub use dataframe_query_common::{DataframeQueryTableProvider, query_from_query_e
pub(crate) use dataframe_query_provider::PartitionStreamExec;
#[cfg(target_arch = "wasm32")]
pub(crate) use dataframe_query_provider_wasm::PartitionStreamExec;
pub use dataset_manifest::DatasetManifestProvider;
pub use partition_table::PartitionTableProvider;
pub use search_provider::SearchResultsTableProvider;
pub use table_entry_provider::TableEntryTableProvider;
1 change: 1 addition & 0 deletions crates/store/re_datafusion/src/partition_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ use re_redap_client::ConnectionClient;
use crate::grpc_streaming_provider::{GrpcStreamProvider, GrpcStreamToTable};
use crate::wasm_compat::make_future_send;

//TODO(ab): deduplicate from DatasetManifestProvider
#[derive(Clone)]
pub struct PartitionTableProvider {
client: ConnectionClient,
Expand Down
44 changes: 41 additions & 3 deletions crates/store/re_protos/proto/rerun/v1alpha1/cloud.proto
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,21 @@ service RerunCloudService {
// This endpoint requires the standard dataset headers.
rpc ScanPartitionTable(ScanPartitionTableRequest) returns (stream ScanPartitionTableResponse) {}

// Returns the schema of the dataset manifest.
//
// To inspect the data of the dataset manifest, which is guaranteed to match the schema returned by
// this endpoint, check out `ScanDatasetManifest`.
//
// This endpoint requires the standard dataset headers.
rpc GetDatasetManifestSchema(GetDatasetManifestSchemaRequest) returns (GetDatasetManifestSchemaResponse) {}

// Inspect the contents of the dataset manifest.
//
// The data will follow the schema returned by `GetDatasetManifestSchema`.
//
// This endpoint requires the standard dataset headers.
rpc ScanDatasetManifest(ScanDatasetManifestRequest) returns (stream ScanDatasetManifestResponse) {}

// Returns the schema of the dataset.
//
// This is the union of all the schemas from all the underlying partitions. It will contain all the indexes,
Expand Down Expand Up @@ -271,6 +286,29 @@ message ScanPartitionTableResponse {
rerun.common.v1alpha1.DataframePart data = 1;
}

// Request for `GetDatasetManifestSchema`.
message GetDatasetManifestSchemaRequest {
  // The target dataset is specified via the standard dataset headers, not in the payload.
  reserved "dataset_id";
}

// Response to `GetDatasetManifestSchema`.
message GetDatasetManifestSchemaResponse {
  // Schema of the dataset manifest; `ScanDatasetManifest` data follows this schema.
  rerun.common.v1alpha1.Schema schema = 1;
}

// Request for `ScanDatasetManifest`.
message ScanDatasetManifestRequest {
  // A list of column names to be projected server-side.
  //
  // All of them if left empty.
  repeated string columns = 3;

  // The target dataset is specified via the standard dataset headers, not in the payload.
  reserved "dataset_id";
  reserved "scan_parameters";
}

// Response to `ScanDatasetManifest`, streamed in chunks.
message ScanDatasetManifestResponse {
  // Layer metadata as an Arrow `RecordBatch`.
  rerun.common.v1alpha1.DataframePart data = 1;
}

message GetDatasetSchemaRequest {
reserved 1;
reserved "dataset_id";
Expand Down Expand Up @@ -778,7 +816,7 @@ message CreateDatasetEntryRequest {
// Name of the dataset entry to create.
//
// The name should be a short human-readable string. It must be unique within all entries in the catalog. If an entry
// with the same name already exists, the request will fail.
// with the same name already exists, the request will fail. Entry names ending with `__manifest` are reserved.
optional string name = 1;

// If specified, create the entry using this specific ID. Use at your own risk.
Expand Down Expand Up @@ -820,10 +858,10 @@ message UpdateDatasetEntryResponse {
// RegisterTable

message RegisterTableRequest {
// Name of the table entry to create.
// Name of the table entry to create.
//
// The name should be a short human-readable string. It must be unique within all entries in the catalog. If an entry
// with the same name already exists, the request will fail.
// with the same name already exists, the request will fail. Entry names ending with `__manifest` are reserved.
string name = 1;

// Information about the table to register.
Expand Down
Loading
Loading