Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

12 changes: 8 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,10 @@ If you are building the Metaplex RPC API infrastructure please follow the instru

If you are using this plugin for your bespoke use case then the build steps are below.

#### A note on formatting

Since `rustfmt.toml` uses unstable configuration options, it is required to run formatting with the nightly toolchain: `cargo +nightly fmt`.

### Building Locally

#### Linux
Expand Down Expand Up @@ -80,7 +84,7 @@ The process running the validator must have access to environment variables. Tho

```bash
RUST_LOG=warn
PLUGIN_MESSENGER_CONFIG='{ messenger_type="Redis", connection_config={ redis_connection_str="redis://redis" } }'
PLUGIN_MESSENGER_CONFIG={ messenger_type="Redis", connection_config={ redis_connection_str="redis://redis" } }
```

The PLUGIN_MESSENGER_CONFIG determines which compiled messenger to select and a specific configuration for the messenger.
Expand All @@ -101,11 +105,11 @@ The PLUGIN_MESSENGER_CONFIG determines which compiled messenger to select and a
```
Lower Scale Low network latency

PLUGIN_MESSENGER_CONFIG='{pipeline_size_bytes=1000000,local_buffer_max_window=10, messenger_type="Redis", connection_config={ redis_connection_str="redis://redis" } }'
PLUGIN_MESSENGER_CONFIG={pipeline_size_bytes=1000000,local_buffer_max_window=10, messenger_type="Redis", connection_config={ redis_connection_str="redis://redis" } }

High Scale Higher latency

PLUGIN_MESSENGER_CONFIG='{pipeline_size_bytes=50000000,local_buffer_max_window=500, messenger_type="Redis", connection_config={ redis_connection_str="redis://redis" } }'
PLUGIN_MESSENGER_CONFIG={pipeline_size_bytes=50000000,local_buffer_max_window=500, messenger_type="Redis", connection_config={ redis_connection_str="redis://redis" } }


```
Expand All @@ -120,7 +124,7 @@ PLUGIN_MESSENGER_CONFIG='{pipeline_size_bytes=50000000,local_buffer_max_window=5

```

PLUGIN_MESSENGER_CONFIG='{batch_size=1000, message_wait_timeout=5, retries=5, consumer_id="random_string",messenger_type="Redis", connection_config={ redis_connection_str="redis://redis" } }'
PLUGIN_MESSENGER_CONFIG={batch_size=1000, message_wait_timeout=5, retries=5, consumer_id="random_string",messenger_type="Redis", connection_config={ redis_connection_str="redis://redis" } }
PLUGIN_ACCOUNT_STREAM_SIZE=250000000
PLUGIN_SLOT_STREAM_SIZE=250000
PLUGIN_TRANSACTION_STREAM_SIZE=25000000
Expand Down
2 changes: 1 addition & 1 deletion plerkle/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
pub mod accounts_selector;
pub mod config;
pub mod error;
pub mod geyser_plugin_nft;
pub mod config;
pub mod metrics;
pub mod transaction_selector;
3 changes: 2 additions & 1 deletion plerkle_messenger/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,4 +5,5 @@ mod plerkle_messenger;
pub mod redis;
pub use redis::*;

pub use {crate::error::*, plerkle_messenger::*};
pub use crate::error::*;
pub use plerkle_messenger::*;
10 changes: 5 additions & 5 deletions plerkle_messenger/src/plerkle_messenger.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,12 +88,12 @@ pub async fn select_messenger_read(
config: MessengerConfig,
) -> Result<Box<dyn Messenger>, MessengerError> {
match config.messenger_type {
MessengerType::Redis => {
RedisMessenger::new(config).await.map(|a| Box::new(a) as Box<dyn Messenger>)
}
MessengerType::Redis => RedisMessenger::new(config)
.await
.map(|a| Box::new(a) as Box<dyn Messenger>),
_ => Err(MessengerError::ConfigurationError {
msg: "This Messenger type is not valid or not unimplemented.".to_string()
})
msg: "This Messenger type is not valid or not unimplemented.".to_string(),
}),
}
}

Expand Down
12 changes: 6 additions & 6 deletions plerkle_messenger/src/redis/redis_messenger.rs
Original file line number Diff line number Diff line change
Expand Up @@ -368,11 +368,10 @@ impl MessageStreamer for RedisMessenger {
);

// Add stream to Redis.
let result: RedisResult<()> = self.connection.xgroup_create_mkstream(
stream_key,
self.consumer_group_name.as_str(),
"$",
).await;
let result: RedisResult<()> = self
.connection
.xgroup_create_mkstream(stream_key, self.consumer_group_name.as_str(), "$")
.await;

if let Err(e) = result {
info!("Group already exists: {:?}", e)
Expand Down Expand Up @@ -424,7 +423,8 @@ impl MessageStreamer for RedisMessenger {
for bytes in stream.local_buffer.iter() {
pipe.xadd_maxlen(stream_key, maxlen, "*", &[(DATA_KEY, &bytes)]);
}
let result: Result<Vec<String>, redis::RedisError> = pipe.query_async(&mut self.connection).await;
let result: Result<Vec<String>, redis::RedisError> =
pipe.query_async(&mut self.connection).await;
if let Err(e) = result {
error!("Redis send error: {e}");
return Err(MessengerError::SendError { msg: e.to_string() });
Expand Down
6 changes: 3 additions & 3 deletions plerkle_serialization/src/error.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use thiserror::Error;
#[derive(Debug, Clone, PartialEq, Eq,Error)]
#[derive(Debug, Clone, PartialEq, Eq, Error)]
pub enum PlerkleSerializationError {
#[error("Serialization error: {0}")]
SerializationError(String)
}
SerializationError(String),
}
6 changes: 6 additions & 0 deletions plerkle_serialization/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,20 @@
#[allow(unused_imports)]
#[rustfmt::skip]
mod account_info_generated;
#[allow(unused_imports)]
#[rustfmt::skip]
mod block_info_generated;
#[allow(unused_imports)]
#[rustfmt::skip]
mod common_generated;
#[allow(unused_imports)]
#[rustfmt::skip]
mod compiled_instruction_generated;
#[allow(unused_imports)]
#[rustfmt::skip]
mod slot_status_info_generated;
#[allow(unused_imports)]
#[rustfmt::skip]
mod transaction_info_generated;

pub mod deserializer;
Expand Down
2 changes: 1 addition & 1 deletion plerkle_serialization/src/serializer/mod.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
mod serializer_common;
mod serializer_stable;
pub use serializer_stable::*;
pub use serializer_stable::*;
24 changes: 13 additions & 11 deletions plerkle_serialization/src/serializer/serializer_stable.rs
Original file line number Diff line number Diff line change
@@ -1,21 +1,23 @@
use crate::error::PlerkleSerializationError;
use crate::solana_geyser_plugin_interface_shims::{
ReplicaAccountInfoV2, ReplicaBlockInfoV2, ReplicaTransactionInfoV2, SlotStatus,
};
use crate::{
error::PlerkleSerializationError,
solana_geyser_plugin_interface_shims::{
ReplicaAccountInfoV2, ReplicaBlockInfoV2, ReplicaTransactionInfoV2, SlotStatus,
},
AccountInfo, AccountInfoArgs, BlockInfo, BlockInfoArgs, CompiledInnerInstruction,
CompiledInnerInstructionArgs, CompiledInnerInstructions, CompiledInnerInstructionsArgs,
CompiledInstruction, CompiledInstructionArgs,
Pubkey as FBPubkey, Pubkey, SlotStatusInfo, SlotStatusInfoArgs, Status as FBSlotStatus,
TransactionInfo, TransactionInfoArgs, TransactionVersion,
CompiledInstruction, CompiledInstructionArgs, Pubkey as FBPubkey, Pubkey, SlotStatusInfo,
SlotStatusInfoArgs, Status as FBSlotStatus, TransactionInfo, TransactionInfoArgs,
TransactionVersion,
};
use chrono::Utc;
use flatbuffers::{FlatBufferBuilder, WIPOffset};
use solana_sdk::message::{SanitizedMessage, VersionedMessage};
use solana_sdk::transaction::VersionedTransaction;
use solana_transaction_status::option_serializer::OptionSerializer;
use solana_sdk::{
message::{SanitizedMessage, VersionedMessage},
transaction::VersionedTransaction,
};
use solana_transaction_status::{
EncodedConfirmedTransactionWithStatusMeta, UiInstruction, UiTransactionStatusMeta,
option_serializer::OptionSerializer, EncodedConfirmedTransactionWithStatusMeta, UiInstruction,
UiTransactionStatusMeta,
};

pub fn serialize_account<'a>(
Expand Down
2 changes: 1 addition & 1 deletion plerkle_snapshot/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
[package]
# Renamed from original "solana-snapshot-etl"
name = "plerkle_snapshot"
version = "0.7.0"
version = "0.8.0"
edition = "2021"
license = "Apache-2.0"
documentation = "https://docs.rs/solana-snapshot-etl"
Expand Down
58 changes: 19 additions & 39 deletions plerkle_snapshot/src/append_vec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,23 +15,22 @@
// This file contains code vendored from https://github.com/solana-labs/solana
// Source: solana/runtime/src/append_vec.rs

use {
log::*,
memmap2::{Mmap, MmapMut},
serde::{Deserialize, Serialize},
solana_sdk::{
account::{Account, AccountSharedData, ReadableAccount},
clock::Epoch,
hash::Hash,
pubkey::Pubkey,
},
std::{
convert::TryFrom,
fs::OpenOptions,
io::{self, Read},
mem,
path::Path,
},
use std::{
convert::TryFrom,
fs::OpenOptions,
io::{self, Read},
mem,
path::Path,
};

use log::*;
use memmap2::{Mmap, MmapMut};
use serde::{Deserialize, Serialize};
use solana_sdk::{
account::{Account, AccountSharedData, ReadableAccount},
clock::Epoch,
hash::Hash,
pubkey::Pubkey,
};

// Data placement should be aligned at the next boundary. Without alignment accessing the memory may
Expand Down Expand Up @@ -186,11 +185,7 @@ impl AppendVec {
current_len: usize,
slot: u64,
) -> io::Result<Self> {
let data = OpenOptions::new()
.read(true)
.write(false)
.create(false)
.open(&path)?;
let data = OpenOptions::new().read(true).write(false).create(false).open(&path)?;

let file_size = std::fs::metadata(&path)?.len();
AppendVec::sanitize_len_and_size(current_len, file_size as usize)?;
Expand All @@ -204,12 +199,7 @@ impl AppendVec {
result?
};

let new = AppendVec {
map,
current_len,
file_size,
slot,
};
let new = AppendVec { map, current_len, file_size, slot };

Ok(new)
}
Expand Down Expand Up @@ -269,17 +259,7 @@ impl AppendVec {
let (hash, next): (&'a Hash, _) = self.get_type(next)?;
let (data, next) = self.get_slice(next, meta.data_len as usize)?;
let stored_size = next - offset;
Some((
StoredAccountMeta {
meta,
account_meta,
data,
offset,
stored_size,
hash,
},
next,
))
Some((StoredAccountMeta { meta, account_meta, data, offset, stored_size, hash }, next))
}

pub fn get_slot(&self) -> u64 {
Expand Down
70 changes: 27 additions & 43 deletions plerkle_snapshot/src/archived.rs
Original file line number Diff line number Diff line change
@@ -1,15 +1,19 @@
use std::{
fs::File,
io::{BufReader, Read},
path::{Component, Path},
pin::Pin,
time::Instant,
};

use log::info;
use tar::{Archive, Entries, Entry};

use crate::{
deserialize_from, parse_append_vec_name, AccountsDbFields, AppendVec, AppendVecIterator,
DeserializableVersionedBank, Result, SerializableAccountStorageEntry, SnapshotError,
SnapshotExtractor,
};
use log::info;
use std::fs::File;
use std::io::{BufReader, Read};
use std::path::{Component, Path};
use std::pin::Pin;
use std::time::Instant;
use tar::{Archive, Entries, Entry};

/// Extracts account data from a .tar.zst stream.
pub struct ArchiveSnapshotExtractor<Source>
Expand Down Expand Up @@ -72,10 +76,7 @@ where
let accounts_db_fields_post_time = Instant::now();
drop(snapshot_file);

info!(
"Read bank fields in {:?}",
versioned_bank_post_time - pre_unpack
);
info!("Read bank fields in {:?}", versioned_bank_post_time - pre_unpack);
info!(
"Read accounts DB fields in {:?}",
accounts_db_fields_post_time - versioned_bank_post_time
Expand All @@ -89,22 +90,18 @@ where
}

fn unboxed_iter(&mut self) -> impl Iterator<Item = Result<AppendVec>> + '_ {
self.entries
.take()
.into_iter()
.flatten()
.filter_map(|entry| {
let mut entry = match entry {
Ok(x) => x,
Err(e) => return Some(Err(e.into())),
};
let path = match entry.path() {
Ok(x) => x,
Err(e) => return Some(Err(e.into())),
};
let (slot, id) = path.file_name().and_then(parse_append_vec_name)?;
Some(self.process_entry(&mut entry, slot, id))
})
self.entries.take().into_iter().flatten().filter_map(|entry| {
let mut entry = match entry {
Ok(x) => x,
Err(e) => return Some(Err(e.into())),
};
let path = match entry.path() {
Ok(x) => x,
Err(e) => return Some(Err(e.into())),
};
let (slot, id) = path.file_name().and_then(parse_append_vec_name)?;
Some(self.process_entry(&mut entry, slot, id))
})
}

fn process_entry(
Expand All @@ -113,22 +110,13 @@ where
slot: u64,
id: u64,
) -> Result<AppendVec> {
let known_vecs = self
.accounts_db_fields
.0
.get(&slot)
.map(|v| &v[..])
.unwrap_or(&[]);
let known_vecs = self.accounts_db_fields.0.get(&slot).map(|v| &v[..]).unwrap_or(&[]);
let known_vec = known_vecs.iter().find(|entry| entry.id == (id as usize));
let known_vec = match known_vec {
None => return Err(SnapshotError::UnexpectedAppendVec),
Some(v) => v,
};
Ok(AppendVec::new_from_reader(
entry,
known_vec.accounts_current_len,
slot,
)?)
Ok(AppendVec::new_from_reader(entry, known_vec.accounts_current_len, slot)?)
}

fn is_snapshot_manifest_file(path: &Path) -> bool {
Expand All @@ -141,11 +129,7 @@ where
_ => return false,
};
// Check if slot number file is valid u64.
if slot_number_str_1
.to_str()
.and_then(|s| s.parse::<u64>().ok())
.is_none()
{
if slot_number_str_1.to_str().and_then(|s| s.parse::<u64>().ok()).is_none() {
return false;
}
let slot_number_str_2 = match components.next() {
Expand Down
Loading
Loading