Skip to content

Commit 1897cd7

Browse files
committed
Add a config option to distribute intervals equidistantly
1 parent 575f6e3 commit 1897cd7

File tree

5 files changed

+134
-35
lines changed

5 files changed

+134
-35
lines changed

src/config/mod.rs

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,10 @@ use crate::{
2424
pub struct Config {
2525
#[serde(with = "humantime_serde")]
2626
pub interval: Duration,
27+
// Distribute intervals equidistantly, which helps avoid periodic CPU peaks and request peaks
28+
// when there are many tasks.
29+
#[serde(default)]
30+
pub equidistant_intervals: bool,
2731
reporter: Accessor<Option<ConfigReporterRaw>>,
2832
#[serde(default)]
2933
platform: Accessor<PlatformGlobal>,
@@ -102,7 +106,7 @@ impl Config {
102106
name.clone(),
103107
SubscriptionRef {
104108
platform: &subscription.platform,
105-
interval: subscription.interval,
109+
interval: subscription.interval.unwrap_or(self.interval),
106110
notify: subscription
107111
.notify_ref
108112
.iter()
@@ -184,7 +188,7 @@ pub struct SubscriptionRaw {
184188
#[derive(Debug, PartialEq)]
185189
pub struct SubscriptionRef<'a> {
186190
pub platform: &'a Accessor<SourceConfig>,
187-
pub interval: Option<Duration>,
191+
pub interval: Duration,
188192
pub notify: Vec<Accessor<NotifierConfig>>,
189193
}
190194

@@ -378,6 +382,7 @@ mod tests {
378382
Config::parse_for_test(
379383
r#"
380384
interval = '1min'
385+
equidistant_intervals = true
381386
reporter = { log = { notify = ["meow"] }, heartbeat = { type = "HttpGet", url = "https://example.com/", interval = '1min' } }
382387
383388
[platform.QQ.account.MyQQ]
@@ -412,6 +417,7 @@ notify = ["meow", "woof", { ref = "woof", id = 123 }]
412417
|c| {
413418
assert_eq!(c.unwrap(), &Config {
414419
interval: Duration::from_secs(60), // 1min
420+
equidistant_intervals: true,
415421
reporter: Accessor::new(Some(ConfigReporterRaw {
416422
log: Accessor::new(Some(ConfigReporterLog {
417423
notify_ref: vec![NotifyRef::Direct("meow".into())],
@@ -668,7 +674,7 @@ notify = ["meow", { ref = "woof", thread_id = 114 }, { ref = "woof", switch = {
668674
platform: &Accessor::new(SourceConfig::BilibiliLive(Accessor::new(
669675
bilibili::source::live::ConfigParams { user_id: 123456 }
670676
))),
671-
interval: None,
677+
interval: Duration::from_secs(60),
672678
notify: vec![
673679
Accessor::new(NotifierConfig::Telegram(Accessor::new(
674680
telegram::notify::ConfigParams {

src/lib.rs

Lines changed: 19 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -33,14 +33,26 @@ pub async fn run(args: cli::Args) -> anyhow::Result<()> {
3333
)
3434
.await?;
3535

36-
let subscription_tasks = config.subscriptions().map(|(name, subscription)| {
37-
Box::new(TaskSubscription::new(
38-
name,
39-
subscription.interval.unwrap_or(config.interval),
40-
subscription.notify,
41-
subscription.platform,
42-
)) as Box<dyn Task>
36+
let initial_offsets = config.equidistant_intervals.then(|| {
37+
task::equidistant_intervals(
38+
config
39+
.subscriptions()
40+
.map(|(_, subscription)| subscription.interval),
41+
)
4342
});
43+
44+
let subscription_tasks = config
45+
.subscriptions()
46+
.enumerate()
47+
.map(|(i, (name, subscription))| {
48+
Box::new(TaskSubscription::new(
49+
name,
50+
subscription.interval,
51+
initial_offsets.as_ref().map(|v| *v.get(i).unwrap()),
52+
subscription.notify,
53+
subscription.platform,
54+
)) as Box<dyn Task>
55+
});
4456
let reporter_task = config
4557
.reporter()
4658
.map(|params| Box::new(TaskReporter::new(params)) as Box<dyn Task>);

src/task/equidistant.rs

Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
1+
use std::{collections::HashMap, time::Duration};
2+
3+
pub fn equidistant_intervals(intervals: impl IntoIterator<Item = Duration>) -> Vec<Duration> {
4+
let intervals = intervals.into_iter().collect::<Vec<_>>();
5+
let mut states = intervals
6+
.iter()
7+
.fold(HashMap::new(), |mut acc, dur| {
8+
*acc.entry(dur).or_insert(0) += 1;
9+
acc
10+
})
11+
.into_iter()
12+
.map(|(dur, num)| (*dur, (dur.div_f32(num as f32), 0)))
13+
.collect::<HashMap<_, _>>();
14+
intervals
15+
.into_iter()
16+
.map(|dur| {
17+
let (offset, occurrences) = states.get_mut(&dur).unwrap();
18+
let specific_offset = *offset * (*occurrences);
19+
*occurrences += 1;
20+
specific_offset
21+
})
22+
.collect()
23+
}
24+
25+
#[cfg(test)]
26+
mod tests {
27+
use std::iter::repeat_n;
28+
29+
use super::*;
30+
31+
#[test]
32+
fn equidistant_intervals_algorithm() {
33+
let sec = Duration::from_secs;
34+
let min = |mins: u64| sec(mins * 60);
35+
36+
assert_eq!(
37+
equidistant_intervals(repeat_n(min(1), 10).chain(repeat_n(min(10), 3))),
38+
[
39+
Duration::ZERO,
40+
sec(6) * 1,
41+
sec(6) * 2,
42+
sec(6) * 3,
43+
sec(6) * 4,
44+
sec(6) * 5,
45+
sec(6) * 6,
46+
sec(6) * 7,
47+
sec(6) * 8,
48+
sec(6) * 9,
49+
Duration::ZERO,
50+
sec(200) * 1,
51+
sec(200) * 2,
52+
]
53+
);
54+
}
55+
}

src/task/mod.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,10 @@
1+
mod equidistant;
12
mod reporter;
23
mod subscription;
34

45
use std::{future::Future, pin::Pin};
56

7+
pub use equidistant::equidistant_intervals;
68
pub use reporter::TaskReporter;
79
use spdlog::prelude::*;
810
pub use subscription::TaskSubscription;

src/task/subscription.rs

Lines changed: 49 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
use std::{fmt::Display, future::Future, pin::Pin, time::Duration};
22

3+
use humantime_serde::re::humantime;
34
use spdlog::prelude::*;
45
use tokio::{sync::mpsc, time::MissedTickBehavior};
56

@@ -13,6 +14,7 @@ use crate::{
1314
pub struct TaskSubscription {
1415
name: String,
1516
interval: Duration,
17+
initial_offset: Option<Duration>,
1618
notifiers: Vec<Box<dyn NotifierTrait>>,
1719
sourcer: Option<Sourcer>, // took when the task is running
1820
}
@@ -21,52 +23,74 @@ impl TaskSubscription {
2123
pub fn new(
2224
name: String,
2325
interval: Duration,
26+
initial_offset: Option<Duration>,
2427
notify: Vec<Accessor<NotifierConfig>>,
2528
source_platform: &Accessor<SourceConfig>,
2629
) -> Self {
30+
trace!(
31+
"task subscription '{name}' created, source '{source_platform}', interval {} (initial offset {})",
32+
humantime::format_duration(interval),
33+
humantime::format_duration(initial_offset.unwrap_or_default())
34+
);
2735
Self {
2836
name,
2937
interval,
38+
initial_offset,
3039
notifiers: notify.into_iter().map(notifier).collect(),
3140
sourcer: Some(sourcer(source_platform)),
3241
}
3342
}
3443

3544
// Handler for poll-based subscription
3645
async fn continuous_fetch(&mut self, fetcher: Box<dyn FetcherTrait>) {
37-
let mut interval = tokio::time::interval(self.interval);
38-
interval.set_missed_tick_behavior(MissedTickBehavior::Delay);
39-
interval.reset_immediately();
40-
4146
let mut last_status = Status::empty();
4247

43-
loop {
44-
interval.tick().await;
45-
46-
let Ok(mut status) = fetcher.fetch_status().await.inspect_err(|err| {
47-
error!(
48-
"failed to fetch status for '{}' on '{}': {err}",
49-
self.name, fetcher
50-
)
51-
}) else {
52-
continue;
53-
};
54-
55-
status.sort();
48+
// Fetch for the first time immediately
49+
self.continuous_fetch_once(&*fetcher, &mut last_status)
50+
.await;
5651

57-
trace!(
58-
"status of '{}' on '{fetcher}' now is '{status:?}'",
59-
self.name
60-
);
52+
if let Some(initial_offset) = self.initial_offset {
53+
tokio::time::sleep(initial_offset).await;
54+
}
6155

62-
let notifications = status.generate_notifications(&last_status);
63-
self.notify(notifications, &fetcher).await;
56+
let mut interval = tokio::time::interval(self.interval);
57+
interval.set_missed_tick_behavior(MissedTickBehavior::Delay);
6458

65-
last_status.update_incrementally(status);
66-
trace!("subscription '{}' updated once", self.name);
59+
loop {
60+
interval.tick().await;
61+
self.continuous_fetch_once(&*fetcher, &mut last_status)
62+
.await;
6763
}
6864
}
6965

66+
async fn continuous_fetch_once(
67+
&mut self,
68+
fetcher: &dyn FetcherTrait,
69+
last_status: &mut Status,
70+
) {
71+
let Ok(mut status) = fetcher.fetch_status().await.inspect_err(|err| {
72+
error!(
73+
"failed to fetch status for '{}' on '{}': {err}",
74+
self.name, fetcher
75+
)
76+
}) else {
77+
return;
78+
};
79+
80+
status.sort();
81+
82+
trace!(
83+
"status of '{}' on '{fetcher}' now is '{status:?}'",
84+
self.name
85+
);
86+
87+
let notifications = status.generate_notifications(last_status);
88+
self.notify(notifications, &fetcher).await;
89+
90+
last_status.update_incrementally(status);
91+
trace!("subscription '{}' updated once", self.name);
92+
}
93+
7094
// Handler for listen-based subscription
7195
async fn continuous_wait(
7296
&mut self,

0 commit comments

Comments
 (0)