Skip to content

Commit 24665eb

Browse files
Handle unrecoverable socket errors
1 parent 373b4e3 commit 24665eb

File tree

1 file changed

+60
-25
lines changed

1 file changed

+60
-25
lines changed

src/sinks/util/datagram.rs

Lines changed: 60 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,36 @@ pub struct EncodedDatagram {
3535
pub finalizers: EventFinalizers,
3636
}
3737

38+
enum SendOutcome {
39+
/// Datagram was successfully sent.
40+
Delivered,
41+
/// Per-event error that reconnecting the socket cannot fix (e.g. EMSGSIZE,
42+
/// EDESTADDRREQ). Drop the event and move on.
43+
UnrecoverableError,
44+
/// Socket-level error that may be resolved by reconnecting.
45+
SocketError,
46+
}
47+
48+
/// Returns `true` only for errors that are known to be transient socket-level
49+
/// failures where reconnecting may succeed. All other errors — including
50+
/// EDESTADDRREQ (os error 89), EMSGSIZE, and any unknown error — are treated as
51+
/// non-recoverable so the stream can drop the event and make progress.
52+
fn is_recoverable_socket_error(error: &std::io::Error) -> bool {
53+
use std::io::ErrorKind;
54+
matches!(
55+
error.kind(),
56+
ErrorKind::ConnectionRefused
57+
| ErrorKind::ConnectionReset
58+
| ErrorKind::ConnectionAborted
59+
| ErrorKind::BrokenPipe
60+
| ErrorKind::NetworkDown
61+
| ErrorKind::HostUnreachable
62+
| ErrorKind::NetworkUnreachable
63+
| ErrorKind::TimedOut
64+
| ErrorKind::Interrupted
65+
)
66+
}
67+
3868
pub async fn send_datagrams(
3969
input: &mut Peekable<BoxStream<'_, EncodedDatagram>>,
4070
mut socket: DatagramSocket,
@@ -57,47 +87,47 @@ pub async fn send_datagrams(
5787
continue;
5888
};
5989

60-
let mut socket_error = false;
61-
let delivered = if let Some(chunker) = chunker {
90+
let outcome = if let Some(chunker) = chunker {
6291
let data_size = bytes.len();
6392
match chunker.chunk(bytes) {
6493
Ok(chunks) => {
65-
let mut chunks_delivered = true;
94+
let mut result = SendOutcome::Delivered;
6695
for chunk in chunks {
67-
if !send_and_emit(&mut socket, &chunk, bytes_sent).await {
68-
chunks_delivered = false;
69-
socket_error = true;
96+
result = send_and_emit(&mut socket, &chunk, bytes_sent).await;
97+
if !matches!(result, SendOutcome::Delivered) {
7098
break;
7199
}
72100
}
73-
chunks_delivered
101+
result
74102
}
75103
Err(err) => {
76104
emit!(UdpChunkingError {
77105
data_size,
78106
error: err
79107
});
80-
false
108+
SendOutcome::UnrecoverableError
81109
}
82110
}
83-
} else if send_and_emit(&mut socket, &bytes, bytes_sent).await {
84-
true
85111
} else {
86-
socket_error = true;
87-
false
112+
send_and_emit(&mut socket, &bytes, bytes_sent).await
88113
};
89114

90-
if delivered {
91-
if let Some(datagram) = input.next().await {
92-
datagram.finalizers.update_status(EventStatus::Delivered);
115+
match outcome {
116+
SendOutcome::Delivered => {
117+
if let Some(datagram) = input.next().await {
118+
datagram.finalizers.update_status(EventStatus::Delivered);
119+
}
93120
}
94-
} else if socket_error {
95-
// Socket error — leave item in stream for retry after reconnection.
96-
break;
97-
} else {
98-
// Chunking error — consume and mark errored, continue with next event.
99-
if let Some(datagram) = input.next().await {
100-
datagram.finalizers.update_status(EventStatus::Errored);
121+
SendOutcome::SocketError => {
122+
// Leave item in stream for retry after reconnection.
123+
break;
124+
}
125+
SendOutcome::UnrecoverableError => {
126+
// Per-event or permanent error — consume and mark errored so the
127+
// stream can make progress rather than retrying forever.
128+
if let Some(datagram) = input.next().await {
129+
datagram.finalizers.update_status(EventStatus::Errored);
130+
}
101131
}
102132
}
103133
}
@@ -107,7 +137,7 @@ async fn send_and_emit(
107137
socket: &mut DatagramSocket,
108138
bytes: &bytes::Bytes,
109139
bytes_sent: &<BytesSent as RegisterInternalEvent>::Handle,
110-
) -> bool {
140+
) -> SendOutcome {
111141
match send_datagram(socket, bytes).await {
112142
Ok(()) => {
113143
emit!(SocketEventsSent {
@@ -120,9 +150,10 @@ async fn send_and_emit(
120150
byte_size: bytes.len().into(),
121151
});
122152
bytes_sent.emit(ByteSize(bytes.len()));
123-
true
153+
SendOutcome::Delivered
124154
}
125155
Err(error) => {
156+
let recoverable = is_recoverable_socket_error(&error);
126157
match socket {
127158
DatagramSocket::Udp(_) => emit!(SocketSendError {
128159
mode: SocketMode::Udp,
@@ -136,7 +167,11 @@ async fn send_and_emit(
136167
})
137168
}
138169
};
139-
false
170+
if recoverable {
171+
SendOutcome::SocketError
172+
} else {
173+
SendOutcome::UnrecoverableError
174+
}
140175
}
141176
}
142177
}

0 commit comments

Comments
 (0)