Skip to content

Commit 799f046

Browse files
committed
Synchronise importer changes from signer to aggregator
1 parent b6e0cb8 commit 799f046

16 files changed

+1035
-146
lines changed

mithril-aggregator/src/database/cardano_transaction_migration.rs

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,19 @@ vacuum;
5757
4,
5858
r#"
5959
create index block_number_index on cardano_tx(block_number);
60+
"#,
61+
),
62+
// Migration 5
63+
// Add `block_range_root` table
64+
SqlMigration::new(
65+
5,
66+
r#"
67+
create table block_range_root (
68+
start integer not null,
69+
end integer not null,
70+
merkle_root text not null,
71+
primary key (start, end)
72+
);
6073
"#,
6174
),
6275
]
Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
use mithril_persistence::sqlite::{Provider, SourceAlias, SqLiteEntity, SqliteConnection};
2+
3+
use crate::database::record::BlockRangeRootRecord;
4+
5+
/// Simple queries to retrieve [BlockRangeRootRecord] from the sqlite database.
6+
pub struct GetBlockRangeRootProvider<'client> {
7+
connection: &'client SqliteConnection,
8+
}
9+
10+
impl<'client> GetBlockRangeRootProvider<'client> {
11+
#[cfg(test)]
12+
/// Create a new instance
13+
pub fn new(connection: &'client SqliteConnection) -> Self {
14+
Self { connection }
15+
}
16+
}
17+
18+
#[cfg(test)]
19+
impl mithril_persistence::sqlite::GetAllCondition for GetBlockRangeRootProvider<'_> {}
20+
21+
impl<'client> Provider<'client> for GetBlockRangeRootProvider<'client> {
22+
type Entity = BlockRangeRootRecord;
23+
24+
fn get_connection(&'client self) -> &'client SqliteConnection {
25+
self.connection
26+
}
27+
28+
fn get_definition(&self, condition: &str) -> String {
29+
let aliases = SourceAlias::new(&[("{:block_range_root:}", "block_range_root")]);
30+
let projection = Self::Entity::get_projection().expand(aliases);
31+
32+
format!("select {projection} from block_range_root where {condition} order by rowid")
33+
}
34+
}
Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
use mithril_persistence::sqlite::{
2+
Provider, SourceAlias, SqLiteEntity, SqliteConnection, WhereCondition,
3+
};
4+
5+
use crate::database::record::IntervalWithoutBlockRangeRootRecord;
6+
7+
/// Query that return the interval of block numbers that does not have a block range root.
8+
pub struct GetIntervalWithoutBlockRangeRootProvider<'client> {
9+
connection: &'client SqliteConnection,
10+
}
11+
12+
impl<'client> GetIntervalWithoutBlockRangeRootProvider<'client> {
13+
/// Create a new instance
14+
pub fn new(connection: &'client SqliteConnection) -> Self {
15+
Self { connection }
16+
}
17+
18+
pub fn get_interval_without_block_range_condition(&self) -> WhereCondition {
19+
WhereCondition::default()
20+
}
21+
}
22+
23+
impl<'client> Provider<'client> for GetIntervalWithoutBlockRangeRootProvider<'client> {
24+
type Entity = IntervalWithoutBlockRangeRootRecord;
25+
26+
fn get_connection(&'client self) -> &'client SqliteConnection {
27+
self.connection
28+
}
29+
30+
fn get_definition(&self, condition: &str) -> String {
31+
let aliases = SourceAlias::new(&[
32+
("{:interval_start:}", "interval_start"),
33+
("{:interval_end:}", "interval_end"),
34+
]);
35+
let projection = Self::Entity::get_projection().expand(aliases);
36+
37+
format!(
38+
"select {projection} from \
39+
(select max(end) as start from block_range_root where {condition}) as interval_start, \
40+
(select max(block_number) as end from cardano_tx where {condition}) as interval_end"
41+
)
42+
}
43+
}
Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,65 @@
1+
use std::iter::repeat;
2+
3+
use sqlite::Value;
4+
5+
use mithril_common::StdResult;
6+
use mithril_persistence::sqlite::{
7+
Provider, SourceAlias, SqLiteEntity, SqliteConnection, WhereCondition,
8+
};
9+
10+
use crate::database::record::BlockRangeRootRecord;
11+
12+
/// Query to insert [BlockRangeRootRecord] in the sqlite database
13+
pub struct InsertBlockRangeRootProvider<'client> {
14+
connection: &'client SqliteConnection,
15+
}
16+
17+
impl<'client> InsertBlockRangeRootProvider<'client> {
18+
/// Create a new instance
19+
pub fn new(connection: &'client SqliteConnection) -> Self {
20+
Self { connection }
21+
}
22+
23+
/// Condition to insert multiples records.
24+
pub fn get_insert_many_condition(
25+
&self,
26+
block_range_records: Vec<BlockRangeRootRecord>,
27+
) -> StdResult<WhereCondition> {
28+
let columns = "(start, end, merkle_root)";
29+
let values_columns: Vec<&str> = repeat("(?*, ?*, ?*)")
30+
.take(block_range_records.len())
31+
.collect();
32+
33+
let values: StdResult<Vec<Value>> =
34+
block_range_records
35+
.into_iter()
36+
.try_fold(vec![], |mut vec, record| {
37+
vec.append(&mut vec![
38+
Value::Integer(record.range.start.try_into()?),
39+
Value::Integer(record.range.end.try_into()?),
40+
Value::String(record.merkle_root.to_hex()),
41+
]);
42+
Ok(vec)
43+
});
44+
45+
Ok(WhereCondition::new(
46+
format!("{columns} values {}", values_columns.join(", ")).as_str(),
47+
values?,
48+
))
49+
}
50+
}
51+
52+
impl<'client> Provider<'client> for InsertBlockRangeRootProvider<'client> {
53+
type Entity = BlockRangeRootRecord;
54+
55+
fn get_connection(&'client self) -> &'client SqliteConnection {
56+
self.connection
57+
}
58+
59+
fn get_definition(&self, condition: &str) -> String {
60+
let aliases = SourceAlias::new(&[("{:block_range_root:}", "block_range_root")]);
61+
let projection = Self::Entity::get_projection().expand(aliases);
62+
63+
format!("insert or ignore into block_range_root {condition} returning {projection}")
64+
}
65+
}
Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
mod get_block_range_root;
2+
mod get_interval_without_block_range_provider;
3+
mod insert_block_range;
4+
5+
#[cfg(test)]
6+
pub use get_block_range_root::*;
7+
pub use get_interval_without_block_range_provider::*;
8+
pub use insert_block_range::*;

mithril-aggregator/src/database/provider/cardano_transaction/get_cardano_transaction.rs

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
1+
use std::ops::Range;
2+
13
use sqlite::Value;
24

3-
use mithril_common::entities::{ImmutableFileNumber, TransactionHash};
5+
use mithril_common::entities::{BlockNumber, ImmutableFileNumber, TransactionHash};
46
use mithril_persistence::sqlite::{
57
Provider, SourceAlias, SqLiteEntity, SqliteConnection, WhereCondition,
68
};
@@ -47,6 +49,20 @@ impl<'client> GetCardanoTransactionProvider<'client> {
4749
vec![Value::Integer(beacon as i64)],
4850
)
4951
}
52+
53+
pub fn get_transaction_between_blocks_condition(
54+
&self,
55+
range: Range<BlockNumber>,
56+
) -> WhereCondition {
57+
WhereCondition::new(
58+
"block_number >= ?*",
59+
vec![Value::Integer(range.start as i64)],
60+
)
61+
.and_where(WhereCondition::new(
62+
"block_number < ?*",
63+
vec![Value::Integer(range.end as i64)],
64+
))
65+
}
5066
}
5167

5268
impl<'client> Provider<'client> for GetCardanoTransactionProvider<'client> {

mithril-aggregator/src/database/provider/mod.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
//! Aggregator related database providers
2+
mod block_range_root;
23
mod cardano_transaction;
34
mod certificate;
45
mod epoch_setting;
@@ -9,6 +10,7 @@ mod signer_registration;
910
mod single_signature;
1011
mod stake_pool;
1112

13+
pub use block_range_root::*;
1214
pub use cardano_transaction::*;
1315
pub use certificate::*;
1416
pub use epoch_setting::*;
Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,61 @@
1+
use sqlite::Row;
2+
3+
use mithril_common::crypto_helper::MKTreeNode;
4+
use mithril_common::entities::BlockRange;
5+
use mithril_persistence::sqlite::{HydrationError, Projection, SqLiteEntity};
6+
7+
use crate::database::record::hydrator::try_to_u64;
8+
9+
/// Block range root record is the representation of block range with its merkle root precomputed.
10+
#[derive(Debug, PartialEq, Clone)]
11+
pub struct BlockRangeRootRecord {
12+
/// Range of block numbers covered
13+
pub range: BlockRange,
14+
/// Merkle root of the block range, computed from the list of all transactions that are
15+
/// included in the range
16+
pub merkle_root: MKTreeNode,
17+
}
18+
19+
impl From<(BlockRange, MKTreeNode)> for BlockRangeRootRecord {
20+
fn from(value: (BlockRange, MKTreeNode)) -> Self {
21+
Self {
22+
range: value.0,
23+
merkle_root: value.1,
24+
}
25+
}
26+
}
27+
28+
impl From<BlockRangeRootRecord> for (BlockRange, MKTreeNode) {
29+
fn from(value: BlockRangeRootRecord) -> Self {
30+
(value.range, value.merkle_root)
31+
}
32+
}
33+
34+
impl SqLiteEntity for BlockRangeRootRecord {
35+
fn hydrate(row: Row) -> Result<Self, HydrationError>
36+
where
37+
Self: Sized,
38+
{
39+
let start = try_to_u64("block_range.start", row.read::<i64, _>(0))?;
40+
let _end = try_to_u64("block_range.end", row.read::<i64, _>(1))?;
41+
let merkle_root = row.read::<&str, _>(2);
42+
43+
Ok(Self {
44+
range: BlockRange::from_block_number(start),
45+
merkle_root: MKTreeNode::from_hex(merkle_root)
46+
.map_err(|e| HydrationError::InvalidData(
47+
format!(
48+
"Field block_range.merkle_root (value={merkle_root}) is incompatible with hex representation. Error = {e}")
49+
)
50+
)?,
51+
})
52+
}
53+
54+
fn get_projection() -> Projection {
55+
Projection::from(&[
56+
("start", "{:block_range_root:}.start", "int"),
57+
("end", "{:block_range_root:}.end", "int"),
58+
("merkle_root", "{:block_range_root:}.merkle_root", "text"),
59+
])
60+
}
61+
}

mithril-aggregator/src/database/record/certificate.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@ use mithril_persistence::{
1717
sqlite::{HydrationError, Projection, SqLiteEntity},
1818
};
1919

20+
use crate::database::record::hydrator;
21+
2022
era_deprecate!("Remove immutable_file_number");
2123
/// Certificate record is the representation of a stored certificate.
2224
#[derive(Debug, PartialEq, Clone)]
@@ -312,7 +314,7 @@ impl SqLiteEntity for CertificateRecord {
312314
let network = row.read::<&str, _>(6).to_string();
313315
let immutable_file_number = row.read::<i64, _>(7);
314316
let signed_entity_type_id = row.read::<i64, _>(8);
315-
let signed_entity_beacon_string = super::read_signed_entity_beacon_column(&row, 9);
317+
let signed_entity_beacon_string = hydrator::read_signed_entity_beacon_column(&row, 9);
316318
let protocol_version = row.read::<&str, _>(10).to_string();
317319
let protocol_parameters_string = row.read::<&str, _>(11);
318320
let protocol_message_string = row.read::<&str, _>(12);
Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
use sqlite::Row;
2+
3+
use mithril_common::entities::BlockNumber;
4+
use mithril_persistence::sqlite::{HydrationError, Projection, SqLiteEntity};
5+
6+
use crate::database::record::hydrator::try_to_u64;
7+
8+
/// Interval of block numbers without block ranges root.
9+
pub struct IntervalWithoutBlockRangeRootRecord {
10+
/// Start of the interval
11+
pub start: Option<BlockNumber>,
12+
/// End of the interval
13+
pub end: Option<BlockNumber>,
14+
}
15+
16+
impl SqLiteEntity for IntervalWithoutBlockRangeRootRecord {
17+
fn hydrate(row: Row) -> Result<Self, HydrationError>
18+
where
19+
Self: Sized,
20+
{
21+
let start = row
22+
.read::<Option<i64>, _>(0)
23+
.map(|v| try_to_u64("interval_start.start", v))
24+
.transpose()?;
25+
let end = row
26+
.read::<Option<i64>, _>(1)
27+
.map(|v| try_to_u64("interval_end.end", v))
28+
.transpose()?;
29+
30+
Ok(Self { start, end })
31+
}
32+
33+
fn get_projection() -> Projection {
34+
Projection::from(&[
35+
("start", "{:interval_start:}.start", "int"),
36+
("end", "{:interval_end:}.end", "int"),
37+
])
38+
}
39+
}

0 commit comments

Comments
 (0)