Skip to content

Commit 4229113

Browse files
committed
Initial restructure. Doesn't work entirely
1 parent 553498b commit 4229113

File tree

13 files changed

+345
-469
lines changed

13 files changed

+345
-469
lines changed

src/connection/base_connection.rs

Lines changed: 3 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,10 @@
11
use crate::{connection::Buffer, utils::Error};
22
use async_trait::async_trait;
3-
use futures::channel::mpsc::{UnboundedReceiver, UnboundedSender};
43

54
#[async_trait]
65
pub trait BaseConnection {
76
async fn setup_connection(&mut self) -> Result<(), Error>;
8-
async fn close_connection(&mut self);
9-
async fn send(&mut self, buffer: Buffer);
10-
11-
fn get_data_receiver(&mut self) -> UnboundedReceiver<Buffer>;
12-
fn clone_write_sender(&self) -> UnboundedSender<Buffer>;
7+
async fn close_connection(&mut self) -> Result<(), Error>;
8+
async fn send(&mut self, buffer: Buffer) -> Result<(), Error>;
9+
async fn read_data(&mut self) -> Option<Buffer>;
1310
}
Lines changed: 42 additions & 126 deletions
Original file line numberDiff line numberDiff line change
@@ -1,169 +1,85 @@
11
use crate::{
22
connection::{BaseConnection, Buffer},
3-
utils::Error,
3+
utils::{Error, READ_BUFFER_SIZE},
44
};
55

6-
use async_std::{io::BufReader, net::TcpStream, prelude::*, task};
7-
use async_trait::async_trait;
8-
9-
use futures::{
10-
channel::{
11-
mpsc::{unbounded, UnboundedReceiver, UnboundedSender},
12-
oneshot::{channel, Sender},
13-
},
14-
future::{self, Either},
6+
use async_std::{
7+
net::{Shutdown, TcpStream},
8+
prelude::*,
159
};
16-
use futures_util::SinkExt;
10+
use async_trait::async_trait;
1711

1812
pub struct InetSocketConnection {
1913
connection_setup: bool,
20-
read_data_receiver: Option<UnboundedReceiver<Vec<u8>>>,
21-
write_data_sender: Option<UnboundedSender<Vec<u8>>>,
22-
close_sender: Option<UnboundedSender<()>>,
2314
socket_path: String,
15+
client: Option<TcpStream>,
2416
}
2517

2618
impl InetSocketConnection {
2719
pub fn new(socket_path: String) -> Self {
2820
InetSocketConnection {
2921
connection_setup: false,
30-
read_data_receiver: None,
31-
write_data_sender: None,
32-
close_sender: None,
3322
socket_path,
23+
client: None,
3424
}
3525
}
3626
}
3727

38-
async fn read_data_from_socket(
39-
port: String,
40-
init_sender: Sender<Result<(), Error>>,
41-
mut read_sender: UnboundedSender<Vec<u8>>,
42-
mut write_receiver: UnboundedReceiver<Vec<u8>>,
43-
mut close_receiver: UnboundedReceiver<()>,
44-
) {
45-
let result = TcpStream::connect(port).await;
46-
if let Err(err) = result {
47-
init_sender
48-
.send(Err(Error::Internal(format!("{}", err))))
49-
.unwrap_or(());
50-
return;
51-
}
52-
let client = result.unwrap();
53-
init_sender.send(Ok(())).unwrap_or(());
54-
let reader = BufReader::new(&client);
55-
let mut lines = reader.lines();
56-
let mut read_future = lines.next();
57-
let mut write_future = write_receiver.next();
58-
let mut close_future = close_receiver.next();
59-
let mut read_or_write_future = future::select(read_future, write_future);
60-
while let Either::Left((read_write_future, next_close_future)) =
61-
future::select(read_or_write_future, close_future).await
62-
{
63-
// Either a read or a write event has happened
64-
close_future = next_close_future;
65-
match read_write_future {
66-
Either::Left((read_future_result, next_write_future)) => {
67-
// Read event has happened
68-
read_future = lines.next();
69-
write_future = next_write_future;
70-
read_or_write_future = future::select(read_future, write_future);
71-
// Send the read data to the MPSC sender
72-
if let Some(Ok(line)) = read_future_result {
73-
let result = read_sender.send(line.as_bytes().to_vec()).await;
74-
if let Err(err) = result {
75-
println!("Error queing data from the socket to the module: {}", err);
76-
}
77-
}
78-
}
79-
Either::Right((write_future_result, next_read_future)) => {
80-
// Write event has happened
81-
read_future = next_read_future;
82-
write_future = write_receiver.next();
83-
read_or_write_future = future::select(read_future, write_future);
84-
// Write the recieved bytes to the socket
85-
if let Some(bytes) = write_future_result {
86-
let mut socket = &client;
87-
if let Err(err) = socket.write_all(&bytes).await {
88-
println!("Error while sending data to socket: {}", err);
89-
}
90-
}
91-
}
92-
}
93-
}
94-
// Either a read, nor a write event has happened.
95-
// This means the socket close event happened. Shutdown the socket and close any mpsc channels
96-
drop(lines);
97-
let result = read_sender.close().await;
98-
if let Err(err) = result {
99-
println!("Error closing the MPSC sender to queue data: {}", err);
100-
}
101-
write_receiver.close();
102-
close_receiver.close();
103-
}
104-
10528
#[async_trait]
10629
impl BaseConnection for InetSocketConnection {
10730
async fn setup_connection(&mut self) -> Result<(), Error> {
10831
if self.connection_setup {
10932
panic!("Cannot call setup_connection() more than once!");
11033
}
111-
let (read_data_sender, read_data_receiver) = unbounded::<Vec<u8>>();
112-
let (write_data_sender, write_data_receiver) = unbounded::<Vec<u8>>();
113-
let (close_sender, close_receiver) = unbounded::<()>();
114-
let (init_sender, init_receiver) = channel::<Result<(), Error>>();
115-
116-
self.read_data_receiver = Some(read_data_receiver);
117-
self.write_data_sender = Some(write_data_sender);
118-
self.close_sender = Some(close_sender);
119-
let socket_path = self.socket_path.clone();
120-
121-
task::spawn(async {
122-
read_data_from_socket(
123-
socket_path,
124-
init_sender,
125-
read_data_sender,
126-
write_data_receiver,
127-
close_receiver,
128-
)
129-
.await;
130-
});
34+
let result = TcpStream::connect(&self.socket_path).await;
35+
if let Err(err) = result {
36+
return Err(Error::Internal(format!("{}", err)));
37+
}
38+
self.client = Some(result.unwrap());
13139

13240
self.connection_setup = true;
133-
init_receiver.await.unwrap()
41+
Ok(())
13442
}
13543

136-
async fn close_connection(&mut self) {
137-
if !self.connection_setup || self.close_sender.is_none() {
44+
async fn close_connection(&mut self) -> Result<(), Error> {
45+
if !self.connection_setup || self.client.is_none() {
13846
panic!("Cannot close a connection that hasn't been established yet. Did you forget to call setup_connection()?");
13947
}
140-
let mut sender = &self.close_sender.as_ref().unwrap().clone();
141-
if let Err(err) = sender.send(()).await {
142-
println!("Error attempting to close connection: {}", err);
48+
let result = self.client.as_ref().unwrap().shutdown(Shutdown::Both);
49+
if let Err(err) = result {
50+
return Err(Error::Internal(format!("{}", err)));
14351
}
52+
Ok(())
14453
}
14554

146-
async fn send(&mut self, buffer: Buffer) {
147-
if !self.connection_setup || self.write_data_sender.is_none() {
55+
async fn send(&mut self, buffer: Buffer) -> Result<(), Error> {
56+
if !self.connection_setup || self.client.is_none() {
14857
panic!("Cannot send data to a connection that hasn't been established yet. Did you forget to await the call to setup_connection()?");
14958
}
150-
let mut sender = &self.write_data_sender.as_ref().unwrap().clone();
151-
if let Err(err) = sender.send(buffer).await {
152-
println!("Error attempting to send data to connection: {}", err);
59+
let result = self.client.as_mut().unwrap().write_all(&buffer).await;
60+
if let Err(err) = result {
61+
return Err(Error::Internal(format!("{}", err)));
15362
}
63+
Ok(())
15464
}
15565

156-
fn get_data_receiver(&mut self) -> UnboundedReceiver<Buffer> {
157-
if !self.connection_setup || self.read_data_receiver.is_none() {
158-
panic!("Cannot get read sender to a connection that hasn't been established yet. Did you forget to await the call to setup_connection()?");
159-
}
160-
self.read_data_receiver.take().unwrap()
161-
}
162-
163-
fn clone_write_sender(&self) -> UnboundedSender<Buffer> {
164-
if !self.connection_setup || self.write_data_sender.is_none() {
165-
panic!("Cannot get write sender of a connection that hasn't been established yet. Did you forget to await the call to setup_connection()?");
66+
async fn read_data(&mut self) -> Option<Buffer> {
67+
if self.client.is_none() {
68+
None
69+
} else {
70+
let client = self.client.as_mut().unwrap();
71+
let mut buffer = Vec::new();
72+
let mut read_size = READ_BUFFER_SIZE;
73+
while read_size > 0 {
74+
let mut buf = [0u8; READ_BUFFER_SIZE];
75+
let result = client.read(&mut buf).await;
76+
if result.is_err() {
77+
return None;
78+
}
79+
read_size = result.unwrap();
80+
buffer.extend(buf[..read_size].into_iter());
81+
}
82+
Some(buffer)
16683
}
167-
self.write_data_sender.as_ref().unwrap().clone()
16884
}
16985
}

0 commit comments

Comments
 (0)