Skip to content

Commit 27fdfa0

Browse files
authored
Merge pull request #1934 from input-output-hk/ensemble/1900/aggregator_signatures_buffer
Aggregator single signatures buffer
2 parents 62dbe88 + 5b578c7 commit 27fdfa0

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

49 files changed

+2699
-386
lines changed

CHANGELOG.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,8 @@ As a minor extension, we have adopted a slightly different versioning convention
1111

1212
- Optimizations of the state machine used by the signer to create individual signatures.
1313

14+
- Support for buffering of incoming single signatures by the aggregator if it can not aggregate them yet
15+
1416
- Crates versions:
1517

1618
| Crate | Version |

Cargo.lock

Lines changed: 3 additions & 3 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

mithril-aggregator/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
[package]
22
name = "mithril-aggregator"
3-
version = "0.5.63"
3+
version = "0.5.64"
44
description = "A Mithril Aggregator server"
55
authors = { workspace = true }
66
edition = { workspace = true }

mithril-aggregator/src/database/migration.rs

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -758,6 +758,23 @@ pragma foreign_keys=true;
758758
26,
759759
r#"
760760
create unique index signed_entity_unique_index on signed_entity(signed_entity_type_id, beacon);
761+
"#,
762+
),
763+
// Migration 27
764+
SqlMigration::new(
765+
27,
766+
r#"
767+
create table buffered_single_signature (
768+
signed_entity_type_id integer not null,
769+
party_id text not null,
770+
lottery_indexes json not null,
771+
signature text not null,
772+
created_at text not null,
773+
primary key (signed_entity_type_id, party_id)
774+
);
775+
776+
create index buffered_single_signature_signed_entity_type_id on buffered_single_signature(signed_entity_type_id);
777+
create index buffered_single_signature_party_id_index on buffered_single_signature(party_id);
761778
"#,
762779
),
763780
]
Lines changed: 119 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,119 @@
1+
use sqlite::Value;
2+
3+
use mithril_common::entities::{PartyId, SignedEntityTypeDiscriminants};
4+
use mithril_persistence::sqlite::{Query, SourceAlias, SqLiteEntity, WhereCondition};
5+
6+
use crate::database::record::BufferedSingleSignatureRecord;
7+
8+
/// Query to delete old [BufferedSingleSignatureRecord] from the sqlite database
9+
pub struct DeleteBufferedSingleSignatureQuery {
10+
condition: WhereCondition,
11+
}
12+
13+
impl DeleteBufferedSingleSignatureQuery {
14+
pub fn by_discriminant_and_party_ids(
15+
signed_entity_type_discriminant: SignedEntityTypeDiscriminants,
16+
party_ids: Vec<PartyId>,
17+
) -> Self {
18+
let ids_values = party_ids.into_iter().map(Value::String).collect();
19+
20+
Self {
21+
condition: WhereCondition::new(
22+
"signed_entity_type_id = ?*",
23+
vec![Value::Integer(
24+
signed_entity_type_discriminant.index() as i64
25+
)],
26+
)
27+
.and_where(WhereCondition::where_in("party_id", ids_values)),
28+
}
29+
}
30+
}
31+
32+
impl Query for DeleteBufferedSingleSignatureQuery {
33+
type Entity = BufferedSingleSignatureRecord;
34+
35+
fn filters(&self) -> WhereCondition {
36+
self.condition.clone()
37+
}
38+
39+
fn get_definition(&self, condition: &str) -> String {
40+
// it is important to alias the fields with the same name as the table
41+
// since the table cannot be aliased in a RETURNING statement in SQLite.
42+
let projection = Self::Entity::get_projection().expand(SourceAlias::new(&[(
43+
"{:buffered_single_signature:}",
44+
"buffered_single_signature",
45+
)]));
46+
47+
format!("delete from buffered_single_signature where {condition} returning {projection}")
48+
}
49+
}
50+
51+
#[cfg(test)]
52+
mod tests {
53+
use mithril_common::entities::SignedEntityTypeDiscriminants::{
54+
CardanoTransactions, MithrilStakeDistribution,
55+
};
56+
use mithril_persistence::sqlite::ConnectionExtensions;
57+
58+
use crate::database::query::GetBufferedSingleSignatureQuery;
59+
use crate::database::record::strip_buffered_sigs_date;
60+
use crate::database::test_helper::{insert_buffered_single_signatures, main_db_connection};
61+
62+
use super::*;
63+
64+
#[test]
65+
fn test_delete_buffered_single_signature_records_by_discriminant_and_party_ids() {
66+
let connection = main_db_connection().unwrap();
67+
let records = BufferedSingleSignatureRecord::fakes(&[
68+
("party_1", MithrilStakeDistribution),
69+
("party_2", MithrilStakeDistribution),
70+
("party_3", MithrilStakeDistribution),
71+
("party_1", CardanoTransactions),
72+
("party_2", CardanoTransactions),
73+
]);
74+
insert_buffered_single_signatures(&connection, records.clone()).unwrap();
75+
76+
let cursor = connection
77+
.fetch(
78+
DeleteBufferedSingleSignatureQuery::by_discriminant_and_party_ids(
79+
MithrilStakeDistribution,
80+
vec!["party_1".into(), "party_3".into()],
81+
),
82+
)
83+
.unwrap();
84+
assert_eq!(2, cursor.count());
85+
86+
let remaining_records: Vec<BufferedSingleSignatureRecord> = connection
87+
.fetch_collect(GetBufferedSingleSignatureQuery::all())
88+
.unwrap();
89+
assert_eq!(
90+
strip_buffered_sigs_date(&BufferedSingleSignatureRecord::fakes(&[
91+
("party_2", CardanoTransactions),
92+
("party_1", CardanoTransactions),
93+
("party_2", MithrilStakeDistribution),
94+
])),
95+
strip_buffered_sigs_date(&remaining_records)
96+
);
97+
98+
let cursor = connection
99+
.fetch(
100+
DeleteBufferedSingleSignatureQuery::by_discriminant_and_party_ids(
101+
CardanoTransactions,
102+
vec!["party_1".into(), "party_2".into()],
103+
),
104+
)
105+
.unwrap();
106+
assert_eq!(2, cursor.count());
107+
108+
let remaining_records: Vec<BufferedSingleSignatureRecord> = connection
109+
.fetch_collect(GetBufferedSingleSignatureQuery::all())
110+
.unwrap();
111+
assert_eq!(
112+
strip_buffered_sigs_date(&BufferedSingleSignatureRecord::fakes(&[(
113+
"party_2",
114+
MithrilStakeDistribution
115+
),])),
116+
strip_buffered_sigs_date(&remaining_records)
117+
);
118+
}
119+
}
Lines changed: 119 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,119 @@
1+
use sqlite::Value;
2+
3+
use mithril_common::entities::SignedEntityTypeDiscriminants;
4+
use mithril_persistence::sqlite::{Query, SourceAlias, SqLiteEntity, WhereCondition};
5+
6+
use crate::database::record::BufferedSingleSignatureRecord;
7+
8+
/// Simple queries to retrieve [BufferedSingleSignatureRecord] from the sqlite database.
9+
pub struct GetBufferedSingleSignatureQuery {
10+
condition: WhereCondition,
11+
}
12+
13+
impl GetBufferedSingleSignatureQuery {
14+
#[cfg(test)]
15+
pub(crate) fn all() -> Self {
16+
Self {
17+
condition: WhereCondition::default(),
18+
}
19+
}
20+
21+
pub fn by_discriminant(signed_entity_type_discriminant: SignedEntityTypeDiscriminants) -> Self {
22+
Self {
23+
condition: WhereCondition::new(
24+
"signed_entity_type_id = ?*",
25+
vec![Value::Integer(
26+
signed_entity_type_discriminant.index() as i64
27+
)],
28+
),
29+
}
30+
}
31+
}
32+
33+
impl Query for GetBufferedSingleSignatureQuery {
34+
type Entity = BufferedSingleSignatureRecord;
35+
36+
fn filters(&self) -> WhereCondition {
37+
self.condition.clone()
38+
}
39+
40+
fn get_definition(&self, condition: &str) -> String {
41+
let aliases = SourceAlias::new(&[("{:buffered_single_signature:}", "b")]);
42+
let projection = Self::Entity::get_projection().expand(aliases);
43+
format!("select {projection} from buffered_single_signature as b where {condition} order by ROWID desc")
44+
}
45+
}
46+
47+
#[cfg(test)]
48+
mod tests {
49+
use mithril_common::entities::SignedEntityTypeDiscriminants::{
50+
CardanoImmutableFilesFull, CardanoTransactions, MithrilStakeDistribution,
51+
};
52+
use mithril_persistence::sqlite::ConnectionExtensions;
53+
54+
use crate::database::test_helper::{insert_buffered_single_signatures, main_db_connection};
55+
56+
use super::*;
57+
58+
#[test]
59+
fn test_get_all() {
60+
let connection = main_db_connection().unwrap();
61+
let records = BufferedSingleSignatureRecord::fakes(&[
62+
("party1", MithrilStakeDistribution),
63+
("party2", CardanoTransactions),
64+
("party3", MithrilStakeDistribution),
65+
]);
66+
insert_buffered_single_signatures(&connection, records.clone()).unwrap();
67+
68+
let stored_records: Vec<BufferedSingleSignatureRecord> = connection
69+
.fetch_collect(GetBufferedSingleSignatureQuery::all())
70+
.unwrap();
71+
72+
assert_eq!(
73+
records.into_iter().rev().collect::<Vec<_>>(),
74+
stored_records
75+
);
76+
}
77+
78+
#[test]
79+
fn test_get_buffered_single_signature_records_by_discriminant() {
80+
let connection = main_db_connection().unwrap();
81+
let msd_records = BufferedSingleSignatureRecord::fakes(&[
82+
("party1", MithrilStakeDistribution),
83+
("party2", MithrilStakeDistribution),
84+
]);
85+
let ctx_records = BufferedSingleSignatureRecord::fakes(&[("party3", CardanoTransactions)]);
86+
insert_buffered_single_signatures(
87+
&connection,
88+
[msd_records.clone(), ctx_records.clone()].concat(),
89+
)
90+
.unwrap();
91+
92+
let stored_msd_records: Vec<BufferedSingleSignatureRecord> = connection
93+
.fetch_collect(GetBufferedSingleSignatureQuery::by_discriminant(
94+
MithrilStakeDistribution,
95+
))
96+
.unwrap();
97+
assert_eq!(
98+
msd_records.into_iter().rev().collect::<Vec<_>>(),
99+
stored_msd_records
100+
);
101+
102+
let stored_ctx_records: Vec<BufferedSingleSignatureRecord> = connection
103+
.fetch_collect(GetBufferedSingleSignatureQuery::by_discriminant(
104+
CardanoTransactions,
105+
))
106+
.unwrap();
107+
assert_eq!(
108+
ctx_records.into_iter().rev().collect::<Vec<_>>(),
109+
stored_ctx_records
110+
);
111+
112+
let cursor = connection
113+
.fetch(GetBufferedSingleSignatureQuery::by_discriminant(
114+
CardanoImmutableFilesFull,
115+
))
116+
.unwrap();
117+
assert_eq!(0, cursor.count());
118+
}
119+
}

0 commit comments

Comments
 (0)