Skip to content

Commit 7ee7fa0

Browse files
committed
feat: implement tx replay to manage fork while in progress, #5971
1 parent 3f4a052 commit 7ee7fa0

File tree

5 files changed

+1586
-13
lines changed

5 files changed

+1586
-13
lines changed

stacks-signer/src/v0/signer.rs

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -125,6 +125,12 @@ pub struct Signer {
125125
pub global_state_evaluator: GlobalStateEvaluator,
126126
/// Whether to validate blocks with replay transactions
127127
pub validate_with_replay_tx: bool,
128+
/// TODO: To understand if keep this as a "local" state
129+
/// or if add this to the State Machine update
130+
/// Scope of Tx Replay in terms of Burn block boundaries
131+
/// - .0 is the fork originating the tx replay,
132+
/// - .1 is the canonical burnchain tip when Tx Replay begun
133+
pub tx_replay_scope: Option<(NewBurnBlock, NewBurnBlock)>,
128134
}
129135

130136
impl std::fmt::Display for SignerMode {
@@ -241,6 +247,7 @@ impl SignerTrait<SignerMessage> for Signer {
241247
recently_processed: RecentlyProcessedBlocks::new(),
242248
global_state_evaluator,
243249
validate_with_replay_tx: signer_config.validate_with_replay_tx,
250+
tx_replay_scope: None,
244251
}
245252
}
246253

@@ -263,7 +270,9 @@ impl SignerTrait<SignerMessage> for Signer {
263270

264271
let mut prior_state = self.local_state_machine.clone();
265272
if self.reward_cycle <= current_reward_cycle {
266-
self.local_state_machine.handle_pending_update(&self.signer_db, stacks_client, &self.proposal_config)
273+
self.local_state_machine.handle_pending_update(&self.signer_db, stacks_client,
274+
&self.proposal_config,
275+
&mut self.tx_replay_scope)
267276
.unwrap_or_else(|e| error!("{self}: failed to update local state machine for pending update"; "err" => ?e));
268277
}
269278

@@ -527,11 +536,14 @@ impl Signer {
527536
);
528537
panic!("{self} Failed to write burn block event to signerdb: {e}");
529538
});
539+
530540
self.local_state_machine
531541
.bitcoin_block_arrival(&self.signer_db, stacks_client, &self.proposal_config, Some(NewBurnBlock {
532542
burn_block_height: *burn_height,
533543
consensus_hash: *consensus_hash,
534-
}))
544+
}),
545+
&mut self.tx_replay_scope
546+
)
535547
.unwrap_or_else(|e| error!("{self}: failed to update local state machine for latest bitcoin block arrival"; "err" => ?e));
536548
*sortition_state = None;
537549
}

stacks-signer/src/v0/signer_state.rs

Lines changed: 133 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -97,7 +97,7 @@ impl LocalStateMachine {
9797
proposal_config: &ProposalEvalConfig,
9898
) -> Result<Self, SignerChainstateError> {
9999
let mut instance = Self::Uninitialized;
100-
instance.bitcoin_block_arrival(db, client, proposal_config, None)?;
100+
instance.bitcoin_block_arrival(db, client, proposal_config, None, &mut None)?;
101101

102102
Ok(instance)
103103
}
@@ -192,14 +192,19 @@ impl LocalStateMachine {
192192
db: &SignerDb,
193193
client: &StacksClient,
194194
proposal_config: &ProposalEvalConfig,
195+
tx_replay_scope: &mut Option<(NewBurnBlock, NewBurnBlock)>,
195196
) -> Result<(), SignerChainstateError> {
196197
let LocalStateMachine::Pending { update, .. } = self else {
197198
return self.check_miner_inactivity(db, client, proposal_config);
198199
};
199200
match update.clone() {
200-
StateMachineUpdate::BurnBlock(expected_burn_height) => {
201-
self.bitcoin_block_arrival(db, client, proposal_config, Some(expected_burn_height))
202-
}
201+
StateMachineUpdate::BurnBlock(expected_burn_height) => self.bitcoin_block_arrival(
202+
db,
203+
client,
204+
proposal_config,
205+
Some(expected_burn_height),
206+
tx_replay_scope,
207+
),
203208
}
204209
}
205210

@@ -497,6 +502,7 @@ impl LocalStateMachine {
497502
client: &StacksClient,
498503
proposal_config: &ProposalEvalConfig,
499504
mut expected_burn_block: Option<NewBurnBlock>,
505+
tx_replay_scope: &mut Option<(NewBurnBlock, NewBurnBlock)>,
500506
) -> Result<(), SignerChainstateError> {
501507
// set self to uninitialized so that if this function errors,
502508
// self is left as uninitialized.
@@ -558,6 +564,7 @@ impl LocalStateMachine {
558564
&expected_burn_block,
559565
&prior_state_machine,
560566
tx_replay_set.is_some(),
567+
tx_replay_scope,
561568
)? {
562569
tx_replay_set = ReplayTransactionSet::new(new_replay_set);
563570
}
@@ -889,6 +896,7 @@ impl LocalStateMachine {
889896
expected_burn_block: &NewBurnBlock,
890897
prior_state_machine: &SignerStateMachine,
891898
is_in_tx_replay_mode: bool,
899+
tx_replay_scope: &mut Option<(NewBurnBlock, NewBurnBlock)>,
892900
) -> Result<Option<Vec<StacksTransaction>>, SignerChainstateError> {
893901
if expected_burn_block.burn_block_height > prior_state_machine.burn_block_height {
894902
// no bitcoin fork, because we're advancing the burn block height
@@ -899,10 +907,45 @@ impl LocalStateMachine {
899907
return Ok(None);
900908
}
901909
if is_in_tx_replay_mode {
902-
// TODO: handle fork while still in replay
903-
info!("Detected bitcoin fork while in replay mode, will not try to handle the fork");
904-
return Ok(None);
910+
info!("Tx Replay: detected bitcoin fork while in replay mode. Tryng to handle the fork";
911+
"expected_burn_block.height" => expected_burn_block.burn_block_height,
912+
"expected_burn_block.hash" => %expected_burn_block.consensus_hash,
913+
"prior_state_machine.burn_block_height" => prior_state_machine.burn_block_height,
914+
"prior_state_machine.burn_block" => %prior_state_machine.burn_block,
915+
);
916+
917+
//TODO: Remove unwrap once decided the final tx_replay_scope structure format
918+
// and if to handle them as part of State machine update
919+
let curr_scope = tx_replay_scope.clone().unwrap();
920+
921+
let (fork_origin, past_tip) = &curr_scope;
922+
let is_deepest_fork =
923+
expected_burn_block.burn_block_height < fork_origin.burn_block_height;
924+
if !is_deepest_fork {
925+
//if it is within the scope or after - this is not a new fork, but the continue of a reorg
926+
info!("Tx Replay: nothing todo. Reorg in progress!");
927+
return Ok(None);
928+
}
929+
930+
let updated_replay_set;
931+
if let Some(replay_set) =
932+
self.compute_forked_txs_set(db, client, expected_burn_block, &past_tip)?
933+
{
934+
let updated_scope = (expected_burn_block.clone(), past_tip.clone());
935+
936+
info!("Tx Replay: replay set updated with {} tx(s)", replay_set.len();
937+
"tx_replay_set" => ?replay_set,
938+
"tx_replay_scope" => ?updated_scope);
939+
updated_replay_set = replay_set;
940+
*tx_replay_scope = Some(updated_scope);
941+
} else {
942+
info!("Tx Replay: replay set will be cleared, because the fork involves the previous reward cycle.");
943+
updated_replay_set = vec![];
944+
*tx_replay_scope = None;
945+
}
946+
return Ok(Some(updated_replay_set));
905947
}
948+
906949
info!("Signer State: fork detected";
907950
"expected_burn_block.height" => expected_burn_block.burn_block_height,
908951
"expected_burn_block.hash" => %expected_burn_block.consensus_hash,
@@ -920,11 +963,19 @@ impl LocalStateMachine {
920963
return Ok(None);
921964
}
922965
}
966+
923967
// Determine the tenures that were forked
924968
let mut parent_burn_block_info =
925969
db.get_burn_block_by_ch(&prior_state_machine.burn_block)?;
970+
971+
let potential_replay_tip = NewBurnBlock {
972+
burn_block_height: parent_burn_block_info.block_height,
973+
consensus_hash: parent_burn_block_info.consensus_hash,
974+
};
975+
926976
let last_forked_tenure = prior_state_machine.burn_block;
927977
let mut first_forked_tenure = prior_state_machine.burn_block;
978+
928979
let mut forked_tenures = vec![(
929980
prior_state_machine.burn_block,
930981
prior_state_machine.burn_block_height,
@@ -954,6 +1005,81 @@ impl LocalStateMachine {
9541005
return Ok(None);
9551006
}
9561007

1008+
// Collect transactions to be replayed across the forked blocks
1009+
let mut forked_blocks = fork_info
1010+
.iter()
1011+
.flat_map(|fork_info| fork_info.nakamoto_blocks.iter().flatten())
1012+
.collect::<Vec<_>>();
1013+
forked_blocks.sort_by_key(|block| block.header.chain_length);
1014+
let forked_txs = forked_blocks
1015+
.iter()
1016+
.flat_map(|block| block.txs.iter())
1017+
.filter(|tx|
1018+
// Don't include Coinbase, TenureChange, or PoisonMicroblock transactions
1019+
!matches!(
1020+
tx.payload,
1021+
TransactionPayload::TenureChange(..)
1022+
| TransactionPayload::Coinbase(..)
1023+
| TransactionPayload::PoisonMicroblock(..)
1024+
))
1025+
.cloned()
1026+
.collect::<Vec<_>>();
1027+
if forked_txs.len() > 0 {
1028+
let updated_scope = (expected_burn_block.clone(), potential_replay_tip);
1029+
info!("Tx Replay: replay set updated with {} tx(s)", forked_txs.len();
1030+
"tx_replay_set" => ?forked_txs,
1031+
"tx_replay_scope" => ?updated_scope);
1032+
*tx_replay_scope = Some(updated_scope);
1033+
} else {
1034+
info!("Tx Replay: no transactions to be replayed.");
1035+
*tx_replay_scope = None;
1036+
}
1037+
Ok(Some(forked_txs))
1038+
}
1039+
1040+
///TODO: This method can be used to remove dublication in 'handle_possible_bitcoin_fork'
1041+
/// Just waiting to avoid potential merge conflict with PR #6109
1042+
/// Retrieve all the transactions that are involved by the fork
1043+
/// from the start block (highest height) back to the end block (lowest height)
1044+
fn compute_forked_txs_set(
1045+
&self,
1046+
db: &SignerDb,
1047+
client: &StacksClient,
1048+
end_block: &NewBurnBlock,
1049+
start_block: &NewBurnBlock,
1050+
) -> Result<Option<Vec<StacksTransaction>>, SignerChainstateError> {
1051+
// Determine the tenures that were forked
1052+
let mut parent_burn_block_info = db.get_burn_block_by_ch(&start_block.consensus_hash)?;
1053+
let last_forked_tenure = start_block.consensus_hash;
1054+
let mut first_forked_tenure = start_block.consensus_hash;
1055+
let mut forked_tenures = vec![(start_block.consensus_hash, start_block.burn_block_height)];
1056+
while parent_burn_block_info.block_height > end_block.burn_block_height {
1057+
parent_burn_block_info =
1058+
db.get_burn_block_by_hash(&parent_burn_block_info.parent_burn_block_hash)?;
1059+
first_forked_tenure = parent_burn_block_info.consensus_hash;
1060+
forked_tenures.push((
1061+
parent_burn_block_info.consensus_hash,
1062+
parent_burn_block_info.block_height,
1063+
));
1064+
}
1065+
let fork_info =
1066+
client.get_tenure_forking_info(&first_forked_tenure, &last_forked_tenure)?;
1067+
1068+
// Check if fork occurred within current reward cycle. Reject tx replay otherwise.
1069+
let reward_cycle_info = client.get_current_reward_cycle_info()?;
1070+
1071+
let current_reward_cycle = reward_cycle_info.reward_cycle;
1072+
let is_fork_in_current_reward_cycle = fork_info.iter().all(|fork_info| {
1073+
let block_height = fork_info.burn_block_height;
1074+
let block_rc = reward_cycle_info.get_reward_cycle(block_height);
1075+
block_rc == current_reward_cycle
1076+
});
1077+
1078+
if !is_fork_in_current_reward_cycle {
1079+
info!("Detected bitcoin fork occurred in previous reward cycle. Tx replay won't be executed");
1080+
return Ok(None);
1081+
}
1082+
9571083
// Collect transactions to be replayed across the forked blocks
9581084
let mut forked_blocks = fork_info
9591085
.iter()

stackslib/src/chainstate/stacks/miner.rs

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -95,6 +95,28 @@ fn fault_injection_stall_tx() {}
9595
/// Test flag to exclude replay txs from the next block
9696
pub static TEST_EXCLUDE_REPLAY_TXS: LazyLock<TestFlag<bool>> = LazyLock::new(TestFlag::default);
9797

98+
#[cfg(any(test, feature = "testing"))]
99+
/// Test flag to mine specific txs belonging to the replay set
100+
pub static TEST_MINE_ALLOWED_REPLAY_TXS: LazyLock<TestFlag<Vec<String>>> =
101+
LazyLock::new(TestFlag::default);
102+
103+
#[cfg(any(test, feature = "testing"))]
104+
/// Given a tx id, check if it is should be skipped
105+
/// if not listed in `TEST_MINE_ALLOWED_REPLAY_TXS` flag.
106+
/// If flag is empty means no tx should be skipped
107+
fn should_skip_replay_tx(tx_id: Txid) -> bool {
108+
let minable_txs = TEST_MINE_ALLOWED_REPLAY_TXS.get();
109+
let allowed =
110+
minable_txs.len() == 0 || minable_txs.iter().any(|tx_ids| *tx_ids == tx_id.to_hex());
111+
if !allowed {
112+
info!(
113+
"Tx skipped due to test flag TEST_MINE_ALLOWED_REPLAY_TXS: {}",
114+
tx_id.to_hex()
115+
);
116+
}
117+
!allowed
118+
}
119+
98120
/// Fully-assembled Stacks anchored, block as well as some extra metadata pertaining to how it was
99121
/// linked to the burnchain and what view(s) the miner had of the burnchain before and after
100122
/// completing the block.
@@ -3005,6 +3027,13 @@ fn select_and_apply_transactions_from_vec<B: BlockBuilder>(
30053027
debug!("Replay block transaction selection begins (parent height = {tip_height})");
30063028
for replay_tx in replay_transactions {
30073029
fault_injection_stall_tx();
3030+
#[cfg(any(test, feature = "testing"))]
3031+
{
3032+
if should_skip_replay_tx(replay_tx.txid()) {
3033+
continue;
3034+
}
3035+
}
3036+
30083037
let txid = replay_tx.txid();
30093038
let tx_result = builder.try_mine_tx_with_len(
30103039
epoch_tx,

testnet/stacks-node/src/tests/signer/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -468,7 +468,7 @@ impl<S: Signer<T> + Send + 'static, T: SignerEventTrait + 'static> SignerTest<Sp
468468
let contract_tx = make_contract_publish(
469469
&sender_sk,
470470
sender_nonce,
471-
1000,
471+
1000000,
472472
self.running_nodes.conf.burnchain.chain_id,
473473
contract_name,
474474
contract_code,

0 commit comments

Comments
 (0)