Skip to content

Commit 06f5999

Browse files
authored
feat: Update trackers with current torrent state (#180)
2 parents 9c25a00 + 3dc87d2 commit 06f5999

File tree

4 files changed

+172
-14
lines changed

4 files changed

+172
-14
lines changed

crates/libtortillas/src/torrent/actor.rs

Lines changed: 130 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ use crate::{
3131
TorrentExport,
3232
piece_manager::{FilePieceManager, PieceManager},
3333
},
34-
tracker::{Tracker, TrackerActor, udp::UdpServer},
34+
tracker::{Event, Tracker, TrackerActor, TrackerMessage, TrackerUpdate, udp::UdpServer},
3535
};
3636
pub const BLOCK_SIZE: usize = 16 * 1024;
3737

@@ -257,26 +257,48 @@ impl TorrentActor {
257257
if self.is_full() {
258258
self.state = TorrentState::Seeding;
259259
info!(id = %self.info_hash(), "Torrent is now seeding");
260+
self
261+
.update_trackers(TrackerUpdate::Event(Event::Completed))
262+
.await;
260263
} else {
261264
self.state = TorrentState::Downloading;
262265
info!(id = %self.info_hash(), "Torrent is now downloading");
263266

264267
trace!(id = %self.info_hash(), peer_count = self.peers.len(), "Requesting first piece from peers");
265268

266269
self.next_piece = self.bitfield.first_zero().unwrap_or_default();
270+
// Announce that we have started
271+
self
272+
.update_trackers(TrackerUpdate::Event(Event::Started))
273+
.await;
274+
275+
// Force announce
276+
self.broadcast_to_trackers(TrackerMessage::Announce).await;
277+
278+
// Now apperently we're supposed to set our event back to "empty" for the next
279+
// announce (done via the interval), no clue why, just the way it's
280+
// specified in the spec.
281+
self
282+
.update_trackers(TrackerUpdate::Event(Event::Empty))
283+
.await;
284+
267285
// Request first piece from peers
268286
self
269287
.broadcast_to_peers(PeerTell::NeedPiece(self.next_piece, 0, BLOCK_SIZE))
270288
.await;
271289
self.start_time = Some(Instant::now());
272290
}
291+
// Send ready hook
273292
if let Some(err) = self.ready_hook.take().and_then(|hook| hook.send(()).err()) {
274293
error!(?err, "Failed to send ready hook");
275294
}
295+
276296
let Some(info) = self.info.as_ref() else {
277297
warn!(id = %self.info_hash(), "Start requested before info dict is available; deferring");
278298
return;
279299
};
300+
301+
// Start piece manager
280302
self
281303
.piece_manager
282304
// Probably not the best to clone here, but should be fine for now
@@ -297,6 +319,60 @@ impl TorrentActor {
297319
);
298320
}
299321

322+
/// Calculates the total number of bytes downloaded by the torrent. Returns
323+
/// None if the info dict is not present.
324+
pub fn total_bytes_downloaded(&self) -> Option<usize> {
325+
let info = self.info_dict()?;
326+
let total_length = info.total_length();
327+
let piece_length = info.piece_length as usize;
328+
329+
let num_pieces = self.bitfield.len();
330+
let mut total_bytes = 0usize;
331+
332+
// Calculate the size of the last piece
333+
let last_piece_len = if total_length % piece_length == 0 {
334+
piece_length
335+
} else {
336+
total_length % piece_length
337+
};
338+
339+
// Sum bytes from completed pieces
340+
for piece_idx in 0..num_pieces {
341+
if self.bitfield[piece_idx] {
342+
let piece_size = if piece_idx == num_pieces - 1 {
343+
last_piece_len
344+
} else {
345+
piece_length
346+
};
347+
total_bytes = total_bytes.saturating_add(piece_size);
348+
}
349+
}
350+
351+
// Sum bytes from incomplete pieces via block_map
352+
for (piece_idx, block) in self.block_map.iter().enumerate() {
353+
if piece_idx < num_pieces && !self.bitfield[piece_idx] {
354+
let piece_size = if piece_idx == num_pieces - 1 {
355+
last_piece_len
356+
} else {
357+
piece_length
358+
};
359+
360+
let mut piece_offset = 0usize;
361+
for block_idx in 0..block.len() {
362+
if block[block_idx] {
363+
let block_size = (piece_size - piece_offset).min(BLOCK_SIZE);
364+
total_bytes = total_bytes.saturating_add(block_size);
365+
piece_offset = piece_offset.saturating_add(block_size);
366+
} else {
367+
piece_offset =
368+
piece_offset.saturating_add(BLOCK_SIZE.min(piece_size - piece_offset));
369+
}
370+
}
371+
}
372+
}
373+
374+
Some(total_bytes)
375+
}
300376
pub fn export(&self) -> TorrentExport {
301377
TorrentExport {
302378
info_hash: self.info_hash(),
@@ -463,6 +539,59 @@ impl TorrentActor {
463539
}
464540
// Returns immediately, without waiting for any peer responses
465541
}
542+
543+
/// Broadcasts a [`TrackerUpdate`] to all trackers concurrently. similar to
544+
/// [`Self::broadcast_to_peers`], but for trackers.
545+
#[instrument(skip(self, message), fields(torrent_id = %self.info_hash()))]
546+
pub(super) async fn update_trackers(&self, message: TrackerUpdate) {
547+
let trackers = self.trackers.clone();
548+
549+
let actor_refs: Vec<(Tracker, ActorRef<TrackerActor>)> = trackers
550+
.iter()
551+
.map(|entry| (entry.key().clone(), entry.value().clone()))
552+
.collect();
553+
554+
for (uri, actor) in actor_refs {
555+
let msg = message.clone();
556+
let trackers = trackers.clone();
557+
558+
tokio::spawn(async move {
559+
if actor.is_alive() {
560+
if let Err(e) = actor.tell(msg).await {
561+
warn!(error = %e, tracker_uri = ?uri, "Failed to send to tracker");
562+
}
563+
} else {
564+
trace!(tracker_uri = ?uri, "Tracker actor is dead, removing from trackers set");
565+
trackers.remove(&uri);
566+
}
567+
});
568+
}
569+
}
570+
571+
pub(super) async fn broadcast_to_trackers(&self, message: TrackerMessage) {
572+
let trackers = self.trackers.clone();
573+
574+
let actor_refs: Vec<(Tracker, ActorRef<TrackerActor>)> = trackers
575+
.iter()
576+
.map(|entry| (entry.key().clone(), entry.value().clone()))
577+
.collect();
578+
579+
for (uri, actor) in actor_refs {
580+
let trackers = trackers.clone();
581+
582+
tokio::spawn(async move {
583+
if actor.is_alive() {
584+
if let Err(e) = actor.tell(message).await {
585+
warn!(error = %e, tracker_uri = ?uri, "Failed to send to tracker");
586+
}
587+
} else {
588+
trace!(tracker_uri = ?uri, "Tracker actor is dead, removing from trackers set");
589+
trackers.remove(&uri);
590+
}
591+
});
592+
}
593+
}
594+
466595
/// Gets the path to a piece file based on the index. Only should be used
467596
/// when the piece storage strategy is [`Disk`](PieceStorageStrategy::Disk),
468597
/// this function will panic otherwise.

crates/libtortillas/src/torrent/messages.rs

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ use crate::{
2525
peer::{Peer, PeerId, PeerTell},
2626
protocol::stream::PeerStream,
2727
torrent::{PieceManagerProxy, piece_manager::PieceManager},
28-
tracker::Tracker,
28+
tracker::{Event, Tracker, TrackerMessage, TrackerUpdate},
2929
};
3030

3131
/// For incoming from outside sources (e.g Peers, Trackers and Engine)
@@ -196,6 +196,8 @@ impl Message<TorrentMessage> for TorrentActor {
196196
.info_dict()
197197
.expect("Can't receive piece without info dict");
198198

199+
let total_length = info_dict.total_length();
200+
199201
let block_index = offset / BLOCK_SIZE;
200202
let piece_count = info_dict.piece_count();
201203

@@ -296,9 +298,24 @@ impl Message<TorrentMessage> for TorrentActor {
296298

297299
// Announce to peers that we have this piece
298300
self.broadcast_to_peers(PeerTell::Have(cur_piece)).await;
301+
302+
if let Some(total_downloaded) = self.total_bytes_downloaded() {
303+
let total_bytes_left = total_length - total_downloaded;
304+
self
305+
.update_trackers(TrackerUpdate::Left(total_bytes_left))
306+
.await;
307+
}
308+
299309
if self.next_piece >= piece_count - 1 {
300310
// Handle end of torrenting process
301311
self.state = TorrentState::Seeding;
312+
313+
// Announce to trackers that we have completed the torrent
314+
self
315+
.update_trackers(TrackerUpdate::Event(Event::Completed))
316+
.await;
317+
self.broadcast_to_trackers(TrackerMessage::Announce).await;
318+
302319
info!("Torrenting process completed, switching to seeding mode");
303320
} else {
304321
self

crates/libtortillas/src/tracker/http.rs

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -80,9 +80,12 @@ impl TrackerRequest {
8080
if let Some(left) = self.left {
8181
params.push(format!("left={left}"));
8282
}
83-
let event_str = format!("{:?}", self.event).to_lowercase(); // Hack to get the string representation of the enum
8483

85-
params.push(format!("event={event_str}"));
84+
// Don't include event if it's empty
85+
if self.event != Event::Empty {
86+
params.push(format!("event={:?}", self.event).to_lowercase());
87+
}
88+
8689
params.push(format!("compact={}", self.compact.unwrap_or(true) as u8));
8790

8891
params.join("&")

crates/libtortillas/src/tracker/mod.rs

Lines changed: 19 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -291,27 +291,26 @@ impl Actor for TrackerActor {
291291
}
292292

293293
async fn next(
294-
&mut self, _: kameo::prelude::WeakActorRef<Self>,
294+
&mut self, actor_ref: kameo::prelude::WeakActorRef<Self>,
295295
mailbox_rx: &mut kameo::prelude::MailboxReceiver<Self>,
296296
) -> Option<Signal<Self>> {
297297
tokio::select! {
298298
signal = mailbox_rx.recv() => signal,
299299
// Waits for the next interval to tick
300300
_ = self.interval.tick() => {
301-
if let Ok(peers) = self.tracker.announce().await {
302-
let _ = self.supervisor.tell(TorrentMessage::Announce(peers)).await;
303-
}
304-
let duration = Duration::from_secs(self.tracker.interval() as u64);
305-
self.interval = interval(duration);
306-
307-
None
308-
}
301+
let msg = TrackerMessage::Announce;
302+
Some(Signal::Message{ message: Box::new(msg),
303+
actor_ref: actor_ref.upgrade()?.clone(),
304+
reply: None,
305+
sent_within_actor: true,
306+
})}
309307
}
310308
}
311309
}
312310

313311
/// A message from an outside source.
314312
#[allow(dead_code)]
313+
#[derive(Debug, Clone, Copy)]
315314
pub(crate) enum TrackerMessage {
316315
/// Forces the tracker to make an announce request. By default, announce
317316
/// requests are made on an interval.
@@ -333,6 +332,15 @@ impl Message<TrackerMessage> for TrackerActor {
333332
Ok(peers) => {
334333
if let Err(e) = self.supervisor.tell(TorrentMessage::Announce(peers)).await {
335334
error!("Failed to send announce to supervisor: {}", e);
335+
} else {
336+
// Ensure we don't have a delay of 0
337+
let delay = self.tracker.interval().max(1) as u64;
338+
339+
self.interval = interval(Duration::from_secs(delay));
340+
// Tick because when starting a new interval, it will tick immediately and
341+
// cause a never ending loop, adding this doesn't add any delay and fixes that
342+
// issue
343+
self.interval.tick().await;
336344
}
337345
}
338346
Err(e) => {
@@ -347,6 +355,7 @@ impl Message<TrackerMessage> for TrackerActor {
347355
}
348356

349357
/// Updates the tracker's announce fields
358+
#[derive(Debug, Clone)]
350359
pub enum TrackerUpdate {
351360
/// The amount of data uploaded, in bytes
352361
Uploaded(usize),
@@ -383,8 +392,8 @@ impl Message<TrackerUpdate> for TrackerActor {
383392
)]
384393
#[repr(u32)]
385394
pub enum Event {
386-
Empty = 0,
387395
#[default]
396+
Empty = 0,
388397
Started = 1,
389398
Completed = 2,
390399
Stopped = 3,

0 commit comments

Comments
 (0)