Skip to content

Commit df3fbdf

Browse files
committed
Merge origin/main into feat/el-state-sync
2 parents 6cb3334 + 0401725 commit df3fbdf

File tree

22 files changed

+1540
-146
lines changed

22 files changed

+1540
-146
lines changed

crates/data-chain/src/primary/attestation_collector.rs

Lines changed: 22 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,10 @@ use cipherbft_types::{Hash, ValidatorId};
1010
use std::collections::HashMap;
1111
use std::time::{Duration, Instant};
1212

13+
/// Maximum number of timeout resets allowed for batched Cars.
14+
/// This prevents Cars from being stuck indefinitely when peers cannot sync batches.
15+
const MAX_BATCHED_CAR_RESETS: u32 = 10;
16+
1317
/// Pending attestation collection for a Car
1418
#[derive(Debug)]
1519
struct PendingAttestation {
@@ -24,6 +28,8 @@ struct PendingAttestation {
2428
started_at: Instant,
2529
/// Current backoff duration
2630
current_backoff: Duration,
31+
/// Number of times the timeout has been reset (for batched Cars)
32+
reset_count: u32,
2733
}
2834

2935
impl PendingAttestation {
@@ -34,6 +40,7 @@ impl PendingAttestation {
3440
attestations: HashMap::new(),
3541
started_at: Instant::now(),
3642
current_backoff: base_timeout,
43+
reset_count: 0,
3744
}
3845
}
3946
}
@@ -237,17 +244,31 @@ impl AttestationCollector {
237244
/// Reset timeout for a Car without losing existing attestations
238245
///
239246
/// Used for batched Cars that need extra time for peers to sync batch data.
240-
/// Returns true if the Car was found and reset, false otherwise.
247+
/// Returns true if the Car was found and reset successfully.
248+
/// Returns false if the Car was not found OR if max reset count exceeded.
249+
///
250+
/// This prevents Cars from being stuck indefinitely when peers cannot sync batches
251+
/// (e.g., due to position divergence where peers reject the Car).
241252
pub fn reset_timeout(&mut self, car_hash: &Hash) -> bool {
242253
if let Some(pending) = self.pending.get_mut(car_hash) {
254+
// Check if we've exceeded max resets
255+
if pending.reset_count >= MAX_BATCHED_CAR_RESETS {
256+
return false;
257+
}
243258
pending.started_at = std::time::Instant::now();
244259
pending.current_backoff = self.base_timeout;
260+
pending.reset_count += 1;
245261
true
246262
} else {
247263
false
248264
}
249265
}
250266

267+
/// Get the reset count for a pending Car
268+
pub fn reset_count(&self, car_hash: &Hash) -> Option<u32> {
269+
self.pending.get(car_hash).map(|p| p.reset_count)
270+
}
271+
251272
/// Get current attestation count for a Car
252273
pub fn attestation_count(&self, car_hash: &Hash) -> Option<usize> {
253274
self.pending.get(car_hash).map(|p| p.attestations.len() + 1) // +1 for self

crates/data-chain/src/primary/config.rs

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -44,14 +44,18 @@ pub struct PrimaryConfig {
4444

4545
impl PrimaryConfig {
4646
/// Create a new configuration with defaults
47+
///
48+
/// Default intervals are tuned for responsive transaction processing:
49+
/// - `car_interval`: 50ms matches Worker flush interval for quick Car creation
50+
/// - `max_empty_cars`: 1 reduces timing races where empty Cars compete with tx-bearing Cars
4751
pub fn new(validator_id: ValidatorId, bls_secret_key: BlsSecretKey) -> Self {
4852
Self {
4953
validator_id,
5054
bls_secret_key,
51-
car_interval: Duration::from_millis(100),
55+
car_interval: Duration::from_millis(50), // Faster Car creation
5256
attestation_timeout_base: Duration::from_millis(500),
5357
attestation_timeout_max: Duration::from_millis(5000),
54-
max_empty_cars: 3,
58+
max_empty_cars: 1, // Reduce empty Car spam during tx processing
5559
worker_count: 1,
5660
equivocation_retention: 1000,
5761
startup_delay: Duration::from_secs(2),

crates/data-chain/src/primary/runner.rs

Lines changed: 121 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -571,6 +571,15 @@ impl Primary {
571571
"Checked for ready Cars after batch sync"
572572
);
573573
for car in ready_cars {
574+
// Skip if this is our own Car (defensive check)
575+
if car.proposer == self.config.validator_id {
576+
trace!(
577+
position = car.position,
578+
"Skipping attestation for our own Car in batch sync"
579+
);
580+
continue;
581+
}
582+
574583
// IMPORTANT: The Car was already validated when first received
575584
// (position check, signature, parent_ref all passed). We queued
576585
// it only because batches were missing. Now that batches are
@@ -624,6 +633,11 @@ impl Primary {
624633
"Received consensus decision notification"
625634
);
626635

636+
// CRITICAL: Update last_cut to the DECIDED cut (not what we proposed)
637+
// This ensures monotonicity checks in form_cut use the actual consensus
638+
// state, preventing stale proposed cuts from causing violations
639+
self.last_cut = Some(cut.clone());
640+
627641
// CRITICAL: Sync position tracking from the decided Cut BEFORE advancing state
628642
// This ensures validators that missed some CARs during collection still have
629643
// consistent position tracking for subsequent heights
@@ -858,6 +872,15 @@ impl Primary {
858872

859873
/// Handle a received Car
860874
async fn handle_received_car(&mut self, from: ValidatorId, car: Car) {
875+
// Skip if this is our own Car (we handle our own Cars through attestation collection)
876+
if car.proposer == self.config.validator_id {
877+
trace!(
878+
position = car.position,
879+
"Ignoring received Car for our own Car"
880+
);
881+
return;
882+
}
883+
861884
// DIAGNOSTIC: Log at INFO level for batched Cars to trace attestation flow
862885
let batch_count = car.batch_digests.len();
863886
if batch_count > 0 {
@@ -982,6 +1005,48 @@ impl Primary {
9821005
"Position gap detected, initiating gap recovery"
9831006
);
9841007

1008+
// CRITICAL FIX: Update position tracking when we're behind
1009+
//
1010+
// If actual > expected, we've fallen behind this validator's position.
1011+
// The Car signature was already validated by core.handle_car(), so we
1012+
// know this is a legitimate Car. Update tracking to prevent death spiral:
1013+
//
1014+
// Without this fix:
1015+
// 1. We miss some Cars from validator X
1016+
// 2. New Cars from X trigger position gap errors
1017+
// 3. We never attest to X's new Cars
1018+
// 4. X's Cars never reach quorum
1019+
// 5. Transactions stuck forever
1020+
//
1021+
// With this fix:
1022+
// - We update tracking to actual position
1023+
// - Next Car from X (at actual+1) will be attested normally
1024+
// - Network recovers quickly
1025+
if actual > expected {
1026+
let car_hash = car.hash();
1027+
info!(
1028+
proposer = %validator,
1029+
expected,
1030+
actual,
1031+
car_hash = %car_hash,
1032+
"Updating position tracking to recover from gap - signature was valid"
1033+
);
1034+
self.state.update_last_seen(validator, actual, car_hash);
1035+
1036+
// Also generate attestation since signature is valid and we're syncing
1037+
// This helps the network reach quorum faster
1038+
let attestation = self.core.create_attestation(&car);
1039+
DCL_ATTESTATIONS_SENT.inc();
1040+
self.network
1041+
.send_attestation(car.proposer, &attestation)
1042+
.await;
1043+
info!(
1044+
proposer = %validator,
1045+
position = actual,
1046+
"Generated attestation after position gap recovery"
1047+
);
1048+
}
1049+
9851050
// Queue the out-of-order Car for later processing
9861051
if !self.state.is_awaiting_gap_sync(&validator, actual) {
9871052
self.state.queue_car_awaiting_gap(car.clone(), expected);
@@ -1228,6 +1293,30 @@ impl Primary {
12281293
"Received valid CarWithAttestation from peer"
12291294
);
12301295

1296+
// CRITICAL FIX: Update position tracking from attested Car broadcasts
1297+
//
1298+
// Without this, validators can fall into a position gap death spiral:
1299+
// 1. Validator A's Cars don't reach quorum (for whatever reason)
1300+
// 2. Other validators' last_seen_positions[A] becomes stale
1301+
// 3. When A broadcasts new Cars, others detect a "position gap"
1302+
// 4. No attestations are generated, A's Cars never reach quorum
1303+
// 5. The gap grows forever
1304+
//
1305+
// By updating position tracking when we receive a valid CarWithAttestation
1306+
// (which has quorum verification), we stay in sync even if we missed
1307+
// some intermediate Cars. This breaks the death spiral.
1308+
let current_pos = self.state.last_seen_positions.get(&car.proposer).copied();
1309+
if current_pos.is_none_or(|p| car.position > p) {
1310+
self.state
1311+
.update_last_seen(car.proposer, car.position, car_hash);
1312+
info!(
1313+
proposer = %car.proposer,
1314+
old_position = current_pos,
1315+
new_position = car.position,
1316+
"Updated position tracking from CarWithAttestation broadcast"
1317+
);
1318+
}
1319+
12311320
// Persist attestation to storage if available
12321321
if let Some(ref storage) = self.storage {
12331322
if let Err(e) = storage.put_attestation(attestation.clone()).await {
@@ -1377,20 +1466,39 @@ impl Primary {
13771466
);
13781467
// Could re-broadcast Car here
13791468
} else {
1380-
// IMPORTANT: Don't timeout Cars with batches!
1381-
// Peers need extra time to sync batch data before they can attest.
1382-
// Without this, batched Cars timeout before peers finish syncing,
1383-
// causing attestations to be rejected with UnknownCar error.
1469+
// Max backoff exceeded
13841470
if has_batches {
1385-
// Reset the timeout without losing existing attestations
1386-
info!(
1387-
hash = %hash,
1388-
position = car.position,
1389-
batch_count = car.batch_digests.len(),
1390-
attestation_count = self.attestation_collector.attestation_count(&hash).unwrap_or(0),
1391-
"Extending timeout for batched Car - peers may still be syncing"
1392-
);
1393-
self.attestation_collector.reset_timeout(&hash);
1471+
// Try to extend timeout for batched Cars that need more time.
1472+
// reset_timeout() returns false if max resets exceeded.
1473+
let reset_count = self.attestation_collector.reset_count(&hash).unwrap_or(0);
1474+
if self.attestation_collector.reset_timeout(&hash) {
1475+
info!(
1476+
hash = %hash,
1477+
position = car.position,
1478+
batch_count = car.batch_digests.len(),
1479+
attestation_count = self.attestation_collector.attestation_count(&hash).unwrap_or(0),
1480+
reset_count = reset_count + 1,
1481+
"Extending timeout for batched Car - peers may still be syncing"
1482+
);
1483+
} else {
1484+
// Max resets exceeded - drop the Car and restore batches
1485+
warn!(
1486+
hash = %hash,
1487+
position = car.position,
1488+
batch_count = car.batch_digests.len(),
1489+
attestation_count = self.attestation_collector.attestation_count(&hash).unwrap_or(0),
1490+
reset_count,
1491+
"Batched Car exceeded max timeout resets - dropping and restoring batches"
1492+
);
1493+
self.attestation_collector.remove(&hash);
1494+
self.state.remove_pending_car(&hash);
1495+
1496+
// Restore batch digests to pending so they can be re-batched
1497+
// This ensures transactions are not lost
1498+
for digest in &car.batch_digests {
1499+
self.state.add_batch_digest(digest.clone());
1500+
}
1501+
}
13941502
} else {
13951503
warn!(
13961504
hash = %hash,

crates/data-chain/src/primary/state.rs

Lines changed: 40 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -397,15 +397,50 @@ impl PrimaryState {
397397
pub fn mark_attested(&mut self, car: Car, aggregated: AggregatedAttestation) {
398398
let validator = car.proposer;
399399
let new_has_batches = !car.batch_digests.is_empty();
400+
let last_included = self
401+
.last_included_positions
402+
.get(&validator)
403+
.copied()
404+
.unwrap_or(0);
405+
406+
// CRITICAL: Never store a Car that has already been finalized
407+
// This prevents monotonicity violations in Cut formation
408+
if car.position <= last_included {
409+
tracing::debug!(
410+
validator = %validator,
411+
car_position = car.position,
412+
last_included,
413+
"Rejecting attested Car - position already finalized"
414+
);
415+
return;
416+
}
400417

401418
// Check if we should replace the existing attested Car
402419
if let Some((existing_car, _)) = self.attested_cars.get(&validator) {
403420
let existing_has_batches = !existing_car.batch_digests.is_empty();
404-
let last_included = self
405-
.last_included_positions
406-
.get(&validator)
407-
.copied()
408-
.unwrap_or(0);
421+
422+
// CRITICAL: Never go backwards in position
423+
// This prevents monotonicity violations in Cut formation
424+
if car.position < existing_car.position {
425+
tracing::debug!(
426+
validator = %validator,
427+
existing_position = existing_car.position,
428+
new_position = car.position,
429+
"Rejecting attested Car - would go backwards in position"
430+
);
431+
return;
432+
}
433+
434+
// Don't replace a Car with batches with an empty Car at same position
435+
if existing_has_batches && !new_has_batches && car.position == existing_car.position {
436+
tracing::debug!(
437+
validator = %validator,
438+
position = car.position,
439+
existing_batches = existing_car.batch_digests.len(),
440+
"Preserving Car with batches over empty Car at same position"
441+
);
442+
return;
443+
}
409444

410445
// Don't replace a Car with batches with an empty Car, UNLESS the
411446
// existing Car's position has already been included in a decided Cut

crates/data-chain/src/worker/config.rs

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,13 +20,17 @@ pub struct WorkerConfig {
2020

2121
impl WorkerConfig {
2222
/// Create a new configuration with defaults
23+
///
24+
/// Default batch thresholds are tuned for responsive transaction processing:
25+
/// - `max_batch_txs`: 100 transactions triggers immediate batch flush
26+
/// - `flush_interval`: 50ms ensures batches don't wait too long
2327
pub fn new(validator_id: ValidatorId, worker_id: u8) -> Self {
2428
Self {
2529
validator_id,
2630
worker_id,
27-
max_batch_bytes: 1024 * 1024, // 1MB
28-
max_batch_txs: 1000,
29-
flush_interval: Duration::from_millis(100),
31+
max_batch_bytes: 1024 * 1024, // 1MB
32+
max_batch_txs: 100, // Flush after 100 txs for responsive batching
33+
flush_interval: Duration::from_millis(50), // Faster time-based flush
3034
}
3135
}
3236

0 commit comments

Comments
 (0)