Skip to content

Commit f2f0918

Browse files
authored
Decouple legacy StoreHub client from new Rerun Cloud client (#11050)
* `re_grpc_client` is the client code for `re_grpc_server`, the legacy gRPC wrapper around the StoreHub. * `re_cloud_client` is the official gRPC client for the Rerun Cloud platform. The name will probably change later -- that doesn't really matter. What matters is that we decouple these two as they are completely unrelated.
1 parent 751bb0d commit f2f0918

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

57 files changed

+445
-239
lines changed

ARCHITECTURE.md

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -98,11 +98,11 @@ Of course, this will only take us so far. In the future we plan on caching queri
9898
Here is an overview of the crates included in the project:
9999

100100
<picture>
101-
<img src="https://static.rerun.io/crates/9f89376256c984dd7999ec5a4e745889eb1a9c5a/full.png" alt="">
102-
<source media="(max-width: 480px)" srcset="https://static.rerun.io/crates/9f89376256c984dd7999ec5a4e745889eb1a9c5a/480w.png">
103-
<source media="(max-width: 768px)" srcset="https://static.rerun.io/crates/9f89376256c984dd7999ec5a4e745889eb1a9c5a/768w.png">
104-
<source media="(max-width: 1024px)" srcset="https://static.rerun.io/crates/9f89376256c984dd7999ec5a4e745889eb1a9c5a/1024w.png">
105-
<source media="(max-width: 1200px)" srcset="https://static.rerun.io/crates/9f89376256c984dd7999ec5a4e745889eb1a9c5a/1200w.png">
101+
<img src="https://static.rerun.io/crates/1147f3775a6432e22c5015276d0938e9411d54a3/full.png" alt="">
102+
<source media="(max-width: 480px)" srcset="https://static.rerun.io/crates/1147f3775a6432e22c5015276d0938e9411d54a3/480w.png">
103+
<source media="(max-width: 768px)" srcset="https://static.rerun.io/crates/1147f3775a6432e22c5015276d0938e9411d54a3/768w.png">
104+
<source media="(max-width: 1024px)" srcset="https://static.rerun.io/crates/1147f3775a6432e22c5015276d0938e9411d54a3/1024w.png">
105+
<source media="(max-width: 1200px)" srcset="https://static.rerun.io/crates/1147f3775a6432e22c5015276d0938e9411d54a3/1200w.png">
106106
</picture>
107107

108108
<!-- !!! IMPORTANT!!!
@@ -196,10 +196,11 @@ Update instructions:
196196

197197
| Crate | Description |
198198
|----------------------|-------------------------------------------------------------------|
199+
| re_redap_client | Official client for the Rerun Data Protocol |
199200
| re_data_loader | Handles loading of Rerun data from file using data loader plugins |
200201
| re_data_source | Handles loading of Rerun data from different sources |
201-
| re_grpc_client | Communicate with the Rerun Data Platform over gRPC |
202-
| re_grpc_server | Host an in-memory Storage Node |
202+
| re_grpc_client | Client for the legacy StoreHub API |
203+
| re_grpc_server | Server for the legacy StoreHub API |
203204
| re_web_viewer_server | Serves the Rerun web viewer (Wasm and HTML) over HTTP |
204205

205206
### Build support

Cargo.lock

Lines changed: 39 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -7515,6 +7515,7 @@ dependencies = [
75157515
"re_log",
75167516
"re_log_encoding",
75177517
"re_log_types",
7518+
"re_redap_client",
75187519
"re_smart_channel",
75197520
"re_tracing",
75207521
"re_uri",
@@ -7633,10 +7634,10 @@ dependencies = [
76337634
"itertools 0.14.0",
76347635
"log",
76357636
"re_dataframe",
7636-
"re_grpc_client",
76377637
"re_log_encoding",
76387638
"re_log_types",
76397639
"re_protos",
7640+
"re_redap_client",
76407641
"re_sorbet",
76417642
"re_tuid",
76427643
"re_uri",
@@ -7749,9 +7750,9 @@ dependencies = [
77497750
"re_chunk_store",
77507751
"re_data_source",
77517752
"re_entity_db",
7752-
"re_grpc_client",
77537753
"re_log",
77547754
"re_log_types",
7755+
"re_redap_client",
77557756
"re_renderer",
77567757
"re_smart_channel",
77577758
"re_tracing",
@@ -7771,33 +7772,22 @@ dependencies = [
77717772
name = "re_grpc_client"
77727773
version = "0.25.0-alpha.1+dev"
77737774
dependencies = [
7774-
"arrow",
77757775
"async-stream",
77767776
"crossbeam",
7777-
"itertools 0.14.0",
7778-
"jiff",
7779-
"re_arrow_util",
7780-
"re_auth",
77817777
"re_chunk",
7782-
"re_error",
77837778
"re_log",
77847779
"re_log_encoding",
77857780
"re_log_types",
7786-
"re_perf_telemetry",
77877781
"re_protos",
77887782
"re_smart_channel",
77897783
"re_sorbet",
77907784
"re_tracing",
77917785
"re_uri",
7792-
"serde",
77937786
"thiserror 1.0.69",
77947787
"tokio",
77957788
"tokio-stream",
77967789
"tonic",
77977790
"tonic-web-wasm-client",
7798-
"tower",
7799-
"tracing",
7800-
"url",
78017791
"wasm-bindgen-futures",
78027792
"web-time",
78037793
]
@@ -7849,8 +7839,8 @@ version = "0.25.0-alpha.1+dev"
78497839
dependencies = [
78507840
"egui",
78517841
"egui_kittest",
7852-
"re_grpc_client",
78537842
"re_protos",
7843+
"re_redap_client",
78547844
"re_sdk",
78557845
"re_server",
78567846
"re_uri",
@@ -8146,11 +8136,11 @@ dependencies = [
81468136
"re_component_ui",
81478137
"re_dataframe_ui",
81488138
"re_datafusion",
8149-
"re_grpc_client",
81508139
"re_log",
81518140
"re_log_encoding",
81528141
"re_log_types",
81538142
"re_protos",
8143+
"re_redap_client",
81548144
"re_sorbet",
81558145
"re_ui",
81568146
"re_uri",
@@ -8160,6 +8150,36 @@ dependencies = [
81608150
"url",
81618151
]
81628152

8153+
[[package]]
8154+
name = "re_redap_client"
8155+
version = "0.25.0-alpha.1+dev"
8156+
dependencies = [
8157+
"arrow",
8158+
"itertools 0.14.0",
8159+
"jiff",
8160+
"re_arrow_util",
8161+
"re_auth",
8162+
"re_chunk",
8163+
"re_error",
8164+
"re_log",
8165+
"re_log_encoding",
8166+
"re_log_types",
8167+
"re_perf_telemetry",
8168+
"re_protos",
8169+
"re_smart_channel",
8170+
"re_uri",
8171+
"serde",
8172+
"thiserror 1.0.69",
8173+
"tokio",
8174+
"tokio-stream",
8175+
"tonic",
8176+
"tonic-web-wasm-client",
8177+
"tower",
8178+
"tracing",
8179+
"url",
8180+
"wasm-bindgen-futures",
8181+
]
8182+
81638183
[[package]]
81648184
name = "re_renderer"
81658185
version = "0.25.0-alpha.1+dev"
@@ -8404,10 +8424,10 @@ dependencies = [
84048424
"re_chunk_store",
84058425
"re_entity_db",
84068426
"re_global_context",
8407-
"re_grpc_client",
84088427
"re_log",
84098428
"re_log_encoding",
84108429
"re_log_types",
8430+
"re_redap_client",
84118431
"re_renderer",
84128432
"re_tracing",
84138433
"re_types",
@@ -8976,7 +8996,6 @@ dependencies = [
89768996
"re_entity_db",
89778997
"re_error",
89788998
"re_format",
8979-
"re_grpc_client",
89808999
"re_log",
89819000
"re_log_encoding",
89829001
"re_log_types",
@@ -8985,6 +9004,7 @@ dependencies = [
89859004
"re_query",
89869005
"re_recording_panel",
89879006
"re_redap_browser",
9007+
"re_redap_client",
89889008
"re_renderer",
89899009
"re_selection_panel",
89909010
"re_smart_channel",
@@ -9360,7 +9380,6 @@ dependencies = [
93609380
"re_format",
93619381
"re_format_arrow",
93629382
"re_global_context",
9363-
"re_grpc_client",
93649383
"re_grpc_server",
93659384
"re_log",
93669385
"re_log_encoding",
@@ -9369,6 +9388,7 @@ dependencies = [
93699388
"re_memory",
93709389
"re_perf_telemetry",
93719390
"re_protos",
9391+
"re_redap_client",
93729392
"re_sdk",
93739393
"re_server",
93749394
"re_smart_channel",
@@ -9463,6 +9483,7 @@ dependencies = [
94639483
"re_memory",
94649484
"re_perf_telemetry",
94659485
"re_protos",
9486+
"re_redap_client",
94669487
"re_sdk",
94679488
"re_sorbet",
94689489
"re_tuid",

Cargo.toml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,7 @@ re_dataframe = { path = "crates/store/re_dataframe", version = "=0.25.0-alpha.1"
7171
re_datafusion = { path = "crates/store/re_datafusion", version = "=0.25.0-alpha.1", default-features = false }
7272
re_entity_db = { path = "crates/store/re_entity_db", version = "=0.25.0-alpha.1", default-features = false }
7373
re_format_arrow = { path = "crates/store/re_format_arrow", version = "=0.25.0-alpha.1", default-features = false }
74+
re_redap_client = { path = "crates/store/re_redap_client", version = "=0.25.0-alpha.1", default-features = false }
7475
re_grpc_client = { path = "crates/store/re_grpc_client", version = "=0.25.0-alpha.1", default-features = false }
7576
re_grpc_server = { path = "crates/store/re_grpc_server", version = "=0.25.0-alpha.1", default-features = false }
7677
re_protos = { path = "crates/store/re_protos", version = "=0.25.0-alpha.1", default-features = false }
@@ -452,7 +453,7 @@ debug = false
452453
"re_error".debug = true
453454
"re_format".debug = true
454455
"re_format_arrow".debug = true
455-
"re_grpc_client".debug = true
456+
"re_redap_client".debug = true
456457
"re_grpc_server".debug = true
457458
"re_int_histogram".debug = true
458459
"re_log".debug = true

crates/store/re_data_source/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ default = []
2525
[dependencies]
2626
re_data_loader.workspace = true
2727
re_error.workspace = true
28+
re_redap_client.workspace = true
2829
re_grpc_client.workspace = true
2930
re_log_encoding = { workspace = true, features = [
3031
"decoder",

crates/store/re_data_source/src/data_source.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
1-
use re_grpc_client::ConnectionRegistryHandle;
21
use re_log_types::{LogMsg, RecordingId};
2+
use re_redap_client::ConnectionRegistryHandle;
33
use re_smart_channel::{Receiver, SmartChannelSource, SmartMessageSource};
44

55
use crate::FileContents;
@@ -146,7 +146,7 @@ impl LogDataSource {
146146
pub fn stream(
147147
self,
148148
connection_registry: &ConnectionRegistryHandle,
149-
on_ui_cmd: Option<Box<dyn Fn(re_grpc_client::UiCommand) + Send + Sync>>,
149+
on_ui_cmd: Option<Box<dyn Fn(re_redap_client::UiCommand) + Send + Sync>>,
150150
on_msg: Option<Box<dyn Fn() + Send + Sync>>,
151151
) -> anyhow::Result<Receiver<LogMsg>> {
152152
re_tracing::profile_function!();
@@ -253,7 +253,7 @@ impl LogDataSource {
253253
let uri_clone = uri.clone();
254254
let stream_partition = async move {
255255
let client = connection_registry.client(uri_clone.origin.clone()).await?;
256-
re_grpc_client::stream_blueprint_and_partition_from_server(
256+
re_redap_client::stream_blueprint_and_partition_from_server(
257257
client, tx, uri_clone, on_ui_cmd, on_msg,
258258
)
259259
.await
@@ -270,7 +270,7 @@ impl LogDataSource {
270270
Ok(rx)
271271
}
272272

273-
Self::RedapProxy(uri) => Ok(re_grpc_client::message_proxy::stream(uri, on_msg)),
273+
Self::RedapProxy(uri) => Ok(re_grpc_client::stream(uri, on_msg)),
274274
}
275275
}
276276
}

crates/store/re_datafusion/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ default = []
3131
[dependencies]
3232
# Rerun dependencies:
3333
re_dataframe.workspace = true
34-
re_grpc_client.workspace = true
34+
re_redap_client.workspace = true
3535
re_log_encoding.workspace = true
3636
re_log_types.workspace = true
3737
re_protos.workspace = true

crates/store/re_datafusion/examples/catalog.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ use re_tuid::Tuid;
1010
async fn main() -> anyhow::Result<()> {
1111
let local_addr = "rerun+http:://127.0.0.1:51234";
1212

13-
let connection_registry = re_grpc_client::ConnectionRegistry::new();
13+
let connection_registry = re_redap_client::ConnectionRegistry::new();
1414

1515
let client = connection_registry.client(local_addr.parse()?).await?;
1616
let mut df_connector = DataFusionConnector::new(client).await?;

crates/store/re_datafusion/src/dataframe_query_common.rs

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,9 @@
1+
use std::any::Any;
2+
use std::cmp::Ordering;
3+
use std::collections::{BTreeMap, BTreeSet};
4+
use std::str::FromStr as _;
5+
use std::sync::Arc;
6+
17
use arrow::array::{
28
ArrayRef, DurationNanosecondArray, Int64Array, RecordBatch, StringArray,
39
TimestampMicrosecondArray, TimestampMillisecondArray, TimestampNanosecondArray,
@@ -12,25 +18,21 @@ use datafusion::datasource::TableType;
1218
use datafusion::logical_expr::{Expr, Operator, TableProviderFilterPushDown};
1319
use datafusion::physical_plan::ExecutionPlan;
1420
use datafusion::physical_plan::coalesce_batches::CoalesceBatchesExec;
21+
1522
use re_dataframe::external::re_chunk::ChunkId;
1623
use re_dataframe::external::re_chunk_store::ChunkStore;
1724
use re_dataframe::{Index, QueryExpression};
18-
use re_grpc_client::{ConnectionClient, ConnectionRegistryHandle};
1925
use re_log_encoding::codec::wire::decoder::Decode as _;
2026
use re_log_types::EntryId;
2127
use re_log_types::external::re_types_core::Loggable as _;
2228
use re_protos::cloud::v1alpha1::DATASET_MANIFEST_ID_FIELD_NAME;
2329
use re_protos::cloud::v1alpha1::ext::{Query, QueryLatestAt, QueryRange};
2430
use re_protos::cloud::v1alpha1::{GetChunksRequest, GetDatasetSchemaRequest, QueryDatasetRequest};
2531
use re_protos::common::v1alpha1::ext::ScanParameters;
32+
use re_redap_client::{ConnectionClient, ConnectionRegistryHandle};
2633
use re_sorbet::{BatchType, ChunkColumnDescriptors, ColumnKind, ComponentColumnSelector};
2734
use re_tuid::Tuid;
2835
use re_uri::Origin;
29-
use std::any::Any;
30-
use std::cmp::Ordering;
31-
use std::collections::{BTreeMap, BTreeSet};
32-
use std::str::FromStr as _;
33-
use std::sync::Arc;
3436

3537
/// Sets the size for output record batches in rows. The last batch will likely be smaller.
3638
/// The default for Data Fusion is 8192, which leads to a 256Kb record batch on average for

crates/store/re_datafusion/src/dataframe_query_provider.rs

Lines changed: 20 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,10 @@
1-
use crate::dataframe_query_common::{
2-
ChunkInfo, align_record_batch_to_schema, compute_partition_stream_chunk_info,
3-
prepend_string_column_schema,
4-
};
1+
use std::any::Any;
2+
use std::collections::BTreeMap;
3+
use std::fmt::Debug;
4+
use std::pin::Pin;
5+
use std::sync::Arc;
6+
use std::task::{Context, Poll};
7+
58
use arrow::array::{Array, RecordBatch, RecordBatchOptions, StringArray};
69
use arrow::compute::SortOptions;
710
use arrow::datatypes::{Schema, SchemaRef};
@@ -17,28 +20,28 @@ use datafusion::physical_plan::execution_plan::{Boundedness, EmissionType};
1720
use datafusion::physical_plan::{DisplayAs, DisplayFormatType, ExecutionPlan, PlanProperties};
1821
use datafusion::{error::DataFusionError, execution::SendableRecordBatchStream};
1922
use futures_util::{Stream, StreamExt as _};
23+
use tokio::runtime::Handle;
24+
use tokio::sync::Notify;
25+
use tokio::sync::mpsc::{Receiver, Sender};
26+
use tokio::task::JoinHandle;
27+
use tracing::Instrument as _;
28+
2029
use re_dataframe::external::re_chunk::Chunk;
2130
use re_dataframe::external::re_chunk_store::ChunkStore;
2231
use re_dataframe::{
2332
ChunkStoreHandle, Index, QueryCache, QueryEngine, QueryExpression, QueryHandle, StorageEngine,
2433
};
25-
use re_grpc_client::ConnectionClient;
2634
use re_log_types::{ApplicationId, StoreId, StoreInfo, StoreKind, StoreSource};
2735
use re_protos::cloud::v1alpha1::DATASET_MANIFEST_ID_FIELD_NAME;
2836
use re_protos::cloud::v1alpha1::GetChunksRequest;
2937
use re_protos::common::v1alpha1::PartitionId;
38+
use re_redap_client::ConnectionClient;
3039
use re_sorbet::{ColumnDescriptor, ColumnSelector};
31-
use std::any::Any;
32-
use std::collections::BTreeMap;
33-
use std::fmt::Debug;
34-
use std::pin::Pin;
35-
use std::sync::Arc;
36-
use std::task::{Context, Poll};
37-
use tokio::runtime::Handle;
38-
use tokio::sync::Notify;
39-
use tokio::sync::mpsc::{Receiver, Sender};
40-
use tokio::task::JoinHandle;
41-
use tracing::Instrument as _;
40+
41+
use crate::dataframe_query_common::{
42+
ChunkInfo, align_record_batch_to_schema, compute_partition_stream_chunk_info,
43+
prepend_string_column_schema,
44+
};
4245

4346
/// This parameter sets the back pressure that either the streaming provider
4447
/// can place on the CPU worker thread or the CPU worker thread can place on
@@ -455,7 +458,7 @@ async fn chunk_stream_io_loop(
455458

456459
// Then we need to fully decode these chunks, i.e. both the transport layer (Protobuf)
457460
// and the app layer (Arrow).
458-
let mut chunk_stream = re_grpc_client::get_chunks_response_to_chunk_and_partition_id(
461+
let mut chunk_stream = re_redap_client::get_chunks_response_to_chunk_and_partition_id(
459462
get_chunks_response_stream,
460463
);
461464

0 commit comments

Comments
 (0)