Skip to content

Commit db8d96a

Browse files
authored
End-to-end out-of-band entry headers: datasets (part 2) (#11411)
With this second part, everything related to the manifest registry is now fully migrated. As usual, the interesting stuff happens in the sibling. * Sibling: rerun-io/dataplatform#1796
1 parent c007fab commit db8d96a

File tree

8 files changed

+179
-119
lines changed

8 files changed

+179
-119
lines changed

crates/store/re_protos/proto/rerun/v1alpha1/cloud.proto

Lines changed: 23 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,9 @@ service RerunCloudService {
4646

4747
/* Write data */
4848

49-
// Register new partitions with the Dataset
49+
// Register new partitions with the Dataset.
50+
//
51+
// This endpoint requires the standard dataset headers.
5052
rpc RegisterWithDataset(RegisterWithDatasetRequest) returns (RegisterWithDatasetResponse) {}
5153

5254
// Write chunks to one or more partitions.
@@ -71,22 +73,30 @@ service RerunCloudService {
7173
// Inspect the contents of the partition table.
7274
//
7375
// The data will follow the schema returned by `GetPartitionTableSchema`.
76+
//
77+
// This endpoint requires the standard dataset headers.
7478
rpc ScanPartitionTable(ScanPartitionTableRequest) returns (stream ScanPartitionTableResponse) {}
7579

7680
// Returns the schema of the dataset.
7781
//
7882
// This is the union of all the schemas from all the underlying partitions. It will contain all the indexes,
7983
// entities and components present in the dataset.
84+
//
85+
// This endpoint requires the standard dataset headers.
8086
rpc GetDatasetSchema(GetDatasetSchemaRequest) returns (GetDatasetSchemaResponse) {}
8187

8288
/* Indexing */
8389

8490
// Creates a custom index for a specific column (vector search, full-text search, etc).
91+
//
92+
// This endpoint requires the standard dataset headers.
8593
rpc CreateIndex(CreateIndexRequest) returns (CreateIndexResponse) {}
8694

8795
/* Queries */
8896

8997
// Search a previously created index.
98+
//
99+
// This endpoint requires the standard dataset headers.
90100
rpc SearchDataset(SearchDatasetRequest) returns (stream SearchDatasetResponse) {}
91101

92102
// Perform Rerun-native queries on a dataset, returning the matching chunk IDs, as well
@@ -103,6 +113,8 @@ service RerunCloudService {
103113
// To fetch the actual chunks themselves, see `GetChunks`.
104114
//
105115
// Passing chunk IDs to this method effectively acts as a IF_EXIST filter.
116+
//
117+
// This endpoint requires the standard dataset headers.
106118
rpc QueryDataset(QueryDatasetRequest) returns (stream QueryDatasetResponse) {}
107119

108120
// Perform Rerun-native queries on a dataset, returning the underlying chunks.
@@ -113,6 +125,8 @@ service RerunCloudService {
113125
// * Arbitrary Lance filters.
114126
//
115127
// To fetch only the actual chunk IDs rather than the chunks themselves, see `QueryDataset`.
128+
//
129+
// This endpoint requires the standard dataset headers.
116130
rpc GetChunks(GetChunksRequest) returns (stream GetChunksResponse) {}
117131

118132
// Fetch specific chunks from Rerun Cloud. In a 2-step query process, result of 1st phase,
@@ -146,6 +160,8 @@ service RerunCloudService {
146160
// --- Utilities ---
147161

148162
// Rerun Manifests maintenance operations: scalar index creation, compaction, etc.
163+
//
164+
// This endpoint requires the standard dataset headers.
149165
rpc DoMaintenance(DoMaintenanceRequest) returns (DoMaintenanceResponse) {}
150166

151167
// Run global maintenance operations on the platform: this includes optimization
@@ -515,8 +531,6 @@ message QueryRange {
515531
}
516532

517533
message GetChunksRequest {
518-
rerun.common.v1alpha1.EntryId dataset_id = 1;
519-
520534
// Client can specify from which partitions to get chunks. If left unspecified (empty list),
521535
// data from all partition (that match other query parameters) will be included.
522536
repeated rerun.common.v1alpha1.PartitionId partition_ids = 2;
@@ -562,6 +576,9 @@ message GetChunksRequest {
562576

563577
// Query details
564578
Query query = 5;
579+
580+
reserved 1;
581+
reserved "dataset_id";
565582
}
566583

567584
message GetChunksResponse {
@@ -609,8 +626,6 @@ message ScanTableResponse {
609626
// --- Maintenance ---
610627

611628
message DoMaintenanceRequest {
612-
rerun.common.v1alpha1.EntryId dataset_id = 1;
613-
614629
// Optimize all builtin and user-defined indexes on this dataset.
615630
//
616631
// This merges all individual index deltas back in the main index, improving runtime performance
@@ -645,6 +660,9 @@ message DoMaintenanceRequest {
645660
//
646661
// ⚠️ Do not ever use this unless you know exactly what you're doing. Improper use will lead to data loss.
647662
bool unsafe_allow_recent_cleanup = 5;
663+
664+
reserved 1;
665+
reserved "dataset_id";
648666
}
649667

650668
message DoMaintenanceResponse {

crates/store/re_protos/src/v1alpha1/rerun.cloud.v1alpha1.ext.rs

Lines changed: 0 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,6 @@ impl From<RegisterWithDatasetRequest> for crate::cloud::v1alpha1::RegisterWithDa
6363

6464
#[derive(Debug, Clone)]
6565
pub struct GetChunksRequest {
66-
pub dataset_id: EntryId,
6766
pub partition_ids: Vec<crate::common::v1alpha1::ext::PartitionId>,
6867
pub chunk_ids: Vec<re_chunk::ChunkId>,
6968
pub entity_paths: Vec<EntityPath>,
@@ -75,11 +74,6 @@ impl TryFrom<crate::cloud::v1alpha1::GetChunksRequest> for GetChunksRequest {
7574

7675
fn try_from(value: crate::cloud::v1alpha1::GetChunksRequest) -> Result<Self, Self::Error> {
7776
Ok(Self {
78-
dataset_id: value
79-
.dataset_id
80-
.ok_or_else(|| tonic::Status::invalid_argument("dataset_id is required"))?
81-
.try_into()?,
82-
8377
partition_ids: value
8478
.partition_ids
8579
.into_iter()
@@ -112,7 +106,6 @@ impl TryFrom<crate::cloud::v1alpha1::GetChunksRequest> for GetChunksRequest {
112106

113107
#[derive(Debug, Clone)]
114108
pub struct DoMaintenanceRequest {
115-
pub dataset_id: Option<crate::common::v1alpha1::EntryId>,
116109
pub optimize_indexes: bool,
117110
pub retrain_indexes: bool,
118111
pub compact_fragments: bool,
@@ -123,7 +116,6 @@ pub struct DoMaintenanceRequest {
123116
impl From<DoMaintenanceRequest> for crate::cloud::v1alpha1::DoMaintenanceRequest {
124117
fn from(value: DoMaintenanceRequest) -> Self {
125118
Self {
126-
dataset_id: value.dataset_id,
127119
optimize_indexes: value.optimize_indexes,
128120
retrain_indexes: value.retrain_indexes,
129121
compact_fragments: value.compact_fragments,

crates/store/re_protos/src/v1alpha1/rerun.cloud.v1alpha1.rs

Lines changed: 34 additions & 6 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

crates/store/re_redap_client/src/connection_client.rs

Lines changed: 31 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -370,6 +370,23 @@ where
370370
Ok(response.table_entry)
371371
}
372372

373+
pub async fn get_chunks(
374+
&mut self,
375+
dataset_id: EntryId,
376+
req: re_protos::cloud::v1alpha1::GetChunksRequest,
377+
) -> Result<tonic::Streaming<re_protos::cloud::v1alpha1::GetChunksResponse>, StreamError> {
378+
Ok(self
379+
.inner()
380+
.get_chunks(
381+
tonic::Request::new(req)
382+
.with_entry_id(dataset_id)
383+
.map_err(|err| StreamEntryError::InvalidId(err.into()))?,
384+
)
385+
.await
386+
.map_err(|err| crate::StreamPartitionError::StreamingChunks(err.into()))?
387+
.into_inner())
388+
}
389+
373390
#[allow(clippy::fn_params_excessive_bools)]
374391
pub async fn do_maintenance(
375392
&mut self,
@@ -381,17 +398,20 @@ where
381398
unsafe_allow_recent_cleanup: bool,
382399
) -> Result<(), StreamError> {
383400
self.inner()
384-
.do_maintenance(tonic::Request::new(
385-
re_protos::cloud::v1alpha1::ext::DoMaintenanceRequest {
386-
dataset_id: Some(dataset_id.into()),
387-
optimize_indexes,
388-
retrain_indexes,
389-
compact_fragments,
390-
cleanup_before,
391-
unsafe_allow_recent_cleanup,
392-
}
393-
.into(),
394-
))
401+
.do_maintenance(
402+
tonic::Request::new(
403+
re_protos::cloud::v1alpha1::ext::DoMaintenanceRequest {
404+
optimize_indexes,
405+
retrain_indexes,
406+
compact_fragments,
407+
cleanup_before,
408+
unsafe_allow_recent_cleanup,
409+
}
410+
.into(),
411+
)
412+
.with_entry_id(dataset_id)
413+
.map_err(|err| StreamEntryError::InvalidId(err.into()))?,
414+
)
395415
.await
396416
.map_err(|err| StreamEntryError::Maintenance(err.into()))?;
397417

0 commit comments

Comments
 (0)