Skip to content

Commit 3c4cc3a

Browse files
Sebastien Boeufgeorgepisaltu
authored andcommitted
Expect stream to implement ScmSocket trait
The stream used by the Connection structure is expected to implement both Read and Write traits. Since we want the server to be able to receive file descriptors through control messages mechanism, this patch extends the expectations regarding the stream by adding ScmSocket to the list of traits. Since the stream is a UnixStream structure, and since vmm-sys-util already provides an ScmSocket implementation for UnixStream, extending the list of traits is very straightforward. Relying on the newly added trait, the server now reads incoming bytes through recv_with_fd() function, which replaces the former call to read(). This change has no intent of modifying the former behavior from the read(), which is why the returned Option<File> is ignored for now. Signed-off-by: Sebastien Boeuf <[email protected]>
1 parent 81a3c71 commit 3c4cc3a

File tree

3 files changed

+24
-16
lines changed

3 files changed

+24
-16
lines changed

src/common/mod.rs

Lines changed: 12 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -117,8 +117,10 @@ pub enum ConnectionError {
117117
InvalidWrite,
118118
/// The request parsing has failed.
119119
ParseError(RequestError),
120-
/// Could not perform a stream operation successfully.
121-
StreamError(std::io::Error),
120+
/// Could not perform a read operation from stream successfully.
121+
StreamReadError(vmm_sys_util::errno::Error),
122+
/// Could not perform a write operation to stream successfully.
123+
StreamWriteError(std::io::Error),
122124
}
123125

124126
impl Display for ConnectionError {
@@ -127,7 +129,8 @@ impl Display for ConnectionError {
127129
Self::ConnectionClosed => write!(f, "Connection closed."),
128130
Self::InvalidWrite => write!(f, "Invalid write attempt."),
129131
Self::ParseError(inner) => write!(f, "Parsing error: {}", inner),
130-
Self::StreamError(inner) => write!(f, "Stream error: {}", inner),
132+
Self::StreamReadError(inner) => write!(f, "Reading stream error: {}", inner),
133+
Self::StreamWriteError(inner) => write!(f, "Writing stream error: {}", inner),
131134
}
132135
}
133136
}
@@ -322,7 +325,10 @@ mod tests {
322325
match (self, other) {
323326
(ParseError(ref e), ParseError(ref other_e)) => e.eq(other_e),
324327
(ConnectionClosed, ConnectionClosed) => true,
325-
(StreamError(ref e), StreamError(ref other_e)) => {
328+
(StreamReadError(ref e), StreamReadError(ref other_e)) => {
329+
format!("{}", e).eq(&format!("{}", other_e))
330+
}
331+
(StreamWriteError(ref e), StreamWriteError(ref other_e)) => {
326332
format!("{}", e).eq(&format!("{}", other_e))
327333
}
328334
(InvalidWrite, InvalidWrite) => true,
@@ -489,9 +495,9 @@ mod tests {
489495
assert_eq!(
490496
format!(
491497
"{}",
492-
ConnectionError::StreamError(std::io::Error::from_raw_os_error(11))
498+
ConnectionError::StreamWriteError(std::io::Error::from_raw_os_error(11))
493499
),
494-
"Stream error: Resource temporarily unavailable (os error 11)"
500+
"Writing stream error: Resource temporarily unavailable (os error 11)"
495501
);
496502
}
497503

src/connection.rs

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ pub use crate::common::{ConnectionError, HttpHeaderError, RequestError};
1010
use crate::headers::Headers;
1111
use crate::request::{find, Request, RequestLine};
1212
use crate::response::{Response, StatusCode};
13+
use vmm_sys_util::sock_ctrl_msg::ScmSocket;
1314

1415
const BUFFER_SIZE: usize = 1024;
1516

@@ -51,7 +52,7 @@ pub struct HttpConnection<T> {
5152
response_buffer: Option<Vec<u8>>,
5253
}
5354

54-
impl<T: Read + Write> HttpConnection<T> {
55+
impl<T: Read + Write + ScmSocket> HttpConnection<T> {
5556
/// Creates an empty connection.
5657
pub fn new(stream: T) -> Self {
5758
Self {
@@ -126,10 +127,10 @@ impl<T: Read + Write> HttpConnection<T> {
126127
}
127128
// Append new bytes to what we already have in the buffer.
128129
// The slice access is safe, the index is checked above.
129-
let bytes_read = self
130+
let (bytes_read, _) = self
130131
.stream
131-
.read(&mut self.buffer[self.read_cursor..])
132-
.map_err(ConnectionError::StreamError)?;
132+
.recv_with_fd(&mut self.buffer[self.read_cursor..])
133+
.map_err(ConnectionError::StreamReadError)?;
133134

134135
// If the read returned 0 then the client has closed the connection.
135136
if bytes_read == 0 {
@@ -392,7 +393,7 @@ impl<T: Read + Write> HttpConnection<T> {
392393
let mut response_buffer_vec: Vec<u8> = Vec::new();
393394
response
394395
.write_all(&mut response_buffer_vec)
395-
.map_err(ConnectionError::StreamError)?;
396+
.map_err(ConnectionError::StreamWriteError)?;
396397
self.response_buffer = Some(response_buffer_vec);
397398
} else {
398399
return Err(ConnectionError::InvalidWrite);

src/server.rs

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
// Copyright 2018 Amazon.com, Inc. or its affiliates. All Rights Reserved.
22
// SPDX-License-Identifier: Apache-2.0
33

4+
use std::collections::HashMap;
45
use std::io::{Read, Write};
56
use std::os::unix::io::{AsRawFd, FromRawFd, RawFd};
67
use std::os::unix::net::{UnixListener, UnixStream};
@@ -11,7 +12,7 @@ pub use crate::common::{ConnectionError, RequestError, ServerError};
1112
use crate::connection::HttpConnection;
1213
use crate::request::Request;
1314
use crate::response::{Response, StatusCode};
14-
use std::collections::HashMap;
15+
use vmm_sys_util::sock_ctrl_msg::ScmSocket;
1516

1617
use vmm_sys_util::epoll;
1718

@@ -92,7 +93,7 @@ struct ClientConnection<T> {
9293
in_flight_response_count: u32,
9394
}
9495

95-
impl<T: Read + Write> ClientConnection<T> {
96+
impl<T: Read + Write + ScmSocket> ClientConnection<T> {
9697
fn new(connection: HttpConnection<T>) -> Self {
9798
Self {
9899
connection,
@@ -113,7 +114,7 @@ impl<T: Read + Write> ClientConnection<T> {
113114
// safe to drop.
114115
return Ok(vec![]);
115116
}
116-
Err(ConnectionError::StreamError(inner)) => {
117+
Err(ConnectionError::StreamReadError(inner)) => {
117118
// Reading from the connection failed.
118119
// We should try to write an error message regardless.
119120
let mut internal_error_response =
@@ -134,7 +135,7 @@ impl<T: Read + Write> ClientConnection<T> {
134135
)));
135136
self.connection.enqueue_response(error_response);
136137
}
137-
Err(ConnectionError::InvalidWrite) => {
138+
Err(ConnectionError::InvalidWrite) | Err(ConnectionError::StreamWriteError(_)) => {
138139
// This is unreachable because `HttpConnection::try_read()` cannot return this error variant.
139140
unreachable!();
140141
}
@@ -161,7 +162,7 @@ impl<T: Read + Write> ClientConnection<T> {
161162
fn write(&mut self) -> Result<()> {
162163
// The stream is available for writing.
163164
match self.connection.try_write() {
164-
Err(ConnectionError::ConnectionClosed) | Err(ConnectionError::StreamError(_)) => {
165+
Err(ConnectionError::ConnectionClosed) | Err(ConnectionError::StreamWriteError(_)) => {
165166
// Writing to the stream failed so it will be removed.
166167
self.state = ClientConnectionState::Closed;
167168
}

0 commit comments

Comments
 (0)