Skip to content

Commit 120219a

Browse files
authored
Clean-up of the partition table datafusion table provider (#11434)
1 parent 80849ce commit 120219a

File tree

11 files changed

+70
-199
lines changed

11 files changed

+70
-199
lines changed

Cargo.lock

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -8657,7 +8657,6 @@ name = "re_datafusion"
86578657
version = "0.26.0-alpha.1+dev"
86588658
dependencies = [
86598659
"ahash",
8660-
"anyhow",
86618660
"arrow",
86628661
"async-stream",
86638662
"async-trait",
@@ -8666,7 +8665,6 @@ dependencies = [
86668665
"futures",
86678666
"futures-util",
86688667
"getrandom 0.3.3",
8669-
"itertools 0.14.0",
86708668
"log",
86718669
"parking_lot",
86728670
"re_arrow_util",
@@ -8676,7 +8674,6 @@ dependencies = [
86768674
"re_protos",
86778675
"re_redap_client",
86788676
"re_sorbet",
8679-
"re_tuid",
86808677
"re_uri",
86818678
"tokio",
86828679
"tokio-stream",

crates/store/re_datafusion/Cargo.toml

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -37,19 +37,16 @@ re_log_encoding.workspace = true
3737
re_log_types.workspace = true
3838
re_protos.workspace = true
3939
re_sorbet.workspace = true
40-
re_tuid.workspace = true
4140
re_uri.workspace = true
4241

4342
# External dependencies:
4443
ahash.workspace = true
45-
anyhow.workspace = true
4644
arrow.workspace = true
4745
async-stream.workspace = true
4846
async-trait.workspace = true
4947
datafusion.workspace = true
5048
futures.workspace = true
5149
futures-util.workspace = true
52-
itertools.workspace = true
5350
log.workspace = true
5451
parking_lot.workspace = true
5552
tokio.workspace = true
@@ -67,5 +64,4 @@ getrandom = { workspace = true, features = ["wasm_js"] }
6764

6865

6966
[dev-dependencies]
70-
re_tuid.workspace = true
7167
tokio = { workspace = true, features = ["macros", "rt-multi-thread", "signal"] }

crates/store/re_datafusion/examples/catalog.rs

Lines changed: 0 additions & 86 deletions
This file was deleted.

crates/store/re_datafusion/src/catalog_provider.rs

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,18 @@
1-
use crate::TableEntryTableProvider;
1+
use std::any::Any;
2+
use std::iter;
3+
use std::sync::Arc;
4+
25
use ahash::{HashMap, HashSet};
36
use async_trait::async_trait;
47
use datafusion::catalog::{CatalogProvider, SchemaProvider, TableProvider};
58
use datafusion::common::{DataFusionError, Result as DataFusionResult, TableReference, exec_err};
69
use parking_lot::Mutex;
7-
use re_redap_client::ConnectionClient;
8-
use std::any::Any;
9-
use std::iter;
10-
use std::sync::Arc;
1110
use tokio::runtime::Handle as RuntimeHandle;
1211

12+
use re_redap_client::ConnectionClient;
13+
14+
use crate::TableEntryTableProvider;
15+
1316
// These are to match the defaults in datafusion.
1417
pub const DEFAULT_CATALOG_NAME: &str = "datafusion";
1518
const DEFAULT_SCHEMA_NAME: &str = "public";

crates/store/re_datafusion/src/datafusion_connector.rs

Lines changed: 0 additions & 61 deletions
This file was deleted.

crates/store/re_datafusion/src/lib.rs

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,6 @@ mod dataframe_query_common;
66
mod dataframe_query_provider;
77
#[cfg(target_arch = "wasm32")]
88
mod dataframe_query_provider_wasm;
9-
mod datafusion_connector;
109
mod grpc_streaming_provider;
1110
mod partition_table;
1211
mod search_provider;
@@ -19,7 +18,6 @@ pub use dataframe_query_common::{DataframeQueryTableProvider, query_from_query_e
1918
pub(crate) use dataframe_query_provider::PartitionStreamExec;
2019
#[cfg(target_arch = "wasm32")]
2120
pub(crate) use dataframe_query_provider_wasm::PartitionStreamExec;
22-
pub use datafusion_connector::DataFusionConnector;
2321
pub use partition_table::PartitionTableProvider;
2422
pub use search_provider::SearchResultsTableProvider;
2523
pub use table_entry_provider::TableEntryTableProvider;

crates/store/re_datafusion/src/partition_table.rs

Lines changed: 17 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -10,10 +10,9 @@ use tracing::instrument;
1010

1111
use re_log_encoding::codec::wire::decoder::Decode as _;
1212
use re_log_types::EntryId;
13-
use re_protos::cloud::v1alpha1::GetPartitionTableSchemaRequest;
14-
use re_protos::headers::RerunHeadersInjectorExt as _;
1513
use re_protos::{
16-
cloud::v1alpha1::ScanPartitionTableRequest, cloud::v1alpha1::ScanPartitionTableResponse,
14+
cloud::v1alpha1::{ScanPartitionTableRequest, ScanPartitionTableResponse},
15+
headers::RerunHeadersInjectorExt as _,
1716
};
1817
use re_redap_client::ConnectionClient;
1918

@@ -22,7 +21,6 @@ use crate::wasm_compat::make_future_send;
2221

2322
#[derive(Clone)]
2423
pub struct PartitionTableProvider {
25-
//TODO(#10191): this should use a `ConnectionRegistryHandle` instead
2624
client: ConnectionClient,
2725
dataset_id: EntryId,
2826
}
@@ -52,27 +50,27 @@ impl GrpcStreamToTable for PartitionTableProvider {
5250

5351
#[instrument(skip(self), err)]
5452
async fn fetch_schema(&mut self) -> DataFusionResult<SchemaRef> {
55-
let req = tonic::Request::new(GetPartitionTableSchemaRequest {})
56-
.with_entry_id(self.dataset_id)
57-
.map_err(|err| DataFusionError::External(Box::new(err)))?;
58-
5953
let mut client = self.client.clone();
6054

55+
let dataset_id = self.dataset_id;
56+
6157
Ok(Arc::new(
62-
make_future_send(
63-
async move { Ok(client.inner().get_partition_table_schema(req).await) },
64-
)
65-
.await?
66-
.map_err(|err| DataFusionError::External(Box::new(err)))?
67-
.into_inner()
68-
.schema
69-
.ok_or(DataFusionError::External(
70-
"Schema missing from GetPartitionTableSchema response".into(),
71-
))?
72-
.try_into()?,
58+
make_future_send(async move {
59+
client
60+
.get_partition_table_schema(dataset_id)
61+
.await
62+
.map_err(|err| {
63+
DataFusionError::External(
64+
format!("Couldn't get partition table schema: {err}").into(),
65+
)
66+
})
67+
})
68+
.await?,
7369
))
7470
}
7571

72+
// TODO(ab): what `GrpcStreamToTable` attempts to simplify should probably be handled by
73+
// `ConnectionClient`
7674
#[instrument(skip(self), err)]
7775
async fn send_streaming_request(
7876
&mut self,

crates/store/re_datafusion/src/search_provider.rs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,11 +6,11 @@ use datafusion::{
66
catalog::TableProvider,
77
error::{DataFusionError, Result as DataFusionResult},
88
};
9-
use re_log_types::EntryId;
109
use tokio_stream::StreamExt as _;
1110
use tracing::instrument;
1211

1312
use re_log_encoding::codec::wire::decoder::Decode as _;
13+
use re_log_types::EntryId;
1414
use re_protos::{
1515
cloud::v1alpha1::{SearchDatasetRequest, SearchDatasetResponse},
1616
common::v1alpha1::ScanParameters,
@@ -23,7 +23,6 @@ use crate::wasm_compat::make_future_send;
2323

2424
#[derive(Clone)]
2525
pub struct SearchResultsTableProvider {
26-
//TODO(#10191): this should use a `ConnectionRegistryHandle` instead
2726
client: ConnectionClient,
2827
dataset_id: EntryId,
2928
request: SearchDatasetRequest,

crates/store/re_datafusion/src/table_entry_provider.rs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@ use crate::wasm_compat::make_future_send;
2121

2222
#[derive(Clone)]
2323
pub struct TableEntryTableProvider {
24-
//TODO(#10191): this should use a `ConnectionRegistryHandle` instead
2524
client: ConnectionClient,
2625
table: EntryIdOrName,
2726

@@ -47,6 +46,10 @@ impl TableEntryTableProvider {
4746
}
4847
}
4948

49+
pub fn new_entry_list(client: ConnectionClient) -> Self {
50+
Self::new(client, "__entries")
51+
}
52+
5053
/// This is a convenience function
5154
pub async fn into_provider(self) -> Result<Arc<dyn TableProvider>, DataFusionError> {
5255
Ok(GrpcStreamProvider::prepare(self).await?)

0 commit comments

Comments
 (0)