Skip to content

Commit 24da918

Browse files
committed
Add a view on stake signer per version
Use request syntax of sqlite 3.38
1 parent f67adf1 commit 24da918

File tree

2 files changed

+186
-0
lines changed

2 files changed

+186
-0
lines changed

mithril-aggregator/src/event_store/database/migration.rs

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,5 +33,53 @@ group by action, date;
3333
create index metric_date_index on event(date(json_extract(content, '$.content.date')));
3434
"#,
3535
),
36+
SqlMigration::new(
37+
3,
38+
r#"
39+
create view stake_signer_version as with
40+
signer_version as (
41+
select
42+
content->>'$.content.party_id' as party_id,
43+
content->>'$.headers.signer-node-version' as node_version,
44+
content->>'$.headers.epoch' as epoch,
45+
content->>'$.content.stake' as stakes
46+
from event
47+
order by created_at desc
48+
),
49+
stakes_version as (
50+
select
51+
party_id,
52+
case
53+
when instr(signer_version.node_version, '+') > 0
54+
then substr(signer_version.node_version, 0, instr(signer_version.node_version, '+'))
55+
else signer_version.node_version
56+
end as version,
57+
stakes,
58+
cast(epoch as int) as epoch
59+
from signer_version
60+
group by party_id, epoch
61+
),
62+
summed_stakes_version as (
63+
select
64+
epoch,
65+
version,
66+
party_id,
67+
sum(stakes) over (partition by epoch) as total_epoch_stakes,
68+
sum(stakes) over (partition by epoch, version) as stakes_version
69+
from stakes_version
70+
order by epoch desc
71+
)
72+
select
73+
epoch,
74+
version,
75+
total_epoch_stakes,
76+
stakes_version,
77+
printf('%02d %%', round((stakes_version * 100) / (total_epoch_stakes * 1.0))) as stakes_ratio,
78+
count(party_id) as pool_count
79+
from summed_stakes_version
80+
group by epoch, version
81+
order by epoch desc, version desc;
82+
"#,
83+
),
3684
]
3785
}

mithril-aggregator/src/event_store/database/repository.rs

Lines changed: 138 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -146,4 +146,142 @@ mod tests {
146146
assert!(result.contains(&("2024-10-30".to_string(), "metric_1".to_string(), 16)));
147147
}
148148
}
149+
150+
mod stake_signer_version {
151+
use std::sync::Arc;
152+
153+
use crate::event_store::{
154+
database::test_helper::event_store_db_connection, TransmitterService,
155+
};
156+
use crate::test_tools::TestLogger;
157+
use mithril_common::entities::Stake;
158+
use mithril_common::{entities::SignerWithStake, test_utils::fake_data, StdResult};
159+
use sqlite::ConnectionThreadSafe;
160+
161+
use super::{EventMessage, EventPersister};
162+
163+
/// Insert a signer registration event in the database.
164+
fn insert_registration_event(
165+
persister: &EventPersister,
166+
epoch: &str,
167+
party_id: &str,
168+
stake: Stake,
169+
signer_node_version: &str,
170+
) {
171+
let headers = vec![
172+
("signer-node-version", signer_node_version),
173+
("epoch", epoch),
174+
];
175+
176+
let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel::<EventMessage>();
177+
let transmitter_service = Arc::new(TransmitterService::new(tx, TestLogger::stdout()));
178+
179+
let signers = fake_data::signers_with_stakes(1);
180+
let signer = SignerWithStake {
181+
party_id: party_id.to_string(),
182+
stake,
183+
..signers[0].clone()
184+
};
185+
186+
let _result = transmitter_service.send_event_message::<SignerWithStake>(
187+
"HTTP::signer_register",
188+
"register_signer",
189+
&signer,
190+
headers,
191+
);
192+
193+
let message: EventMessage = rx.try_recv().unwrap();
194+
195+
let _event = persister.persist(message).unwrap();
196+
}
197+
198+
fn get_all_registrations(
199+
connection: Arc<ConnectionThreadSafe>,
200+
) -> StdResult<Vec<(i64, String, i64, i64, String, i64)>> {
201+
let query = "select
202+
epoch,
203+
version,
204+
total_epoch_stakes,
205+
stakes_version,
206+
stakes_ratio,
207+
pool_count
208+
from stake_signer_version;";
209+
let mut statement = connection.prepare(query)?;
210+
let mut result = Vec::new();
211+
while let Ok(sqlite::State::Row) = statement.next() {
212+
result.push((
213+
statement.read::<i64, _>("epoch")?,
214+
statement.read::<String, _>("version")?,
215+
statement.read::<i64, _>("total_epoch_stakes")?,
216+
statement.read::<i64, _>("stakes_version")?,
217+
statement.read::<String, _>("stakes_ratio")?,
218+
statement.read::<i64, _>("pool_count")?,
219+
));
220+
}
221+
222+
Ok(result)
223+
}
224+
225+
#[test]
226+
fn retrieved_node_version() {
227+
let connection = Arc::new(event_store_db_connection().unwrap());
228+
let persister = EventPersister::new(connection.clone());
229+
230+
insert_registration_event(&persister, "3", "A", 15, "0.2.234");
231+
insert_registration_event(&persister, "4", "A", 15, "15.24.32");
232+
insert_registration_event(&persister, "5", "A", 15, "0.4.789+ef0c28a");
233+
234+
let result = get_all_registrations(connection).unwrap();
235+
236+
assert!(result.contains(&(3, "0.2.234".to_string(), 15, 15, "100 %".to_string(), 1)));
237+
assert!(result.contains(&(4, "15.24.32".to_string(), 15, 15, "100 %".to_string(), 1)));
238+
assert!(result.contains(&(5, "0.4.789".to_string(), 15, 15, "100 %".to_string(), 1)));
239+
}
240+
241+
#[test]
242+
fn retrieved_total_by_epoch() {
243+
let connection = Arc::new(event_store_db_connection().unwrap());
244+
let persister = EventPersister::new(connection.clone());
245+
246+
insert_registration_event(&persister, "8", "A", 20, "1.0.2");
247+
insert_registration_event(&persister, "8", "B", 15, "1.0.2");
248+
insert_registration_event(&persister, "9", "A", 56, "1.0.2");
249+
insert_registration_event(&persister, "9", "B", 31, "1.0.2");
250+
let result = get_all_registrations(connection).unwrap();
251+
252+
assert!(result.contains(&(8, "1.0.2".to_string(), 35, 35, "100 %".to_string(), 2)));
253+
assert!(result.contains(&(9, "1.0.2".to_string(), 87, 87, "100 %".to_string(), 2)));
254+
}
255+
256+
#[test]
257+
fn retrieved_percentage_per_version() {
258+
let connection = Arc::new(event_store_db_connection().unwrap());
259+
let persister = EventPersister::new(connection.clone());
260+
261+
insert_registration_event(&persister, "8", "A", 90, "1.0.2");
262+
insert_registration_event(&persister, "8", "B", 30, "1.0.2");
263+
insert_registration_event(&persister, "8", "C", 80, "1.0.4");
264+
let result = get_all_registrations(connection).unwrap();
265+
266+
assert!(result.contains(&(8, "1.0.2".to_string(), 200, 120, "60 %".to_string(), 2)));
267+
assert!(result.contains(&(8, "1.0.4".to_string(), 200, 80, "40 %".to_string(), 1)));
268+
}
269+
270+
#[test]
271+
fn retrieved_percentage_per_epoch() {
272+
let connection = Arc::new(event_store_db_connection().unwrap());
273+
let persister = EventPersister::new(connection.clone());
274+
275+
insert_registration_event(&persister, "8", "A", 6, "1.0.2");
276+
insert_registration_event(&persister, "8", "B", 4, "1.0.4");
277+
insert_registration_event(&persister, "9", "A", 28, "1.0.2");
278+
insert_registration_event(&persister, "9", "B", 12, "1.0.4");
279+
let result = get_all_registrations(connection).unwrap();
280+
281+
assert!(result.contains(&(8, "1.0.2".to_string(), 10, 6, "60 %".to_string(), 1)));
282+
assert!(result.contains(&(8, "1.0.4".to_string(), 10, 4, "40 %".to_string(), 1)));
283+
assert!(result.contains(&(9, "1.0.2".to_string(), 40, 28, "70 %".to_string(), 1)));
284+
assert!(result.contains(&(9, "1.0.4".to_string(), 40, 12, "30 %".to_string(), 1)));
285+
}
286+
}
149287
}

0 commit comments

Comments
 (0)