@@ -8,14 +8,11 @@ use re_protos::cloud::v1alpha1::GetChunksRequest;
8
8
use re_protos:: cloud:: v1alpha1:: ext:: { Query , QueryLatestAt , QueryRange } ;
9
9
use re_protos:: cloud:: v1alpha1:: rerun_cloud_service_client:: RerunCloudServiceClient ;
10
10
use re_protos:: common:: v1alpha1:: ext:: PartitionId ;
11
- use re_uri:: { DatasetPartitionUri , Origin , TimeSelection } ;
11
+ use re_uri:: { Origin , TimeSelection } ;
12
12
13
13
use tokio_stream:: { Stream , StreamExt as _} ;
14
14
15
- use crate :: {
16
- ConnectionClient , ConnectionRegistryHandle , MAX_DECODING_MESSAGE_SIZE , StreamError ,
17
- StreamPartitionError , spawn_future,
18
- } ;
15
+ use crate :: { ConnectionClient , MAX_DECODING_MESSAGE_SIZE , StreamError , StreamPartitionError } ;
19
16
20
17
/// UI commands issued when streaming in datasets.
21
18
///
@@ -35,55 +32,6 @@ pub enum UiCommand {
35
32
} ,
36
33
}
37
34
38
- /// Stream an rrd file or metadata catalog over gRPC from a Rerun Data Platform server.
39
- ///
40
- /// `on_msg` can be used to wake up the UI thread on Wasm.
41
- pub fn stream_dataset_from_redap (
42
- connection_registry : & ConnectionRegistryHandle ,
43
- uri : DatasetPartitionUri ,
44
- on_ui_cmd : Option < Box < dyn Fn ( UiCommand ) + Send + Sync > > ,
45
- on_msg : Option < Box < dyn Fn ( ) + Send + Sync > > ,
46
- ) -> re_smart_channel:: Receiver < LogMsg > {
47
- re_log:: debug!( "Loading {uri}…" ) ;
48
-
49
- let ( tx, rx) = re_smart_channel:: smart_channel (
50
- re_smart_channel:: SmartMessageSource :: RedapGrpcStream {
51
- uri : uri. clone ( ) ,
52
- select_when_loaded : true ,
53
- } ,
54
- re_smart_channel:: SmartChannelSource :: RedapGrpcStream {
55
- uri : uri. clone ( ) ,
56
- select_when_loaded : true ,
57
- } ,
58
- ) ;
59
-
60
- async fn stream_partition (
61
- connection_registry : ConnectionRegistryHandle ,
62
- tx : re_smart_channel:: Sender < LogMsg > ,
63
- uri : DatasetPartitionUri ,
64
- on_ui_cmd : Option < Box < dyn Fn ( UiCommand ) + Send + Sync > > ,
65
- on_msg : Option < Box < dyn Fn ( ) + Send + Sync > > ,
66
- ) -> Result < ( ) , StreamError > {
67
- let client = connection_registry. client ( uri. origin . clone ( ) ) . await ?;
68
-
69
- stream_blueprint_and_partition_from_server ( client, tx, uri. clone ( ) , on_ui_cmd, on_msg) . await
70
- }
71
-
72
- let connection_registry = connection_registry. clone ( ) ;
73
- spawn_future ( async move {
74
- if let Err ( err) =
75
- stream_partition ( connection_registry, tx, uri. clone ( ) , on_ui_cmd, on_msg) . await
76
- {
77
- re_log:: error!(
78
- "Error while streaming {uri}: {}" ,
79
- re_error:: format_ref( & err)
80
- ) ;
81
- }
82
- } ) ;
83
-
84
- rx
85
- }
86
-
87
35
// TODO(ab): do not publish this out of this crate (for now it is still being used by rerun_py
88
36
// the viewer grpc connection). Ideally we'd only publish `ClientConnectionError`.
89
37
#[ derive( Debug , thiserror:: Error ) ]
@@ -568,9 +516,20 @@ async fn stream_partition_from_server(
568
516
569
517
let store_id = store_info. store_id . clone ( ) ;
570
518
519
+ if tx
520
+ . send ( LogMsg :: SetStoreInfo ( SetStoreInfo {
521
+ row_id : * re_chunk:: RowId :: new ( ) ,
522
+ info : store_info,
523
+ } ) )
524
+ . is_err ( )
525
+ {
526
+ re_log:: debug!( "Receiver disconnected" ) ;
527
+ return Ok ( ( ) ) ;
528
+ }
529
+
571
530
// Send UI commands for recording (as opposed to blueprint) stores.
572
531
if let Some ( on_ui_cmd) = on_ui_cmd
573
- && store_info . store_id . is_recording ( )
532
+ && store_id. is_recording ( )
574
533
{
575
534
if let Some ( time_range) = time_range {
576
535
on_ui_cmd ( UiCommand :: AddValidTimeRange {
@@ -594,17 +553,6 @@ async fn stream_partition_from_server(
594
553
}
595
554
}
596
555
597
- if tx
598
- . send ( LogMsg :: SetStoreInfo ( SetStoreInfo {
599
- row_id : * re_chunk:: RowId :: new ( ) ,
600
- info : store_info,
601
- } ) )
602
- . is_err ( )
603
- {
604
- re_log:: debug!( "Receiver disconnected" ) ;
605
- return Ok ( ( ) ) ;
606
- }
607
-
608
556
// TODO(#10229): this looks to be converting back and forth?
609
557
610
558
let static_chunk_stream = get_chunks_response_to_chunk_and_partition_id ( static_chunk_stream) ;
0 commit comments