Skip to content

Commit 2448acb

Browse files
authored
Merge pull request #366 from input-output-hk/gd/epochs-blocks-endpoint
feat: implement `/epochs/{number}/blocks` endpoint
2 parents d9d274c + 2c687e0 commit 2448acb

File tree

10 files changed

+228
-68
lines changed

10 files changed

+228
-68
lines changed

common/src/queries/blocks.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,10 @@ pub enum BlocksStateQuery {
6666
GetBlockHashes {
6767
block_numbers: Vec<u64>,
6868
},
69+
GetBlockHashesByNumberRange {
70+
min_number: u64,
71+
max_number: u64,
72+
},
6973
GetTransactionHashes {
7074
tx_ids: Vec<TxIdentifier>,
7175
},
@@ -97,6 +101,7 @@ pub enum BlocksStateQueryResponse {
97101
BlockTransactionsCBOR(BlockTransactionsCBOR),
98102
BlockInvolvedAddresses(BlockInvolvedAddresses),
99103
BlockHashes(BlockHashes),
104+
BlockHashesByNumberRange(Vec<BlockHash>),
100105
TransactionHashes(TransactionHashes),
101106
UTxOHashes(UTxOHashes),
102107
TransactionHashesAndTimestamps(TransactionHashesAndTimeStamps),

modules/block_kes_validator/src/block_kes_validator.rs

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -26,8 +26,8 @@ const DEFAULT_BOOTSTRAPPED_SUBSCRIBE_TOPIC: (&str, &str) = (
2626
"bootstrapped-subscribe-topic",
2727
"cardano.sequence.bootstrapped",
2828
);
29-
const DEFAULT_BLOCKS_SUBSCRIBE_TOPIC: (&str, &str) =
30-
("blocks-subscribe-topic", "cardano.block.proposed");
29+
const DEFAULT_BLOCK_SUBSCRIBE_TOPIC: (&str, &str) =
30+
("block-subscribe-topic", "cardano.block.proposed");
3131
const DEFAULT_PROTOCOL_PARAMETERS_SUBSCRIBE_TOPIC: (&str, &str) = (
3232
"protocol-parameters-subscribe-topic",
3333
"cardano.protocol.parameters",
@@ -50,7 +50,7 @@ impl BlockKesValidator {
5050
history: Arc<Mutex<StateHistory<State>>>,
5151
kes_validation_publisher: KesValidationPublisher,
5252
mut bootstrapped_subscription: Box<dyn Subscription<Message>>,
53-
mut blocks_subscription: Box<dyn Subscription<Message>>,
53+
mut block_subscription: Box<dyn Subscription<Message>>,
5454
mut protocol_parameters_subscription: Box<dyn Subscription<Message>>,
5555
mut spo_state_subscription: Box<dyn Subscription<Message>>,
5656
) -> Result<()> {
@@ -70,7 +70,7 @@ impl BlockKesValidator {
7070
let mut state = history.lock().await.get_or_init_with(State::new);
7171
let mut current_block: Option<BlockInfo> = None;
7272

73-
let (_, message) = blocks_subscription.read().await?;
73+
let (_, message) = block_subscription.read().await?;
7474
match message.as_ref() {
7575
Message::Cardano((block_info, CardanoMessage::BlockAvailable(block_msg))) => {
7676
// handle rollback here
@@ -160,10 +160,10 @@ impl BlockKesValidator {
160160
.unwrap_or(DEFAULT_BOOTSTRAPPED_SUBSCRIBE_TOPIC.1.to_string());
161161
info!("Creating subscriber for bootstrapped on '{bootstrapped_subscribe_topic}'");
162162

163-
let blocks_subscribe_topic = config
164-
.get_string(DEFAULT_BLOCKS_SUBSCRIBE_TOPIC.0)
165-
.unwrap_or(DEFAULT_BLOCKS_SUBSCRIBE_TOPIC.1.to_string());
166-
info!("Creating blocks subscription on '{blocks_subscribe_topic}'");
163+
let block_subscribe_topic = config
164+
.get_string(DEFAULT_BLOCK_SUBSCRIBE_TOPIC.0)
165+
.unwrap_or(DEFAULT_BLOCK_SUBSCRIBE_TOPIC.1.to_string());
166+
info!("Creating block subscription on '{block_subscribe_topic}'");
167167

168168
let protocol_parameters_subscribe_topic = config
169169
.get_string(DEFAULT_PROTOCOL_PARAMETERS_SUBSCRIBE_TOPIC.0)
@@ -181,7 +181,7 @@ impl BlockKesValidator {
181181

182182
// Subscribers
183183
let bootstrapped_subscription = context.subscribe(&bootstrapped_subscribe_topic).await?;
184-
let blocks_subscription = context.subscribe(&blocks_subscribe_topic).await?;
184+
let block_subscription = context.subscribe(&block_subscribe_topic).await?;
185185
let protocol_parameters_subscription =
186186
context.subscribe(&protocol_parameters_subscribe_topic).await?;
187187
let spo_state_subscription = context.subscribe(&spo_state_subscribe_topic).await?;
@@ -198,7 +198,7 @@ impl BlockKesValidator {
198198
history,
199199
kes_validation_publisher,
200200
bootstrapped_subscription,
201-
blocks_subscription,
201+
block_subscription,
202202
protocol_parameters_subscription,
203203
spo_state_subscription,
204204
)

modules/block_vrf_validator/src/block_vrf_validator.rs

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -27,8 +27,8 @@ const DEFAULT_BOOTSTRAPPED_SUBSCRIBE_TOPIC: (&str, &str) = (
2727
"bootstrapped-subscribe-topic",
2828
"cardano.sequence.bootstrapped",
2929
);
30-
const DEFAULT_BLOCKS_SUBSCRIBE_TOPIC: (&str, &str) =
31-
("blocks-subscribe-topic", "cardano.block.proposed");
30+
const DEFAULT_BLOCK_SUBSCRIBE_TOPIC: (&str, &str) =
31+
("block-subscribe-topic", "cardano.block.proposed");
3232
const DEFAULT_PROTOCOL_PARAMETERS_SUBSCRIBE_TOPIC: (&str, &str) = (
3333
"protocol-parameters-subscribe-topic",
3434
"cardano.protocol.parameters",
@@ -55,7 +55,7 @@ impl BlockVrfValidator {
5555
history: Arc<Mutex<StateHistory<State>>>,
5656
mut vrf_validation_publisher: VrfValidationPublisher,
5757
mut bootstrapped_subscription: Box<dyn Subscription<Message>>,
58-
mut blocks_subscription: Box<dyn Subscription<Message>>,
58+
mut block_subscription: Box<dyn Subscription<Message>>,
5959
mut protocol_parameters_subscription: Box<dyn Subscription<Message>>,
6060
mut epoch_activity_subscription: Box<dyn Subscription<Message>>,
6161
mut spo_state_subscription: Box<dyn Subscription<Message>>,
@@ -77,7 +77,7 @@ impl BlockVrfValidator {
7777
let mut state = history.lock().await.get_or_init_with(State::new);
7878
let mut current_block: Option<BlockInfo> = None;
7979

80-
let (_, message) = blocks_subscription.read().await?;
80+
let (_, message) = block_subscription.read().await?;
8181
match message.as_ref() {
8282
Message::Cardano((block_info, CardanoMessage::BlockAvailable(block_msg))) => {
8383
// handle rollback here
@@ -190,10 +190,10 @@ impl BlockVrfValidator {
190190
.unwrap_or(DEFAULT_PROTOCOL_PARAMETERS_SUBSCRIBE_TOPIC.1.to_string());
191191
info!("Creating subscriber for protocol parameters on '{protocol_parameters_subscribe_topic}'");
192192

193-
let blocks_subscribe_topic = config
194-
.get_string(DEFAULT_BLOCKS_SUBSCRIBE_TOPIC.0)
195-
.unwrap_or(DEFAULT_BLOCKS_SUBSCRIBE_TOPIC.1.to_string());
196-
info!("Creating blocks subscription on '{blocks_subscribe_topic}'");
193+
let block_subscribe_topic = config
194+
.get_string(DEFAULT_BLOCK_SUBSCRIBE_TOPIC.0)
195+
.unwrap_or(DEFAULT_BLOCK_SUBSCRIBE_TOPIC.1.to_string());
196+
info!("Creating block subscription on '{block_subscribe_topic}'");
197197

198198
let epoch_activity_subscribe_topic = config
199199
.get_string(DEFAULT_EPOCH_ACTIVITY_SUBSCRIBE_TOPIC.0)
@@ -218,7 +218,7 @@ impl BlockVrfValidator {
218218
let bootstrapped_subscription = context.subscribe(&bootstrapped_subscribe_topic).await?;
219219
let protocol_parameters_subscription =
220220
context.subscribe(&protocol_parameters_subscribe_topic).await?;
221-
let blocks_subscription = context.subscribe(&blocks_subscribe_topic).await?;
221+
let block_subscription = context.subscribe(&block_subscribe_topic).await?;
222222
let epoch_activity_subscription =
223223
context.subscribe(&epoch_activity_subscribe_topic).await?;
224224
let spo_state_subscription = context.subscribe(&spo_state_subscribe_topic).await?;
@@ -236,7 +236,7 @@ impl BlockVrfValidator {
236236
history,
237237
vrf_validation_publisher,
238238
bootstrapped_subscription,
239-
blocks_subscription,
239+
block_subscription,
240240
protocol_parameters_subscription,
241241
epoch_activity_subscription,
242242
spo_state_subscription,

modules/chain_store/src/chain_store.rs

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -345,6 +345,26 @@ impl ChainStore {
345345
block_hashes,
346346
}))
347347
}
348+
BlocksStateQuery::GetBlockHashesByNumberRange {
349+
min_number,
350+
max_number,
351+
} => {
352+
if *max_number < *min_number {
353+
return Ok(BlocksStateQueryResponse::Error(
354+
QueryError::invalid_request("Invalid number range"),
355+
));
356+
}
357+
let mut block_hashes = Vec::new();
358+
let blocks = store.get_blocks_by_number_range(*min_number, *max_number)?;
359+
for block in blocks {
360+
if let Ok(hash) = Self::get_block_hash(&block) {
361+
block_hashes.push(hash);
362+
}
363+
}
364+
Ok(BlocksStateQueryResponse::BlockHashesByNumberRange(
365+
block_hashes,
366+
))
367+
}
348368
BlocksStateQuery::GetTransactionHashes { tx_ids } => {
349369
let mut block_ids: HashMap<_, Vec<_>> = HashMap::new();
350370
for tx_id in tx_ids {

modules/chain_store/src/stores/fjall.rs

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -193,6 +193,13 @@ impl FjallBlockStore {
193193
}
194194

195195
fn get_by_number_range(&self, min_number: u64, max_number: u64) -> Result<Vec<Block>> {
196+
if max_number < min_number {
197+
return Err(anyhow::anyhow!(
198+
"Invalid number range min={min_number}, max={max_number}"
199+
));
200+
}
201+
let expected_count = max_number - min_number + 1;
202+
196203
let min_number_bytes = min_number.to_be_bytes();
197204
let max_number_bytes = max_number.to_be_bytes();
198205
let mut blocks = vec![];
@@ -202,6 +209,12 @@ impl FjallBlockStore {
202209
blocks.push(block);
203210
}
204211
}
212+
if blocks.len() as u64 != expected_count {
213+
return Err(anyhow::anyhow!(
214+
"Expected {expected_count} blocks, got {}",
215+
blocks.len()
216+
));
217+
}
205218
Ok(blocks)
206219
}
207220

modules/epochs_state/src/epochs_state.rs

Lines changed: 11 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -26,8 +26,8 @@ const DEFAULT_BOOTSTRAPPED_SUBSCRIBE_TOPIC: (&str, &str) = (
2626
"bootstrapped-subscribe-topic",
2727
"cardano.sequence.bootstrapped",
2828
);
29-
const DEFAULT_BLOCKS_SUBSCRIBE_TOPIC: (&str, &str) =
30-
("blocks-subscribe-topic", "cardano.block.proposed");
29+
const DEFAULT_BLOCK_SUBSCRIBE_TOPIC: (&str, &str) =
30+
("block-subscribe-topic", "cardano.block.proposed");
3131
const DEFAULT_BLOCK_TXS_SUBSCRIBE_TOPIC: (&str, &str) =
3232
("block-txs-subscribe-topic", "cardano.block.txs");
3333
const DEFAULT_PROTOCOL_PARAMETERS_SUBSCRIBE_TOPIC: (&str, &str) = (
@@ -51,7 +51,7 @@ impl EpochsState {
5151
async fn run(
5252
history: Arc<Mutex<StateHistory<State>>>,
5353
mut bootstrapped_subscription: Box<dyn Subscription<Message>>,
54-
mut blocks_subscription: Box<dyn Subscription<Message>>,
54+
mut block_subscription: Box<dyn Subscription<Message>>,
5555
mut block_txs_subscription: Box<dyn Subscription<Message>>,
5656
mut protocol_parameters_subscription: Box<dyn Subscription<Message>>,
5757
mut epoch_activity_publisher: EpochActivityPublisher,
@@ -73,11 +73,11 @@ impl EpochsState {
7373
let mut current_block: Option<BlockInfo> = None;
7474

7575
// Read both topics in parallel
76-
let blocks_message_f = blocks_subscription.read();
76+
let block_message_f = block_subscription.read();
7777
let block_txs_message_f = block_txs_subscription.read();
7878

7979
// Handle blocks first
80-
let (_, message) = blocks_message_f.await?;
80+
let (_, message) = block_message_f.await?;
8181
match message.as_ref() {
8282
Message::Cardano((block_info, CardanoMessage::BlockAvailable(block_msg))) => {
8383
// handle rollback here
@@ -170,10 +170,10 @@ impl EpochsState {
170170
.unwrap_or(DEFAULT_BOOTSTRAPPED_SUBSCRIBE_TOPIC.1.to_string());
171171
info!("Creating subscriber for bootstrapped on '{bootstrapped_subscribe_topic}'");
172172

173-
let blocks_subscribe_topic = config
174-
.get_string(DEFAULT_BLOCKS_SUBSCRIBE_TOPIC.0)
175-
.unwrap_or(DEFAULT_BLOCKS_SUBSCRIBE_TOPIC.1.to_string());
176-
info!("Creating subscriber for blocks on '{blocks_subscribe_topic}'");
173+
let block_subscribe_topic = config
174+
.get_string(DEFAULT_BLOCK_SUBSCRIBE_TOPIC.0)
175+
.unwrap_or(DEFAULT_BLOCK_SUBSCRIBE_TOPIC.1.to_string());
176+
info!("Creating subscriber for blocks on '{block_subscribe_topic}'");
177177

178178
let block_txs_subscribe_topic = config
179179
.get_string(DEFAULT_BLOCK_TXS_SUBSCRIBE_TOPIC.0)
@@ -206,7 +206,7 @@ impl EpochsState {
206206

207207
// Subscribe
208208
let bootstrapped_subscription = context.subscribe(&bootstrapped_subscribe_topic).await?;
209-
let blocks_subscription = context.subscribe(&blocks_subscribe_topic).await?;
209+
let block_subscription = context.subscribe(&block_subscribe_topic).await?;
210210
let protocol_parameters_subscription =
211211
context.subscribe(&protocol_parameters_subscribe_topic).await?;
212212
let block_txs_subscription = context.subscribe(&block_txs_subscribe_topic).await?;
@@ -257,7 +257,7 @@ impl EpochsState {
257257
Self::run(
258258
history,
259259
bootstrapped_subscription,
260-
blocks_subscription,
260+
block_subscription,
261261
block_txs_subscription,
262262
protocol_parameters_subscription,
263263
epoch_activity_publisher,

modules/epochs_state/src/state.rs

Lines changed: 23 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -69,9 +69,13 @@ impl State {
6969
epoch: 0,
7070
epoch_start_time: genesis.byron_timestamp,
7171
first_block_time: genesis.byron_timestamp,
72-
first_block_height: 0,
73-
last_block_time: 0,
74-
last_block_height: 0,
72+
// NOTE:
73+
// First block height is 1
74+
// only because we don't handle EBB for now
75+
// so by default, we counter epoch 0's EBB
76+
first_block_height: 1,
77+
last_block_time: genesis.byron_timestamp,
78+
last_block_height: 1,
7579
blocks_minted: HashMap::new(),
7680
epoch_blocks: 0,
7781
epoch_txs: 0,
@@ -342,6 +346,22 @@ mod tests {
342346
);
343347
}
344348

349+
#[test]
350+
fn handle_mint_without_issuer() {
351+
let mut state = State::new(&GenesisValues::mainnet());
352+
let mut block = make_block(100);
353+
state.handle_mint(&block, None);
354+
block.number += 1;
355+
state.handle_mint(&block, None);
356+
357+
assert_eq!(state.epoch_blocks, 2);
358+
assert_eq!(state.blocks_minted.len(), 0);
359+
assert_eq!(
360+
state.blocks_minted.get(&keyhash_224(b"issuer").into()),
361+
None
362+
);
363+
}
364+
345365
#[test]
346366
fn handle_mint_multiple_issuer_records_counts() {
347367
let mut state = State::new(&GenesisValues::mainnet());

modules/mithril_snapshot_fetcher/src/mithril_snapshot_fetcher.rs

Lines changed: 26 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -30,9 +30,13 @@ use tracing::{debug, error, info, info_span, Instrument};
3030
mod pause;
3131
use pause::PauseType;
3232

33-
const DEFAULT_STARTUP_TOPIC: &str = "cardano.sequence.bootstrapped";
34-
const DEFAULT_BLOCK_TOPIC: &str = "cardano.block.available";
35-
const DEFAULT_COMPLETION_TOPIC: &str = "cardano.snapshot.complete";
33+
const DEFAULT_BOOTSTRAPPED_SUBSCRIBE_TOPIC: (&str, &str) = (
34+
"bootstrapped-subscribe-topic",
35+
"cardano.sequence.bootstrapped",
36+
);
37+
const DEFAULT_BLOCK_PUBLISH_TOPIC: (&str, &str) =
38+
("block-publish-topic", "cardano.block.available");
39+
const DEFAULT_COMPLETION_TOPIC: (&str, &str) = ("completion-topic", "cardano.snapshot.complete");
3640

3741
const DEFAULT_AGGREGATOR_URL: &str =
3842
"https://aggregator.release-mainnet.api.mithril.network/aggregator";
@@ -237,12 +241,14 @@ impl MithrilSnapshotFetcher {
237241
config: Arc<Config>,
238242
genesis: GenesisValues,
239243
) -> Result<()> {
240-
let block_topic =
241-
config.get_string("block-topic").unwrap_or(DEFAULT_BLOCK_TOPIC.to_string());
242-
info!("Publishing blocks on '{block_topic}'");
243-
244-
let completion_topic =
245-
config.get_string("completion-topic").unwrap_or(DEFAULT_COMPLETION_TOPIC.to_string());
244+
let block_publish_topic = config
245+
.get_string(DEFAULT_BLOCK_PUBLISH_TOPIC.0)
246+
.unwrap_or(DEFAULT_BLOCK_PUBLISH_TOPIC.1.to_string());
247+
info!("Publishing blocks on '{block_publish_topic}'");
248+
249+
let completion_topic = config
250+
.get_string(DEFAULT_COMPLETION_TOPIC.0)
251+
.unwrap_or(DEFAULT_COMPLETION_TOPIC.1.to_string());
246252
info!("Publishing completion on '{completion_topic}'");
247253

248254
let directory = config.get_string("directory").unwrap_or(DEFAULT_DIRECTORY.to_string());
@@ -356,7 +362,7 @@ impl MithrilSnapshotFetcher {
356362

357363
context
358364
.message_bus
359-
.publish(&block_topic, Arc::new(message_enum))
365+
.publish(&block_publish_topic, Arc::new(message_enum))
360366
.await
361367
.unwrap_or_else(|e| error!("Failed to publish block message: {e}"));
362368

@@ -389,21 +395,23 @@ impl MithrilSnapshotFetcher {
389395

390396
/// Main init function
391397
pub async fn init(&self, context: Arc<Context<Message>>, config: Arc<Config>) -> Result<()> {
392-
let startup_topic =
393-
config.get_string("startup-topic").unwrap_or(DEFAULT_STARTUP_TOPIC.to_string());
394-
info!("Creating startup subscriber on '{startup_topic}'");
398+
let bootstrapped_subscribe_topic = config
399+
.get_string(DEFAULT_BOOTSTRAPPED_SUBSCRIBE_TOPIC.0)
400+
.unwrap_or(DEFAULT_BOOTSTRAPPED_SUBSCRIBE_TOPIC.1.to_string());
401+
info!("Creating subscriber for bootstrapped on '{bootstrapped_subscribe_topic}'");
395402

396-
let mut subscription = context.subscribe(&startup_topic).await?;
403+
let mut bootstrapped_subscription =
404+
context.subscribe(&bootstrapped_subscribe_topic).await?;
397405
context.clone().run(async move {
398-
let Ok((_, startup_message)) = subscription.read().await else {
406+
let Ok((_, bootstrapped_message)) = bootstrapped_subscription.read().await else {
399407
return;
400408
};
401-
info!("Received startup message");
402-
let genesis = match startup_message.as_ref() {
409+
info!("Received bootstrapped message");
410+
let genesis = match bootstrapped_message.as_ref() {
403411
Message::Cardano((_, CardanoMessage::GenesisComplete(complete))) => {
404412
complete.values.clone()
405413
}
406-
x => panic!("unexpected startup message: {x:?}"),
414+
x => panic!("unexpected bootstrapped message: {x:?}"),
407415
};
408416

409417
let mut delay = 1;

0 commit comments

Comments
 (0)