Skip to content

Commit 68e1915

Browse files
Merge #364
364: [PLAT-396] Refactor threads r=Pagten a=raoulstrackx This PR improves on two key areas: - Information about TCP connections are stored in a hashmap, similar to how information about TCP server sockets is tracked. This will enable this information to be requested from the enclave in a subsequent PR - The way how connections are proxied, is modified. Instead of starting another thread per TCP connection, a single proxy thread is started. It iterates over all TCP connections and exchanges message once for each connection. When no connections are currently available, the thread yields its CPU time. Co-authored-by: Raoul Strackx <[email protected]>
2 parents c203721 + 2247591 commit 68e1915

File tree

3 files changed

+158
-78
lines changed

3 files changed

+158
-78
lines changed

Cargo.lock

Lines changed: 1 addition & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

fortanix-vme/fortanix-vme-runner/src/lib.rs

Lines changed: 156 additions & 76 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ use std::thread::{self, JoinHandle};
88
use std::io::{self, Error as IoError, ErrorKind as IoErrorKind, Read, Write};
99
use std::net::{Shutdown, TcpListener, TcpStream};
1010
use std::os::unix::io::AsRawFd;
11-
use std::sync::{Arc, Mutex};
11+
use std::sync::{Arc, Mutex, RwLock};
1212
use fortanix_vme_abi::{self, Addr, Response, Request};
1313
use vsock::{self, SockAddr as VsockAddr, Std, Vsock, VsockListener, VsockStream};
1414

@@ -89,14 +89,105 @@ impl Listener {
8989
}
9090
}
9191

92+
#[derive(Debug)]
93+
struct Connection {
94+
tcp_stream: TcpStream,
95+
vsock_stream: VsockStream<Std>,
96+
remote_name: String,
97+
}
98+
99+
#[derive(Clone, Debug)]
100+
struct ConnectionInfo {
101+
}
102+
103+
impl Connection {
104+
pub fn new(vsock_stream: VsockStream<Std>, tcp_stream: TcpStream, remote_name: String) -> Self {
105+
Connection {
106+
tcp_stream,
107+
vsock_stream,
108+
remote_name,
109+
}
110+
}
111+
112+
pub fn info(&self) -> ConnectionInfo {
113+
ConnectionInfo{}
114+
}
115+
116+
/// Exchanges messages between the remote server and enclave. Returns on error, or when one of
117+
/// the connections terminated
118+
pub fn proxy(&mut self) -> Result<(), IoError> {
119+
let remote = &mut self.tcp_stream;
120+
let enclave = &mut self.vsock_stream;
121+
122+
let mut golden_set = FdSet::new();
123+
golden_set.insert(remote.as_raw_fd());
124+
golden_set.insert(enclave.as_raw_fd());
125+
126+
while golden_set != FdSet::new() {
127+
let mut read_set = golden_set.clone();
128+
129+
if let Ok(_num) = select(None, Some(&mut read_set), None, None, None) {
130+
if read_set.contains(remote.as_raw_fd()) {
131+
// According to the `Read` trait documentation, reading 0 bytes
132+
// indicates that the connection has been shutdown (for writes) correctly. We
133+
// - reflect this change on the other connection
134+
// - avoid reading from the socket again
135+
// https://doc.rust-lang.org/std/io/trait.Read.html#tymethod.read
136+
if Server::transfer_data(remote, &self.remote_name, enclave, "enclave")? == 0 {
137+
enclave.shutdown(Shutdown::Write)?;
138+
golden_set.remove(remote.as_raw_fd());
139+
}
140+
}
141+
if read_set.contains(enclave.as_raw_fd()) {
142+
if Server::transfer_data(enclave, "enclave", remote, &self.remote_name)? == 0 {
143+
remote.shutdown(Shutdown::Write)?;
144+
golden_set.remove(enclave.as_raw_fd());
145+
}
146+
}
147+
}
148+
}
149+
Ok(())
150+
}
151+
}
152+
153+
#[derive(Clone, Debug, Eq, Hash, PartialEq)]
154+
struct ConnectionKey {
155+
enclave: VsockAddr,
156+
runner: VsockAddr,
157+
}
158+
159+
impl ConnectionKey {
160+
pub fn from_vsock_stream(runner_enclave: &VsockStream<Std>) -> Self {
161+
let runner_cid = runner_enclave.local_addr().unwrap().cid();
162+
let runner_port = runner_enclave.local_addr().unwrap().port();
163+
let enclave_cid = runner_enclave.peer_addr().unwrap().cid();
164+
let enclave_port = runner_enclave.peer_addr().unwrap().port();
165+
Self::connection_key(enclave_cid, enclave_port, runner_cid, runner_port)
166+
}
167+
168+
pub fn from_addresses(enclave: VsockAddr, runner: VsockAddr) -> Self {
169+
ConnectionKey {
170+
enclave,
171+
runner,
172+
}
173+
}
174+
175+
fn connection_key(enclave_cid: u32, enclave_port: u32, runner_cid: u32, runner_port: u32) -> Self {
176+
let enclave = VsockAddr::new(enclave_cid, enclave_port);
177+
let runner = VsockAddr::new(runner_cid, runner_port);
178+
Self::from_addresses(enclave, runner)
179+
}
180+
}
181+
92182
pub struct Server {
93183
command_listener: Mutex<VsockListener>,
94184
/// Tracks information about TCP sockets that are currently listening for new connections. For
95185
/// every TCP listener socket in the runner, there is a vsock listener socket in the enclave.
96186
/// When the enclave instructs to accept a new connection, the runner accepts a new TCP
97187
/// connection. It then locates the ListenerInfo and finds the information it needs to set up a
98188
/// new vsock connection to the enclave
99-
listeners: Mutex<FnvHashMap<VsockAddr, Arc<Mutex<Listener>>>>,
189+
listeners: RwLock<FnvHashMap<VsockAddr, Arc<Mutex<Listener>>>>,
190+
connections: RwLock<FnvHashMap<ConnectionKey, ConnectionInfo>>,
100191
}
101192

102193
impl Server {
@@ -179,9 +270,9 @@ impl Server {
179270
* [2] remote
180271
* [3] proxy
181272
*/
182-
fn handle_request_connect(&self, remote_addr: &String, enclave: &mut VsockStream) -> Result<(), IoError> {
273+
fn handle_request_connect(self: Arc<Self>, remote_addr: &String, enclave: &mut VsockStream) -> Result<(), IoError> {
183274
// Connect to remote server
184-
let mut remote_socket = TcpStream::connect(remote_addr)?;
275+
let remote_socket = TcpStream::connect(remote_addr)?;
185276
let remote_name = remote_addr.split_terminator(":").next().unwrap_or(remote_addr);
186277

187278
// Create listening socket that the enclave can connect to
@@ -206,19 +297,44 @@ impl Server {
206297
Self::send(enclave, &response)?;
207298

208299
// Wait for incoming connection from enclave
209-
let (mut proxy, _proxy_addr) = proxy_server.accept()?;
300+
let (proxy, _proxy_addr) = proxy_server.accept()?;
301+
302+
// Store connection info
303+
self.add_connection(proxy, remote_socket, remote_name.to_string())?;
210304

211-
// Pass messages between remote server <-> enclave
212-
Self::proxy_connection((&mut remote_socket, remote_name), (&mut proxy, "proxy"));
213305
Ok(())
214306
}
215307

216308
fn add_listener(&self, addr: VsockAddr, info: Listener) {
217-
self.listeners.lock().unwrap().insert(addr, Arc::new(Mutex::new(info)));
309+
self.listeners.write().unwrap().insert(addr, Arc::new(Mutex::new(info)));
218310
}
219311

220312
fn listener(&self, addr: &VsockAddr) -> Option<Arc<Mutex<Listener>>> {
221-
self.listeners.lock().unwrap().get(&addr).cloned()
313+
self.listeners.read().unwrap().get(&addr).cloned()
314+
}
315+
316+
// Preliminary work for PLAT-367
317+
#[allow(dead_code)]
318+
fn connection(&self, enclave: VsockAddr, runner: VsockAddr) -> Option<ConnectionInfo> {
319+
let k = ConnectionKey::from_addresses(enclave, runner);
320+
self.connections
321+
.read()
322+
.unwrap()
323+
.get(&k)
324+
.cloned()
325+
}
326+
327+
fn add_connection(self: Arc<Self>, runner_enclave: VsockStream<Std>, runner_remote: TcpStream, remote_name: String) -> Result<JoinHandle<()>, IoError> {
328+
let k = ConnectionKey::from_vsock_stream(&runner_enclave);
329+
let mut connection = Connection::new(runner_enclave, runner_remote, remote_name);
330+
self.connections.write().unwrap().insert(k.clone(), connection.info());
331+
332+
thread::Builder::new().spawn(move || {
333+
if let Err(e) = connection.proxy() {
334+
eprintln!("Connection failed: {}", e);
335+
}
336+
self.connections.write().unwrap().remove(&k);
337+
})
222338
}
223339

224340
/*
@@ -244,7 +360,7 @@ impl Server {
244360
* runner
245361
* `enclave`: The runner-enclave vsock connection
246362
*/
247-
fn handle_request_bind(&self, addr: &String, enclave_port: u32, enclave: &mut VsockStream) -> Result<(), IoError> {
363+
fn handle_request_bind(self: Arc<Self>, addr: &String, enclave_port: u32, enclave: &mut VsockStream) -> Result<(), IoError> {
248364
let cid: u32 = enclave.peer().unwrap().parse().unwrap_or(vsock::VMADDR_CID_HYPERVISOR);
249365
let listener = TcpListener::bind(addr)?;
250366
let local: Addr = listener.local_addr()?.into();
@@ -262,20 +378,21 @@ impl Server {
262378
Ok(())
263379
}
264380

265-
fn handle_request_accept(&self, vsock_listener_port: u32, enclave: &mut VsockStream) -> Result<(), IoError> {
381+
fn handle_request_accept(self: Arc<Self>, vsock_listener_port: u32, enclave: &mut VsockStream) -> Result<(), IoError> {
266382
let enclave_cid: u32 = enclave.peer().unwrap().parse().unwrap_or(vsock::VMADDR_CID_HYPERVISOR);
267383
let enclave_addr = VsockAddr::new(enclave_cid, vsock_listener_port);
268384
let listener = self.listener(&enclave_addr)
269385
.ok_or(IoError::new(IoErrorKind::InvalidInput, "Information about provided file descriptor was not found"))?;
270386
let listener = listener.lock().unwrap();
271387

272388
match listener.listener.accept() {
273-
Ok((mut conn, peer)) => {
389+
Ok((conn, peer)) => {
274390
let vsock = Vsock::new::<Std>()?;
391+
let runner_addr = vsock.addr::<Std>()?;
275392
let response = Response::IncomingConnection{
276393
local: conn.local_addr()?.into(),
277394
peer: peer.into(),
278-
proxy_port: vsock.addr::<Std>()?.port(),
395+
proxy_port: runner_addr.port(),
279396
};
280397
Self::log_communication(
281398
"runner",
@@ -286,56 +403,17 @@ impl Server {
286403
Direction::Right,
287404
"vsock");
288405
enclave.write(&serde_cbor::ser::to_vec(&response).unwrap())?;
289-
let _ = thread::Builder::new().spawn(move || {
290-
let mut proxy = vsock.connect_with_cid_port(enclave_addr.cid(), enclave_addr.port()).unwrap();
291-
Self::proxy_connection((&mut conn, "remote"), (&mut proxy, "proxy"));
292-
});
406+
407+
let proxy = vsock.connect_with_cid_port(enclave_addr.cid(), enclave_addr.port()).unwrap();
408+
self.add_connection(proxy, conn, "remote".to_string())?;
409+
293410
Ok(())
294411
},
295412
Err(e) => Err(e),
296413
}
297414
}
298415

299-
fn proxy_connection(remote: (&mut TcpStream, &str), proxy: (&mut VsockStream, &str)) {
300-
loop {
301-
let mut read_set = FdSet::new();
302-
read_set.insert(remote.0.as_raw_fd());
303-
read_set.insert(proxy.0.as_raw_fd());
304-
305-
if let Ok(_num) = select(None, Some(&mut read_set), None, None, None) {
306-
if read_set.contains(remote.0.as_raw_fd()) {
307-
match Self::transfer_data(remote.0, remote.1, proxy.0, proxy.1) {
308-
Ok(0) => {
309-
// According to the `Read` threat documentation, reading 0 bytes
310-
// indicates that the connection has been shutdown correctly. So we
311-
// close the proxy service
312-
// https://doc.rust-lang.org/std/io/trait.Read.html#tymethod.read
313-
break
314-
},
315-
Ok(_) => (),
316-
Err(e) => {
317-
eprintln!("transfer from remote failed: {:?}", e);
318-
break;
319-
}
320-
}
321-
}
322-
if read_set.contains(proxy.0.as_raw_fd()) {
323-
match Self::transfer_data(proxy.0, proxy.1, remote.0, remote.1) {
324-
Ok(0) => break,
325-
Ok(_) => (),
326-
Err(e) => {
327-
eprintln!("transfer from proxy failed: {:?}", e);
328-
break;
329-
}
330-
}
331-
}
332-
}
333-
}
334-
let _ = proxy.0.shutdown(Shutdown::Both);
335-
let _ = remote.0.shutdown(Shutdown::Both);
336-
}
337-
338-
fn handle_client(&self, stream: &mut VsockStream) -> Result<(), IoError> {
416+
fn handle_client(self: Arc<Self>, stream: &mut VsockStream) -> Result<(), IoError> {
339417
match Self::read_request(stream) {
340418
Ok(Request::Connect{ addr }) => self.handle_request_connect(&addr, stream)?,
341419
Ok(Request::Bind{ addr, enclave_port }) => self.handle_request_bind(&addr, enclave_port, stream)?,
@@ -349,33 +427,35 @@ impl Server {
349427
let command_listener = VsockListener::<Std>::bind_with_cid_port(vsock::VMADDR_CID_ANY, port)?;
350428
Ok(Server {
351429
command_listener: Mutex::new(command_listener),
352-
listeners: Mutex::new(FnvHashMap::default()),
430+
listeners: RwLock::new(FnvHashMap::default()),
431+
connections: RwLock::new(FnvHashMap::default()),
353432
})
354433
}
355434

356-
pub fn run(port: u32) -> std::io::Result<(JoinHandle<()>, u32)> {
435+
fn start_command_server(self: Arc<Self>) -> Result<JoinHandle<()>, IoError> {
436+
thread::Builder::new().spawn(move || {
437+
let command_listener = self.command_listener.lock().unwrap();
438+
for stream in command_listener.incoming() {
439+
let server = self.clone();
440+
let _ = thread::Builder::new()
441+
.spawn(move || {
442+
let mut stream = stream.unwrap();
443+
if let Err(e) = server.handle_client(&mut stream) {
444+
eprintln!("Error handling connection: {}, shutting connection down", e);
445+
let _ = stream.shutdown(Shutdown::Both);
446+
}
447+
});
448+
}
449+
})
450+
}
451+
452+
pub fn run(port: u32) -> std::io::Result<JoinHandle<()>> {
357453
println!("Starting enclave runner.");
358454
let server = Arc::new(Self::bind(port)?);
359455
let port = server.command_listener.lock().unwrap().local_addr()?.port();
360456
println!("Listening on vsock port {}...", port);
361457

362-
let handle = thread::Builder::new().spawn(move || {
363-
let server = server;
364-
let server = server.clone();
365-
let command_listener = server.command_listener.lock().unwrap();
366-
for stream in command_listener.incoming() {
367-
let server = server.clone();
368-
let _ = thread::Builder::new()
369-
.spawn(move || {
370-
let mut stream = stream.unwrap();
371-
if let Err(e) = server.handle_client(&mut stream) {
372-
eprintln!("Error handling connection: {}, shutting connection down", e);
373-
let _ = stream.shutdown(Shutdown::Both);
374-
}
375-
});
376-
}
377-
})?;
378-
Ok((handle, port))
458+
server.start_command_server()
379459
}
380460
}
381461

fortanix-vme/fortanix-vme-runner/src/main.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ use std::io::ErrorKind;
44

55
fn main() {
66
match Server::run(SERVER_PORT) {
7-
Ok((server_thread, _port)) => server_thread.join().expect("Server panicked"),
7+
Ok(handle) => { handle.join().unwrap(); },
88
Err(e) if e.kind() == ErrorKind::AddrInUse => println!("Server failed. Do you already have a runner running on vsock port {}? (Error: {:?})", SERVER_PORT, e),
99
Err(e) => println!("Server failed. Error: {:?}", e),
1010
}

0 commit comments

Comments
 (0)