@@ -93,34 +93,36 @@ where
93
93
// Convert the broadcast receiver into a Stream
94
94
let stream = BroadcastStream :: new ( update_rx) ;
95
95
96
- let sse_stream = stream. then ( move |message| {
97
- let state_clone = state. clone ( ) ; // Clone again to use inside the async block
98
- let price_ids_clone = price_ids. clone ( ) ; // Clone again for use inside the async block
99
- async move {
100
- match message {
101
- Ok ( event) => {
102
- match handle_aggregation_event (
103
- event,
104
- state_clone,
105
- price_ids_clone,
106
- params. encoding ,
107
- params. parsed ,
108
- params. benchmarks_only ,
109
- params. allow_unordered ,
110
- )
111
- . await
112
- {
113
- Ok ( Some ( update) ) => Ok ( Event :: default ( )
114
- . json_data ( update)
115
- . unwrap_or_else ( error_event) ) ,
116
- Ok ( None ) => Ok ( Event :: default ( ) . comment ( "No update available" ) ) ,
117
- Err ( e) => Ok ( error_event ( e) ) ,
96
+ let sse_stream = stream
97
+ . then ( move |message| {
98
+ let state_clone = state. clone ( ) ; // Clone again to use inside the async block
99
+ let price_ids_clone = price_ids. clone ( ) ; // Clone again for use inside the async block
100
+ async move {
101
+ match message {
102
+ Ok ( event) => {
103
+ match handle_aggregation_event (
104
+ event,
105
+ state_clone,
106
+ price_ids_clone,
107
+ params. encoding ,
108
+ params. parsed ,
109
+ params. benchmarks_only ,
110
+ params. allow_unordered ,
111
+ )
112
+ . await
113
+ {
114
+ Ok ( Some ( update) ) => Some ( Ok ( Event :: default ( )
115
+ . json_data ( update)
116
+ . unwrap_or_else ( error_event) ) ) ,
117
+ Ok ( None ) => None ,
118
+ Err ( e) => Some ( Ok ( error_event ( e) ) ) ,
119
+ }
118
120
}
121
+ Err ( e) => Some ( Ok ( error_event ( e) ) ) ,
119
122
}
120
- Err ( e) => Ok ( error_event ( e) ) ,
121
123
}
122
- }
123
- } ) ;
124
+ } )
125
+ . filter_map ( |x| x ) ;
124
126
125
127
Ok ( Sse :: new ( sse_stream) . keep_alive ( KeepAlive :: default ( ) ) )
126
128
}
0 commit comments