Skip to content

Commit fefcf09

Browse files
committed
some fixes + diff data source
1 parent 7b74a01 commit fefcf09

File tree

16 files changed

+658
-24
lines changed

16 files changed

+658
-24
lines changed

Cargo.lock

Lines changed: 2 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -188,6 +188,7 @@ fork-tree = { version = "13.0.1" }
188188
ureq = { version = "3.1.2", default-features = false }
189189
url = { version = "2.5.7", default-features = false }
190190
blockfrost-openapi = { version = "0.1.75", default-features = false }
191+
chrono = { version = "0.4.31", default-features = false }
191192

192193
# substrate dependencies
193194
frame-benchmarking = { default-features = false, git = "https://github.com/paritytech/polkadot-sdk.git", tag = "polkadot-stable2509" }

demo/node/src/data_sources.rs

Lines changed: 21 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -9,13 +9,16 @@ use sp_governed_map::GovernedMapDataSource;
99
use sp_partner_chains_bridge::TokenBridgeDataSource;
1010
use std::{error::Error, sync::Arc};
1111

12+
use crate::diff_sources;
13+
1214
pub const DATA_SOURCE_VAR: &str = "CARDANO_DATA_SOURCE";
1315

1416
#[derive(Clone, Debug, PartialEq)]
1517
pub enum DataSourceType {
1618
DbSync,
1719
Mock,
1820
Dolos,
21+
Diff,
1922
}
2023

2124
impl DataSourceType {
@@ -34,7 +37,7 @@ impl std::str::FromStr for DataSourceType {
3437
match s.to_lowercase().as_str() {
3538
"db-sync" => Ok(DataSourceType::DbSync),
3639
"mock" => Ok(DataSourceType::Mock),
37-
"dolos" => Ok(DataSourceType::Dolos),
40+
"dolos" => Ok(DataSourceType::Diff), // TODO restore this to dolos
3841
_ => {
3942
Err(format!("Invalid data source type: {}. Valid options: db-sync, mock, dolos", s))
4043
},
@@ -48,6 +51,7 @@ impl std::fmt::Display for DataSourceType {
4851
DataSourceType::DbSync => write!(f, "db-sync"),
4952
DataSourceType::Mock => write!(f, "mock"),
5053
DataSourceType::Dolos => write!(f, "dolos"),
54+
DataSourceType::Diff => write!(f, "diff"),
5155
}
5256
}
5357
}
@@ -84,6 +88,14 @@ pub(crate) async fn create_cached_data_sources(
8488
DataSourceType::Dolos => create_dolos_data_sources(metrics_opt).await.map_err(|err| {
8589
ServiceError::Application(format!("Failed to create dolos data sources: {err}").into())
8690
}),
91+
92+
DataSourceType::Diff => {
93+
diff_sources::create_diff_data_sources(metrics_opt).await.map_err(|err| {
94+
ServiceError::Application(
95+
format!("Failed to create dolos data sources: {err}").into(),
96+
)
97+
})
98+
},
8799
}
88100
}
89101

@@ -107,18 +119,22 @@ pub async fn create_dolos_data_sources(
107119
) -> std::result::Result<DataSources, Box<dyn Error + Send + Sync + 'static>> {
108120
let dolos_client = partner_chains_dolos_data_sources::get_connection_from_env()?;
109121
let pool = partner_chains_db_sync_data_sources::get_connection_from_env().await?;
110-
let block = Arc::new(
122+
let block_dbsync = Arc::new(
111123
partner_chains_db_sync_data_sources::BlockDataSourceImpl::new_from_env(pool.clone())
112124
.await?,
113125
);
126+
let block_dolos = Arc::new(
127+
partner_chains_dolos_data_sources::BlockDataSourceImpl::new_from_env(dolos_client.clone())
128+
.await?,
129+
);
114130
Ok(DataSources {
115131
sidechain_rpc: Arc::new(
116132
partner_chains_dolos_data_sources::SidechainRpcDataSourceImpl::new(
117133
dolos_client.clone(),
118134
),
119135
),
120136
mc_hash: Arc::new(partner_chains_dolos_data_sources::McHashDataSourceImpl::new(
121-
dolos_client.clone(),
137+
block_dolos.clone(),
122138
)),
123139
authority_selection: Arc::new(
124140
partner_chains_dolos_data_sources::AuthoritySelectionDataSourceImpl::new(
@@ -137,15 +153,15 @@ pub async fn create_dolos_data_sources(
137153
pool.clone(),
138154
metrics_opt.clone(),
139155
GOVERNED_MAP_CACHE_SIZE,
140-
block.clone(),
156+
block_dbsync.clone(),
141157
)
142158
.await?,
143159
),
144160
bridge: Arc::new(
145161
partner_chains_db_sync_data_sources::CachedTokenBridgeDataSourceImpl::new(
146162
pool,
147163
metrics_opt,
148-
block,
164+
block_dbsync,
149165
BRIDGE_TRANSFER_CACHE_LOOKAHEAD,
150166
),
151167
),

demo/node/src/diff_sources.rs

Lines changed: 238 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,238 @@
1+
use authority_selection_inherents::AuthoritySelectionDataSource;
2+
use pallet_sidechain_rpc::SidechainRpcDataSource;
3+
use partner_chains_data_source_metrics::McFollowerMetrics;
4+
use sidechain_domain::*;
5+
use sidechain_mc_hash::McHashDataSource;
6+
use std::{error::Error, sync::Arc};
7+
8+
use crate::data_sources::*;
9+
10+
struct SidechainRpcDataSourceImplDiff {
11+
dolos: Arc<partner_chains_dolos_data_sources::SidechainRpcDataSourceImpl>,
12+
dbsync: Arc<partner_chains_db_sync_data_sources::SidechainRpcDataSourceImpl>,
13+
}
14+
15+
#[async_trait::async_trait]
16+
impl SidechainRpcDataSource for SidechainRpcDataSourceImplDiff {
17+
async fn get_latest_block_info(
18+
&self,
19+
) -> Result<sidechain_domain::MainchainBlock, Box<dyn std::error::Error + Send + Sync>> {
20+
let reference = self.dbsync.get_latest_block_info().await?;
21+
let dolos_output = self.dolos.get_latest_block_info().await?;
22+
if reference != dolos_output {
23+
println!(
24+
">>>>>>>>>>>>>>>>>>>>>>>>>>>> SidechainRpcDataSource::get_latest_block_info mismatch: dbs: {reference:?} dolos: {dolos_output:?}"
25+
)
26+
}
27+
Ok(reference)
28+
}
29+
}
30+
31+
struct McHashDataSourceImplDiff {
32+
dolos: Arc<partner_chains_dolos_data_sources::McHashDataSourceImpl>,
33+
dbsync: Arc<partner_chains_db_sync_data_sources::McHashDataSourceImpl>,
34+
}
35+
36+
#[async_trait::async_trait]
37+
impl McHashDataSource for McHashDataSourceImplDiff {
38+
async fn get_latest_stable_block_for(
39+
&self,
40+
reference_timestamp: sp_timestamp::Timestamp,
41+
) -> Result<Option<MainchainBlock>, Box<dyn std::error::Error + Send + Sync>> {
42+
let reference = self.dbsync.get_latest_stable_block_for(reference_timestamp).await?;
43+
let dolos_output = self.dolos.get_latest_stable_block_for(reference_timestamp).await?;
44+
if reference != dolos_output {
45+
println!(
46+
">>>>>>>>>>>>>>>>>>>>>>>>>>>> McHashDataSource::get_latest_stable_block_for mismatch: dbs: {reference:?} dolos: {dolos_output:?}"
47+
)
48+
}
49+
Ok(reference)
50+
}
51+
52+
async fn get_stable_block_for(
53+
&self,
54+
hash: McBlockHash,
55+
reference_timestamp: sp_timestamp::Timestamp,
56+
) -> Result<Option<MainchainBlock>, Box<dyn std::error::Error + Send + Sync>> {
57+
let reference = self.dbsync.get_stable_block_for(hash.clone(), reference_timestamp).await?;
58+
let dolos_output = self.dolos.get_stable_block_for(hash, reference_timestamp).await?;
59+
if reference != dolos_output {
60+
println!(
61+
">>>>>>>>>>>>>>>>>>>>>>>>>>>> McHashDataSource::get_stable_block_for mismatch: dbs: {reference:?} dolos: {dolos_output:?}"
62+
)
63+
}
64+
Ok(reference)
65+
}
66+
67+
async fn get_block_by_hash(
68+
&self,
69+
hash: McBlockHash,
70+
) -> Result<Option<MainchainBlock>, Box<dyn std::error::Error + Send + Sync>> {
71+
let reference = self.dbsync.get_block_by_hash(hash.clone()).await?;
72+
let dolos_output = self.dolos.get_block_by_hash(hash).await?;
73+
if reference != dolos_output {
74+
println!(
75+
">>>>>>>>>>>>>>>>>>>>>>>>>>>> McHashDataSource::get_block_by_hash mismatch: dbs: {reference:?} dolos: {dolos_output:?}"
76+
)
77+
}
78+
Ok(reference)
79+
}
80+
}
81+
82+
struct AuthoritySelectionDataSourceImplDiff {
83+
dolos: Arc<partner_chains_dolos_data_sources::AuthoritySelectionDataSourceImpl>,
84+
dbsync: Arc<partner_chains_db_sync_data_sources::CandidateDataSourceCached>,
85+
}
86+
87+
#[async_trait::async_trait]
88+
impl AuthoritySelectionDataSource for AuthoritySelectionDataSourceImplDiff {
89+
async fn get_ariadne_parameters(
90+
&self,
91+
epoch_number: McEpochNumber,
92+
d_parameter_policy: PolicyId,
93+
permissioned_candidate_policy: PolicyId,
94+
) -> Result<
95+
authority_selection_inherents::AriadneParameters,
96+
Box<dyn std::error::Error + Send + Sync>,
97+
> {
98+
let reference = self
99+
.dbsync
100+
.get_ariadne_parameters(
101+
epoch_number,
102+
d_parameter_policy.clone(),
103+
permissioned_candidate_policy.clone(),
104+
)
105+
.await?;
106+
let dolos_output = self
107+
.dolos
108+
.get_ariadne_parameters(epoch_number, d_parameter_policy, permissioned_candidate_policy)
109+
.await?;
110+
if reference != dolos_output {
111+
println!(
112+
">>>>>>>>>>>>>>>>>>>>>>>>>>>> McHashDataSource::get_ariadne_parameters mismatch: dbs: {reference:?} dolos: {dolos_output:?}"
113+
)
114+
}
115+
Ok(reference)
116+
}
117+
118+
async fn get_candidates(
119+
&self,
120+
epoch_number: McEpochNumber,
121+
committee_candidate_address: MainchainAddress,
122+
) -> Result<Vec<CandidateRegistrations>, Box<dyn std::error::Error + Send + Sync>> {
123+
let reference = self
124+
.dbsync
125+
.get_candidates(epoch_number, committee_candidate_address.clone())
126+
.await?;
127+
let dolos_output =
128+
self.dolos.get_candidates(epoch_number, committee_candidate_address).await?;
129+
if reference != dolos_output {
130+
println!(
131+
">>>>>>>>>>>>>>>>>>>>>>>>>>>> McHashDataSource::get_candidates mismatch: dbs: {reference:?} dolos: {dolos_output:?}"
132+
)
133+
}
134+
Ok(reference)
135+
}
136+
137+
async fn get_epoch_nonce(
138+
&self,
139+
epoch_number: McEpochNumber,
140+
) -> Result<Option<EpochNonce>, Box<dyn std::error::Error + Send + Sync>> {
141+
let reference = self.dbsync.get_epoch_nonce(epoch_number).await?;
142+
let dolos_output = self.dolos.get_epoch_nonce(epoch_number).await?;
143+
if reference != dolos_output {
144+
println!(
145+
">>>>>>>>>>>>>>>>>>>>>>>>>>>> McHashDataSource::get_epoch_nonce mismatch: dbs: {reference:?} dolos: {dolos_output:?}"
146+
)
147+
}
148+
Ok(reference)
149+
}
150+
151+
async fn data_epoch(
152+
&self,
153+
for_epoch: McEpochNumber,
154+
) -> Result<McEpochNumber, Box<dyn std::error::Error + Send + Sync>> {
155+
let reference = self.dbsync.data_epoch(for_epoch).await?;
156+
let dolos_output = self.dolos.data_epoch(for_epoch).await?;
157+
if reference != dolos_output {
158+
println!(
159+
">>>>>>>>>>>>>>>>>>>>>>>>>>>> McHashDataSource::data_epoch mismatch: dbs: {reference:?} dolos: {dolos_output:?}"
160+
)
161+
}
162+
Ok(reference)
163+
}
164+
}
165+
166+
pub async fn create_diff_data_sources(
167+
metrics_opt: Option<McFollowerMetrics>,
168+
) -> std::result::Result<DataSources, Box<dyn Error + Send + Sync + 'static>> {
169+
let dolos_client = partner_chains_dolos_data_sources::get_connection_from_env()?;
170+
let pool = partner_chains_db_sync_data_sources::get_connection_from_env().await?;
171+
let block_dbsync = Arc::new(
172+
partner_chains_db_sync_data_sources::BlockDataSourceImpl::new_from_env(pool.clone())
173+
.await?,
174+
);
175+
let block_dolos = Arc::new(
176+
partner_chains_dolos_data_sources::BlockDataSourceImpl::new_from_env(dolos_client.clone())
177+
.await?,
178+
);
179+
Ok(DataSources {
180+
sidechain_rpc: Arc::new(SidechainRpcDataSourceImplDiff {
181+
dolos: Arc::new(partner_chains_dolos_data_sources::SidechainRpcDataSourceImpl::new(
182+
dolos_client.clone(),
183+
)),
184+
dbsync: Arc::new(partner_chains_db_sync_data_sources::SidechainRpcDataSourceImpl::new(
185+
block_dbsync.clone(),
186+
metrics_opt.clone(),
187+
)),
188+
}),
189+
mc_hash: Arc::new(McHashDataSourceImplDiff {
190+
dolos: Arc::new(partner_chains_dolos_data_sources::McHashDataSourceImpl::new(
191+
block_dolos.clone(),
192+
)),
193+
dbsync: Arc::new(partner_chains_db_sync_data_sources::McHashDataSourceImpl::new(
194+
block_dbsync.clone(),
195+
metrics_opt.clone(),
196+
)),
197+
}),
198+
authority_selection: Arc::new(AuthoritySelectionDataSourceImplDiff {
199+
dolos: Arc::new(
200+
partner_chains_dolos_data_sources::AuthoritySelectionDataSourceImpl::new(
201+
dolos_client.clone(),
202+
),
203+
),
204+
dbsync: Arc::new(
205+
partner_chains_db_sync_data_sources::CandidatesDataSourceImpl::new(
206+
pool.clone(),
207+
metrics_opt.clone(),
208+
)
209+
.await?
210+
.cached(CANDIDATES_FOR_EPOCH_CACHE_SIZE)?,
211+
),
212+
}),
213+
block_participation: Arc::new(
214+
partner_chains_db_sync_data_sources::StakeDistributionDataSourceImpl::new(
215+
pool.clone(),
216+
metrics_opt.clone(),
217+
STAKE_CACHE_SIZE,
218+
),
219+
),
220+
governed_map: Arc::new(
221+
partner_chains_db_sync_data_sources::GovernedMapDataSourceCachedImpl::new(
222+
pool.clone(),
223+
metrics_opt.clone(),
224+
GOVERNED_MAP_CACHE_SIZE,
225+
block_dbsync.clone(),
226+
)
227+
.await?,
228+
),
229+
bridge: Arc::new(
230+
partner_chains_db_sync_data_sources::CachedTokenBridgeDataSourceImpl::new(
231+
pool,
232+
metrics_opt,
233+
block_dbsync,
234+
BRIDGE_TRANSFER_CACHE_LOOKAHEAD,
235+
),
236+
),
237+
})
238+
}

demo/node/src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33

44
pub mod chain_spec;
55
mod data_sources;
6+
mod diff_sources;
67
mod inherent_data;
78
pub mod rpc;
89
pub mod service;

demo/node/src/main.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ mod chain_spec;
66
mod cli;
77
mod command;
88
mod data_sources;
9+
mod diff_sources;
910
mod inherent_data;
1011
mod rpc;
1112
mod service;

toolkit/data-sources/db-sync/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ sqlx = { workspace = true }
1616
db-sync-sqlx = { workspace = true }
1717
tokio = { workspace = true, features = ["full"] }
1818
futures = { workspace = true }
19-
chrono = "0.4.31"
19+
chrono = { workspace = true }
2020
hex = { workspace = true }
2121
hex-literal = { workspace = true }
2222
itertools = { workspace = true }

toolkit/data-sources/db-sync/src/candidates/cached.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ pub type ArcMut<T> = Arc<Mutex<T>>;
2020

2121
type AriadneParametersCacheKey = (McEpochNumber, PolicyId, PolicyId);
2222
type CandidatesCacheKey = (McEpochNumber, String);
23+
/// Cached candidate data source
2324
pub struct CandidateDataSourceCached {
2425
inner: CandidatesDataSourceImpl,
2526
get_ariadne_parameters_for_epoch_cache:
@@ -96,6 +97,7 @@ impl CandidateDataSourceCacheConfig {
9697
}
9798

9899
impl CandidateDataSourceCached {
100+
/// TODO
99101
pub fn new(
100102
inner: CandidatesDataSourceImpl,
101103
candidates_for_epoch_cache_size: usize,
@@ -114,6 +116,7 @@ impl CandidateDataSourceCached {
114116
}
115117
}
116118

119+
/// TODO
117120
pub fn new_from_env(
118121
inner: CandidatesDataSourceImpl,
119122
candidates_for_epoch_cache_size: usize,

0 commit comments

Comments
 (0)