Skip to content

Commit bb7d135

Browse files
authored
Better stream errors (#11335)
### Related Closes RR-2338 ### What Splits the `StreamError::Tonic` error into multiple different variants, making the errors more readable and useful. Also adds a details section to notification, which are automatically created if the notification contains a "<details>" part.
1 parent 5710c79 commit bb7d135

File tree

8 files changed

+152
-46
lines changed

8 files changed

+152
-46
lines changed

crates/store/re_data_source/src/data_source.rs

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -261,10 +261,7 @@ impl LogDataSource {
261261

262262
spawn_future(async move {
263263
if let Err(err) = stream_partition.await {
264-
re_log::warn!(
265-
"Error while streaming {uri}: {}",
266-
re_error::format_ref(&err)
267-
);
264+
re_log::warn!("Error while streaming: {}", re_error::format_ref(&err));
268265
}
269266
});
270267
Ok(rx)

crates/store/re_redap_client/src/connection_client.rs

Lines changed: 36 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ use re_protos::{
3333
missing_field,
3434
};
3535

36-
use crate::StreamError;
36+
use crate::{StreamEntryError, StreamError};
3737

3838
/// Expose an ergonomic API over the gRPC redap client.
3939
///
@@ -81,7 +81,8 @@ where
8181
.find_entries(FindEntriesRequest {
8282
filter: Some(filter),
8383
})
84-
.await?
84+
.await
85+
.map_err(|err| StreamEntryError::Find(err.into()))?
8586
.into_inner()
8687
.entries;
8788

@@ -97,7 +98,8 @@ where
9798
.delete_entry(DeleteEntryRequest {
9899
id: Some(entry_id.into()),
99100
})
100-
.await?;
101+
.await
102+
.map_err(|err| StreamEntryError::Delete(err.into()))?;
101103

102104
Ok(())
103105
}
@@ -117,7 +119,8 @@ where
117119
}
118120
.into(),
119121
))
120-
.await?
122+
.await
123+
.map_err(|err| StreamEntryError::Update(err.into()))?
121124
.into_inner()
122125
.try_into()?;
123126

@@ -136,7 +139,8 @@ where
136139
name: Some(name),
137140
id: entry_id.map(Into::into),
138141
})
139-
.await?
142+
.await
143+
.map_err(|err| StreamEntryError::Create(err.into()))?
140144
.into_inner()
141145
.try_into()?;
142146

@@ -151,9 +155,12 @@ where
151155
let response: ReadDatasetEntryResponse = self
152156
.inner()
153157
.read_dataset_entry(
154-
tonic::Request::new(ReadDatasetEntryRequest {}).with_entry_id(entry_id)?,
158+
tonic::Request::new(ReadDatasetEntryRequest {})
159+
.with_entry_id(entry_id)
160+
.map_err(|err| StreamEntryError::InvalidId(err.into()))?,
155161
)
156-
.await?
162+
.await
163+
.map_err(|err| StreamEntryError::Read(err.into()))?
157164
.into_inner()
158165
.try_into()?;
159166

@@ -175,7 +182,8 @@ where
175182
}
176183
.into(),
177184
))
178-
.await?
185+
.await
186+
.map_err(|err| StreamEntryError::Update(err.into()))?
179187
.into_inner()
180188
.try_into()?;
181189

@@ -189,7 +197,8 @@ where
189197
.read_table_entry(ReadTableEntryRequest {
190198
id: Some(entry_id.into()),
191199
})
192-
.await?
200+
.await
201+
.map_err(|err| StreamEntryError::Read(err.into()))?
193202
.into_inner()
194203
.try_into()?;
195204

@@ -210,15 +219,20 @@ where
210219
tonic::Request::new(ScanPartitionTableRequest {
211220
columns: vec![COLUMN_NAME.to_owned()],
212221
})
213-
.with_entry_id(entry_id)?,
222+
.with_entry_id(entry_id)
223+
.map_err(|err| StreamEntryError::InvalidId(err.into()))?,
214224
)
215-
.await?
225+
.await
226+
.map_err(|err| StreamEntryError::ReadPartitions(err.into()))?
216227
.into_inner();
217228

218229
let mut partition_ids = Vec::new();
219230

220231
while let Some(resp) = stream.next().await {
221-
let record_batch = resp?.data()?.decode()?;
232+
let record_batch = resp
233+
.map_err(|err| StreamEntryError::ReadPartitions(err.into()))?
234+
.data()?
235+
.decode()?;
222236

223237
let partition_id_col = record_batch
224238
.column_by_name(COLUMN_NAME)
@@ -252,12 +266,14 @@ where
252266
data_sources,
253267
on_duplicate,
254268
})
255-
.with_entry_id(dataset_id)?;
269+
.with_entry_id(dataset_id)
270+
.map_err(|err| StreamEntryError::InvalidId(err.into()))?;
256271

257272
let response = self
258273
.inner()
259274
.register_with_dataset(req.map(Into::into))
260-
.await?
275+
.await
276+
.map_err(|err| StreamEntryError::RegisterData(err.into()))?
261277
.into_inner()
262278
.data
263279
.ok_or_else(|| missing_field!(RegisterWithDatasetResponse, "data"))?
@@ -346,7 +362,8 @@ where
346362
let response: RegisterTableResponse = self
347363
.inner()
348364
.register_table(tonic::Request::new(request.into()))
349-
.await?
365+
.await
366+
.map_err(|err| StreamEntryError::RegisterTable(err.into()))?
350367
.into_inner()
351368
.try_into()?;
352369

@@ -375,7 +392,8 @@ where
375392
}
376393
.into(),
377394
))
378-
.await?;
395+
.await
396+
.map_err(|err| StreamEntryError::Maintenance(err.into()))?;
379397

380398
Ok(())
381399
}
@@ -385,7 +403,8 @@ where
385403
.do_global_maintenance(tonic::Request::new(
386404
re_protos::cloud::v1alpha1::DoGlobalMaintenanceRequest {},
387405
))
388-
.await?;
406+
.await
407+
.map_err(|err| StreamEntryError::Maintenance(err.into()))?;
389408

390409
Ok(())
391410
}

crates/store/re_redap_client/src/connection_registry.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -58,19 +58,19 @@ impl ConnectionRegistry {
5858
pub enum ClientConnectionError {
5959
/// Native connection error
6060
#[cfg(not(target_arch = "wasm32"))]
61-
#[error("Connection error: {0}")]
61+
#[error("Connection error\nDetails:{0}")]
6262
Tonic(#[from] tonic::transport::Error),
6363

6464
#[error("server is expecting an unencrypted connection (try `rerun+http://` if you are sure)")]
6565
UnencryptedServer,
6666

67-
#[error("the server requires an authentication token, but none was provided: {0}")]
67+
#[error("the server requires an authentication token, but none was provided\nDetails:{0}")]
6868
UnauthenticatedMissingToken(TonicStatusError),
6969

70-
#[error("the server rejected the provided authentication token: {0}")]
70+
#[error("the server rejected the provided authentication token\nDetails:{0}")]
7171
UnauthenticatedBadToken(TonicStatusError),
7272

73-
#[error("failed to obtain server version: {0}")]
73+
#[error("failed to obtain server version\nDetails:{0}")]
7474
AuthCheckError(TonicStatusError),
7575
}
7676

crates/store/re_redap_client/src/grpc.rs

Lines changed: 13 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ use tokio_stream::{Stream, StreamExt as _};
1414

1515
use crate::{
1616
ConnectionClient, ConnectionRegistryHandle, MAX_DECODING_MESSAGE_SIZE, StreamError,
17-
spawn_future,
17+
StreamPartitionError, spawn_future,
1818
};
1919

2020
/// UI commands issued when streaming in datasets.
@@ -248,13 +248,15 @@ pub(crate) async fn client(
248248
pub fn get_chunks_response_to_chunk_and_partition_id(
249249
response: tonic::Streaming<re_protos::cloud::v1alpha1::GetChunksResponse>,
250250
) -> impl Stream<Item = Result<Vec<(Chunk, Option<String>)>, StreamError>> {
251+
use crate::StreamPartitionError;
252+
251253
response
252254
.then(|resp| {
253255
// We want to make sure to offload that compute-heavy work to the compute worker pool: it's
254256
// not going to make this one single pipeline any faster, but it will prevent starvation of
255257
// the Tokio runtime (which would slow down every other futures currently scheduled!).
256258
tokio::task::spawn_blocking(move || {
257-
let r = resp.map_err(Into::<StreamError>::into)?;
259+
let r = resp.map_err(|err| StreamPartitionError::StreamingChunks(err.into()))?;
258260
let _span =
259261
tracing::trace_span!("get_chunks::batch_decode", num_chunks = r.chunks.len())
260262
.entered();
@@ -292,13 +294,15 @@ pub fn get_chunks_response_to_chunk_and_partition_id(
292294
pub fn fetch_chunks_response_to_chunk_and_partition_id(
293295
response: tonic::Streaming<re_protos::cloud::v1alpha1::FetchChunksResponse>,
294296
) -> impl Stream<Item = Result<Vec<(Chunk, Option<String>)>, StreamError>> {
297+
use crate::StreamPartitionError;
298+
295299
response
296300
.then(|resp| {
297301
// We want to make sure to offload that compute-heavy work to the compute worker pool: it's
298302
// not going to make this one single pipeline any faster, but it will prevent starvation of
299303
// the Tokio runtime (which would slow down every other futures currently scheduled!).
300304
tokio::task::spawn_blocking(move || {
301-
let r = resp.map_err(Into::<StreamError>::into)?;
305+
let r = resp.map_err(|err| StreamPartitionError::StreamingChunks(err.into()))?;
302306
let _span =
303307
tracing::trace_span!("get_chunks::batch_decode", num_chunks = r.chunks.len())
304308
.entered();
@@ -332,7 +336,7 @@ pub fn get_chunks_response_to_chunk_and_partition_id(
332336
response: tonic::Streaming<re_protos::cloud::v1alpha1::GetChunksResponse>,
333337
) -> impl Stream<Item = Result<Vec<(Chunk, Option<String>)>, StreamError>> {
334338
response.map(|resp| {
335-
let resp = resp?;
339+
let resp = resp.map_err(|err| StreamPartitionError::StreamingChunks(err.into()))?;
336340

337341
let _span =
338342
tracing::trace_span!("get_chunks::batch_decode", num_chunks = resp.chunks.len())
@@ -362,7 +366,7 @@ pub fn fetch_chunks_response_to_chunk_and_partition_id(
362366
response: tonic::Streaming<re_protos::cloud::v1alpha1::FetchChunksResponse>,
363367
) -> impl Stream<Item = Result<Vec<(Chunk, Option<String>)>, StreamError>> {
364368
response.map(|resp| {
365-
let resp = resp?;
369+
let resp = resp.map_err(|err| StreamPartitionError::StreamingChunks(err.into()))?;
366370

367371
let _span =
368372
tracing::trace_span!("get_chunks::batch_decode", num_chunks = resp.chunks.len())
@@ -514,7 +518,8 @@ async fn stream_partition_from_server(
514518
exclude_temporal_data: true,
515519
query: None,
516520
})
517-
.await?
521+
.await
522+
.map_err(|err| StreamPartitionError::StreamingChunks(err.into()))?
518523
.into_inner()
519524
};
520525

@@ -551,7 +556,8 @@ async fn stream_partition_from_server(
551556
.into()
552557
}),
553558
})
554-
.await?
559+
.await
560+
.map_err(|err| StreamPartitionError::StreamingChunks(err.into()))?
555561
.into_inner()
556562
};
557563

crates/store/re_redap_client/src/lib.rs

Lines changed: 43 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -78,13 +78,55 @@ impl std::error::Error for TonicStatusError {
7878
}
7979
}
8080

81+
#[derive(thiserror::Error, Debug)]
82+
pub enum StreamEntryError {
83+
#[error("Failed reading entry\nDetails:{0}")]
84+
Read(TonicStatusError),
85+
86+
#[error("Failed finding entry\nDetails:{0}")]
87+
Find(TonicStatusError),
88+
89+
#[error("Failed deleting entry\nDetails:{0}")]
90+
Delete(TonicStatusError),
91+
92+
#[error("Failed updating entry\nDetails:{0}")]
93+
Update(TonicStatusError),
94+
95+
#[error("Failed creating entry\nDetails:{0}")]
96+
Create(TonicStatusError),
97+
98+
#[error("Failed reading entry's partitions\nDetails:{0}")]
99+
ReadPartitions(TonicStatusError),
100+
101+
#[error("Failed registering data source with entry\nDetails:{0}")]
102+
RegisterData(TonicStatusError),
103+
104+
#[error("Failed registering table\nDetails:{0}")]
105+
RegisterTable(TonicStatusError),
106+
107+
#[error("Error while doing maintenance on entry\nDetails:{0}")]
108+
Maintenance(TonicStatusError),
109+
110+
#[error("Invalid entry id\nDetails:{0}")]
111+
InvalidId(TonicStatusError),
112+
}
113+
114+
#[derive(thiserror::Error, Debug)]
115+
pub enum StreamPartitionError {
116+
#[error("Failed streaming partition chunks\nDetails:{0}")]
117+
StreamingChunks(TonicStatusError),
118+
}
119+
81120
#[derive(thiserror::Error, Debug)]
82121
pub enum StreamError {
83122
#[error(transparent)]
84123
ClientConnectionError(#[from] ClientConnectionError),
85124

86125
#[error(transparent)]
87-
TonicStatus(#[from] TonicStatusError),
126+
EntryError(#[from] StreamEntryError),
127+
128+
#[error(transparent)]
129+
PartitionError(#[from] StreamPartitionError),
88130

89131
#[error(transparent)]
90132
Tokio(#[from] tokio::task::JoinError),
@@ -116,12 +158,6 @@ const _: () = assert!(
116158
"Error type is too large. Try to reduce its size by boxing some of its variants.",
117159
);
118160

119-
impl From<tonic::Status> for StreamError {
120-
fn from(value: tonic::Status) -> Self {
121-
Self::TonicStatus(value.into())
122-
}
123-
}
124-
125161
// TODO(ab, andreas): This should be replaced by the use of `AsyncRuntimeHandle`. However, this
126162
// requires:
127163
// - `AsyncRuntimeHandle` to be moved lower in the crate hierarchy to be available here (unsure

crates/utils/re_log/src/lib.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,9 @@
1111
//!
1212
//! The `warn_once` etc macros are for when you want to suppress repeated
1313
//! logging of the exact same message.
14+
//!
15+
//! In the viewer these logs, if >= info, become notifications. See
16+
//! `re_ui::notifications` for more information.
1417
1518
mod channel_logger;
1619
mod result_extensions;

crates/viewer/re_redap_browser/src/entries.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -72,7 +72,8 @@ impl EntryError {
7272
| StreamError::CodecError(_)
7373
| StreamError::ChunkError(_)
7474
| StreamError::DecodeError(_)
75-
| StreamError::TonicStatus(_)
75+
| StreamError::EntryError(_)
76+
| StreamError::PartitionError(_)
7677
| StreamError::TypeConversionError(_)
7778
| StreamError::MissingDataframeColumn(_)
7879
| StreamError::MissingData(_)

0 commit comments

Comments
 (0)