Skip to content

Commit b246f2f

Browse files
Make generic encode_to_datagrams
1 parent 24665eb commit b246f2f

File tree

3 files changed

+36
-39
lines changed

3 files changed

+36
-39
lines changed

src/sinks/util/datagram.rs

Lines changed: 30 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,14 @@
11
use std::pin::Pin;
22

3-
use bytes::Bytes;
3+
use bytes::{Bytes, BytesMut};
44
use futures::{StreamExt, stream::BoxStream};
55
use futures_util::stream::Peekable;
66
#[cfg(unix)]
77
use std::path::PathBuf;
88
use tokio::net::UdpSocket;
99
#[cfg(unix)]
1010
use tokio::net::UnixDatagram;
11+
use tokio_util::codec::Encoder;
1112
use vector_lib::{
1213
codecs::encoding::{Chunker, Chunking},
1314
internal_event::{ByteSize, BytesSent, InternalEventHandle, RegisterInternalEvent},
@@ -16,7 +17,8 @@ use vector_lib::{
1617
#[cfg(unix)]
1718
use crate::internal_events::{UnixSendIncompleteError, UnixSocketSendError};
1819
use crate::{
19-
event::{EventFinalizers, EventStatus},
20+
codecs::Transformer,
21+
event::{Event, EventFinalizers, EventStatus, Finalizable},
2022
internal_events::{
2123
SocketEventsSent, SocketMode, SocketSendError, UdpChunkingError, UdpSendIncompleteError,
2224
},
@@ -65,6 +67,32 @@ fn is_recoverable_socket_error(error: &std::io::Error) -> bool {
6567
)
6668
}
6769

70+
/// Transforms and encodes a raw event stream into a stream of [`EncodedDatagram`]s
71+
/// ready to be passed to [`send_datagrams`].
72+
pub fn encode_to_datagrams<'a, E>(
73+
input: BoxStream<'a, Event>,
74+
transformer: Transformer,
75+
mut encoder: E,
76+
) -> Peekable<BoxStream<'a, EncodedDatagram>>
77+
where
78+
E: Encoder<Event, Error = vector_lib::codecs::encoding::Error> + Send + 'a,
79+
{
80+
input
81+
.map(move |mut event| {
82+
transformer.transform(&mut event);
83+
let finalizers = event.take_finalizers();
84+
let mut bytes = BytesMut::new();
85+
let bytes = if encoder.encode(event, &mut bytes).is_ok() {
86+
Some(bytes.freeze())
87+
} else {
88+
None
89+
};
90+
EncodedDatagram { bytes, finalizers }
91+
})
92+
.boxed()
93+
.peekable()
94+
}
95+
6896
pub async fn send_datagrams(
6997
input: &mut Peekable<BoxStream<'_, EncodedDatagram>>,
7098
mut socket: DatagramSocket,

src/sinks/util/udp.rs

Lines changed: 4 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -4,8 +4,7 @@ use std::{
44
};
55

66
use async_trait::async_trait;
7-
use bytes::BytesMut;
8-
use futures::{FutureExt, StreamExt, stream::BoxStream};
7+
use futures::{FutureExt, stream::BoxStream};
98
use snafu::{ResultExt, Snafu};
109
use tokio::{net::UdpSocket, time::sleep};
1110
use tokio_util::codec::Encoder;
@@ -17,13 +16,13 @@ use vector_lib::{
1716

1817
use super::{
1918
SinkBuildError,
20-
datagram::{DatagramSocket, EncodedDatagram, send_datagrams},
19+
datagram::{DatagramSocket, encode_to_datagrams, send_datagrams},
2120
};
2221
use crate::{
2322
codecs::Transformer,
2423
common::backoff::ExponentialBackoff,
2524
dns,
26-
event::{Event, Finalizable},
25+
event::Event,
2726
internal_events::{UdpSocketConnectionEstablished, UdpSocketOutgoingConnectionError},
2827
net,
2928
sinks::{Healthcheck, VectorSink, util::StreamSink},
@@ -199,22 +198,7 @@ where
199198
E: Encoder<Event, Error = vector_lib::codecs::encoding::Error> + Clone + Send + Sync,
200199
{
201200
async fn run(self: Box<Self>, input: BoxStream<'_, Event>) -> Result<(), ()> {
202-
let mut encoder = self.encoder.clone();
203-
let transformer = self.transformer.clone();
204-
let mut input = input
205-
.map(move |mut event| {
206-
transformer.transform(&mut event);
207-
let finalizers = event.take_finalizers();
208-
let mut bytes = BytesMut::new();
209-
let bytes = if encoder.encode(event, &mut bytes).is_ok() {
210-
Some(bytes.freeze())
211-
} else {
212-
None
213-
};
214-
EncodedDatagram { bytes, finalizers }
215-
})
216-
.boxed()
217-
.peekable();
201+
let mut input = encode_to_datagrams(input, self.transformer.clone(), self.encoder.clone());
218202

219203
let chunker = self.chunker.clone();
220204
while Pin::new(&mut input).peek().await.is_some() {

src/sinks/util/unix.rs

Lines changed: 2 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ use vector_lib::{
2222
json_size::JsonSize,
2323
};
2424

25-
use super::datagram::{DatagramSocket, EncodedDatagram, send_datagrams};
25+
use super::datagram::{DatagramSocket, encode_to_datagrams, send_datagrams};
2626
use crate::{
2727
codecs::Transformer,
2828
common::backoff::ExponentialBackoff,
@@ -259,22 +259,7 @@ where
259259

260260
async fn run_datagram(self: Box<Self>, input: BoxStream<'_, Event>) -> Result<(), ()> {
261261
let bytes_sent = register!(BytesSent::from(Protocol::UNIX));
262-
let mut encoder = self.encoder.clone();
263-
let transformer = self.transformer.clone();
264-
let mut input = input
265-
.map(move |mut event| {
266-
transformer.transform(&mut event);
267-
let finalizers = event.take_finalizers();
268-
let mut bytes = BytesMut::new();
269-
let bytes = if encoder.encode(event, &mut bytes).is_ok() {
270-
Some(bytes.freeze())
271-
} else {
272-
None
273-
};
274-
EncodedDatagram { bytes, finalizers }
275-
})
276-
.boxed()
277-
.peekable();
262+
let mut input = encode_to_datagrams(input, self.transformer.clone(), self.encoder.clone());
278263

279264
while Pin::new(&mut input).peek().await.is_some() {
280265
let socket = match self.connector.connect_backoff().await {

0 commit comments

Comments
 (0)