Skip to content

Commit 6726dd1

Browse files
committed
change TinyDancer to ::start() and add comments to sampler
1 parent 66c55a2 commit 6726dd1

File tree

4 files changed

+66
-31
lines changed

4 files changed

+66
-31
lines changed

tinydancer/src/main.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -157,8 +157,8 @@ async fn main() -> Result<()> {
157157
})
158158
},
159159
};
160-
let client = TinyDancer::new(config).await;
161-
client.join().await;
160+
161+
TinyDancer::start(config).await.unwrap();
162162
}
163163

164164
Commands::Slot => {

tinydancer/src/rpc_wrapper/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -155,6 +155,7 @@ impl ClientService<TransactionServiceConfig> for TransactionService {
155155
tx_handle: transaction_handle,
156156
}
157157
}
158+
158159
async fn join(self) -> std::result::Result<(), Self::ServiceError> {
159160
let _ = self.tx_handle.await;
160161
Ok(())

tinydancer/src/sampler.rs

Lines changed: 35 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,9 @@ use tokio::{
4343
};
4444
use tungstenite::{connect, Message};
4545
use url::Url;
46+
4647
pub const SHRED_CF: &str = "archived_shreds";
48+
4749
pub struct SampleService {
4850
sample_indices: Vec<u64>,
4951
// peers: Vec<(Pubkey, SocketAddr)>,
@@ -59,63 +61,78 @@ pub struct SampleServiceConfig {
5961
#[derive(Clone, Debug)]
6062
pub struct ArchiveConfig {
6163
pub shred_archive_duration: u64,
62-
6364
pub archive_path: String,
6465
}
66+
6567
#[async_trait]
6668
impl ClientService<SampleServiceConfig> for SampleService {
6769
type ServiceError = tokio::task::JoinError;
70+
6871
fn new(config: SampleServiceConfig) -> Self {
6972
let sampler_handle = tokio::spawn(async move {
7073
let rpc_url = endpoint(config.cluster);
7174
let pub_sub = convert_to_websocket!(rpc_url);
75+
7276
let mut threads = Vec::default();
7377

7478
let (slot_update_tx, slot_update_rx) = crossbeam::channel::unbounded::<u64>();
7579
let (shred_tx, shred_rx) = crossbeam::channel::unbounded();
7680
let (verified_shred_tx, verified_shred_rx) = crossbeam::channel::unbounded();
77-
let status_arc = Arc::clone(&config.status_sampler);
81+
82+
let status_arc = config.status_sampler.clone();
83+
84+
// waits on new slots => triggers shred_update_loop
7885
threads.push(tokio::spawn(slot_update_loop(
7986
slot_update_tx,
8087
pub_sub,
8188
config.status_sampler,
8289
)));
90+
91+
// sample shreds from new slot
92+
// verify each shred in shred_verify_loop
8393
threads.push(tokio::spawn(shred_update_loop(
8494
slot_update_rx,
8595
rpc_url,
8696
shred_tx,
8797
status_arc,
8898
)));
8999

100+
// verify shreds + store in db in shred_archiver
90101
threads.push(tokio::spawn(shred_verify_loop(shred_rx, verified_shred_tx)));
102+
91103
if let Some(archive_config) = config.archive_config {
92104
threads.push(tokio::spawn(shred_archiver(
93105
verified_shred_rx,
94106
archive_config,
95107
config.instance,
96108
)));
97109
}
110+
98111
for thread in threads {
99-
thread.await;
112+
thread.await.unwrap();
100113
}
101114
});
115+
102116
let sample_indices: Vec<u64> = Vec::default();
103117
Self {
104118
sampler_handle,
105119
sample_indices,
106120
}
107121
}
122+
108123
async fn join(self) -> std::result::Result<(), Self::ServiceError> {
109124
self.sampler_handle.await
110125
}
111126
}
127+
112128
pub fn gen_random_indices(max_shreds_per_slot: usize, sample_qty: usize) -> Vec<usize> {
113129
let mut rng = StdRng::from_entropy();
114-
let vec = (0..max_shreds_per_slot)
130+
let vec = (0..sample_qty)
115131
.map(|_| rng.gen_range(0..max_shreds_per_slot))
116132
.collect::<Vec<usize>>();
117-
vec.as_slice()[0..sample_qty].to_vec()
133+
vec
118134
}
135+
119136
pub async fn request_shreds(
120137
slot: usize,
121138
indices: Vec<usize>,
@@ -205,6 +222,7 @@ async fn shred_update_loop(
205222
}
206223

207224
if let Ok(slot) = slot_update_rx.recv() {
225+
// get shred length
208226
let shred_for_one = request_shreds(slot as usize, vec![0], endpoint.clone()).await;
209227
// info!("res {:?}", shred_for_one);
210228
let shred_indices_for_slot = match shred_for_one {
@@ -251,6 +269,8 @@ async fn shred_update_loop(
251269
None
252270
}
253271
};
272+
273+
// get a random sample of shreds
254274
info!("indices of: {:?} {:?}", shred_indices_for_slot, slot);
255275
if let Some(shred_indices_for_slot) = shred_indices_for_slot.clone() {
256276
let shreds_for_slot = request_shreds(
@@ -314,12 +334,14 @@ async fn shred_update_loop(
314334
}
315335
}
316336
}
337+
317338
// use solana_ledger::shred::dispatch;
339+
340+
// verifies the merkle proof of the shread
318341
pub fn verify_sample(shred: &Shred, leader: solana_ledger::shred::Pubkey) -> bool {
319342
// @TODO fix error handling here
320343
let verify_merkle_root = match shred {
321344
Shred::ShredData(ShredData::Merkle(shred)) => Some(shred.verify_merkle_proof()),
322-
323345
Shred::ShredCode(ShredCode::Merkle(shred)) => Some(shred.verify_merkle_proof()),
324346
_ => None,
325347
};
@@ -338,6 +360,7 @@ pub fn verify_sample(shred: &Shred, leader: solana_ledger::shred::Pubkey) -> boo
338360
.all(|s| *s);
339361
verified
340362
}
363+
341364
pub async fn shred_verify_loop(
342365
shred_rx: Receiver<(Vec<Option<Shred>>, solana_ledger::shred::Pubkey)>,
343366
verified_shred_tx: Sender<(Shred, solana_ledger::shred::Pubkey)>,
@@ -373,6 +396,9 @@ pub async fn shred_verify_loop(
373396
}
374397
}
375398
}
399+
400+
401+
// store verified shreds in db
376402
pub async fn shred_archiver(
377403
verified_shred_rx: Receiver<(Shred, solana_ledger::shred::Pubkey)>,
378404
_archive_config: ArchiveConfig,
@@ -413,6 +439,8 @@ pub async fn shred_archiver(
413439
}
414440
}
415441
}
442+
443+
416444
pub async fn pull_and_verify_shreds(slot: usize, endpoint: String) -> bool {
417445
let shred_for_one = request_shreds(slot, vec![0], endpoint.clone()).await;
418446
// info!("res {:?}", shred_for_one);
@@ -522,6 +550,7 @@ pub async fn pull_and_verify_shreds(slot: usize, endpoint: String) -> bool {
522550
false
523551
}
524552
}
553+
525554
pub fn put_serialized<T: serde::Serialize + std::fmt::Debug>(
526555
instance: &rocksdb::DB,
527556
cf: &ColumnFamily,

tinydancer/src/tinydancer.rs

Lines changed: 28 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,8 @@ use crate::{
1515
ui::{UiConfig, UiService},
1616
};
1717
use async_trait::async_trait;
18+
use futures::future::join_all;
19+
use rand::seq::index::sample;
1820
use tiny_logger::logs::info;
1921
// use log::info;
2022
// use log4rs;
@@ -29,13 +31,15 @@ pub trait ClientService<T> {
2931
fn new(config: T) -> Self;
3032
async fn join(self) -> std::result::Result<(), Self::ServiceError>;
3133
}
34+
3235
pub struct TinyDancer {
3336
sample_service: SampleService,
3437
ui_service: Option<UiService>,
3538
sample_qty: u64,
3639
config: TinyDancerConfig,
3740
transaction_service: TransactionService,
3841
}
42+
3943
#[derive(Clone)]
4044
pub struct TinyDancerConfig {
4145
pub rpc_endpoint: Cluster,
@@ -52,6 +56,7 @@ use std::fs::read_dir;
5256
use std::io;
5357
use std::io::ErrorKind;
5458
use std::path::PathBuf;
59+
5560
// use tiny_logger::logs::info;
5661
pub fn get_project_root() -> io::Result<PathBuf> {
5762
let path = env::current_dir()?;
@@ -71,11 +76,13 @@ pub fn get_project_root() -> io::Result<PathBuf> {
7176
"Ran out of places to find Cargo.toml",
7277
))
7378
}
79+
7480
impl TinyDancer {
75-
pub async fn new(config: TinyDancerConfig) -> Self {
81+
pub async fn start(config: TinyDancerConfig) -> Result<()> {
7682
let status = ClientStatus::Initializing(String::from("Starting Up Tinydancer"));
83+
7784
let client_status = Arc::new(Mutex::new(status));
78-
let status_sampler = Arc::clone(&client_status);
85+
let status_sampler = client_status.clone();
7986

8087
let TinyDancerConfig {
8188
enable_ui_service,
@@ -87,33 +94,34 @@ impl TinyDancer {
8794
} = config.clone();
8895
std::env::set_var("RUST_LOG", "info");
8996
tiny_logger::setup_file_with_default(&log_path, "RUST_LOG");
90-
let rpc_cluster = rpc_endpoint.clone();
9197

9298
let mut opts = rocksdb::Options::default();
9399
opts.create_if_missing(true);
94100
opts.set_error_if_exists(false);
95101
opts.create_missing_column_families(true);
96-
let instance = rocksdb::DB::open_cf(
102+
103+
// setup db
104+
let db = rocksdb::DB::open_cf(
97105
&opts,
98106
archive_config.clone().unwrap().archive_path,
99107
vec![SHRED_CF],
100108
)
101109
.unwrap();
102-
let instance = Arc::new(instance);
103-
let sample_instance = Arc::clone(&instance);
110+
let db = Arc::new(db);
104111

105-
let rpc_instance = Arc::clone(&instance);
106112
let sample_service_config = SampleServiceConfig {
107-
cluster: rpc_endpoint,
113+
cluster: rpc_endpoint.clone(),
108114
archive_config,
109-
instance: sample_instance,
115+
instance: db.clone(),
110116
status_sampler,
111117
};
112118
let sample_service = SampleService::new(sample_service_config);
119+
113120
let transaction_service = TransactionService::new(TransactionServiceConfig {
114-
cluster: rpc_cluster,
115-
db_instance: rpc_instance,
121+
cluster: rpc_endpoint.clone(),
122+
db_instance: db.clone(),
116123
});
124+
117125
let ui_service = if enable_ui_service {
118126
Some(UiService::new(UiConfig {
119127
client_status,
@@ -130,26 +138,22 @@ impl TinyDancer {
130138
None
131139
};
132140

133-
Self {
134-
config,
135-
ui_service,
136-
sample_qty,
137-
sample_service,
138-
transaction_service,
139-
}
140-
}
141-
pub async fn join(self) {
142-
self.sample_service
141+
// run
142+
sample_service
143143
.join()
144144
.await
145145
.expect("error in sample service thread");
146-
self.transaction_service
146+
147+
transaction_service
147148
.join()
148149
.await
149150
.expect("ERROR IN SIMPLE PAYMENT SERVICE");
150-
if let Some(ui_service) = self.ui_service {
151+
152+
if let Some(ui_service) = ui_service {
151153
block_on!(async { ui_service.join().await }, "Ui Service Error");
152154
}
155+
156+
Ok(())
153157
}
154158
}
155159

@@ -161,6 +165,7 @@ pub enum Cluster {
161165
Localnet,
162166
Custom(String),
163167
}
168+
164169
pub fn endpoint(cluster: Cluster) -> String {
165170
let cluster = cluster;
166171
match cluster {

0 commit comments

Comments
 (0)