Skip to content

Commit 1cd8a69

Browse files
committed
add table provider for layer table
1 parent 6850986 commit 1cd8a69

File tree

5 files changed

+133
-3
lines changed

5 files changed

+133
-3
lines changed
Lines changed: 104 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,104 @@
1+
use std::sync::Arc;
2+
3+
use arrow::{array::RecordBatch, datatypes::SchemaRef};
4+
use async_trait::async_trait;
5+
use datafusion::{
6+
catalog::TableProvider,
7+
error::{DataFusionError, Result as DataFusionResult},
8+
};
9+
use tracing::instrument;
10+
11+
use re_log_encoding::codec::wire::decoder::Decode as _;
12+
use re_log_types::EntryId;
13+
use re_protos::{
14+
cloud::v1alpha1::{ScanLayerTableRequest, ScanLayerTableResponse},
15+
headers::RerunHeadersInjectorExt as _,
16+
};
17+
use re_redap_client::ConnectionClient;
18+
19+
use crate::grpc_streaming_provider::{GrpcStreamProvider, GrpcStreamToTable};
20+
use crate::wasm_compat::make_future_send;
21+
22+
//TODO(ab): deduplicate from PartitionTableProvider
23+
#[derive(Clone)]
24+
pub struct LayerTableProvider {
25+
client: ConnectionClient,
26+
dataset_id: EntryId,
27+
}
28+
29+
impl std::fmt::Debug for LayerTableProvider {
30+
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
31+
f.debug_struct("LayerTableProvider")
32+
.field("dataset_id", &self.dataset_id)
33+
.finish()
34+
}
35+
}
36+
37+
impl LayerTableProvider {
38+
pub fn new(client: ConnectionClient, dataset_id: EntryId) -> Self {
39+
Self { client, dataset_id }
40+
}
41+
42+
/// This is a convenience function
43+
pub async fn into_provider(self) -> DataFusionResult<Arc<dyn TableProvider>> {
44+
Ok(GrpcStreamProvider::prepare(self).await?)
45+
}
46+
}
47+
48+
#[async_trait]
49+
impl GrpcStreamToTable for LayerTableProvider {
50+
type GrpcStreamData = ScanLayerTableResponse;
51+
52+
#[instrument(skip(self), err)]
53+
async fn fetch_schema(&mut self) -> DataFusionResult<SchemaRef> {
54+
let mut client = self.client.clone();
55+
56+
let dataset_id = self.dataset_id;
57+
58+
Ok(Arc::new(
59+
make_future_send(async move {
60+
client
61+
.get_layer_table_schema(dataset_id)
62+
.await
63+
.map_err(|err| {
64+
DataFusionError::External(
65+
format!("Couldn't get partition table schema: {err}").into(),
66+
)
67+
})
68+
})
69+
.await?,
70+
))
71+
}
72+
73+
// TODO(ab): what `GrpcStreamToTable` attempts to simplify should probably be handled by
74+
// `ConnectionClient`
75+
#[instrument(skip(self), err)]
76+
async fn send_streaming_request(
77+
&mut self,
78+
) -> DataFusionResult<tonic::Response<tonic::Streaming<Self::GrpcStreamData>>> {
79+
let request = tonic::Request::new(ScanLayerTableRequest {
80+
columns: vec![], // all of them
81+
})
82+
.with_entry_id(self.dataset_id)
83+
.map_err(|err| DataFusionError::External(Box::new(err)))?;
84+
85+
let mut client = self.client.clone();
86+
87+
make_future_send(async move { Ok(client.inner().scan_layer_table(request).await) })
88+
.await?
89+
.map_err(|err| DataFusionError::External(Box::new(err)))
90+
}
91+
92+
fn process_response(
93+
&mut self,
94+
response: Self::GrpcStreamData,
95+
) -> DataFusionResult<RecordBatch> {
96+
response
97+
.data
98+
.ok_or(DataFusionError::Execution(
99+
"DataFrame missing from PartitionList response".to_owned(),
100+
))?
101+
.decode()
102+
.map_err(|err| DataFusionError::External(Box::new(err)))
103+
}
104+
}

crates/store/re_datafusion/src/lib.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ mod dataframe_query_provider;
77
#[cfg(target_arch = "wasm32")]
88
mod dataframe_query_provider_wasm;
99
mod grpc_streaming_provider;
10+
mod layer_table;
1011
mod partition_table;
1112
mod search_provider;
1213
mod table_entry_provider;
@@ -18,6 +19,7 @@ pub use dataframe_query_common::{DataframeQueryTableProvider, query_from_query_e
1819
pub(crate) use dataframe_query_provider::PartitionStreamExec;
1920
#[cfg(target_arch = "wasm32")]
2021
pub(crate) use dataframe_query_provider_wasm::PartitionStreamExec;
22+
pub use layer_table::LayerTableProvider;
2123
pub use partition_table::PartitionTableProvider;
2224
pub use search_provider::SearchResultsTableProvider;
2325
pub use table_entry_provider::TableEntryTableProvider;

crates/store/re_datafusion/src/partition_table.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ use re_redap_client::ConnectionClient;
1919
use crate::grpc_streaming_provider::{GrpcStreamProvider, GrpcStreamToTable};
2020
use crate::wasm_compat::make_future_send;
2121

22+
//TODO(ab): deduplicate from LayerTableProvider
2223
#[derive(Clone)]
2324
pub struct PartitionTableProvider {
2425
client: ConnectionClient,

crates/store/re_redap_client/src/connection_client.rs

Lines changed: 23 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -9,9 +9,9 @@ use re_protos::{
99
TypeConversionError,
1010
cloud::v1alpha1::{
1111
CreateDatasetEntryRequest, DeleteEntryRequest, EntryFilter, EntryKind, FindEntriesRequest,
12-
GetPartitionTableSchemaRequest, GetPartitionTableSchemaResponse, ReadDatasetEntryRequest,
13-
ReadTableEntryRequest, RegisterWithDatasetResponse, ScanPartitionTableRequest,
14-
ScanPartitionTableResponse,
12+
GetLayerTableSchemaRequest, GetLayerTableSchemaResponse, GetPartitionTableSchemaRequest,
13+
GetPartitionTableSchemaResponse, ReadDatasetEntryRequest, ReadTableEntryRequest,
14+
RegisterWithDatasetResponse, ScanPartitionTableRequest, ScanPartitionTableResponse,
1515
ext::{
1616
CreateDatasetEntryResponse, DataSource, DataSourceKind, DatasetDetails, DatasetEntry,
1717
EntryDetails, EntryDetailsUpdate, LanceTable, ProviderDetails as _,
@@ -269,6 +269,26 @@ where
269269
Ok(partition_ids)
270270
}
271271

272+
//TODO(ab): accept entry name
273+
pub async fn get_layer_table_schema(
274+
&mut self,
275+
entry_id: EntryId,
276+
) -> Result<ArrowSchema, StreamError> {
277+
Ok(self
278+
.inner()
279+
.get_layer_table_schema(
280+
tonic::Request::new(GetLayerTableSchemaRequest {})
281+
.with_entry_id(entry_id)
282+
.map_err(|err| StreamEntryError::InvalidId(err.into()))?,
283+
)
284+
.await
285+
.map_err(|err| StreamEntryError::GetLayerTableSchema(err.into()))?
286+
.into_inner()
287+
.schema
288+
.ok_or_else(|| missing_field!(GetLayerTableSchemaResponse, "schema"))?
289+
.try_into()?)
290+
}
291+
272292
/// Initiate registration of the provided recording URIs with a dataset and return the
273293
/// corresponding task descriptors.
274294
///

crates/store/re_redap_client/src/lib.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -96,6 +96,9 @@ pub enum StreamEntryError {
9696
#[error("Failed reading partition table scheme\nDetails:{0}")]
9797
GetPartitionTableSchema(TonicStatusError),
9898

99+
#[error("Failed reading layer table scheme\nDetails:{0}")]
100+
GetLayerTableSchema(TonicStatusError),
101+
99102
#[error("Failed scanning the partition table \nDetails:{0}")]
100103
ScanPartitionTable(TonicStatusError),
101104

0 commit comments

Comments
 (0)