Skip to content

Commit f12ce29

Browse files
authored
feat: Implement a blocking start method & auto starting functionality (#150)
2 parents a7bed75 + 48c6b2f commit f12ce29

File tree

6 files changed

+274
-73
lines changed

6 files changed

+274
-73
lines changed

crates/libtortillas/src/engine/actor.rs

Lines changed: 21 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,11 @@ pub struct EngineActor {
5151

5252
/// Mailbox size for each torrent instance
5353
pub(crate) mailbox_size: usize,
54+
55+
/// If we autostart torrents
56+
pub(crate) autostart: Option<bool>,
57+
/// How many peers we need to have before we start downloading
58+
pub(crate) sufficient_peers: Option<usize>,
5459
}
5560

5661
pub(crate) type EngineActorArgs = (
@@ -68,6 +73,10 @@ pub(crate) type EngineActorArgs = (
6873
//
6974
// If 0 is provided, the mailbox size will be unbounded
7075
Option<usize>,
76+
// If we autostart torrents
77+
Option<bool>,
78+
// How many peers we need to have before we start downloading
79+
Option<usize>,
7180
);
7281

7382
impl Actor for EngineActor {
@@ -86,8 +95,16 @@ impl Actor for EngineActor {
8695
async fn on_start(
8796
args: Self::Args, actor_ref: kameo::prelude::ActorRef<Self>,
8897
) -> Result<Self, Self::Error> {
89-
let (tcp_addr, utp_addr, udp_addr, peer_id, default_piece_storage_strategy, mailbox_size) =
90-
args;
98+
let (
99+
tcp_addr,
100+
utp_addr,
101+
udp_addr,
102+
peer_id,
103+
default_piece_storage_strategy,
104+
mailbox_size,
105+
autostart,
106+
sufficient_peers,
107+
) = args;
91108

92109
let tcp_addr = tcp_addr.unwrap_or_else(|| SocketAddr::from(([0, 0, 0, 0], 0)));
93110
// Should this be port 6881?
@@ -112,6 +129,8 @@ impl Actor for EngineActor {
112129
actor_ref,
113130
default_piece_storage_strategy,
114131
mailbox_size: mailbox_size.unwrap_or(64),
132+
autostart,
133+
sufficient_peers,
115134
})
116135
}
117136

crates/libtortillas/src/engine/messages.rs

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ use crate::{
1414
messages::PeerMessages,
1515
stream::{PeerRecv, PeerStream},
1616
},
17-
torrent::{TorrentActor, TorrentMessage},
17+
torrent::{TorrentActor, TorrentMessage, TorrentState},
1818
};
1919

2020
pub(crate) enum EngineMessage {
@@ -64,7 +64,7 @@ impl Message<EngineMessage> for EngineActor {
6464
EngineMessage::StartAll => {
6565
for torrent in self.torrents.iter() {
6666
torrent
67-
.tell(TorrentMessage::Start)
67+
.tell(TorrentMessage::SetState(TorrentState::Downloading))
6868
.await
6969
.expect("Failed to start torrent");
7070
}
@@ -104,6 +104,8 @@ impl Message<EngineRequest> for EngineActor {
104104
self.udp_server.clone(),
105105
None,
106106
self.default_piece_storage_strategy.clone(),
107+
self.autostart,
108+
self.sufficient_peers,
107109
),
108110
// if the size is 0, we use an unbounded mailbox
109111
match self.mailbox_size {

crates/libtortillas/src/engine/mod.rs

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -114,6 +114,16 @@ impl Engine {
114114
///
115115
/// Default: `64` when `None` is provided.
116116
mailbox_size: Option<usize>,
117+
/// If we autostart torrents as soon as we have [`Self::sufficient_peers`]
118+
/// peers connected.
119+
/// Default: `true`
120+
autostart: Option<bool>,
121+
/// How many peers we need to have before we start downloading.
122+
///
123+
/// Is ignored if [`Self::autostart`] is `false`.
124+
///
125+
/// Default: `6`
126+
sufficient_peers: Option<usize>,
117127
) -> Self {
118128
let args: EngineActorArgs = (
119129
tcp_addr,
@@ -122,6 +132,8 @@ impl Engine {
122132
Some(custom_id),
123133
piece_storage_strategy,
124134
mailbox_size,
135+
autostart,
136+
sufficient_peers,
125137
);
126138

127139
let actor = EngineActor::spawn(args);

crates/libtortillas/src/torrent/actor.rs

Lines changed: 117 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -3,14 +3,16 @@ use std::{
33
net::SocketAddr,
44
path::PathBuf,
55
sync::{Arc, atomic::AtomicU8},
6+
time::Instant,
67
};
78

89
use anyhow::ensure;
910
use bitvec::vec::BitVec;
1011
use dashmap::DashMap;
1112
use kameo::{Actor, actor::ActorRef, mailbox};
1213
use librqbit_utp::UtpSocketUdp;
13-
use tracing::{debug, error, info, instrument, warn};
14+
use tokio::sync::oneshot;
15+
use tracing::{debug, error, info, instrument, trace, warn};
1416

1517
use super::util;
1618
use crate::{
@@ -62,8 +64,9 @@ pub enum PieceStorageStrategy {
6264
Disk(PathBuf),
6365
}
6466

65-
/// The current state of the torrent, defaults to `Paused`
66-
#[derive(Debug, Default, Clone, Copy)]
67+
/// The current state of the torrent, defaults to
68+
/// [`Inactive`](TorrentState::Inactive)
69+
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
6770
pub enum TorrentState {
6871
/// Torrent is downloading new pieces actively
6972
///
@@ -78,6 +81,10 @@ pub enum TorrentState {
7881
Inactive,
7982
}
8083

84+
/// A hook that is called when the torrent is ready to start downloading.
85+
/// This is used to implement [`Torrent::poll_ready`].
86+
pub(super) type ReadyHook = oneshot::Sender<()>;
87+
8188
pub(crate) struct TorrentActor {
8289
pub(crate) peers: Arc<DashMap<PeerId, ActorRef<PeerActor>>>,
8390
pub(crate) trackers: Arc<DashMap<Tracker, ActorRef<TrackerActor>>>,
@@ -98,6 +105,18 @@ pub(crate) struct TorrentActor {
98105
/// blocks we have for each piece. Each entry is deleted when the piece is
99106
/// completed.
100107
pub(super) block_map: Arc<DashMap<usize, BitVec<usize>>>,
108+
109+
pub(super) start_time: Option<Instant>,
110+
/// The number of peers we need to have before we start downloading, defaults
111+
/// to 6.
112+
pub(super) sufficient_peers: usize,
113+
114+
pub(super) autostart: bool,
115+
116+
/// If there is already a pending start, we don't want to start a new one
117+
pub(super) pending_start: bool,
118+
119+
pub(super) ready_hook: Option<ReadyHook>,
101120
}
102121

103122
impl fmt::Display for TorrentActor {
@@ -147,13 +166,66 @@ impl TorrentActor {
147166
self.bitfield.count_zeros() == self.bitfield.len()
148167
}
149168

169+
/// Checks if the torrent is ready to autostart (via [`Self::autostart`]) and
170+
/// torrenting process
171+
pub async fn autostart(&mut self) {
172+
trace!("Checking if we should autostart");
173+
174+
self.pending_start = true;
175+
176+
let is_ready = self.is_ready_to_start();
177+
if is_ready {
178+
if self.autostart {
179+
trace!("Autostarting torrent");
180+
self.start().await;
181+
} else {
182+
// Torrent is ready, but auto-start is disabled
183+
if let Some(err) = self.ready_hook.take().and_then(|hook| hook.send(()).err()) {
184+
error!(?err, "Failed to send ready hook");
185+
}
186+
}
187+
}
188+
189+
self.pending_start = false;
190+
}
191+
192+
pub async fn start(&mut self) {
193+
if self.is_full() {
194+
self.state = TorrentState::Seeding;
195+
info!(id = %self.info_hash(), "Torrent is now seeding");
196+
} else {
197+
self.state = TorrentState::Downloading;
198+
info!(id = %self.info_hash(), "Torrent is now downloading");
199+
200+
trace!(id = %self.info_hash(), peer_count = self.peers.len(), "Requesting first piece from peers");
201+
202+
self.next_piece = self.bitfield.first_zero().unwrap_or_default();
203+
// Request first piece from peers
204+
self
205+
.broadcast_to_peers(PeerTell::NeedPiece(self.next_piece, 0, BLOCK_SIZE))
206+
.await;
207+
self.start_time = Some(Instant::now());
208+
}
209+
if let Some(err) = self.ready_hook.take().and_then(|hook| hook.send(()).err()) {
210+
error!(?err, "Failed to send ready hook");
211+
}
212+
}
213+
150214
/// Checks if the torrent has all of the pieces (we've downloaded/have
151215
/// started with the entire file) by checking if our bitfield is filled with
152216
/// zeroes.
153217
pub fn is_full(&self) -> bool {
154218
self.bitfield.count_ones() == self.bitfield.len()
155219
}
156220

221+
pub fn is_ready(&self) -> bool {
222+
self.info.is_some() && self.peers.len() >= self.sufficient_peers
223+
}
224+
225+
pub fn is_ready_to_start(&self) -> bool {
226+
self.is_ready() && self.state == TorrentState::Inactive
227+
}
228+
157229
/// Spawns a new [`PeerActor`] for the given [`Peer`] and adds it to the
158230
/// torrent's peer set.
159231
///
@@ -310,12 +382,23 @@ impl Actor for TorrentActor {
310382
UdpServer,
311383
Option<SocketAddr>,
312384
PieceStorageStrategy,
385+
Option<bool>,
386+
Option<usize>,
313387
);
314388

315389
type Error = TorrentError;
316390

317391
async fn on_start(args: Self::Args, us: ActorRef<Self>) -> Result<Self, Self::Error> {
318-
let (peer_id, metainfo, utp_server, tracker_server, primary_addr, piece_storage) = args;
392+
let (
393+
peer_id,
394+
metainfo,
395+
utp_server,
396+
tracker_server,
397+
primary_addr,
398+
piece_storage,
399+
autostart,
400+
sufficient_peers,
401+
) = args;
319402
let primary_addr = primary_addr.unwrap_or_else(|| {
320403
let addr = utp_server.bind_addr();
321404
info!("No primary address provided, using {}", addr);
@@ -371,8 +454,23 @@ impl Actor for TorrentActor {
371454
state: TorrentState::default(),
372455
next_piece: 0,
373456
block_map: Arc::new(DashMap::new()),
457+
start_time: None,
458+
sufficient_peers: sufficient_peers.unwrap_or(6),
459+
autostart: autostart.unwrap_or(true),
460+
pending_start: false,
461+
ready_hook: None,
374462
})
375463
}
464+
465+
async fn next(
466+
&mut self, _: kameo::prelude::WeakActorRef<Self>,
467+
mailbox_rx: &mut kameo::prelude::MailboxReceiver<Self>,
468+
) -> Option<mailbox::Signal<Self>> {
469+
if !self.pending_start {
470+
self.autostart().await;
471+
}
472+
mailbox_rx.recv().await
473+
}
376474
}
377475

378476
#[cfg(test)]
@@ -386,7 +484,7 @@ mod tests {
386484
use super::*;
387485
use crate::{
388486
metainfo::{MagnetUri, TorrentFile},
389-
torrent::{TorrentMessage, TorrentRequest, TorrentResponse},
487+
torrent::{Torrent, TorrentRequest, TorrentResponse},
390488
};
391489

392490
#[tokio::test(flavor = "multi_thread")]
@@ -408,6 +506,9 @@ mod tests {
408506
UtpSocket::new_udp(SocketAddr::from_str("0.0.0.0:0").expect("Failed to parse"))
409507
.await
410508
.unwrap();
509+
let sufficient_peers = 6;
510+
511+
let info_hash = metainfo.clone().info_hash().unwrap();
411512

412513
let actor = TorrentActor::spawn((
413514
peer_id,
@@ -416,32 +517,15 @@ mod tests {
416517
udp_server.clone(),
417518
None,
418519
PieceStorageStrategy::default(),
520+
Some(false), // We don't need to autostart because we're only checking if we have peers
521+
Some(sufficient_peers),
419522
));
420523

421-
// Blocking loop that runs until we successfully handshake with atleast 6 peers
422-
loop {
423-
let peers_count = match actor.ask(TorrentRequest::PeerCount).await.unwrap() {
424-
TorrentResponse::PeerCount(count) => count,
425-
_ => unreachable!(),
426-
};
427-
if peers_count > 6 {
428-
break;
429-
} else {
430-
info!(
431-
current_peers_count = peers_count,
432-
"Waiting for more peers...."
433-
)
434-
}
435-
sleep(Duration::from_millis(100)).await;
436-
}
524+
let torrent = Torrent::new(info_hash, actor.clone());
437525

438-
let peers_count = match actor.ask(TorrentRequest::PeerCount).await.unwrap() {
439-
TorrentResponse::PeerCount(count) => count,
440-
_ => unreachable!(),
441-
};
526+
assert!(torrent.poll_ready().await.is_ok());
442527

443528
actor.stop_gracefully().await.expect("Failed to stop");
444-
info!("Connected to {peers_count} peers!")
445529
}
446530

447531
#[tokio::test(flavor = "multi_thread")]
@@ -475,6 +559,8 @@ mod tests {
475559
udp_server.clone(),
476560
None,
477561
PieceStorageStrategy::default(),
562+
Some(false),
563+
None,
478564
));
479565

480566
// Blocking loop that runs until we get an info dict
@@ -509,6 +595,7 @@ mod tests {
509595
MetaInfo::Torrent(file) => file.info.clone(),
510596
_ => unreachable!(),
511597
};
598+
let info_hash = info_dict.hash().unwrap();
512599

513600
// Clears piece files
514601
async fn clear_piece_files(piece_path: &PathBuf) {
@@ -540,28 +627,13 @@ mod tests {
540627
udp_server.clone(),
541628
None,
542629
PieceStorageStrategy::Disk(piece_path.clone()),
630+
None,
631+
None,
543632
));
544633

545-
// Blocking loop that runs until we successfully handshake with atleast 6 peers
546-
loop {
547-
let peers_count = match actor.ask(TorrentRequest::PeerCount).await.unwrap() {
548-
TorrentResponse::PeerCount(count) => count,
549-
_ => unreachable!(),
550-
};
551-
if peers_count > 6 {
552-
break;
553-
} else {
554-
info!(
555-
current_peers_count = peers_count,
556-
"Waiting for more peers...."
557-
)
558-
}
559-
sleep(Duration::from_millis(100)).await;
560-
}
561-
562-
clear_piece_files(&piece_path).await;
634+
let torrent = Torrent::new(info_hash, actor.clone());
563635

564-
actor.tell(TorrentMessage::Start).await.unwrap();
636+
assert!(torrent.poll_ready().await.is_ok());
565637

566638
loop {
567639
let mut entries = fs::read_dir(&piece_path).await.unwrap();

0 commit comments

Comments
 (0)