Skip to content

Commit e94cca6

Browse files
committed
WIP
1 parent 7d53e7d commit e94cca6

File tree

2 files changed

+169
-3
lines changed

2 files changed

+169
-3
lines changed

p2p/src/protocol/cbfmgr.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ use nakamoto_common::collections::{AddressBook, HashMap, HashSet};
2020
use nakamoto_common::source;
2121

2222
use super::filter_cache::FilterCache;
23-
use super::output::{Disconnect, Wakeup};
23+
use super::output::{Blocks, Disconnect, Wakeup};
2424
use super::{DisconnectReason, Link, PeerId, Socket};
2525

2626
/// Idle timeout.
@@ -361,7 +361,7 @@ pub struct FilterManager<F, U> {
361361
inflight: HashMap<BlockHash, (Height, PeerId, LocalTime)>,
362362
}
363363

364-
impl<F: Filters, U: SyncFilters + Events + Wakeup + Disconnect> FilterManager<F, U> {
364+
impl<F: Filters, U: SyncFilters + Events + Wakeup + Blocks + Disconnect> FilterManager<F, U> {
365365
/// Create a new filter manager.
366366
pub fn new(config: Config, rng: fastrand::Rng, filters: F, upstream: U) -> Self {
367367
let peers = AddressBook::new(rng.clone());

p2p/src/protocol/output.rs

Lines changed: 167 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,8 +8,12 @@
88
use log::*;
99
use std::cell::RefCell;
1010
use std::collections::{HashMap, VecDeque};
11+
use std::future::Future;
12+
use std::pin::Pin;
1113
use std::rc::Rc;
1214
use std::sync::Arc;
15+
use std::task::Context;
16+
use std::task::Poll;
1317
use std::{fmt, io, net};
1418

1519
pub use crossbeam_channel as chan;
@@ -23,13 +27,14 @@ use nakamoto_common::bitcoin::network::message_filter::{
2327
CFHeaders, CFilter, GetCFHeaders, GetCFilters,
2428
};
2529
use nakamoto_common::bitcoin::network::message_network::VersionMessage;
26-
use nakamoto_common::bitcoin::Transaction;
30+
use nakamoto_common::bitcoin::{Block, Transaction};
2731

2832
use nakamoto_common::block::time::LocalDuration;
2933
use nakamoto_common::block::{BlockHash, BlockHeader, BlockTime, Height};
3034

3135
use crate::protocol::{Event, PeerId};
3236

37+
use super::invmgr::Inventories;
3338
use super::network::Network;
3439
use super::{addrmgr, cbfmgr, invmgr, peermgr, pingmgr, syncmgr, Locators};
3540

@@ -119,6 +124,48 @@ impl fmt::Display for DisconnectReason {
119124
}
120125
}
121126

127+
struct Waker;
128+
129+
impl std::task::Wake for Waker {
130+
fn wake(self: Arc<Self>) {}
131+
fn wake_by_ref(self: &Arc<Self>) {}
132+
}
133+
134+
type BoxFuture<'a, T> = Pin<Box<dyn Future<Output = T> + 'a>>;
135+
136+
struct Task {
137+
future: Option<BoxFuture<'static, ()>>,
138+
}
139+
140+
impl fmt::Debug for Task {
141+
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
142+
f.debug_struct("Task").finish()
143+
}
144+
}
145+
146+
#[derive(Debug, Clone)]
147+
pub struct Request<T> {
148+
result: Rc<RefCell<Option<Result<T, ()>>>>,
149+
}
150+
151+
impl<T: Clone + std::marker::Unpin + std::fmt::Debug> Future for Request<T> {
152+
type Output = Result<T, ()>;
153+
154+
fn poll(
155+
self: std::pin::Pin<&mut Self>,
156+
_ctx: &mut std::task::Context<'_>,
157+
) -> std::task::Poll<Self::Output> {
158+
// TODO: Use `take()` instead of cloning, once you figure it out.
159+
// For now we have to clone, as multiple futures may share the same
160+
// refcell.
161+
if let Some(result) = self.get_mut().result.borrow().clone() {
162+
Poll::Ready(result)
163+
} else {
164+
Poll::Pending
165+
}
166+
}
167+
}
168+
122169
pub(crate) mod message {
123170
use nakamoto_common::bitcoin::consensus::Encodable;
124171
use nakamoto_common::bitcoin::network::message::RawNetworkMessage;
@@ -161,10 +208,15 @@ pub struct Outbox {
161208
outbound: Rc<RefCell<VecDeque<Io>>>,
162209
/// Message outbox.
163210
outbox: Rc<RefCell<HashMap<PeerId, Vec<u8>>>>,
211+
/// Block requests to the network.
212+
block_requests: Rc<RefCell<HashMap<BlockHash, Request<Block>>>>,
164213
/// Network message builder.
165214
builder: message::Builder,
166215
/// Log target.
167216
target: &'static str,
217+
218+
tasks: Rc<RefCell<Vec<Task>>>,
219+
queue: Rc<RefCell<Vec<Task>>>,
168220
}
169221

170222
impl Outbox {
@@ -174,8 +226,11 @@ impl Outbox {
174226
version,
175227
outbound: Rc::new(RefCell::new(VecDeque::new())),
176228
outbox: Rc::new(RefCell::new(HashMap::new())),
229+
block_requests: Rc::new(RefCell::new(HashMap::new())),
177230
builder: message::Builder::new(network),
178231
target,
232+
tasks: Rc::new(RefCell::new(Vec::new())),
233+
queue: Rc::new(RefCell::new(Vec::new())),
179234
}
180235
}
181236

@@ -236,6 +291,50 @@ impl Outbox {
236291
}
237292
}
238293

294+
impl Outbox {
295+
/// A block was received from the network.
296+
pub fn block_received(&mut self, blk: Block) {
297+
let block_hash = blk.block_hash();
298+
299+
if let Some(req) = self.block_requests.borrow_mut().remove(&block_hash) {
300+
*req.result.borrow_mut() = Some(Ok(blk.clone()));
301+
}
302+
}
303+
304+
/// Spawn a future to be executed.
305+
pub fn spawn(&mut self, future: impl Future<Output = ()> + 'static) {
306+
self.queue.borrow_mut().push(Task {
307+
future: Some(Box::pin(future)),
308+
});
309+
}
310+
311+
/// Poll all tasks for completion.
312+
pub fn poll(&mut self, cx: &mut Context<'_>) -> Poll<()> {
313+
let mut tasks = self.tasks.borrow_mut();
314+
315+
for task in tasks.iter_mut() {
316+
if let Some(mut fut) = task.future.take() {
317+
if fut.as_mut().poll(cx).is_pending() {
318+
task.future = Some(fut);
319+
}
320+
}
321+
}
322+
// Clear out all completed futures.
323+
tasks.retain(|t| t.future.is_some());
324+
325+
// Spawn and poll queued tasks.
326+
for t in self.queue.borrow_mut().drain(..) {
327+
tasks.push(t);
328+
}
329+
// TODO: WHO WILL POLL THESE NEW TASKS
330+
331+
if tasks.is_empty() {
332+
return Poll::Ready(());
333+
}
334+
Poll::Pending
335+
}
336+
}
337+
239338
/// Draining iterator over outbound channel queue.
240339
pub struct Drain {
241340
items: Rc<RefCell<VecDeque<Io>>>,
@@ -266,6 +365,32 @@ impl Disconnect for () {
266365
fn disconnect(&self, _addr: net::SocketAddr, _reason: DisconnectReason) {}
267366
}
268367

368+
pub trait Blocks {
369+
fn get_block(&mut self, hash: BlockHash, peer: &PeerId) -> Request<Block>;
370+
}
371+
372+
impl Blocks for Outbox {
373+
/// Fetch a block from a peer.
374+
fn get_block(&mut self, hash: BlockHash, addr: &PeerId) -> Request<Block> {
375+
use std::collections::hash_map::Entry;
376+
377+
let request = Request {
378+
result: Rc::new(RefCell::new(None)),
379+
};
380+
match self.block_requests.borrow_mut().entry(hash) {
381+
Entry::Vacant(e) => {
382+
e.insert(request.clone());
383+
}
384+
Entry::Occupied(e) => {
385+
return e.get().clone();
386+
}
387+
}
388+
self.getdata(*addr, vec![Inventory::Block(hash)]);
389+
390+
request
391+
}
392+
}
393+
269394
/// The ability to be woken up in the future.
270395
pub trait Wakeup {
271396
/// Ask to be woken up in a predefined amount of time.
@@ -494,6 +619,7 @@ impl cbfmgr::Events for Outbox {
494619
pub mod test {
495620
use super::*;
496621
use nakamoto_common::bitcoin::network::message::{NetworkMessage, RawNetworkMessage};
622+
use nakamoto_test::assert_matches;
497623

498624
pub fn messages(
499625
channel: &mut Outbox,
@@ -511,4 +637,44 @@ pub mod test {
511637
}
512638
msgs.into_iter()
513639
}
640+
641+
#[test]
642+
fn test_get_block() {
643+
let network = Network::Mainnet;
644+
let remote: net::SocketAddr = ([88, 88, 88, 88], 8333).into();
645+
let mut outbox = Outbox::new(network, crate::protocol::PROTOCOL_VERSION, "test");
646+
let waker = Arc::new(Waker).into();
647+
let mut cx = Context::from_waker(&waker);
648+
649+
outbox.spawn({
650+
let mut outbox = outbox.clone();
651+
652+
async move {
653+
outbox
654+
.get_block(network.genesis_hash(), &remote)
655+
.await
656+
.unwrap();
657+
}
658+
});
659+
outbox.spawn({
660+
let mut outbox = outbox.clone();
661+
662+
async move {
663+
outbox
664+
.get_block(network.genesis_hash(), &remote)
665+
.await
666+
.unwrap();
667+
}
668+
});
669+
assert!(outbox.poll(&mut cx).is_pending());
670+
assert!(outbox.poll(&mut cx).is_pending());
671+
672+
assert_matches!(
673+
messages(&mut outbox, &remote).next(),
674+
Some(NetworkMessage::GetData(_))
675+
);
676+
677+
outbox.block_received(network.genesis_block());
678+
assert!(outbox.poll(&mut cx).is_ready());
679+
}
514680
}

0 commit comments

Comments
 (0)