Skip to content

Commit 1dc3dc5

Browse files
committed
refactor: make extractor return a specific type
This commit improves how we deal with our types and communication with tycho-client. The main objective here was to be able to detect any breaking change at compile time instead of runtime. We now deal with specific types to ensure serialization and deserialization are on the same Rust struct `dto::BlockChanges` and equivalent dto for `Command` and `Response`. We also removed the `NormalisedMessage` trait because it was not needed anymore.
1 parent f5c2917 commit 1dc3dc5

File tree

8 files changed

+266
-194
lines changed

8 files changed

+266
-194
lines changed

tycho-common/src/dto.rs

Lines changed: 155 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ use utoipa::{IntoParams, ToSchema};
1818
use uuid::Uuid;
1919

2020
use crate::{
21-
models::{self, Address, ComponentId, StoreKey, StoreVal},
21+
models::{self, blockchain::BlockAggregatedChanges, Address, ComponentId, StoreKey, StoreVal},
2222
serde_primitives::{
2323
hex_bytes, hex_bytes_option, hex_hashmap_key, hex_hashmap_key_value, hex_hashmap_value,
2424
},
@@ -145,7 +145,7 @@ pub enum Command {
145145
}
146146

147147
/// A response sent from the server to the client
148-
#[derive(Deserialize, Serialize, Debug, PartialEq, Eq)]
148+
#[derive(Deserialize, Serialize, Debug, PartialEq, Eq, Clone)]
149149
#[serde(tag = "method", rename_all = "lowercase")]
150150
pub enum Response {
151151
NewSubscription { extractor_id: ExtractorIdentity, subscription_id: Uuid },
@@ -154,7 +154,7 @@ pub enum Response {
154154

155155
/// A message sent from the server to the client
156156
#[allow(clippy::large_enum_variant)]
157-
#[derive(Serialize, Deserialize, Debug)]
157+
#[derive(Serialize, Deserialize, Debug, Display, Clone)]
158158
#[serde(untagged)]
159159
pub enum WebSocketMessage {
160160
BlockChanges { subscription_id: Uuid, deltas: BlockChanges },
@@ -365,6 +365,122 @@ impl BlockChanges {
365365
pub fn n_changes(&self) -> usize {
366366
self.account_updates.len() + self.state_updates.len()
367367
}
368+
369+
pub fn drop_state(&self) -> Self {
370+
Self {
371+
extractor: self.extractor.clone(),
372+
chain: self.chain,
373+
block: self.block.clone(),
374+
finalized_block_height: self.finalized_block_height,
375+
revert: self.revert,
376+
new_tokens: self.new_tokens.clone(),
377+
account_updates: HashMap::new(),
378+
state_updates: HashMap::new(),
379+
new_protocol_components: self.new_protocol_components.clone(),
380+
deleted_protocol_components: self.deleted_protocol_components.clone(),
381+
component_balances: self.component_balances.clone(),
382+
account_balances: self.account_balances.clone(),
383+
component_tvl: self.component_tvl.clone(),
384+
dci_update: self.dci_update.clone(),
385+
}
386+
}
387+
}
388+
389+
impl From<models::blockchain::Block> for Block {
390+
fn from(value: models::blockchain::Block) -> Self {
391+
Self {
392+
number: value.number,
393+
hash: value.hash,
394+
parent_hash: value.parent_hash,
395+
chain: value.chain.into(),
396+
ts: value.ts,
397+
}
398+
}
399+
}
400+
401+
impl From<models::protocol::ComponentBalance> for ComponentBalance {
402+
fn from(value: models::protocol::ComponentBalance) -> Self {
403+
Self {
404+
token: value.token,
405+
balance: value.balance,
406+
balance_float: value.balance_float,
407+
modify_tx: value.modify_tx,
408+
component_id: value.component_id,
409+
}
410+
}
411+
}
412+
413+
impl From<models::contract::AccountBalance> for AccountBalance {
414+
fn from(value: models::contract::AccountBalance) -> Self {
415+
Self {
416+
account: value.account,
417+
token: value.token,
418+
balance: value.balance,
419+
modify_tx: value.modify_tx,
420+
}
421+
}
422+
}
423+
424+
impl From<BlockAggregatedChanges> for BlockChanges {
425+
fn from(value: BlockAggregatedChanges) -> Self {
426+
Self {
427+
extractor: value.extractor,
428+
chain: value.chain.into(),
429+
block: value.block.into(),
430+
finalized_block_height: value.finalized_block_height,
431+
revert: value.revert,
432+
account_updates: value
433+
.account_deltas
434+
.into_iter()
435+
.map(|(k, v)| (k, v.into()))
436+
.collect(),
437+
state_updates: value
438+
.state_deltas
439+
.into_iter()
440+
.map(|(k, v)| (k, v.into()))
441+
.collect(),
442+
new_protocol_components: value
443+
.new_protocol_components
444+
.into_iter()
445+
.map(|(k, v)| (k, v.into()))
446+
.collect(),
447+
deleted_protocol_components: value
448+
.deleted_protocol_components
449+
.into_iter()
450+
.map(|(k, v)| (k, v.into()))
451+
.collect(),
452+
component_balances: value
453+
.component_balances
454+
.into_iter()
455+
.map(|(component_id, v)| {
456+
let balances: HashMap<Bytes, ComponentBalance> = v
457+
.into_iter()
458+
.map(|(k, v)| (k, ComponentBalance::from(v)))
459+
.collect();
460+
(component_id, balances.into())
461+
})
462+
.collect(),
463+
account_balances: value
464+
.account_balances
465+
.into_iter()
466+
.map(|(k, v)| {
467+
(
468+
k,
469+
v.into_iter()
470+
.map(|(k, v)| (k, v.into()))
471+
.collect(),
472+
)
473+
})
474+
.collect(),
475+
dci_update: value.dci_update.into(),
476+
new_tokens: value
477+
.new_tokens
478+
.into_iter()
479+
.map(|(k, v)| (k, v.into()))
480+
.collect(),
481+
component_tvl: value.component_tvl,
482+
}
483+
}
368484
}
369485

370486
#[derive(PartialEq, Serialize, Deserialize, Clone, Debug, ToSchema)]
@@ -1338,6 +1454,42 @@ pub struct DCIUpdate {
13381454
pub trace_results: HashMap<String, TracingResult>,
13391455
}
13401456

1457+
impl From<models::blockchain::DCIUpdate> for DCIUpdate {
1458+
fn from(value: models::blockchain::DCIUpdate) -> Self {
1459+
Self {
1460+
new_entrypoints: value
1461+
.new_entrypoints
1462+
.into_iter()
1463+
.map(|(k, v)| {
1464+
(
1465+
k,
1466+
v.into_iter()
1467+
.map(|v| v.into())
1468+
.collect(),
1469+
)
1470+
})
1471+
.collect(),
1472+
new_entrypoint_params: value
1473+
.new_entrypoint_params
1474+
.into_iter()
1475+
.map(|(k, v)| {
1476+
(
1477+
k,
1478+
v.into_iter()
1479+
.map(|(params, i)| (params.into(), i))
1480+
.collect(),
1481+
)
1482+
})
1483+
.collect(),
1484+
trace_results: value
1485+
.trace_results
1486+
.into_iter()
1487+
.map(|(k, v)| (k, v.into()))
1488+
.collect(),
1489+
}
1490+
}
1491+
}
1492+
13411493
#[derive(Serialize, Deserialize, Debug, Default, PartialEq, ToSchema, Eq, Hash, Clone)]
13421494
#[serde(deny_unknown_fields)]
13431495
pub struct ComponentTvlRequestBody {

tycho-common/src/models/blockchain.rs

Lines changed: 18 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,4 @@
1-
use std::{
2-
any::Any,
3-
collections::{hash_map::Entry, HashMap, HashSet},
4-
sync::Arc,
5-
};
1+
use std::collections::{hash_map::Entry, HashMap, HashSet};
62

73
use chrono::NaiveDateTime;
84
use serde::{ser::SerializeStruct, Deserialize, Serialize, Serializer};
@@ -16,8 +12,7 @@ use crate::{
1612
ComponentBalance, ProtocolChangesWithTx, ProtocolComponent, ProtocolComponentStateDelta,
1713
},
1814
token::Token,
19-
Address, BlockHash, Chain, ComponentId, EntryPointId, ExtractorIdentity, MergeError,
20-
NormalisedMessage, StoreKey,
15+
Address, BlockHash, Chain, ComponentId, EntryPointId, MergeError, StoreKey,
2116
},
2217
Bytes,
2318
};
@@ -136,14 +131,9 @@ impl std::fmt::Display for BlockAggregatedChanges {
136131
}
137132
}
138133

139-
#[typetag::serde]
140-
impl NormalisedMessage for BlockAggregatedChanges {
141-
fn source(&self) -> ExtractorIdentity {
142-
ExtractorIdentity::new(self.chain, &self.extractor)
143-
}
144-
145-
fn drop_state(&self) -> Arc<dyn NormalisedMessage> {
146-
Arc::new(Self {
134+
impl BlockAggregatedChanges {
135+
pub fn drop_state(&self) -> Self {
136+
Self {
147137
extractor: self.extractor.clone(),
148138
chain: self.chain,
149139
block: self.block.clone(),
@@ -158,11 +148,7 @@ impl NormalisedMessage for BlockAggregatedChanges {
158148
account_balances: self.account_balances.clone(),
159149
component_tvl: self.component_tvl.clone(),
160150
dci_update: self.dci_update.clone(),
161-
})
162-
}
163-
164-
fn as_any(&self) -> &dyn Any {
165-
self
151+
}
166152
}
167153
}
168154

@@ -176,6 +162,18 @@ impl BlockScoped for BlockAggregatedChanges {
176162
}
177163
}
178164

165+
impl From<dto::Block> for Block {
166+
fn from(value: dto::Block) -> Self {
167+
Self {
168+
number: value.number,
169+
chain: value.chain.into(),
170+
hash: value.hash,
171+
parent_hash: value.parent_hash,
172+
ts: value.ts,
173+
}
174+
}
175+
}
176+
179177
#[derive(Debug, Default, Clone, Serialize, Deserialize, PartialEq)]
180178
pub struct DCIUpdate {
181179
pub new_entrypoints: HashMap<ComponentId, HashSet<EntryPoint>>,

tycho-common/src/models/mod.rs

Lines changed: 13 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ pub mod contract;
33
pub mod protocol;
44
pub mod token;
55

6-
use std::{collections::HashMap, fmt::Display, str::FromStr, sync::Arc};
6+
use std::{collections::HashMap, fmt::Display, str::FromStr};
77

88
use serde::{Deserialize, Serialize};
99
use strum_macros::{Display, EnumString};
@@ -173,6 +173,18 @@ impl std::fmt::Display for ExtractorIdentity {
173173
}
174174
}
175175

176+
impl From<ExtractorIdentity> for dto::ExtractorIdentity {
177+
fn from(value: ExtractorIdentity) -> Self {
178+
dto::ExtractorIdentity { chain: value.chain.into(), name: value.name }
179+
}
180+
}
181+
182+
impl From<dto::ExtractorIdentity> for ExtractorIdentity {
183+
fn from(value: dto::ExtractorIdentity) -> Self {
184+
Self { chain: value.chain.into(), name: value.name }
185+
}
186+
}
187+
176188
#[derive(Debug, PartialEq, Clone)]
177189
pub struct ExtractionState {
178190
pub name: String,
@@ -200,18 +212,6 @@ impl ExtractionState {
200212
}
201213
}
202214

203-
// TODO: replace with types from dto on extractor
204-
#[typetag::serde(tag = "type")]
205-
pub trait NormalisedMessage:
206-
std::any::Any + std::fmt::Debug + std::fmt::Display + Send + Sync + 'static
207-
{
208-
fn source(&self) -> ExtractorIdentity;
209-
210-
fn drop_state(&self) -> Arc<dyn NormalisedMessage>;
211-
212-
fn as_any(&self) -> &dyn std::any::Any;
213-
}
214-
215215
#[derive(PartialEq, Debug, Clone, Default, Deserialize, Serialize)]
216216
pub enum ImplementationType {
217217
#[default]

tycho-indexer/src/extractor/mod.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -6,10 +6,10 @@ use prost::DecodeError;
66
use thiserror::Error;
77
use tycho_common::{
88
models::{
9-
blockchain::{Block, BlockScoped},
9+
blockchain::{Block, BlockAggregatedChanges, BlockScoped},
1010
contract::AccountBalance,
1111
protocol::ComponentBalance,
12-
Address, BlockHash, ExtractorIdentity, MergeError, NormalisedMessage,
12+
Address, BlockHash, ExtractorIdentity, MergeError,
1313
},
1414
storage::StorageError,
1515
Bytes,
@@ -77,7 +77,7 @@ pub enum RPCError {
7777
RequestError(String),
7878
}
7979

80-
pub type ExtractorMsg = Arc<dyn NormalisedMessage>;
80+
pub type ExtractorMsg = Arc<BlockAggregatedChanges>;
8181

8282
#[automock]
8383
#[async_trait]

tycho-indexer/src/extractor/protocol_extractor.rs

Lines changed: 4 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -2876,10 +2876,6 @@ mod test_serial_db {
28762876
.unwrap();
28772877

28782878

2879-
let res = client_msg
2880-
.as_any()
2881-
.downcast_ref::<BlockAggregatedChanges>()
2882-
.expect("not good type");
28832879
let base_ts = db_fixtures::yesterday_midnight().timestamp();
28842880
let block_entity_changes_result = BlockAggregatedChanges {
28852881
extractor: "native_name".to_string(),
@@ -2959,8 +2955,8 @@ mod test_serial_db {
29592955
};
29602956

29612957
assert_eq!(
2962-
res,
2963-
&block_entity_changes_result
2958+
*client_msg,
2959+
block_entity_changes_result
29642960
);
29652961
})
29662962
.await;
@@ -3057,10 +3053,6 @@ mod test_serial_db {
30573053
.unwrap()
30583054
.unwrap();
30593055

3060-
let res = client_msg
3061-
.as_any()
3062-
.downcast_ref::<BlockAggregatedChanges>()
3063-
.expect("not good type");
30643056

30653057
let base_ts = db_fixtures::yesterday_midnight().timestamp();
30663058
let account1 = Bytes::from_str("0000000000000000000000000000000000000001").unwrap();
@@ -3165,8 +3157,8 @@ mod test_serial_db {
31653157
};
31663158

31673159
assert_eq!(
3168-
res,
3169-
&block_account_expected
3160+
*client_msg,
3161+
block_account_expected
31703162
);
31713163
})
31723164
.await;

0 commit comments

Comments
 (0)