Skip to content

Commit 55d7d32

Browse files
committed
feat(aggregator): implement persistence of synchronised certificates
1 parent c45076c commit 55d7d32

File tree

5 files changed

+274
-63
lines changed

5 files changed

+274
-63
lines changed
Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,65 @@
1+
//! Shared `WhereCondition` across certificates queries
2+
3+
use sqlite::Value;
4+
use std::iter::repeat_n;
5+
6+
use mithril_persistence::sqlite::WhereCondition;
7+
8+
use crate::database::record::CertificateRecord;
9+
10+
pub(super) fn insert_many(certificates_records: Vec<CertificateRecord>) -> WhereCondition {
11+
let columns = "(\
12+
certificate_id, \
13+
parent_certificate_id, \
14+
message, \
15+
signature, \
16+
aggregate_verification_key, \
17+
epoch, \
18+
network, \
19+
signed_entity_type_id, \
20+
signed_entity_beacon, \
21+
protocol_version, \
22+
protocol_parameters, \
23+
protocol_message, \
24+
signers, \
25+
initiated_at, \
26+
sealed_at)";
27+
let values_columns: Vec<&str> = repeat_n(
28+
"(?*, ?*, ?*, ?*, ?*, ?*, ?*, ?*, ?*, ?*, ?*, ?*, ?*, ?*, ?*)",
29+
certificates_records.len(),
30+
)
31+
.collect();
32+
33+
let values: Vec<Value> = certificates_records
34+
.into_iter()
35+
.flat_map(|certificate_record| {
36+
vec![
37+
Value::String(certificate_record.certificate_id),
38+
match certificate_record.parent_certificate_id {
39+
Some(parent_certificate_id) => Value::String(parent_certificate_id),
40+
None => Value::Null,
41+
},
42+
Value::String(certificate_record.message),
43+
Value::String(certificate_record.signature),
44+
Value::String(certificate_record.aggregate_verification_key),
45+
Value::Integer(certificate_record.epoch.try_into().unwrap()),
46+
Value::String(certificate_record.network),
47+
Value::Integer(certificate_record.signed_entity_type.index() as i64),
48+
Value::String(certificate_record.signed_entity_type.get_json_beacon().unwrap()),
49+
Value::String(certificate_record.protocol_version),
50+
Value::String(
51+
serde_json::to_string(&certificate_record.protocol_parameters).unwrap(),
52+
),
53+
Value::String(serde_json::to_string(&certificate_record.protocol_message).unwrap()),
54+
Value::String(serde_json::to_string(&certificate_record.signers).unwrap()),
55+
Value::String(certificate_record.initiated_at.to_rfc3339()),
56+
Value::String(certificate_record.sealed_at.to_rfc3339()),
57+
]
58+
})
59+
.collect();
60+
61+
WhereCondition::new(
62+
format!("{columns} values {}", values_columns.join(", ")).as_str(),
63+
values,
64+
)
65+
}

mithril-aggregator/src/database/query/certificate/insert_certificate.rs

Lines changed: 5 additions & 62 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,9 @@
1-
use std::iter::repeat_n;
2-
3-
use sqlite::Value;
4-
51
use mithril_persistence::sqlite::{Query, SourceAlias, SqLiteEntity, WhereCondition};
62

73
use crate::database::record::CertificateRecord;
84

5+
use super::conditions;
6+
97
/// Query to insert [CertificateRecord] in the sqlite database
108
pub struct InsertCertificateRecordQuery {
119
condition: WhereCondition,
@@ -17,64 +15,9 @@ impl InsertCertificateRecordQuery {
1715
}
1816

1917
pub fn many(certificates_records: Vec<CertificateRecord>) -> Self {
20-
let columns = "(\
21-
certificate_id, \
22-
parent_certificate_id, \
23-
message, \
24-
signature, \
25-
aggregate_verification_key, \
26-
epoch, \
27-
network, \
28-
signed_entity_type_id, \
29-
signed_entity_beacon, \
30-
protocol_version, \
31-
protocol_parameters, \
32-
protocol_message, \
33-
signers, \
34-
initiated_at, \
35-
sealed_at)";
36-
let values_columns: Vec<&str> = repeat_n(
37-
"(?*, ?*, ?*, ?*, ?*, ?*, ?*, ?*, ?*, ?*, ?*, ?*, ?*, ?*, ?*)",
38-
certificates_records.len(),
39-
)
40-
.collect();
41-
42-
let values: Vec<Value> = certificates_records
43-
.into_iter()
44-
.flat_map(|certificate_record| {
45-
vec![
46-
Value::String(certificate_record.certificate_id),
47-
match certificate_record.parent_certificate_id {
48-
Some(parent_certificate_id) => Value::String(parent_certificate_id),
49-
None => Value::Null,
50-
},
51-
Value::String(certificate_record.message),
52-
Value::String(certificate_record.signature),
53-
Value::String(certificate_record.aggregate_verification_key),
54-
Value::Integer(certificate_record.epoch.try_into().unwrap()),
55-
Value::String(certificate_record.network),
56-
Value::Integer(certificate_record.signed_entity_type.index() as i64),
57-
Value::String(certificate_record.signed_entity_type.get_json_beacon().unwrap()),
58-
Value::String(certificate_record.protocol_version),
59-
Value::String(
60-
serde_json::to_string(&certificate_record.protocol_parameters).unwrap(),
61-
),
62-
Value::String(
63-
serde_json::to_string(&certificate_record.protocol_message).unwrap(),
64-
),
65-
Value::String(serde_json::to_string(&certificate_record.signers).unwrap()),
66-
Value::String(certificate_record.initiated_at.to_rfc3339()),
67-
Value::String(certificate_record.sealed_at.to_rfc3339()),
68-
]
69-
})
70-
.collect();
71-
72-
let condition = WhereCondition::new(
73-
format!("{columns} values {}", values_columns.join(", ")).as_str(),
74-
values,
75-
);
76-
77-
Self { condition }
18+
Self {
19+
condition: conditions::insert_many(certificates_records),
20+
}
7821
}
7922
}
8023

Lines changed: 169 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,169 @@
1+
use mithril_persistence::sqlite::{Query, SourceAlias, SqLiteEntity, WhereCondition};
2+
3+
use crate::database::record::CertificateRecord;
4+
5+
use super::conditions;
6+
7+
/// Query to insert or replace [CertificateRecord] in the sqlite database
8+
pub struct InsertOrReplaceCertificateRecordQuery {
9+
condition: WhereCondition,
10+
}
11+
12+
impl InsertOrReplaceCertificateRecordQuery {
13+
pub fn many(certificates_records: Vec<CertificateRecord>) -> Self {
14+
Self {
15+
condition: conditions::insert_many(certificates_records),
16+
}
17+
}
18+
}
19+
20+
impl Query for InsertOrReplaceCertificateRecordQuery {
21+
type Entity = CertificateRecord;
22+
23+
fn filters(&self) -> WhereCondition {
24+
self.condition.clone()
25+
}
26+
27+
fn get_definition(&self, condition: &str) -> String {
28+
// it is important to alias the fields with the same name as the table
29+
// since the table cannot be aliased in a RETURNING statement in SQLite.
30+
let projection = Self::Entity::get_projection()
31+
.expand(SourceAlias::new(&[("{:certificate:}", "certificate")]));
32+
33+
format!("insert or replace into certificate {condition} returning {projection}")
34+
}
35+
}
36+
37+
#[cfg(test)]
38+
mod tests {
39+
use std::collections::HashMap;
40+
41+
use mithril_common::crypto_helper::tests_setup::setup_certificate_chain;
42+
use mithril_common::entities::Epoch;
43+
use mithril_common::test_utils::fake_data;
44+
use mithril_persistence::sqlite::ConnectionExtensions;
45+
46+
use crate::database::query::{GetCertificateRecordQuery, InsertCertificateRecordQuery};
47+
use crate::database::test_helper::main_db_connection;
48+
49+
use super::*;
50+
51+
#[test]
52+
fn test_insert_many_certificates_records_in_empty_db() {
53+
let certificates = setup_certificate_chain(5, 2);
54+
let certificates_records: Vec<CertificateRecord> = certificates.into();
55+
56+
let connection = main_db_connection().unwrap();
57+
58+
let certificates_records_saved: Vec<CertificateRecord> = connection
59+
.fetch_collect(InsertOrReplaceCertificateRecordQuery::many(
60+
certificates_records.clone(),
61+
))
62+
.expect("saving many records should not fail");
63+
64+
assert_eq!(certificates_records, certificates_records_saved);
65+
66+
// Check insertion order
67+
let all_records: Vec<CertificateRecord> =
68+
connection.fetch_collect(GetCertificateRecordQuery::all()).unwrap();
69+
assert_eq!(
70+
certificates_records.into_iter().rev().collect::<Vec<_>>(),
71+
all_records
72+
);
73+
}
74+
75+
#[test]
76+
fn test_replace_one_certificate_record() {
77+
let certificate_record = CertificateRecord {
78+
epoch: Epoch(12),
79+
..fake_data::certificate("hash").into()
80+
};
81+
82+
let connection = main_db_connection().unwrap();
83+
let certificate_record_saved = connection
84+
.fetch_first(InsertCertificateRecordQuery::one(
85+
certificate_record.clone(),
86+
))
87+
.unwrap();
88+
assert_eq!(Some(Epoch(12)), certificate_record_saved.map(|r| r.epoch));
89+
90+
let modified_certificate_record = CertificateRecord {
91+
epoch: Epoch(23),
92+
..certificate_record
93+
};
94+
let certificate_record_saved = connection
95+
.fetch_first(InsertOrReplaceCertificateRecordQuery::many(vec![
96+
modified_certificate_record.clone(),
97+
]))
98+
.unwrap();
99+
assert_eq!(Some(Epoch(23)), certificate_record_saved.map(|r| r.epoch));
100+
101+
let all_records_cursor = connection.fetch(GetCertificateRecordQuery::all()).unwrap();
102+
assert_eq!(1, all_records_cursor.count());
103+
}
104+
105+
#[test]
106+
fn test_insert_and_replace_many_certificate_record() {
107+
let tested_records: HashMap<_, CertificateRecord> = HashMap::from([
108+
(
109+
"cert1-genesis",
110+
fake_data::genesis_certificate("genesis").into(),
111+
),
112+
("cert2", fake_data::certificate("cert2").into()),
113+
(
114+
"cert2-modified",
115+
CertificateRecord {
116+
epoch: Epoch(14),
117+
..fake_data::certificate("cert2").into()
118+
},
119+
),
120+
("cert3", fake_data::certificate("cert3").into()),
121+
("cert4", fake_data::certificate("cert4").into()),
122+
(
123+
"cert4-modified",
124+
CertificateRecord {
125+
epoch: Epoch(32),
126+
..fake_data::certificate("cert4").into()
127+
},
128+
),
129+
("cert5", fake_data::certificate("cert5").into()),
130+
]);
131+
let connection = main_db_connection().unwrap();
132+
133+
let cursor = connection
134+
.fetch(InsertCertificateRecordQuery::many(vec![
135+
tested_records["cert1-genesis"].clone(),
136+
tested_records["cert2"].clone(),
137+
tested_records["cert3"].clone(),
138+
tested_records["cert4"].clone(),
139+
tested_records["cert5"].clone(),
140+
]))
141+
.unwrap();
142+
assert_eq!(5, cursor.count());
143+
144+
let cursor = connection
145+
.fetch(InsertOrReplaceCertificateRecordQuery::many(vec![
146+
tested_records["cert1-genesis"].clone(),
147+
tested_records["cert2-modified"].clone(),
148+
tested_records["cert3"].clone(),
149+
tested_records["cert4-modified"].clone(),
150+
]))
151+
.unwrap();
152+
assert_eq!(4, cursor.count());
153+
154+
let all_records: Vec<CertificateRecord> =
155+
connection.fetch_collect(GetCertificateRecordQuery::all()).unwrap();
156+
assert_eq!(5, all_records.len());
157+
assert_eq!(
158+
all_records,
159+
vec![
160+
tested_records["cert4-modified"].clone(),
161+
tested_records["cert3"].clone(),
162+
tested_records["cert2-modified"].clone(),
163+
tested_records["cert1-genesis"].clone(),
164+
// Since the cert5 was not in the Insert/replace query, it now has a lower rowid and shows first
165+
tested_records["cert5"].clone(),
166+
]
167+
);
168+
}
169+
}
Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,12 @@
1+
mod conditions;
12
mod delete_certificate;
23
mod get_certificate;
34
mod get_master_certificate;
45
mod insert_certificate;
6+
mod insert_or_replace_certificate;
57

68
pub use delete_certificate::*;
79
pub use get_certificate::*;
810
pub use get_master_certificate::*;
911
pub use insert_certificate::*;
12+
pub use insert_or_replace_certificate::*;

mithril-aggregator/src/database/repository/certificate_repository.rs

Lines changed: 32 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,9 +11,10 @@ use mithril_persistence::sqlite::ConnectionExtensions;
1111

1212
use crate::database::query::{
1313
DeleteCertificateQuery, GetCertificateRecordQuery, InsertCertificateRecordQuery,
14-
MasterCertificateQuery,
14+
InsertOrReplaceCertificateRecordQuery, MasterCertificateQuery,
1515
};
1616
use crate::database::record::CertificateRecord;
17+
use crate::services::SynchronizedCertificateStorer;
1718

1819
/// Database frontend API for Certificate queries.
1920
pub struct CertificateRepository {
@@ -105,6 +106,24 @@ impl CertificateRepository {
105106
Ok(new_certificates.map(|cert| cert.into()).collect())
106107
}
107108

109+
/// Create, or replace if they already exist, many certificates at once in the database.
110+
pub async fn create_or_replace_many_certificates(
111+
&self,
112+
certificates: Vec<Certificate>,
113+
) -> StdResult<Vec<Certificate>> {
114+
if certificates.is_empty() {
115+
return Ok(vec![]);
116+
}
117+
118+
let records: Vec<CertificateRecord> =
119+
certificates.into_iter().map(|cert| cert.into()).collect();
120+
let new_certificates = self
121+
.connection
122+
.fetch(InsertOrReplaceCertificateRecordQuery::many(records))?;
123+
124+
Ok(new_certificates.map(|cert| cert.into()).collect())
125+
}
126+
108127
/// Delete all the given certificates from the database
109128
pub async fn delete_certificates(&self, certificates: &[&Certificate]) -> StdResult<()> {
110129
let ids = certificates.iter().map(|c| c.hash.as_str()).collect::<Vec<_>>();
@@ -131,6 +150,18 @@ impl CertificateRetriever for CertificateRepository {
131150
}
132151
}
133152

153+
#[async_trait]
154+
impl SynchronizedCertificateStorer for CertificateRepository {
155+
async fn insert_or_replace_many(&self, certificates: Vec<Certificate>) -> StdResult<()> {
156+
self.create_or_replace_many_certificates(certificates).await?;
157+
Ok(())
158+
}
159+
160+
async fn get_latest_genesis(&self) -> StdResult<Option<Certificate>> {
161+
self.get_latest_genesis_certificate().await
162+
}
163+
}
164+
134165
#[cfg(test)]
135166
mod tests {
136167
use mithril_common::crypto_helper::tests_setup::setup_certificate_chain;

0 commit comments

Comments
 (0)