@@ -3,16 +3,9 @@ use config::{builder::DefaultState, ConfigBuilder, Map, Source, Value, ValueKind
3
3
use slog:: Level ;
4
4
use slog_scope:: { crit, debug, info} ;
5
5
use sqlite:: Connection ;
6
- use std:: {
7
- error:: Error ,
8
- ffi:: OsStr ,
9
- fs,
10
- net:: IpAddr ,
11
- path:: PathBuf ,
12
- sync:: { Arc , Mutex } ,
13
- } ;
6
+ use std:: { error:: Error , ffi:: OsStr , fs, net:: IpAddr , path:: PathBuf , sync:: Arc } ;
14
7
use tokio:: {
15
- sync:: { oneshot, RwLock } ,
8
+ sync:: { oneshot, Mutex , RwLock } ,
16
9
task:: JoinSet ,
17
10
time:: Duration ,
18
11
} ;
@@ -40,6 +33,7 @@ use crate::{
40
33
database:: provider:: StakePoolStore ,
41
34
event_store:: { self , TransmitterService } ,
42
35
http_server:: routes:: router,
36
+ stake_distribution_service:: MithrilStakeDistributionService ,
43
37
tools:: { EraTools , GenesisTools , GenesisToolsDependency } ,
44
38
AggregatorConfig , AggregatorRunner , AggregatorRuntime , CertificatePendingStore ,
45
39
CertificateStore , Configuration , DefaultConfiguration , DependencyManager , GenesisConfiguration ,
@@ -49,12 +43,10 @@ use crate::{
49
43
50
44
const SQLITE_MONITORING_FILE : & str = "monitoring.sqlite3" ;
51
45
52
- fn setup_genesis_dependencies (
46
+ async fn setup_genesis_dependencies (
53
47
config : & GenesisConfiguration ,
54
48
) -> Result < GenesisToolsDependency , Box < dyn std:: error:: Error > > {
55
49
let sqlite_db_path = Some ( config. get_sqlite_file ( ) ) ;
56
- check_database_migration ( config. get_sqlite_file ( ) ) ?;
57
-
58
50
let chain_observer = Arc :: new (
59
51
mithril_common:: chain_observer:: CardanoCliChainObserver :: new ( Box :: new (
60
52
CardanoCliRunner :: new (
@@ -70,6 +62,11 @@ fn setup_genesis_dependencies(
70
62
. map ( |path| path. as_os_str ( ) )
71
63
. unwrap_or ( OsStr :: new ( ":memory:" ) ) ,
72
64
) ?) ) ;
65
+
66
+ // DATABASE MIGRATION
67
+ check_database_migration ( sqlite_connection. clone ( ) ) . await ?;
68
+
69
+ let stake_store = Arc :: new ( StakePoolStore :: new ( sqlite_connection. clone ( ) ) ) ;
73
70
let immutable_file_observer = Arc :: new ( ImmutableFileSystemObserver :: new ( & config. db_directory ) ) ;
74
71
let beacon_provider = Arc :: new ( BeaconProviderImpl :: new (
75
72
chain_observer,
@@ -99,7 +96,6 @@ fn setup_genesis_dependencies(
99
96
) ?) ,
100
97
config. store_retention_limit ,
101
98
) ) ;
102
- let stake_store = Arc :: new ( StakePoolStore :: new ( sqlite_connection. clone ( ) ) ) ;
103
99
let single_signature_store = Arc :: new ( SingleSignatureStore :: new (
104
100
Box :: new ( SQLiteAdapter :: new ( "single_signature" , sqlite_connection) ?) ,
105
101
config. store_retention_limit ,
@@ -159,18 +155,23 @@ async fn do_first_launch_initialization_if_needed(
159
155
160
156
/// Database version checker.
161
157
/// This is the place where migrations are to be registered.
162
- fn check_database_migration ( sql_file_path : PathBuf ) -> Result < ( ) , Box < dyn Error > > {
158
+ pub async fn check_database_migration (
159
+ connection : Arc < Mutex < Connection > > ,
160
+ ) -> Result < ( ) , Box < dyn Error > > {
163
161
let mut db_checker = DatabaseVersionChecker :: new (
164
162
slog_scope:: logger ( ) ,
165
163
ApplicationNodeType :: Aggregator ,
166
- sql_file_path ,
164
+ connection ,
167
165
) ;
168
166
169
167
for migration in crate :: database:: migration:: get_migrations ( ) {
170
168
db_checker. add_migration ( migration) ;
171
169
}
172
170
173
- db_checker. apply ( ) . map_err ( |e| -> Box < dyn Error > { e } )
171
+ db_checker
172
+ . apply ( )
173
+ . await
174
+ . map_err ( |e| -> Box < dyn Error > { e } )
174
175
}
175
176
176
177
/// Mithril Aggregator Node
@@ -345,11 +346,14 @@ impl ServeCommand {
345
346
. try_deserialize ( )
346
347
. map_err ( |e| format ! ( "configuration deserialize error: {e}" ) ) ?;
347
348
debug ! ( "SERVE command" ; "config" => format!( "{config:?}" ) ) ;
348
- check_database_migration ( config. get_sqlite_file ( ) ) ?;
349
349
350
350
// Init dependencies
351
351
let sqlite_db_path = config. get_sqlite_file ( ) ;
352
352
let sqlite_connection = Arc :: new ( Mutex :: new ( Connection :: open ( sqlite_db_path) ?) ) ;
353
+
354
+ // DATABASE MIGRATION
355
+ check_database_migration ( sqlite_connection. clone ( ) ) . await ?;
356
+
353
357
let snapshot_store = Arc :: new ( LocalSnapshotStore :: new (
354
358
Box :: new ( SQLiteAdapter :: new ( "snapshot" , sqlite_connection. clone ( ) ) ?) ,
355
359
LIST_SNAPSHOTS_MAX_ITEMS ,
@@ -370,7 +374,6 @@ impl ServeCommand {
370
374
) ?) ,
371
375
config. store_retention_limit ,
372
376
) ) ;
373
- let stake_store = Arc :: new ( StakePoolStore :: new ( sqlite_connection. clone ( ) ) ) ;
374
377
let single_signature_store = Arc :: new ( SingleSignatureStore :: new (
375
378
Box :: new ( SQLiteAdapter :: new (
376
379
"single_signature" ,
@@ -381,7 +384,7 @@ impl ServeCommand {
381
384
let protocol_parameters_store = Arc :: new ( ProtocolParametersStore :: new (
382
385
Box :: new ( SQLiteAdapter :: new (
383
386
"protocol_parameters" ,
384
- sqlite_connection,
387
+ sqlite_connection. clone ( ) ,
385
388
) ?) ,
386
389
config. store_retention_limit ,
387
390
) ) ;
@@ -394,6 +397,11 @@ impl ServeCommand {
394
397
) ,
395
398
) ) ,
396
399
) ;
400
+ let stake_store = Arc :: new ( StakePoolStore :: new ( sqlite_connection. clone ( ) ) ) ;
401
+ let stake_distribution_service = Arc :: new ( MithrilStakeDistributionService :: new (
402
+ stake_store. clone ( ) ,
403
+ chain_observer. clone ( ) ,
404
+ ) ) ;
397
405
let immutable_file_observer =
398
406
Arc :: new ( ImmutableFileSystemObserver :: new ( & config. db_directory ) ) ;
399
407
let beacon_provider = Arc :: new ( BeaconProviderImpl :: new (
@@ -452,13 +460,14 @@ impl ServeCommand {
452
460
// Init dependency manager
453
461
let dependency_manager = DependencyManager {
454
462
config : config. clone ( ) ,
463
+ sqlite_connection,
464
+ stake_store,
455
465
snapshot_store : snapshot_store. clone ( ) ,
456
466
snapshot_uploader : snapshot_uploader. clone ( ) ,
457
467
multi_signer : multi_signer. clone ( ) ,
458
468
certificate_pending_store : certificate_pending_store. clone ( ) ,
459
469
certificate_store : certificate_store. clone ( ) ,
460
470
verification_key_store : verification_key_store. clone ( ) ,
461
- stake_store : stake_store. clone ( ) ,
462
471
single_signature_store : single_signature_store. clone ( ) ,
463
472
protocol_parameters_store : protocol_parameters_store. clone ( ) ,
464
473
chain_observer : chain_observer. clone ( ) ,
@@ -474,6 +483,7 @@ impl ServeCommand {
474
483
era_reader : era_reader. clone ( ) ,
475
484
event_transmitter,
476
485
api_version_provider,
486
+ stake_distribution_service,
477
487
} ;
478
488
let dependency_manager = Arc :: new ( dependency_manager) ;
479
489
@@ -633,7 +643,7 @@ impl ExportGenesisSubCommand {
633
643
"Genesis export payload to sign to {}" ,
634
644
self . target_path. display( )
635
645
) ;
636
- let dependencies = setup_genesis_dependencies ( & config) ?;
646
+ let dependencies = setup_genesis_dependencies ( & config) . await ?;
637
647
638
648
let genesis_tools = GenesisTools :: from_dependencies ( dependencies) . await ?;
639
649
genesis_tools. export_payload_to_sign ( & self . target_path )
@@ -662,7 +672,7 @@ impl ImportGenesisSubCommand {
662
672
"Genesis import signed payload from {}" ,
663
673
self . signed_payload_path. to_string_lossy( )
664
674
) ;
665
- let dependencies = setup_genesis_dependencies ( & config) ?;
675
+ let dependencies = setup_genesis_dependencies ( & config) . await ?;
666
676
667
677
let genesis_tools = GenesisTools :: from_dependencies ( dependencies) . await ?;
668
678
genesis_tools
@@ -690,7 +700,7 @@ impl BootstrapGenesisSubCommand {
690
700
. map_err ( |e| format ! ( "configuration deserialize error: {e}" ) ) ?;
691
701
debug ! ( "BOOTSTRAP GENESIS command" ; "config" => format!( "{config:?}" ) ) ;
692
702
println ! ( "Genesis bootstrap for test only!" ) ;
693
- let dependencies = setup_genesis_dependencies ( & config) ?;
703
+ let dependencies = setup_genesis_dependencies ( & config) . await ?;
694
704
695
705
let genesis_tools = GenesisTools :: from_dependencies ( dependencies) . await ?;
696
706
let genesis_secret_key = key_decode_hex ( & self . genesis_secret_key ) ?;
0 commit comments