@@ -6,7 +6,8 @@ use std::error::Error;
6
6
use std:: time:: Duration ;
7
7
8
8
use clap:: { App , Arg } ;
9
- use futures:: future:: { join, try_join_all} ;
9
+ use futures:: future:: { join, join_all} ;
10
+ use slog:: { error, info, Logger } ;
10
11
use tokio:: runtime:: Runtime ;
11
12
use tokio:: time:: { delay_for, timeout} ;
12
13
@@ -15,7 +16,6 @@ use primitives::adapter::{Adapter, DummyAdapterOptions, KeystoreOptions};
15
16
use primitives:: config:: { configuration, Config } ;
16
17
use primitives:: util:: tests:: prep_db:: { AUTH , IDS } ;
17
18
use primitives:: { Channel , SpecValidator , ValidatorId } ;
18
- use slog:: { error, info, Logger } ;
19
19
use validator_worker:: error:: ValidatorWorker as ValidatorWorkerError ;
20
20
use validator_worker:: { all_channels, follower, leader, SentryApi } ;
21
21
@@ -165,26 +165,26 @@ async fn iterate_channels<A: Adapter + 'static>(args: Args<A>, logger: &Logger)
165
165
let channels = match result {
166
166
Ok ( channels) => channels,
167
167
Err ( e) => {
168
- error ! ( logger, "Failed to get channels {}" , & e; "main" => "iterate_channels" ) ;
168
+ error ! ( logger, "Failed to get channels - {}" , & e; "main" => "iterate_channels" ) ;
169
169
return ;
170
170
}
171
171
} ;
172
172
173
173
let channels_size = channels. len ( ) ;
174
174
175
- let tick = try_join_all (
175
+ let tick_results = join_all (
176
176
channels
177
177
. into_iter ( )
178
178
. map ( |channel| validator_tick ( args. adapter . clone ( ) , channel, & args. config , logger) ) ,
179
179
)
180
180
. await ;
181
181
182
- info ! ( logger, "processed {} channels" , channels_size) ;
183
-
184
- if let Err ( e) = tick {
185
- error ! ( logger, "An occurred while processing channels {}" , & e; "main" => "iterate_channels" ) ;
182
+ for channel_err in tick_results. into_iter ( ) . filter_map ( Result :: err) {
183
+ error ! ( logger, "Error processing channels - {}" , channel_err; "main" => "iterate_channels" ) ;
186
184
}
187
185
186
+ info ! ( logger, "processed {} channels" , channels_size) ;
187
+
188
188
if channels_size >= args. config . max_channels as usize {
189
189
error ! ( logger, "WARNING: channel limit cfg.MAX_CHANNELS={} reached" , & args. config. max_channels; "main" => "iterate_channels" ) ;
190
190
}
@@ -202,23 +202,25 @@ async fn validator_tick<A: Adapter + 'static>(
202
202
let duration = Duration :: from_millis ( config. validator_tick_timeout as u64 ) ;
203
203
204
204
match channel. spec . validators . find ( & whoami) {
205
- SpecValidator :: Leader ( _) => {
206
- if let Err ( e) = timeout ( duration, leader:: tick ( & sentry) ) . await {
207
- return Err ( ValidatorWorkerError :: Failed ( e. to_string ( ) ) ) ;
208
- }
209
- }
210
- SpecValidator :: Follower ( _) => {
211
- if let Err ( e) = timeout ( duration, follower:: tick ( & sentry) ) . await {
212
- return Err ( ValidatorWorkerError :: Failed ( e. to_string ( ) ) ) ;
213
- }
214
- }
215
- SpecValidator :: None => {
216
- return Err ( ValidatorWorkerError :: Failed (
217
- "validatorTick: processing a channel where we are not validating" . to_string ( ) ,
218
- ) )
219
- }
220
- } ;
221
- Ok ( ( ) )
205
+ SpecValidator :: Leader ( _) => timeout ( duration, leader:: tick ( & sentry) )
206
+ . await
207
+ . map_err ( |timeout_err| {
208
+ ValidatorWorkerError :: Channel ( channel. id , timeout_err. to_string ( ) )
209
+ } ) ?
210
+ . map_err ( |tick_err| ValidatorWorkerError :: Channel ( channel. id , tick_err. to_string ( ) ) )
211
+ . map ( |_| ( ) ) ,
212
+ SpecValidator :: Follower ( _) => timeout ( duration, follower:: tick ( & sentry) )
213
+ . await
214
+ . map_err ( |timeout_err| {
215
+ ValidatorWorkerError :: Channel ( channel. id , timeout_err. to_string ( ) )
216
+ } ) ?
217
+ . map_err ( |tick_err| ValidatorWorkerError :: Channel ( channel. id , tick_err. to_string ( ) ) )
218
+ . map ( |_| ( ) ) ,
219
+ SpecValidator :: None => Err ( ValidatorWorkerError :: Channel (
220
+ channel. id ,
221
+ "validatorTick: processing a channel which we are not validating" . to_string ( ) ,
222
+ ) ) ,
223
+ }
222
224
}
223
225
224
226
fn logger ( ) -> Logger {
0 commit comments