Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion plerkle/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
[package]
name = "plerkle"
description = "Geyser plugin with dynamic config reloading, message bus agnostic abstractions and a whole lot of fun."
version = "1.10.0"
version = "1.11.0"
authors = ["Metaplex Developers <dev@metaplex.com>"]
repository = "https://github.com/metaplex-foundation/digital-asset-validator-plugin"
license = "AGPL-3.0"
Expand Down
2 changes: 1 addition & 1 deletion plerkle_messenger/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
[package]
name = "plerkle_messenger"
description = "Metaplex Messenger trait for Geyser plugin producer/consumer patterns."
version = "1.10.0"
version = "1.11.0"
authors = ["Metaplex Developers <dev@metaplex.com>"]
repository = "https://github.com/metaplex-foundation/digital-asset-validator-plugin"
license = "AGPL-3.0"
Expand Down
7 changes: 7 additions & 0 deletions plerkle_messenger/src/redis/redis_messenger.rs
Original file line number Diff line number Diff line change
Expand Up @@ -245,6 +245,13 @@ impl RedisMessenger {
}
Ok(())
}

/// Returns the current length of the stream identified by `stream_key`,
/// as reported by the connection's `xlen` call.
///
/// # Errors
///
/// If the `xlen` call fails, the failure is logged and returned as
/// [`MessengerError::ConnectionError`] carrying the underlying error text.
pub async fn stream_len(&mut self, stream_key: &'static str) -> Result<u64, MessengerError> {
    match self.connection.xlen(stream_key).await {
        Ok(length) => Ok(length),
        Err(e) => {
            error!("Failed to read stream length: {}", e);
            Err(MessengerError::ConnectionError { msg: e.to_string() })
        }
    }
}
}

#[async_trait]
Expand Down
2 changes: 1 addition & 1 deletion plerkle_snapshot/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
[package]
# Renamed from original "solana-snapshot-etl"
name = "plerkle_snapshot"
version = "0.5.0"
version = "0.6.0"
edition = "2021"
license = "Apache-2.0"
documentation = "https://docs.rs/solana-snapshot-etl"
Expand Down
44 changes: 36 additions & 8 deletions plerkle_snapshot/src/bin/solana-snapshot-etl/geyser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,22 +9,29 @@ use plerkle_serialization::serializer::serialize_account;
use plerkle_snapshot::append_vec::StoredMeta;
use solana_sdk::account::{Account, AccountSharedData, ReadableAccount};
use std::error::Error;
use std::sync::atomic::AtomicU64;
use std::sync::atomic::Ordering;
use std::sync::Arc;
use tokio::sync::Mutex;

use crate::accounts_selector::AccountsSelector;

// Key of the stream that serialized account messages are sent to.
const ACCOUNT_STREAM_KEY: &str = "ACC";
// Upper limit on the accounts stream length while the snapshot is in progress:
// at a checkpoint, processing waits (polling once per second) until the stream
// length drops below this value.
const MAX_INTERMEDIATE_STREAM_LEN: u64 = 50_000_000;
// Once the local stream_counter reaches PROCESSED_CHECKPOINT, the stream length is
// checked; when it is below MAX_INTERMEDIATE_STREAM_LEN the stream_counter is reset to 0.
const PROCESSED_CHECKPOINT: u64 = 20_000_000;

/// State used to push snapshot accounts to the accounts stream through a
/// `RedisMessenger`, with progress reporting and back-pressure bookkeeping.
#[derive(Clone)]
pub(crate) struct GeyserDumper {
/// Messenger used to send account data; wrapped in `Arc<Mutex<_>>` so clones
/// of the dumper share a single messenger.
messenger: Arc<Mutex<RedisMessenger>>,
/// Delay, in nanoseconds, slept after each account that passes the selector.
/// A value of 0 disables the delay.
throttle_nanos: u64,
/// Decides, from the account's pubkey and owner, whether an account is sent.
accounts_selector: AccountsSelector,
/// Progress bar whose position is set to the number of processed accounts.
pub accounts_spinner: ProgressBar,
pub accounts_count: Arc<AtomicU64>,
/// How many accounts were processed in total during the snapshot run.
pub accounts_count: u64,
/// Intermediate counter of accounts sent, used to regulate XLEN checks.
/// It is a separate field because it is initialized with the current
/// stream length, which might be non-zero.
pub stream_counter: u64,
}

impl GeyserDumper {
Expand Down Expand Up @@ -61,13 +68,18 @@ impl GeyserDumper {
messenger
.set_buffer_size(ACCOUNT_STREAM_KEY, 100_000_000)
.await;
let initial_stream_len = messenger
.stream_len(&ACCOUNT_STREAM_KEY)
.await
.expect("get initial stream len of accounts");

Self {
messenger: Arc::new(Mutex::new(messenger)),
accounts_spinner,
accounts_selector,
accounts_count: Arc::new(AtomicU64::new(0)),
accounts_count: 0,
throttle_nanos,
stream_counter: initial_stream_len,
}
}

Expand All @@ -76,6 +88,21 @@ impl GeyserDumper {
(meta, account): (StoredMeta, AccountSharedData),
slot: u64,
) -> Result<(), Box<dyn Error>> {
if self.stream_counter >= PROCESSED_CHECKPOINT {
loop {
let stream_len = self
.messenger
.lock()
.await
.stream_len(ACCOUNT_STREAM_KEY)
.await?;
if stream_len < MAX_INTERMEDIATE_STREAM_LEN {
self.stream_counter = 0;
break;
}
tokio::time::sleep(std::time::Duration::from_secs(1)).await;
}
}
if self
.accounts_selector
.is_account_selected(meta.pubkey.as_ref(), account.owner().as_ref())
Expand Down Expand Up @@ -111,23 +138,24 @@ impl GeyserDumper {
.await
.send(ACCOUNT_STREAM_KEY, data)
.await?;
self.stream_counter += 1;
} else {
tracing::trace!(?account, ?meta, "Account filtered out by accounts selector");
return Ok(());
}

let prev = self.accounts_count.fetch_add(1, Ordering::Relaxed);
self.accounts_spinner.set_position(prev + 1);
self.accounts_count += 1;
self.accounts_spinner.set_position(self.accounts_count);

if self.throttle_nanos > 0 {
tokio::time::sleep(std::time::Duration::from_nanos(self.throttle_nanos)).await;
}

Ok(())
}

pub async fn force_flush(self) {
self.accounts_spinner
.set_position(self.accounts_count.load(Ordering::Relaxed));
self.accounts_spinner.set_position(self.accounts_count);
self.accounts_spinner
.finish_with_message("Finished processing snapshot!");
let messenger_mutex = Arc::into_inner(self.messenger)
Expand Down
7 changes: 1 addition & 6 deletions plerkle_snapshot/src/bin/solana-snapshot-etl/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,12 +71,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
}
}

info!(
"Done! Accounts: {}",
dumper
.accounts_count
.load(std::sync::atomic::Ordering::Relaxed)
);
info!("Done! Accounts: {}", dumper.accounts_count);

dumper.force_flush().await;

Expand Down
Loading