Skip to content

Commit 69154d1

Browse files
committed
Add ChainScannedBlocks and adapt BlockStreamer trait
1 parent 1dd6495 commit 69154d1

File tree

3 files changed

+60
-17
lines changed

3 files changed

+60
-17
lines changed

mithril-common/src/cardano_block_scanner/dumb_block_scanner.rs

Lines changed: 22 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ use std::path::Path;
44
use async_trait::async_trait;
55
use tokio::sync::RwLock;
66

7+
use crate::cardano_block_scanner::ChainScannedBlocks;
78
use crate::cardano_block_scanner::{BlockScanner, BlockStreamer, ScannedBlock};
89
use crate::entities::ImmutableFileNumber;
910
use crate::StdResult;
@@ -57,8 +58,11 @@ impl DumbBlockStreamer {
5758

5859
#[async_trait]
5960
impl BlockStreamer for DumbBlockStreamer {
60-
async fn poll_next(&mut self) -> StdResult<Option<Vec<ScannedBlock>>> {
61-
Ok(self.blocks.pop_front())
61+
async fn poll_next(&mut self) -> StdResult<Option<ChainScannedBlocks>> {
62+
Ok(self
63+
.blocks
64+
.pop_front()
65+
.map(ChainScannedBlocks::RollForwards))
6266
}
6367
}
6468

@@ -79,7 +83,10 @@ mod tests {
7983
let mut streamer = DumbBlockStreamer::new(vec![expected_blocks.clone()]);
8084

8185
let blocks = streamer.poll_next().await.unwrap();
82-
assert_eq!(blocks, Some(expected_blocks));
86+
assert_eq!(
87+
blocks,
88+
Some(ChainScannedBlocks::RollForwards(expected_blocks))
89+
);
8390

8491
let blocks = streamer.poll_next().await.unwrap();
8592
assert_eq!(blocks, None);
@@ -98,13 +105,22 @@ mod tests {
98105
let mut streamer = DumbBlockStreamer::new(expected_blocks.clone());
99106

100107
let blocks = streamer.poll_next().await.unwrap();
101-
assert_eq!(blocks, Some(expected_blocks[0].clone()));
108+
assert_eq!(
109+
blocks,
110+
Some(ChainScannedBlocks::RollForwards(expected_blocks[0].clone()))
111+
);
102112

103113
let blocks = streamer.poll_next().await.unwrap();
104-
assert_eq!(blocks, Some(expected_blocks[1].clone()));
114+
assert_eq!(
115+
blocks,
116+
Some(ChainScannedBlocks::RollForwards(expected_blocks[1].clone()))
117+
);
105118

106119
let blocks = streamer.poll_next().await.unwrap();
107-
assert_eq!(blocks, Some(expected_blocks[2].clone()));
120+
assert_eq!(
121+
blocks,
122+
Some(ChainScannedBlocks::RollForwards(expected_blocks[2].clone()))
123+
);
108124

109125
let blocks = streamer.poll_next().await.unwrap();
110126
assert_eq!(blocks, None);

mithril-common/src/cardano_block_scanner/immutable_block_streamer.rs

Lines changed: 15 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ use pallas_hardano::storage::immutable::chunk::{read_blocks, Reader};
77
use pallas_traverse::MultiEraBlock;
88
use slog::{debug, error, Logger};
99

10+
use crate::cardano_block_scanner::ChainScannedBlocks;
1011
use crate::cardano_block_scanner::{BlockStreamer, ScannedBlock};
1112
use crate::digesters::ImmutableFile;
1213
use crate::StdResult;
@@ -20,7 +21,7 @@ pub struct ImmutableBlockStreamer {
2021

2122
#[async_trait]
2223
impl BlockStreamer for ImmutableBlockStreamer {
23-
async fn poll_next(&mut self) -> StdResult<Option<Vec<ScannedBlock>>> {
24+
async fn poll_next(&mut self) -> StdResult<Option<ChainScannedBlocks>> {
2425
match &self.remaining_immutable_files.pop_front() {
2526
Some(immutable_file) => {
2627
debug!(
@@ -37,7 +38,7 @@ impl BlockStreamer for ImmutableBlockStreamer {
3738
immutable_file.path.display()
3839
)
3940
})?;
40-
Ok(Some(blocks))
41+
Ok(Some(ChainScannedBlocks::RollForwards(blocks)))
4142
}
4243
None => Ok(None),
4344
}
@@ -129,6 +130,15 @@ mod tests {
129130

130131
#[tokio::test]
131132
async fn test_parse_expected_number_of_transactions() {
133+
fn sum_of_transactions_len(o: Option<ChainScannedBlocks>) -> Option<usize> {
134+
match o {
135+
Some(ChainScannedBlocks::RollForwards(b)) => {
136+
Some(b.into_iter().map(|b| b.transactions_len()).sum())
137+
}
138+
_ => None,
139+
}
140+
}
141+
132142
// We know the number of transactions in those prebuilt immutables
133143
let immutable_files = [
134144
("00000.chunk", 0usize),
@@ -148,19 +158,19 @@ mod tests {
148158

149159
let immutable_blocks = streamer.poll_next().await.unwrap();
150160
assert_eq!(
151-
immutable_blocks.map(|b| b.into_iter().map(|b| b.transactions_len()).sum()),
161+
sum_of_transactions_len(immutable_blocks),
152162
Some(immutable_files[0].1)
153163
);
154164

155165
let immutable_blocks = streamer.poll_next().await.unwrap();
156166
assert_eq!(
157-
immutable_blocks.map(|b| b.into_iter().map(|b| b.transactions_len()).sum()),
167+
sum_of_transactions_len(immutable_blocks),
158168
Some(immutable_files[1].1)
159169
);
160170

161171
let immutable_blocks = streamer.poll_next().await.unwrap();
162172
assert_eq!(
163-
immutable_blocks.map(|b| b.into_iter().map(|b| b.transactions_len()).sum()),
173+
sum_of_transactions_len(immutable_blocks),
164174
Some(immutable_files[2].1)
165175
);
166176

mithril-common/src/cardano_block_scanner/interface.rs

Lines changed: 23 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,10 @@
1+
use anyhow::anyhow;
12
use std::path::Path;
23

34
use async_trait::async_trait;
45

56
use crate::cardano_block_scanner::ScannedBlock;
6-
use crate::entities::ImmutableFileNumber;
7+
use crate::entities::{ChainPoint, ImmutableFileNumber};
78
use crate::StdResult;
89

910
/// A scanner that can read cardano transactions in a cardano database
@@ -55,18 +56,34 @@ pub trait BlockScanner: Sync + Send {
5556
) -> StdResult<Box<dyn BlockStreamer>>;
5657
}
5758

59+
/// [ChainScannedBlocks] allows to scan new blocks and handle rollbacks
60+
#[derive(Debug, PartialEq)]
61+
pub enum ChainScannedBlocks {
62+
/// Roll forward on the chain to the next list of [ScannedBlock]
63+
RollForwards(Vec<ScannedBlock>),
64+
/// Roll backward on the chain to the previous [ChainPoint]
65+
RollBackward(ChainPoint),
66+
}
67+
5868
/// Trait that define how blocks are streamed from a Cardano database
5969
#[async_trait]
6070
pub trait BlockStreamer: Sync + Send {
6171
/// Stream the next available blocks
62-
async fn poll_next(&mut self) -> StdResult<Option<Vec<ScannedBlock>>>;
72+
async fn poll_next(&mut self) -> StdResult<Option<ChainScannedBlocks>>;
6373

6474
/// Stream all the available blocks, may be very memory intensive
6575
async fn poll_all(&mut self) -> StdResult<Vec<ScannedBlock>> {
66-
let mut blocks = Vec::new();
67-
while let Some(mut next_blocks) = self.poll_next().await? {
68-
blocks.append(&mut next_blocks);
76+
let mut all_blocks = Vec::new();
77+
while let Some(next_blocks) = self.poll_next().await? {
78+
match next_blocks {
79+
ChainScannedBlocks::RollForwards(mut forward_blocks) => {
80+
all_blocks.append(&mut forward_blocks);
81+
}
82+
ChainScannedBlocks::RollBackward(_) => {
83+
return Err(anyhow!("poll_all: RollBackward not supported"));
84+
}
85+
};
6986
}
70-
Ok(blocks)
87+
Ok(all_blocks)
7188
}
7289
}

0 commit comments

Comments
 (0)