Skip to content

Commit ce6a79d

Browse files
authored
feat(etl): add stream length checks to prevent overflow (#92)
1 parent bae5e43 commit ce6a79d

File tree

7 files changed

+50
-20
lines changed

7 files changed

+50
-20
lines changed

Cargo.lock

Lines changed: 3 additions & 3 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

plerkle/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
[package]
22
name = "plerkle"
33
description = "Geyser plugin with dynamic config reloading, message bus agnostic abstractions and a whole lot of fun."
4-
version = "1.10.0"
4+
version = "1.11.0"
55
authors = ["Metaplex Developers <dev@metaplex.com>"]
66
repository = "https://github.com/metaplex-foundation/digital-asset-validator-plugin"
77
license = "AGPL-3.0"

plerkle_messenger/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
[package]
22
name = "plerkle_messenger"
33
description = "Metaplex Messenger trait for Geyser plugin producer/consumer patterns."
4-
version = "1.10.0"
4+
version = "1.11.0"
55
authors = ["Metaplex Developers <dev@metaplex.com>"]
66
repository = "https://github.com/metaplex-foundation/digital-asset-validator-plugin"
77
license = "AGPL-3.0"

plerkle_messenger/src/redis/redis_messenger.rs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -245,6 +245,13 @@ impl RedisMessenger {
245245
}
246246
Ok(())
247247
}
248+
249+
pub async fn stream_len(&mut self, stream_key: &'static str) -> Result<u64, MessengerError> {
250+
Ok(self.connection.xlen(stream_key).await.map_err(|e| {
251+
error!("Failed to read stream length: {}", e);
252+
MessengerError::ConnectionError { msg: e.to_string() }
253+
})?)
254+
}
248255
}
249256

250257
#[async_trait]

plerkle_snapshot/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
[package]
22
# Renamed from original "solana-snapshot-etl"
33
name = "plerkle_snapshot"
4-
version = "0.5.0"
4+
version = "0.6.0"
55
edition = "2021"
66
license = "Apache-2.0"
77
documentation = "https://docs.rs/solana-snapshot-etl"

plerkle_snapshot/src/bin/solana-snapshot-etl/geyser.rs

Lines changed: 36 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -9,22 +9,29 @@ use plerkle_serialization::serializer::serialize_account;
99
use plerkle_snapshot::append_vec::StoredMeta;
1010
use solana_sdk::account::{Account, AccountSharedData, ReadableAccount};
1111
use std::error::Error;
12-
use std::sync::atomic::AtomicU64;
13-
use std::sync::atomic::Ordering;
1412
use std::sync::Arc;
1513
use tokio::sync::Mutex;
1614

1715
use crate::accounts_selector::AccountsSelector;
1816

1917
const ACCOUNT_STREAM_KEY: &str = "ACC";
18+
// the upper limit of accounts stream length for when the snapshot is in progress
19+
const MAX_INTERMEDIATE_STREAM_LEN: u64 = 50_000_000;
20+
// every PROCESSED_CHECKPOINT we check the stream length and reset the local stream_counter
21+
const PROCESSED_CHECKPOINT: u64 = 20_000_000;
2022

2123
#[derive(Clone)]
2224
pub(crate) struct GeyserDumper {
2325
messenger: Arc<Mutex<RedisMessenger>>,
2426
throttle_nanos: u64,
2527
accounts_selector: AccountsSelector,
2628
pub accounts_spinner: ProgressBar,
27-
pub accounts_count: Arc<AtomicU64>,
29+
/// how many accounts were processed in total during the snapshot run.
30+
pub accounts_count: u64,
31+
/// intermediate counter of accounts sent to regulate XLEN checks.
32+
/// the reason for a separate field is that we initialize it as the current
33+
/// stream length, which might be non-zero.
34+
pub stream_counter: u64,
2835
}
2936

3037
impl GeyserDumper {
@@ -61,13 +68,18 @@ impl GeyserDumper {
6168
messenger
6269
.set_buffer_size(ACCOUNT_STREAM_KEY, 100_000_000)
6370
.await;
71+
let initial_stream_len = messenger
72+
.stream_len(&ACCOUNT_STREAM_KEY)
73+
.await
74+
.expect("get initial stream len of accounts");
6475

6576
Self {
6677
messenger: Arc::new(Mutex::new(messenger)),
6778
accounts_spinner,
6879
accounts_selector,
69-
accounts_count: Arc::new(AtomicU64::new(0)),
80+
accounts_count: 0,
7081
throttle_nanos,
82+
stream_counter: initial_stream_len,
7183
}
7284
}
7385

@@ -76,6 +88,21 @@ impl GeyserDumper {
7688
(meta, account): (StoredMeta, AccountSharedData),
7789
slot: u64,
7890
) -> Result<(), Box<dyn Error>> {
91+
if self.stream_counter >= PROCESSED_CHECKPOINT {
92+
loop {
93+
let stream_len = self
94+
.messenger
95+
.lock()
96+
.await
97+
.stream_len(ACCOUNT_STREAM_KEY)
98+
.await?;
99+
if stream_len < MAX_INTERMEDIATE_STREAM_LEN {
100+
self.stream_counter = 0;
101+
break;
102+
}
103+
tokio::time::sleep(std::time::Duration::from_secs(1)).await;
104+
}
105+
}
79106
if self
80107
.accounts_selector
81108
.is_account_selected(meta.pubkey.as_ref(), account.owner().as_ref())
@@ -111,23 +138,24 @@ impl GeyserDumper {
111138
.await
112139
.send(ACCOUNT_STREAM_KEY, data)
113140
.await?;
141+
self.stream_counter += 1;
114142
} else {
115143
tracing::trace!(?account, ?meta, "Account filtered out by accounts selector");
116144
return Ok(());
117145
}
118146

119-
let prev = self.accounts_count.fetch_add(1, Ordering::Relaxed);
120-
self.accounts_spinner.set_position(prev + 1);
147+
self.accounts_count += 1;
148+
self.accounts_spinner.set_position(self.accounts_count);
121149

122150
if self.throttle_nanos > 0 {
123151
tokio::time::sleep(std::time::Duration::from_nanos(self.throttle_nanos)).await;
124152
}
153+
125154
Ok(())
126155
}
127156

128157
pub async fn force_flush(self) {
129-
self.accounts_spinner
130-
.set_position(self.accounts_count.load(Ordering::Relaxed));
158+
self.accounts_spinner.set_position(self.accounts_count);
131159
self.accounts_spinner
132160
.finish_with_message("Finished processing snapshot!");
133161
let messenger_mutex = Arc::into_inner(self.messenger)

plerkle_snapshot/src/bin/solana-snapshot-etl/main.rs

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -71,12 +71,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
7171
}
7272
}
7373

74-
info!(
75-
"Done! Accounts: {}",
76-
dumper
77-
.accounts_count
78-
.load(std::sync::atomic::Ordering::Relaxed)
79-
);
74+
info!("Done! Accounts: {}", dumper.accounts_count);
8075

8176
dumper.force_flush().await;
8277

0 commit comments

Comments
 (0)