Introduce the dataset manifest and remove layer information from the partition table #11423
Conversation
Web viewer built successfully.
Note: This comment is updated whenever you push a commit.
Pull Request Overview
This PR introduces the dataset manifest table functionality and removes layer-specific columns from the partition table. The dataset manifest provides layer-level metadata while the partition table now focuses solely on partition-level information.
- Adds new gRPC endpoints for dataset manifest schema and scanning operations
- Refactors partition table structure to remove layer information and add partition metadata
- Implements dataset manifest provider for DataFusion integration
Reviewed Changes
Copilot reviewed 12 out of 12 changed files in this pull request and generated 3 comments.
| File | Description |
|---|---|
| `rerun_py/src/catalog/dataset_entry.rs` | Adds `manifest()` method to expose the dataset manifest as a DataFusion table |
| `rerun_py/rerun_bindings/rerun_bindings.pyi` | Python type hints for the new `manifest()` method |
| `crates/store/re_server/src/store.rs` | Updates the partition table schema, removing layer columns and adding metadata |
| `crates/store/re_server/src/rerun_cloud.rs` | Implements placeholder gRPC handlers for the dataset manifest endpoints |
| `crates/store/re_redap_client/src/lib.rs` | Adds error variant for dataset manifest schema operations |
| `crates/store/re_redap_client/src/connection_client.rs` | Implements client method for dataset manifest schema fetching |
| `crates/store/re_protos/src/v1alpha1/rerun.cloud.v1alpha1.rs` | Generated protobuf code for the new dataset manifest endpoints |
| `crates/store/re_protos/src/v1alpha1/rerun.cloud.v1alpha1.ext.rs` | Schema definitions and helper methods for dataset manifest responses |
| `crates/store/re_protos/proto/rerun/v1alpha1/cloud.proto` | Protocol buffer definitions for the dataset manifest endpoints |
| `crates/store/re_datafusion/src/partition_table.rs` | Adds TODO comment for deduplication |
| `crates/store/re_datafusion/src/lib.rs` | Exports the new `DatasetManifestProvider` |
| `crates/store/re_datafusion/src/dataset_manifest.rs` | Implements `DatasetManifestProvider` for DataFusion integration |
Co-authored-by: Copilot <[email protected]>
Latest documentation preview deployed successfully.
Note: This comment is updated whenever you push a commit.
```protobuf
// Returns the schema of the dataset manifest.
//
// To inspect the data of the dataset manifest, which is guaranteed to match the schema returned by
// this endpoint, check out `ScanDatasetManifest`.
//
// This endpoint requires the standard dataset headers.
rpc GetDatasetManifestSchema(GetDatasetManifestSchemaRequest) returns (GetDatasetManifestSchemaResponse) {}

// Inspect the contents of the dataset manifest.
//
// The data will follow the schema returned by `GetDatasetManifestSchema`.
//
// This endpoint requires the standard dataset headers.
rpc ScanDatasetManifest(ScanDatasetManifestRequest) returns (stream ScanDatasetManifestResponse) {}
```
This is the main point of this PR.
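For orientation, here is a rough sketch of how a client might drive these two RPCs once the service is compiled with tonic. Everything below is an assumption based on tonic/prost codegen conventions, not code from this PR: the client type name (`RerunCloudServiceClient`), the request types' `Default` impls, and the imports are guesses, and the standard dataset headers are elided.

```rust
use tonic::Request;

// Hypothetical client-side sketch, not from this PR. The generated client and request
// types are assumed to be in scope; the standard dataset headers are omitted for brevity.
async fn dump_dataset_manifest(
    client: &mut RerunCloudServiceClient<tonic::transport::Channel>,
) -> Result<(), tonic::Status> {
    // 1. Fetch the schema that the scan below is guaranteed to follow.
    let _schema = client
        .get_dataset_manifest_schema(Request::new(GetDatasetManifestSchemaRequest::default()))
        .await?
        .into_inner();

    // 2. Stream the manifest contents; every message follows the schema above.
    let mut stream = client
        .scan_dataset_manifest(Request::new(ScanDatasetManifestRequest::default()))
        .await?
        .into_inner();

    while let Some(chunk) = stream.message().await? {
        // ... decode/inspect the dataframe payload in `chunk` ...
        let _ = chunk;
    }
    Ok(())
}
```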
```rust
impl ScanPartitionTableResponse {
    pub const PARTITION_ID: &str = "rerun_partition_id";
    pub const PARTITION_TYPE: &str = "rerun_partition_type";

    /// Layer names for this partition, one per layer.
    ///
    /// Should have the same length as [`Self::STORAGE_URLS`].
    pub const LAYER_NAMES: &str = "rerun_layer_names";

    /// Storage URLs for this partition, one per layer.
    ///
    /// Should have the same length as [`Self::LAYER_NAMES`].
    pub const STORAGE_URLS: &str = "rerun_storage_urls";
    pub const LAST_UPDATED_AT: &str = "rerun_last_updated_at";

    /// Total number of chunks for this partition.
    pub const NUM_CHUNKS: &str = "rerun_num_chunks";

    /// Total size in bytes for this partition.
    pub const SIZE_BYTES: &str = "rerun_size_bytes";

    pub fn layer_names_inner_field() -> FieldRef {
        Arc::new(Field::new(Self::LAYER_NAMES, DataType::Utf8, false))
    }

    pub fn storage_urls_inner_field() -> FieldRef {
        Arc::new(Field::new(Self::STORAGE_URLS, DataType::Utf8, false))
    }

    // NOTE: changing this method is a breaking change for implementations (aka it at least breaks
    // tests in `dataplatform`)
    pub fn fields() -> Vec<Field> {
        vec![
            Field::new(Self::PARTITION_ID, DataType::Utf8, false),
            Field::new(
                Self::LAYER_NAMES,
                DataType::List(Self::layer_names_inner_field()),
                false,
            ),
            Field::new(
                Self::STORAGE_URLS,
                DataType::List(Self::storage_urls_inner_field()),
                false,
            ),
            Field::new(
                Self::LAST_UPDATED_AT,
                DataType::Timestamp(TimeUnit::Nanosecond, None),
                false,
            ),
            Field::new(Self::NUM_CHUNKS, DataType::UInt64, false),
            Field::new(Self::SIZE_BYTES, DataType::UInt64, false),
        ]
    }

    pub fn schema() -> Schema {
        Schema::new(Self::fields())
    }

    /// Helper to simplify instantiation of the dataframe in [`Self::data`].
    pub fn create_dataframe(
        partition_ids: Vec<String>,
        layer_names: Vec<Vec<String>>,
        storage_urls: Vec<Vec<String>>,
        last_updated_at: Vec<i64>,
        num_chunks: Vec<u64>,
        size_bytes: Vec<u64>,
    ) -> arrow::error::Result<RecordBatch> {
        let row_count = partition_ids.len();
        let schema = Arc::new(Self::schema());

        let mut layer_names_builder =
            ListBuilder::new(StringBuilder::new()).with_field(Self::layer_names_inner_field());

        for mut inner_vec in layer_names {
            for layer_name in inner_vec.drain(..) {
                layer_names_builder.values().append_value(layer_name);
            }
            layer_names_builder.append(true);
        }

        let mut urls_builder =
            ListBuilder::new(StringBuilder::new()).with_field(Self::storage_urls_inner_field());

        for mut inner_vec in storage_urls {
            for storage_url in inner_vec.drain(..) {
                urls_builder.values().append_value(storage_url);
            }
            urls_builder.append(true);
        }

        let columns: Vec<ArrayRef> = vec![
            Arc::new(StringArray::from(partition_ids)),
            Arc::new(layer_names_builder.finish()),
            Arc::new(urls_builder.finish()),
            Arc::new(TimestampNanosecondArray::from(last_updated_at)),
            Arc::new(UInt64Array::from(num_chunks)),
            Arc::new(UInt64Array::from(size_bytes)),
        ];

        RecordBatch::try_new_with_options(
            schema,
            columns,
            &RecordBatchOptions::default().with_row_count(Some(row_count)),
        )
    }

    pub fn data(&self) -> Result<&DataframePart, TypeConversionError> {
        Ok(self
            .data
            .as_ref()
            .ok_or_else(|| missing_field!(Self, "data"))?)
    }
}
```
This is the secondary point of this PR, aka trying to compensate for the lack of a dataframe-level spec in *.proto and to harden all implementations against drift and mismatches. This only works if all codebases use this as the ground truth (which this pair of PRs attempts to do).
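To make that concrete, here is a hypothetical usage sketch of the helper above. The partition ID, layer names, URLs, timestamp, and sizes are made up for illustration, and `ScanPartitionTableResponse` is assumed to be in scope (e.g. from `re_protos`).

```rust
// Hypothetical usage sketch, not from this PR: building a single-row partition-table
// batch with `ScanPartitionTableResponse::create_dataframe`. All values are made up.
fn example_partition_batch() -> arrow::error::Result<arrow::record_batch::RecordBatch> {
    ScanPartitionTableResponse::create_dataframe(
        vec!["part_0".to_owned()],                               // partition IDs
        vec![vec!["base".to_owned(), "annotations".to_owned()]], // layer names, one list per partition
        vec![vec![
            "s3://bucket/part_0/base.rrd".to_owned(),            // storage URLs, same length as layer names
            "s3://bucket/part_0/annotations.rrd".to_owned(),
        ]],
        vec![1_700_000_000_000_000_000],                         // last updated at (ns since epoch)
        vec![42],                                                // total number of chunks
        vec![1_024],                                             // total size in bytes
    )
}
```

The resulting `RecordBatch` matches `ScanPartitionTableResponse::schema()` by construction, which is exactly the drift protection described above.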
Looking good 👍
Mostly had things to say regarding naming conventions... do with that what you will 🤷
```rust
// --- ScanDatasetManifestResponse ---

impl ScanDatasetManifestResponse {
    pub const LAYER_NAME: &str = "rerun_layer_name";
```
Similarly, I've been trying (and still failing) to make sure that all field constants are named `const FIELD_XXX`, and that they are named exactly the same as the string they represent, so they A) can be lexically grepped for and B) can have their constant name guessed just by looking at any dataframe in the wild. 🤷
I'm gonna take the middle route here. I like the `FIELD_` prefix, but I'll drop the `RERUN_` part. It's noisy, and not having it still allows for the stated goals of this naming scheme.
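A minimal sketch of that middle-route convention, assuming the constant keeps the `FIELD_` prefix while the column string keeps its `rerun_` prefix. The struct name below is hypothetical, purely to keep the example self-contained; it is not the final code from this PR.

```rust
// Illustrative sketch only, not the final code from this PR.
pub struct ScanDatasetManifestResponseSketch;

impl ScanDatasetManifestResponseSketch {
    /// The constant name mirrors the column name it holds (minus the `rerun_` prefix),
    /// so both the constant and the column can be found with the same grep.
    pub const FIELD_LAYER_NAME: &str = "rerun_layer_name";
}
```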
Co-authored-by: Clement Rey <[email protected]>
```
# Conflicts:
#	crates/store/re_datafusion/src/dataframe_query_common.rs
#	crates/store/re_protos/src/v1alpha1/rerun.cloud.v1alpha1.ext.rs
#	crates/store/re_redap_client/src/connection_client.rs
#	crates/store/re_server/src/rerun_cloud.rs
#	rerun_py/src/catalog/dataset_entry.rs
```
Related
What
Introduces gRPC endpoints and an associated SDK method to access the dataset manifest table, which contains one row per layer. Also removes most layer-related columns from the partition table.
This PR also attempts to solidify the notion that `Scan{PartitionTable|DatasetManifest}Response` is the One True Source(tm) of information on the returned dataframe's schema; a small consumer-side sketch of that idea follows the future-work list below.

Future work:
- TODO: `ext` utilities
- `DatasetManifest` (from `LayerTable`)
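As referenced above, here is a minimal consumer-side sketch of the "one true source" idea, assuming `ScanPartitionTableResponse` is in scope from `re_protos`; the validation helper itself is hypothetical, not part of this PR.

```rust
use arrow::datatypes::Schema;
use arrow::record_batch::RecordBatch;

// Hypothetical drift check, not from this PR: compare a received dataframe against the
// schema advertised by the response type, which both server and client treat as ground truth.
fn check_partition_table_schema(batch: &RecordBatch) -> Result<(), String> {
    let expected: Schema = ScanPartitionTableResponse::schema();
    if batch.schema().as_ref() != &expected {
        return Err(format!(
            "partition table dataframe drifted from the advertised schema: expected {expected:?}, got {:?}",
            batch.schema()
        ));
    }
    Ok(())
}
```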