Skip to content

Commit c72ab5d

Browse files
committed
fix bug with late batch commitment - append a test for the case
1 parent 8360b77 commit c72ab5d

File tree

3 files changed

+197
-22
lines changed

3 files changed

+197
-22
lines changed

ethexe/consensus/src/announces.rs

Lines changed: 105 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
use anyhow::{Result, anyhow};
1+
use anyhow::{Result, anyhow, ensure};
22
use ethexe_common::{
33
Announce, HashOf, SimpleBlockData,
44
db::{AnnounceStorageRW, BlockMetaStorageRW, LatestDataStorageRO, OnChainStorageRO},
@@ -46,7 +46,7 @@ impl<DB: AnnounceStorageRW + BlockMetaStorageRW + OnChainStorageRO + LatestDataS
4646
}
4747

4848
fn include_announce(&self, announce: Announce) -> Result<HashOf<Announce>> {
49-
tracing::trace!(announce = %announce.to_hash(), "Including announce");
49+
tracing::trace!(announce = %announce.to_hash(), "Including announce...");
5050

5151
let block_hash = announce.block_hash;
5252
let announce_hash = self.set_announce(announce);
@@ -114,7 +114,9 @@ pub fn propagate_announces(
114114

115115
announces_chain_recovery_if_needed(
116116
db,
117+
&block,
117118
last_committed_announce_hash,
119+
commitment_delay_limit,
118120
&mut missing_announces,
119121
)?;
120122

@@ -136,31 +138,118 @@ pub fn propagate_announces(
136138
commitment_delay_limit,
137139
)?;
138140
}
141+
142+
debug_assert!(
143+
db.block_meta(block.hash)
144+
.announces
145+
.into_iter()
146+
.flatten()
147+
.next()
148+
.is_some(),
149+
"at least one announce must be propagated for block({})",
150+
block.hash
151+
);
139152
}
140153

141154
Ok(())
142155
}
143156

157+
/// Recover announces chain if it was committed but not included yet by this node.
158+
/// For example, the node has the following chain:
159+
/// ```text
160+
/// [B1] <-- [B2] <-- [B3] <-- [B4] <-- [B5] (blocks)
161+
/// | | | |
162+
/// (A1) <-- (A2) <-- (A3) <-- (A4) (announces)
163+
/// ```
164+
/// Then the node learns from events that an unknown announce `(A3')` was committed at block `B5`.
165+
/// The node then has to recover the chain of announces to include `(A3')` and its predecessors:
166+
/// ```text
167+
/// [B1] <-- [B2] <-- [B3] <-- [B4] <-- [B5] (blocks)
168+
/// | | | |
169+
/// (A1) <-- (A2) <-- (A3) <-- (A4) (announces)
170+
/// \
171+
/// ---- (A2') <- (A3') <- (A4') (recovered announces)
172+
/// ```
173+
/// where `(A3')` and `(A2')` are committed
174+
/// and must be present in `missing_announces` if they are not included yet,
175+
/// and `(A4')` is a base announce propagated from `(A3')`.
144176
fn announces_chain_recovery_if_needed(
145177
db: &impl DBExt,
178+
block: &SimpleBlockData,
146179
last_committed_announce_hash: HashOf<Announce>,
180+
commitment_delay_limit: u32,
147181
missing_announces: &mut HashMap<HashOf<Announce>, Announce>,
148182
) -> Result<()> {
149-
let mut announce_hash = last_committed_announce_hash;
150-
while !db.announce_is_included(announce_hash) {
151-
tracing::debug!(announce = %announce_hash, "Committed announces was not included yet, including...");
152-
153-
let announce = missing_announces.remove(&announce_hash).ok_or_else(|| {
183+
// TODO +_+_+: append recovery from rejected announces
184+
// if node received announce, which was rejected because of incorrect parent,
185+
// but later we receive event from ethereum that parent announce was committed,
186+
// then the node should use the previously rejected announce to recover the chain.
187+
188+
// Recover backwards the chain of committed announces till last included one
189+
// According to T1, this chain must not be longer than commitment_delay_limit
190+
let mut last_committed_announce_block_hash = None;
191+
let mut current_announce_hash = last_committed_announce_hash;
192+
let mut count = 0;
193+
while count < commitment_delay_limit && !db.announce_is_included(current_announce_hash) {
194+
tracing::debug!(announce = %current_announce_hash, "Committed announces was not included yet, try to recover...");
195+
196+
let announce = missing_announces.remove(&current_announce_hash).ok_or_else(|| {
154197
anyhow!(
155-
"Committed announce {announce_hash} is missing, but not found in missing announces"
198+
"Committed announce {current_announce_hash} is missing, but not found in missing announces"
156199
)
157200
})?;
158201

159-
announce_hash = announce.parent;
202+
last_committed_announce_block_hash.get_or_insert(announce.block_hash);
203+
204+
current_announce_hash = announce.parent;
205+
count += 1;
160206

161207
db.include_announce(announce)?;
162208
}
163209

210+
let Some(last_committed_announce_block_hash) = last_committed_announce_block_hash else {
211+
// No committed announces were missing, no need to recover
212+
return Ok(());
213+
};
214+
215+
// If error: DB is corrupted, or incorrect commitment detected (have not base announce committed after commitment delay limit)
216+
ensure!(
217+
db.announce_is_included(current_announce_hash),
218+
"{current_announce_hash} is not included after checking {commitment_delay_limit} announces",
219+
);
220+
221+
// Recover forward the chain filling with base announces
222+
223+
// First collect a chain of blocks from `last_committed_announce_block_hash` to `block` (exclusive)
224+
// According to T1, this chain must not be longer than commitment_delay_limit
225+
let mut current_block_hash = block.header.parent_hash;
226+
let mut chain = VecDeque::new();
227+
let mut count = 0;
228+
while count < commitment_delay_limit && current_block_hash != last_committed_announce_block_hash
229+
{
230+
chain.push_front(current_block_hash);
231+
current_block_hash = db
232+
.block_header(current_block_hash)
233+
.ok_or_else(|| anyhow!("header not found for block({current_block_hash})"))?
234+
.parent_hash;
235+
count += 1;
236+
}
237+
238+
// If error: DB is corrupted, or incorrect commitment detected (have not base announce committed after commitment delay limit)
239+
ensure!(
240+
current_block_hash == last_committed_announce_block_hash,
241+
"last committed announce block {last_committed_announce_block_hash} not found \
242+
in parent chain of block {} within {commitment_delay_limit} blocks",
243+
block.hash
244+
);
245+
246+
// Now propagate base announces along the chain
247+
let mut parent_announce_hash = last_committed_announce_hash;
248+
for block_hash in chain {
249+
let new_base_announce = Announce::base(block_hash, parent_announce_hash);
250+
parent_announce_hash = db.include_announce(new_base_announce)?;
251+
}
252+
164253
Ok(())
165254
}
166255

@@ -224,6 +313,12 @@ fn propagate_one_base_announce(
224313
{
225314
// We found last committed announce in the neighbor branch, until commitment delay limit
226315
// that means this branch is already expired.
316+
tracing::trace!(
317+
predecessor = %predecessor,
318+
parent_announce = %parent_announce_hash,
319+
latest_committed_announce = %last_committed_announce_hash,
320+
"neighbor announce branch contains last committed announce, so parent announce branch is expired",
321+
);
227322
return Ok(());
228323
};
229324

@@ -332,10 +427,7 @@ fn find_announces_common_predecessor(
332427
.announces
333428
.ok_or_else(|| anyhow!("announces not found for block {block_hash}"))?;
334429

335-
for _ in 0..commitment_delay_limit
336-
.checked_sub(1)
337-
.ok_or_else(|| anyhow!("unsupported 0 commitment delay limit"))?
338-
{
430+
for _ in 0..commitment_delay_limit {
339431
if announces.contains(&start_announce_hash) {
340432
if announces.len() != 1 {
341433
return Err(anyhow!(

ethexe/consensus/src/lib.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,13 +23,14 @@
2323
//! The main components are:
2424
//! - [`ConsensusService`]: A trait defining the core interface for consensus services
2525
//! - [`ConsensusEvent`]: An enum representing various consensus events which have to be processed by outer services
26-
//! - [`SimpleConnectService`]: A basic implementation of "connect-node"
27-
//! - [`ValidatorService`]: Service for handling block validation
26+
//! - [`ConnectService`]: An implementation of consensus to run "connect-node"
27+
//! - [`ValidatorService`]: An implementation of consensus to run "validator-node"
2828
//!
2929
//! The crate is organized into several modules:
3030
//! - `connect`: Connection management functionality
3131
//! - `validator`: Block validation services and implementations
3232
//! - `utils`: Utility functions and shared data structures
33+
//! - `announces`: Logic for handling announce branching and related operations
3334
3435
use anyhow::Result;
3536
use ethexe_common::{

ethexe/consensus/src/validator/initial.rs

Lines changed: 89 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -317,14 +317,14 @@ mod tests {
317317
chain.blocks[last].as_prepared_mut().announces = None;
318318

319319
// create 2 missing announces from blocks last - 2 and last - 1
320-
let announce8 = Announce::with_default_gas(
320+
let announce2 = Announce::with_default_gas(
321321
chain.blocks[last - 2].hash,
322322
chain.block_top_announce_hash(last - 3),
323323
);
324-
let announce9 =
325-
Announce::with_default_gas(chain.blocks[last - 1].hash, announce8.to_hash());
324+
let announce1 =
325+
Announce::with_default_gas(chain.blocks[last - 1].hash, announce2.to_hash());
326326

327-
chain.blocks[last].as_prepared_mut().last_committed_announce = announce9.to_hash();
327+
chain.blocks[last].as_prepared_mut().last_committed_announce = announce1.to_hash();
328328
let chain = chain.setup(&ctx.core.db);
329329
let block = chain.blocks[last].to_simple();
330330

@@ -335,22 +335,25 @@ mod tests {
335335
.process_prepared_block(block.hash)
336336
.unwrap();
337337
assert!(state.is_initial(), "got {:?}", state);
338+
339+
let tail = chain.block_top_announce_hash(last - 4);
338340
let expected_request = AnnouncesRequest {
339341
head: chain.blocks[last].as_prepared().last_committed_announce,
340-
until: chain.block_top_announce_hash(last - 3).into(),
342+
until: tail.into(),
341343
};
342344
assert_eq!(state.context().output, vec![expected_request.into()]);
343345

344346
let response = AnnouncesResponse {
345347
announces: vec![
348+
chain.announces.get(&tail).unwrap().announce.clone(),
346349
chain
347350
.announces
348351
.get(&chain.block_top_announce_hash(last - 3))
349352
.unwrap()
350353
.announce
351354
.clone(),
352-
announce8.clone(),
353-
announce9.clone(),
355+
announce2.clone(),
356+
announce1.clone(),
354357
],
355358
}
356359
.try_into_checked(expected_request)
@@ -552,4 +555,83 @@ mod tests {
552555
ConsensusEvent::Warning(_)
553556
));
554557
}
558+
559+
#[test]
560+
fn commitment_with_delay() {
561+
gear_utils::init_default_logger();
562+
563+
let (ctx, _, _) = mock_validator_context();
564+
let last = 10;
565+
let mut chain = BlockChain::mock(last as u32);
566+
567+
// create unknown announce for block last - 6
568+
let unknown_announce = Announce::with_default_gas(
569+
chain.blocks[last - 6].hash,
570+
chain.block_top_announce_hash(last - 7),
571+
);
572+
let unknown_announce_hash = unknown_announce.to_hash();
573+
574+
// remove announces from 5 latest blocks
575+
for idx in last - 4..=last {
576+
chain.blocks[idx]
577+
.as_prepared_mut()
578+
.announces
579+
.iter()
580+
.flatten()
581+
.for_each(|ah| {
582+
chain.announces.remove(ah);
583+
});
584+
chain.blocks[idx].as_prepared_mut().announces = None;
585+
586+
// set unknown_announce as last committed announce
587+
chain.blocks[idx].as_prepared_mut().last_committed_announce = unknown_announce_hash;
588+
}
589+
590+
let chain = chain.setup(&ctx.core.db);
591+
let block = chain.blocks[last].to_simple();
592+
593+
let state = Initial::create_with_chain_head(ctx, block.clone())
594+
.unwrap()
595+
.process_synced_block(block.hash)
596+
.unwrap()
597+
.process_prepared_block(block.hash)
598+
.unwrap();
599+
600+
assert!(state.is_initial(), "got {:?}", state);
601+
602+
let expected_request = AnnouncesRequest {
603+
head: chain.blocks[last].as_prepared().last_committed_announce,
604+
until: chain.block_top_announce_hash(last - 8).into(),
605+
};
606+
assert_eq!(state.context().output, vec![expected_request.into()]);
607+
608+
let response = AnnouncesResponse {
609+
announces: vec![
610+
chain
611+
.announces
612+
.get(&chain.block_top_announce_hash(last - 8))
613+
.unwrap()
614+
.announce
615+
.clone(),
616+
chain
617+
.announces
618+
.get(&chain.block_top_announce_hash(last - 7))
619+
.unwrap()
620+
.announce
621+
.clone(),
622+
unknown_announce,
623+
],
624+
}
625+
.try_into_checked(expected_request)
626+
.unwrap();
627+
628+
let state = state.process_announces_response(response).unwrap();
629+
assert!(state.is_subordinate(), "got {:?}", state);
630+
assert_eq!(
631+
state.context().output.len(),
632+
1,
633+
"No additional output expected, got {:?}",
634+
state.context().output
635+
);
636+
}
555637
}

0 commit comments

Comments
 (0)