Skip to content

Commit 9590495

Browse files
committed
wip: add kes validation module
1 parent 7061589 commit 9590495

File tree

4 files changed

+313
-16
lines changed

4 files changed

+313
-16
lines changed

common/src/validation.rs

Lines changed: 32 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -229,7 +229,16 @@ impl PartialEq for BadVrfProofError {
229229
/// https://github.com/IntersectMBO/ouroboros-consensus/blob/e3c52b7c583bdb6708fac4fdaa8bf0b9588f5a88/ouroboros-consensus-protocol/src/ouroboros-consensus-protocol/Ouroboros/Consensus/Protocol/Praos.hs#L342
230230
#[derive(Error, Clone, Debug, serde::Serialize, serde::Deserialize, PartialEq)]
231231
pub enum KesValidationError {
232-
/// Current KES period is before the OCert start period
232+
#[error("{0}")]
233+
KesSignatureError(#[from] KesSignatureError),
234+
#[error("{0}")]
235+
OperationalCertificateError(#[from] OperationalCertificateError),
236+
}
237+
238+
#[derive(Error, Clone, Debug, serde::Serialize, serde::Deserialize, PartialEq)]
239+
pub enum KesSignatureError {
240+
/// **Cause:** Current KES period is before the operational certificate's
241+
/// start period.
233242
#[error(
234243
"KES Before Start OCert: OCert Start Period={}, Current Period={}",
235244
ocert_start_period,
@@ -239,7 +248,8 @@ pub enum KesValidationError {
239248
ocert_start_period: u64,
240249
current_period: u64,
241250
},
242-
/// Current KES period is after the valid range
251+
/// **Cause:** Current KES period exceeds the operational certificate's
252+
/// validity period.
243253
#[error(
244254
"KES After End OCert: Current Period={}, OCert Start Period={}, Max KES Evolutions={}",
245255
current_period,
@@ -251,7 +261,21 @@ pub enum KesValidationError {
251261
ocert_start_period: u64,
252262
max_kes_evolutions: u64,
253263
},
254-
/// The OCert counter is too small
264+
/// **Cause:** The KES signature on the block header is cryptographically invalid.
265+
#[error("Invalid KES Signature OCert: Current KES Period={}, KES Start Period={}, Expected Evolutions={}, Max KES Evolutions={}, Error Message={}", current_kes_period, kes_start_period, expected_evolutions, max_kes_evolutions, error_message)]
266+
InvalidKesSignatureOcert {
267+
current_kes_period: u64,
268+
kes_start_period: u64,
269+
expected_evolutions: u64,
270+
max_kes_evolutions: u64,
271+
error_message: String,
272+
},
273+
}
274+
275+
#[derive(Error, Clone, Debug, serde::Serialize, serde::Deserialize, PartialEq)]
276+
pub enum OperationalCertificateError {
277+
/// **Cause:** The operational certificate counter in the header is not greater
278+
/// than the last counter used by this pool.
255279
#[error(
256280
"Counter Too Small OCert: Last Counter={}, Current Counter={}",
257281
last_counter,
@@ -261,7 +285,8 @@ pub enum KesValidationError {
261285
last_counter: u64,
262286
current_counter: u64,
263287
},
264-
/// The OCert counter incremented by more than 1 (Praos only)
288+
/// **Cause:** OCert counter jumped by more than 1. While not strictly invalid,
289+
/// this is suspicious and may indicate key compromise. (Praos Only)
265290
#[error(
266291
"Counter Over Incremented OCert: Last Counter={}, Current Counter={}",
267292
last_counter,
@@ -271,23 +296,15 @@ pub enum KesValidationError {
271296
last_counter: u64,
272297
current_counter: u64,
273298
},
274-
/// The cold key signature on the OCert is invalid
299+
/// **Cause:** The cold key signature on the operational certificate is invalid.
300+
/// The OCert was not properly signed by the pool's cold key.
275301
#[error(
276302
"Invalid Signature OCert: Counter={}, KES Period={}",
277303
counter,
278304
kes_period
279305
)]
280306
InvalidSignatureOcert { counter: u64, kes_period: u64 },
281-
/// The KES signature verification failed
282-
#[error("Invalid KES Signature OCert: Current KES Period={}, KES Start Period={}, Expected Evolutions={}, Max KES Evolutions={}, Error Message={}", current_kes_period, kes_start_period, expected_evolutions, max_kes_evolutions, error_message)]
283-
InvalidKesSignatureOcert {
284-
current_kes_period: u64,
285-
kes_start_period: u64,
286-
expected_evolutions: u64,
287-
max_kes_evolutions: u64,
288-
error_message: String,
289-
},
290-
/// No counter found for this key hash (not a stake pool or genesis delegate)
307+
/// **Cause:** No counter found for this key hash (not a stake pool or genesis delegate)
291308
#[error("No Counter For Key Hash OCert: Pool ID={}", hex::encode(pool_id))]
292309
NoCounterForKeyHashOcert { pool_id: PoolId },
293310
}
Lines changed: 189 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,2 +1,190 @@
1-
mod ouroboros;
1+
//! Acropolis Block KES Validator module for Caryatid
2+
//! Validate KES signatures in the block header
3+
4+
use acropolis_common::{
5+
messages::{CardanoMessage, Message},
6+
state_history::{StateHistory, StateHistoryStore},
7+
BlockInfo, BlockStatus,
8+
};
9+
use anyhow::Result;
10+
use caryatid_sdk::{module, Context, Module, Subscription};
11+
use config::Config;
12+
use std::sync::Arc;
13+
use tokio::sync::Mutex;
14+
use tracing::{error, info, info_span, Instrument};
215
mod state;
16+
use state::State;
17+
18+
use crate::kes_validation_publisher::KesValidationPublisher;
19+
mod kes_validation_publisher;
20+
mod ouroboros;
21+
22+
const DEFAULT_VALIDATION_KES_PUBLISHER_TOPIC: (&str, &str) =
23+
("validation-kes-publisher-topic", "cardano.validation.kes");
24+
25+
const DEFAULT_BOOTSTRAPPED_SUBSCRIBE_TOPIC: (&str, &str) = (
26+
"bootstrapped-subscribe-topic",
27+
"cardano.sequence.bootstrapped",
28+
);
29+
const DEFAULT_PROTOCOL_PARAMETERS_SUBSCRIBE_TOPIC: (&str, &str) = (
30+
"protocol-parameters-subscribe-topic",
31+
"cardano.protocol.parameters",
32+
);
33+
const DEFAULT_BLOCKS_SUBSCRIBE_TOPIC: (&str, &str) =
34+
("blocks-subscribe-topic", "cardano.block.proposed");
35+
36+
/// Block KES Validator module
37+
#[module(
38+
message_type(Message),
39+
name = "block-kes-validator",
40+
description = "Validate the KES signatures in the block header"
41+
)]
42+
43+
pub struct BlockKesValidator;
44+
45+
impl BlockKesValidator {
46+
#[allow(clippy::too_many_arguments)]
47+
async fn run(
48+
history: Arc<Mutex<StateHistory<State>>>,
49+
mut kes_validation_publisher: KesValidationPublisher,
50+
mut bootstrapped_subscription: Box<dyn Subscription<Message>>,
51+
mut blocks_subscription: Box<dyn Subscription<Message>>,
52+
mut protocol_parameters_subscription: Box<dyn Subscription<Message>>,
53+
) -> Result<()> {
54+
let (_, bootstrapped_message) = bootstrapped_subscription.read().await?;
55+
let genesis = match bootstrapped_message.as_ref() {
56+
Message::Cardano((_, CardanoMessage::GenesisComplete(complete))) => {
57+
complete.values.clone()
58+
}
59+
_ => panic!("Unexpected message in genesis completion topic: {bootstrapped_message:?}"),
60+
};
61+
62+
// Consume initial protocol parameters
63+
let _ = protocol_parameters_subscription.read().await?;
64+
65+
loop {
66+
// Get a mutable state
67+
let mut state = history.lock().await.get_or_init_with(State::new);
68+
let mut current_block: Option<BlockInfo> = None;
69+
70+
let (_, message) = blocks_subscription.read().await?;
71+
match message.as_ref() {
72+
Message::Cardano((block_info, CardanoMessage::BlockAvailable(block_msg))) => {
73+
// handle rollback here
74+
if block_info.status == BlockStatus::RolledBack {
75+
state = history.lock().await.get_rolled_back_state(block_info.number);
76+
}
77+
current_block = Some(block_info.clone());
78+
let is_new_epoch = block_info.new_epoch && block_info.epoch > 0;
79+
80+
if is_new_epoch {
81+
// read epoch boundary messages
82+
let protocol_parameters_message_f = protocol_parameters_subscription.read();
83+
84+
let (_, protocol_parameters_msg) = protocol_parameters_message_f.await?;
85+
let span = info_span!(
86+
"block_kes_validator.handle_protocol_parameters",
87+
epoch = block_info.epoch
88+
);
89+
span.in_scope(|| match protocol_parameters_msg.as_ref() {
90+
Message::Cardano((block_info, CardanoMessage::ProtocolParams(msg))) => {
91+
Self::check_sync(&current_block, block_info);
92+
state.handle_protocol_parameters(msg);
93+
}
94+
_ => error!("Unexpected message type: {protocol_parameters_msg:?}"),
95+
});
96+
}
97+
98+
let span =
99+
info_span!("block_kes_validator.validate", block = block_info.number);
100+
async {
101+
let result = state
102+
.validate_block_kes(block_info, &block_msg.header, &genesis)
103+
.map_err(|e| *e);
104+
if let Err(e) = kes_validation_publisher
105+
.publish_kes_validation(block_info, result)
106+
.await
107+
{
108+
error!("Failed to publish KES validation: {e}")
109+
}
110+
}
111+
.instrument(span)
112+
.await;
113+
}
114+
_ => error!("Unexpected message type: {message:?}"),
115+
}
116+
117+
// Commit the new state
118+
if let Some(block_info) = current_block {
119+
history.lock().await.commit(block_info.number, state);
120+
}
121+
}
122+
}
123+
124+
pub async fn init(&self, context: Arc<Context<Message>>, config: Arc<Config>) -> Result<()> {
125+
// Publish topics
126+
let validation_kes_publisher_topic = config
127+
.get_string(DEFAULT_VALIDATION_KES_PUBLISHER_TOPIC.0)
128+
.unwrap_or(DEFAULT_VALIDATION_KES_PUBLISHER_TOPIC.1.to_string());
129+
info!("Creating validation KES publisher on '{validation_kes_publisher_topic}'");
130+
131+
// Subscribe topics
132+
let bootstrapped_subscribe_topic = config
133+
.get_string(DEFAULT_BOOTSTRAPPED_SUBSCRIBE_TOPIC.0)
134+
.unwrap_or(DEFAULT_BOOTSTRAPPED_SUBSCRIBE_TOPIC.1.to_string());
135+
info!("Creating subscriber for bootstrapped on '{bootstrapped_subscribe_topic}'");
136+
let protocol_parameters_subscribe_topic = config
137+
.get_string(DEFAULT_PROTOCOL_PARAMETERS_SUBSCRIBE_TOPIC.0)
138+
.unwrap_or(DEFAULT_PROTOCOL_PARAMETERS_SUBSCRIBE_TOPIC.1.to_string());
139+
info!("Creating subscriber for protocol parameters on '{protocol_parameters_subscribe_topic}'");
140+
141+
let blocks_subscribe_topic = config
142+
.get_string(DEFAULT_BLOCKS_SUBSCRIBE_TOPIC.0)
143+
.unwrap_or(DEFAULT_BLOCKS_SUBSCRIBE_TOPIC.1.to_string());
144+
info!("Creating blocks subscription on '{blocks_subscribe_topic}'");
145+
146+
// publishers
147+
let kes_validation_publisher =
148+
KesValidationPublisher::new(context.clone(), validation_kes_publisher_topic);
149+
150+
// Subscribers
151+
let bootstrapped_subscription = context.subscribe(&bootstrapped_subscribe_topic).await?;
152+
let protocol_parameters_subscription =
153+
context.subscribe(&protocol_parameters_subscribe_topic).await?;
154+
let blocks_subscription = context.subscribe(&blocks_subscribe_topic).await?;
155+
156+
// state history
157+
let history = Arc::new(Mutex::new(StateHistory::<State>::new(
158+
"block_kes_validator",
159+
StateHistoryStore::default_block_store(),
160+
)));
161+
162+
// Start run task
163+
context.run(async move {
164+
Self::run(
165+
history,
166+
kes_validation_publisher,
167+
bootstrapped_subscription,
168+
blocks_subscription,
169+
protocol_parameters_subscription,
170+
)
171+
.await
172+
.unwrap_or_else(|e| error!("Failed: {e}"));
173+
});
174+
175+
Ok(())
176+
}
177+
178+
/// Check for synchronisation
179+
fn check_sync(expected: &Option<BlockInfo>, actual: &BlockInfo) {
180+
if let Some(ref block) = expected {
181+
if block.number != actual.number {
182+
error!(
183+
expected = block.number,
184+
actual = actual.number,
185+
"Messages out of sync"
186+
);
187+
}
188+
}
189+
}
190+
}
Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
use acropolis_common::{
2+
messages::{CardanoMessage, Message},
3+
validation::{KesValidationError, ValidationError, ValidationStatus},
4+
BlockInfo,
5+
};
6+
use caryatid_sdk::Context;
7+
use std::sync::Arc;
8+
use tracing::error;
9+
10+
/// Message publisher for Block header KES Validation Result
11+
pub struct KesValidationPublisher {
12+
/// Module context
13+
context: Arc<Context<Message>>,
14+
15+
/// Topic to publish on
16+
topic: String,
17+
}
18+
19+
impl KesValidationPublisher {
20+
/// Construct with context and topic to publish on
21+
pub fn new(context: Arc<Context<Message>>, topic: String) -> Self {
22+
Self { context, topic }
23+
}
24+
25+
pub async fn publish_kes_validation(
26+
&mut self,
27+
block: &BlockInfo,
28+
validation_result: Result<(), KesValidationError>,
29+
) -> anyhow::Result<()> {
30+
let validation_status = match validation_result {
31+
Ok(_) => ValidationStatus::Go,
32+
Err(error) => {
33+
error!(
34+
"KES validation failed: {} of block {}",
35+
error.clone(),
36+
block.number
37+
);
38+
ValidationStatus::NoGo(ValidationError::from(error))
39+
}
40+
};
41+
self.context
42+
.message_bus
43+
.publish(
44+
&self.topic,
45+
Arc::new(Message::Cardano((
46+
block.clone(),
47+
CardanoMessage::BlockValidation(validation_status),
48+
))),
49+
)
50+
.await
51+
}
52+
}
Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
use acropolis_common::{
2+
genesis_values::GenesisValues, messages::ProtocolParamsMessage, validation::KesValidationError,
3+
BlockInfo, PoolId,
4+
};
5+
use imbl::HashMap;
6+
7+
#[derive(Default, Debug, Clone)]
8+
pub struct State {
9+
pub ocert_counters: HashMap<PoolId, u64>,
10+
11+
pub slots_per_kes_period: Option<u64>,
12+
13+
pub max_kes_evolutions: Option<u64>,
14+
}
15+
16+
impl State {
17+
pub fn new() -> Self {
18+
Self {
19+
ocert_counters: HashMap::new(),
20+
slots_per_kes_period: None,
21+
max_kes_evolutions: None,
22+
}
23+
}
24+
25+
pub fn handle_protocol_parameters(&mut self, msg: &ProtocolParamsMessage) {
26+
if let Some(shelley_params) = msg.params.shelley.as_ref() {
27+
self.slots_per_kes_period = Some(shelley_params.slots_per_kes_period as u64);
28+
self.max_kes_evolutions = Some(shelley_params.max_kes_evolutions as u64);
29+
}
30+
}
31+
32+
pub fn validate_block_kes(
33+
&self,
34+
block_info: &BlockInfo,
35+
raw_header: &[u8],
36+
genesis: &GenesisValues,
37+
) -> Result<(), Box<KesValidationError>> {
38+
Ok(())
39+
}
40+
}

0 commit comments

Comments
 (0)