Skip to content

Commit 6ba9699

Browse files
committed
Merge #1451: Temporary solution to handle lagged stats listeners
6fdbc47 fix: [#1449] don't stop stats listeners when lagged (Jose Celano) ed8acac fix: [#1449] increase broadcast channel capacity (Jose Celano) Pull request description: Relates to: #1449 When I switched the channels for statistics from [tokio mpsc](https://docs.rs/tokio/latest/tokio/sync/mpsc/index.html) to [tokio broadcast](https://docs.rs/tokio/latest/tokio/sync/broadcast/index.html), I introduced a bug that was only revealed after deploying the demo tracker. The problem is that MPSC provides backpressure, so all events are processed. However, the broadcast channel has a limit, and listeners are informed with a `Lagged` error when the limit has been reached and events have been deleted. We need to decide whether: 1. We go back to the mpsc channel 2. Or we implement another solution 3. Or we accept imprecise metrics under high load In the meantime, I have increased the channel capacity to be able to collect all events at the current demo peak load. That should avoid the problem of losing events in practice. I have also fixed the listener to handle the `Lagged` error. In this case, we don't have to break the loop but continue processing events to update metrics. The only difference is that metrics will not be precise after this error happens because some events were lost. ACKs for top commit: josecelano: ACK 6fdbc47 Tree-SHA512: 3bc60ddb253cf8b6873abd5e6300f8731ee677338c6944360caa10a023d470c258ff163f48d5d4e1c0350266a73e6709910a30e728890ff5295353c262b46380
2 parents 30684b5 + 6fdbc47 commit 6ba9699

File tree

8 files changed

+39
-12
lines changed

8 files changed

+39
-12
lines changed

packages/http-tracker-core/src/event/sender.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ use tokio::sync::broadcast::error::SendError;
77

88
use super::Event;
99

10-
const CHANNEL_CAPACITY: usize = 1024;
10+
const CHANNEL_CAPACITY: usize = 32768;
1111

1212
/// A trait for sending sending.
1313
#[cfg_attr(test, automock)]

packages/http-tracker-core/src/lib.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,8 @@ pub(crate) type CurrentClock = clock::Working;
1616
#[allow(dead_code)]
1717
pub(crate) type CurrentClock = clock::Stopped;
1818

19+
pub const HTTP_TRACKER_LOG_TARGET: &str = "HTTP TRACKER";
20+
1921
#[cfg(test)]
2022
pub(crate) mod tests {
2123
use bittorrent_primitives::info_hash::InfoHash;

packages/http-tracker-core/src/statistics/event/listener.rs

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -4,15 +4,23 @@ use torrust_tracker_clock::clock::Time;
44
use super::handler::handle_event;
55
use crate::event::Event;
66
use crate::statistics::repository::Repository;
7-
use crate::CurrentClock;
7+
use crate::{CurrentClock, HTTP_TRACKER_LOG_TARGET};
88

99
pub async fn dispatch_events(mut receiver: broadcast::Receiver<Event>, stats_repository: Repository) {
1010
loop {
1111
match receiver.recv().await {
1212
Ok(event) => handle_event(event, &stats_repository, CurrentClock::now()).await,
1313
Err(e) => {
14-
tracing::error!("Error receiving http tracker core event: {:?}", e);
15-
break;
14+
match e {
15+
broadcast::error::RecvError::Closed => {
16+
tracing::info!(target: HTTP_TRACKER_LOG_TARGET, "Http core statistics receiver closed.");
17+
break;
18+
}
19+
broadcast::error::RecvError::Lagged(n) => {
20+
// From now on, metrics will be imprecise
21+
tracing::warn!(target: HTTP_TRACKER_LOG_TARGET, "Http core statistics receiver lagged by {} events.", n);
22+
}
23+
}
1624
}
1725
}
1826
}

packages/udp-tracker-core/src/event/sender.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ use tokio::sync::broadcast::error::SendError;
77

88
use super::Event;
99

10-
const CHANNEL_CAPACITY: usize = 1024;
10+
const CHANNEL_CAPACITY: usize = 32768;
1111

1212
/// A trait for sending sending.
1313
#[cfg_attr(test, automock)]

packages/udp-tracker-core/src/statistics/event/listener.rs

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -4,15 +4,23 @@ use torrust_tracker_clock::clock::Time;
44
use super::handler::handle_event;
55
use crate::event::Event;
66
use crate::statistics::repository::Repository;
7-
use crate::CurrentClock;
7+
use crate::{CurrentClock, UDP_TRACKER_LOG_TARGET};
88

99
pub async fn dispatch_events(mut receiver: broadcast::Receiver<Event>, stats_repository: Repository) {
1010
loop {
1111
match receiver.recv().await {
1212
Ok(event) => handle_event(event, &stats_repository, CurrentClock::now()).await,
1313
Err(e) => {
14-
tracing::error!("Error receiving udp tracker core event: {:?}", e);
15-
break;
14+
match e {
15+
broadcast::error::RecvError::Closed => {
16+
tracing::info!(target: UDP_TRACKER_LOG_TARGET, "Udp core statistics receiver closed.");
17+
break;
18+
}
19+
broadcast::error::RecvError::Lagged(n) => {
20+
// From now on, metrics will be imprecise
21+
tracing::warn!(target: UDP_TRACKER_LOG_TARGET, "Udp core statistics receiver lagged by {} events.", n);
22+
}
23+
}
1624
}
1725
}
1826
}

packages/udp-tracker-server/src/event/sender.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ use tokio::sync::broadcast::error::SendError;
77

88
use super::Event;
99

10-
const CHANNEL_CAPACITY: usize = 1024;
10+
const CHANNEL_CAPACITY: usize = 32768;
1111

1212
/// A trait for sending sending.
1313
#[cfg_attr(test, automock)]

packages/udp-tracker-server/src/statistics/event/listener.rs

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
use bittorrent_udp_tracker_core::UDP_TRACKER_LOG_TARGET;
12
use tokio::sync::broadcast;
23
use torrust_tracker_clock::clock::Time;
34

@@ -11,8 +12,16 @@ pub async fn dispatch_events(mut receiver: broadcast::Receiver<Event>, stats_rep
1112
match receiver.recv().await {
1213
Ok(event) => handle_event(event, &stats_repository, CurrentClock::now()).await,
1314
Err(e) => {
14-
tracing::error!("Error receiving udp tracker server event: {:?}", e);
15-
break;
15+
match e {
16+
broadcast::error::RecvError::Closed => {
17+
tracing::info!(target: UDP_TRACKER_LOG_TARGET, "Udp server statistics receiver closed.");
18+
break;
19+
}
20+
broadcast::error::RecvError::Lagged(n) => {
21+
// From now on, metrics will be imprecise
22+
tracing::warn!(target: UDP_TRACKER_LOG_TARGET, "Udp server statistics receiver lagged by {} events.", n);
23+
}
24+
}
1625
}
1726
}
1827
}

src/bootstrap/jobs/torrent_cleanup.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ pub fn start_job(config: &Core, torrents_manager: &Arc<TorrentsManager>) -> Join
3737
loop {
3838
tokio::select! {
3939
_ = tokio::signal::ctrl_c() => {
40-
tracing::info!("Stopping torrent cleanup job..");
40+
tracing::info!("Stopping torrent cleanup job ...");
4141
break;
4242
}
4343
_ = interval.tick() => {

0 commit comments

Comments
 (0)