Skip to content

Commit 2f675a4

Browse files
committed
chore: attempt a fix at continue_tenure_extend, but it will need some cleanup even if it works
1 parent 81ce1ca commit 2f675a4

File tree

3 files changed

+142
-38
lines changed

3 files changed

+142
-38
lines changed

testnet/stacks-node/src/nakamoto_node/miner.rs

Lines changed: 24 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -13,10 +13,11 @@
1313
//
1414
// You should have received a copy of the GNU General Public License
1515
// along with this program. If not, see <http://www.gnu.org/licenses/>.
16+
use std::sync::atomic::{AtomicBool, Ordering};
17+
use std::sync::Arc;
1618
#[cfg(test)]
1719
use std::sync::LazyLock;
1820
use std::thread;
19-
use std::thread::JoinHandle;
2021
use std::time::{Duration, Instant};
2122

2223
use clarity::boot_util::boot_code_id;
@@ -52,7 +53,7 @@ use stacks_common::types::{PrivateKey, StacksEpochId};
5253
use stacks_common::util::tests::TestFlag;
5354
use stacks_common::util::vrf::VRFProof;
5455

55-
use super::relayer::RelayerThread;
56+
use super::relayer::{MinerStopHandle, RelayerThread};
5657
use super::{Config, Error as NakamotoNodeError, EventDispatcher, Keychain};
5758
use crate::nakamoto_node::signer_coordinator::SignerCoordinator;
5859
use crate::nakamoto_node::VRF_MOCK_MINER_KEY;
@@ -185,6 +186,8 @@ pub struct BlockMinerThread {
185186
signer_set_cache: Option<RewardSet>,
186187
/// The time at which tenure change/extend was attempted
187188
tenure_change_time: Instant,
189+
/// flag to indicate an abort driven from the relayer
190+
abort_flag: Arc<AtomicBool>,
188191
}
189192

190193
impl BlockMinerThread {
@@ -213,9 +216,14 @@ impl BlockMinerThread {
213216
p2p_handle: rt.get_p2p_handle(),
214217
signer_set_cache: None,
215218
tenure_change_time: Instant::now(),
219+
abort_flag: Arc::new(AtomicBool::new(false)),
216220
}
217221
}
218222

223+
pub fn get_abort_flag(&self) -> Arc<AtomicBool> {
224+
self.abort_flag.clone()
225+
}
226+
219227
#[cfg(test)]
220228
fn fault_injection_block_broadcast_stall(new_block: &NakamotoBlock) {
221229
if TEST_BROADCAST_STALL.get() {
@@ -278,29 +286,6 @@ impl BlockMinerThread {
278286
}
279287

280288
/// Stop a miner tenure by blocking the miner and then joining the tenure thread
281-
pub fn stop_miner(
282-
globals: &Globals,
283-
prior_miner: JoinHandle<Result<(), NakamotoNodeError>>,
284-
) -> Result<(), NakamotoNodeError> {
285-
debug!(
286-
"Stopping prior miner thread ID {:?}",
287-
prior_miner.thread().id()
288-
);
289-
globals.block_miner();
290-
let prior_miner_result = prior_miner
291-
.join()
292-
.map_err(|_| ChainstateError::MinerAborted)?;
293-
if let Err(e) = prior_miner_result {
294-
// it's okay if the prior miner thread exited with an error.
295-
// in many cases this is expected (i.e., a burnchain block occurred)
296-
// if some error condition should be handled though, this is the place
297-
// to do that handling.
298-
debug!("Prior mining thread exited with: {e:?}");
299-
}
300-
globals.unblock_miner();
301-
Ok(())
302-
}
303-
304289
#[cfg(test)]
305290
fn fault_injection_stall_miner() {
306291
if TEST_MINE_STALL.get() {
@@ -318,7 +303,7 @@ impl BlockMinerThread {
318303

319304
pub fn run_miner(
320305
mut self,
321-
prior_miner: Option<JoinHandle<Result<(), NakamotoNodeError>>>,
306+
prior_miner: Option<MinerStopHandle>,
322307
) -> Result<(), NakamotoNodeError> {
323308
// when starting a new tenure, block the mining thread if its currently running.
324309
// the new mining thread will join it (so that the new mining thread stalls, not the relayer)
@@ -332,7 +317,12 @@ impl BlockMinerThread {
332317
"reason" => %self.reason,
333318
);
334319
if let Some(prior_miner) = prior_miner {
335-
Self::stop_miner(&self.globals, prior_miner)?;
320+
debug!(
321+
"Miner thread {:?}: will try and stop prior miner {:?}",
322+
thread::current().id(),
323+
prior_miner.inner_thread().id()
324+
);
325+
prior_miner.stop(&self.globals)?;
336326
}
337327
let mut stackerdbs = StackerDBs::connect(&self.config.get_stacker_db_file_path(), true)?;
338328
let mut last_block_rejected = false;
@@ -461,6 +451,13 @@ impl BlockMinerThread {
461451
break Some(x);
462452
}
463453
Err(NakamotoNodeError::MiningFailure(ChainstateError::MinerAborted)) => {
454+
if self.abort_flag.load(Ordering::SeqCst) {
455+
info!("Miner interrupted while mining in order to shut down");
456+
self.globals
457+
.raise_initiative(format!("MiningFailure: aborted by node"));
458+
return Err(ChainstateError::MinerAborted.into());
459+
}
460+
464461
info!("Miner interrupted while mining, will try again");
465462
// sleep, and try again. if the miner was interrupted because the burnchain
466463
// view changed, the next `mine_block()` invocation will error

testnet/stacks-node/src/nakamoto_node/relayer.rs

Lines changed: 118 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -15,13 +15,15 @@
1515
// along with this program. If not, see <http://www.gnu.org/licenses/>.
1616
use core::fmt;
1717
use std::collections::HashSet;
18-
use std::fs;
1918
use std::io::Read;
19+
use std::sync::atomic::{AtomicBool, Ordering};
2020
use std::sync::mpsc::{Receiver, RecvTimeoutError};
21+
use std::sync::Arc;
2122
#[cfg(test)]
2223
use std::sync::LazyLock;
2324
use std::thread::JoinHandle;
2425
use std::time::{Duration, Instant};
26+
use std::{fs, thread};
2527

2628
use rand::{thread_rng, Rng};
2729
use stacks::burnchains::{Burnchain, Txid};
@@ -40,6 +42,7 @@ use stacks::chainstate::stacks::db::StacksChainState;
4042
use stacks::chainstate::stacks::miner::{
4143
get_mining_spend_amount, signal_mining_blocked, signal_mining_ready,
4244
};
45+
use stacks::chainstate::stacks::Error as ChainstateError;
4346
use stacks::core::mempool::MemPoolDB;
4447
use stacks::core::STACKS_EPOCH_3_1_MARKER;
4548
use stacks::monitoring::increment_stx_blocks_mined_counter;
@@ -186,6 +189,101 @@ impl LastCommit {
186189
}
187190
}
188191

192+
pub type MinerThreadJoinHandle = JoinHandle<Result<(), NakamotoNodeError>>;
193+
194+
/// Miner thread join handle.
195+
/// This can be a "bare" miner thread, or a "tenure-stop" miner thread which itself stops a "bare"
196+
/// miner thread.
197+
pub enum MinerStopHandle {
198+
Miner(MinerThreadJoinHandle, Arc<AtomicBool>),
199+
TenureStop(MinerThreadJoinHandle, Arc<AtomicBool>),
200+
}
201+
202+
impl MinerStopHandle {
203+
pub fn new_miner(jh: MinerThreadJoinHandle, abort_flag: Arc<AtomicBool>) -> Self {
204+
Self::Miner(jh, abort_flag)
205+
}
206+
207+
pub fn new_tenure_stop(jh: MinerThreadJoinHandle, abort_flag: Arc<AtomicBool>) -> Self {
208+
Self::TenureStop(jh, abort_flag)
209+
}
210+
211+
pub fn inner_thread(&self) -> &std::thread::Thread {
212+
match self {
213+
Self::Miner(jh, ..) => jh.thread(),
214+
Self::TenureStop(jh, ..) => jh.thread(),
215+
}
216+
}
217+
218+
pub fn into_inner(self) -> MinerThreadJoinHandle {
219+
match self {
220+
Self::Miner(jh, ..) => jh,
221+
Self::TenureStop(jh, ..) => jh,
222+
}
223+
}
224+
225+
pub fn is_tenure_stop(&self) -> bool {
226+
match self {
227+
Self::TenureStop(..) => true,
228+
_ => false,
229+
}
230+
}
231+
232+
pub fn is_miner(&self) -> bool {
233+
match self {
234+
Self::Miner(..) => true,
235+
_ => false,
236+
}
237+
}
238+
239+
pub fn set_abort_flag(&self) {
240+
match self {
241+
Self::Miner(_, abort_flag) => {
242+
(*abort_flag).store(true, Ordering::SeqCst);
243+
}
244+
Self::TenureStop(_, abort_flag) => {
245+
(*abort_flag).store(true, Ordering::SeqCst);
246+
}
247+
}
248+
}
249+
250+
pub fn get_abort_flag(&self) -> Arc<AtomicBool> {
251+
match self {
252+
Self::Miner(_, abort_flag) => abort_flag.clone(),
253+
Self::TenureStop(_, abort_flag) => abort_flag.clone(),
254+
}
255+
}
256+
257+
pub fn stop(self, globals: &Globals) -> Result<(), NakamotoNodeError> {
258+
let my_id = thread::current().id();
259+
let prior_thread_id = self.inner_thread().id();
260+
debug!(
261+
"[Thread {:?}]: Stopping prior miner thread ID {:?}",
262+
&my_id, &prior_thread_id
263+
);
264+
265+
self.set_abort_flag();
266+
globals.block_miner();
267+
268+
let prior_miner = self.into_inner();
269+
let prior_miner_result = prior_miner.join().map_err(|_| {
270+
error!("Miner: failed to join prior miner");
271+
ChainstateError::MinerAborted
272+
})?;
273+
debug!("Stopped prior miner thread ID {:?}", &prior_thread_id);
274+
if let Err(e) = prior_miner_result {
275+
// it's okay if the prior miner thread exited with an error.
276+
// in many cases this is expected (i.e., a burnchain block occurred)
277+
// if some error condition should be handled though, this is the place
278+
// to do that handling.
279+
debug!("Prior mining thread exited with: {e:?}");
280+
}
281+
282+
globals.unblock_miner();
283+
Ok(())
284+
}
285+
}
286+
189287
/// Relayer thread
190288
/// * accepts network results and stores blocks and microblocks
191289
/// * forwards new blocks, microblocks, and transactions to the p2p thread
@@ -242,7 +340,7 @@ pub struct RelayerThread {
242340
relayer: Relayer,
243341

244342
/// handle to the subordinate miner thread
245-
miner_thread: Option<JoinHandle<Result<(), NakamotoNodeError>>>,
343+
miner_thread: Option<MinerStopHandle>,
246344
/// miner thread's burn view
247345
miner_thread_burn_view: Option<BlockSnapshot>,
248346

@@ -1053,6 +1151,7 @@ impl RelayerThread {
10531151
parent_tenure_start,
10541152
reason,
10551153
)?;
1154+
let miner_abort_flag = new_miner_state.get_abort_flag();
10561155

10571156
debug!("Relayer: starting new tenure thread");
10581157

@@ -1062,6 +1161,10 @@ impl RelayerThread {
10621161
.name(format!("miner.{parent_tenure_start}.{rand_id}",))
10631162
.stack_size(BLOCK_PROCESSOR_STACK_SIZE)
10641163
.spawn(move || {
1164+
debug!(
1165+
"New block miner thread ID is {:?}",
1166+
std::thread::current().id()
1167+
);
10651168
Self::fault_injection_stall_miner_thread_startup();
10661169
if let Err(e) = new_miner_state.run_miner(prior_tenure_thread) {
10671170
info!("Miner thread failed: {e:?}");
@@ -1078,7 +1181,10 @@ impl RelayerThread {
10781181
"Relayer: started tenure thread ID {:?}",
10791182
new_miner_handle.thread().id()
10801183
);
1081-
self.miner_thread.replace(new_miner_handle);
1184+
self.miner_thread.replace(MinerStopHandle::new_miner(
1185+
new_miner_handle,
1186+
miner_abort_flag,
1187+
));
10821188
self.miner_thread_burn_view.replace(burn_tip);
10831189
Ok(())
10841190
}
@@ -1092,18 +1198,23 @@ impl RelayerThread {
10921198
};
10931199
self.miner_thread_burn_view = None;
10941200

1095-
let id = prior_tenure_thread.thread().id();
1201+
let id = prior_tenure_thread.inner_thread().id();
1202+
let abort_flag = prior_tenure_thread.get_abort_flag();
10961203
let globals = self.globals.clone();
10971204

10981205
let stop_handle = std::thread::Builder::new()
1099-
.name(format!("tenure-stop-{}", self.local_peer.data_url))
1100-
.spawn(move || BlockMinerThread::stop_miner(&globals, prior_tenure_thread))
1206+
.name(format!(
1207+
"tenure-stop({:?})-{}",
1208+
id, self.local_peer.data_url
1209+
))
1210+
.spawn(move || prior_tenure_thread.stop(&globals))
11011211
.map_err(|e| {
11021212
error!("Relayer: Failed to spawn a stop-tenure thread: {e:?}");
11031213
NakamotoNodeError::SpawnError(e)
11041214
})?;
11051215

1106-
self.miner_thread.replace(stop_handle);
1216+
self.miner_thread
1217+
.replace(MinerStopHandle::new_tenure_stop(stop_handle, abort_flag));
11071218
debug!("Relayer: stopped tenure thread ID {id:?}");
11081219
Ok(())
11091220
}

testnet/stacks-node/src/tests/nakamoto_integrations.rs

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -196,9 +196,7 @@ lazy_static! {
196196
pub static TEST_SIGNING: Mutex<Option<TestSigningChannel>> = Mutex::new(None);
197197

198198
pub struct TestSigningChannel {
199-
// pub recv: Option<Receiver<ThresholdSignature>>,
200199
pub recv: Option<Receiver<Vec<MessageSignature>>>,
201-
// pub send: Sender<ThresholdSignature>,
202200
pub send: Sender<Vec<MessageSignature>>,
203201
}
204202

@@ -208,8 +206,6 @@ impl TestSigningChannel {
208206
/// Returns None if the singleton isn't instantiated and the miner should coordinate
209207
/// a real signer set signature.
210208
/// Panics if the blind-signer times out.
211-
///
212-
/// TODO: update to use signatures vec
213209
pub fn get_signature() -> Option<Vec<MessageSignature>> {
214210
let mut signer = TEST_SIGNING.lock().unwrap();
215211
let sign_channels = signer.as_mut()?;

0 commit comments

Comments
 (0)