Skip to content

Commit 6153432

Browse files
authored
Fix race condition for ui adjustments while loading recordings from redap client (#11365)
1 parent 3a6ed38 commit 6153432

File tree

21 files changed

+400
-267
lines changed

21 files changed

+400
-267
lines changed

crates/store/re_data_loader/src/load_file.rs

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
use std::borrow::Cow;
22

33
use ahash::{HashMap, HashMapExt as _};
4-
use re_log_types::{FileSource, LogMsg};
4+
use re_log_types::{DataSourceMessage, FileSource, LogMsg};
55
use re_smart_channel::Sender;
66

77
use crate::{DataLoader as _, DataLoaderError, LoadedData, RrdLoader};
@@ -21,7 +21,7 @@ pub fn load_from_path(
2121
file_source: FileSource,
2222
path: &std::path::Path,
2323
// NOTE: This channel must be unbounded since we serialize all operations when running on wasm.
24-
tx: &Sender<LogMsg>,
24+
tx: &Sender<DataSourceMessage>,
2525
) -> Result<(), DataLoaderError> {
2626
use re_log_types::ApplicationId;
2727

@@ -71,7 +71,7 @@ pub fn load_from_file_contents(
7171
filepath: &std::path::Path,
7272
contents: std::borrow::Cow<'_, [u8]>,
7373
// NOTE: This channel must be unbounded since we serialize all operations when running on wasm.
74-
tx: &Sender<LogMsg>,
74+
tx: &Sender<DataSourceMessage>,
7575
) -> Result<(), DataLoaderError> {
7676
re_tracing::profile_function!(filepath.to_string_lossy());
7777

@@ -271,7 +271,7 @@ pub(crate) fn send(
271271
settings: crate::DataLoaderSettings,
272272
file_source: FileSource,
273273
rx_loader: std::sync::mpsc::Receiver<LoadedData>,
274-
tx: &Sender<LogMsg>,
274+
tx: &Sender<DataSourceMessage>,
275275
) {
276276
spawn({
277277
re_tracing::profile_function!();
@@ -319,7 +319,7 @@ pub(crate) fn send(
319319
continue;
320320
}
321321
};
322-
tx.send(msg).ok();
322+
tx.send(msg.into()).ok();
323323
}
324324

325325
for (store_id, tracked) in store_info_tracker {
@@ -335,7 +335,7 @@ pub(crate) fn send(
335335

336336
if should_send_new_store_info {
337337
let store_info = prepare_store_info(&store_id, file_source.clone());
338-
tx.send(store_info).ok();
338+
tx.send(store_info.into()).ok();
339339
}
340340
}
341341

crates/store/re_data_source/src/data_source.rs

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
use re_log_types::{LogMsg, RecordingId};
1+
use re_log_types::{DataSourceMessage, RecordingId};
22
use re_redap_client::ConnectionRegistryHandle;
33
use re_smart_channel::{Receiver, SmartChannelSource, SmartMessageSource};
44

@@ -146,9 +146,8 @@ impl LogDataSource {
146146
pub fn stream(
147147
self,
148148
connection_registry: &ConnectionRegistryHandle,
149-
on_ui_cmd: Option<Box<dyn Fn(re_redap_client::UiCommand) + Send + Sync>>,
150149
on_msg: Option<Box<dyn Fn() + Send + Sync>>,
151-
) -> anyhow::Result<Receiver<LogMsg>> {
150+
) -> anyhow::Result<Receiver<DataSourceMessage>> {
152151
re_tracing::profile_function!();
153152

154153
match self {
@@ -254,7 +253,7 @@ impl LogDataSource {
254253
let stream_partition = async move {
255254
let client = connection_registry.client(uri_clone.origin.clone()).await?;
256255
re_redap_client::stream_blueprint_and_partition_from_server(
257-
client, tx, uri_clone, on_ui_cmd, on_msg,
256+
client, tx, uri_clone, on_msg,
258257
)
259258
.await
260259
};

crates/store/re_data_source/src/load_stdin.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,11 @@
1-
use re_log_types::LogMsg;
1+
use re_log_types::DataSourceMessage;
22
use re_smart_channel::Sender;
33

44
/// Asynchronously loads RRD data streaming in from standard input.
55
///
66
/// This fails synchronously iff the standard input stream could not be opened, otherwise errors
77
/// are handled asynchronously (as in: they're logged).
8-
pub fn load_stdin(tx: Sender<LogMsg>) -> anyhow::Result<()> {
8+
pub fn load_stdin(tx: Sender<DataSourceMessage>) -> anyhow::Result<()> {
99
let stdin = std::io::BufReader::new(std::io::stdin());
1010
let decoder = re_log_encoding::decoder::Decoder::new_concatenated(stdin)?;
1111

@@ -20,7 +20,7 @@ pub fn load_stdin(tx: Sender<LogMsg>) -> anyhow::Result<()> {
2020
continue;
2121
}
2222
};
23-
if tx.send(msg).is_err() {
23+
if tx.send(msg.into()).is_err() {
2424
break; // The other end has decided to hang up, not our problem.
2525
}
2626
}

crates/store/re_grpc_client/src/read.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
1+
use re_log_types::DataSourceMessage;
12
use tokio_stream::StreamExt as _;
23

34
use re_log_encoding::protobuf_conversions::log_msg_from_proto;
4-
use re_log_types::LogMsg;
55
use re_protos::sdk_comms::v1alpha1::ReadMessagesRequest;
66
use re_protos::sdk_comms::v1alpha1::ReadMessagesResponse;
77
use re_protos::sdk_comms::v1alpha1::message_proxy_service_client::MessageProxyServiceClient;
@@ -16,7 +16,7 @@ use crate::TonicStatusError;
1616
pub fn stream(
1717
uri: re_uri::ProxyUri,
1818
on_msg: Option<Box<dyn Fn() + Send + Sync>>,
19-
) -> re_smart_channel::Receiver<LogMsg> {
19+
) -> re_smart_channel::Receiver<DataSourceMessage> {
2020
re_log::debug!("Loading {uri} via gRPC…");
2121

2222
let (tx, rx) = re_smart_channel::smart_channel(
@@ -35,7 +35,7 @@ pub fn stream(
3535

3636
async fn stream_async(
3737
uri: re_uri::ProxyUri,
38-
tx: &re_smart_channel::Sender<LogMsg>,
38+
tx: &re_smart_channel::Sender<DataSourceMessage>,
3939
on_msg: Option<Box<dyn Fn() + Send + Sync>>,
4040
) -> Result<(), StreamError> {
4141
let mut client = {
@@ -78,7 +78,7 @@ async fn stream_async(
7878
re_sorbet::timestamp_metadata::now_timestamp(),
7979
);
8080

81-
if tx.send(log_msg).is_err() {
81+
if tx.send(log_msg.into()).is_err() {
8282
re_log::debug!("gRPC stream smart channel closed");
8383
break;
8484
}

crates/store/re_grpc_server/src/lib.rs

Lines changed: 14 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ use tower_http::cors::CorsLayer;
1414

1515
use re_byte_size::SizeBytes;
1616
use re_log_encoding::codec::wire::decoder::Decode as _;
17-
use re_log_types::TableMsg;
17+
use re_log_types::{DataSourceMessage, TableMsg};
1818
use re_protos::sdk_comms::v1alpha1::{
1919
ReadTablesRequest, ReadTablesResponse, WriteMessagesRequest, WriteTableRequest,
2020
WriteTableResponse,
@@ -274,7 +274,7 @@ pub fn spawn_from_rx_set(
274274
addr: SocketAddr,
275275
options: ServerOptions,
276276
shutdown: shutdown::Shutdown,
277-
rxs: re_smart_channel::ReceiveSet<re_log_types::LogMsg>,
277+
rxs: re_smart_channel::ReceiveSet<re_log_types::DataSourceMessage>,
278278
) {
279279
let message_proxy = MessageProxy::new(options);
280280
let event_tx = message_proxy.event_tx.clone();
@@ -317,6 +317,16 @@ pub fn spawn_from_rx_set(
317317
continue;
318318
};
319319

320+
let msg = match msg {
321+
DataSourceMessage::LogMsg(log_msg) => log_msg,
322+
DataSourceMessage::UiCommand(ui_command) => {
323+
re_log::warn!(
324+
"Received a UI command, grpc server can't forward these yet: {ui_command:?}"
325+
);
326+
continue;
327+
}
328+
};
329+
320330
let msg = match re_log_encoding::protobuf_conversions::log_msg_to_proto(
321331
msg,
322332
re_log_encoding::Compression::LZ4,
@@ -352,7 +362,7 @@ pub fn spawn_with_recv(
352362
options: ServerOptions,
353363
shutdown: shutdown::Shutdown,
354364
) -> (
355-
re_smart_channel::Receiver<re_log_types::LogMsg>,
365+
re_smart_channel::Receiver<re_log_types::DataSourceMessage>,
356366
crossbeam::channel::Receiver<re_log_types::TableMsg>,
357367
) {
358368
let uri = re_uri::ProxyUri::new(re_uri::Origin::from_scheme_and_socket_addr(
@@ -402,7 +412,7 @@ pub fn spawn_with_recv(
402412
re_sorbet::timestamp_metadata::now_timestamp(),
403413
);
404414

405-
if channel_log_tx.send(log_msg).is_err() {
415+
if channel_log_tx.send(log_msg.into()).is_err() {
406416
re_log::debug!(
407417
"message proxy smart channel receiver closed, closing sender"
408418
);

crates/store/re_log_encoding/src/stream_rrd_from_http.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ use std::ops::ControlFlow;
33
use std::sync::Arc;
44

55
use re_log::ResultExt as _;
6-
use re_log_types::LogMsg;
6+
use re_log_types::{DataSourceMessage, LogMsg};
77

88
/// Stream an rrd file from a HTTP server.
99
///
@@ -15,7 +15,7 @@ pub fn stream_rrd_from_http_to_channel(
1515
url: String,
1616
follow: bool,
1717
on_msg: Option<Box<dyn Fn() + Send + Sync>>,
18-
) -> re_smart_channel::Receiver<LogMsg> {
18+
) -> re_smart_channel::Receiver<DataSourceMessage> {
1919
let (tx, rx) = re_smart_channel::smart_channel(
2020
re_smart_channel::SmartMessageSource::RrdHttpStream { url: url.clone() },
2121
re_smart_channel::SmartChannelSource::RrdHttpStream {
@@ -31,7 +31,7 @@ pub fn stream_rrd_from_http_to_channel(
3131
}
3232
match msg {
3333
HttpMessage::LogMsg(msg) => {
34-
if tx.send(msg).is_ok() {
34+
if tx.send(msg.into()).is_ok() {
3535
ControlFlow::Continue(())
3636
} else {
3737
re_log::info_once!("Closing connection to {url}");
Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,50 @@
1+
// TODO(andreas): Conceptually these should go to `re_data_source`.
2+
// However, `re_data_source` depends on everything that _implements_ a datasource, therefore we would get a circular dependency!
3+
4+
use crate::{AbsoluteTimeRange, LogMsg, StoreId, TimelineName, impl_into_enum};
5+
6+
/// Message from a data source.
7+
///
8+
/// May contain limited UI commands for instrumenting the state of the receiving end.
9+
#[derive(Clone, Debug)]
10+
pub enum DataSourceMessage {
11+
/// See [`LogMsg`].
12+
LogMsg(LogMsg),
13+
14+
/// A UI command that has to be ordered relative to [`LogMsg`]s.
15+
///
16+
/// Non-ui receivers can safely ignore these.
17+
UiCommand(DataSourceUiCommand),
18+
}
19+
20+
impl_into_enum!(LogMsg, DataSourceMessage, LogMsg);
21+
impl_into_enum!(DataSourceUiCommand, DataSourceMessage, UiCommand);
22+
23+
/// UI commands issued when streaming in datasets.
24+
///
25+
/// If you're not in a ui context you can safely ignore these.
26+
#[derive(Clone, Debug)]
27+
pub enum DataSourceUiCommand {
28+
/// Mark a time range as valid.
29+
///
30+
/// Everything outside can still be navigated to, but will be considered potentially lacking some data and therefore "invalid".
31+
/// Visually, it is outside of the normal time range and shown greyed out.
32+
///
33+
/// If timeline is `None`, this signals that all timelines are considered to be valid entirely.
34+
AddValidTimeRange {
35+
store_id: StoreId,
36+
37+
/// If `None`, signals that all timelines are entirely valid.
38+
timeline: Option<TimelineName>,
39+
time_range: AbsoluteTimeRange,
40+
},
41+
42+
/// Navigate to time/entities/anchors/etc. that are set in a `re_uri::Fragment`.
43+
SetUrlFragment {
44+
store_id: StoreId,
45+
46+
/// Uri fragment, see `re_uri::Fragment` on how to parse it.
47+
// Not using `re_uri::Fragment` to avoid further dependency entanglement.
48+
fragment: String, //re_uri::Fragment,
49+
},
50+
}

crates/store/re_log_types/src/lib.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
//! `foo.transform * foo/bar.transform * foo/bar/baz.transform`.
1919
2020
pub mod arrow_msg;
21+
mod data_source_message;
2122
mod entry_id;
2223
pub mod example_components;
2324
pub mod hash;
@@ -39,6 +40,7 @@ use re_byte_size::SizeBytes;
3940

4041
pub use self::{
4142
arrow_msg::{ArrowMsg, ArrowRecordBatchReleaseCallback},
43+
data_source_message::{DataSourceMessage, DataSourceUiCommand},
4244
entry_id::{EntryId, EntryIdOrName},
4345
index::{
4446
AbsoluteTimeRange, AbsoluteTimeRangeF, Duration, NonMinI64, TimeCell, TimeInt, TimePoint,

crates/store/re_redap_client/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,9 +34,9 @@ all-features = true
3434
re_arrow_util.workspace = true
3535
re_auth.workspace = true
3636
re_chunk.workspace = true
37-
re_log.workspace = true
3837
re_log_encoding = { workspace = true, features = ["encoder", "decoder"] }
3938
re_log_types.workspace = true
39+
re_log.workspace = true
4040
re_protos.workspace = true
4141
re_smart_channel.workspace = true
4242
re_uri.workspace = true

0 commit comments

Comments
 (0)