Skip to content

Commit 8931cb8

Browse files
committed
feat(aggregator): implement persistence of openmessage created post sync
1 parent 55d7d32 commit 8931cb8

File tree

8 files changed

+279
-21
lines changed

8 files changed

+279
-21
lines changed
Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
//! Shared `WhereCondition` across open message queries
2+
3+
use sqlite::Value;
4+
5+
use mithril_common::StdResult;
6+
use mithril_persistence::sqlite::WhereCondition;
7+
8+
use crate::database::record::OpenMessageRecord;
9+
10+
pub(crate) fn insert_one(record: OpenMessageRecord) -> StdResult<WhereCondition> {
11+
let expression = "(\
12+
open_message_id, epoch_setting_id, beacon, signed_entity_type_id, protocol_message, \
13+
expires_at, created_at, is_certified, is_expired\
14+
) values (?*, ?*, ?*, ?*, ?*, ?*, ?*, ?*, ?*)";
15+
let beacon_str = record.signed_entity_type.get_json_beacon()?;
16+
let parameters = vec![
17+
Value::String(record.open_message_id.to_string()),
18+
Value::Integer(record.epoch.try_into()?),
19+
Value::String(beacon_str),
20+
Value::Integer(record.signed_entity_type.index() as i64),
21+
Value::String(serde_json::to_string(&record.protocol_message)?),
22+
record
23+
.expires_at
24+
.map(|t| Value::String(t.to_rfc3339()))
25+
.unwrap_or(Value::Null),
26+
Value::String(record.created_at.to_rfc3339()),
27+
Value::Integer(record.is_certified as i64),
28+
Value::Integer(record.is_expired as i64),
29+
];
30+
31+
Ok(WhereCondition::new(expression, parameters))
32+
}

mithril-aggregator/src/database/query/open_message/get_open_message.rs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,13 @@ pub struct GetOpenMessageQuery {
1515
}
1616

1717
impl GetOpenMessageQuery {
18+
#[cfg(test)]
19+
pub fn all() -> Self {
20+
Self {
21+
condition: WhereCondition::default(),
22+
}
23+
}
24+
1825
pub fn by_epoch_and_signed_entity_type(
1926
epoch: Epoch,
2027
signed_entity_type: &SignedEntityType,
Lines changed: 79 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,10 @@
11
use chrono::Utc;
2-
use sqlite::Value;
3-
use uuid::Uuid;
42

53
use mithril_common::StdResult;
64
use mithril_common::entities::{Epoch, ProtocolMessage, SignedEntityType};
75
use mithril_persistence::sqlite::{Query, SourceAlias, SqLiteEntity, WhereCondition};
86

7+
use crate::database::query::open_message::conditions;
98
use crate::database::record::OpenMessageRecord;
109

1110
/// Query to insert [OpenMessageRecord] in the sqlite database
@@ -19,23 +18,20 @@ impl InsertOpenMessageQuery {
1918
signed_entity_type: &SignedEntityType,
2019
protocol_message: &ProtocolMessage,
2120
) -> StdResult<Self> {
22-
let expression = "(open_message_id, epoch_setting_id, beacon, signed_entity_type_id, protocol_message, expires_at, created_at) values (?*, ?*, ?*, ?*, ?*, ?*, ?*)";
23-
let beacon_str = signed_entity_type.get_json_beacon()?;
24-
let parameters = vec![
25-
Value::String(Uuid::new_v4().to_string()),
26-
Value::Integer(epoch.try_into()?),
27-
Value::String(beacon_str),
28-
Value::Integer(signed_entity_type.index() as i64),
29-
Value::String(serde_json::to_string(protocol_message)?),
30-
signed_entity_type
31-
.get_open_message_timeout()
32-
.map(|t| Value::String((Utc::now() + t).to_rfc3339()))
33-
.unwrap_or(Value::Null),
34-
Value::String(Utc::now().to_rfc3339()),
35-
];
21+
let now = Utc::now();
22+
let record = OpenMessageRecord {
23+
open_message_id: OpenMessageRecord::new_id(),
24+
epoch,
25+
signed_entity_type: signed_entity_type.clone(),
26+
protocol_message: protocol_message.clone(),
27+
is_certified: false,
28+
is_expired: false,
29+
created_at: now,
30+
expires_at: signed_entity_type.get_open_message_timeout().map(|t| now + t),
31+
};
3632

3733
Ok(Self {
38-
condition: WhereCondition::new(expression, parameters),
34+
condition: conditions::insert_one(record)?,
3935
})
4036
}
4137
}
@@ -54,3 +50,69 @@ impl Query for InsertOpenMessageQuery {
5450
format!("insert into open_message {condition} returning {projection}")
5551
}
5652
}
53+
54+
#[cfg(test)]
55+
mod tests {
56+
use mithril_common::entities::ProtocolMessagePartKey;
57+
use mithril_persistence::sqlite::ConnectionExtensions;
58+
59+
use crate::database::query::GetOpenMessageQuery;
60+
use crate::database::test_helper::main_db_connection;
61+
62+
use super::*;
63+
64+
#[test]
65+
fn test_insert_one() {
66+
let connection = main_db_connection().unwrap();
67+
let epoch = Epoch(5);
68+
let signed_entity_type = SignedEntityType::CardanoStakeDistribution(Epoch(10));
69+
let mut protocol_message = ProtocolMessage::new();
70+
protocol_message.set_message_part(
71+
ProtocolMessagePartKey::CardanoStakeDistributionEpoch,
72+
"value".to_string(),
73+
);
74+
75+
connection
76+
.fetch_first(
77+
InsertOpenMessageQuery::one(epoch, &signed_entity_type, &protocol_message).unwrap(),
78+
)
79+
.unwrap();
80+
let records: Vec<OpenMessageRecord> =
81+
connection.fetch_collect(GetOpenMessageQuery::all()).unwrap();
82+
83+
assert_eq!(1, records.len());
84+
assert_eq!(
85+
OpenMessageRecord {
86+
open_message_id: records[0].open_message_id,
87+
epoch,
88+
signed_entity_type,
89+
protocol_message,
90+
is_certified: false,
91+
is_expired: false,
92+
created_at: records[0].created_at,
93+
expires_at: records[0].expires_at,
94+
},
95+
records[0]
96+
);
97+
}
98+
99+
#[should_panic]
100+
#[test]
101+
fn test_insert_two_entries_with_same_signed_entity_violate_unique_constraint() {
102+
let connection = main_db_connection().unwrap();
103+
let epoch = Epoch(5);
104+
let signed_entity_type = SignedEntityType::MithrilStakeDistribution(Epoch(10));
105+
106+
connection
107+
.fetch_first(
108+
InsertOpenMessageQuery::one(epoch, &signed_entity_type, &ProtocolMessage::new())
109+
.unwrap(),
110+
)
111+
.unwrap();
112+
113+
let _ = connection.fetch_first(
114+
InsertOpenMessageQuery::one(epoch + 10, &signed_entity_type, &ProtocolMessage::new())
115+
.unwrap(),
116+
);
117+
}
118+
}
Lines changed: 92 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,92 @@
1+
use mithril_common::StdResult;
2+
use mithril_persistence::sqlite::{Query, SourceAlias, SqLiteEntity, WhereCondition};
3+
4+
use crate::database::query::open_message::conditions;
5+
use crate::database::record::OpenMessageRecord;
6+
7+
/// Query to insert [OpenMessageRecord] in the sqlite database
8+
pub struct InsertOrReplaceOpenMessageQuery {
9+
condition: WhereCondition,
10+
}
11+
12+
impl InsertOrReplaceOpenMessageQuery {
13+
pub fn one(record: OpenMessageRecord) -> StdResult<Self> {
14+
Ok(Self {
15+
condition: conditions::insert_one(record)?,
16+
})
17+
}
18+
}
19+
20+
impl Query for InsertOrReplaceOpenMessageQuery {
21+
type Entity = OpenMessageRecord;
22+
23+
fn filters(&self) -> WhereCondition {
24+
self.condition.clone()
25+
}
26+
27+
fn get_definition(&self, condition: &str) -> String {
28+
let aliases = SourceAlias::new(&[("{:open_message:}", "open_message")]);
29+
let projection = Self::Entity::get_projection().expand(aliases);
30+
31+
format!("insert or replace into open_message {condition} returning {projection}")
32+
}
33+
}
34+
35+
#[cfg(test)]
36+
mod tests {
37+
use mithril_persistence::sqlite::ConnectionExtensions;
38+
39+
use crate::database::query::GetOpenMessageQuery;
40+
use crate::database::test_helper::main_db_connection;
41+
42+
use super::*;
43+
44+
#[test]
45+
fn test_insert_one() {
46+
let connection = main_db_connection().unwrap();
47+
let record = OpenMessageRecord::dummy();
48+
49+
connection
50+
.fetch_first(InsertOrReplaceOpenMessageQuery::one(record.clone()).unwrap())
51+
.unwrap();
52+
let records: Vec<OpenMessageRecord> = connection
53+
.fetch_collect(
54+
GetOpenMessageQuery::by_epoch_and_signed_entity_type(
55+
record.epoch,
56+
&record.signed_entity_type,
57+
)
58+
.unwrap(),
59+
)
60+
.unwrap();
61+
62+
assert_eq!(1, records.len());
63+
assert_eq!(record, records[0]);
64+
}
65+
66+
#[test]
67+
fn test_insert_record_for_existing_signed_entity_type_replaces_it() {
68+
let connection = main_db_connection().unwrap();
69+
let record = OpenMessageRecord {
70+
is_expired: false,
71+
..OpenMessageRecord::dummy()
72+
};
73+
74+
connection
75+
.fetch_first(InsertOrReplaceOpenMessageQuery::one(record.clone()).unwrap())
76+
.unwrap();
77+
78+
let replaced_record = connection
79+
.fetch_first(
80+
InsertOrReplaceOpenMessageQuery::one(OpenMessageRecord {
81+
is_expired: true,
82+
..record.clone()
83+
})
84+
.unwrap(),
85+
)
86+
.unwrap();
87+
let count = connection.fetch(GetOpenMessageQuery::all()).unwrap().count();
88+
89+
assert_eq!(1, count);
90+
assert_eq!(Some(true), replaced_record.map(|r| r.is_expired));
91+
}
92+
}
Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,14 @@
1+
mod conditions;
12
mod delete_open_message;
23
mod get_open_message;
34
mod get_open_message_with_single_signatures;
45
mod insert_open_message;
6+
mod insert_or_replace_open_message;
57
mod update_open_message;
68

79
pub use delete_open_message::*;
810
pub use get_open_message::*;
911
pub use get_open_message_with_single_signatures::*;
1012
pub use insert_open_message::*;
13+
pub use insert_or_replace_open_message::*;
1114
pub use update_open_message::*;

mithril-aggregator/src/database/record/open_message.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,11 @@ pub struct OpenMessageRecord {
3939
}
4040

4141
impl OpenMessageRecord {
42+
/// Creates a new random id that can be used for a new record
43+
pub fn new_id() -> Uuid {
44+
Uuid::new_v4()
45+
}
46+
4247
#[cfg(test)]
4348
/// Create a dumb OpenMessage instance mainly for test purposes
4449
pub fn dummy() -> Self {

mithril-aggregator/src/database/repository/open_message_repository.rs

Lines changed: 60 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,18 @@
1-
use std::sync::Arc;
2-
1+
use async_trait::async_trait;
32
use chrono::Utc;
3+
use std::sync::Arc;
44

55
use mithril_common::StdResult;
66
use mithril_common::entities::{Epoch, ProtocolMessage, SignedEntityType};
77
use mithril_persistence::sqlite::{ConnectionExtensions, SqliteConnection};
88

99
use crate::database::query::{
1010
DeleteOpenMessageQuery, GetOpenMessageQuery, GetOpenMessageWithSingleSignaturesQuery,
11-
InsertOpenMessageQuery, UpdateOpenMessageQuery,
11+
InsertOpenMessageQuery, InsertOrReplaceOpenMessageQuery, UpdateOpenMessageQuery,
1212
};
1313
use crate::database::record::{OpenMessageRecord, OpenMessageWithSingleSignaturesRecord};
14+
use crate::entities::OpenMessage;
15+
use crate::services::OpenMessageStorer;
1416

1517
/// ## Open message repository
1618
///
@@ -79,6 +81,21 @@ impl OpenMessageRepository {
7981
message.ok_or_else(|| panic!("Inserting an open_message should not return nothing."))
8082
}
8183

84+
/// Create, or replace if one with the same [SignedEntityType] they already exist, a
85+
/// [OpenMessageRecord] in the database.
86+
pub async fn create_or_replace_open_message(
87+
&self,
88+
record: OpenMessageRecord,
89+
) -> StdResult<OpenMessageRecord> {
90+
let message = self
91+
.connection
92+
.fetch_first(InsertOrReplaceOpenMessageQuery::one(record)?)?;
93+
94+
message.ok_or_else(|| {
95+
panic!("Inserting or replacing an open_message should not return nothing.")
96+
})
97+
}
98+
8299
/// Updates an [OpenMessageRecord] in the database.
83100
pub async fn update_open_message(
84101
&self,
@@ -102,6 +119,24 @@ impl OpenMessageRepository {
102119
}
103120
}
104121

122+
#[async_trait]
123+
impl OpenMessageStorer for OpenMessageRepository {
124+
async fn insert_or_replace_open_message(&self, open_message: OpenMessage) -> StdResult<()> {
125+
let record = OpenMessageRecord {
126+
open_message_id: OpenMessageRecord::new_id(),
127+
epoch: open_message.epoch,
128+
signed_entity_type: open_message.signed_entity_type,
129+
protocol_message: open_message.protocol_message,
130+
is_certified: open_message.is_certified,
131+
is_expired: open_message.is_expired,
132+
created_at: open_message.created_at,
133+
expires_at: open_message.expires_at,
134+
};
135+
self.create_or_replace_open_message(record).await?;
136+
Ok(())
137+
}
138+
}
139+
105140
#[cfg(test)]
106141
mod tests {
107142
use mithril_common::entities::{BlockNumber, CardanoDbBeacon};
@@ -261,6 +296,28 @@ mod tests {
261296
assert_eq!(open_message.epoch, message.epoch);
262297
}
263298

299+
#[tokio::test]
300+
async fn repository_create_or_replace_open_message() {
301+
let connection = get_connection().await;
302+
let repository = OpenMessageRepository::new(connection.clone());
303+
let mut inserted_record = repository
304+
.create_or_replace_open_message(OpenMessageRecord {
305+
epoch: Epoch(5),
306+
signed_entity_type: SignedEntityType::MithrilStakeDistribution(Epoch(6)),
307+
..OpenMessageRecord::dummy()
308+
})
309+
.await
310+
.unwrap();
311+
assert_eq!(Epoch(5), inserted_record.epoch);
312+
313+
inserted_record.epoch = Epoch(32);
314+
let replaced_record = repository
315+
.create_or_replace_open_message(inserted_record)
316+
.await
317+
.unwrap();
318+
assert_eq!(Epoch(32), replaced_record.epoch);
319+
}
320+
264321
#[tokio::test]
265322
async fn repository_update_open_message() {
266323
let connection = get_connection().await;

mithril-aggregator/src/entities/open_message.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -120,7 +120,7 @@ mod test {
120120
fn test_from_record() {
121121
let created_at = Utc::now();
122122
let record = OpenMessageRecord {
123-
open_message_id: Uuid::new_v4(),
123+
open_message_id: OpenMessageRecord::new_id(),
124124
epoch: Epoch(1),
125125
signed_entity_type: SignedEntityType::dummy(),
126126
protocol_message: ProtocolMessage::default(),

0 commit comments

Comments
 (0)