Skip to content

Commit 927e824

Browse files
authored
Merge pull request #4658 from stacks-network/feat/signers-read-stackerdb
Feat/signers read stackerdb
2 parents e534515 + ff96f8c commit 927e824

File tree

8 files changed

+388
-66
lines changed

8 files changed

+388
-66
lines changed

.github/workflows/bitcoin-tests.yml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -83,12 +83,13 @@ jobs:
8383
- tests::nakamoto_integrations::follower_bootup
8484
- tests::nakamoto_integrations::forked_tenure_is_ignored
8585
- tests::signer::stackerdb_dkg
86-
- tests::signer::stackerdb_sign
86+
- tests::signer::stackerdb_sign_request_rejected
8787
- tests::signer::stackerdb_block_proposal
8888
- tests::signer::stackerdb_filter_bad_transactions
8989
- tests::signer::stackerdb_mine_2_nakamoto_reward_cycles
9090
- tests::signer::stackerdb_sign_after_signer_reboot
9191
- tests::nakamoto_integrations::stack_stx_burn_op_integration_test
92+
- tests::signer::stackerdb_delayed_dkg
9293
# Do not run this one until we figure out why it fails in CI
9394
# - tests::neon_integrations::bitcoin_reorg_flap
9495
steps:

stacks-signer/src/client/stackerdb.rs

Lines changed: 55 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ use slog::{slog_debug, slog_error, slog_warn};
2323
use stacks_common::codec::{read_next, StacksMessageCodec};
2424
use stacks_common::types::chainstate::StacksPrivateKey;
2525
use stacks_common::{debug, error, warn};
26+
use wsts::net::Packet;
2627

2728
use super::ClientError;
2829
use crate::client::retry_with_exponential_backoff;
@@ -179,45 +180,79 @@ impl StackerDB {
179180
}
180181
}
181182

182-
/// Get the transactions from stackerdb for the signers
183-
fn get_transactions(
184-
transactions_session: &mut StackerDBSession,
185-
signer_ids: &[SignerSlotID],
186-
) -> Result<Vec<StacksTransaction>, ClientError> {
183+
/// Get all signer messages from stackerdb for the given slot IDs
184+
fn get_messages(
185+
session: &mut StackerDBSession,
186+
slot_ids: &[u32],
187+
) -> Result<Vec<SignerMessage>, ClientError> {
188+
let mut messages = vec![];
187189
let send_request = || {
188-
transactions_session
189-
.get_latest_chunks(&signer_ids.iter().map(|id| id.0).collect::<Vec<_>>())
190+
session
191+
.get_latest_chunks(slot_ids)
190192
.map_err(backoff::Error::transient)
191193
};
192194
let chunk_ack = retry_with_exponential_backoff(send_request)?;
193-
let mut transactions = Vec::new();
194195
for (i, chunk) in chunk_ack.iter().enumerate() {
195-
let signer_id = *signer_ids
196-
.get(i)
197-
.expect("BUG: retrieved an unequal amount of chunks to requested chunks");
198196
let Some(data) = chunk else {
199197
continue;
200198
};
201199
let Ok(message) = read_next::<SignerMessage, _>(&mut &data[..]) else {
202200
if !data.is_empty() {
203201
warn!("Failed to deserialize chunk data into a SignerMessage");
204-
debug!(
205-
"signer #{signer_id}: Failed chunk ({}): {data:?}",
206-
&data.len(),
207-
);
202+
debug!("slot #{i}: Failed chunk ({}): {data:?}", &data.len(),);
208203
}
209204
continue;
210205
};
206+
messages.push(message);
207+
}
208+
Ok(messages)
209+
}
211210

211+
/// Get the ordered DKG packets from stackerdb for the signer slot IDs.
212+
pub fn get_dkg_packets(
213+
&mut self,
214+
signer_ids: &[SignerSlotID],
215+
) -> Result<Vec<Packet>, ClientError> {
216+
let packet_slots = &[
217+
MessageSlotID::DkgBegin,
218+
MessageSlotID::DkgPublicShares,
219+
MessageSlotID::DkgPrivateBegin,
220+
MessageSlotID::DkgPrivateShares,
221+
MessageSlotID::DkgEndBegin,
222+
MessageSlotID::DkgEnd,
223+
];
224+
let slot_ids = signer_ids.iter().map(|id| id.0).collect::<Vec<_>>();
225+
let mut packets = vec![];
226+
for packet_slot in packet_slots {
227+
let session = self
228+
.signers_message_stackerdb_sessions
229+
.get_mut(packet_slot)
230+
.ok_or(ClientError::NotConnected)?;
231+
let messages = Self::get_messages(session, &slot_ids)?;
232+
for message in messages {
233+
let SignerMessage::Packet(packet) = message else {
234+
warn!("Found an unexpected type in a packet slot {packet_slot}");
235+
continue;
236+
};
237+
packets.push(packet);
238+
}
239+
}
240+
Ok(packets)
241+
}
242+
243+
/// Get the transactions from stackerdb for the signers
244+
fn get_transactions(
245+
transactions_session: &mut StackerDBSession,
246+
signer_ids: &[SignerSlotID],
247+
) -> Result<Vec<StacksTransaction>, ClientError> {
248+
let slot_ids = signer_ids.iter().map(|id| id.0).collect::<Vec<_>>();
249+
let messages = Self::get_messages(transactions_session, &slot_ids)?;
250+
let mut transactions = vec![];
251+
for message in messages {
212252
let SignerMessage::Transactions(chunk_transactions) = message else {
213253
warn!("Signer wrote an unexpected type to the transactions slot");
214254
continue;
215255
};
216-
debug!(
217-
"Retrieved {} transactions from signer ID {}.",
218-
chunk_transactions.len(),
219-
signer_id
220-
);
221256
transactions.extend(chunk_transactions);
222257
}
223258
Ok(transactions)

stacks-signer/src/config.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -357,7 +357,7 @@ DKG transaction fee: {tx_fee} uSTX
357357
"#,
358358
node_host = self.node_host,
359359
endpoint = self.endpoint,
360-
stacks_address = self.stacks_address.to_string(),
360+
stacks_address = self.stacks_address,
361361
public_key = StacksPublicKey::from_private(&self.stacks_private_key).to_hex(),
362362
network = self.network,
363363
db_path = self.db_path.to_str().unwrap_or_default(),

stacks-signer/src/main.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -75,7 +75,7 @@ fn write_chunk_to_stdout(chunk_opt: Option<Vec<u8>>) {
7575
if let Some(chunk) = chunk_opt.as_ref() {
7676
let hexed_string = to_hex(chunk);
7777
let hexed_chunk = hexed_string.as_bytes();
78-
let bytes = io::stdout().write(&hexed_chunk).unwrap();
78+
let bytes = io::stdout().write(hexed_chunk).unwrap();
7979
if bytes < hexed_chunk.len() {
8080
print!(
8181
"Failed to write complete chunk to stdout. Missing {} bytes",

stacks-signer/src/runloop.rs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -384,7 +384,9 @@ impl SignerRunLoop<Vec<OperationResult>, RunLoopCommand> for RunLoop {
384384
continue;
385385
}
386386
if signer.approved_aggregate_public_key.is_none() {
387-
if let Err(e) = signer.refresh_dkg(&self.stacks_client) {
387+
if let Err(e) =
388+
signer.refresh_dkg(&self.stacks_client, res.clone(), current_reward_cycle)
389+
{
388390
error!("{signer}: failed to refresh DKG: {e}");
389391
}
390392
}
@@ -421,6 +423,7 @@ impl SignerRunLoop<Vec<OperationResult>, RunLoopCommand> for RunLoop {
421423
None
422424
}
423425
}
426+
424427
#[cfg(test)]
425428
mod tests {
426429
use blockstack_lib::chainstate::stacks::boot::NakamotoSignerEntry;

stacks-signer/src/signer.rs

Lines changed: 111 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -145,6 +145,8 @@ pub enum Operation {
145145
/// The Signer state
146146
#[derive(PartialEq, Eq, Debug, Clone)]
147147
pub enum State {
148+
/// The signer is uninitialized and should read stackerdb to restore state
149+
Uninitialized,
148150
/// The signer is idle, waiting for messages and commands
149151
Idle,
150152
/// The signer is executing a DKG or Sign round
@@ -234,6 +236,43 @@ impl Signer {
234236
fn get_coordinator_dkg(&self) -> (u32, PublicKey) {
235237
self.coordinator_selector.get_coordinator()
236238
}
239+
240+
/// Read stackerdb messages in case the signer was started late or restarted and missed incoming DKG messages
241+
pub fn read_dkg_stackerdb_messages(
242+
&mut self,
243+
stacks_client: &StacksClient,
244+
res: Sender<Vec<OperationResult>>,
245+
current_reward_cycle: u64,
246+
) -> Result<(), ClientError> {
247+
if self.state != State::Uninitialized {
248+
// We should only read stackerdb if we are uninitialized
249+
return Ok(());
250+
}
251+
let ordered_packets = self
252+
.stackerdb
253+
.get_dkg_packets(&self.signer_slot_ids)?
254+
.iter()
255+
.filter_map(|packet| {
256+
let coordinator_pubkey = if Self::is_dkg_message(&packet.msg) {
257+
self.get_coordinator_dkg().1
258+
} else {
259+
debug!(
260+
"{self}: Received a non-DKG message in the DKG message queue. Ignoring it."
261+
);
262+
return None;
263+
};
264+
self.verify_packet(stacks_client, packet.clone(), &coordinator_pubkey)
265+
})
266+
.collect::<Vec<_>>();
267+
// We successfully read stackerdb so we are no longer uninitialized
268+
self.state = State::Idle;
269+
debug!(
270+
"{self}: Processing {} DKG messages from stackerdb: {ordered_packets:?}",
271+
ordered_packets.len()
272+
);
273+
self.handle_packets(stacks_client, res, &ordered_packets, current_reward_cycle);
274+
Ok(())
275+
}
237276
}
238277

239278
impl From<SignerConfig> for Signer {
@@ -297,7 +336,7 @@ impl From<SignerConfig> for Signer {
297336

298337
if let Some(state) = load_encrypted_signer_state(
299338
&mut stackerdb,
300-
signer_config.signer_slot_id.into(),
339+
signer_config.signer_slot_id,
301340
&state_machine.network_private_key,
302341
).or_else(|err| {
303342
warn!("Failed to load encrypted signer state from StackerDB, falling back to SignerDB: {err}");
@@ -312,7 +351,7 @@ impl From<SignerConfig> for Signer {
312351
Self {
313352
coordinator,
314353
state_machine,
315-
state: State::Idle,
354+
state: State::Uninitialized,
316355
commands: VecDeque::new(),
317356
stackerdb,
318357
mainnet: signer_config.mainnet,
@@ -403,6 +442,7 @@ impl Signer {
403442
return;
404443
}
405444
}
445+
self.update_operation(Operation::Dkg);
406446
}
407447
Command::Sign {
408448
block_proposal,
@@ -449,6 +489,7 @@ impl Signer {
449489
return;
450490
}
451491
}
492+
self.update_operation(Operation::Sign);
452493
}
453494
}
454495
}
@@ -460,6 +501,10 @@ impl Signer {
460501
current_reward_cycle: u64,
461502
) {
462503
match &self.state {
504+
State::Uninitialized => {
505+
// We cannot process any commands until we have restored our state
506+
warn!("{self}: Cannot process commands until state is restored. Waiting...");
507+
}
463508
State::Idle => {
464509
let Some(command) = self.commands.front() else {
465510
debug!("{self}: Nothing to process. Waiting for command...");
@@ -685,13 +730,13 @@ impl Signer {
685730
}
686731
}
687732

688-
if packets.iter().any(|packet| match packet.msg {
689-
Message::DkgEnd(_) => true,
690-
_ => false,
691-
}) {
733+
if packets
734+
.iter()
735+
.any(|packet| matches!(packet.msg, Message::DkgEnd(_)))
736+
{
692737
debug!("{self}: Saving signer state");
693738
self.save_signer_state()
694-
.expect(&format!("{self}: Failed to save signer state"));
739+
.unwrap_or_else(|_| panic!("{self}: Failed to save signer state"));
695740
}
696741
self.send_outbound_messages(signer_outbound_messages);
697742
self.send_outbound_messages(coordinator_outbound_messages);
@@ -1316,42 +1361,78 @@ impl Signer {
13161361
}
13171362
}
13181363

1319-
/// Refresh DKG value and queue DKG command if necessary
1320-
pub fn refresh_dkg(&mut self, stacks_client: &StacksClient) -> Result<(), ClientError> {
1321-
// First check if we should queue DKG based on contract vote state and stackerdb transactions
1322-
let should_queue = self.should_queue_dkg(stacks_client)?;
1323-
// Before queueing the command, check one last time if DKG has been
1324-
// approved. It could have happened after the last call to
1325-
// `get_approved_aggregate_key` but before the theshold check in
1326-
// `should_queue_dkg`.
1364+
/// Refresh DKG and queue it if required
1365+
pub fn refresh_dkg(
1366+
&mut self,
1367+
stacks_client: &StacksClient,
1368+
res: Sender<Vec<OperationResult>>,
1369+
current_reward_cycle: u64,
1370+
) -> Result<(), ClientError> {
1371+
// First attempt to retrieve the aggregate key from the contract.
1372+
self.update_approved_aggregate_key(stacks_client)?;
1373+
if self.approved_aggregate_public_key.is_some() {
1374+
return Ok(());
1375+
}
1376+
// Check stackerdb for any missed DKG messages to catch up our state.
1377+
self.read_dkg_stackerdb_messages(&stacks_client, res, current_reward_cycle)?;
1378+
// Check if we should still queue DKG
1379+
if !self.should_queue_dkg(stacks_client)? {
1380+
return Ok(());
1381+
}
1382+
// Because there could be a slight delay in reading pending transactions and a key being approved by the contract,
1383+
// check one last time if the approved key was set since we finished the should queue dkg call
1384+
self.update_approved_aggregate_key(stacks_client)?;
1385+
if self.approved_aggregate_public_key.is_some() {
1386+
return Ok(());
1387+
}
1388+
if self.commands.front() != Some(&Command::Dkg) {
1389+
info!("{self} is the current coordinator and must trigger DKG. Queuing DKG command...");
1390+
self.commands.push_front(Command::Dkg);
1391+
} else {
1392+
debug!("{self}: DKG command already queued...");
1393+
}
1394+
Ok(())
1395+
}
1396+
1397+
/// Overwrites the approved aggregate key to the value in the contract, updating state accordingly
1398+
pub fn update_approved_aggregate_key(
1399+
&mut self,
1400+
stacks_client: &StacksClient,
1401+
) -> Result<(), ClientError> {
13271402
let old_dkg = self.approved_aggregate_public_key;
13281403
self.approved_aggregate_public_key =
13291404
stacks_client.get_approved_aggregate_key(self.reward_cycle)?;
13301405
if self.approved_aggregate_public_key.is_some() {
13311406
// TODO: this will never work as is. We need to have stored our party shares on the side etc for this particular aggregate key.
13321407
// Need to update state to store the necessary info, check against it to see if we have participated in the winning round and
13331408
// then overwrite our value accordingly. Otherwise, we will be locked out of the round and should not participate.
1409+
let internal_dkg = self.coordinator.aggregate_public_key;
1410+
if internal_dkg != self.approved_aggregate_public_key {
1411+
warn!("{self}: we do not support changing the internal DKG key yet. Expected {internal_dkg:?} got {:?}", self.approved_aggregate_public_key);
1412+
}
13341413
self.coordinator
13351414
.set_aggregate_public_key(self.approved_aggregate_public_key);
13361415
if old_dkg != self.approved_aggregate_public_key {
13371416
warn!(
1338-
"{self}: updated DKG value to {:?}.",
1417+
"{self}: updated DKG value from {old_dkg:?} to {:?}.",
13391418
self.approved_aggregate_public_key
13401419
);
13411420
}
1342-
if let State::OperationInProgress(Operation::Dkg) = self.state {
1343-
debug!(
1344-
"{self}: DKG has already been set. Aborting DKG operation {}.",
1345-
self.coordinator.current_dkg_id
1346-
);
1347-
self.finish_operation();
1348-
}
1349-
} else if should_queue {
1350-
if self.commands.front() != Some(&Command::Dkg) {
1351-
info!("{self} is the current coordinator and must trigger DKG. Queuing DKG command...");
1352-
self.commands.push_front(Command::Dkg);
1353-
} else {
1354-
debug!("{self}: DKG command already queued...");
1421+
match self.state {
1422+
State::OperationInProgress(Operation::Dkg) => {
1423+
debug!(
1424+
"{self}: DKG has already been set. Aborting DKG operation {}.",
1425+
self.coordinator.current_dkg_id
1426+
);
1427+
self.finish_operation();
1428+
}
1429+
State::Uninitialized => {
1430+
// If we successfully load the DKG value, we are fully initialized
1431+
self.state = State::Idle;
1432+
}
1433+
_ => {
1434+
// do nothing
1435+
}
13551436
}
13561437
}
13571438
Ok(())
@@ -1433,7 +1514,7 @@ impl Signer {
14331514
else {
14341515
continue;
14351516
};
1436-
let Some(dkg_public_key) = self.coordinator.aggregate_public_key.clone() else {
1517+
let Some(dkg_public_key) = self.coordinator.aggregate_public_key else {
14371518
break;
14381519
};
14391520
if params.aggregate_key == dkg_public_key

0 commit comments

Comments
 (0)