Skip to content

Commit 6bcdac1

Browse files
authored
Merge pull request #534 from sevenlabs-hq/v1/filtering-refactor
[V1] Filtering Redesign
2 parents 7c92386 + 0d3a8ae commit 6bcdac1

File tree

8 files changed

+264
-266
lines changed

8 files changed

+264
-266
lines changed

crates/core/src/datasource.rs

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,18 @@ pub enum UpdateType {
4545
AccountUpdate,
4646
Transaction,
4747
AccountDeletion,
48+
BlockDetails,
49+
}
50+
51+
impl Update {
52+
pub fn update_type(&self) -> UpdateType {
53+
match self {
54+
Update::Account(_) => UpdateType::AccountUpdate,
55+
Update::Transaction(_) => UpdateType::Transaction,
56+
Update::AccountDeletion(_) => UpdateType::AccountDeletion,
57+
Update::BlockDetails(_) => UpdateType::BlockDetails,
58+
}
59+
}
4860
}
4961

5062
#[derive(Debug, Clone)]

crates/core/src/filter.rs

Lines changed: 162 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -4,48 +4,155 @@ use crate::{
44
instruction::{NestedInstruction, NestedInstructions},
55
transaction::TransactionMetadata,
66
};
7+
use solana_pubkey::Pubkey;
8+
use solana_signature::Signature;
9+
use std::collections::HashMap;
10+
use std::sync::atomic::{AtomicU64, Ordering};
11+
use std::sync::{Arc, RwLock};
12+
use std::time::{Duration, Instant};
713

8-
pub trait Filter {
14+
#[derive(Debug, Clone, PartialEq, Eq)]
15+
pub struct FilterContext<'a> {
16+
pub datasource_id: &'a DatasourceId,
17+
}
18+
19+
#[derive(Debug, Clone, PartialEq, Eq)]
20+
pub enum FilterResult {
21+
Accept,
22+
Reject,
23+
}
24+
25+
pub trait Filter: Send + Sync {
926
fn filter_account(
1027
&self,
11-
_datasource_id: &DatasourceId,
28+
_context: &FilterContext,
1229
_account_metadata: &AccountMetadata,
1330
_account: &solana_account::Account,
14-
) -> bool {
15-
true
31+
) -> FilterResult {
32+
FilterResult::Accept
1633
}
1734

1835
fn filter_instruction(
1936
&self,
20-
_datasource_id: &DatasourceId,
37+
_context: &FilterContext,
2138
_nested_instruction: &NestedInstruction,
22-
) -> bool {
23-
true
39+
) -> FilterResult {
40+
FilterResult::Accept
2441
}
2542

2643
fn filter_transaction(
2744
&self,
28-
_datasource_id: &DatasourceId,
45+
_context: &FilterContext,
2946
_transaction_metadata: &TransactionMetadata,
3047
_nested_instructions: &NestedInstructions,
31-
) -> bool {
32-
true
48+
) -> FilterResult {
49+
FilterResult::Accept
3350
}
3451

3552
fn filter_account_deletion(
3653
&self,
37-
_datasource_id: &DatasourceId,
54+
_context: &FilterContext,
3855
_account_deletion: &AccountDeletion,
39-
) -> bool {
40-
true
56+
) -> FilterResult {
57+
FilterResult::Accept
4158
}
4259

4360
fn filter_block_details(
4461
&self,
45-
_datasource_id: &DatasourceId,
62+
_context: &FilterContext,
4663
_block_details: &BlockDetails,
47-
) -> bool {
48-
true
64+
) -> FilterResult {
65+
FilterResult::Accept
66+
}
67+
}
68+
69+
const DEDUP_CLEANUP_INTERVAL_SECS: u64 = 60;
70+
71+
type SeenInstructions = HashMap<(Signature, Vec<u8>), Instant>;
72+
73+
pub struct DeduplicationFilter {
74+
seen_instructions: Arc<RwLock<SeenInstructions>>,
75+
seen_accounts: Arc<RwLock<HashMap<(Signature, Pubkey), Instant>>>,
76+
ttl: Duration,
77+
creation: Instant,
78+
last_cleanup_secs: AtomicU64,
79+
}
80+
81+
impl DeduplicationFilter {
82+
pub fn new(ttl: Duration) -> Self {
83+
let creation = Instant::now();
84+
Self {
85+
seen_instructions: Arc::new(RwLock::new(HashMap::new())),
86+
seen_accounts: Arc::new(RwLock::new(HashMap::new())),
87+
ttl,
88+
creation,
89+
last_cleanup_secs: AtomicU64::new(0),
90+
}
91+
}
92+
93+
pub fn cleanup_expired(&self) {
94+
let cutoff = Instant::now() - self.ttl;
95+
if let Ok(mut seen) = self.seen_instructions.write() {
96+
seen.retain(|_, t| *t > cutoff);
97+
}
98+
if let Ok(mut seen) = self.seen_accounts.write() {
99+
seen.retain(|_, t| *t > cutoff);
100+
}
101+
self.last_cleanup_secs
102+
.store(self.creation.elapsed().as_secs(), Ordering::Relaxed);
103+
}
104+
105+
#[inline(always)]
106+
fn cleanup_if_needed(&self) {
107+
let elapsed_secs = self.creation.elapsed().as_secs();
108+
let last = self.last_cleanup_secs.load(Ordering::Relaxed);
109+
if elapsed_secs.saturating_sub(last) >= DEDUP_CLEANUP_INTERVAL_SECS {
110+
self.cleanup_expired();
111+
}
112+
}
113+
}
114+
115+
impl Filter for DeduplicationFilter {
116+
#[inline(always)]
117+
fn filter_instruction(
118+
&self,
119+
_context: &FilterContext,
120+
nested_instruction: &NestedInstruction,
121+
) -> FilterResult {
122+
self.cleanup_if_needed();
123+
let sig = nested_instruction.metadata.transaction_metadata.signature;
124+
let path = nested_instruction.metadata.absolute_path.clone();
125+
let key = (sig, path);
126+
let Ok(mut seen) = self.seen_instructions.write() else {
127+
return FilterResult::Accept;
128+
};
129+
if seen.insert(key, Instant::now()).is_none() {
130+
FilterResult::Accept
131+
} else {
132+
FilterResult::Reject
133+
}
134+
}
135+
136+
#[inline(always)]
137+
fn filter_account(
138+
&self,
139+
_context: &FilterContext,
140+
account_metadata: &AccountMetadata,
141+
_account: &solana_account::Account,
142+
) -> FilterResult {
143+
self.cleanup_if_needed();
144+
let Some(tx_sig) = account_metadata.transaction_signature else {
145+
return FilterResult::Accept;
146+
};
147+
let key = (tx_sig, account_metadata.pubkey);
148+
let Ok(mut seen) = self.seen_accounts.write() else {
149+
return FilterResult::Accept;
150+
};
151+
if seen.insert(key, Instant::now()).is_none() {
152+
FilterResult::Accept
153+
} else {
154+
FilterResult::Reject
155+
}
49156
}
50157
}
51158

@@ -65,48 +172,72 @@ impl DatasourceFilter {
65172
allowed_datasources: datasource_ids,
66173
}
67174
}
175+
176+
fn allows(&self, datasource_id: &DatasourceId) -> bool {
177+
self.allowed_datasources.contains(datasource_id)
178+
}
68179
}
69180

70181
impl Filter for DatasourceFilter {
71182
fn filter_account(
72183
&self,
73-
datasource_id: &DatasourceId,
184+
context: &FilterContext,
74185
_account_metadata: &AccountMetadata,
75186
_account: &solana_account::Account,
76-
) -> bool {
77-
self.allowed_datasources.contains(datasource_id)
187+
) -> FilterResult {
188+
if self.allows(context.datasource_id) {
189+
FilterResult::Accept
190+
} else {
191+
FilterResult::Reject
192+
}
78193
}
79194

80195
fn filter_instruction(
81196
&self,
82-
datasource_id: &DatasourceId,
197+
context: &FilterContext,
83198
_nested_instruction: &NestedInstruction,
84-
) -> bool {
85-
self.allowed_datasources.contains(datasource_id)
199+
) -> FilterResult {
200+
if self.allows(context.datasource_id) {
201+
FilterResult::Accept
202+
} else {
203+
FilterResult::Reject
204+
}
86205
}
87206

88207
fn filter_transaction(
89208
&self,
90-
datasource_id: &DatasourceId,
209+
context: &FilterContext,
91210
_transaction_metadata: &TransactionMetadata,
92211
_nested_instructions: &NestedInstructions,
93-
) -> bool {
94-
self.allowed_datasources.contains(datasource_id)
212+
) -> FilterResult {
213+
if self.allows(context.datasource_id) {
214+
FilterResult::Accept
215+
} else {
216+
FilterResult::Reject
217+
}
95218
}
96219

97220
fn filter_account_deletion(
98221
&self,
99-
datasource_id: &DatasourceId,
222+
context: &FilterContext,
100223
_account_deletion: &AccountDeletion,
101-
) -> bool {
102-
self.allowed_datasources.contains(datasource_id)
224+
) -> FilterResult {
225+
if self.allows(context.datasource_id) {
226+
FilterResult::Accept
227+
} else {
228+
FilterResult::Reject
229+
}
103230
}
104231

105232
fn filter_block_details(
106233
&self,
107-
datasource_id: &DatasourceId,
234+
context: &FilterContext,
108235
_block_details: &BlockDetails,
109-
) -> bool {
110-
self.allowed_datasources.contains(datasource_id)
236+
) -> FilterResult {
237+
if self.allows(context.datasource_id) {
238+
FilterResult::Accept
239+
} else {
240+
FilterResult::Reject
241+
}
111242
}
112243
}

crates/core/src/instruction.rs

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,15 @@ use {
1010
},
1111
};
1212

13+
use crate::collection::InstructionDecoderCollection;
14+
15+
#[derive(Debug)]
16+
pub struct ParsedInstruction<T: InstructionDecoderCollection> {
17+
pub metadata: InstructionMetadata,
18+
pub instruction: T,
19+
pub inner_instructions: Vec<ParsedInstruction<T>>,
20+
}
21+
1322
#[derive(Debug, Clone)]
1423
pub struct InstructionMetadata {
1524
pub transaction_metadata: Arc<TransactionMetadata>,
@@ -210,10 +219,6 @@ where
210219
self.processor.process(&data).await?;
211220
}
212221

213-
for nested_inner_instruction in nested_instruction.inner_instructions.iter() {
214-
self.run(nested_inner_instruction).await?;
215-
}
216-
217222
Ok(())
218223
}
219224

crates/core/src/lib.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ pub mod pipeline;
1515
#[cfg(feature = "postgres")]
1616
pub mod postgres;
1717
pub mod processor;
18-
pub mod schema;
18+
1919
pub mod transaction;
2020
pub mod transformers;
2121

0 commit comments

Comments
 (0)