
Commit 39522a6

Partition-to-segment rename (wave 1): redap layer (#12017)
1 parent 69483dc commit 39522a6


105 files changed: +1155, -1076 lines


crates/store/re_data_source/src/data_source.rs

Lines changed: 1 addition & 1 deletion
@@ -255,7 +255,7 @@ impl LogDataSource {
             .client(uri_clone.origin.clone())
             .await
             .map_err(|err| ApiError::connection(err, "failed to connect to server"))?;
-        re_redap_client::stream_blueprint_and_partition_from_server(
+        re_redap_client::stream_blueprint_and_segment_from_server(
             client, tx, uri_clone, on_msg,
         )
         .await

crates/store/re_datafusion/src/dataframe_query_common.rs

Lines changed: 36 additions & 39 deletions
@@ -22,11 +22,11 @@ use datafusion::physical_plan::coalesce_batches::CoalesceBatchesExec;
 use re_dataframe::external::re_chunk_store::ChunkStore;
 use re_dataframe::{Index, QueryExpression};
 use re_log_types::EntryId;
-use re_protos::cloud::v1alpha1::FetchChunksRequest;
 use re_protos::{
     cloud::v1alpha1::{
-        GetDatasetSchemaRequest, QueryDatasetRequest, ScanPartitionTableResponse,
-        ext::{Query, QueryLatestAt, QueryRange},
+        FetchChunksRequest, GetDatasetSchemaRequest, QueryDatasetResponse,
+        ScanSegmentTableResponse,
+        ext::{Query, QueryDatasetRequest, QueryLatestAt, QueryRange},
     },
     common::v1alpha1::ext::ScanParameters,
     headers::RerunHeadersInjectorExt as _,
@@ -109,33 +109,27 @@ impl DataframeQueryTableProvider {
         let query = query_from_query_expression(query_expression);

         let dataset_query = QueryDatasetRequest {
-            partition_ids: partition_ids
+            segment_ids: partition_ids
                 .iter()
                 .map(|id| id.as_ref().to_owned().into())
                 .collect(),
             chunk_ids: vec![],
-            entity_paths: entity_paths
-                .into_iter()
-                .map(|p| (*p).clone().into())
-                .collect(),
+            entity_paths: entity_paths.into_iter().map(|p| (*p).clone()).collect(),
             select_all_entity_paths,
             fuzzy_descriptors,
             exclude_static_data: false,
             exclude_temporal_data: false,
-            query: Some(query.into()),
-            scan_parameters: Some(
-                ScanParameters {
-                    columns: FetchChunksRequest::required_column_names(),
-                    ..Default::default()
-                }
-                .into(),
-            ),
+            query: Some(query),
+            scan_parameters: Some(ScanParameters {
+                columns: FetchChunksRequest::required_column_names(),
+                ..Default::default()
+            }),
         };

         let response_stream = client
             .inner()
             .query_dataset(
-                tonic::Request::new(dataset_query)
+                tonic::Request::new(dataset_query.into())
                     .with_entry_id(dataset_id)
                     .map_err(|err| exec_datafusion_err!("{err}"))?,
             )
@@ -161,7 +155,7 @@ impl DataframeQueryTableProvider {

         let schema = Arc::new(prepend_string_column_schema(
             &schema,
-            ScanPartitionTableResponse::FIELD_PARTITION_ID,
+            ScanSegmentTableResponse::FIELD_SEGMENT_ID,
         ));

         Ok(Self {
@@ -415,21 +409,24 @@ pub(crate) fn group_chunk_infos_by_partition_id(

     for batch in chunk_info_batches.as_ref() {
         let partition_ids = batch
-            .column_by_name("chunk_partition_id")
+            .column_by_name(QueryDatasetResponse::FIELD_CHUNK_SEGMENT_ID)
             .ok_or(exec_datafusion_err!(
-                "Unable to find chunk_partition_id column"
+                "Unable to find {} column",
+                QueryDatasetResponse::FIELD_CHUNK_SEGMENT_ID
             ))?
             .as_any()
             .downcast_ref::<StringArray>()
             .ok_or(exec_datafusion_err!(
-                "chunk_partition_id must be string type"
+                "{} must be string type",
+                QueryDatasetResponse::FIELD_CHUNK_SEGMENT_ID
             ))?;

         // group rows by partition ID
         let mut partition_rows: BTreeMap<String, Vec<usize>> = BTreeMap::new();
         for (row_idx, partition_id) in partition_ids.iter().enumerate() {
             let pid = partition_id.ok_or(exec_datafusion_err!(
-                "Found null partition_id in chunk_partition_id column at row {row_idx}"
+                "Found null segment id in {} column at row {row_idx}",
+                QueryDatasetResponse::FIELD_CHUNK_SEGMENT_ID
             ))?;
             partition_rows
                 .entry(pid.to_owned())
@@ -532,19 +529,19 @@ mod tests {
     fn test_batches_grouping() {
         let schema = Arc::new(Schema::new_with_metadata(
             vec![
-                Field::new("chunk_partition_id", DataType::Utf8, false),
-                Field::new("chunk_id", DataType::FixedSizeBinary(32), false),
+                QueryDatasetResponse::field_chunk_segment_id(),
+                QueryDatasetResponse::field_chunk_id(),
             ],
             HashMap::default(),
         ));

         let capacity = 4;
-        let byte_width = 32;
+        let byte_width = 16;
         let mut chunk_id_builder = FixedSizeBinaryBuilder::with_capacity(capacity, byte_width);
-        chunk_id_builder.append_value([0u8; 32]).unwrap();
-        chunk_id_builder.append_value([1u8; 32]).unwrap();
-        chunk_id_builder.append_value([2u8; 32]).unwrap();
-        chunk_id_builder.append_value([3u8; 32]).unwrap();
+        chunk_id_builder.append_value([0u8; 16]).unwrap();
+        chunk_id_builder.append_value([1u8; 16]).unwrap();
+        chunk_id_builder.append_value([2u8; 16]).unwrap();
+        chunk_id_builder.append_value([3u8; 16]).unwrap();
         let chunk_id_array = Arc::new(chunk_id_builder.finish());

         let batch1 = RecordBatch::try_new_with_options(
@@ -563,9 +560,9 @@ mod tests {
             .unwrap();

         let mut chunk_id_builder = FixedSizeBinaryBuilder::with_capacity(capacity, byte_width);
-        chunk_id_builder.append_value([4u8; 32]).unwrap();
-        chunk_id_builder.append_value([5u8; 32]).unwrap();
-        chunk_id_builder.append_value([6u8; 32]).unwrap();
+        chunk_id_builder.append_value([4u8; 16]).unwrap();
+        chunk_id_builder.append_value([5u8; 16]).unwrap();
+        chunk_id_builder.append_value([6u8; 16]).unwrap();
         let chunk_id_array = Arc::new(chunk_id_builder.finish());

         let batch2 = RecordBatch::try_new_with_options(
@@ -593,8 +590,8 @@ mod tests {
             .downcast_ref::<FixedSizeBinaryArray>()
             .unwrap();
         assert_eq!(chunk_ids_a.len(), 2);
-        assert_eq!(chunk_ids_a.value(0), [0u8; 32]);
-        assert_eq!(chunk_ids_a.value(1), [2u8; 32]);
+        assert_eq!(chunk_ids_a.value(0), [0u8; 16]);
+        assert_eq!(chunk_ids_a.value(1), [2u8; 16]);

         let group_b = grouped.get("B").unwrap();
         assert_eq!(group_b.len(), 2);
@@ -605,15 +602,15 @@ mod tests {
             .downcast_ref::<FixedSizeBinaryArray>()
             .unwrap();
         assert_eq!(chunk_ids_b1.len(), 1);
-        assert_eq!(chunk_ids_b1.value(0), [1u8; 32]);
+        assert_eq!(chunk_ids_b1.value(0), [1u8; 16]);
         let chunk_ids_b2 = group_b[1]
             .column_by_name("chunk_id")
             .unwrap()
             .as_any()
             .downcast_ref::<FixedSizeBinaryArray>()
             .unwrap();
         assert_eq!(chunk_ids_b2.len(), 1);
-        assert_eq!(chunk_ids_b2.value(0), [4u8; 32]);
+        assert_eq!(chunk_ids_b2.value(0), [4u8; 16]);

         let group_c = grouped.get("C").unwrap();
         assert_eq!(group_c.len(), 2);
@@ -624,15 +621,15 @@ mod tests {
             .downcast_ref::<FixedSizeBinaryArray>()
             .unwrap();
         assert_eq!(chunk_ids_c1.len(), 1);
-        assert_eq!(chunk_ids_c1.value(0), [3u8; 32]);
+        assert_eq!(chunk_ids_c1.value(0), [3u8; 16]);
         let chunk_ids_c2 = group_c[1]
             .column_by_name("chunk_id")
             .unwrap()
             .as_any()
             .downcast_ref::<FixedSizeBinaryArray>()
             .unwrap();
         assert_eq!(chunk_ids_c2.len(), 1);
-        assert_eq!(chunk_ids_c2.value(0), [5u8; 32]);
+        assert_eq!(chunk_ids_c2.value(0), [5u8; 16]);

         let group_d = grouped.get("D").unwrap();
         assert_eq!(group_d.len(), 1);
@@ -643,6 +640,6 @@ mod tests {
             .downcast_ref::<FixedSizeBinaryArray>()
             .unwrap();
         assert_eq!(chunk_ids_d.len(), 1);
-        assert_eq!(chunk_ids_d.value(0), [6u8; 32]);
+        assert_eq!(chunk_ids_d.value(0), [6u8; 16]);
     }
 }
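
Note: the grouping helper touched above walks the (now renamed) chunk segment id string column and buckets row indices per segment before slicing out one record batch per segment. Below is a minimal, self-contained sketch of that pattern against plain arrow-rs; the column names and data are placeholders, not the crate's actual constants.

use std::collections::BTreeMap;
use std::sync::Arc;

use arrow::array::{ArrayRef, FixedSizeBinaryBuilder, StringArray};
use arrow::datatypes::{DataType, Field, Schema};
use arrow::record_batch::RecordBatch;

fn main() -> Result<(), arrow::error::ArrowError> {
    // Placeholder column names standing in for the renamed protocol constants.
    let segment_id_col = "chunk_segment_id";
    let chunk_id_col = "chunk_id";

    let schema = Arc::new(Schema::new(vec![
        Field::new(segment_id_col, DataType::Utf8, false),
        Field::new(chunk_id_col, DataType::FixedSizeBinary(16), false),
    ]));

    // Four chunk-info rows spread over two segments, with 16-byte chunk ids.
    let segment_ids = StringArray::from(vec!["A", "B", "A", "B"]);
    let mut chunk_ids = FixedSizeBinaryBuilder::with_capacity(4, 16);
    for i in 0u8..4 {
        chunk_ids.append_value([i; 16])?;
    }
    let columns: Vec<ArrayRef> = vec![Arc::new(segment_ids), Arc::new(chunk_ids.finish())];
    let batch = RecordBatch::try_new(schema, columns)?;

    // Bucket row indices by segment id, as the grouping helper does, so each
    // segment's rows can later be sliced into their own batch.
    let ids = batch
        .column_by_name(segment_id_col)
        .expect("segment id column is present")
        .as_any()
        .downcast_ref::<StringArray>()
        .expect("segment id column is a string column");

    let mut rows_per_segment: BTreeMap<String, Vec<usize>> = BTreeMap::new();
    for (row_idx, id) in ids.iter().enumerate() {
        let id = id.expect("segment ids are non-null");
        rows_per_segment.entry(id.to_owned()).or_default().push(row_idx);
    }

    assert_eq!(rows_per_segment["A"], vec![0, 2]);
    assert_eq!(rows_per_segment["B"], vec![1, 3]);
    Ok(())
}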

crates/store/re_datafusion/src/dataframe_query_provider.rs

Lines changed: 7 additions & 9 deletions
@@ -32,7 +32,7 @@ use re_dataframe::{
     ChunkStoreHandle, Index, QueryCache, QueryEngine, QueryExpression, QueryHandle, StorageEngine,
 };
 use re_log_types::{ApplicationId, StoreId, StoreKind};
-use re_protos::cloud::v1alpha1::{FetchChunksRequest, ScanPartitionTableResponse};
+use re_protos::cloud::v1alpha1::{FetchChunksRequest, ScanSegmentTableResponse};
 use re_redap_client::ConnectionClient;
 use re_sorbet::{ColumnDescriptor, ColumnSelector};

@@ -199,12 +199,10 @@ impl PartitionStreamExec {
         let orderings = if projected_schema
             .fields()
             .iter()
-            .any(|f| f.name().as_str() == ScanPartitionTableResponse::FIELD_PARTITION_ID)
+            .any(|f| f.name().as_str() == ScanSegmentTableResponse::FIELD_SEGMENT_ID)
         {
-            let partition_col = Arc::new(Column::new(
-                ScanPartitionTableResponse::FIELD_PARTITION_ID,
-                0,
-            )) as Arc<dyn PhysicalExpr>;
+            let partition_col = Arc::new(Column::new(ScanSegmentTableResponse::FIELD_SEGMENT_ID, 0))
+                as Arc<dyn PhysicalExpr>;
             let order_col = sort_index
                 .and_then(|index| {
                     let index_name = index.as_str();
@@ -243,7 +241,7 @@ impl PartitionStreamExec {
         let output_partitioning = if partition_in_output_schema {
             Partitioning::Hash(
                 vec![Arc::new(Column::new(
-                    ScanPartitionTableResponse::FIELD_PARTITION_ID,
+                    ScanSegmentTableResponse::FIELD_SEGMENT_ID,
                     0,
                 ))],
                 num_partitions,
@@ -306,7 +304,7 @@ async fn send_next_row(

     let batch_schema = Arc::new(prepend_string_column_schema(
         &query_schema,
-        ScanPartitionTableResponse::FIELD_PARTITION_ID,
+        ScanSegmentTableResponse::FIELD_SEGMENT_ID,
     ));

     let batch = RecordBatch::try_new_with_options(
@@ -436,7 +434,7 @@ async fn chunk_stream_io_loop(

     // Then we need to fully decode these chunks, i.e. both the transport layer (Protobuf)
     // and the app layer (Arrow).
-    let mut chunk_stream = re_redap_client::fetch_chunks_response_to_chunk_and_partition_id(
+    let mut chunk_stream = re_redap_client::fetch_chunks_response_to_chunk_and_segment_id(
         fetch_chunks_response_stream,
     );
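
Note: the Partitioning::Hash change above only swaps the column-name constant; the shape of the declaration is unchanged. As a rough sketch of what such a declaration looks like with stock DataFusion (the column name here is a stand-in for ScanSegmentTableResponse::FIELD_SEGMENT_ID):

use std::sync::Arc;

use datafusion::physical_expr::PhysicalExpr;
use datafusion::physical_expr::expressions::Column;
use datafusion::physical_plan::Partitioning;

/// Declare an exec node's output as hash-partitioned on the segment id column,
/// so DataFusion keeps all rows of one segment within one output partition.
fn segment_hash_partitioning(segment_id_field: &str, num_partitions: usize) -> Partitioning {
    // The segment id column is assumed to sit at index 0, matching how the
    // providers prepend it to the query schema.
    let segment_col = Arc::new(Column::new(segment_id_field, 0)) as Arc<dyn PhysicalExpr>;
    Partitioning::Hash(vec![segment_col], num_partitions)
}

fn main() {
    // "rerun_segment_id" is a placeholder, not the crate's actual constant.
    let partitioning = segment_hash_partitioning("rerun_segment_id", 4);
    assert_eq!(partitioning.partition_count(), 4);
}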

crates/store/re_datafusion/src/dataframe_query_provider_wasm.rs

Lines changed: 6 additions & 8 deletions
@@ -27,7 +27,7 @@ use re_dataframe::{
     ChunkStoreHandle, Index, QueryCache, QueryEngine, QueryExpression, QueryHandle, StorageEngine,
 };
 use re_log_types::{StoreId, StoreKind};
-use re_protos::cloud::v1alpha1::{FetchChunksRequest, ScanPartitionTableResponse};
+use re_protos::cloud::v1alpha1::{FetchChunksRequest, ScanSegmentTableResponse};
 use re_redap_client::ConnectionClient;

 use crate::dataframe_query_common::{
@@ -78,7 +78,7 @@ impl DataframePartitionStream {

     // Then we need to fully decode these chunks, i.e. both the transport layer (Protobuf)
     // and the app layer (Arrow).
-    let mut chunk_stream = re_redap_client::fetch_chunks_response_to_chunk_and_partition_id(
+    let mut chunk_stream = re_redap_client::fetch_chunks_response_to_chunk_and_segment_id(
         fetch_chunks_response_stream,
     );

@@ -191,10 +191,8 @@ impl PartitionStreamExec {
             None => Arc::clone(table_schema),
         };

-        let partition_col = Arc::new(Column::new(
-            ScanPartitionTableResponse::FIELD_PARTITION_ID,
-            0,
-        )) as Arc<dyn PhysicalExpr>;
+        let partition_col = Arc::new(Column::new(ScanSegmentTableResponse::FIELD_SEGMENT_ID, 0))
+            as Arc<dyn PhysicalExpr>;
         let order_col = sort_index
             .and_then(|index| {
                 let index_name = index.as_str();
@@ -231,7 +229,7 @@ impl PartitionStreamExec {
         let output_partitioning = if partition_in_output_schema {
             Partitioning::Hash(
                 vec![Arc::new(Column::new(
-                    ScanPartitionTableResponse::FIELD_PARTITION_ID,
+                    ScanSegmentTableResponse::FIELD_SEGMENT_ID,
                     0,
                 ))],
                 num_partitions,
@@ -292,7 +290,7 @@ fn create_next_row(

     let batch_schema = Arc::new(prepend_string_column_schema(
         &query_schema,
-        ScanPartitionTableResponse::FIELD_PARTITION_ID,
+        ScanSegmentTableResponse::FIELD_SEGMENT_ID,
     ));

     let batch = RecordBatch::try_new_with_options(
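
Note: both the native and wasm providers keep prepending the segment id as the first string column of the query schema before emitting each row's batch. A small sketch of that prepend-and-build step using plain arrow-rs; the field and segment names below are hypothetical.

use std::sync::Arc;

use arrow::array::{ArrayRef, Int64Array, StringArray};
use arrow::datatypes::{DataType, Field, Schema};
use arrow::record_batch::RecordBatch;

/// Sketch of the prepend_string_column_schema idea: put a non-nullable string
/// column (the segment id) in front of an existing query schema.
fn prepend_string_column(schema: &Schema, name: &str) -> Schema {
    let mut fields = vec![Arc::new(Field::new(name, DataType::Utf8, false))];
    fields.extend(schema.fields().iter().cloned());
    Schema::new(fields)
}

fn main() -> Result<(), arrow::error::ArrowError> {
    let query_schema = Schema::new(vec![Field::new("frame_nr", DataType::Int64, false)]);

    // "rerun_segment_id" is a stand-in for ScanSegmentTableResponse::FIELD_SEGMENT_ID.
    let batch_schema = Arc::new(prepend_string_column(&query_schema, "rerun_segment_id"));

    // Every emitted batch then carries, as its first column, the id of the
    // segment its rows were read from.
    let columns: Vec<ArrayRef> = vec![
        Arc::new(StringArray::from(vec!["segment_a"; 3])),
        Arc::new(Int64Array::from(vec![0_i64, 1, 2])),
    ];
    let batch = RecordBatch::try_new(batch_schema, columns)?;
    assert_eq!(batch.num_columns(), 2);
    Ok(())
}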

crates/store/re_datafusion/src/dataset_manifest.rs

Lines changed: 1 addition & 1 deletion
@@ -18,7 +18,7 @@ use re_redap_client::ConnectionClient;
 use crate::grpc_streaming_provider::{GrpcStreamProvider, GrpcStreamToTable};
 use crate::wasm_compat::make_future_send;

-//TODO(ab): deduplicate from PartitionTableProvider
+//TODO(ab): deduplicate from SegmentTableProvider
 #[derive(Clone)]
 pub struct DatasetManifestProvider {
     client: ConnectionClient,

crates/store/re_datafusion/src/lib.rs

Lines changed: 1 addition & 1 deletion
@@ -20,6 +20,6 @@ pub(crate) use dataframe_query_provider::PartitionStreamExec;
 #[cfg(target_arch = "wasm32")]
 pub(crate) use dataframe_query_provider_wasm::PartitionStreamExec;
 pub use dataset_manifest::DatasetManifestProvider;
-pub use partition_table::PartitionTableProvider;
+pub use partition_table::SegmentTableProvider;
 pub use search_provider::SearchResultsTableProvider;
 pub use table_entry_provider::TableEntryTableProvider;
