Skip to content

Commit 1e0f570

Browse files
authored
refactor: make extractor return a specific type (#634)
2 parents f5c2917 + 8fe612b commit 1e0f570

File tree

10 files changed

+269
-199
lines changed

10 files changed

+269
-199
lines changed

tycho-client/src/feed/component_tracker.rs

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -272,7 +272,7 @@ where
272272
}
273273

274274
/// Updates the tracked entrypoints and contracts based on the given DCI data.
275-
pub fn process_entrypoints(&mut self, dci_update: &DCIUpdate) -> Result<(), RPCError> {
275+
pub fn process_entrypoints(&mut self, dci_update: &DCIUpdate) {
276276
// Update detected contracts for entrypoints
277277
for (entrypoint, traces) in &dci_update.trace_results {
278278
self.entrypoints
@@ -303,8 +303,6 @@ where
303303
}
304304
}
305305
}
306-
307-
Ok(())
308306
}
309307

310308
/// Get related contracts for the given component ids. Assumes that the components are already

tycho-client/src/feed/synchronizer.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -288,7 +288,7 @@ where
288288
4,
289289
)
290290
.await?;
291-
tracked_components.process_entrypoints(&result.clone().into())?;
291+
tracked_components.process_entrypoints(&result.clone().into());
292292
Some(result)
293293
} else {
294294
None
@@ -521,7 +521,7 @@ where
521521
};
522522

523523
// 3. Update entrypoints on the tracker (affects which contracts are tracked)
524-
tracker.process_entrypoints(&deltas.dci_update)?;
524+
tracker.process_entrypoints(&deltas.dci_update);
525525

526526
// 4. Filter deltas by currently tracked components / contracts
527527
self.filter_deltas(&mut deltas, &tracker);

tycho-common/src/dto.rs

Lines changed: 155 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ use utoipa::{IntoParams, ToSchema};
1818
use uuid::Uuid;
1919

2020
use crate::{
21-
models::{self, Address, ComponentId, StoreKey, StoreVal},
21+
models::{self, blockchain::BlockAggregatedChanges, Address, ComponentId, StoreKey, StoreVal},
2222
serde_primitives::{
2323
hex_bytes, hex_bytes_option, hex_hashmap_key, hex_hashmap_key_value, hex_hashmap_value,
2424
},
@@ -145,7 +145,7 @@ pub enum Command {
145145
}
146146

147147
/// A response sent from the server to the client
148-
#[derive(Deserialize, Serialize, Debug, PartialEq, Eq)]
148+
#[derive(Deserialize, Serialize, Debug, PartialEq, Eq, Clone)]
149149
#[serde(tag = "method", rename_all = "lowercase")]
150150
pub enum Response {
151151
NewSubscription { extractor_id: ExtractorIdentity, subscription_id: Uuid },
@@ -154,7 +154,7 @@ pub enum Response {
154154

155155
/// A message sent from the server to the client
156156
#[allow(clippy::large_enum_variant)]
157-
#[derive(Serialize, Deserialize, Debug)]
157+
#[derive(Serialize, Deserialize, Debug, Display, Clone)]
158158
#[serde(untagged)]
159159
pub enum WebSocketMessage {
160160
BlockChanges { subscription_id: Uuid, deltas: BlockChanges },
@@ -365,6 +365,122 @@ impl BlockChanges {
365365
pub fn n_changes(&self) -> usize {
366366
self.account_updates.len() + self.state_updates.len()
367367
}
368+
369+
pub fn drop_state(&self) -> Self {
370+
Self {
371+
extractor: self.extractor.clone(),
372+
chain: self.chain,
373+
block: self.block.clone(),
374+
finalized_block_height: self.finalized_block_height,
375+
revert: self.revert,
376+
new_tokens: self.new_tokens.clone(),
377+
account_updates: HashMap::new(),
378+
state_updates: HashMap::new(),
379+
new_protocol_components: self.new_protocol_components.clone(),
380+
deleted_protocol_components: self.deleted_protocol_components.clone(),
381+
component_balances: self.component_balances.clone(),
382+
account_balances: self.account_balances.clone(),
383+
component_tvl: self.component_tvl.clone(),
384+
dci_update: self.dci_update.clone(),
385+
}
386+
}
387+
}
388+
389+
impl From<models::blockchain::Block> for Block {
390+
fn from(value: models::blockchain::Block) -> Self {
391+
Self {
392+
number: value.number,
393+
hash: value.hash,
394+
parent_hash: value.parent_hash,
395+
chain: value.chain.into(),
396+
ts: value.ts,
397+
}
398+
}
399+
}
400+
401+
impl From<models::protocol::ComponentBalance> for ComponentBalance {
402+
fn from(value: models::protocol::ComponentBalance) -> Self {
403+
Self {
404+
token: value.token,
405+
balance: value.balance,
406+
balance_float: value.balance_float,
407+
modify_tx: value.modify_tx,
408+
component_id: value.component_id,
409+
}
410+
}
411+
}
412+
413+
impl From<models::contract::AccountBalance> for AccountBalance {
414+
fn from(value: models::contract::AccountBalance) -> Self {
415+
Self {
416+
account: value.account,
417+
token: value.token,
418+
balance: value.balance,
419+
modify_tx: value.modify_tx,
420+
}
421+
}
422+
}
423+
424+
impl From<BlockAggregatedChanges> for BlockChanges {
425+
fn from(value: BlockAggregatedChanges) -> Self {
426+
Self {
427+
extractor: value.extractor,
428+
chain: value.chain.into(),
429+
block: value.block.into(),
430+
finalized_block_height: value.finalized_block_height,
431+
revert: value.revert,
432+
account_updates: value
433+
.account_deltas
434+
.into_iter()
435+
.map(|(k, v)| (k, v.into()))
436+
.collect(),
437+
state_updates: value
438+
.state_deltas
439+
.into_iter()
440+
.map(|(k, v)| (k, v.into()))
441+
.collect(),
442+
new_protocol_components: value
443+
.new_protocol_components
444+
.into_iter()
445+
.map(|(k, v)| (k, v.into()))
446+
.collect(),
447+
deleted_protocol_components: value
448+
.deleted_protocol_components
449+
.into_iter()
450+
.map(|(k, v)| (k, v.into()))
451+
.collect(),
452+
component_balances: value
453+
.component_balances
454+
.into_iter()
455+
.map(|(component_id, v)| {
456+
let balances: HashMap<Bytes, ComponentBalance> = v
457+
.into_iter()
458+
.map(|(k, v)| (k, ComponentBalance::from(v)))
459+
.collect();
460+
(component_id, balances.into())
461+
})
462+
.collect(),
463+
account_balances: value
464+
.account_balances
465+
.into_iter()
466+
.map(|(k, v)| {
467+
(
468+
k,
469+
v.into_iter()
470+
.map(|(k, v)| (k, v.into()))
471+
.collect(),
472+
)
473+
})
474+
.collect(),
475+
dci_update: value.dci_update.into(),
476+
new_tokens: value
477+
.new_tokens
478+
.into_iter()
479+
.map(|(k, v)| (k, v.into()))
480+
.collect(),
481+
component_tvl: value.component_tvl,
482+
}
483+
}
368484
}
369485

370486
#[derive(PartialEq, Serialize, Deserialize, Clone, Debug, ToSchema)]
@@ -1338,6 +1454,42 @@ pub struct DCIUpdate {
13381454
pub trace_results: HashMap<String, TracingResult>,
13391455
}
13401456

1457+
impl From<models::blockchain::DCIUpdate> for DCIUpdate {
1458+
fn from(value: models::blockchain::DCIUpdate) -> Self {
1459+
Self {
1460+
new_entrypoints: value
1461+
.new_entrypoints
1462+
.into_iter()
1463+
.map(|(k, v)| {
1464+
(
1465+
k,
1466+
v.into_iter()
1467+
.map(|v| v.into())
1468+
.collect(),
1469+
)
1470+
})
1471+
.collect(),
1472+
new_entrypoint_params: value
1473+
.new_entrypoint_params
1474+
.into_iter()
1475+
.map(|(k, v)| {
1476+
(
1477+
k,
1478+
v.into_iter()
1479+
.map(|(params, i)| (params.into(), i))
1480+
.collect(),
1481+
)
1482+
})
1483+
.collect(),
1484+
trace_results: value
1485+
.trace_results
1486+
.into_iter()
1487+
.map(|(k, v)| (k, v.into()))
1488+
.collect(),
1489+
}
1490+
}
1491+
}
1492+
13411493
#[derive(Serialize, Deserialize, Debug, Default, PartialEq, ToSchema, Eq, Hash, Clone)]
13421494
#[serde(deny_unknown_fields)]
13431495
pub struct ComponentTvlRequestBody {

tycho-common/src/models/blockchain.rs

Lines changed: 18 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,4 @@
1-
use std::{
2-
any::Any,
3-
collections::{hash_map::Entry, HashMap, HashSet},
4-
sync::Arc,
5-
};
1+
use std::collections::{hash_map::Entry, HashMap, HashSet};
62

73
use chrono::NaiveDateTime;
84
use serde::{ser::SerializeStruct, Deserialize, Serialize, Serializer};
@@ -16,8 +12,7 @@ use crate::{
1612
ComponentBalance, ProtocolChangesWithTx, ProtocolComponent, ProtocolComponentStateDelta,
1713
},
1814
token::Token,
19-
Address, BlockHash, Chain, ComponentId, EntryPointId, ExtractorIdentity, MergeError,
20-
NormalisedMessage, StoreKey,
15+
Address, BlockHash, Chain, ComponentId, EntryPointId, MergeError, StoreKey,
2116
},
2217
Bytes,
2318
};
@@ -136,14 +131,9 @@ impl std::fmt::Display for BlockAggregatedChanges {
136131
}
137132
}
138133

139-
#[typetag::serde]
140-
impl NormalisedMessage for BlockAggregatedChanges {
141-
fn source(&self) -> ExtractorIdentity {
142-
ExtractorIdentity::new(self.chain, &self.extractor)
143-
}
144-
145-
fn drop_state(&self) -> Arc<dyn NormalisedMessage> {
146-
Arc::new(Self {
134+
impl BlockAggregatedChanges {
135+
pub fn drop_state(&self) -> Self {
136+
Self {
147137
extractor: self.extractor.clone(),
148138
chain: self.chain,
149139
block: self.block.clone(),
@@ -158,11 +148,7 @@ impl NormalisedMessage for BlockAggregatedChanges {
158148
account_balances: self.account_balances.clone(),
159149
component_tvl: self.component_tvl.clone(),
160150
dci_update: self.dci_update.clone(),
161-
})
162-
}
163-
164-
fn as_any(&self) -> &dyn Any {
165-
self
151+
}
166152
}
167153
}
168154

@@ -176,6 +162,18 @@ impl BlockScoped for BlockAggregatedChanges {
176162
}
177163
}
178164

165+
impl From<dto::Block> for Block {
166+
fn from(value: dto::Block) -> Self {
167+
Self {
168+
number: value.number,
169+
chain: value.chain.into(),
170+
hash: value.hash,
171+
parent_hash: value.parent_hash,
172+
ts: value.ts,
173+
}
174+
}
175+
}
176+
179177
#[derive(Debug, Default, Clone, Serialize, Deserialize, PartialEq)]
180178
pub struct DCIUpdate {
181179
pub new_entrypoints: HashMap<ComponentId, HashSet<EntryPoint>>,

tycho-common/src/models/mod.rs

Lines changed: 13 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ pub mod contract;
33
pub mod protocol;
44
pub mod token;
55

6-
use std::{collections::HashMap, fmt::Display, str::FromStr, sync::Arc};
6+
use std::{collections::HashMap, fmt::Display, str::FromStr};
77

88
use serde::{Deserialize, Serialize};
99
use strum_macros::{Display, EnumString};
@@ -173,6 +173,18 @@ impl std::fmt::Display for ExtractorIdentity {
173173
}
174174
}
175175

176+
impl From<ExtractorIdentity> for dto::ExtractorIdentity {
177+
fn from(value: ExtractorIdentity) -> Self {
178+
dto::ExtractorIdentity { chain: value.chain.into(), name: value.name }
179+
}
180+
}
181+
182+
impl From<dto::ExtractorIdentity> for ExtractorIdentity {
183+
fn from(value: dto::ExtractorIdentity) -> Self {
184+
Self { chain: value.chain.into(), name: value.name }
185+
}
186+
}
187+
176188
#[derive(Debug, PartialEq, Clone)]
177189
pub struct ExtractionState {
178190
pub name: String,
@@ -200,18 +212,6 @@ impl ExtractionState {
200212
}
201213
}
202214

203-
// TODO: replace with types from dto on extractor
204-
#[typetag::serde(tag = "type")]
205-
pub trait NormalisedMessage:
206-
std::any::Any + std::fmt::Debug + std::fmt::Display + Send + Sync + 'static
207-
{
208-
fn source(&self) -> ExtractorIdentity;
209-
210-
fn drop_state(&self) -> Arc<dyn NormalisedMessage>;
211-
212-
fn as_any(&self) -> &dyn std::any::Any;
213-
}
214-
215215
#[derive(PartialEq, Debug, Clone, Default, Deserialize, Serialize)]
216216
pub enum ImplementationType {
217217
#[default]

tycho-indexer/src/extractor/mod.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -6,10 +6,10 @@ use prost::DecodeError;
66
use thiserror::Error;
77
use tycho_common::{
88
models::{
9-
blockchain::{Block, BlockScoped},
9+
blockchain::{Block, BlockAggregatedChanges, BlockScoped},
1010
contract::AccountBalance,
1111
protocol::ComponentBalance,
12-
Address, BlockHash, ExtractorIdentity, MergeError, NormalisedMessage,
12+
Address, BlockHash, ExtractorIdentity, MergeError,
1313
},
1414
storage::StorageError,
1515
Bytes,
@@ -77,7 +77,7 @@ pub enum RPCError {
7777
RequestError(String),
7878
}
7979

80-
pub type ExtractorMsg = Arc<dyn NormalisedMessage>;
80+
pub type ExtractorMsg = Arc<BlockAggregatedChanges>;
8181

8282
#[automock]
8383
#[async_trait]

0 commit comments

Comments
 (0)