Skip to content

Commit 84e1d50

Browse files
committed
Implement SignedEntityStore store adapter
1 parent ab858a8 commit 84e1d50

File tree

1 file changed

+132
-2
lines changed

1 file changed

+132
-2
lines changed

mithril-aggregator/src/database/provider/signed_entity.rs

Lines changed: 132 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,20 @@
1+
use std::sync::Arc;
2+
13
use sqlite::{Connection, Value};
24

5+
use async_trait::async_trait;
6+
37
use mithril_common::{
48
entities::{SignedEntityType, Snapshot},
59
sqlite::{
610
EntityCursor, HydrationError, Projection, Provider, SourceAlias, SqLiteEntity,
711
WhereCondition,
812
},
13+
store::adapter::{AdapterError, StoreAdapter},
914
};
1015

1116
use mithril_common::StdError;
17+
use tokio::sync::Mutex;
1218

1319
/// SignedEntity record is the representation of a stored signed_entity.
1420
#[derive(Debug, PartialEq, Clone)]
@@ -172,7 +178,9 @@ impl<'client> Provider<'client> for SignedEntityRecordProvider<'client> {
172178
fn get_definition(&self, condition: &str) -> String {
173179
let aliases = SourceAlias::new(&[("{:signed_entity:}", "se")]);
174180
let projection = Self::Entity::get_projection().expand(aliases);
175-
format!("select {projection} from signed_entity as se where {condition} order by created_at, ROWID desc")
181+
format!(
182+
"select {projection} from signed_entity as se where {condition} order by ROWID desc"
183+
)
176184
}
177185
}
178186

@@ -233,6 +241,81 @@ impl<'conn> Provider<'conn> for InsertSignedEntityRecordProvider<'conn> {
233241
}
234242
}
235243

244+
/// Service to deal with signed_entity (read & write).
245+
pub struct SignedEntityStoreAdapter {
246+
connection: Arc<Mutex<Connection>>,
247+
}
248+
249+
impl SignedEntityStoreAdapter {
250+
/// Create a new SignedEntityStoreAdapter service
251+
pub fn new(connection: Arc<Mutex<Connection>>) -> Self {
252+
Self { connection }
253+
}
254+
}
255+
256+
#[async_trait]
257+
impl StoreAdapter for SignedEntityStoreAdapter {
258+
type Key = String;
259+
type Record = Snapshot;
260+
261+
async fn store_record(
262+
&mut self,
263+
_key: &Self::Key,
264+
record: &Self::Record,
265+
) -> Result<(), AdapterError> {
266+
let connection = &*self.connection.lock().await;
267+
let provider = InsertSignedEntityRecordProvider::new(connection);
268+
let _signed_entity_record = provider
269+
.persist(record.to_owned().into())
270+
.map_err(|e| AdapterError::GeneralError(format!("{e}")))?;
271+
272+
Ok(())
273+
}
274+
275+
async fn get_record(&self, key: &Self::Key) -> Result<Option<Self::Record>, AdapterError> {
276+
let connection = &*self.connection.lock().await;
277+
let provider = SignedEntityRecordProvider::new(connection);
278+
let mut cursor = provider
279+
.get_by_signed_entity_id(key.to_string())
280+
.map_err(|e| AdapterError::GeneralError(format!("{e}")))?;
281+
let signed_entity = cursor
282+
.next()
283+
.map(|signed_entity_record| signed_entity_record.into());
284+
285+
Ok(signed_entity)
286+
}
287+
288+
async fn record_exists(&self, key: &Self::Key) -> Result<bool, AdapterError> {
289+
Ok(self.get_record(key).await?.is_some())
290+
}
291+
292+
async fn get_last_n_records(
293+
&self,
294+
how_many: usize,
295+
) -> Result<Vec<(Self::Key, Self::Record)>, AdapterError> {
296+
Ok(self
297+
.get_iter()
298+
.await?
299+
.take(how_many)
300+
.map(|se| (se.digest.to_owned(), se))
301+
.collect())
302+
}
303+
304+
async fn remove(&mut self, _key: &Self::Key) -> Result<Option<Self::Record>, AdapterError> {
305+
unimplemented!()
306+
}
307+
308+
async fn get_iter(&self) -> Result<Box<dyn Iterator<Item = Self::Record> + '_>, AdapterError> {
309+
let connection = &*self.connection.lock().await;
310+
let provider = SignedEntityRecordProvider::new(connection);
311+
let cursor = provider
312+
.get_all()
313+
.map_err(|e| AdapterError::GeneralError(format!("{e}")))?;
314+
let signed_entities: Vec<Snapshot> = cursor.map(|se| se.into()).collect();
315+
Ok(Box::new(signed_entities.into_iter()))
316+
}
317+
}
318+
236319
#[cfg(test)]
237320
mod tests {
238321
use mithril_common::test_utils::fake_data;
@@ -252,7 +335,7 @@ mod tests {
252335
signed_entity_type: SignedEntityType::CardanoImmutableFilesFull,
253336
certificate_id: snapshot.certificate_hash,
254337
entity,
255-
created_at: "2023-04-07T21:44:51Z".to_string(),
338+
created_at: snapshot.created_at,
256339
}
257340
})
258341
.collect()
@@ -435,4 +518,51 @@ mod tests {
435518
assert_eq!(signed_entity_record, signed_entity_record_saved);
436519
}
437520
}
521+
522+
#[tokio::test]
523+
async fn test_store_adapter() {
524+
let signed_entity_records = fake_signed_entity_records(5);
525+
526+
let connection = Connection::open(":memory:").unwrap();
527+
setup_signed_entity_db(&connection, Vec::new()).unwrap();
528+
529+
let mut signed_entity_store_adapter =
530+
SignedEntityStoreAdapter::new(Arc::new(Mutex::new(connection)));
531+
532+
for signed_entity_record in &signed_entity_records {
533+
assert!(signed_entity_store_adapter
534+
.store_record(
535+
&signed_entity_record.signed_entity_id,
536+
&signed_entity_record.to_owned().into()
537+
)
538+
.await
539+
.is_ok());
540+
}
541+
542+
for signed_entity_record in &signed_entity_records {
543+
assert!(signed_entity_store_adapter
544+
.record_exists(&signed_entity_record.signed_entity_id)
545+
.await
546+
.unwrap());
547+
assert_eq!(
548+
Some(signed_entity_record.to_owned().into()),
549+
signed_entity_store_adapter
550+
.get_record(&signed_entity_record.signed_entity_id)
551+
.await
552+
.unwrap()
553+
);
554+
}
555+
556+
assert_eq!(
557+
signed_entity_records,
558+
signed_entity_store_adapter
559+
.get_last_n_records(signed_entity_records.len())
560+
.await
561+
.unwrap()
562+
.into_iter()
563+
.map(|(_k, v)| v.into())
564+
.rev()
565+
.collect::<Vec<SignedEntityRecord>>()
566+
)
567+
}
438568
}

0 commit comments

Comments
 (0)