Skip to content

Commit 115c49f

Browse files
committed
Got a basic working version
1 parent 4229113 commit 115c49f

File tree

9 files changed

+356
-183
lines changed

9 files changed

+356
-183
lines changed

src/connection/base_connection.rs

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,13 @@
11
use crate::{connection::Buffer, utils::Error};
22
use async_trait::async_trait;
3+
use futures::channel::mpsc::{UnboundedReceiver, UnboundedSender};
34

45
#[async_trait]
56
pub trait BaseConnection {
67
async fn setup_connection(&mut self) -> Result<(), Error>;
7-
async fn close_connection(&mut self) -> Result<(), Error>;
8-
async fn send(&mut self, buffer: Buffer) -> Result<(), Error>;
9-
async fn read_data(&mut self) -> Option<Buffer>;
8+
async fn close_connection(&mut self);
9+
async fn send(&mut self, buffer: Buffer);
10+
11+
fn get_data_receiver(&mut self) -> UnboundedReceiver<Buffer>;
12+
fn clone_write_sender(&self) -> UnboundedSender<Buffer>;
1013
}
Lines changed: 126 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -1,85 +1,169 @@
11
use crate::{
22
connection::{BaseConnection, Buffer},
3-
utils::{Error, READ_BUFFER_SIZE},
3+
utils::Error,
44
};
55

6-
use async_std::{
7-
net::{Shutdown, TcpStream},
8-
prelude::*,
9-
};
6+
use async_std::{io::BufReader, net::TcpStream, prelude::*, task};
107
use async_trait::async_trait;
118

9+
use futures::{
10+
channel::{
11+
mpsc::{unbounded, UnboundedReceiver, UnboundedSender},
12+
oneshot::{channel, Sender},
13+
},
14+
future::{self, Either},
15+
};
16+
use futures_util::SinkExt;
17+
1218
pub struct InetSocketConnection {
1319
connection_setup: bool,
20+
read_data_receiver: Option<UnboundedReceiver<Vec<u8>>>,
21+
write_data_sender: Option<UnboundedSender<Vec<u8>>>,
22+
close_sender: Option<UnboundedSender<()>>,
1423
socket_path: String,
15-
client: Option<TcpStream>,
1624
}
1725

1826
impl InetSocketConnection {
1927
pub fn new(socket_path: String) -> Self {
2028
InetSocketConnection {
2129
connection_setup: false,
30+
read_data_receiver: None,
31+
write_data_sender: None,
32+
close_sender: None,
2233
socket_path,
23-
client: None,
2434
}
2535
}
2636
}
2737

38+
async fn read_data_from_socket(
39+
port: String,
40+
init_sender: Sender<Result<(), Error>>,
41+
mut read_sender: UnboundedSender<Vec<u8>>,
42+
mut write_receiver: UnboundedReceiver<Vec<u8>>,
43+
mut close_receiver: UnboundedReceiver<()>,
44+
) {
45+
let result = TcpStream::connect(port).await;
46+
if let Err(err) = result {
47+
init_sender
48+
.send(Err(Error::Internal(format!("{}", err))))
49+
.unwrap_or(());
50+
return;
51+
}
52+
let client = result.unwrap();
53+
init_sender.send(Ok(())).unwrap_or(());
54+
let reader = BufReader::new(&client);
55+
let mut lines = reader.lines();
56+
let mut read_future = lines.next();
57+
let mut write_future = write_receiver.next();
58+
let mut close_future = close_receiver.next();
59+
let mut read_or_write_future = future::select(read_future, write_future);
60+
while let Either::Left((read_write_future, next_close_future)) =
61+
future::select(read_or_write_future, close_future).await
62+
{
63+
// Either a read or a write event has happened
64+
close_future = next_close_future;
65+
match read_write_future {
66+
Either::Left((read_future_result, next_write_future)) => {
67+
// Read event has happened
68+
read_future = lines.next();
69+
write_future = next_write_future;
70+
read_or_write_future = future::select(read_future, write_future);
71+
// Send the read data to the MPSC sender
72+
if let Some(Ok(line)) = read_future_result {
73+
let result = read_sender.send(line.as_bytes().to_vec()).await;
74+
if let Err(err) = result {
75+
println!("Error queing data from the socket to the module: {}", err);
76+
}
77+
}
78+
}
79+
Either::Right((write_future_result, next_read_future)) => {
80+
// Write event has happened
81+
read_future = next_read_future;
82+
write_future = write_receiver.next();
83+
read_or_write_future = future::select(read_future, write_future);
84+
// Write the recieved bytes to the socket
85+
if let Some(bytes) = write_future_result {
86+
let mut socket = &client;
87+
if let Err(err) = socket.write_all(&bytes).await {
88+
println!("Error while sending data to socket: {}", err);
89+
}
90+
}
91+
}
92+
}
93+
}
94+
// Either a read, nor a write event has happened.
95+
// This means the socket close event happened. Shutdown the socket and close any mpsc channels
96+
drop(lines);
97+
let result = read_sender.close().await;
98+
if let Err(err) = result {
99+
println!("Error closing the MPSC sender to queue data: {}", err);
100+
}
101+
write_receiver.close();
102+
close_receiver.close();
103+
}
104+
28105
#[async_trait]
29106
impl BaseConnection for InetSocketConnection {
30107
async fn setup_connection(&mut self) -> Result<(), Error> {
31108
if self.connection_setup {
32109
panic!("Cannot call setup_connection() more than once!");
33110
}
34-
let result = TcpStream::connect(&self.socket_path).await;
35-
if let Err(err) = result {
36-
return Err(Error::Internal(format!("{}", err)));
37-
}
38-
self.client = Some(result.unwrap());
111+
let (read_data_sender, read_data_receiver) = unbounded::<Vec<u8>>();
112+
let (write_data_sender, write_data_receiver) = unbounded::<Vec<u8>>();
113+
let (close_sender, close_receiver) = unbounded::<()>();
114+
let (init_sender, init_receiver) = channel::<Result<(), Error>>();
115+
116+
self.read_data_receiver = Some(read_data_receiver);
117+
self.write_data_sender = Some(write_data_sender);
118+
self.close_sender = Some(close_sender);
119+
let socket_path = self.socket_path.clone();
120+
121+
task::spawn(async {
122+
read_data_from_socket(
123+
socket_path,
124+
init_sender,
125+
read_data_sender,
126+
write_data_receiver,
127+
close_receiver,
128+
)
129+
.await;
130+
});
39131

40132
self.connection_setup = true;
41-
Ok(())
133+
init_receiver.await.unwrap()
42134
}
43135

44-
async fn close_connection(&mut self) -> Result<(), Error> {
45-
if !self.connection_setup || self.client.is_none() {
136+
async fn close_connection(&mut self) {
137+
if !self.connection_setup || self.close_sender.is_none() {
46138
panic!("Cannot close a connection that hasn't been established yet. Did you forget to call setup_connection()?");
47139
}
48-
let result = self.client.as_ref().unwrap().shutdown(Shutdown::Both);
49-
if let Err(err) = result {
50-
return Err(Error::Internal(format!("{}", err)));
140+
let mut sender = &self.close_sender.as_ref().unwrap().clone();
141+
if let Err(err) = sender.send(()).await {
142+
println!("Error attempting to close connection: {}", err);
51143
}
52-
Ok(())
53144
}
54145

55-
async fn send(&mut self, buffer: Buffer) -> Result<(), Error> {
56-
if !self.connection_setup || self.client.is_none() {
146+
async fn send(&mut self, buffer: Buffer) {
147+
if !self.connection_setup || self.write_data_sender.is_none() {
57148
panic!("Cannot send data to a connection that hasn't been established yet. Did you forget to await the call to setup_connection()?");
58149
}
59-
let result = self.client.as_mut().unwrap().write_all(&buffer).await;
60-
if let Err(err) = result {
61-
return Err(Error::Internal(format!("{}", err)));
150+
let mut sender = &self.write_data_sender.as_ref().unwrap().clone();
151+
if let Err(err) = sender.send(buffer).await {
152+
println!("Error attempting to send data to connection: {}", err);
62153
}
63-
Ok(())
64154
}
65155

66-
async fn read_data(&mut self) -> Option<Buffer> {
67-
if self.client.is_none() {
68-
None
69-
} else {
70-
let client = self.client.as_mut().unwrap();
71-
let mut buffer = Vec::new();
72-
let mut read_size = READ_BUFFER_SIZE;
73-
while read_size > 0 {
74-
let mut buf = [0u8; READ_BUFFER_SIZE];
75-
let result = client.read(&mut buf).await;
76-
if result.is_err() {
77-
return None;
78-
}
79-
read_size = result.unwrap();
80-
buffer.extend(buf[..read_size].into_iter());
81-
}
82-
Some(buffer)
156+
fn get_data_receiver(&mut self) -> UnboundedReceiver<Buffer> {
157+
if !self.connection_setup || self.read_data_receiver.is_none() {
158+
panic!("Cannot get read sender to a connection that hasn't been established yet. Did you forget to await the call to setup_connection()?");
159+
}
160+
self.read_data_receiver.take().unwrap()
161+
}
162+
163+
fn clone_write_sender(&self) -> UnboundedSender<Buffer> {
164+
if !self.connection_setup || self.write_data_sender.is_none() {
165+
panic!("Cannot get write sender of a connection that hasn't been established yet. Did you forget to await the call to setup_connection()?");
83166
}
167+
self.write_data_sender.as_ref().unwrap().clone()
84168
}
85169
}

0 commit comments

Comments
 (0)