|
| 1 | +use cadence::{ |
| 2 | + prelude::*, |
| 3 | + StatsdClient, |
| 4 | +}; |
| 5 | + |
| 6 | +use log::*; |
| 7 | +use std::{ |
| 8 | + error::Error, |
| 9 | + io::BufRead, |
| 10 | + sync::{ |
| 11 | + atomic::{ |
| 12 | + AtomicUsize, |
| 13 | + Ordering, |
| 14 | + }, |
| 15 | + Arc, |
| 16 | + }, |
| 17 | + time::Instant, |
| 18 | +}; |
| 19 | + |
| 20 | +use tokio::{ |
| 21 | + net::UdpSocket, |
| 22 | + spawn, |
| 23 | + time::{ |
| 24 | + delay_for, |
| 25 | + Duration, |
| 26 | + }, |
| 27 | +}; |
| 28 | + |
| 29 | +use tokio_cadence::*; |
| 30 | + |
| 31 | +const NUM_TASKS: usize = 100; |
| 32 | +const NUM_LOOPS: usize = 1000; |
| 33 | +const LOOP_DELAY: Duration = Duration::from_millis(0); |
| 34 | +const METRIC_PREFIX: &str = "async_impl"; |
| 35 | +const COUNT_METRIC: &str = "test"; |
| 36 | +const FULL_COUNT_METRIC: &str = "async_impl.test"; |
| 37 | +const CATCHUP_DELAY: Duration = Duration::from_secs(1); |
| 38 | + |
| 39 | +async fn run(client: StatsdClient) -> Result<(), Box<dyn Error + Send + Sync + 'static>> { |
| 40 | + let mut tasks = Vec::with_capacity(NUM_TASKS); |
| 41 | + let start = Instant::now(); |
| 42 | + |
| 43 | + for _ in 0..NUM_TASKS { |
| 44 | + let client = client.clone(); |
| 45 | + let task = spawn(async move { |
| 46 | + for _ in 0..NUM_LOOPS { |
| 47 | + client.incr_with_tags(COUNT_METRIC).send(); |
| 48 | + delay_for(LOOP_DELAY).await; |
| 49 | + } |
| 50 | + }); |
| 51 | + |
| 52 | + tasks.push(task); |
| 53 | + } |
| 54 | + |
| 55 | + for task in tasks { |
| 56 | + task.await?; |
| 57 | + } |
| 58 | + |
| 59 | + info!("elapsed: {}ms", start.elapsed().as_millis()); |
| 60 | + |
| 61 | + Ok(()) |
| 62 | +} |
| 63 | + |
| 64 | +#[tokio::main] |
| 65 | +async fn main() -> Result<(), Box<dyn Error + Send + Sync + 'static>> { |
| 66 | + pretty_env_logger::init(); |
| 67 | + |
| 68 | + let client_socket = UdpSocket::bind("127.0.0.1:0").await?; |
| 69 | + let client_addr = client_socket.local_addr()?; |
| 70 | + |
| 71 | + let mut server_socket = UdpSocket::bind("127.0.0.1:0").await?; |
| 72 | + let server_addr = server_socket.local_addr()?; |
| 73 | + |
| 74 | + let recv_count = Arc::new(AtomicUsize::default()); |
| 75 | + let recv_count_clone = recv_count.clone(); |
| 76 | + |
| 77 | + spawn(async move { |
| 78 | + let mut buf = [0; 8192]; |
| 79 | + |
| 80 | + while let Ok((n, addr)) = server_socket.recv_from(&mut buf).await { |
| 81 | + if addr == client_addr { |
| 82 | + debug!("data: {}", String::from_utf8_lossy(&buf[..n])); |
| 83 | + |
| 84 | + &buf[..n] |
| 85 | + .lines() |
| 86 | + .filter_map(|line| { |
| 87 | + line.ok().and_then(|line| { |
| 88 | + let parts: Vec<_> = line.splitn(2, ':').collect(); |
| 89 | + parts.get(1).and_then(|suffix| { |
| 90 | + let value_parts: Vec<_> = suffix.splitn(3, '|').take(2).collect(); |
| 91 | + value_parts |
| 92 | + .get(1) |
| 93 | + .filter(|&suffix| *suffix == "c") |
| 94 | + .map(|_| { |
| 95 | + ( |
| 96 | + parts[0].to_string(), |
| 97 | + value_parts[0].parse::<usize>().unwrap_or_default(), |
| 98 | + ) |
| 99 | + }) |
| 100 | + }) |
| 101 | + }) |
| 102 | + }) |
| 103 | + .for_each(|(key, value)| { |
| 104 | + if key == FULL_COUNT_METRIC { |
| 105 | + recv_count.fetch_add(value, Ordering::AcqRel); |
| 106 | + } |
| 107 | + }); |
| 108 | + } |
| 109 | + } |
| 110 | + }); |
| 111 | + |
| 112 | + let host = format!("127.0.0.1:{}", server_addr.port()); |
| 113 | + let (sink, process) = TokioBatchUdpMetricSink::from(host, client_socket)?; |
| 114 | + |
| 115 | + let processing_job = spawn(process); |
| 116 | + |
| 117 | + let error_count = Arc::new(AtomicUsize::default()); |
| 118 | + let error_count_clone = error_count.clone(); |
| 119 | + let client = StatsdClient::builder(METRIC_PREFIX, sink) |
| 120 | + .with_error_handler(move |_| { |
| 121 | + error_count_clone.fetch_add(1, Ordering::AcqRel); |
| 122 | + }) |
| 123 | + .build(); |
| 124 | + |
| 125 | + run(client).await?; |
| 126 | + |
| 127 | + processing_job.await?; |
| 128 | + |
| 129 | + delay_for(CATCHUP_DELAY).await; |
| 130 | + |
| 131 | + info!("sent count: {}", NUM_TASKS * NUM_LOOPS); |
| 132 | + |
| 133 | + info!( |
| 134 | + "received count: {}", |
| 135 | + recv_count_clone.load(Ordering::Acquire) |
| 136 | + ); |
| 137 | + |
| 138 | + info!("error count: {}", error_count.load(Ordering::Acquire)); |
| 139 | + |
| 140 | + Ok(()) |
| 141 | +} |
0 commit comments