Skip to content

Commit 3f7806e

Browse files
committed
Factor out common worker logic
1 parent 5e308ca commit 3f7806e

File tree

4 files changed

+136
-217
lines changed

4 files changed

+136
-217
lines changed

src/builder.rs

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
//! TODO docs!
1+
//! Builders for customizing asynchronous Metric Sinks.
22
33
use tokio::time::Duration;
44

@@ -8,7 +8,8 @@ use crate::{
88
DEFAULT_QUEUE_CAPACITY,
99
};
1010

11-
/// TODO docs!
11+
/// Builder allows you to override various default parameter values before creating an instance
12+
/// of the desired Metric Sink.
1213
#[derive(Debug)]
1314
pub struct Builder<T, S> {
1415
pub(crate) addr: T,
@@ -29,19 +30,19 @@ impl<T, S> Builder<T, S> {
2930
}
3031
}
3132

32-
/// TODO docs!
33+
/// Sets the maximum metric queue capacity (default: [DEFAULT_QUEUE_CAPACITY](crate::DEFAULT_QUEUE_CAPACITY)).
3334
pub fn queue_cap(&mut self, queue_cap: usize) -> &mut Self {
3435
self.queue_cap = queue_cap;
3536
self
3637
}
3738

38-
/// TODO docs!
39+
/// Sets the batch buffer size (default: [DEFAULT_BATCH_BUF_SIZE](crate::DEFAULT_BATCH_BUF_SIZE)).
3940
pub fn buf_size(&mut self, buf_size: usize) -> &mut Self {
4041
self.buf_size = buf_size;
4142
self
4243
}
4344

44-
/// TODO docs!
45+
/// Sets the maximum delay before flushing any buffered metrics (default: [DEFAULT_MAX_BATCH_DELAY](crate::DEFAULT_MAX_BATCH_DELAY)).
4546
pub fn max_delay(&mut self, max_delay: Duration) -> &mut Self {
4647
self.max_delay = max_delay;
4748
self

src/udp.rs

Lines changed: 14 additions & 92 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
//! TODO docs!
1+
//! Asynchronous Metric Sink implementation that uses UDP sockets.
22
33
use cadence::{
44
ErrorKind as MetricErrorKind,
@@ -26,26 +26,22 @@ use tokio::{
2626
net::UdpSocket,
2727
sync::mpsc::{
2828
channel,
29-
Receiver,
3029
Sender,
3130
},
32-
time::{
33-
timeout_at,
34-
Duration,
35-
Instant,
36-
},
31+
time::Duration,
3732
};
3833

3934
use crate::{
4035
builder::Builder,
36+
define_worker,
4137
worker::{
4238
Cmd,
4339
TrySend,
4440
},
4541
};
4642

4743
impl<T: ToSocketAddrs> Builder<T, UdpSocket> {
48-
/// TODO docs!
44+
/// Creates a customized instance of the [TokioBatchUdpMetricSink](crate::udp::TokioBatchUdpMetricSink).
4945
pub fn build(
5046
self,
5147
) -> MetricResult<(
@@ -58,13 +54,13 @@ impl<T: ToSocketAddrs> Builder<T, UdpSocket> {
5854
})?;
5955

6056
let (tx, rx) = channel(self.queue_cap);
61-
let worker_fut = worker(rx, addr, self.sock, self.buf_size, self.max_delay);
57+
let worker_fut = worker(rx, self.sock, addr, self.buf_size, self.max_delay);
6258

6359
Ok((TokioBatchUdpMetricSink { tx }, Box::pin(worker_fut)))
6460
}
6561
}
6662

67-
/// Metric sink that allows clients to enqueue metrics without blocking, and processing
63+
/// Metric sink that allows clients to enqueue metrics without blocking, and sending
6864
/// them asynchronously via UDP using Tokio runtime.
6965
///
7066
/// It also accumulates individual metrics for a configured maximum amount of time
@@ -134,6 +130,13 @@ impl TokioBatchUdpMetricSink {
134130
Builder::new(host, socket).build()
135131
}
136132

133+
/// Returns a builder for creating a new metric sink for the given statsd host
134+
/// using a previously bound UDP socket. The builder may be used to customize various
135+
/// configuration parameters before creating an instance of this sink.
136+
pub fn builder<T: ToSocketAddrs>(host: T, socket: UdpSocket) -> Builder<T, UdpSocket> {
137+
Builder::new(host, socket)
138+
}
139+
137140
/// Creates a new metric sink for the given statsd host, using the UDP socket, as well as
138141
/// metric queue capacity, batch buffer size, and maximum delay (in milliseconds) to wait
139142
/// before submitting any accumulated metrics as a batch.
@@ -174,88 +177,7 @@ impl MetricSink for TokioBatchUdpMetricSink {
174177
}
175178
}
176179

177-
async fn do_send(socket: &mut UdpSocket, addr: &SocketAddr, buf: &mut String) {
178-
match socket.send_to(buf.as_bytes(), addr).await {
179-
Ok(n) => {
180-
debug!("sent {} bytes", n);
181-
}
182-
183-
Err(e) => {
184-
error!("failed to send metrics: {:?}", e);
185-
}
186-
}
187-
188-
buf.clear();
189-
}
190-
191-
async fn worker(
192-
mut rx: Receiver<Cmd>,
193-
addr: SocketAddr,
194-
mut socket: UdpSocket,
195-
buf_size: usize,
196-
max_delay: Duration,
197-
) {
198-
let mut buf = String::with_capacity(buf_size);
199-
let now = Instant::now();
200-
let mut deadline = now.checked_add(max_delay).unwrap_or(now);
201-
loop {
202-
match timeout_at(deadline, rx.recv()).await {
203-
Ok(Some(Cmd::Write(msg))) => {
204-
trace!("write: {}", msg);
205-
206-
let msg_len = msg.len();
207-
if msg_len > buf.capacity() {
208-
warn!("metric exceeds buffer capacity: {}", msg);
209-
} else {
210-
let buf_len = buf.len();
211-
if buf_len > 0 {
212-
if buf_len + 1 + msg_len > buf.capacity() {
213-
do_send(&mut socket, &addr, &mut buf).await;
214-
let now = Instant::now();
215-
deadline = now.checked_add(max_delay).unwrap_or(now);
216-
} else {
217-
buf.push('\n');
218-
}
219-
}
220-
221-
buf.push_str(&msg);
222-
}
223-
}
224-
225-
Ok(Some(Cmd::Flush)) => {
226-
trace!("flush");
227-
228-
if !buf.is_empty() {
229-
do_send(&mut socket, &addr, &mut buf).await;
230-
}
231-
232-
let now = Instant::now();
233-
deadline = now.checked_add(max_delay).unwrap_or(now);
234-
}
235-
236-
Ok(None) => {
237-
debug!("stop");
238-
239-
if !buf.is_empty() {
240-
do_send(&mut socket, &addr, &mut buf).await;
241-
}
242-
243-
break;
244-
}
245-
246-
Err(_) => {
247-
trace!("timeout");
248-
249-
if !buf.is_empty() {
250-
do_send(&mut socket, &addr, &mut buf).await;
251-
}
252-
253-
let now = Instant::now();
254-
deadline = now.checked_add(max_delay).unwrap_or(now);
255-
}
256-
}
257-
}
258-
}
180+
define_worker!(UdpSocket, SocketAddr);
259181

260182
#[cfg(test)]
261183
mod tests {

src/unix.rs

Lines changed: 20 additions & 99 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
//! TODO docs!
1+
//! Asynchronous Metric Sink implementation that uses Unix Datagram sockets.
22
33
use cadence::{
44
MetricResult,
@@ -21,41 +21,37 @@ use tokio::{
2121
net::UnixDatagram,
2222
sync::mpsc::{
2323
channel,
24-
Receiver,
2524
Sender,
2625
},
27-
time::{
28-
timeout_at,
29-
Duration,
30-
Instant,
31-
},
3226
};
3327

3428
use crate::{
3529
builder::Builder,
30+
define_worker,
3631
worker::{
3732
Cmd,
3833
TrySend,
3934
},
4035
};
4136

4237
impl<T: AsRef<Path> + Send + Sync + Unpin + 'static> Builder<T, UnixDatagram> {
43-
/// TODO docs!
38+
/// Creates a customized instance of the [TokioBatchUnixMetricSink](crate::unix::TokioBatchUnixMetricSink).
4439
pub fn build(
4540
self,
4641
) -> MetricResult<(
4742
TokioBatchUnixMetricSink,
4843
Pin<Box<dyn Future<Output = ()> + Send + Sync + 'static>>,
4944
)> {
5045
let (tx, rx) = channel(self.queue_cap);
51-
let worker_fut = worker(rx, self.addr, self.sock, self.buf_size, self.max_delay);
46+
let worker_fut = worker(rx, self.sock, self.addr, self.buf_size, self.max_delay);
5247

5348
Ok((TokioBatchUnixMetricSink { tx }, Box::pin(worker_fut)))
5449
}
5550
}
5651

57-
/// Metric sink that allows clients to enqueue metrics without blocking, and processing
58-
/// them asynchronously over a Unix Domain Socket using Tokio runtime.
52+
/// Metric sink that allows clients to enqueue metrics without blocking, and sending
53+
/// them asynchronously over a [Unix Domain Socket](https://docs.datadoghq.com/developers/dogstatsd/unix_socket)
54+
/// using Tokio runtime.
5955
///
6056
/// It also accumulates individual metrics for a configured maximum amount of time
6157
/// before submitting them as a single [batch](https://github.com/statsd/statsd/blob/master/docs/metric_types.md#multi-metric-packets).
@@ -112,24 +108,26 @@ impl UnwindSafe for TokioBatchUnixMetricSink {}
112108
impl RefUnwindSafe for TokioBatchUnixMetricSink {}
113109

114110
impl TokioBatchUnixMetricSink {
115-
/// Creates a new metric sink for the given statsd host using a previously bound Unix socket.
111+
/// Creates a new metric sink for the given statsd socket path using an unbound Unix socket.
116112
/// Other sink parameters are defaulted.
117113
pub fn from<T: AsRef<Path> + Send + Sync + Unpin + 'static>(
118-
host: T,
114+
path: T,
119115
socket: UnixDatagram,
120116
) -> MetricResult<(
121117
Self,
122118
Pin<Box<dyn Future<Output = ()> + Send + Sync + 'static>>,
123119
)> {
124-
Self::builder(host, socket).build()
120+
Self::builder(path, socket).build()
125121
}
126122

127-
/// TODO docs!
123+
/// Returns a builder for creating a new metric sink for the given statsd socket path
124+
/// using an unbound Unix socket. The builder may be used to customize various
125+
/// configuration parameters before creating an instance of this sink.
128126
pub fn builder<T: AsRef<Path> + Send + Sync + Unpin + 'static>(
129127
path: T,
130-
sock: UnixDatagram,
128+
socket: UnixDatagram,
131129
) -> Builder<T, UnixDatagram> {
132-
Builder::new(path, sock)
130+
Builder::new(path, socket)
133131
}
134132
}
135133

@@ -151,88 +149,11 @@ impl MetricSink for TokioBatchUnixMetricSink {
151149
}
152150
}
153151

154-
async fn do_send<T: AsRef<Path> + Unpin>(socket: &mut UnixDatagram, addr: &T, buf: &mut String) {
155-
match socket.send_to(buf.as_bytes(), addr).await {
156-
Ok(n) => {
157-
debug!("sent {} bytes", n);
158-
}
159-
160-
Err(e) => {
161-
error!("failed to send metrics: {:?}", e);
162-
}
163-
}
164-
165-
buf.clear();
166-
}
167-
168-
async fn worker(
169-
mut rx: Receiver<Cmd>,
170-
addr: impl AsRef<Path> + Unpin,
171-
mut socket: UnixDatagram,
172-
buf_size: usize,
173-
max_delay: Duration,
174-
) {
175-
let mut buf = String::with_capacity(buf_size);
176-
let now = Instant::now();
177-
let mut deadline = now.checked_add(max_delay).unwrap_or(now);
178-
loop {
179-
match timeout_at(deadline, rx.recv()).await {
180-
Ok(Some(Cmd::Write(msg))) => {
181-
trace!("write: {}", msg);
182-
183-
let msg_len = msg.len();
184-
if msg_len > buf.capacity() {
185-
warn!("metric exceeds buffer capacity: {}", msg);
186-
} else {
187-
let buf_len = buf.len();
188-
if buf_len > 0 {
189-
if buf_len + 1 + msg_len > buf.capacity() {
190-
do_send(&mut socket, &addr, &mut buf).await;
191-
let now = Instant::now();
192-
deadline = now.checked_add(max_delay).unwrap_or(now);
193-
} else {
194-
buf.push('\n');
195-
}
196-
}
197-
198-
buf.push_str(&msg);
199-
}
200-
}
201-
202-
Ok(Some(Cmd::Flush)) => {
203-
trace!("flush");
204-
205-
if !buf.is_empty() {
206-
do_send(&mut socket, &addr, &mut buf).await;
207-
}
208-
209-
let now = Instant::now();
210-
deadline = now.checked_add(max_delay).unwrap_or(now);
211-
}
212-
213-
Ok(None) => {
214-
debug!("stop");
215-
216-
if !buf.is_empty() {
217-
do_send(&mut socket, &addr, &mut buf).await;
218-
}
219-
220-
break;
221-
}
222-
223-
Err(_) => {
224-
trace!("timeout");
225-
226-
if !buf.is_empty() {
227-
do_send(&mut socket, &addr, &mut buf).await;
228-
}
229-
230-
let now = Instant::now();
231-
deadline = now.checked_add(max_delay).unwrap_or(now);
232-
}
233-
}
234-
}
235-
}
152+
define_worker!(
153+
UnixDatagram,
154+
impl AsRef<Path> + Unpin,
155+
impl AsRef<Path> + Unpin
156+
);
236157

237158
#[cfg(test)]
238159
mod tests {

0 commit comments

Comments
 (0)