Skip to content

Commit 8b66e45

Browse files
authored
Merge pull request #1716 from input-output-hk/ensemble/modify-block-streamer-return-type
Update `BlockStreamer` return type
2 parents 1dd6495 + 06ce18e commit 8b66e45

File tree

10 files changed

+117
-43
lines changed

10 files changed

+117
-43
lines changed

Cargo.lock

Lines changed: 3 additions & 3 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

mithril-aggregator/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
[package]
22
name = "mithril-aggregator"
3-
version = "0.5.9"
3+
version = "0.5.10"
44
description = "A Mithril Aggregator server"
55
authors = { workspace = true }
66
edition = { workspace = true }

mithril-aggregator/src/services/cardano_transactions_importer.rs

Lines changed: 17 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
use anyhow::anyhow;
12
use std::mem;
23
use std::ops::Range;
34
use std::path::{Path, PathBuf};
@@ -6,7 +7,7 @@ use std::sync::Arc;
67
use async_trait::async_trait;
78
use slog::{debug, Logger};
89

9-
use mithril_common::cardano_block_scanner::BlockScanner;
10+
use mithril_common::cardano_block_scanner::{BlockScanner, ChainScannedBlocks};
1011
use mithril_common::crypto_helper::{MKTree, MKTreeNode};
1112
use mithril_common::entities::{BlockNumber, BlockRange, CardanoTransaction, ImmutableFileNumber};
1213
use mithril_common::signable_builder::TransactionsImporter;
@@ -105,14 +106,21 @@ impl CardanoTransactionsImporter {
105106
let mut streamer = self.block_scanner.scan(&self.dirpath, from, until).await?;
106107

107108
while let Some(blocks) = streamer.poll_next().await? {
108-
let parsed_transactions: Vec<CardanoTransaction> = blocks
109-
.into_iter()
110-
.flat_map(|b| b.into_transactions())
111-
.collect();
112-
113-
self.transaction_store
114-
.store_transactions(parsed_transactions)
115-
.await?;
109+
match blocks {
110+
ChainScannedBlocks::RollForwards(forward_blocks) => {
111+
let parsed_transactions: Vec<CardanoTransaction> = forward_blocks
112+
.into_iter()
113+
.flat_map(|b| b.into_transactions())
114+
.collect();
115+
116+
self.transaction_store
117+
.store_transactions(parsed_transactions)
118+
.await?;
119+
}
120+
ChainScannedBlocks::RollBackward(_) => {
121+
return Err(anyhow!("RollBackward not yet implemented"))
122+
}
123+
}
116124
}
117125

118126
Ok(())

mithril-common/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
[package]
22
name = "mithril-common"
3-
version = "0.4.8"
3+
version = "0.4.9"
44
description = "Common types, interfaces, and utilities for Mithril nodes."
55
authors = { workspace = true }
66
edition = { workspace = true }

mithril-common/src/cardano_block_scanner/block_scanner.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,7 @@ impl BlockScanner for CardanoBlockScanner {
6060

6161
#[cfg(test)]
6262
mod tests {
63+
use crate::cardano_block_scanner::BlockStreamerTestExtensions;
6364
use crate::test_utils::{TempDir, TestLogger};
6465

6566
use super::*;

mithril-common/src/cardano_block_scanner/dumb_block_scanner.rs

Lines changed: 24 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ use std::path::Path;
44
use async_trait::async_trait;
55
use tokio::sync::RwLock;
66

7+
use crate::cardano_block_scanner::ChainScannedBlocks;
78
use crate::cardano_block_scanner::{BlockScanner, BlockStreamer, ScannedBlock};
89
use crate::entities::ImmutableFileNumber;
910
use crate::StdResult;
@@ -57,13 +58,18 @@ impl DumbBlockStreamer {
5758

5859
#[async_trait]
5960
impl BlockStreamer for DumbBlockStreamer {
60-
async fn poll_next(&mut self) -> StdResult<Option<Vec<ScannedBlock>>> {
61-
Ok(self.blocks.pop_front())
61+
async fn poll_next(&mut self) -> StdResult<Option<ChainScannedBlocks>> {
62+
Ok(self
63+
.blocks
64+
.pop_front()
65+
.map(ChainScannedBlocks::RollForwards))
6266
}
6367
}
6468

6569
#[cfg(test)]
6670
mod tests {
71+
use crate::cardano_block_scanner::BlockStreamerTestExtensions;
72+
6773
use super::*;
6874

6975
#[tokio::test]
@@ -79,7 +85,10 @@ mod tests {
7985
let mut streamer = DumbBlockStreamer::new(vec![expected_blocks.clone()]);
8086

8187
let blocks = streamer.poll_next().await.unwrap();
82-
assert_eq!(blocks, Some(expected_blocks));
88+
assert_eq!(
89+
blocks,
90+
Some(ChainScannedBlocks::RollForwards(expected_blocks))
91+
);
8392

8493
let blocks = streamer.poll_next().await.unwrap();
8594
assert_eq!(blocks, None);
@@ -98,13 +107,22 @@ mod tests {
98107
let mut streamer = DumbBlockStreamer::new(expected_blocks.clone());
99108

100109
let blocks = streamer.poll_next().await.unwrap();
101-
assert_eq!(blocks, Some(expected_blocks[0].clone()));
110+
assert_eq!(
111+
blocks,
112+
Some(ChainScannedBlocks::RollForwards(expected_blocks[0].clone()))
113+
);
102114

103115
let blocks = streamer.poll_next().await.unwrap();
104-
assert_eq!(blocks, Some(expected_blocks[1].clone()));
116+
assert_eq!(
117+
blocks,
118+
Some(ChainScannedBlocks::RollForwards(expected_blocks[1].clone()))
119+
);
105120

106121
let blocks = streamer.poll_next().await.unwrap();
107-
assert_eq!(blocks, Some(expected_blocks[2].clone()));
122+
assert_eq!(
123+
blocks,
124+
Some(ChainScannedBlocks::RollForwards(expected_blocks[2].clone()))
125+
);
108126

109127
let blocks = streamer.poll_next().await.unwrap();
110128
assert_eq!(blocks, None);

mithril-common/src/cardano_block_scanner/immutable_block_streamer.rs

Lines changed: 16 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ use pallas_hardano::storage::immutable::chunk::{read_blocks, Reader};
77
use pallas_traverse::MultiEraBlock;
88
use slog::{debug, error, Logger};
99

10+
use crate::cardano_block_scanner::ChainScannedBlocks;
1011
use crate::cardano_block_scanner::{BlockStreamer, ScannedBlock};
1112
use crate::digesters::ImmutableFile;
1213
use crate::StdResult;
@@ -20,7 +21,7 @@ pub struct ImmutableBlockStreamer {
2021

2122
#[async_trait]
2223
impl BlockStreamer for ImmutableBlockStreamer {
23-
async fn poll_next(&mut self) -> StdResult<Option<Vec<ScannedBlock>>> {
24+
async fn poll_next(&mut self) -> StdResult<Option<ChainScannedBlocks>> {
2425
match &self.remaining_immutable_files.pop_front() {
2526
Some(immutable_file) => {
2627
debug!(
@@ -37,7 +38,7 @@ impl BlockStreamer for ImmutableBlockStreamer {
3738
immutable_file.path.display()
3839
)
3940
})?;
40-
Ok(Some(blocks))
41+
Ok(Some(ChainScannedBlocks::RollForwards(blocks)))
4142
}
4243
None => Ok(None),
4344
}
@@ -123,12 +124,22 @@ impl ImmutableBlockStreamer {
123124

124125
#[cfg(test)]
125126
mod tests {
127+
use crate::cardano_block_scanner::BlockStreamerTestExtensions;
126128
use crate::test_utils::{TempDir, TestLogger};
127129

128130
use super::*;
129131

130132
#[tokio::test]
131133
async fn test_parse_expected_number_of_transactions() {
134+
fn sum_of_transactions_len(o: Option<ChainScannedBlocks>) -> Option<usize> {
135+
match o {
136+
Some(ChainScannedBlocks::RollForwards(b)) => {
137+
Some(b.into_iter().map(|b| b.transactions_len()).sum())
138+
}
139+
_ => None,
140+
}
141+
}
142+
132143
// We know the number of transactions in those prebuilt immutables
133144
let immutable_files = [
134145
("00000.chunk", 0usize),
@@ -148,19 +159,19 @@ mod tests {
148159

149160
let immutable_blocks = streamer.poll_next().await.unwrap();
150161
assert_eq!(
151-
immutable_blocks.map(|b| b.into_iter().map(|b| b.transactions_len()).sum()),
162+
sum_of_transactions_len(immutable_blocks),
152163
Some(immutable_files[0].1)
153164
);
154165

155166
let immutable_blocks = streamer.poll_next().await.unwrap();
156167
assert_eq!(
157-
immutable_blocks.map(|b| b.into_iter().map(|b| b.transactions_len()).sum()),
168+
sum_of_transactions_len(immutable_blocks),
158169
Some(immutable_files[1].1)
159170
);
160171

161172
let immutable_blocks = streamer.poll_next().await.unwrap();
162173
assert_eq!(
163-
immutable_blocks.map(|b| b.into_iter().map(|b| b.transactions_len()).sum()),
174+
sum_of_transactions_len(immutable_blocks),
164175
Some(immutable_files[2].1)
165176
);
166177

mithril-common/src/cardano_block_scanner/interface.rs

Lines changed: 37 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,10 @@
1+
use anyhow::anyhow;
12
use std::path::Path;
23

34
use async_trait::async_trait;
45

56
use crate::cardano_block_scanner::ScannedBlock;
6-
use crate::entities::ImmutableFileNumber;
7+
use crate::entities::{ChainPoint, ImmutableFileNumber};
78
use crate::StdResult;
89

910
/// A scanner that can read cardano transactions in a cardano database
@@ -55,18 +56,46 @@ pub trait BlockScanner: Sync + Send {
5556
) -> StdResult<Box<dyn BlockStreamer>>;
5657
}
5758

59+
/// [ChainScannedBlocks] allows to scan new blocks and handle rollbacks
60+
#[derive(Debug, PartialEq)]
61+
pub enum ChainScannedBlocks {
62+
/// Roll forward on the chain to the next list of [ScannedBlock]
63+
RollForwards(Vec<ScannedBlock>),
64+
/// Roll backward on the chain to the previous [ChainPoint]
65+
RollBackward(ChainPoint),
66+
}
67+
5868
/// Trait that define how blocks are streamed from a Cardano database
5969
#[async_trait]
6070
pub trait BlockStreamer: Sync + Send {
6171
/// Stream the next available blocks
62-
async fn poll_next(&mut self) -> StdResult<Option<Vec<ScannedBlock>>>;
72+
async fn poll_next(&mut self) -> StdResult<Option<ChainScannedBlocks>>;
73+
}
6374

64-
/// Stream all the available blocks, may be very memory intensive
65-
async fn poll_all(&mut self) -> StdResult<Vec<ScannedBlock>> {
66-
let mut blocks = Vec::new();
67-
while let Some(mut next_blocks) = self.poll_next().await? {
68-
blocks.append(&mut next_blocks);
75+
cfg_test_tools! {
76+
/// Tests extensions methods for the [BlockStreamer] trait.
77+
#[async_trait]
78+
pub trait BlockStreamerTestExtensions{
79+
/// Stream all the available blocks, may be very memory intensive
80+
async fn poll_all(&mut self) -> StdResult<Vec<ScannedBlock>>;
81+
}
82+
83+
#[async_trait]
84+
impl <S: BlockStreamer + ?Sized> BlockStreamerTestExtensions for S {
85+
async fn poll_all(&mut self) -> StdResult<Vec<ScannedBlock>> {
86+
let mut all_blocks = Vec::new();
87+
while let Some(next_blocks) = self.poll_next().await? {
88+
match next_blocks {
89+
ChainScannedBlocks::RollForwards(mut forward_blocks) => {
90+
all_blocks.append(&mut forward_blocks);
91+
}
92+
ChainScannedBlocks::RollBackward(_) => {
93+
return Err(anyhow!("poll_all: RollBackward not supported"));
94+
}
95+
};
96+
}
97+
Ok(all_blocks)
6998
}
70-
Ok(blocks)
7199
}
100+
72101
}

mithril-signer/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
[package]
22
name = "mithril-signer"
3-
version = "0.2.136"
3+
version = "0.2.137"
44
description = "A Mithril Signer"
55
authors = { workspace = true }
66
edition = { workspace = true }

mithril-signer/src/cardano_transactions_importer.rs

Lines changed: 16 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ use std::sync::Arc;
66
use async_trait::async_trait;
77
use slog::{debug, Logger};
88

9-
use mithril_common::cardano_block_scanner::BlockScanner;
9+
use mithril_common::cardano_block_scanner::{BlockScanner, ChainScannedBlocks};
1010
use mithril_common::crypto_helper::{MKTree, MKTreeNode};
1111
use mithril_common::entities::{BlockNumber, BlockRange, CardanoTransaction, ImmutableFileNumber};
1212
use mithril_common::signable_builder::TransactionsImporter;
@@ -105,14 +105,21 @@ impl CardanoTransactionsImporter {
105105
let mut streamer = self.block_scanner.scan(&self.dirpath, from, until).await?;
106106

107107
while let Some(blocks) = streamer.poll_next().await? {
108-
let parsed_transactions: Vec<CardanoTransaction> = blocks
109-
.into_iter()
110-
.flat_map(|b| b.into_transactions())
111-
.collect();
112-
113-
self.transaction_store
114-
.store_transactions(parsed_transactions)
115-
.await?;
108+
match blocks {
109+
ChainScannedBlocks::RollForwards(blocks) => {
110+
let parsed_transactions: Vec<CardanoTransaction> = blocks
111+
.into_iter()
112+
.flat_map(|b| b.into_transactions())
113+
.collect();
114+
115+
self.transaction_store
116+
.store_transactions(parsed_transactions)
117+
.await?;
118+
}
119+
ChainScannedBlocks::RollBackward(_) => {
120+
return Err(anyhow::anyhow!("RollBackward not supported"));
121+
}
122+
}
116123
}
117124

118125
Ok(())

0 commit comments

Comments
 (0)