Skip to content

Commit d02447f

Browse files
authored
Merge pull request #1353 from input-output-hk/greg/1327/message_service
HTTP message service
2 parents b17533f + 9a20205 commit d02447f

File tree

11 files changed

+306
-62
lines changed

11 files changed

+306
-62
lines changed

Cargo.lock

Lines changed: 1 addition & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

mithril-aggregator/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
[package]
22
name = "mithril-aggregator"
3-
version = "0.4.11"
3+
version = "0.4.12"
44
description = "A Mithril Aggregator server"
55
authors = { workspace = true }
66
edition = { workspace = true }

mithril-aggregator/src/database/provider/certificate.rs

Lines changed: 92 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,10 @@ use mithril_common::{
1111
HexEncodedAgregateVerificationKey, HexEncodedKey, ProtocolMessage, ProtocolParameters,
1212
ProtocolVersion, StakeDistributionParty,
1313
},
14+
messages::{
15+
CertificateListItemMessage, CertificateListItemMessageMetadata, CertificateMessage,
16+
CertificateMetadataMessagePart,
17+
},
1418
sqlite::{
1519
EntityCursor, HydrationError, Projection, Provider, SourceAlias, SqLiteEntity,
1620
WhereCondition,
@@ -159,6 +163,57 @@ impl From<CertificateRecord> for Certificate {
159163
}
160164
}
161165

166+
impl From<CertificateRecord> for CertificateMessage {
167+
fn from(value: CertificateRecord) -> Self {
168+
let metadata = CertificateMetadataMessagePart {
169+
protocol_version: value.protocol_version,
170+
protocol_parameters: value.protocol_parameters,
171+
initiated_at: value.initiated_at,
172+
sealed_at: value.sealed_at,
173+
signers: value.signers,
174+
};
175+
let (multi_signature, genesis_signature) = if value.parent_certificate_id.is_none() {
176+
(String::new(), value.signature)
177+
} else {
178+
(value.signature, String::new())
179+
};
180+
181+
CertificateMessage {
182+
hash: value.certificate_id,
183+
previous_hash: value.parent_certificate_id.unwrap_or_default(),
184+
beacon: value.beacon,
185+
metadata,
186+
protocol_message: value.protocol_message,
187+
signed_message: value.message,
188+
aggregate_verification_key: value.aggregate_verification_key,
189+
multi_signature,
190+
genesis_signature,
191+
}
192+
}
193+
}
194+
195+
impl From<CertificateRecord> for CertificateListItemMessage {
196+
fn from(value: CertificateRecord) -> Self {
197+
let metadata = CertificateListItemMessageMetadata {
198+
protocol_version: value.protocol_version,
199+
protocol_parameters: value.protocol_parameters,
200+
initiated_at: value.initiated_at,
201+
sealed_at: value.sealed_at,
202+
total_signers: value.signers.len(),
203+
};
204+
205+
CertificateListItemMessage {
206+
hash: value.certificate_id,
207+
previous_hash: value.parent_certificate_id.unwrap_or_default(),
208+
beacon: value.beacon,
209+
metadata,
210+
protocol_message: value.protocol_message,
211+
signed_message: value.message,
212+
aggregate_verification_key: value.aggregate_verification_key,
213+
}
214+
}
215+
}
216+
162217
impl SqLiteEntity for CertificateRecord {
163218
fn hydrate(row: sqlite::Row) -> Result<Self, HydrationError>
164219
where
@@ -549,15 +604,21 @@ impl CertificateRepository {
549604
}
550605

551606
/// Return the certificate corresponding to the given hash if any.
552-
pub async fn get_certificate(&self, hash: &str) -> StdResult<Option<Certificate>> {
607+
pub async fn get_certificate<T>(&self, hash: &str) -> StdResult<Option<T>>
608+
where
609+
T: From<CertificateRecord>,
610+
{
553611
let provider = CertificateRecordProvider::new(&self.connection);
554612
let mut cursor = provider.get_by_certificate_id(hash)?;
555613

556614
Ok(cursor.next().map(|v| v.into()))
557615
}
558616

559617
/// Return the latest certificates.
560-
pub async fn get_latest_certificates(&self, last_n: usize) -> StdResult<Vec<Certificate>> {
618+
pub async fn get_latest_certificates<T>(&self, last_n: usize) -> StdResult<Vec<T>>
619+
where
620+
T: From<CertificateRecord>,
621+
{
561622
let provider = CertificateRecordProvider::new(&self.connection);
562623
let cursor = provider.get_all()?;
563624

@@ -567,10 +628,10 @@ impl CertificateRepository {
567628
/// Return the first certificate signed per epoch as the reference
568629
/// certificate for this Epoch. This will be the parent certificate for all
569630
/// other certificates issued within this Epoch.
570-
pub async fn get_master_certificate_for_epoch(
571-
&self,
572-
epoch: Epoch,
573-
) -> StdResult<Option<Certificate>> {
631+
pub async fn get_master_certificate_for_epoch<T>(&self, epoch: Epoch) -> StdResult<Option<T>>
632+
where
633+
T: From<CertificateRecord>,
634+
{
574635
let provider = MasterCertificateProvider::new(&self.connection);
575636
let mut cursor = provider.find(provider.get_master_certificate_condition(epoch))?;
576637

@@ -1037,12 +1098,15 @@ protocol_message, signers, initiated_at, sealed_at) values \
10371098
}
10381099
}
10391100

1040-
let repository = CertificateRepository::new(connection);
1041-
let certificate = repository.get_certificate("whatever").await.unwrap();
1101+
let repository: CertificateRepository = CertificateRepository::new(connection);
1102+
let certificate = repository
1103+
.get_certificate::<Certificate>("whatever")
1104+
.await
1105+
.unwrap();
10421106
assert!(certificate.is_none());
10431107

10441108
let certificate = repository
1045-
.get_certificate(&expected_hash)
1109+
.get_certificate::<Certificate>(&expected_hash)
10461110
.await
10471111
.unwrap()
10481112
.expect("The certificate exist and should be returned.");
@@ -1090,9 +1154,9 @@ protocol_message, signers, initiated_at, sealed_at) values \
10901154
let certificates = vec![];
10911155
insert_certificate_records(connection.clone(), certificates).await;
10921156

1093-
let repository = CertificateRepository::new(connection);
1157+
let repository: CertificateRepository = CertificateRepository::new(connection);
10941158
let certificate = repository
1095-
.get_master_certificate_for_epoch(Epoch(1))
1159+
.get_master_certificate_for_epoch::<Certificate>(Epoch(1))
10961160
.await
10971161
.unwrap();
10981162

@@ -1107,9 +1171,9 @@ protocol_message, signers, initiated_at, sealed_at) values \
11071171
let expected_certificate: Certificate = certificate.clone().into();
11081172
insert_certificate_records(connection.clone(), vec![certificate]).await;
11091173

1110-
let repository = CertificateRepository::new(connection);
1174+
let repository: CertificateRepository = CertificateRepository::new(connection);
11111175
let certificate = repository
1112-
.get_master_certificate_for_epoch(Epoch(1))
1176+
.get_master_certificate_for_epoch::<Certificate>(Epoch(1))
11131177
.await
11141178
.unwrap()
11151179
.expect("This should return a certificate.");
@@ -1130,9 +1194,9 @@ protocol_message, signers, initiated_at, sealed_at) values \
11301194
let expected_certificate: Certificate = certificates.first().unwrap().clone().into();
11311195
insert_certificate_records(connection.clone(), certificates).await;
11321196

1133-
let repository = CertificateRepository::new(connection);
1197+
let repository: CertificateRepository = CertificateRepository::new(connection);
11341198
let certificate = repository
1135-
.get_master_certificate_for_epoch(Epoch(1))
1199+
.get_master_certificate_for_epoch::<Certificate>(Epoch(1))
11361200
.await
11371201
.unwrap()
11381202
.expect("This should return a certificate.");
@@ -1153,9 +1217,9 @@ protocol_message, signers, initiated_at, sealed_at) values \
11531217
let expected_certificate: Certificate = certificates.first().unwrap().clone().into();
11541218
insert_certificate_records(connection.clone(), certificates).await;
11551219

1156-
let repository = CertificateRepository::new(connection);
1220+
let repository: CertificateRepository = CertificateRepository::new(connection);
11571221
let certificate = repository
1158-
.get_master_certificate_for_epoch(Epoch(2))
1222+
.get_master_certificate_for_epoch::<Certificate>(Epoch(2))
11591223
.await
11601224
.unwrap()
11611225
.expect("This should return a certificate.");
@@ -1177,9 +1241,9 @@ protocol_message, signers, initiated_at, sealed_at) values \
11771241
let expected_certificate: Certificate = certificates.last().unwrap().clone().into();
11781242
insert_certificate_records(connection.clone(), certificates).await;
11791243

1180-
let repository = CertificateRepository::new(connection);
1244+
let repository: CertificateRepository = CertificateRepository::new(connection);
11811245
let certificate = repository
1182-
.get_master_certificate_for_epoch(Epoch(2))
1246+
.get_master_certificate_for_epoch::<Certificate>(Epoch(2))
11831247
.await
11841248
.unwrap()
11851249
.expect("This should return a certificate.");
@@ -1203,7 +1267,7 @@ protocol_message, signers, initiated_at, sealed_at) values \
12031267
let expected_certificate: Certificate = certificates.get(3).unwrap().clone().into();
12041268
insert_certificate_records(connection.clone(), certificates).await;
12051269

1206-
let repository = CertificateRepository::new(connection);
1270+
let repository: CertificateRepository = CertificateRepository::new(connection);
12071271
let certificate = repository
12081272
.get_master_certificate_for_epoch(Epoch(2))
12091273
.await
@@ -1224,9 +1288,9 @@ protocol_message, signers, initiated_at, sealed_at) values \
12241288
];
12251289
insert_certificate_records(connection.clone(), certificates).await;
12261290

1227-
let repository = CertificateRepository::new(connection);
1291+
let repository: CertificateRepository = CertificateRepository::new(connection);
12281292
let certificate = repository
1229-
.get_master_certificate_for_epoch(Epoch(3))
1293+
.get_master_certificate_for_epoch::<Certificate>(Epoch(3))
12301294
.await
12311295
.unwrap();
12321296

@@ -1247,7 +1311,7 @@ protocol_message, signers, initiated_at, sealed_at) values \
12471311
let expected_certificate: Certificate = certificates.last().unwrap().clone().into();
12481312
insert_certificate_records(connection.clone(), certificates).await;
12491313

1250-
let repository = CertificateRepository::new(connection);
1314+
let repository: CertificateRepository = CertificateRepository::new(connection);
12511315
let certificate = repository
12521316
.get_master_certificate_for_epoch(Epoch(2))
12531317
.await
@@ -1273,7 +1337,7 @@ protocol_message, signers, initiated_at, sealed_at) values \
12731337
let expected_certificate: Certificate = certificates.last().unwrap().clone().into();
12741338
insert_certificate_records(connection.clone(), certificates).await;
12751339

1276-
let repository = CertificateRepository::new(connection);
1340+
let repository: CertificateRepository = CertificateRepository::new(connection);
12771341
let certificate = repository
12781342
.get_master_certificate_for_epoch(Epoch(2))
12791343
.await
@@ -1297,7 +1361,7 @@ protocol_message, signers, initiated_at, sealed_at) values \
12971361
let expected_certificate: Certificate = certificates.last().unwrap().clone().into();
12981362
insert_certificate_records(connection.clone(), certificates).await;
12991363

1300-
let repository = CertificateRepository::new(connection);
1364+
let repository: CertificateRepository = CertificateRepository::new(connection);
13011365
let certificate = repository
13021366
.get_master_certificate_for_epoch(Epoch(2))
13031367
.await
@@ -1322,9 +1386,9 @@ protocol_message, signers, initiated_at, sealed_at) values \
13221386
}
13231387
}
13241388

1325-
let repository = CertificateRepository::new(connection);
1389+
let repository: CertificateRepository = CertificateRepository::new(connection);
13261390
let certificate = repository
1327-
.get_master_certificate_for_epoch(*epoch)
1391+
.get_master_certificate_for_epoch::<Certificate>(*epoch)
13281392
.await
13291393
.unwrap()
13301394
.expect("This should return a certificate.");
@@ -1337,7 +1401,7 @@ protocol_message, signers, initiated_at, sealed_at) values \
13371401
let (certificates, _) = setup_certificate_chain(5, 3);
13381402
let mut deps = DependenciesBuilder::new(Configuration::new_sample());
13391403
let connection = deps.get_sqlite_connection().await.unwrap();
1340-
let repository = CertificateRepository::new(connection);
1404+
let repository: CertificateRepository = CertificateRepository::new(connection);
13411405
let certificate = repository
13421406
.create_certificate(certificates[4].clone())
13431407
.await

mithril-aggregator/src/dependency_injection/builder.rs

Lines changed: 28 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -52,9 +52,9 @@ use crate::{
5252
event_store::{EventMessage, EventStore, TransmitterService},
5353
http_server::routes::router,
5454
services::{
55-
CertifierService, MithrilCertifierService, MithrilEpochService, MithrilSignedEntityService,
56-
MithrilStakeDistributionService, MithrilTickerService, SignedEntityService,
57-
StakeDistributionService, TickerService,
55+
CertifierService, HttpMessageService, MithrilCertifierService, MithrilEpochService,
56+
MithrilHttpMessageService, MithrilSignedEntityService, MithrilStakeDistributionService,
57+
MithrilTickerService, SignedEntityService, StakeDistributionService, TickerService,
5858
},
5959
tools::{CExplorerSignerRetriever, GcpFileUploader, GenesisToolsDependency, SignersImporter},
6060
AggregatorConfig, AggregatorRunner, AggregatorRuntime, CertificatePendingStore,
@@ -185,6 +185,9 @@ pub struct DependenciesBuilder {
185185

186186
/// Signed Entity storer
187187
pub signed_entity_storer: Option<Arc<dyn SignedEntityStorer>>,
188+
189+
/// HTTP Message service
190+
pub http_message_service: Option<Arc<dyn HttpMessageService>>,
188191
}
189192

190193
impl DependenciesBuilder {
@@ -225,6 +228,7 @@ impl DependenciesBuilder {
225228
certifier_service: None,
226229
epoch_service: None,
227230
signed_entity_storer: None,
231+
http_message_service: None,
228232
}
229233
}
230234

@@ -1023,6 +1027,7 @@ impl DependenciesBuilder {
10231027
ticker_service: self.get_ticker_service().await?,
10241028
signed_entity_storer: self.get_signed_entity_storer().await?,
10251029
signer_getter: self.get_signer_store().await?,
1030+
http_message_service: self.get_http_message_service().await?,
10261031
};
10271032

10281033
Ok(dependency_manager)
@@ -1208,7 +1213,26 @@ impl DependenciesBuilder {
12081213
Ok(self.certifier_service.as_ref().cloned().unwrap())
12091214
}
12101215

1211-
/// Remove the dependencies builder from memory to release Arc.
1216+
/// build HTTP message service
1217+
pub async fn build_http_message_service(&mut self) -> Result<Arc<dyn HttpMessageService>> {
1218+
let certificate_repository = Arc::new(CertificateRepository::new(
1219+
self.get_sqlite_connection().await?,
1220+
));
1221+
let service = MithrilHttpMessageService::new(certificate_repository);
1222+
1223+
Ok(Arc::new(service))
1224+
}
1225+
1226+
/// [HttpMessageService] service
1227+
pub async fn get_http_message_service(&mut self) -> Result<Arc<dyn HttpMessageService>> {
1228+
if self.http_message_service.is_none() {
1229+
self.http_message_service = Some(self.build_http_message_service().await?);
1230+
}
1231+
1232+
Ok(self.http_message_service.as_ref().cloned().unwrap())
1233+
}
1234+
1235+
/// Remove the dependencies builder from memory to release Arc instances.
12121236
pub async fn vanish(self) {
12131237
self.drop_sqlite_connection().await;
12141238
}

mithril-aggregator/src/dependency_injection/containers.rs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ use mithril_common::{
1616
BeaconProvider,
1717
};
1818

19-
use crate::services::EpochService;
19+
use crate::services::{EpochService, HttpMessageService};
2020
use crate::{
2121
configuration::*,
2222
database::provider::{CertificateRepository, SignedEntityStorer, SignerGetter, StakePoolStore},
@@ -132,6 +132,9 @@ pub struct DependencyContainer {
132132

133133
/// Signer getter service
134134
pub signer_getter: Arc<dyn SignerGetter>,
135+
136+
/// HTTP message service
137+
pub http_message_service: Arc<dyn HttpMessageService>,
135138
}
136139

137140
#[doc(hidden)]

0 commit comments

Comments
 (0)