Skip to content

Commit f367379

Browse files
committed
Miner listens to signers state machine updates and stores their tx replay set on the side
Signed-off-by: Jacinta Ferrant <[email protected]>
1 parent 87b628e commit f367379

File tree

3 files changed

+147
-6
lines changed

3 files changed

+147
-6
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ and this project adheres to the versioning scheme outlined in the [README.md](RE
1111

1212
- Added new `ValidateRejectCode` values to the `/v3/block_proposal` endpoint
1313
- Added `StateMachineUpdateContent::V1` to support a vector of `StacksTransaction` expected to be replayed in subsequent Stacks blocks
14+
- Updated `StackerDBListener` to now listens for signers' state machine updates and stores replay info to the side to enable a miner to perform transaction replay
1415

1516
### Changed
1617

testnet/stacks-node/src/nakamoto_node/signer_coordinator.rs

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ use stacks::chainstate::burn::{BlockSnapshot, ConsensusHash};
2828
use stacks::chainstate::nakamoto::{NakamotoBlock, NakamotoChainState};
2929
use stacks::chainstate::stacks::boot::{RewardSet, MINERS_NAME};
3030
use stacks::chainstate::stacks::db::StacksChainState;
31-
use stacks::chainstate::stacks::Error as ChainstateError;
31+
use stacks::chainstate::stacks::{Error as ChainstateError, StacksTransaction};
3232
use stacks::codec::StacksMessageCodec;
3333
use stacks::libstackerdb::StackerDBChunkData;
3434
use stacks::net::stackerdb::StackerDBs;
@@ -97,6 +97,7 @@ impl SignerCoordinator {
9797
reward_set,
9898
election_block,
9999
burnchain,
100+
config,
100101
)?;
101102
let is_mainnet = config.is_mainnet();
102103
let rpc_socket = config
@@ -484,6 +485,13 @@ impl SignerCoordinator {
484485
.get_tenure_extend_timestamp(self.weight_threshold)
485486
}
486487

488+
/// Get the transactions that at least 70% of the signing power are
489+
/// expecting to be replayed.
490+
pub fn get_replay_transactions(&self) -> Vec<StacksTransaction> {
491+
self.stackerdb_comms
492+
.get_replay_transactions(self.weight_threshold)
493+
}
494+
487495
/// Check if the tenure needs to change
488496
fn check_burn_tip_changed(&self, sortdb: &SortitionDB) -> bool {
489497
let cur_burn_chain_tip = SortitionDB::get_canonical_burn_chain_tip(sortdb.conn())

testnet/stacks-node/src/nakamoto_node/stackerdb_listener.rs

Lines changed: 137 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -22,14 +22,19 @@ use std::sync::{Arc, Condvar, Mutex};
2222
use std::time::Duration;
2323

2424
use hashbrown::{HashMap, HashSet};
25-
use libsigner::v0::messages::{BlockAccepted, BlockResponse, SignerMessage as SignerMessageV0};
26-
use libsigner::SignerEvent;
25+
use libsigner::v0::messages::{
26+
BlockAccepted, BlockResponse, MessageSlotID, SignerMessage as SignerMessageV0,
27+
StateMachineUpdate, StateMachineUpdateContent,
28+
};
29+
use libsigner::{SignerEvent, SignerSession, StackerDBSession};
2730
use stacks::burnchains::Burnchain;
2831
use stacks::chainstate::burn::BlockSnapshot;
2932
use stacks::chainstate::nakamoto::NakamotoBlockHeader;
3033
use stacks::chainstate::stacks::boot::{NakamotoSignerEntry, RewardSet, SIGNERS_NAME};
3134
use stacks::chainstate::stacks::events::StackerDBChunksEvent;
32-
use stacks::chainstate::stacks::Error as ChainstateError;
35+
use stacks::chainstate::stacks::{Error as ChainstateError, StacksTransaction};
36+
use stacks::codec::StacksMessageCodec;
37+
use stacks::net::stackerdb::StackerDBs;
3338
use stacks::types::chainstate::StacksPublicKey;
3439
use stacks::types::PublicKey;
3540
use stacks::util::get_epoch_time_secs;
@@ -40,6 +45,7 @@ use stacks_common::util::tests::TestFlag;
4045

4146
use super::Error as NakamotoNodeError;
4247
use crate::event_dispatcher::StackerDBChannel;
48+
use crate::Config;
4349

4450
#[cfg(test)]
4551
/// Fault injection flag to prevent the miner from seeing enough signer signatures.
@@ -68,6 +74,12 @@ pub(crate) struct TimestampInfo {
6874
pub weight: u32,
6975
}
7076

77+
#[derive(Debug, Clone)]
78+
pub(crate) struct ReplayInfo {
79+
pub transactions: Vec<StacksTransaction>,
80+
pub weight: u32,
81+
}
82+
7183
/// The listener for the StackerDB, which listens for messages from the
7284
/// signers and tracks the state of block signatures and idle timestamps.
7385
pub struct StackerDBListener {
@@ -96,6 +108,11 @@ pub struct StackerDBListener {
96108
/// - key: StacksPublicKey
97109
/// - value: TimestampInfo
98110
pub(crate) signer_idle_timestamps: Arc<Mutex<HashMap<StacksPublicKey, TimestampInfo>>>,
111+
/// Tracks any replay transactions from signers to decide when the miner should
112+
/// attempt to replay reorged blocks
113+
/// - key: StacksPublicKey
114+
/// - value: Vec<StacksTransaction>
115+
pub(crate) replay_info: Arc<Mutex<HashMap<StacksPublicKey, ReplayInfo>>>,
99116
}
100117

101118
/// Interface for other threads to retrieve info from the StackerDBListener
@@ -109,6 +126,11 @@ pub struct StackerDBListenerComms {
109126
/// - key: StacksPublicKey
110127
/// - value: TimestampInfo
111128
signer_idle_timestamps: Arc<Mutex<HashMap<StacksPublicKey, TimestampInfo>>>,
129+
/// Tracks any replay transactions from signers to decide when the miner should
130+
/// attempt to replay reorged blocks
131+
/// - key: StacksPublicKey
132+
/// - value: ReplayInfo
133+
replay_info: Arc<Mutex<HashMap<StacksPublicKey, ReplayInfo>>>,
112134
}
113135

114136
impl StackerDBListener {
@@ -119,6 +141,7 @@ impl StackerDBListener {
119141
reward_set: &RewardSet,
120142
burn_tip: &BlockSnapshot,
121143
burnchain: &Burnchain,
144+
config: &Config,
122145
) -> Result<Self, ChainstateError> {
123146
let (receiver, replaced_other) = stackerdb_channel
124147
.lock()
@@ -161,6 +184,60 @@ impl StackerDBListener {
161184
})
162185
.collect::<Result<HashMap<_, _>, ChainstateError>>()?;
163186

187+
let reward_cycle = burnchain
188+
.block_height_to_reward_cycle(burn_tip.block_height)
189+
.expect("BUG: unknown reward cycle");
190+
let signers_contract_id = MessageSlotID::StateMachineUpdate
191+
.stacker_db_contract(config.is_mainnet(), reward_cycle);
192+
let rpc_socket = config
193+
.node
194+
.get_rpc_loopback()
195+
.ok_or_else(|| ChainstateError::MinerAborted)?;
196+
let mut signers_session =
197+
StackerDBSession::new(&rpc_socket.to_string(), signers_contract_id.clone());
198+
let stackerdbs = StackerDBs::connect(&config.get_stacker_db_file_path(), false)?;
199+
let slot_ids: Vec<_> = stackerdbs
200+
.get_signers(&signers_contract_id)
201+
.expect("FATAL: could not get signers from stacker DB")
202+
.into_iter()
203+
.enumerate()
204+
.map(|(slot_id, _)| {
205+
u32::try_from(slot_id).expect("FATAL: too many signers to fit into u32 range")
206+
})
207+
.collect();
208+
let chunks = signers_session
209+
.get_latest_chunks(&slot_ids)
210+
.inspect_err(|e| warn!("Unable to read the latest signer state from signer db: {e}."))
211+
.unwrap_or_default();
212+
let mut replay_infos = HashMap::new();
213+
for (chunk, slot_id) in chunks.into_iter().zip(slot_ids) {
214+
let Some(chunk) = chunk else {
215+
continue;
216+
};
217+
let Some(signer_entry) = &signer_entries.get(&slot_id) else {
218+
continue;
219+
};
220+
let Ok(signer_pubkey) = StacksPublicKey::from_slice(&signer_entry.signing_key) else {
221+
continue;
222+
};
223+
if let Ok(SignerMessageV0::StateMachineUpdate(update)) =
224+
SignerMessageV0::consensus_deserialize(&mut chunk.as_slice())
225+
{
226+
let transactions = match update.content {
227+
StateMachineUpdateContent::V0 { .. } => vec![],
228+
StateMachineUpdateContent::V1 {
229+
replay_transactions,
230+
..
231+
} => replay_transactions,
232+
};
233+
let replay_info = ReplayInfo {
234+
transactions,
235+
weight: signer_entry.weight,
236+
};
237+
replay_infos.insert(signer_pubkey, replay_info);
238+
}
239+
}
240+
164241
Ok(Self {
165242
stackerdb_channel,
166243
receiver: Some(receiver),
@@ -172,13 +249,15 @@ impl StackerDBListener {
172249
signer_entries,
173250
blocks: Arc::new((Mutex::new(HashMap::new()), Condvar::new())),
174251
signer_idle_timestamps: Arc::new(Mutex::new(HashMap::new())),
252+
replay_info: Arc::new(Mutex::new(replay_infos)),
175253
})
176254
}
177255

178256
pub fn get_comms(&self) -> StackerDBListenerComms {
179257
StackerDBListenerComms {
180258
blocks: self.blocks.clone(),
181259
signer_idle_timestamps: self.signer_idle_timestamps.clone(),
260+
replay_info: self.replay_info.clone(),
182261
}
183262
}
184263

@@ -445,8 +524,8 @@ impl StackerDBListener {
445524
| SignerMessageV0::MockBlock(_) => {
446525
debug!("Received mock message. Ignoring.");
447526
}
448-
SignerMessageV0::StateMachineUpdate(_) => {
449-
debug!("Received state machine update message. Ignoring.");
527+
SignerMessageV0::StateMachineUpdate(update) => {
528+
self.update_replay_info(signer_pubkey, signer_entry.weight, update);
450529
}
451530
};
452531
}
@@ -472,6 +551,32 @@ impl StackerDBListener {
472551
idle_timestamps.insert(signer_pubkey, timestamp_info);
473552
}
474553

554+
fn update_replay_info(
555+
&self,
556+
signer_pubkey: StacksPublicKey,
557+
weight: u32,
558+
update: StateMachineUpdate,
559+
) {
560+
let transactions = match update.content {
561+
StateMachineUpdateContent::V0 { .. } => vec![],
562+
StateMachineUpdateContent::V1 {
563+
replay_transactions,
564+
..
565+
} => replay_transactions,
566+
};
567+
let mut replay_infos = self
568+
.replay_info
569+
.lock()
570+
.expect("FATAL: failed to lock idle timestamps");
571+
572+
// Update the map with the replay info and weight
573+
let replay_info = ReplayInfo {
574+
transactions,
575+
weight,
576+
};
577+
replay_infos.insert(signer_pubkey, replay_info);
578+
}
579+
475580
/// Do we ignore signer signatures?
476581
#[cfg(test)]
477582
fn fault_injection_ignore_signatures() -> bool {
@@ -597,4 +702,31 @@ impl StackerDBListenerComms {
597702
// tenure.
598703
u64::MAX
599704
}
705+
706+
/// Get the transactions that at least 70% of the signing power expect to be replayed in
707+
/// the next stacks block
708+
pub fn get_replay_transactions(&self, weight_threshold: u32) -> Vec<StacksTransaction> {
709+
let replay_info = self
710+
.replay_info
711+
.lock()
712+
.expect("FATAL: failed to lock replay transactions");
713+
714+
let replay_info = replay_info.values().collect::<Vec<_>>();
715+
let mut weights: HashMap<&Vec<StacksTransaction>, u32> = HashMap::new();
716+
for info in replay_info {
717+
// We only care about signers voting for us to replay a specific set of transactions
718+
if info.transactions.is_empty() {
719+
continue;
720+
}
721+
let entry = weights.entry(&info.transactions).or_default();
722+
*entry += info.weight;
723+
if *entry >= weight_threshold {
724+
debug!("SignerCoordinator: 70% threshold reached to attempt replay transactions";
725+
"replay_transactions" => ?info.transactions,
726+
);
727+
return info.transactions.clone();
728+
}
729+
}
730+
vec![]
731+
}
600732
}

0 commit comments

Comments
 (0)