Skip to content

Commit 4cc1342

Browse files
committed
Cleanup runtime runner in aggregator
And remove create & upload snapshot archive and create snapshot functions.
1 parent 2c495f7 commit 4cc1342

File tree

2 files changed

+4
-176
lines changed

2 files changed

+4
-176
lines changed

mithril-aggregator/src/runtime/error.rs

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -60,10 +60,6 @@ impl From<Box<dyn StdError + Sync + Send>> for RuntimeError {
6060
// TODO: Are these errors still relevant, do we need to remove them?
6161
#[allow(clippy::enum_variant_names)]
6262
pub enum RunnerError {
63-
/// Protocol message part is missing
64-
#[error("Missing protocol message: '{0}'.")]
65-
MissingProtocolMessage(String),
66-
6763
/// No stack distribution found
6864
#[error("Missing stack distribution: '{0}'.")]
6965
MissingStakeDistribution(String),

mithril-aggregator/src/runtime/runner.rs

Lines changed: 4 additions & 172 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,12 @@
11
use async_trait::async_trait;
2-
use chrono::Utc;
32
use mithril_common::entities::Epoch;
43
use mithril_common::entities::SignedEntityType;
54
use mithril_common::store::StakeStorer;
65
use slog_scope::{debug, warn};
76

87
use mithril_common::crypto_helper::ProtocolStakeDistribution;
98
use mithril_common::entities::{
10-
Beacon, Certificate, CertificatePending, ProtocolMessage, ProtocolMessagePartKey, Snapshot,
9+
Beacon, Certificate, CertificatePending, ProtocolMessage, ProtocolMessagePartKey,
1110
};
1211
use mithril_common::CardanoNetwork;
1312

@@ -17,10 +16,8 @@ use std::path::PathBuf;
1716
use std::sync::Arc;
1817

1918
use crate::entities::OpenMessage;
20-
use crate::snapshot_uploaders::SnapshotLocation;
21-
use crate::snapshotter::OngoingSnapshot;
2219
use crate::RuntimeError;
23-
use crate::{DependencyManager, ProtocolError, SnapshotError};
20+
use crate::{DependencyManager, ProtocolError};
2421

2522
#[cfg(test)]
2623
use mockall::automock;
@@ -124,31 +121,6 @@ pub trait AggregatorRunnerTrait: Sync + Send {
124121
signed_entity_type: &SignedEntityType,
125122
) -> Result<Option<Certificate>, Box<dyn StdError + Sync + Send>>;
126123

127-
/// Create an archive of the cardano node db directory naming it after the given beacon.
128-
///
129-
/// Returns the path of the created archive and the archive size as byte.
130-
async fn create_snapshot_archive(
131-
&self,
132-
beacon: &Beacon,
133-
protocol_message: &ProtocolMessage,
134-
) -> Result<OngoingSnapshot, Box<dyn StdError + Sync + Send>>;
135-
136-
/// Upload the snapshot at the given location using the configured uploader(s).
137-
///
138-
/// **Important**: the snapshot is removed after the upload succeeded.
139-
async fn upload_snapshot_archive(
140-
&self,
141-
ongoing_snapshot: &OngoingSnapshot,
142-
) -> Result<Vec<SnapshotLocation>, Box<dyn StdError + Sync + Send>>;
143-
144-
/// Create a snapshot and save it to the given locations.
145-
async fn create_and_save_snapshot(
146-
&self,
147-
certificate: Certificate,
148-
ongoing_snapshot: &OngoingSnapshot,
149-
remote_locations: Vec<String>,
150-
) -> Result<Snapshot, Box<dyn StdError + Sync + Send>>;
151-
152124
/// Create an artifact and persist it.
153125
async fn create_and_save_artifact(
154126
&self,
@@ -469,92 +441,6 @@ impl AggregatorRunnerTrait for AggregatorRunner {
469441
.await
470442
}
471443

472-
async fn create_snapshot_archive(
473-
&self,
474-
beacon: &Beacon,
475-
protocol_message: &ProtocolMessage,
476-
) -> Result<OngoingSnapshot, Box<dyn StdError + Sync + Send>> {
477-
debug!("RUNNER: create snapshot archive");
478-
479-
let snapshotter = self.dependencies.snapshotter.clone();
480-
let snapshot_digest = protocol_message
481-
.get_message_part(&ProtocolMessagePartKey::SnapshotDigest)
482-
.ok_or_else(|| {
483-
RunnerError::MissingProtocolMessage(format!(
484-
"no digest message part found for beacon '{beacon:?}'."
485-
))
486-
})?;
487-
let snapshot_name = format!(
488-
"{}-e{}-i{}.{}.tar.gz",
489-
beacon.network, beacon.epoch.0, beacon.immutable_file_number, snapshot_digest
490-
);
491-
// spawn a separate thread to prevent blocking
492-
let ongoing_snapshot =
493-
tokio::task::spawn_blocking(move || -> Result<OngoingSnapshot, SnapshotError> {
494-
snapshotter.snapshot(&snapshot_name)
495-
})
496-
.await??;
497-
498-
debug!(" > snapshot created: '{:?}'", ongoing_snapshot);
499-
500-
Ok(ongoing_snapshot)
501-
}
502-
503-
async fn upload_snapshot_archive(
504-
&self,
505-
ongoing_snapshot: &OngoingSnapshot,
506-
) -> Result<Vec<SnapshotLocation>, Box<dyn StdError + Sync + Send>> {
507-
debug!("RUNNER: upload snapshot archive");
508-
let location = self
509-
.dependencies
510-
.snapshot_uploader
511-
.upload_snapshot(ongoing_snapshot.get_file_path())
512-
.await?;
513-
514-
if let Err(error) = tokio::fs::remove_file(ongoing_snapshot.get_file_path()).await {
515-
warn!(
516-
" > Post upload ongoing snapshot file removal failure: {}",
517-
error
518-
);
519-
}
520-
521-
Ok(vec![location])
522-
}
523-
524-
async fn create_and_save_snapshot(
525-
&self,
526-
certificate: Certificate,
527-
ongoing_snapshot: &OngoingSnapshot,
528-
remote_locations: Vec<String>,
529-
) -> Result<Snapshot, Box<dyn StdError + Sync + Send>> {
530-
debug!("RUNNER: create and save snapshot");
531-
let snapshot_digest = certificate
532-
.protocol_message
533-
.get_message_part(&ProtocolMessagePartKey::SnapshotDigest)
534-
.ok_or_else(|| {
535-
RunnerError::MissingProtocolMessage(format!(
536-
"message part 'digest' not found for snapshot '{}'.",
537-
ongoing_snapshot.get_file_path().display()
538-
))
539-
})?
540-
.to_owned();
541-
let snapshot = Snapshot::new(
542-
snapshot_digest,
543-
certificate.beacon,
544-
certificate.hash,
545-
*ongoing_snapshot.get_file_size(),
546-
format!("{:?}", Utc::now()),
547-
remote_locations,
548-
);
549-
550-
self.dependencies
551-
.snapshot_store
552-
.add_snapshot(snapshot.clone())
553-
.await?;
554-
555-
Ok(snapshot)
556-
}
557-
558444
async fn create_and_save_artifact(
559445
&self,
560446
signed_entity_type: &SignedEntityType,
@@ -629,8 +515,6 @@ impl AggregatorRunnerTrait for AggregatorRunner {
629515
#[cfg(test)]
630516
pub mod tests {
631517
use crate::certifier_service::MockCertifierService;
632-
use crate::multi_signer::MockMultiSigner;
633-
use crate::snapshotter::OngoingSnapshot;
634518
use crate::{
635519
initialize_dependencies,
636520
runtime::{AggregatorRunner, AggregatorRunnerTrait},
@@ -639,16 +523,13 @@ pub mod tests {
639523
use mithril_common::chain_observer::FakeObserver;
640524
use mithril_common::digesters::DumbImmutableFileObserver;
641525
use mithril_common::entities::{
642-
Beacon, CertificatePending, Epoch, ProtocolMessage, SignedEntityType, StakeDistribution,
526+
Beacon, CertificatePending, Epoch, SignedEntityType, StakeDistribution,
643527
};
644528
use mithril_common::store::StakeStorer;
529+
use mithril_common::test_utils::fake_data;
645530
use mithril_common::test_utils::MithrilFixtureBuilder;
646-
use mithril_common::{entities::ProtocolMessagePartKey, test_utils::fake_data};
647531
use mithril_common::{BeaconProviderImpl, CardanoNetwork};
648-
use std::path::Path;
649532
use std::sync::Arc;
650-
use tempfile::NamedTempFile;
651-
use tokio::sync::RwLock;
652533

653534
#[tokio::test]
654535
async fn test_get_beacon_from_chain() {
@@ -939,55 +820,6 @@ pub mod tests {
939820
assert_eq!(None, maybe_saved_cert);
940821
}
941822

942-
#[tokio::test]
943-
async fn test_remove_snapshot_archive_after_upload() {
944-
let deps = initialize_dependencies().await;
945-
let runner = AggregatorRunner::new(Arc::new(deps));
946-
let file = NamedTempFile::new().unwrap();
947-
let file_path = file.path();
948-
let snapshot = OngoingSnapshot::new(file_path.to_path_buf(), 7331);
949-
950-
runner
951-
.upload_snapshot_archive(&snapshot)
952-
.await
953-
.expect("Snapshot upload should not fail");
954-
955-
assert!(
956-
!file_path.exists(),
957-
"Ongoing snapshot file should have been removed after upload"
958-
);
959-
}
960-
961-
#[tokio::test]
962-
async fn test_create_snapshot_archive_name_archive_after_beacon() {
963-
let beacon = Beacon::new("network".to_string(), 20, 145);
964-
let mut message = ProtocolMessage::new();
965-
message.set_message_part(
966-
ProtocolMessagePartKey::SnapshotDigest,
967-
"test+digest".to_string(),
968-
);
969-
let mock_multi_signer = MockMultiSigner::new();
970-
let mut deps = initialize_dependencies().await;
971-
deps.multi_signer = Arc::new(RwLock::new(mock_multi_signer));
972-
let runner = AggregatorRunner::new(Arc::new(deps));
973-
974-
let ongoing_snapshot = runner
975-
.create_snapshot_archive(&beacon, &message)
976-
.await
977-
.expect("create_snapshot_archive should not fail");
978-
979-
assert_eq!(
980-
Path::new(
981-
format!(
982-
"{}-e{}-i{}.{}.tar.gz",
983-
beacon.network, beacon.epoch.0, beacon.immutable_file_number, "test+digest"
984-
)
985-
.as_str()
986-
),
987-
ongoing_snapshot.get_file_path()
988-
);
989-
}
990-
991823
#[tokio::test]
992824
async fn test_update_era_checker() {
993825
let deps = initialize_dependencies().await;

0 commit comments

Comments
 (0)