Skip to content

Commit 9e64448

Browse files
committed
Spawning each data-event in it's own task to avoid deadlocking while doing nested function calls.
Need to test this more to make sure the logic is right.
1 parent 0e83a3c commit 9e64448

File tree

10 files changed

+346
-185
lines changed

10 files changed

+346
-185
lines changed

src/connection/base_connection.rs

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,13 @@
1-
use crate::{connection::Buffer, utils::Error};
1+
use crate::{connection::Buffer, utils::Error, juno_module_impl::JunoModuleImpl};
22
use async_trait::async_trait;
3+
use std::sync::Arc;
34

45
#[async_trait]
56
pub trait BaseConnection {
67
async fn setup_connection(&mut self) -> Result<(), Error>;
78
async fn close_connection(&mut self) -> Result<(), Error>;
89
async fn send(&mut self, buffer: Buffer) -> Result<(), Error>;
9-
async fn read_data(&mut self) -> Option<Buffer>;
10+
11+
fn set_data_listener(&mut self, listener: Arc<JunoModuleImpl>);
12+
fn get_data_listener(&self) -> &Option<Arc<JunoModuleImpl>>;
1013
}
Lines changed: 113 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -1,28 +1,37 @@
11
use crate::{
22
connection::{BaseConnection, Buffer},
3-
utils::{Error, READ_BUFFER_SIZE},
3+
juno_module_impl::JunoModuleImpl,
4+
utils::Error,
45
};
5-
use std::time::Duration;
6+
use std::sync::Arc;
67

7-
use async_std::{
8-
io,
9-
net::{Shutdown, TcpStream},
10-
prelude::*,
11-
};
8+
use async_std::{io::BufReader, net::TcpStream, prelude::*, task};
129
use async_trait::async_trait;
10+
use future::Either;
11+
use futures::{
12+
channel::{
13+
mpsc::{unbounded, UnboundedReceiver, UnboundedSender},
14+
oneshot::{channel, Sender},
15+
},
16+
future, SinkExt,
17+
};
1318

1419
pub struct InetSocketConnection {
1520
connection_setup: bool,
1621
socket_path: String,
17-
client: Option<TcpStream>,
22+
on_data_handler: Option<Arc<JunoModuleImpl>>,
23+
write_data_sender: Option<UnboundedSender<Buffer>>,
24+
close_sender: Option<UnboundedSender<()>>,
1825
}
1926

2027
impl InetSocketConnection {
2128
pub fn new(socket_path: String) -> Self {
2229
InetSocketConnection {
2330
connection_setup: false,
2431
socket_path,
25-
client: None,
32+
on_data_handler: None,
33+
write_data_sender: None,
34+
close_sender: None,
2635
}
2736
}
2837
}
@@ -33,56 +42,119 @@ impl BaseConnection for InetSocketConnection {
3342
if self.connection_setup {
3443
panic!("Cannot call setup_connection() more than once!");
3544
}
36-
let result = TcpStream::connect(&self.socket_path).await;
37-
if let Err(err) = result {
38-
return Err(Error::Internal(format!("{}", err)));
39-
}
40-
let client = result.unwrap();
41-
self.client = Some(client);
45+
let (write_data_sender, write_data_receiver) = unbounded::<Vec<u8>>();
46+
let (close_sender, close_receiver) = unbounded::<()>();
47+
let (init_sender, init_receiver) = channel::<Result<(), Error>>();
48+
49+
self.write_data_sender = Some(write_data_sender);
50+
self.close_sender = Some(close_sender);
51+
let socket_path = self.socket_path.clone();
52+
let juno_module_impl = self.on_data_handler.as_ref().unwrap().clone();
53+
54+
task::spawn(async {
55+
read_data_from_socket(
56+
socket_path,
57+
init_sender,
58+
juno_module_impl,
59+
write_data_receiver,
60+
close_receiver,
61+
)
62+
.await;
63+
});
4264

4365
self.connection_setup = true;
44-
Ok(())
66+
init_receiver.await.unwrap()
4567
}
4668

4769
async fn close_connection(&mut self) -> Result<(), Error> {
48-
if !self.connection_setup || self.client.is_none() {
70+
if !self.connection_setup || self.close_sender.is_none() {
4971
panic!("Cannot close a connection that hasn't been established yet. Did you forget to call setup_connection()?");
5072
}
51-
let result = self.client.as_ref().unwrap().shutdown(Shutdown::Both);
52-
if let Err(err) = result {
53-
return Err(Error::Internal(format!("{}", err)));
54-
}
73+
self.close_sender.as_ref().unwrap().send(()).await.unwrap();
5574
Ok(())
5675
}
5776

5877
async fn send(&mut self, buffer: Buffer) -> Result<(), Error> {
59-
if !self.connection_setup || self.client.is_none() {
78+
if !self.connection_setup || self.write_data_sender.is_none() {
6079
panic!("Cannot send data to a connection that hasn't been established yet. Did you forget to await the call to setup_connection()?");
6180
}
62-
let result = self.client.as_mut().unwrap().write_all(&buffer).await;
63-
if let Err(err) = result {
64-
return Err(Error::Internal(format!("{}", err)));
65-
}
81+
self.write_data_sender
82+
.as_ref()
83+
.unwrap()
84+
.send(buffer)
85+
.await
86+
.unwrap();
6687
Ok(())
6788
}
6889

69-
async fn read_data(&mut self) -> Option<Buffer> {
70-
if self.client.is_none() {
71-
None
72-
} else {
73-
let client = self.client.as_mut().unwrap();
74-
let mut buffer = Vec::new();
75-
let mut read_size = READ_BUFFER_SIZE;
76-
while read_size > 0 {
77-
let mut buf = [0u8; READ_BUFFER_SIZE];
78-
let result = io::timeout(Duration::from_millis(10), client.read(&mut buf)).await;
79-
if result.is_err() {
80-
return Some(buffer);
90+
fn set_data_listener(&mut self, listener: Arc<JunoModuleImpl>) {
91+
self.on_data_handler = Some(listener);
92+
}
93+
94+
fn get_data_listener(&self) -> &Option<Arc<JunoModuleImpl>> {
95+
&self.on_data_handler
96+
}
97+
}
98+
99+
async fn read_data_from_socket(
100+
socket_path: String,
101+
init_sender: Sender<Result<(), Error>>,
102+
juno_impl: Arc<JunoModuleImpl>,
103+
mut write_receiver: UnboundedReceiver<Vec<u8>>,
104+
mut close_receiver: UnboundedReceiver<()>,
105+
) {
106+
let result = TcpStream::connect(socket_path).await;
107+
if let Err(err) = result {
108+
init_sender
109+
.send(Err(Error::Internal(format!("{}", err))))
110+
.unwrap_or(());
111+
return;
112+
}
113+
let client = result.unwrap();
114+
init_sender.send(Ok(())).unwrap_or(());
115+
let reader = BufReader::new(&client);
116+
let mut lines = reader.lines();
117+
let mut read_future = lines.next();
118+
let mut write_future = write_receiver.next();
119+
let mut close_future = close_receiver.next();
120+
let mut read_or_write_future = future::select(read_future, write_future);
121+
while let Either::Left((read_write_future, next_close_future)) =
122+
future::select(read_or_write_future, close_future).await
123+
{
124+
// Either a read or a write event has happened
125+
close_future = next_close_future;
126+
match read_write_future {
127+
Either::Left((read_future_result, next_write_future)) => {
128+
// Read event has happened
129+
read_future = lines.next();
130+
write_future = next_write_future;
131+
read_or_write_future = future::select(read_future, write_future);
132+
// Send the read data to the MPSC sender
133+
if let Some(Ok(line)) = read_future_result {
134+
let juno_impl = juno_impl.clone();
135+
task::spawn(async move {
136+
juno_impl.on_data(line.as_bytes().to_vec()).await;
137+
});
138+
}
139+
}
140+
Either::Right((write_future_result, next_read_future)) => {
141+
// Write event has happened
142+
read_future = next_read_future;
143+
write_future = write_receiver.next();
144+
read_or_write_future = future::select(read_future, write_future);
145+
// Write the recieved bytes to the socket
146+
if let Some(bytes) = write_future_result {
147+
let mut socket = &client;
148+
if let Err(err) = socket.write_all(&bytes).await {
149+
println!("Error while sending data to socket: {}", err);
150+
}
81151
}
82-
read_size = result.unwrap();
83-
buffer.extend(buf[..read_size].iter());
84152
}
85-
Some(buffer)
86153
}
87154
}
155+
// Either a read, nor a write event has happened.
156+
// This means the socket close event happened. Shutdown the socket and close any mpsc channels
157+
drop(lines);
158+
write_receiver.close();
159+
close_receiver.close();
88160
}

0 commit comments

Comments
 (0)