Skip to content

Commit cd079b8

Browse files
committed
flatten sampler loops and change to return result
1 parent 6726dd1 commit cd079b8

File tree

1 file changed

+146
-148
lines changed

1 file changed

+146
-148
lines changed

tinydancer/src/sampler.rs

Lines changed: 146 additions & 148 deletions
Original file line numberDiff line numberDiff line change
@@ -109,7 +109,7 @@ impl ClientService<SampleServiceConfig> for SampleService {
109109
}
110110

111111
for thread in threads {
112-
thread.await.unwrap();
112+
thread.await.unwrap().unwrap();
113113
}
114114
});
115115

@@ -138,199 +138,197 @@ pub async fn request_shreds(
138138
indices: Vec<usize>,
139139
endpoint: String,
140140
) -> Result<GetShredResponse, serde_json::Error> {
141-
let request =
142-
serde_json::json!( {"jsonrpc": "2.0","id":1,"method":"getShreds","params":[slot,&indices,{
143-
"commitment": "confirmed"
144-
}]}) // getting one shred just to get max shreds per slot, can maybe randomize the selection here
141+
let request = serde_json::json!({
142+
"jsonrpc": "2.0",
143+
"id": 1,
144+
"method": "getShreds",
145+
"params":[
146+
slot,
147+
indices,
148+
{ "commitment": "confirmed"}
149+
]
150+
}) // getting one shred just to get max shreds per slot, can maybe randomize the selection here
145151
.to_string();
152+
146153
let res = send_rpc_call!(endpoint, request);
147154
// info!("{:?}", res);
148-
serde_json::from_str::<GetShredResponse>(res.as_str())
155+
serde_json::from_str::<GetShredResponse>(&res)
149156
}
150157

151158
async fn slot_update_loop(
152159
slot_update_tx: Sender<u64>,
153160
pub_sub: String,
154161
status_sampler: Arc<Mutex<ClientStatus>>,
155-
) {
156-
let connection = match connect(Url::parse(pub_sub.as_str()).unwrap()) {
162+
) -> anyhow::Result<()> {
163+
let (mut socket, _response) = match connect(Url::parse(pub_sub.as_str()).unwrap()) {
157164
Ok((socket, _response)) => Some((socket, _response)),
158165
Err(_) => {
159166
let mut status = status_sampler.lock().unwrap();
160167
*status = ClientStatus::Crashed(String::from("Client can't connect to socket"));
161-
162-
Mutex::unlock(status);
163168
None
164169
}
165-
}; //
166-
167-
match connection {
168-
Some((mut socket, _response)) => {
169-
socket
170-
.write_message(Message::Text(
171-
r#"{ "jsonrpc": "2.0", "id": 1, "method": "slotSubscribe" }"#.into(),
172-
))
173-
.unwrap();
174-
175-
loop {
176-
match socket.read_message() {
177-
Ok(msg) => {
178-
let res =
179-
serde_json::from_str::<SlotSubscribeResponse>(msg.to_string().as_str());
180-
// info!("res: {:?}", msg.to_string().as_str());
181-
if let Ok(res) = res {
182-
match slot_update_tx.send(res.params.result.root as u64) {
183-
Ok(_) => {
184-
info!("slot updated: {:?}", res.params.result.root);
185-
}
186-
Err(e) => {
187-
info!(
188-
"error here: {:?} {:?}",
189-
e, res.params.result.root as u64
190-
);
191-
continue; // @TODO: we should add retries here incase send fails for some reason
192-
}
193-
}
170+
}.unwrap();
171+
172+
socket.write_message(Message::Text(
173+
r#"{ "jsonrpc": "2.0", "id": 1, "method": "slotSubscribe" }"#.into(),
174+
))?;
175+
176+
loop {
177+
match socket.read_message() {
178+
Ok(msg) => {
179+
let res =
180+
serde_json::from_str::<SlotSubscribeResponse>(msg.to_string().as_str());
181+
182+
// info!("res: {:?}", msg.to_string().as_str());
183+
if let Ok(res) = res {
184+
match slot_update_tx.send(res.params.result.root as u64) {
185+
Ok(_) => {
186+
info!("slot updated: {:?}", res.params.result.root);
187+
}
188+
Err(e) => {
189+
info!(
190+
"error here: {:?} {:?}",
191+
e, res.params.result.root as u64
192+
);
193+
continue; // @TODO: we should add retries here incase send fails for some reason
194194
}
195195
}
196-
Err(e) => info!("err: {:?}", e),
197196
}
198197
}
198+
Err(e) => info!("err: {:?}", e),
199199
}
200-
None => {}
201200
}
202201
}
203202

203+
macro_rules! unwrap_or_continue {
204+
(Result $var:ident) => {
205+
if $var.is_err() {
206+
continue;
207+
} else {
208+
$var.unwrap()
209+
}
210+
};
211+
(Option $var:ident) => {
212+
if $var.is_none() {
213+
continue;
214+
} else {
215+
$var.unwrap()
216+
}
217+
};
218+
(OptionRef $var:ident) => {
219+
if $var.is_none() {
220+
continue;
221+
} else {
222+
$var.as_ref().unwrap()
223+
}
224+
};
225+
}
226+
204227
async fn shred_update_loop(
205228
slot_update_rx: Receiver<u64>,
206229
endpoint: String,
207230
shred_tx: Sender<(Vec<Option<Shred>>, solana_ledger::shred::Pubkey)>,
208231
status_sampler: Arc<Mutex<ClientStatus>>,
209-
) {
232+
) -> anyhow::Result<()> {
210233
loop {
211234
{
212235
let mut status = status_sampler.lock().unwrap();
213236

214-
if let ClientStatus::Crashed(_) = &*status {
215-
Mutex::unlock(status);
216-
} else {
237+
if let ClientStatus::Crashed(_) = &*status { } else {
217238
*status = ClientStatus::Active(String::from(
218239
"Monitoring Tinydancer: Actively Sampling Shreds",
219240
));
220-
Mutex::unlock(status);
221241
}
222242
}
223243

224244
if let Ok(slot) = slot_update_rx.recv() {
225-
// get shred length
226-
let shred_for_one = request_shreds(slot as usize, vec![0], endpoint.clone()).await;
227-
// info!("res {:?}", shred_for_one);
228-
let shred_indices_for_slot = match shred_for_one {
229-
Ok(first_shred) => {
230-
let first_shred = &first_shred.result.shreds[1].clone(); // add some check later
231-
232-
let max_shreds_per_slot = if let Some(first_shred) = first_shred {
233-
match (
234-
first_shred.clone().shred_data,
235-
first_shred.clone().shred_code,
236-
) {
237-
(Some(data_shred), None) => {
238-
Some(
239-
Shred::ShredData(data_shred)
240-
.num_data_shreds()
241-
.expect("num data shreds error"),
242-
)
243-
// Some(data_shred. ().expect("num data shreds error"))
244-
}
245-
(None, Some(coding_shred)) => Some(
246-
Shred::ShredCode(coding_shred)
247-
.num_coding_shreds()
248-
.expect("num code shreds error"),
249-
),
250-
_ => None,
251-
}
252-
} else {
253-
info!("shred: {:?}", first_shred);
254-
None
255-
};
256-
info!("max_shreds_per_slot {:?}", max_shreds_per_slot);
257-
258-
if let Some(max_shreds_per_slot) = max_shreds_per_slot {
259-
let mut indices = gen_random_indices(max_shreds_per_slot as usize, 10); // unwrap only temporary
260-
indices.push(0_usize);
261-
Some(indices)
262-
} else {
263-
None
264-
}
265-
}
266-
Err(_) => {
267-
//@TODO: add logger here
268-
269-
None
245+
// get shred length (max_shreds_per_slot)
246+
let first_shred = request_shreds(slot as usize, vec![0], endpoint.clone()).await;
247+
let first_shred = unwrap_or_continue!(Result first_shred);
248+
249+
let first_shred = &first_shred.result.shreds[1];
250+
let first_shred = unwrap_or_continue!(OptionRef first_shred);
251+
252+
let max_shreds_per_slot = {
253+
if let Some(data_shred) = &first_shred.shred_data {
254+
Shred::ShredData(data_shred.clone())
255+
.num_data_shreds()
256+
.expect("num data shreds error")
257+
} else if let Some(code_shred) = &first_shred.shred_code {
258+
Shred::ShredCode(code_shred.clone())
259+
.num_coding_shreds()
260+
.expect("num code shreds error")
261+
} else {
262+
// todo
263+
continue;
270264
}
271265
};
272266

273267
// get a random sample of shreds
268+
let mut shred_indices_for_slot = gen_random_indices(max_shreds_per_slot as usize, 10); // unwrap only temporary
269+
shred_indices_for_slot.push(0_usize);
274270
info!("indices of: {:?} {:?}", shred_indices_for_slot, slot);
275-
if let Some(shred_indices_for_slot) = shred_indices_for_slot.clone() {
276-
let shreds_for_slot = request_shreds(
277-
slot as usize,
278-
shred_indices_for_slot.clone(),
279-
endpoint.clone(),
280-
)
281-
.await;
282-
// info!("made 2nd req: {:?}", shreds_for_slot);
283-
if let Ok(shreds_for_slot) = shreds_for_slot {
284-
info!("get shred for slot in 2nd req");
285-
let mut shreds: Vec<Option<Shred>> = shreds_for_slot
286-
.result
287-
.shreds
288-
.par_iter()
289-
.map(|s| try_coerce_shred!(s))
290-
.collect();
291-
// info!("before leader");
292-
let leader = solana_ledger::shred::Pubkey::from_str(
293-
shreds_for_slot.result.leader.as_str(),
294-
)
295-
.unwrap();
296-
// info!("leader {:?}", leader);
297-
let mut fullfill_count = AtomicU32::new(0u32);
298-
shreds.dedup();
299-
shreds.iter().for_each(|f| {
300-
if let Some(s) = f {
301-
info!("{:?}", s.index());
302-
}
303-
});
304-
shreds.par_iter().for_each(|s| {
305-
if let Some(s) = s {
306-
match shred_indices_for_slot.contains(&(s.index() as usize)) {
307-
true => {
308-
fullfill_count.fetch_add(1, Ordering::Relaxed);
309-
info!(
310-
"Received requested shred: {:?} for slot: {:?}",
311-
s.index(),
312-
s.slot()
313-
)
314-
}
315-
false => info!(
316-
"Received unrequested shred index: {:?} for slot: {:?}",
317-
s.index(),
318-
s.slot()
319-
),
320-
}
321-
} else {
322-
info!("Received empty")
271+
272+
let shreds_for_slot = request_shreds(
273+
slot as usize,
274+
shred_indices_for_slot.clone(),
275+
endpoint.clone(),
276+
)
277+
.await;
278+
let shreds_for_slot = unwrap_or_continue!(Result shreds_for_slot);
279+
280+
info!("get shred for slot in 2nd req");
281+
let mut shreds: Vec<Option<Shred>> = shreds_for_slot
282+
.result
283+
.shreds
284+
.par_iter()
285+
.map(|s| try_coerce_shred!(s))
286+
.collect();
287+
288+
// info!("before leader");
289+
let leader = solana_ledger::shred::Pubkey::from_str(
290+
shreds_for_slot.result.leader.as_str(),
291+
)?;
292+
293+
// info!("leader {:?}", leader);
294+
let mut fullfill_count = AtomicU32::new(0u32);
295+
shreds.dedup();
296+
shreds.iter().for_each(|f| {
297+
if let Some(s) = f {
298+
info!("{:?}", s.index());
299+
}
300+
});
301+
302+
shreds.par_iter().for_each(|s| {
303+
if let Some(s) = s {
304+
match shred_indices_for_slot.contains(&(s.index() as usize)) {
305+
true => {
306+
fullfill_count.fetch_add(1, Ordering::Relaxed);
307+
info!(
308+
"Received requested shred: {:?} for slot: {:?}",
309+
s.index(),
310+
s.slot()
311+
)
323312
}
324-
});
325-
if (fullfill_count.get_mut().to_owned() as usize) < shred_indices_for_slot.len()
326-
{
327-
info!("Received incomplete number of shreds, requested {:?} shreds for slot {:?} and received {:?}", shred_indices_for_slot.len(),slot, fullfill_count);
313+
false => info!(
314+
"Received unrequested shred index: {:?} for slot: {:?}",
315+
s.index(),
316+
s.slot()
317+
),
328318
}
329-
shred_tx
330-
.send((shreds, leader))
331-
.expect("shred tx send error");
319+
} else {
320+
info!("Received empty")
332321
}
322+
});
323+
324+
if (fullfill_count.get_mut().to_owned() as usize) < shred_indices_for_slot.len()
325+
{
326+
info!("Received incomplete number of shreds, requested {:?} shreds for slot {:?} and received {:?}", shred_indices_for_slot.len(),slot, fullfill_count);
333327
}
328+
329+
shred_tx
330+
.send((shreds, leader))
331+
.expect("shred tx send error");
334332
}
335333
}
336334
}
@@ -364,7 +362,7 @@ pub fn verify_sample(shred: &Shred, leader: solana_ledger::shred::Pubkey) -> boo
364362
pub async fn shred_verify_loop(
365363
shred_rx: Receiver<(Vec<Option<Shred>>, solana_ledger::shred::Pubkey)>,
366364
verified_shred_tx: Sender<(Shred, solana_ledger::shred::Pubkey)>,
367-
) {
365+
) -> anyhow::Result<()> {
368366
loop {
369367
let rx = shred_rx.recv();
370368

@@ -403,7 +401,7 @@ pub async fn shred_archiver(
403401
verified_shred_rx: Receiver<(Shred, solana_ledger::shred::Pubkey)>,
404402
_archive_config: ArchiveConfig,
405403
instance: Arc<rocksdb::DB>,
406-
) {
404+
) -> anyhow::Result<()> {
407405
loop {
408406
if let Ok((verified_shred, leader)) = verified_shred_rx.recv() {
409407
let mut opts = RocksOptions::default();

0 commit comments

Comments
 (0)