104 changes: 104 additions & 0 deletions crates/store/re_datafusion/src/layer_table.rs
@@ -0,0 +1,104 @@
use std::sync::Arc;

use arrow::{array::RecordBatch, datatypes::SchemaRef};
use async_trait::async_trait;
use datafusion::{
catalog::TableProvider,
error::{DataFusionError, Result as DataFusionResult},
};
use tracing::instrument;

use re_log_encoding::codec::wire::decoder::Decode as _;
use re_log_types::EntryId;
use re_protos::{
cloud::v1alpha1::{ScanLayerTableRequest, ScanLayerTableResponse},
headers::RerunHeadersInjectorExt as _,
};
use re_redap_client::ConnectionClient;

use crate::grpc_streaming_provider::{GrpcStreamProvider, GrpcStreamToTable};
use crate::wasm_compat::make_future_send;

//TODO(ab): deduplicate from PartitionTableProvider
#[derive(Clone)]
pub struct LayerTableProvider {
client: ConnectionClient,
dataset_id: EntryId,
}

impl std::fmt::Debug for LayerTableProvider {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("LayerTableProvider")
.field("dataset_id", &self.dataset_id)
.finish()
}
}

impl LayerTableProvider {
pub fn new(client: ConnectionClient, dataset_id: EntryId) -> Self {
Self { client, dataset_id }
}

/// Convenience function to wrap `self` into a ready-to-register [`GrpcStreamProvider`].
pub async fn into_provider(self) -> DataFusionResult<Arc<dyn TableProvider>> {
Ok(GrpcStreamProvider::prepare(self).await?)
}
}

#[async_trait]
impl GrpcStreamToTable for LayerTableProvider {
type GrpcStreamData = ScanLayerTableResponse;

#[instrument(skip(self), err)]
async fn fetch_schema(&mut self) -> DataFusionResult<SchemaRef> {
let mut client = self.client.clone();

let dataset_id = self.dataset_id;

Ok(Arc::new(
make_future_send(async move {
client
.get_layer_table_schema(dataset_id)
.await
.map_err(|err| {
DataFusionError::External(
format!("Couldn't get partition table schema: {err}").into(),
)
})
})
.await?,
))
}

// TODO(ab): what `GrpcStreamToTable` attempts to simplify should probably be handled by
// `ConnectionClient`
#[instrument(skip(self), err)]
async fn send_streaming_request(
&mut self,
) -> DataFusionResult<tonic::Response<tonic::Streaming<Self::GrpcStreamData>>> {
let request = tonic::Request::new(ScanLayerTableRequest {
columns: vec![], // all of them
})
.with_entry_id(self.dataset_id)
.map_err(|err| DataFusionError::External(Box::new(err)))?;

let mut client = self.client.clone();

make_future_send(async move { Ok(client.inner().scan_layer_table(request).await) })
.await?
.map_err(|err| DataFusionError::External(Box::new(err)))
}

fn process_response(
&mut self,
response: Self::GrpcStreamData,
) -> DataFusionResult<RecordBatch> {
response
.data
.ok_or(DataFusionError::Execution(
"DataFrame missing from PartitionList response".to_owned(),
))?
.decode()
.map_err(|err| DataFusionError::External(Box::new(err)))
}
}
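For reviewers, a minimal usage sketch (not part of this PR) of how `LayerTableProvider` can be plugged into a DataFusion `SessionContext`. How the `ConnectionClient` and dataset `EntryId` are obtained is assumed to be handled by existing connection code, and the table name `layers` is purely illustrative:

```rust
use datafusion::prelude::SessionContext;
use re_datafusion::LayerTableProvider;
use re_log_types::EntryId;
use re_redap_client::ConnectionClient;

async fn show_layers(client: ConnectionClient, dataset_id: EntryId) -> datafusion::error::Result<()> {
    // Turn the provider into a ready-to-register `TableProvider` (see `into_provider`).
    let provider = LayerTableProvider::new(client, dataset_id)
        .into_provider()
        .await?;

    let ctx = SessionContext::new();
    let _ = ctx.register_table("layers", provider)?;

    // Column names come from `ScanLayerTableResponse` further down in this PR.
    ctx.sql("SELECT rerun_layer_name, rerun_partition_id, rerun_size_bytes FROM layers")
        .await?
        .show()
        .await?;

    Ok(())
}
```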
2 changes: 2 additions & 0 deletions crates/store/re_datafusion/src/lib.rs
@@ -7,6 +7,7 @@ mod dataframe_query_provider
#[cfg(target_arch = "wasm32")]
mod dataframe_query_provider_wasm;
mod grpc_streaming_provider;
mod layer_table;
mod partition_table;
mod search_provider;
mod table_entry_provider;
@@ -18,6 +19,7 @@ pub use dataframe_query_common::{DataframeQueryTableProvider, query_from_query_e
pub(crate) use dataframe_query_provider::PartitionStreamExec;
#[cfg(target_arch = "wasm32")]
pub(crate) use dataframe_query_provider_wasm::PartitionStreamExec;
pub use layer_table::LayerTableProvider;
pub use partition_table::PartitionTableProvider;
pub use search_provider::SearchResultsTableProvider;
pub use table_entry_provider::TableEntryTableProvider;
1 change: 1 addition & 0 deletions crates/store/re_datafusion/src/partition_table.rs
@@ -19,6 +19,7 @@ use re_redap_client::ConnectionClient;
use crate::grpc_streaming_provider::{GrpcStreamProvider, GrpcStreamToTable};
use crate::wasm_compat::make_future_send;

//TODO(ab): deduplicate from LayerTableProvider
#[derive(Clone)]
pub struct PartitionTableProvider {
client: ConnectionClient,
38 changes: 38 additions & 0 deletions crates/store/re_protos/proto/rerun/v1alpha1/cloud.proto
@@ -77,6 +77,21 @@ service RerunCloudService {
// This endpoint requires the standard dataset headers.
rpc ScanPartitionTable(ScanPartitionTableRequest) returns (stream ScanPartitionTableResponse) {}

// Returns the schema of the layer table.
//
// To inspect the data of the layer table, which is guaranteed to match the schema returned by
// this endpoint, check out `ScanLayerTable`.
//
// This endpoint requires the standard dataset headers.
rpc GetLayerTableSchema(GetLayerTableSchemaRequest) returns (GetLayerTableSchemaResponse) {}

// Inspect the contents of the layer table.
//
// The data will follow the schema returned by `GetLayerTableSchema`.
//
// This endpoint requires the standard dataset headers.
rpc ScanLayerTable(ScanLayerTableRequest) returns (stream ScanLayerTableResponse) {}

// Returns the schema of the dataset.
//
// This is the union of all the schemas from all the underlying partitions. It will contain all the indexes,
@@ -271,6 +286,29 @@ message ScanPartitionTableResponse {
rerun.common.v1alpha1.DataframePart data = 1;
}

message GetLayerTableSchemaRequest {
reserved "dataset_id";
}

message GetLayerTableSchemaResponse {
rerun.common.v1alpha1.Schema schema = 1;
}

message ScanLayerTableRequest {
// A list of column names to be projected server-side.
//
// All of them if left empty.
repeated string columns = 3;

reserved "dataset_id";
reserved "scan_parameters";
}

message ScanLayerTableResponse {
// Layer metadata as an Arrow `RecordBatch`.
rerun.common.v1alpha1.DataframePart data = 1;
}

message GetDatasetSchemaRequest {
reserved 1;
reserved "dataset_id";
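As an aside (not part of the diff): the dataset id is passed via the standard dataset headers, which on the Rust side is what `RerunHeadersInjectorExt::with_entry_id` does, exactly as in `layer_table.rs` above. A minimal sketch of building a `ScanLayerTable` request with a server-side column projection, with the concrete header-injection error type simply boxed:

```rust
use re_log_types::EntryId;
use re_protos::{
    cloud::v1alpha1::ScanLayerTableRequest,
    headers::RerunHeadersInjectorExt as _,
};

type BoxedError = Box<dyn std::error::Error + Send + Sync>;

fn scan_layer_request(
    dataset_id: EntryId,
) -> Result<tonic::Request<ScanLayerTableRequest>, BoxedError> {
    tonic::Request::new(ScanLayerTableRequest {
        // An empty `columns` list means "all columns"; here we project two of them.
        columns: vec![
            "rerun_layer_name".to_owned(),
            "rerun_storage_url".to_owned(),
        ],
    })
    // Injects the standard dataset headers required by this endpoint.
    .with_entry_id(dataset_id)
    .map_err(|err| -> BoxedError { Box::new(err) })
}
```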
144 changes: 122 additions & 22 deletions crates/store/re_protos/src/v1alpha1/rerun.cloud.v1alpha1.ext.rs
@@ -1,6 +1,8 @@
use std::sync::Arc;

use arrow::array::{
FixedSizeBinaryArray, ListBuilder, RecordBatchOptions, StringBuilder, UInt64Array,
};
use arrow::{
array::{Array, ArrayRef, RecordBatch, StringArray, TimestampNanosecondArray},
datatypes::{DataType, Field, Schema, TimeUnit},
@@ -11,10 +13,9 @@ use re_chunk::TimelineName;
use re_log_types::{EntityPath, EntryId, TimeInt};
use re_sorbet::ComponentColumnDescriptor;

use crate::cloud::v1alpha1::{
GetDatasetSchemaResponse, RegisterWithDatasetResponse, ScanPartitionTableResponse,
VectorDistanceMetric,
EntryKind, GetDatasetSchemaResponse, QueryTasksResponse, RegisterWithDatasetResponse,
ScanLayerTableResponse, ScanPartitionTableResponse, VectorDistanceMetric,
};
use crate::common::v1alpha1::ext::{DatasetHandle, IfDuplicateBehavior, PartitionId};
use crate::common::v1alpha1::{ComponentDescriptor, DataframePart, TaskId};
@@ -1178,51 +1179,150 @@ pub struct RegisterWithDatasetTaskDescriptor {

impl ScanPartitionTableResponse {
pub const PARTITION_ID: &str = "rerun_partition_id";
pub const LAYERS: &str = "rerun_layers";
pub const STORAGE_URLS: &str = "rerun_storage_urls";
pub const LAST_UPDATED_AT: &str = "rerun_last_updated_at";
pub const NUM_CHUNKS: &str = "rerun_num_chunks";
pub const SIZE_BYTES: &str = "rerun_size_bytes";

pub fn schema() -> Schema {
Schema::new(vec![
Field::new(Self::PARTITION_ID, DataType::Utf8, false),
Field::new(
Self::LAYERS,
DataType::List(Arc::new(Field::new(Self::LAYERS, DataType::Utf8, false))),
false,
),
Field::new(
Self::STORAGE_URLS,
DataType::List(Arc::new(Field::new(
Self::STORAGE_URLS,
DataType::Utf8,
false,
))),
false,
),
Field::new(
Self::LAST_UPDATED_AT,
DataType::Timestamp(TimeUnit::Nanosecond, None),
false,
),
Field::new(Self::NUM_CHUNKS, DataType::UInt64, false),
Field::new(Self::SIZE_BYTES, DataType::UInt64, false),
])
}

/// Helper to simplify instantiation of the dataframe in [`Self::data`].
pub fn create_dataframe(
partition_ids: Vec<String>,
layers: Vec<Vec<String>>,
storage_urls: Vec<Vec<String>>,
last_updated_at: Vec<i64>,
num_chunks: Vec<u64>,
size_bytes: Vec<u64>,
) -> arrow::error::Result<RecordBatch> {
let row_count = partition_ids.len();
let schema = Arc::new(Self::schema());

let mut layers_builder = ListBuilder::new(StringBuilder::new())
.with_field(Arc::new(Field::new(Self::LAYERS, DataType::Utf8, false)));

for mut inner_vec in layers {
for layer_name in inner_vec.drain(..) {
layers_builder.values().append_value(layer_name)
}
layers_builder.append(true);
}

let mut urls_builder = ListBuilder::new(StringBuilder::new()).with_field(Arc::new(
Field::new(Self::STORAGE_URLS, DataType::Utf8, false),
));

for mut inner_vec in storage_urls {
for storage_url in inner_vec.drain(..) {
urls_builder.values().append_value(storage_url)
}
urls_builder.append(true);
}

let columns: Vec<ArrayRef> = vec![
Arc::new(StringArray::from(partition_ids)),
Arc::new(layers_builder.finish()),
Arc::new(urls_builder.finish()),
Arc::new(TimestampNanosecondArray::from(last_updated_at)),
Arc::new(UInt64Array::from(num_chunks)),
Arc::new(UInt64Array::from(size_bytes)),
];

RecordBatch::try_new_with_options(
schema,
columns,
&RecordBatchOptions::default().with_row_count(Some(row_count)),
)
}

pub fn data(&self) -> Result<&DataframePart, TypeConversionError> {
Ok(self.data.as_ref().ok_or_else(|| {
missing_field!(crate::cloud::v1alpha1::ScanPartitionTableResponse, "data")
})?)
}
}

// --- ScanLayerTableResponse ---

impl ScanLayerTableResponse {
pub const LAYER_NAME: &str = "rerun_layer_name";
pub const PARTITION_ID: &str = "rerun_partition_id";
pub const STORAGE_URL: &str = "rerun_storage_url";
pub const LAYER_TYPE: &str = "rerun_layer_type";
pub const REGISTRATION_TIME: &str = "rerun_registration_time";
pub const PARTITION_MANIFEST_UPDATED_AT: &str = "rerun_partition_manifest_updated_at";
pub const PARTITION_MANIFEST_URL: &str = "rerun_partition_manifest_url";
pub const NUM_CHUNKS: &str = "rerun_num_chunks";
pub const SIZE_BYTES: &str = "rerun_size_bytes";
pub const SCHEMA_SHA256: &str = "rerun_schema_sha256";

pub fn schema() -> Schema {
Schema::new(vec![
Field::new(Self::LAYER_NAME, DataType::Utf8, false),
Field::new(Self::PARTITION_ID, DataType::Utf8, false),
Field::new(Self::STORAGE_URL, DataType::Utf8, false),
Field::new(Self::LAYER_TYPE, DataType::Utf8, false),
Field::new(
Self::REGISTRATION_TIME,
DataType::Timestamp(TimeUnit::Nanosecond, None),
false,
),
Field::new(
Self::PARTITION_MANIFEST_UPDATED_AT,
DataType::Timestamp(TimeUnit::Nanosecond, None),
true,
),
Field::new(Self::PARTITION_MANIFEST_URL, DataType::Utf8, true),
Field::new(Self::NUM_CHUNKS, DataType::UInt64, false),
Field::new(Self::SIZE_BYTES, DataType::UInt64, false),
Field::new(Self::SCHEMA_SHA256, DataType::FixedSizeBinary(32), false),
])
}

/// Helper to simplify instantiation of the dataframe in [`Self::data`].
pub fn create_dataframe(
layer_names: Vec<String>,
partition_ids: Vec<String>,
storage_urls: Vec<String>,
layer_types: Vec<String>,
registration_times: Vec<i64>,
partition_manifest_updated_ats: Vec<Option<i64>>,
partition_manifest_urls: Vec<Option<String>>,
num_chunks: Vec<u64>,
size_bytes: Vec<u64>,
schema_sha256s: Vec<[u8; 32]>,
) -> arrow::error::Result<RecordBatch> {
let row_count = partition_ids.len();
let schema = Arc::new(Self::schema());

let columns: Vec<ArrayRef> = vec![
Arc::new(StringArray::from(layer_names)),
Arc::new(StringArray::from(partition_ids)),
Arc::new(StringArray::from(storage_urls)),
Arc::new(StringArray::from(layer_types)),
Arc::new(TimestampNanosecondArray::from(registration_times)),
Arc::new(TimestampNanosecondArray::from(
partition_manifest_updated_ats,
)),
Arc::new(StringArray::from(partition_manifest_urls)),
Arc::new(UInt64Array::from(num_chunks)),
Arc::new(UInt64Array::from(size_bytes)),
Arc::new(
FixedSizeBinaryArray::try_from_iter(schema_sha256s.into_iter())
.expect("sizes of nested slices are guaranteed to match"),
),
];

RecordBatch::try_new_with_options(
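A small sanity-check sketch (not part of the diff, all values made up) of how a server-side caller might exercise the new `ScanLayerTableResponse::create_dataframe` helper, assuming the signature shown above:

```rust
use re_protos::cloud::v1alpha1::ScanLayerTableResponse;

fn one_row_layer_batch() -> arrow::error::Result<()> {
    // Placeholder values; only the shapes and nullability matter here.
    let batch = ScanLayerTableResponse::create_dataframe(
        vec!["base".to_owned()],                        // layer_names
        vec!["partition-0".to_owned()],                 // partition_ids
        vec!["file:///tmp/partition-0.rrd".to_owned()], // storage_urls
        vec!["rrd".to_owned()],                         // layer_types (placeholder kind)
        vec![1_700_000_000_000_000_000],                // registration_times, nanoseconds
        vec![None],                                     // partition_manifest_updated_ats
        vec![None],                                     // partition_manifest_urls
        vec![42],                                       // num_chunks
        vec![1024],                                     // size_bytes
        vec![[0u8; 32]],                                // schema_sha256s
    )?;

    assert_eq!(batch.num_rows(), 1);
    Ok(())
}
```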