Skip to content

Commit a568a5a

Browse files
author
Ariel Ben-Yehuda
committed
feat: implement MultiReporter
1 parent 9ed025b commit a568a5a

File tree

5 files changed

+193
-0
lines changed

5 files changed

+193
-0
lines changed

Cargo.lock

Lines changed: 51 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ aws-arn = { version = "0.3", optional = true }
1313
aws-config = { version = "1", optional = true }
1414
aws-sdk-s3 = { version = "1", optional = true }
1515
chrono = "0.4"
16+
futures = "0.3"
1617
libloading = "0.8"
1718
reqwest = { version = "0.12", default-features = false, optional = true, features = ["charset", "http2", "rustls-tls"] }
1819
serde_json = "1"

src/metadata/mod.rs

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,3 +30,12 @@ pub struct ReportMetadata<'a> {
3030

3131
#[cfg(feature = "aws-metadata")]
3232
pub mod aws;
33+
34+
/// [private] dummy metadata to make testing easier
35+
#[cfg(test)]
36+
pub(crate) const DUMMY_METADATA: ReportMetadata<'static> = ReportMetadata {
37+
instance: &AgentMetadata::Other,
38+
start: Duration::from_secs(1),
39+
end: Duration::from_secs(2),
40+
reporting_interval: Duration::from_secs(1),
41+
};

src/reporter/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ use async_trait::async_trait;
77

88
use crate::metadata::ReportMetadata;
99

10+
pub mod multi;
1011
#[cfg(feature = "s3")]
1112
pub mod s3;
1213

src/reporter/multi.rs

Lines changed: 131 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,131 @@
1+
use async_trait::async_trait;
2+
3+
use crate::metadata::ReportMetadata;
4+
5+
use super::Reporter;
6+
7+
#[derive(Debug)]
8+
/// A reporter that reports profiling results to several destinations.
9+
///
10+
/// If one of the destinations errors, it will continue reporting to the other ones.
11+
pub struct MultiReporter {
12+
reporters: Vec<Box<dyn Reporter + Send + Sync>>,
13+
}
14+
15+
impl MultiReporter {
16+
/// Create a new MultiReporter from a set of reporters
17+
pub fn new(reporters: Vec<Box<dyn Reporter + Send + Sync>>) -> Self {
18+
MultiReporter { reporters }
19+
}
20+
}
21+
22+
#[async_trait]
23+
impl Reporter for MultiReporter {
24+
async fn report(
25+
&self,
26+
jfr: Vec<u8>,
27+
metadata: &ReportMetadata,
28+
) -> Result<(), Box<dyn std::error::Error + Send>> {
29+
let errs = futures::future::join_all(
30+
self.reporters
31+
.iter()
32+
.map(|reporter| reporter.report(jfr.clone(), metadata)),
33+
)
34+
.await;
35+
// return the first error
36+
errs.into_iter().collect()
37+
}
38+
}
39+
40+
#[cfg(test)]
41+
pub mod test {
42+
use std::{
43+
sync::{
44+
atomic::{self, AtomicBool},
45+
Arc,
46+
},
47+
time::Duration,
48+
};
49+
50+
use async_trait::async_trait;
51+
52+
use crate::{
53+
metadata::{ReportMetadata, DUMMY_METADATA},
54+
reporter::Reporter,
55+
};
56+
57+
use super::MultiReporter;
58+
59+
#[derive(Debug)]
60+
struct OkReporter(Arc<AtomicBool>);
61+
#[async_trait]
62+
impl Reporter for OkReporter {
63+
async fn report(
64+
&self,
65+
_jfr: Vec<u8>,
66+
_metadata: &ReportMetadata,
67+
) -> Result<(), Box<dyn std::error::Error + Send>> {
68+
tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
69+
self.0.store(true, atomic::Ordering::Relaxed);
70+
Ok(())
71+
}
72+
}
73+
74+
#[derive(Debug, thiserror::Error)]
75+
enum Error {
76+
#[error("failed")]
77+
Failed,
78+
}
79+
80+
#[derive(Debug)]
81+
struct ErrReporter;
82+
#[async_trait]
83+
impl Reporter for ErrReporter {
84+
async fn report(
85+
&self,
86+
_jfr: Vec<u8>,
87+
_metadata: &ReportMetadata,
88+
) -> Result<(), Box<dyn std::error::Error + Send>> {
89+
Err(Box::new(Error::Failed))
90+
}
91+
}
92+
93+
#[tokio::test(start_paused = true)]
94+
async fn test_multi_reporter_ok() {
95+
let signals: Vec<_> = (0..10).map(|_| Arc::new(AtomicBool::new(false))).collect();
96+
let reporter = MultiReporter::new(
97+
signals
98+
.iter()
99+
.map(|signal| {
100+
Box::new(OkReporter(signal.clone())) as Box<dyn Reporter + Send + Sync>
101+
})
102+
.collect(),
103+
);
104+
// test that reports are done in parallel
105+
tokio::time::timeout(
106+
Duration::from_secs(2),
107+
reporter.report(vec![], &DUMMY_METADATA),
108+
)
109+
.await
110+
.unwrap()
111+
.unwrap();
112+
// test that reports are done
113+
assert!(signals.iter().all(|s| s.load(atomic::Ordering::Relaxed)));
114+
}
115+
116+
#[tokio::test(start_paused = true)]
117+
async fn test_multi_reporter_err() {
118+
let signal_before = Arc::new(AtomicBool::new(false));
119+
let signal_after = Arc::new(AtomicBool::new(false));
120+
let reporter = MultiReporter::new(vec![
121+
Box::new(OkReporter(signal_before.clone())) as Box<dyn Reporter + Send + Sync>,
122+
Box::new(ErrReporter) as Box<dyn Reporter + Send + Sync>,
123+
Box::new(OkReporter(signal_after.clone())) as Box<dyn Reporter + Send + Sync>,
124+
]);
125+
// test that reports are done and return an error
126+
reporter.report(vec![], &DUMMY_METADATA).await.unwrap_err();
127+
// test that reports are done even though a reporter errored
128+
assert!(signal_before.load(atomic::Ordering::Relaxed));
129+
assert!(signal_after.load(atomic::Ordering::Relaxed));
130+
}
131+
}

0 commit comments

Comments
 (0)