Skip to content

Commit e73f412

Browse files
georgepisaltuacatangiu
authored andcommitted
micro-http: explicit HUP/ERR handling in server
The server in `micro_http` was not explicitly handling the `EPOLLHUP`, `EPOLLRDHUP` and `EPOLLERR` epoll events. Added new checks that close a connection that is signaled with these events. Signed-off-by: George Pisaltu <[email protected]> Signed-off-by: YUAN LYU <[email protected]>
1 parent ed44e2a commit e73f412

File tree

2 files changed

+76
-6
lines changed

2 files changed

+76
-6
lines changed

src/connection.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -428,7 +428,8 @@ impl<T: Read + Write> HttpConnection<T> {
428428
Ok(())
429429
}
430430

431-
fn clear_write_buffer(&mut self) {
431+
/// Discards all pending writes from the connection.
432+
pub fn clear_write_buffer(&mut self) {
432433
self.response_queue.clear();
433434
self.response_buffer.take();
434435
}

src/server.rs

Lines changed: 74 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -193,6 +193,11 @@ impl<T: Read + Write> ClientConnection<T> {
193193
Ok(())
194194
}
195195

196+
/// Discards all pending writes from the inner connection.
197+
fn clear_write_buffer(&mut self) {
198+
self.connection.clear_write_buffer();
199+
}
200+
196201
// Returns `true` if the connection is closed and safe to drop.
197202
fn is_done(&self) -> bool {
198203
self.state == ClientConnectionState::Closed
@@ -343,6 +348,22 @@ impl HttpServer {
343348
// We have a notification on one of our open connections.
344349
let fd = e.fd();
345350
let client_connection = self.connections.get_mut(&fd).unwrap();
351+
352+
// If we receive a hang up on a connection, we clear the write buffer and set
353+
// the connection state to closed to mark it ready for removal from the
354+
// connections map, which will gracefully close the socket.
355+
// The connection is also marked for removal when encountering `EPOLLERR`,
356+
// since this is an "error condition happened on the associated file
357+
// descriptor", according to the `epoll_ctl` man page.
358+
if e.event_set().contains(epoll::EventSet::ERROR)
359+
|| e.event_set().contains(epoll::EventSet::HANG_UP)
360+
|| e.event_set().contains(epoll::EventSet::READ_HANG_UP)
361+
{
362+
client_connection.clear_write_buffer();
363+
client_connection.state = ClientConnectionState::Closed;
364+
continue;
365+
}
366+
346367
if e.event_set().contains(epoll::EventSet::IN) {
347368
// We have bytes to read from this connection.
348369
// If our `read` yields `Request` objects, we wrap them with an ID before
@@ -358,7 +379,11 @@ impl HttpServer {
358379
// either an error message or an `expect` response, we change its `epoll`
359380
// event set to notify us when the stream is ready for writing.
360381
if client_connection.state == ClientConnectionState::AwaitingOutgoing {
361-
Self::epoll_mod(&self.epoll, fd, epoll::EventSet::OUT)?;
382+
Self::epoll_mod(
383+
&self.epoll,
384+
fd,
385+
epoll::EventSet::OUT | epoll::EventSet::READ_HANG_UP,
386+
)?;
362387
}
363388
} else if e.event_set().contains(epoll::EventSet::OUT) {
364389
// We have bytes to write on this connection.
@@ -367,7 +392,11 @@ impl HttpServer {
367392
// and we don't have any more responses to write, we change the `epoll`
368393
// event set to notify us when we have bytes to read from the stream.
369394
if client_connection.state == ClientConnectionState::AwaitingIncoming {
370-
Self::epoll_mod(&self.epoll, fd, epoll::EventSet::IN)?;
395+
Self::epoll_mod(
396+
&self.epoll,
397+
fd,
398+
epoll::EventSet::IN | epoll::EventSet::READ_HANG_UP,
399+
)?;
371400
}
372401
}
373402
}
@@ -492,7 +521,11 @@ impl HttpServer {
492521
// `epoll` event set to notify us when the stream is ready for writing.
493522
if let ClientConnectionState::AwaitingIncoming = client_connection.state {
494523
client_connection.state = ClientConnectionState::AwaitingOutgoing;
495-
Self::epoll_mod(&self.epoll, response.id as RawFd, epoll::EventSet::OUT)?;
524+
Self::epoll_mod(
525+
&self.epoll,
526+
response.id as RawFd,
527+
epoll::EventSet::OUT | epoll::EventSet::READ_HANG_UP,
528+
)?;
496529
}
497530
client_connection.enqueue_response(response.response)?;
498531
}
@@ -554,7 +587,10 @@ impl HttpServer {
554587
.ctl(
555588
epoll::ControlOperation::Add,
556589
stream_fd,
557-
epoll::EpollEvent::new(epoll::EventSet::IN, stream_fd as u64),
590+
epoll::EpollEvent::new(
591+
epoll::EventSet::IN | epoll::EventSet::READ_HANG_UP,
592+
stream_fd as u64,
593+
),
558594
)
559595
.map_err(ServerError::IOError)
560596
}
@@ -575,6 +611,7 @@ impl HttpServer {
575611
mod tests {
576612
use super::*;
577613
use std::io::{Read, Write};
614+
use std::net::Shutdown;
578615
use std::os::unix::net::UnixStream;
579616

580617
use crate::common::Body;
@@ -750,7 +787,7 @@ mod tests {
750787
let mut server = HttpServer::new(path_to_socket.as_path()).unwrap();
751788
server.start_server().unwrap();
752789

753-
let mut sockets: Vec<UnixStream> = Vec::with_capacity(11);
790+
let mut sockets: Vec<UnixStream> = Vec::with_capacity(MAX_CONNECTIONS + 1);
754791
for _ in 0..MAX_CONNECTIONS {
755792
sockets.push(UnixStream::connect(path_to_socket.as_path()).unwrap());
756793
assert!(server.requests().unwrap().is_empty());
@@ -761,6 +798,38 @@ mod tests {
761798
let mut buf: [u8; 120] = [0; 120];
762799
sockets[MAX_CONNECTIONS].read_exact(&mut buf).unwrap();
763800
assert_eq!(&buf[..], SERVER_FULL_ERROR_MESSAGE);
801+
assert_eq!(server.connections.len(), 10);
802+
{
803+
// Drop this stream.
804+
let _refused_stream = sockets.pop().unwrap();
805+
}
806+
assert_eq!(server.connections.len(), 10);
807+
808+
// Check that the server detects a connection shutdown.
809+
let sock: &UnixStream = sockets.get(0).unwrap();
810+
sock.shutdown(Shutdown::Both).unwrap();
811+
assert!(server.requests().unwrap().is_empty());
812+
// Server should drop a closed connection.
813+
assert_eq!(server.connections.len(), 9);
814+
815+
// Close the backing FD of this connection by dropping
816+
// it out of scope.
817+
{
818+
// Enforce the drop call on the stream
819+
let _sock = sockets.pop().unwrap();
820+
}
821+
assert!(server.requests().unwrap().is_empty());
822+
// Server should drop a closed connection.
823+
assert_eq!(server.connections.len(), 8);
824+
825+
let sock: &UnixStream = sockets.get(1).unwrap();
826+
// Close both the read and write sides of the socket
827+
// separately and check that the server detects it.
828+
sock.shutdown(Shutdown::Read).unwrap();
829+
sock.shutdown(Shutdown::Write).unwrap();
830+
assert!(server.requests().unwrap().is_empty());
831+
// Server should drop a closed connection.
832+
assert_eq!(server.connections.len(), 7);
764833
}
765834

766835
#[test]

0 commit comments

Comments
 (0)