|
| 1 | +// Copyright 2019-2023 Parity Technologies (UK) Ltd. |
| 2 | +// This file is part of Parity Bridges Common. |
| 3 | + |
| 4 | +// Parity Bridges Common is free software: you can redistribute it and/or modify |
| 5 | +// it under the terms of the GNU General Public License as published by |
| 6 | +// the Free Software Foundation, either version 3 of the License, or |
| 7 | +// (at your option) any later version. |
| 8 | + |
| 9 | +// Parity Bridges Common is distributed in the hope that it will be useful, |
| 10 | +// but WITHOUT ANY WARRANTY; without even the implied warranty of |
| 11 | +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the |
| 12 | +// GNU General Public License for more details. |
| 13 | + |
| 14 | +// You should have received a copy of the GNU General Public License |
| 15 | +// along with Parity Bridges Common. If not, see <http://www.gnu.org/licenses/>. |
| 16 | + |
| 17 | +use crate::{ |
| 18 | + reporter::EquivocationsReporter, EquivocationDetectionPipeline, HeaderFinalityInfo, |
| 19 | + SourceClient, TargetClient, |
| 20 | +}; |
| 21 | + |
| 22 | +use bp_header_chain::{FinalityProof, FindEquivocations}; |
| 23 | +use finality_relay::{FinalityProofsBuf, FinalityProofsStream}; |
| 24 | +use futures::{select, FutureExt}; |
| 25 | +use num_traits::Saturating; |
| 26 | +use relay_utils::{ |
| 27 | + relay_loop::{reconnect_failed_client, RECONNECT_DELAY}, |
| 28 | + FailedClient, MaybeConnectionError, |
| 29 | +}; |
| 30 | +use std::{future::Future, time::Duration}; |
| 31 | + |
| 32 | +/// The context needed for finding equivocations inside finality proofs and reporting them. |
| 33 | +struct EquivocationReportingContext<P: EquivocationDetectionPipeline> { |
| 34 | + synced_header_hash: P::Hash, |
| 35 | + synced_verification_context: P::FinalityVerificationContext, |
| 36 | +} |
| 37 | + |
| 38 | +impl<P: EquivocationDetectionPipeline> EquivocationReportingContext<P> { |
| 39 | + /// Try to get the `EquivocationReportingContext` used by the target chain |
| 40 | + /// at the provided block. |
| 41 | + async fn try_read_from_target<TC: TargetClient<P>>( |
| 42 | + target_client: &TC, |
| 43 | + at: P::TargetNumber, |
| 44 | + ) -> Result<Option<Self>, TC::Error> { |
| 45 | + let maybe_best_synced_header_hash = target_client.best_synced_header_hash(at).await?; |
| 46 | + Ok(match maybe_best_synced_header_hash { |
| 47 | + Some(best_synced_header_hash) => Some(EquivocationReportingContext { |
| 48 | + synced_header_hash: best_synced_header_hash, |
| 49 | + synced_verification_context: target_client |
| 50 | + .finality_verification_context(at) |
| 51 | + .await?, |
| 52 | + }), |
| 53 | + None => None, |
| 54 | + }) |
| 55 | + } |
| 56 | + |
| 57 | + /// Update with the new context introduced by the `HeaderFinalityInfo<P>` if any. |
| 58 | + fn update(&mut self, info: HeaderFinalityInfo<P>) { |
| 59 | + if let Some(new_verification_context) = info.new_verification_context { |
| 60 | + self.synced_header_hash = info.finality_proof.target_header_hash(); |
| 61 | + self.synced_verification_context = new_verification_context; |
| 62 | + } |
| 63 | + } |
| 64 | +} |
| 65 | + |
| 66 | +/// Equivocations detection loop state. |
| 67 | +struct EquivocationDetectionLoop< |
| 68 | + P: EquivocationDetectionPipeline, |
| 69 | + SC: SourceClient<P>, |
| 70 | + TC: TargetClient<P>, |
| 71 | +> { |
| 72 | + source_client: SC, |
| 73 | + target_client: TC, |
| 74 | + |
| 75 | + from_block_num: Option<P::TargetNumber>, |
| 76 | + until_block_num: Option<P::TargetNumber>, |
| 77 | + |
| 78 | + reporter: EquivocationsReporter<P, SC>, |
| 79 | + |
| 80 | + finality_proofs_stream: FinalityProofsStream<P, SC>, |
| 81 | + finality_proofs_buf: FinalityProofsBuf<P>, |
| 82 | +} |
| 83 | + |
| 84 | +impl<P: EquivocationDetectionPipeline, SC: SourceClient<P>, TC: TargetClient<P>> |
| 85 | + EquivocationDetectionLoop<P, SC, TC> |
| 86 | +{ |
| 87 | + async fn handle_source_error(&mut self, e: SC::Error) { |
| 88 | + if e.is_connection_error() { |
| 89 | + reconnect_failed_client( |
| 90 | + FailedClient::Source, |
| 91 | + RECONNECT_DELAY, |
| 92 | + &mut self.source_client, |
| 93 | + &mut self.target_client, |
| 94 | + ) |
| 95 | + .await; |
| 96 | + } else { |
| 97 | + async_std::task::sleep(RECONNECT_DELAY).await; |
| 98 | + } |
| 99 | + } |
| 100 | + |
| 101 | + async fn handle_target_error(&mut self, e: TC::Error) { |
| 102 | + if e.is_connection_error() { |
| 103 | + reconnect_failed_client( |
| 104 | + FailedClient::Target, |
| 105 | + RECONNECT_DELAY, |
| 106 | + &mut self.source_client, |
| 107 | + &mut self.target_client, |
| 108 | + ) |
| 109 | + .await; |
| 110 | + } else { |
| 111 | + async_std::task::sleep(RECONNECT_DELAY).await; |
| 112 | + } |
| 113 | + } |
| 114 | + |
| 115 | + async fn ensure_finality_proofs_stream(&mut self) { |
| 116 | + match self.finality_proofs_stream.ensure_stream(&self.source_client).await { |
| 117 | + Ok(_) => {}, |
| 118 | + Err(e) => { |
| 119 | + log::error!( |
| 120 | + target: "bridge", |
| 121 | + "Could not connect to the {} `FinalityProofsStream`: {e:?}", |
| 122 | + P::SOURCE_NAME, |
| 123 | + ); |
| 124 | + |
| 125 | + // Reconnect to the source client if needed |
| 126 | + self.handle_source_error(e).await |
| 127 | + }, |
| 128 | + } |
| 129 | + } |
| 130 | + |
| 131 | + async fn best_finalized_target_block_number(&mut self) -> Option<P::TargetNumber> { |
| 132 | + match self.target_client.best_finalized_header_number().await { |
| 133 | + Ok(block_num) => Some(block_num), |
| 134 | + Err(e) => { |
| 135 | + log::error!( |
| 136 | + target: "bridge", |
| 137 | + "Could not read best finalized header number from {}: {e:?}", |
| 138 | + P::TARGET_NAME, |
| 139 | + ); |
| 140 | + |
| 141 | + // Reconnect target client and move on |
| 142 | + self.handle_target_error(e).await; |
| 143 | + |
| 144 | + None |
| 145 | + }, |
| 146 | + } |
| 147 | + } |
| 148 | + |
| 149 | + async fn build_equivocation_reporting_context( |
| 150 | + &mut self, |
| 151 | + block_num: P::TargetNumber, |
| 152 | + ) -> Option<EquivocationReportingContext<P>> { |
| 153 | + match EquivocationReportingContext::try_read_from_target( |
| 154 | + &self.target_client, |
| 155 | + block_num.saturating_sub(1.into()), |
| 156 | + ) |
| 157 | + .await |
| 158 | + { |
| 159 | + Ok(Some(context)) => Some(context), |
| 160 | + Ok(None) => None, |
| 161 | + Err(e) => { |
| 162 | + log::error!( |
| 163 | + target: "bridge", |
| 164 | + "Could not read {} `EquivocationReportingContext` from {} at block {block_num}: {e:?}", |
| 165 | + P::SOURCE_NAME, |
| 166 | + P::TARGET_NAME, |
| 167 | + ); |
| 168 | + |
| 169 | + // Reconnect target client if needed and move on. |
| 170 | + self.handle_target_error(e).await; |
| 171 | + None |
| 172 | + }, |
| 173 | + } |
| 174 | + } |
| 175 | + |
| 176 | + /// Try to get the finality info associated to the source headers synced with the target chain |
| 177 | + /// at the specified block. |
| 178 | + async fn synced_source_headers_at_target( |
| 179 | + &mut self, |
| 180 | + at: P::TargetNumber, |
| 181 | + ) -> Vec<HeaderFinalityInfo<P>> { |
| 182 | + match self.target_client.synced_headers_finality_info(at).await { |
| 183 | + Ok(synced_headers) => synced_headers, |
| 184 | + Err(e) => { |
| 185 | + log::error!( |
| 186 | + target: "bridge", |
| 187 | + "Could not get {} headers synced to {} at block {at:?}", |
| 188 | + P::SOURCE_NAME, |
| 189 | + P::TARGET_NAME |
| 190 | + ); |
| 191 | + |
| 192 | + // Reconnect in case of a connection error. |
| 193 | + self.handle_target_error(e).await; |
| 194 | + // And move on to the next block. |
| 195 | + vec![] |
| 196 | + }, |
| 197 | + } |
| 198 | + } |
| 199 | + |
| 200 | + async fn report_equivocation(&mut self, at: P::Hash, equivocation: P::EquivocationProof) { |
| 201 | + match self.reporter.submit_report(&self.source_client, at, equivocation.clone()).await { |
| 202 | + Ok(_) => {}, |
| 203 | + Err(e) => { |
| 204 | + log::error!( |
| 205 | + target: "bridge", |
| 206 | + "Could not submit equivocation report to {} for {equivocation:?}: {e:?}", |
| 207 | + P::SOURCE_NAME, |
| 208 | + ); |
| 209 | + |
| 210 | + // Reconnect source client and move on |
| 211 | + self.handle_source_error(e).await; |
| 212 | + }, |
| 213 | + } |
| 214 | + } |
| 215 | + |
| 216 | + async fn check_block( |
| 217 | + &mut self, |
| 218 | + block_num: P::TargetNumber, |
| 219 | + context: &mut EquivocationReportingContext<P>, |
| 220 | + ) { |
| 221 | + let synced_headers = self.synced_source_headers_at_target(block_num).await; |
| 222 | + |
| 223 | + for synced_header in synced_headers { |
| 224 | + self.finality_proofs_buf.fill(&mut self.finality_proofs_stream); |
| 225 | + |
| 226 | + let equivocations = match P::EquivocationsFinder::find_equivocations( |
| 227 | + &context.synced_verification_context, |
| 228 | + &synced_header.finality_proof, |
| 229 | + self.finality_proofs_buf.buf().as_slice(), |
| 230 | + ) { |
| 231 | + Ok(equivocations) => equivocations, |
| 232 | + Err(e) => { |
| 233 | + log::error!( |
| 234 | + target: "bridge", |
| 235 | + "Could not search for equivocations in the finality proof \ |
| 236 | + for source header {:?} synced at target block {block_num:?}: {e:?}", |
| 237 | + synced_header.finality_proof.target_header_hash() |
| 238 | + ); |
| 239 | + continue |
| 240 | + }, |
| 241 | + }; |
| 242 | + for equivocation in equivocations { |
| 243 | + self.report_equivocation(context.synced_header_hash, equivocation).await; |
| 244 | + } |
| 245 | + |
| 246 | + self.finality_proofs_buf |
| 247 | + .prune(synced_header.finality_proof.target_header_number(), None); |
| 248 | + context.update(synced_header); |
| 249 | + } |
| 250 | + } |
| 251 | + |
| 252 | + async fn do_run(&mut self, tick: Duration, exit_signal: impl Future<Output = ()>) { |
| 253 | + let exit_signal = exit_signal.fuse(); |
| 254 | + futures::pin_mut!(exit_signal); |
| 255 | + |
| 256 | + loop { |
| 257 | + // Make sure that we are connected to the source finality proofs stream. |
| 258 | + self.ensure_finality_proofs_stream().await; |
| 259 | + // Check the status of the pending equivocation reports |
| 260 | + self.reporter.process_pending_reports().await; |
| 261 | + |
| 262 | + // Update blocks range. |
| 263 | + if let Some(block_number) = self.best_finalized_target_block_number().await { |
| 264 | + self.from_block_num.get_or_insert(block_number); |
| 265 | + self.until_block_num = Some(block_number); |
| 266 | + } |
| 267 | + let (from, until) = match (self.from_block_num, self.until_block_num) { |
| 268 | + (Some(from), Some(until)) => (from, until), |
| 269 | + _ => continue, |
| 270 | + }; |
| 271 | + |
| 272 | + // Check the available blocks |
| 273 | + let mut current_block_number = from; |
| 274 | + while current_block_number <= until { |
| 275 | + let mut context = |
| 276 | + match self.build_equivocation_reporting_context(current_block_number).await { |
| 277 | + Some(context) => context, |
| 278 | + None => continue, |
| 279 | + }; |
| 280 | + self.check_block(current_block_number, &mut context).await; |
| 281 | + current_block_number = current_block_number.saturating_add(1.into()); |
| 282 | + } |
| 283 | + self.until_block_num = Some(current_block_number); |
| 284 | + |
| 285 | + select! { |
| 286 | + _ = async_std::task::sleep(tick).fuse() => {}, |
| 287 | + _ = exit_signal => return, |
| 288 | + } |
| 289 | + } |
| 290 | + } |
| 291 | + |
| 292 | + pub async fn run( |
| 293 | + source_client: SC, |
| 294 | + target_client: TC, |
| 295 | + tick: Duration, |
| 296 | + exit_signal: impl Future<Output = ()>, |
| 297 | + ) -> Result<(), FailedClient> { |
| 298 | + let mut equivocation_detection_loop = Self { |
| 299 | + source_client, |
| 300 | + target_client, |
| 301 | + from_block_num: None, |
| 302 | + until_block_num: None, |
| 303 | + reporter: EquivocationsReporter::<P, SC>::new(), |
| 304 | + finality_proofs_stream: FinalityProofsStream::new(), |
| 305 | + finality_proofs_buf: FinalityProofsBuf::new(vec![]), |
| 306 | + }; |
| 307 | + |
| 308 | + equivocation_detection_loop.do_run(tick, exit_signal).await; |
| 309 | + Ok(()) |
| 310 | + } |
| 311 | +} |
| 312 | + |
| 313 | +/// Spawn the equivocations detection loop. |
| 314 | +/// TODO: remove `#[allow(dead_code)]` |
| 315 | +#[allow(dead_code)] |
| 316 | +pub async fn run<P: EquivocationDetectionPipeline>( |
| 317 | + source_client: impl SourceClient<P>, |
| 318 | + target_client: impl TargetClient<P>, |
| 319 | + tick: Duration, |
| 320 | + exit_signal: impl Future<Output = ()> + 'static + Send, |
| 321 | +) -> Result<(), relay_utils::Error> { |
| 322 | + let exit_signal = exit_signal.shared(); |
| 323 | + relay_utils::relay_loop(source_client, target_client) |
| 324 | + .run( |
| 325 | + format!("{}_to_{}_EquivocationDetection", P::SOURCE_NAME, P::TARGET_NAME), |
| 326 | + move |source_client, target_client, _metrics| { |
| 327 | + EquivocationDetectionLoop::run( |
| 328 | + source_client, |
| 329 | + target_client, |
| 330 | + tick, |
| 331 | + exit_signal.clone(), |
| 332 | + ) |
| 333 | + }, |
| 334 | + ) |
| 335 | + .await |
| 336 | +} |
0 commit comments