Skip to content

Commit f28fab4

Browse files
committed
add OpenMessageRepository
1 parent cadff9f commit f28fab4

File tree

3 files changed

+204
-12
lines changed

3 files changed

+204
-12
lines changed

Cargo.lock

Lines changed: 17 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

mithril-aggregator/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ tar = "0.4.38"
3232
thiserror = "1.0.31"
3333
tokio = { version = "1.17.0", features = ["full"] }
3434
tokio-util = { version = "0.7.1", features = ["codec"] }
35+
uuid = { version = "1.3.0", features = ["v4", "fast-rng", "macro-diagnostics"] }
3536
warp = "0.3"
3637

3738
[dev-dependencies]

mithril-aggregator/src/database/provider/open_message.rs

Lines changed: 186 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
use chrono::NaiveDateTime;
22
use mithril_common::entities::Beacon;
33
use mithril_common::entities::Epoch;
4+
use mithril_common::StdError;
45

56
use mithril_common::sqlite::Provider;
67
use mithril_common::sqlite::SourceAlias;
@@ -10,18 +11,33 @@ use mithril_common::{
1011
};
1112
use sqlite::Row;
1213
use sqlite::{Connection, Value};
14+
use uuid::Uuid;
15+
16+
type StdResult<T> = Result<T, StdError>;
1317

1418
/// ## OpenMessage
1519
///
1620
/// An open message is a message open for signatures. Every signer may send a
1721
/// single signature for this message from which a multi signature will be
1822
/// generated if possible.
19-
struct OpenMessage {
20-
open_message_id: String,
23+
pub struct OpenMessage {
24+
/// OpenMessage unique identifier
25+
open_message_id: Uuid,
26+
27+
/// Epoch
2128
epoch: Epoch,
29+
30+
/// Beacon, this is the discriminant of this message type in the current
31+
/// Epoch
2232
beacon: Beacon,
33+
34+
/// Type of message
2335
signed_entity_type: SignedEntityType,
36+
37+
/// Message content
2438
message: String,
39+
40+
/// Message creation datetime, it is set by the database.
2541
created_at: NaiveDateTime,
2642
}
2743

@@ -31,6 +47,11 @@ impl SqLiteEntity for OpenMessage {
3147
Self: Sized,
3248
{
3349
let open_message_id = row.get::<String, _>(0);
50+
let open_message_id = Uuid::parse_str(&open_message_id).map_err(|e| {
51+
HydrationError::InvalidData(format!(
52+
"Invalid UUID in open_message.open_message_id: '{open_message_id}'. Error: {e}"
53+
))
54+
})?;
3455
let message = row.get::<String, _>(4);
3556
let epoch_settings_id = row.get::<i64, _>(1);
3657
let epoch_val = u64::try_from(epoch_settings_id)
@@ -96,13 +117,13 @@ struct OpenMessageProvider<'client> {
96117
}
97118

98119
impl<'client> OpenMessageProvider<'client> {
99-
fn new(connection: &'client Connection) -> Self {
120+
pub fn new(connection: &'client Connection) -> Self {
100121
Self { connection }
101122
}
102123

103124
fn get_epoch_condition(&self, epoch: Epoch) -> WhereCondition {
104125
WhereCondition::new(
105-
"{:open_message:}.epoch_settings_id = ?*",
126+
"epoch_settings_id = ?*",
106127
vec![Value::Integer(epoch.0 as i64)],
107128
)
108129
}
@@ -112,14 +133,14 @@ impl<'client> OpenMessageProvider<'client> {
112133
signed_entity_type: &SignedEntityType,
113134
) -> WhereCondition {
114135
WhereCondition::new(
115-
"{:open_message:}.signed_entity_type_id = ?*",
136+
"signed_entity_type_id = ?*",
116137
vec![Value::Integer(*signed_entity_type as i64)],
117138
)
118139
}
119140

120141
fn get_open_message_id_condition(&self, open_message_id: &str) -> WhereCondition {
121142
WhereCondition::new(
122-
"{:open_message:}.open_message_id = ?*",
143+
"open_message_id = ?*",
123144
vec![Value::String(open_message_id.to_owned())],
124145
)
125146
}
@@ -140,6 +161,128 @@ impl<'client> Provider<'client> for OpenMessageProvider<'client> {
140161
}
141162
}
142163

164+
struct InsertOpenMessageProvider<'client> {
165+
connection: &'client Connection,
166+
}
167+
impl<'client> InsertOpenMessageProvider<'client> {
168+
pub fn new(connection: &'client Connection) -> Self {
169+
Self { connection }
170+
}
171+
172+
fn get_insert_condition(
173+
&self,
174+
epoch: Epoch,
175+
beacon: &Beacon,
176+
signed_entity_type: &SignedEntityType,
177+
message: &str,
178+
) -> StdResult<WhereCondition> {
179+
let expression = "(open_message_id, epoch_settings_id, beacon, signed_entity_type_id, message) values (?*, ?*, ?*, ?*, ?*)";
180+
let parameters = vec![
181+
Value::String(Uuid::new_v4().to_string()),
182+
Value::Integer(epoch.0 as i64),
183+
Value::String(serde_json::to_string(beacon)?),
184+
Value::Integer(*signed_entity_type as i64),
185+
Value::String(message.to_string()),
186+
];
187+
188+
Ok(WhereCondition::new(expression, parameters))
189+
}
190+
}
191+
192+
impl<'client> Provider<'client> for InsertOpenMessageProvider<'client> {
193+
type Entity = OpenMessage;
194+
195+
fn get_connection(&'client self) -> &'client Connection {
196+
self.connection
197+
}
198+
199+
fn get_definition(&self, condition: &str) -> String {
200+
let aliases = SourceAlias::new(&[("{:open_message:}", "open_message")]);
201+
let projection = Self::Entity::get_projection().expand(aliases);
202+
203+
format!("insert into open_message {condition} returning {projection}")
204+
}
205+
}
206+
207+
struct DeleteOpenMessageProvider<'client> {
208+
connection: &'client Connection,
209+
}
210+
211+
impl<'client> DeleteOpenMessageProvider<'client> {
212+
pub fn new(connection: &'client Connection) -> Self {
213+
Self { connection }
214+
}
215+
216+
fn get_epoch_condition(&self, epoch: Epoch) -> WhereCondition {
217+
WhereCondition::new(
218+
"epoch_settings_id = ?*",
219+
vec![Value::Integer(epoch.0 as i64)],
220+
)
221+
}
222+
}
223+
224+
impl<'client> Provider<'client> for DeleteOpenMessageProvider<'client> {
225+
type Entity = OpenMessage;
226+
227+
fn get_connection(&'client self) -> &'client Connection {
228+
self.connection
229+
}
230+
231+
fn get_definition(&self, condition: &str) -> String {
232+
let aliases = SourceAlias::new(&[("{:open_message:}", "open_message")]);
233+
let projection = Self::Entity::get_projection().expand(aliases);
234+
235+
format!("delete from open_message where {condition} returning {projection}")
236+
}
237+
}
238+
239+
pub struct OpenMessageRepository<'client> {
240+
connection: &'client Connection,
241+
}
242+
243+
impl<'client> OpenMessageRepository<'client> {
244+
/// Return the latest [OpenMessage] for the given Epoch and [SignedEntityType].
245+
pub fn get_open_message(
246+
&self,
247+
epoch: Epoch,
248+
signed_entity_type: &SignedEntityType,
249+
) -> StdResult<Option<OpenMessage>> {
250+
let provider = OpenMessageProvider::new(self.connection);
251+
let filters = provider
252+
.get_epoch_condition(epoch)
253+
.and_where(provider.get_signed_entity_type_condition(signed_entity_type));
254+
let mut messages = provider.find(filters)?;
255+
256+
Ok(messages.next())
257+
}
258+
259+
/// Create a new [OpenMessage] in the database.
260+
pub fn create_open_message(
261+
&self,
262+
epoch: Epoch,
263+
beacon: &Beacon,
264+
signed_entity_type: &SignedEntityType,
265+
message: &str,
266+
) -> StdResult<OpenMessage> {
267+
let provider = InsertOpenMessageProvider::new(self.connection);
268+
let filters = provider.get_insert_condition(epoch, beacon, signed_entity_type, message)?;
269+
let mut cursor = provider.find(filters)?;
270+
271+
cursor
272+
.next()
273+
.ok_or_else(|| panic!("Inserting an open_message should not return nothing."))
274+
}
275+
276+
/// Remove all the [OpenMessage] for the given Epoch in the database.
277+
pub fn clean_epoch(&self, epoch: Epoch) -> StdResult<()> {
278+
let provider = DeleteOpenMessageProvider::new(self.connection);
279+
let filters = provider.get_epoch_condition(epoch);
280+
let _ = provider.find(filters)?;
281+
282+
Ok(())
283+
}
284+
}
285+
143286
#[cfg(test)]
144287
mod tests {
145288
use mithril_common::sqlite::SourceAlias;
@@ -163,7 +306,7 @@ mod tests {
163306
let provider = OpenMessageProvider::new(&connection);
164307
let (expr, params) = provider.get_epoch_condition(Epoch(12)).expand();
165308

166-
assert_eq!("{:open_message:}.epoch_settings_id = ?1".to_string(), expr);
309+
assert_eq!("epoch_settings_id = ?1".to_string(), expr);
167310
assert_eq!(vec![Value::Integer(12)], params,);
168311
}
169312

@@ -175,10 +318,7 @@ mod tests {
175318
.get_signed_entity_type_condition(&SignedEntityType::CardanoImmutableFilesFull)
176319
.expand();
177320

178-
assert_eq!(
179-
"{:open_message:}.signed_entity_type_id = ?1".to_string(),
180-
expr
181-
);
321+
assert_eq!("signed_entity_type_id = ?1".to_string(), expr);
182322
assert_eq!(vec![Value::Integer(2)], params,);
183323
}
184324

@@ -190,12 +330,46 @@ mod tests {
190330
.get_open_message_id_condition("cecd7983-8b3a-42b1-b778-6d75e87828ee")
191331
.expand();
192332

193-
assert_eq!("{:open_message:}.open_message_id = ?1".to_string(), expr);
333+
assert_eq!("open_message_id = ?1".to_string(), expr);
194334
assert_eq!(
195335
vec![Value::String(
196336
"cecd7983-8b3a-42b1-b778-6d75e87828ee".to_string()
197337
)],
198338
params,
199339
);
200340
}
341+
342+
#[test]
343+
fn insert_provider_condition() {
344+
let connection = Connection::open(":memory:").unwrap();
345+
let provider = InsertOpenMessageProvider::new(&connection);
346+
let (expr, params) = provider
347+
.get_insert_condition(
348+
Epoch(12),
349+
&Beacon::default(),
350+
&SignedEntityType::CardanoStakeDistribution,
351+
"This is a message",
352+
)
353+
.unwrap()
354+
.expand();
355+
356+
assert_eq!("(open_message_id, epoch_settings_id, beacon, signed_entity_type_id, message) values (?1, ?2, ?3, ?4, ?5)".to_string(), expr);
357+
assert_eq!(Value::Integer(12), params[1]);
358+
assert_eq!(
359+
Value::String(r#"{"network":"","epoch":0,"immutable_file_number":0}"#.to_string()),
360+
params[2]
361+
);
362+
assert_eq!(Value::Integer(1), params[3]);
363+
assert_eq!(Value::String("This is a message".to_string()), params[4]);
364+
}
365+
366+
#[test]
367+
fn delete_provider_epoch_condition() {
368+
let connection = Connection::open(":memory:").unwrap();
369+
let provider = DeleteOpenMessageProvider::new(&connection);
370+
let (expr, params) = provider.get_epoch_condition(Epoch(12)).expand();
371+
372+
assert_eq!("epoch_settings_id = ?1".to_string(), expr);
373+
assert_eq!(vec![Value::Integer(12)], params,);
374+
}
201375
}

0 commit comments

Comments
 (0)