Skip to content

Commit b356cd7

Browse files
committed
refactor: check blocks length on epochs blocks endpoint, use single block in block message configguration key
1 parent a849f1f commit b356cd7

File tree

7 files changed

+88
-63
lines changed

7 files changed

+88
-63
lines changed

modules/block_kes_validator/src/block_kes_validator.rs

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -26,8 +26,8 @@ const DEFAULT_BOOTSTRAPPED_SUBSCRIBE_TOPIC: (&str, &str) = (
2626
"bootstrapped-subscribe-topic",
2727
"cardano.sequence.bootstrapped",
2828
);
29-
const DEFAULT_BLOCKS_SUBSCRIBE_TOPIC: (&str, &str) =
30-
("blocks-subscribe-topic", "cardano.block.proposed");
29+
const DEFAULT_BLOCK_SUBSCRIBE_TOPIC: (&str, &str) =
30+
("block-subscribe-topic", "cardano.block.proposed");
3131
const DEFAULT_PROTOCOL_PARAMETERS_SUBSCRIBE_TOPIC: (&str, &str) = (
3232
"protocol-parameters-subscribe-topic",
3333
"cardano.protocol.parameters",
@@ -50,7 +50,7 @@ impl BlockKesValidator {
5050
history: Arc<Mutex<StateHistory<State>>>,
5151
kes_validation_publisher: KesValidationPublisher,
5252
mut bootstrapped_subscription: Box<dyn Subscription<Message>>,
53-
mut blocks_subscription: Box<dyn Subscription<Message>>,
53+
mut block_subscription: Box<dyn Subscription<Message>>,
5454
mut protocol_parameters_subscription: Box<dyn Subscription<Message>>,
5555
mut spo_state_subscription: Box<dyn Subscription<Message>>,
5656
) -> Result<()> {
@@ -70,7 +70,7 @@ impl BlockKesValidator {
7070
let mut state = history.lock().await.get_or_init_with(State::new);
7171
let mut current_block: Option<BlockInfo> = None;
7272

73-
let (_, message) = blocks_subscription.read().await?;
73+
let (_, message) = block_subscription.read().await?;
7474
match message.as_ref() {
7575
Message::Cardano((block_info, CardanoMessage::BlockAvailable(block_msg))) => {
7676
// handle rollback here
@@ -160,10 +160,10 @@ impl BlockKesValidator {
160160
.unwrap_or(DEFAULT_BOOTSTRAPPED_SUBSCRIBE_TOPIC.1.to_string());
161161
info!("Creating subscriber for bootstrapped on '{bootstrapped_subscribe_topic}'");
162162

163-
let blocks_subscribe_topic = config
164-
.get_string(DEFAULT_BLOCKS_SUBSCRIBE_TOPIC.0)
165-
.unwrap_or(DEFAULT_BLOCKS_SUBSCRIBE_TOPIC.1.to_string());
166-
info!("Creating blocks subscription on '{blocks_subscribe_topic}'");
163+
let block_subscribe_topic = config
164+
.get_string(DEFAULT_BLOCK_SUBSCRIBE_TOPIC.0)
165+
.unwrap_or(DEFAULT_BLOCK_SUBSCRIBE_TOPIC.1.to_string());
166+
info!("Creating block subscription on '{block_subscribe_topic}'");
167167

168168
let protocol_parameters_subscribe_topic = config
169169
.get_string(DEFAULT_PROTOCOL_PARAMETERS_SUBSCRIBE_TOPIC.0)
@@ -181,7 +181,7 @@ impl BlockKesValidator {
181181

182182
// Subscribers
183183
let bootstrapped_subscription = context.subscribe(&bootstrapped_subscribe_topic).await?;
184-
let blocks_subscription = context.subscribe(&blocks_subscribe_topic).await?;
184+
let block_subscription = context.subscribe(&block_subscribe_topic).await?;
185185
let protocol_parameters_subscription =
186186
context.subscribe(&protocol_parameters_subscribe_topic).await?;
187187
let spo_state_subscription = context.subscribe(&spo_state_subscribe_topic).await?;
@@ -198,7 +198,7 @@ impl BlockKesValidator {
198198
history,
199199
kes_validation_publisher,
200200
bootstrapped_subscription,
201-
blocks_subscription,
201+
block_subscription,
202202
protocol_parameters_subscription,
203203
spo_state_subscription,
204204
)

modules/block_vrf_validator/src/block_vrf_validator.rs

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -27,8 +27,8 @@ const DEFAULT_BOOTSTRAPPED_SUBSCRIBE_TOPIC: (&str, &str) = (
2727
"bootstrapped-subscribe-topic",
2828
"cardano.sequence.bootstrapped",
2929
);
30-
const DEFAULT_BLOCKS_SUBSCRIBE_TOPIC: (&str, &str) =
31-
("blocks-subscribe-topic", "cardano.block.proposed");
30+
const DEFAULT_BLOCK_SUBSCRIBE_TOPIC: (&str, &str) =
31+
("block-subscribe-topic", "cardano.block.proposed");
3232
const DEFAULT_PROTOCOL_PARAMETERS_SUBSCRIBE_TOPIC: (&str, &str) = (
3333
"protocol-parameters-subscribe-topic",
3434
"cardano.protocol.parameters",
@@ -55,7 +55,7 @@ impl BlockVrfValidator {
5555
history: Arc<Mutex<StateHistory<State>>>,
5656
mut vrf_validation_publisher: VrfValidationPublisher,
5757
mut bootstrapped_subscription: Box<dyn Subscription<Message>>,
58-
mut blocks_subscription: Box<dyn Subscription<Message>>,
58+
mut block_subscription: Box<dyn Subscription<Message>>,
5959
mut protocol_parameters_subscription: Box<dyn Subscription<Message>>,
6060
mut epoch_activity_subscription: Box<dyn Subscription<Message>>,
6161
mut spo_state_subscription: Box<dyn Subscription<Message>>,
@@ -77,7 +77,7 @@ impl BlockVrfValidator {
7777
let mut state = history.lock().await.get_or_init_with(State::new);
7878
let mut current_block: Option<BlockInfo> = None;
7979

80-
let (_, message) = blocks_subscription.read().await?;
80+
let (_, message) = block_subscription.read().await?;
8181
match message.as_ref() {
8282
Message::Cardano((block_info, CardanoMessage::BlockAvailable(block_msg))) => {
8383
// handle rollback here
@@ -190,10 +190,10 @@ impl BlockVrfValidator {
190190
.unwrap_or(DEFAULT_PROTOCOL_PARAMETERS_SUBSCRIBE_TOPIC.1.to_string());
191191
info!("Creating subscriber for protocol parameters on '{protocol_parameters_subscribe_topic}'");
192192

193-
let blocks_subscribe_topic = config
194-
.get_string(DEFAULT_BLOCKS_SUBSCRIBE_TOPIC.0)
195-
.unwrap_or(DEFAULT_BLOCKS_SUBSCRIBE_TOPIC.1.to_string());
196-
info!("Creating blocks subscription on '{blocks_subscribe_topic}'");
193+
let block_subscribe_topic = config
194+
.get_string(DEFAULT_BLOCK_SUBSCRIBE_TOPIC.0)
195+
.unwrap_or(DEFAULT_BLOCK_SUBSCRIBE_TOPIC.1.to_string());
196+
info!("Creating block subscription on '{block_subscribe_topic}'");
197197

198198
let epoch_activity_subscribe_topic = config
199199
.get_string(DEFAULT_EPOCH_ACTIVITY_SUBSCRIBE_TOPIC.0)
@@ -218,7 +218,7 @@ impl BlockVrfValidator {
218218
let bootstrapped_subscription = context.subscribe(&bootstrapped_subscribe_topic).await?;
219219
let protocol_parameters_subscription =
220220
context.subscribe(&protocol_parameters_subscribe_topic).await?;
221-
let blocks_subscription = context.subscribe(&blocks_subscribe_topic).await?;
221+
let block_subscription = context.subscribe(&block_subscribe_topic).await?;
222222
let epoch_activity_subscription =
223223
context.subscribe(&epoch_activity_subscribe_topic).await?;
224224
let spo_state_subscription = context.subscribe(&spo_state_subscribe_topic).await?;
@@ -236,7 +236,7 @@ impl BlockVrfValidator {
236236
history,
237237
vrf_validation_publisher,
238238
bootstrapped_subscription,
239-
blocks_subscription,
239+
block_subscription,
240240
protocol_parameters_subscription,
241241
epoch_activity_subscription,
242242
spo_state_subscription,

modules/chain_store/src/stores/fjall.rs

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -193,6 +193,13 @@ impl FjallBlockStore {
193193
}
194194

195195
fn get_by_number_range(&self, min_number: u64, max_number: u64) -> Result<Vec<Block>> {
196+
if max_number < min_number {
197+
return Err(anyhow::anyhow!(
198+
"Invalid number range min={min_number}, max={max_number}"
199+
));
200+
}
201+
let expected_count = max_number - min_number + 1;
202+
196203
let min_number_bytes = min_number.to_be_bytes();
197204
let max_number_bytes = max_number.to_be_bytes();
198205
let mut blocks = vec![];
@@ -202,6 +209,12 @@ impl FjallBlockStore {
202209
blocks.push(block);
203210
}
204211
}
212+
if blocks.len() as u64 != expected_count {
213+
return Err(anyhow::anyhow!(
214+
"Expected {expected_count} blocks, got {}",
215+
blocks.len()
216+
));
217+
}
205218
Ok(blocks)
206219
}
207220

modules/epochs_state/src/epochs_state.rs

Lines changed: 11 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -32,8 +32,8 @@ const DEFAULT_BOOTSTRAPPED_SUBSCRIBE_TOPIC: (&str, &str) = (
3232
"bootstrapped-subscribe-topic",
3333
"cardano.sequence.bootstrapped",
3434
);
35-
const DEFAULT_BLOCKS_SUBSCRIBE_TOPIC: (&str, &str) =
36-
("blocks-subscribe-topic", "cardano.block.proposed");
35+
const DEFAULT_BLOCK_SUBSCRIBE_TOPIC: (&str, &str) =
36+
("block-subscribe-topic", "cardano.block.proposed");
3737
const DEFAULT_BLOCK_TXS_SUBSCRIBE_TOPIC: (&str, &str) =
3838
("block-txs-subscribe-topic", "cardano.block.txs");
3939
const DEFAULT_PROTOCOL_PARAMETERS_SUBSCRIBE_TOPIC: (&str, &str) = (
@@ -58,7 +58,7 @@ impl EpochsState {
5858
history: Arc<Mutex<StateHistory<State>>>,
5959
epochs_history: EpochsHistoryState,
6060
mut bootstrapped_subscription: Box<dyn Subscription<Message>>,
61-
mut blocks_subscription: Box<dyn Subscription<Message>>,
61+
mut block_subscription: Box<dyn Subscription<Message>>,
6262
mut block_txs_subscription: Box<dyn Subscription<Message>>,
6363
mut protocol_parameters_subscription: Box<dyn Subscription<Message>>,
6464
mut epoch_activity_publisher: EpochActivityPublisher,
@@ -80,11 +80,11 @@ impl EpochsState {
8080
let mut current_block: Option<BlockInfo> = None;
8181

8282
// Read both topics in parallel
83-
let blocks_message_f = blocks_subscription.read();
83+
let block_message_f = block_subscription.read();
8484
let block_txs_message_f = block_txs_subscription.read();
8585

8686
// Handle blocks first
87-
let (_, message) = blocks_message_f.await?;
87+
let (_, message) = block_message_f.await?;
8888
match message.as_ref() {
8989
Message::Cardano((block_info, CardanoMessage::BlockAvailable(block_msg))) => {
9090
// handle rollback here
@@ -188,10 +188,10 @@ impl EpochsState {
188188
.unwrap_or(DEFAULT_BOOTSTRAPPED_SUBSCRIBE_TOPIC.1.to_string());
189189
info!("Creating subscriber for bootstrapped on '{bootstrapped_subscribe_topic}'");
190190

191-
let blocks_subscribe_topic = config
192-
.get_string(DEFAULT_BLOCKS_SUBSCRIBE_TOPIC.0)
193-
.unwrap_or(DEFAULT_BLOCKS_SUBSCRIBE_TOPIC.1.to_string());
194-
info!("Creating subscriber for blocks on '{blocks_subscribe_topic}'");
191+
let block_subscribe_topic = config
192+
.get_string(DEFAULT_BLOCK_SUBSCRIBE_TOPIC.0)
193+
.unwrap_or(DEFAULT_BLOCK_SUBSCRIBE_TOPIC.1.to_string());
194+
info!("Creating subscriber for blocks on '{block_subscribe_topic}'");
195195

196196
let block_txs_subscribe_topic = config
197197
.get_string(DEFAULT_BLOCK_TXS_SUBSCRIBE_TOPIC.0)
@@ -231,7 +231,7 @@ impl EpochsState {
231231

232232
// Subscribe
233233
let bootstrapped_subscription = context.subscribe(&bootstrapped_subscribe_topic).await?;
234-
let blocks_subscription = context.subscribe(&blocks_subscribe_topic).await?;
234+
let block_subscription = context.subscribe(&block_subscribe_topic).await?;
235235
let protocol_parameters_subscription =
236236
context.subscribe(&protocol_parameters_subscribe_topic).await?;
237237
let block_txs_subscription = context.subscribe(&block_txs_subscribe_topic).await?;
@@ -342,7 +342,7 @@ impl EpochsState {
342342
history,
343343
epochs_history,
344344
bootstrapped_subscription,
345-
blocks_subscription,
345+
block_subscription,
346346
block_txs_subscription,
347347
protocol_parameters_subscription,
348348
epoch_activity_publisher,

modules/epochs_state/src/state.rs

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -69,9 +69,13 @@ impl State {
6969
epoch: 0,
7070
epoch_start_time: genesis.byron_timestamp,
7171
first_block_time: genesis.byron_timestamp,
72-
first_block_height: 0,
73-
last_block_time: 0,
74-
last_block_height: 0,
72+
// NOTE:
73+
// First block height is 1
74+
// only because we don't handle EBB for now
75+
// so by default, we counter epoch 0's EBB
76+
first_block_height: 1,
77+
last_block_time: genesis.byron_timestamp,
78+
last_block_height: 1,
7579
blocks_minted: HashMap::new(),
7680
epoch_blocks: 0,
7781
epoch_txs: 0,

modules/mithril_snapshot_fetcher/src/mithril_snapshot_fetcher.rs

Lines changed: 26 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -30,9 +30,13 @@ use tracing::{debug, error, info, info_span, Instrument};
3030
mod pause;
3131
use pause::PauseType;
3232

33-
const DEFAULT_STARTUP_TOPIC: &str = "cardano.sequence.bootstrapped";
34-
const DEFAULT_BLOCK_TOPIC: &str = "cardano.block.available";
35-
const DEFAULT_COMPLETION_TOPIC: &str = "cardano.snapshot.complete";
33+
const DEFAULT_BOOTSTRAPPED_SUBSCRIBE_TOPIC: (&str, &str) = (
34+
"bootstrapped-subscribe-topic",
35+
"cardano.sequence.bootstrapped",
36+
);
37+
const DEFAULT_BLOCK_PUBLISH_TOPIC: (&str, &str) =
38+
("block-publish-topic", "cardano.block.available");
39+
const DEFAULT_COMPLETION_TOPIC: (&str, &str) = ("completion-topic", "cardano.snapshot.complete");
3640

3741
const DEFAULT_AGGREGATOR_URL: &str =
3842
"https://aggregator.release-mainnet.api.mithril.network/aggregator";
@@ -237,12 +241,14 @@ impl MithrilSnapshotFetcher {
237241
config: Arc<Config>,
238242
genesis: GenesisValues,
239243
) -> Result<()> {
240-
let block_topic =
241-
config.get_string("block-topic").unwrap_or(DEFAULT_BLOCK_TOPIC.to_string());
242-
info!("Publishing blocks on '{block_topic}'");
243-
244-
let completion_topic =
245-
config.get_string("completion-topic").unwrap_or(DEFAULT_COMPLETION_TOPIC.to_string());
244+
let block_publish_topic = config
245+
.get_string(DEFAULT_BLOCK_PUBLISH_TOPIC.0)
246+
.unwrap_or(DEFAULT_BLOCK_PUBLISH_TOPIC.1.to_string());
247+
info!("Publishing blocks on '{block_publish_topic}'");
248+
249+
let completion_topic = config
250+
.get_string(DEFAULT_COMPLETION_TOPIC.0)
251+
.unwrap_or(DEFAULT_COMPLETION_TOPIC.1.to_string());
246252
info!("Publishing completion on '{completion_topic}'");
247253

248254
let directory = config.get_string("directory").unwrap_or(DEFAULT_DIRECTORY.to_string());
@@ -356,7 +362,7 @@ impl MithrilSnapshotFetcher {
356362

357363
context
358364
.message_bus
359-
.publish(&block_topic, Arc::new(message_enum))
365+
.publish(&block_publish_topic, Arc::new(message_enum))
360366
.await
361367
.unwrap_or_else(|e| error!("Failed to publish block message: {e}"));
362368

@@ -389,21 +395,23 @@ impl MithrilSnapshotFetcher {
389395

390396
/// Main init function
391397
pub async fn init(&self, context: Arc<Context<Message>>, config: Arc<Config>) -> Result<()> {
392-
let startup_topic =
393-
config.get_string("startup-topic").unwrap_or(DEFAULT_STARTUP_TOPIC.to_string());
394-
info!("Creating startup subscriber on '{startup_topic}'");
398+
let bootstrapped_subscribe_topic = config
399+
.get_string(DEFAULT_BOOTSTRAPPED_SUBSCRIBE_TOPIC.0)
400+
.unwrap_or(DEFAULT_BOOTSTRAPPED_SUBSCRIBE_TOPIC.1.to_string());
401+
info!("Creating subscriber for bootstrapped on '{bootstrapped_subscribe_topic}'");
395402

396-
let mut subscription = context.subscribe(&startup_topic).await?;
403+
let mut bootstrapped_subscription =
404+
context.subscribe(&bootstrapped_subscribe_topic).await?;
397405
context.clone().run(async move {
398-
let Ok((_, startup_message)) = subscription.read().await else {
406+
let Ok((_, bootstrapped_message)) = bootstrapped_subscription.read().await else {
399407
return;
400408
};
401-
info!("Received startup message");
402-
let genesis = match startup_message.as_ref() {
409+
info!("Received bootstrapped message");
410+
let genesis = match bootstrapped_message.as_ref() {
403411
Message::Cardano((_, CardanoMessage::GenesisComplete(complete))) => {
404412
complete.values.clone()
405413
}
406-
x => panic!("unexpected startup message: {x:?}"),
414+
x => panic!("unexpected bootstrapped message: {x:?}"),
407415
};
408416

409417
let mut delay = 1;

modules/spo_state/src/spo_state.rs

Lines changed: 11 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -47,8 +47,8 @@ const DEFAULT_WITHDRAWALS_SUBSCRIBE_TOPIC: (&str, &str) =
4747
("withdrawals-subscribe-topic", "cardano.withdrawals");
4848
const DEFAULT_GOVERNANCE_SUBSCRIBE_TOPIC: (&str, &str) =
4949
("governance-subscribe-topic", "cardano.governance");
50-
const DEFAULT_BLOCKS_SUBSCRIBE_TOPIC: (&str, &str) =
51-
("blocks-subscribe-topic", "cardano.block.proposed");
50+
const DEFAULT_BLOCK_SUBSCRIBE_TOPIC: (&str, &str) =
51+
("block-subscribe-topic", "cardano.block.proposed");
5252
const DEFAULT_EPOCH_ACTIVITY_SUBSCRIBE_TOPIC: (&str, &str) =
5353
("epoch-activity-subscribe-topic", "cardano.epoch.activity");
5454
const DEFAULT_SPDD_SUBSCRIBE_TOPIC: (&str, &str) =
@@ -87,7 +87,7 @@ impl SPOState {
8787
store_config: &StoreConfig,
8888
// subscribers
8989
mut certificates_subscription: Box<dyn Subscription<Message>>,
90-
mut blocks_subscription: Box<dyn Subscription<Message>>,
90+
mut block_subscription: Box<dyn Subscription<Message>>,
9191
mut withdrawals_subscription: Option<Box<dyn Subscription<Message>>>,
9292
mut governance_subscription: Option<Box<dyn Subscription<Message>>>,
9393
mut epoch_activity_subscription: Box<dyn Subscription<Message>>,
@@ -113,7 +113,7 @@ impl SPOState {
113113

114114
// read per-block topics in parallel
115115
let certs_message_f = certificates_subscription.read();
116-
let blocks_message_f = blocks_subscription.read();
116+
let block_message_f = block_subscription.read();
117117
let withdrawals_message_f = withdrawals_subscription.as_mut().map(|s| s.read());
118118
let governance_message_f = governance_subscription.as_mut().map(|s| s.read());
119119
let stake_deltas_message_f = stake_deltas_subscription.as_mut().map(|s| s.read());
@@ -140,7 +140,7 @@ impl SPOState {
140140

141141
// handle blocks (handle_mint) before handle_tx_certs
142142
// in case of epoch boundary
143-
let (_, block_message) = blocks_message_f.await?;
143+
let (_, block_message) = block_message_f.await?;
144144
match block_message.as_ref() {
145145
Message::Cardano((block_info, CardanoMessage::BlockAvailable(block_msg))) => {
146146
let span =
@@ -438,10 +438,10 @@ impl SPOState {
438438
.unwrap_or(DEFAULT_SPO_REWARDS_SUBSCRIBE_TOPIC.1.to_string());
439439
info!("Creating SPO rewards subscriber on '{spo_rewards_subscribe_topic}'");
440440

441-
let blocks_subscribe_topic = config
442-
.get_string(DEFAULT_BLOCKS_SUBSCRIBE_TOPIC.0)
443-
.unwrap_or(DEFAULT_BLOCKS_SUBSCRIBE_TOPIC.1.to_string());
444-
info!("Creating blocks subscriber on '{blocks_subscribe_topic}'");
441+
let block_subscribe_topic = config
442+
.get_string(DEFAULT_BLOCK_SUBSCRIBE_TOPIC.0)
443+
.unwrap_or(DEFAULT_BLOCK_SUBSCRIBE_TOPIC.1.to_string());
444+
info!("Creating block subscriber on '{block_subscribe_topic}'");
445445

446446
let stake_reward_deltas_subscribe_topic = config
447447
.get_string(DEFAULT_STAKE_REWARD_DELTAS_SUBSCRIBE_TOPIC.0)
@@ -765,7 +765,7 @@ impl SPOState {
765765

766766
// Subscriptions
767767
let certificates_subscription = context.subscribe(&certificates_subscribe_topic).await?;
768-
let blocks_subscription = context.subscribe(&blocks_subscribe_topic).await?;
768+
let block_subscription = context.subscribe(&block_subscribe_topic).await?;
769769
let epoch_activity_subscription =
770770
context.subscribe(&epoch_activity_subscribe_topic).await?;
771771
let spdd_subscription = context.subscribe(&spdd_subscribe_topic).await?;
@@ -811,7 +811,7 @@ impl SPOState {
811811
retired_pools_history,
812812
&store_config,
813813
certificates_subscription,
814-
blocks_subscription,
814+
block_subscription,
815815
withdrawals_subscription,
816816
governance_subscription,
817817
epoch_activity_subscription,

0 commit comments

Comments
 (0)