Skip to content

Commit 42ac01a

Browse files
authored
table provider: propagate stream error from IO loop (#11397)
### What

Right now, if the chunk-fetching stream yields an error, the error is silently ignored: the IO loop simply stops reading and the stream ends as if it had completed normally. This change forwards each stream result (`Ok` or `Err`) from the IO loop to the CPU loop, which already handles errors, so that receiving an error actually terminates the stream with that error.
1 parent 90479b3 commit 42ac01a

File tree

1 file changed

+7
-4
lines changed

1 file changed

+7
-4
lines changed

crates/store/re_datafusion/src/dataframe_query_provider.rs

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -71,7 +71,7 @@ pub struct DataframePartitionStreamInner {
7171
client: ConnectionClient,
7272
chunk_infos: Vec<RecordBatch>,
7373

74-
chunk_tx: Option<Sender<ChunksWithPartition>>,
74+
chunk_tx: Option<Sender<Result<ChunksWithPartition, re_redap_client::StreamError>>>,
7575
store_output_channel: Receiver<RecordBatch>,
7676
io_join_handle: Option<JoinHandle<Result<(), DataFusionError>>>,
7777

@@ -325,13 +325,16 @@ async fn send_next_row(
325325
// TODO(#10781) - support for sending intermediate results/chunks
326326
#[tracing::instrument(level = "trace", skip_all)]
327327
async fn chunk_store_cpu_worker_thread(
328-
mut input_channel: Receiver<ChunksWithPartition>,
328+
mut input_channel: Receiver<Result<ChunksWithPartition, re_redap_client::StreamError>>,
329329
output_channel: Sender<RecordBatch>,
330330
query_expression: QueryExpression,
331331
projected_schema: Arc<Schema>,
332332
) -> Result<(), DataFusionError> {
333333
let mut current_stores: Option<(String, ChunkStoreHandle, QueryHandle<StorageEngine>)> = None;
334334
while let Some(chunks_and_partition_ids) = input_channel.recv().await {
335+
let chunks_and_partition_ids =
336+
chunks_and_partition_ids.map_err(|err| exec_datafusion_err!("{err}"))?;
337+
335338
for (chunk, partition_id) in chunks_and_partition_ids {
336339
let partition_id = partition_id
337340
.ok_or_else(|| exec_datafusion_err!("Received chunk without a partition id"))?;
@@ -405,7 +408,7 @@ async fn chunk_store_cpu_worker_thread(
405408
async fn chunk_stream_io_loop(
406409
mut client: ConnectionClient,
407410
chunk_infos: Vec<RecordBatch>,
408-
output_channel: Sender<ChunksWithPartition>,
411+
output_channel: Sender<Result<ChunksWithPartition, re_redap_client::StreamError>>,
409412
) -> Result<(), DataFusionError> {
410413
let chunk_infos = chunk_infos
411414
.into_iter()
@@ -438,7 +441,7 @@ async fn chunk_stream_io_loop(
438441
fetch_chunks_response_stream,
439442
);
440443

441-
while let Some(Ok(chunk_and_partition_id)) = chunk_stream.next().await {
444+
while let Some(chunk_and_partition_id) = chunk_stream.next().await {
442445
if output_channel.send(chunk_and_partition_id).await.is_err() {
443446
break;
444447
}

0 commit comments

Comments
 (0)