
Commit e9429eb

abey79 authored
Introduce the dataset manifest and remove layer information from the partition table (#11423)
Co-authored-by: Copilot <[email protected]>
Co-authored-by: Clement Rey <[email protected]>
1 parent e7a4258 commit e9429eb

File tree

19 files changed (+801, -106 lines)

crates/store/re_datafusion/src/dataframe_query_common.rs

Lines changed: 10 additions & 6 deletions
@@ -23,11 +23,15 @@ use re_dataframe::external::re_chunk_store::ChunkStore;
 use re_dataframe::{Index, QueryExpression};
 use re_log_encoding::codec::wire::decoder::Decode as _;
 use re_log_types::EntryId;
-use re_protos::cloud::v1alpha1::ext::{Query, QueryLatestAt, QueryRange};
-use re_protos::cloud::v1alpha1::{DATASET_MANIFEST_ID_FIELD_NAME, QueryDatasetResponse};
-use re_protos::cloud::v1alpha1::{GetDatasetSchemaRequest, QueryDatasetRequest};
-use re_protos::common::v1alpha1::ext::ScanParameters;
-use re_protos::headers::RerunHeadersInjectorExt as _;
+use re_protos::{
+    cloud::v1alpha1::{
+        GetDatasetSchemaRequest, QueryDatasetRequest, QueryDatasetResponse,
+        ScanPartitionTableResponse,
+        ext::{Query, QueryLatestAt, QueryRange},
+    },
+    common::v1alpha1::ext::ScanParameters,
+    headers::RerunHeadersInjectorExt as _,
+};
 use re_redap_client::{ConnectionClient, ConnectionRegistryHandle};
 use re_sorbet::{BatchType, ChunkColumnDescriptors, ColumnKind, ComponentColumnSelector};
 use re_uri::Origin;

@@ -168,7 +172,7 @@ impl DataframeQueryTableProvider {

         let schema = Arc::new(prepend_string_column_schema(
             &schema,
-            DATASET_MANIFEST_ID_FIELD_NAME,
+            ScanPartitionTableResponse::FIELD_PARTITION_ID,
         ));

         Ok(Self {
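
Both changed call sites in this file pass the partition-id field name to `prepend_string_column_schema`. A minimal sketch of what that helper presumably does, assuming it simply prepends a non-nullable Utf8 field to the query schema (the actual signature in `re_datafusion` may differ):

```rust
use std::sync::Arc;

use arrow::datatypes::{DataType, Field, Schema};

// Hedged sketch: prepend a non-nullable Utf8 column (e.g. the partition id)
// to an existing Arrow schema, keeping the original fields in order.
fn prepend_string_column_schema(schema: &Schema, field_name: &str) -> Schema {
    let mut fields = vec![Arc::new(Field::new(field_name, DataType::Utf8, false))];
    fields.extend(schema.fields().iter().cloned());
    Schema::new(fields)
}
```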

crates/store/re_datafusion/src/dataframe_query_provider.rs

Lines changed: 11 additions & 6 deletions
@@ -33,7 +33,7 @@ use re_dataframe::{
     ChunkStoreHandle, Index, QueryCache, QueryEngine, QueryExpression, QueryHandle, StorageEngine,
 };
 use re_log_types::{ApplicationId, StoreId, StoreKind};
-use re_protos::cloud::v1alpha1::{DATASET_MANIFEST_ID_FIELD_NAME, FetchChunksRequest};
+use re_protos::cloud::v1alpha1::{FetchChunksRequest, ScanPartitionTableResponse};
 use re_redap_client::ConnectionClient;
 use re_sorbet::{ColumnDescriptor, ColumnSelector};

@@ -201,10 +201,12 @@ impl PartitionStreamExec {
         let orderings = if projected_schema
             .fields()
             .iter()
-            .any(|f| f.name().as_str() == DATASET_MANIFEST_ID_FIELD_NAME)
+            .any(|f| f.name().as_str() == ScanPartitionTableResponse::FIELD_PARTITION_ID)
         {
-            let partition_col =
-                Arc::new(Column::new(DATASET_MANIFEST_ID_FIELD_NAME, 0)) as Arc<dyn PhysicalExpr>;
+            let partition_col = Arc::new(Column::new(
+                ScanPartitionTableResponse::FIELD_PARTITION_ID,
+                0,
+            )) as Arc<dyn PhysicalExpr>;
             let order_col = sort_index
                 .and_then(|index| {
                     let index_name = index.as_str();

@@ -242,7 +244,10 @@ impl PartitionStreamExec {

         let output_partitioning = if partition_in_output_schema {
             Partitioning::Hash(
-                vec![Arc::new(Column::new(DATASET_MANIFEST_ID_FIELD_NAME, 0))],
+                vec![Arc::new(Column::new(
+                    ScanPartitionTableResponse::FIELD_PARTITION_ID,
+                    0,
+                ))],
                 num_partitions,
             )
         } else {

@@ -303,7 +308,7 @@ async fn send_next_row(

     let batch_schema = Arc::new(prepend_string_column_schema(
         &query_schema,
-        DATASET_MANIFEST_ID_FIELD_NAME,
+        ScanPartitionTableResponse::FIELD_PARTITION_ID,
     ));

     let batch = RecordBatch::try_new_with_options(
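
The `Partitioning::Hash` change above keeps the same semantics under the new constant: the execution plan advertises that its output streams are hash-partitioned on the prepended partition-id column. A condensed sketch of that construction, where `field_name` stands in for `ScanPartitionTableResponse::FIELD_PARTITION_ID`:

```rust
use std::sync::Arc;

use datafusion::physical_expr::{PhysicalExpr, expressions::Column};
use datafusion::physical_plan::Partitioning;

// Sketch: declare hash partitioning on the partition-id column. Index 0 is
// correct because the column is prepended to the query schema.
fn partition_id_hash(field_name: &str, num_partitions: usize) -> Partitioning {
    let col = Arc::new(Column::new(field_name, 0)) as Arc<dyn PhysicalExpr>;
    Partitioning::Hash(vec![col], num_partitions)
}
```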

crates/store/re_datafusion/src/dataframe_query_provider_wasm.rs

Lines changed: 10 additions & 6 deletions
@@ -28,8 +28,7 @@ use re_dataframe::{
 };
 use re_log_encoding::codec::wire::encoder::Encode as _;
 use re_log_types::{StoreId, StoreKind};
-use re_protos::cloud::v1alpha1::DATASET_MANIFEST_ID_FIELD_NAME;
-use re_protos::cloud::v1alpha1::FetchChunksRequest;
+use re_protos::cloud::v1alpha1::{FetchChunksRequest, ScanPartitionTableResponse};
 use re_redap_client::ConnectionClient;

 use crate::dataframe_query_common::{

@@ -199,8 +198,10 @@ impl PartitionStreamExec {
             None => Arc::clone(table_schema),
         };

-        let partition_col =
-            Arc::new(Column::new(DATASET_MANIFEST_ID_FIELD_NAME, 0)) as Arc<dyn PhysicalExpr>;
+        let partition_col = Arc::new(Column::new(
+            ScanPartitionTableResponse::FIELD_PARTITION_ID,
+            0,
+        )) as Arc<dyn PhysicalExpr>;
         let order_col = sort_index
             .and_then(|index| {
                 let index_name = index.as_str();

@@ -236,7 +237,10 @@ impl PartitionStreamExec {

         let output_partitioning = if partition_in_output_schema {
             Partitioning::Hash(
-                vec![Arc::new(Column::new(DATASET_MANIFEST_ID_FIELD_NAME, 0))],
+                vec![Arc::new(Column::new(
+                    ScanPartitionTableResponse::FIELD_PARTITION_ID,
+                    0,
+                ))],
                 num_partitions,
             )
         } else {

@@ -295,7 +299,7 @@ fn create_next_row(

     let batch_schema = Arc::new(prepend_string_column_schema(
         &query_schema,
-        DATASET_MANIFEST_ID_FIELD_NAME,
+        ScanPartitionTableResponse::FIELD_PARTITION_ID,
     ));

     let batch = RecordBatch::try_new_with_options(
crates/store/re_datafusion/src/dataset_manifest.rs (new file)

Lines changed: 104 additions & 0 deletions
@@ -0,0 +1,104 @@
+use std::sync::Arc;
+
+use arrow::{array::RecordBatch, datatypes::SchemaRef};
+use async_trait::async_trait;
+use datafusion::{
+    catalog::TableProvider,
+    error::{DataFusionError, Result as DataFusionResult},
+};
+use tracing::instrument;
+
+use re_log_encoding::codec::wire::decoder::Decode as _;
+use re_log_types::EntryId;
+use re_protos::{
+    cloud::v1alpha1::{ScanDatasetManifestRequest, ScanDatasetManifestResponse},
+    headers::RerunHeadersInjectorExt as _,
+};
+use re_redap_client::ConnectionClient;
+
+use crate::grpc_streaming_provider::{GrpcStreamProvider, GrpcStreamToTable};
+use crate::wasm_compat::make_future_send;
+
+//TODO(ab): deduplicate from PartitionTableProvider
+#[derive(Clone)]
+pub struct DatasetManifestProvider {
+    client: ConnectionClient,
+    dataset_id: EntryId,
+}
+
+impl std::fmt::Debug for DatasetManifestProvider {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        f.debug_struct("DatasetManifestProvider")
+            .field("dataset_id", &self.dataset_id)
+            .finish()
+    }
+}
+
+impl DatasetManifestProvider {
+    pub fn new(client: ConnectionClient, dataset_id: EntryId) -> Self {
+        Self { client, dataset_id }
+    }
+
+    /// This is a convenience function
+    pub async fn into_provider(self) -> DataFusionResult<Arc<dyn TableProvider>> {
+        Ok(GrpcStreamProvider::prepare(self).await?)
+    }
+}
+
+#[async_trait]
+impl GrpcStreamToTable for DatasetManifestProvider {
+    type GrpcStreamData = ScanDatasetManifestResponse;
+
+    #[instrument(skip(self), err)]
+    async fn fetch_schema(&mut self) -> DataFusionResult<SchemaRef> {
+        let mut client = self.client.clone();
+
+        let dataset_id = self.dataset_id;
+
+        Ok(Arc::new(
+            make_future_send(async move {
+                client
+                    .get_dataset_manifest_schema(dataset_id)
+                    .await
+                    .map_err(|err| {
+                        DataFusionError::External(
+                            format!("Couldn't get dataset manifest schema: {err}").into(),
+                        )
+                    })
+            })
+            .await?,
+        ))
+    }
+
+    // TODO(ab): what `GrpcStreamToTable` attempts to simplify should probably be handled by
+    // `ConnectionClient`
+    #[instrument(skip(self), err)]
+    async fn send_streaming_request(
+        &mut self,
+    ) -> DataFusionResult<tonic::Response<tonic::Streaming<Self::GrpcStreamData>>> {
+        let request = tonic::Request::new(ScanDatasetManifestRequest {
+            columns: vec![], // all of them
+        })
+        .with_entry_id(self.dataset_id)
+        .map_err(|err| DataFusionError::External(Box::new(err)))?;
+
+        let mut client = self.client.clone();
+
+        make_future_send(async move { Ok(client.inner().scan_dataset_manifest(request).await) })
+            .await?
+            .map_err(|err| DataFusionError::External(Box::new(err)))
+    }
+
+    fn process_response(
+        &mut self,
+        response: Self::GrpcStreamData,
+    ) -> DataFusionResult<RecordBatch> {
+        response
+            .data
+            .ok_or(DataFusionError::Execution(
+                "DataFrame missing from DatasetManifest response".to_owned(),
+            ))?
+            .decode()
+            .map_err(|err| DataFusionError::External(Box::new(err)))
+    }
+}
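
A hypothetical usage sketch for the new provider, assuming an already-connected `ConnectionClient` and a known dataset `EntryId` (the table name `dataset_manifest` is illustrative): it exposes the manifest as a regular DataFusion table.

```rust
use datafusion::prelude::SessionContext;
use re_datafusion::DatasetManifestProvider;

// Sketch: register the dataset manifest as a queryable DataFusion table.
async fn register_manifest(
    ctx: &SessionContext,
    client: re_redap_client::ConnectionClient,
    dataset_id: re_log_types::EntryId,
) -> datafusion::error::Result<()> {
    let provider = DatasetManifestProvider::new(client, dataset_id)
        .into_provider()
        .await?;
    ctx.register_table("dataset_manifest", provider)?;
    Ok(())
}
```

Once registered, something like `ctx.sql("SELECT * FROM dataset_manifest")` would stream the layer metadata as Arrow record batches.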

crates/store/re_datafusion/src/lib.rs

Lines changed: 2 additions & 0 deletions
@@ -6,6 +6,7 @@ mod dataframe_query_common;
 mod dataframe_query_provider;
 #[cfg(target_arch = "wasm32")]
 mod dataframe_query_provider_wasm;
+mod dataset_manifest;
 mod grpc_streaming_provider;
 mod partition_table;
 mod search_provider;

@@ -18,6 +19,7 @@ pub use dataframe_query_common::{DataframeQueryTableProvider, query_from_query_e
 pub(crate) use dataframe_query_provider::PartitionStreamExec;
 #[cfg(target_arch = "wasm32")]
 pub(crate) use dataframe_query_provider_wasm::PartitionStreamExec;
+pub use dataset_manifest::DatasetManifestProvider;
 pub use partition_table::PartitionTableProvider;
 pub use search_provider::SearchResultsTableProvider;
 pub use table_entry_provider::TableEntryTableProvider;

crates/store/re_datafusion/src/partition_table.rs

Lines changed: 1 addition & 0 deletions
@@ -19,6 +19,7 @@ use re_redap_client::ConnectionClient;
 use crate::grpc_streaming_provider::{GrpcStreamProvider, GrpcStreamToTable};
 use crate::wasm_compat::make_future_send;

+//TODO(ab): deduplicate from DatasetManifestProvider
 #[derive(Clone)]
 pub struct PartitionTableProvider {
     client: ConnectionClient,

crates/store/re_protos/proto/rerun/v1alpha1/cloud.proto

Lines changed: 36 additions & 3 deletions
@@ -77,6 +77,21 @@ service RerunCloudService {
   // This endpoint requires the standard dataset headers.
   rpc ScanPartitionTable(ScanPartitionTableRequest) returns (stream ScanPartitionTableResponse) {}

+  // Returns the schema of the dataset manifest.
+  //
+  // To inspect the data of the dataset manifest, which is guaranteed to match the schema returned by
+  // this endpoint, check out `ScanDatasetManifest`.
+  //
+  // This endpoint requires the standard dataset headers.
+  rpc GetDatasetManifestSchema(GetDatasetManifestSchemaRequest) returns (GetDatasetManifestSchemaResponse) {}
+
+  // Inspect the contents of the dataset manifest.
+  //
+  // The data will follow the schema returned by `GetDatasetManifestSchema`.
+  //
+  // This endpoint requires the standard dataset headers.
+  rpc ScanDatasetManifest(ScanDatasetManifestRequest) returns (stream ScanDatasetManifestResponse) {}
+
   // Returns the schema of the dataset.
   //
   // This is the union of all the schemas from all the underlying partitions. It will contain all the indexes,

@@ -255,7 +270,25 @@ message ScanPartitionTableRequest {
 }

 message ScanPartitionTableResponse {
-  // Partitions metadata as arrow RecordBatch
+  // Partitions metadata as Arrow RecordBatch.
+  rerun.common.v1alpha1.DataframePart data = 1;
+}
+
+message GetDatasetManifestSchemaRequest {}
+
+message GetDatasetManifestSchemaResponse {
+  rerun.common.v1alpha1.Schema schema = 1;
+}
+
+message ScanDatasetManifestRequest {
+  // A list of column names to be projected server-side.
+  //
+  // All of them if left empty.
+  repeated string columns = 3;
+}
+
+message ScanDatasetManifestResponse {
+  // The contents of the dataset manifest (i.e. information about layers) as Arrow RecordBatch.
   rerun.common.v1alpha1.DataframePart data = 1;
 }

@@ -709,7 +742,7 @@ message CreateDatasetEntryRequest {
   // Name of the dataset entry to create.
   //
   // The name should be a short human-readable string. It must be unique within all entries in the catalog. If an entry
-  // with the same name already exists, the request will fail.
+  // with the same name already exists, the request will fail. Entry names ending with `__manifest` are reserved.
   optional string name = 1;

   // If specified, create the entry using this specific ID. Use at your own risk.

@@ -754,7 +787,7 @@ message RegisterTableRequest {
   // Name of the table entry to create.
   //
   // The name should be a short human-readable string. It must be unique within all entries in the catalog. If an entry
-  // with the same name already exists, the request will fail.
+  // with the same name already exists, the request will fail. Entry names ending with `__manifest` are reserved.
   string name = 1;

   // Information about the table to register.
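
For clients talking to the service directly, a hedged sketch of driving the new streaming RPC with the generated tonic client. The client path `rerun_cloud_service_client::RerunCloudServiceClient` is assumed from tonic's usual codegen layout, and the `?` conversions assume the involved error types implement `std::error::Error`:

```rust
use re_protos::cloud::v1alpha1::ScanDatasetManifestRequest;
use re_protos::headers::RerunHeadersInjectorExt as _;

// Assumed generated-client path (tonic codegen convention).
type Client = re_protos::cloud::v1alpha1::rerun_cloud_service_client::RerunCloudServiceClient<
    tonic::transport::Channel,
>;

async fn scan_manifest(
    client: &mut Client,
    dataset_id: re_log_types::EntryId,
) -> Result<(), Box<dyn std::error::Error>> {
    // Empty projection means "all columns", per the proto comment above.
    let request = tonic::Request::new(ScanDatasetManifestRequest { columns: vec![] })
        .with_entry_id(dataset_id)?;

    let mut stream = client.scan_dataset_manifest(request).await?.into_inner();
    while let Some(response) = stream.message().await? {
        // Each response carries a `DataframePart` wrapping an Arrow RecordBatch.
        let _part = response.data;
    }
    Ok(())
}
```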

crates/store/re_protos/src/lib.rs

Lines changed: 0 additions & 9 deletions
@@ -69,15 +69,6 @@ pub mod cloud {
         pub mod ext {
             pub use crate::v1alpha1::rerun_cloud_v1alpha1_ext::*;
         }
-
-        /// `DatasetManifest` mandatory field names. All mandatory metadata fields are prefixed
-        /// with "rerun_" to avoid conflicts with user-defined fields.
-        pub const DATASET_MANIFEST_ID_FIELD_NAME: &str = "rerun_partition_id";
-        pub const DATASET_MANIFEST_PARTITION_MANIFEST_UPDATED_AT_FIELD_NAME: &str = "rerun_partition_manifest_updated_at";
-        pub const DATASET_MANIFEST_RECORDING_TYPE_FIELD_NAME: &str = "rerun_partition_type";
-        pub const DATASET_MANIFEST_REGISTRATION_TIME_FIELD_NAME: &str = "rerun_registration_time";
-        pub const DATASET_MANIFEST_START_TIME_FIELD_NAME: &str = "rerun_start_time";
-        pub const DATASET_MANIFEST_STORAGE_URL_FIELD_NAME: &str = "rerun_storage_url";
     }
 }
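
The free constants removed here appear to be superseded by associated constants on the generated response types; `ScanPartitionTableResponse::FIELD_PARTITION_ID` is what the `re_datafusion` changes above switch to. A standalone sketch of the assumed replacement shape, with the string value presumed to stay `"rerun_partition_id"`:

```rust
// Stand-in for the generated type, for illustration only.
pub struct ScanPartitionTableResponse {}

impl ScanPartitionTableResponse {
    // Assumed to match the removed DATASET_MANIFEST_ID_FIELD_NAME value.
    pub const FIELD_PARTITION_ID: &'static str = "rerun_partition_id";
}
```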
