-
Notifications
You must be signed in to change notification settings - Fork 2k
Expand file tree
/
Copy pathcommon.rs
More file actions
77 lines (64 loc) · 2.33 KB
/
common.rs
File metadata and controls
77 lines (64 loc) · 2.33 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
//! A common suite of structs, functions, etc. that are useful for
//! benchmarking vector transforms.
use std::{
num::NonZeroUsize,
pin::Pin,
task::{Context, Poll},
};
use futures::{Stream, task::noop_waker};
use vector::event::{Event, LogEvent};
// == Streams ==
/// Drain a boxed `Stream<T>`, discarding every item it yields.
///
/// The stream is polled synchronously with a no-op waker. Polling continues
/// for as long as the stream hands back `Poll::Ready(Some(_))`; the function
/// returns on the first `Poll::Ready(None)` (end of stream) or
/// `Poll::Pending`. Because the waker does nothing, a stream that reports
/// `Pending` is not polled again.
pub fn consume<T>(mut stream: Pin<Box<dyn Stream<Item = T>>>) {
    let waker = noop_waker();
    let mut cx = Context::from_waker(&waker);
    loop {
        match stream.as_mut().poll_next(&mut cx) {
            // An item was produced: drop it and poll again.
            Poll::Ready(Some(_)) => continue,
            // Exhausted or not ready: stop polling either way.
            Poll::Ready(None) | Poll::Pending => break,
        }
    }
}
// ==== FixedLogStream ====
/// A fixed size [`futures::stream::Stream`] of `Event::Log` instances.
///
/// The events are stored up front in a `Vec<Event>`; the type derives
/// `Debug` and `Clone`, so cloning a stream clones its stored events.
#[derive(Debug, Clone)]
pub struct FixedLogStream {
/// The events held by this stream.
events: Vec<Event>,
}
impl FixedLogStream {
    /// Build a `FixedLogStream` holding `total` generated `Event::Log`
    /// instances.
    ///
    /// Each event is a `LogEvent` created from the string `event{k}`, where
    /// `k` starts at zero, increases by one per event and wraps back to zero
    /// on reaching `cycle_size`. `cycle_size` therefore controls how often an
    /// `Event` repeats.
    ///
    /// Use this constructor for benchmarks where the shape of the `Event`s
    /// does not matter, only that they exist.
    pub fn new(total: NonZeroUsize, cycle_size: NonZeroUsize) -> Self {
        let events: Vec<Event> = (0..total.get())
            .map(|index| {
                // Position within the current cycle; always < cycle_size.
                let cycle = index % cycle_size;
                Event::Log(LogEvent::from(format!("event{cycle}")))
            })
            .collect();
        Self { events }
    }

    /// Build a `FixedLogStream` from a caller-supplied `Vec<Event>`.
    ///
    /// Use this constructor for benchmarks where the shape of the `Event`s
    /// does matter, i.e. their specific details are relevant to the measure
    /// being established.
    pub fn new_from_vec(events: Vec<Event>) -> Self {
        Self { events }
    }

    /// Return the number of events currently held by the stream.
    ///
    /// This is the count of items remaining, not the count the stream was
    /// created with.
    pub fn len(&self) -> usize {
        let Self { events } = self;
        events.len()
    }
}
impl Stream for FixedLogStream {
    type Item = Event;

    /// Yield the next stored event, or `None` once the store is empty.
    ///
    /// This never returns `Poll::Pending`. Events are taken with `Vec::pop`,
    /// i.e. from the back of the backing vector, so they come out in the
    /// reverse of the order in which they were stored.
    fn poll_next(self: Pin<&mut Self>, _ctx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        let next = Pin::into_inner(self).events.pop();
        Poll::Ready(next)
    }

    /// Report the exact number of events remaining as both the lower and the
    /// upper bound.
    fn size_hint(&self) -> (usize, Option<usize>) {
        let remaining = self.events.len();
        (remaining, Some(remaining))
    }
}