Skip to content

Commit b3c9166

Browse files
Merge pull request #20 from bytesonus/feature/non-mpsc-connection
Refactor to allow independent handling of data events
2 parents 553498b + 7261adf commit b3c9166

File tree

11 files changed

+427
-387
lines changed

11 files changed

+427
-387
lines changed

src/connection/base_connection.rs

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,13 @@
1-
use crate::{connection::Buffer, utils::Error};
1+
use crate::{connection::Buffer, juno_module_impl::JunoModuleImpl, utils::Error};
22
use async_trait::async_trait;
3-
use futures::channel::mpsc::{UnboundedReceiver, UnboundedSender};
3+
use std::sync::Arc;
44

55
#[async_trait]
66
pub trait BaseConnection {
77
async fn setup_connection(&mut self) -> Result<(), Error>;
8-
async fn close_connection(&mut self);
9-
async fn send(&mut self, buffer: Buffer);
8+
async fn close_connection(&mut self) -> Result<(), Error>;
9+
async fn send(&mut self, buffer: Buffer) -> Result<(), Error>;
1010

11-
fn get_data_receiver(&mut self) -> UnboundedReceiver<Buffer>;
12-
fn clone_write_sender(&self) -> UnboundedSender<Buffer>;
11+
fn set_data_listener(&mut self, listener: Arc<JunoModuleImpl>);
12+
fn get_data_listener(&self) -> &Option<Arc<JunoModuleImpl>>;
1313
}
Lines changed: 75 additions & 84 deletions
Original file line numberDiff line numberDiff line change
@@ -1,48 +1,109 @@
11
use crate::{
22
connection::{BaseConnection, Buffer},
3+
juno_module_impl::JunoModuleImpl,
34
utils::Error,
45
};
6+
use std::sync::Arc;
57

68
use async_std::{io::BufReader, net::TcpStream, prelude::*, task};
79
use async_trait::async_trait;
8-
910
use futures::{
1011
channel::{
1112
mpsc::{unbounded, UnboundedReceiver, UnboundedSender},
1213
oneshot::{channel, Sender},
1314
},
1415
future::{self, Either},
16+
SinkExt,
1517
};
16-
use futures_util::SinkExt;
1718

1819
pub struct InetSocketConnection {
1920
connection_setup: bool,
20-
read_data_receiver: Option<UnboundedReceiver<Vec<u8>>>,
21-
write_data_sender: Option<UnboundedSender<Vec<u8>>>,
22-
close_sender: Option<UnboundedSender<()>>,
2321
socket_path: String,
22+
on_data_handler: Option<Arc<JunoModuleImpl>>,
23+
write_data_sender: Option<UnboundedSender<Buffer>>,
24+
close_sender: Option<UnboundedSender<()>>,
2425
}
2526

2627
impl InetSocketConnection {
2728
pub fn new(socket_path: String) -> Self {
2829
InetSocketConnection {
2930
connection_setup: false,
30-
read_data_receiver: None,
31+
socket_path,
32+
on_data_handler: None,
3133
write_data_sender: None,
3234
close_sender: None,
33-
socket_path,
3435
}
3536
}
3637
}
3738

39+
#[async_trait]
40+
impl BaseConnection for InetSocketConnection {
41+
async fn setup_connection(&mut self) -> Result<(), Error> {
42+
if self.connection_setup {
43+
panic!("Cannot call setup_connection() more than once!");
44+
}
45+
let (write_data_sender, write_data_receiver) = unbounded::<Vec<u8>>();
46+
let (close_sender, close_receiver) = unbounded::<()>();
47+
let (init_sender, init_receiver) = channel::<Result<(), Error>>();
48+
49+
self.write_data_sender = Some(write_data_sender);
50+
self.close_sender = Some(close_sender);
51+
let socket_path = self.socket_path.clone();
52+
let juno_module_impl = self.on_data_handler.as_ref().unwrap().clone();
53+
54+
task::spawn(async {
55+
read_data_from_socket(
56+
socket_path,
57+
init_sender,
58+
juno_module_impl,
59+
write_data_receiver,
60+
close_receiver,
61+
)
62+
.await;
63+
});
64+
65+
self.connection_setup = true;
66+
init_receiver.await.unwrap()
67+
}
68+
69+
async fn close_connection(&mut self) -> Result<(), Error> {
70+
if !self.connection_setup || self.close_sender.is_none() {
71+
panic!("Cannot close a connection that hasn't been established yet. Did you forget to call setup_connection()?");
72+
}
73+
self.close_sender.as_ref().unwrap().send(()).await.unwrap();
74+
Ok(())
75+
}
76+
77+
async fn send(&mut self, buffer: Buffer) -> Result<(), Error> {
78+
if !self.connection_setup || self.write_data_sender.is_none() {
79+
panic!("Cannot send data to a connection that hasn't been established yet. Did you forget to await the call to setup_connection()?");
80+
}
81+
self.write_data_sender
82+
.as_ref()
83+
.unwrap()
84+
.send(buffer)
85+
.await
86+
.unwrap();
87+
Ok(())
88+
}
89+
90+
fn set_data_listener(&mut self, listener: Arc<JunoModuleImpl>) {
91+
self.on_data_handler = Some(listener);
92+
}
93+
94+
fn get_data_listener(&self) -> &Option<Arc<JunoModuleImpl>> {
95+
&self.on_data_handler
96+
}
97+
}
98+
3899
async fn read_data_from_socket(
39-
port: String,
100+
connect_addr: String,
40101
init_sender: Sender<Result<(), Error>>,
41-
mut read_sender: UnboundedSender<Vec<u8>>,
102+
juno_impl: Arc<JunoModuleImpl>,
42103
mut write_receiver: UnboundedReceiver<Vec<u8>>,
43104
mut close_receiver: UnboundedReceiver<()>,
44105
) {
45-
let result = TcpStream::connect(port).await;
106+
let result = TcpStream::connect(connect_addr).await;
46107
if let Err(err) = result {
47108
init_sender
48109
.send(Err(Error::Internal(format!("{}", err))))
@@ -70,10 +131,10 @@ async fn read_data_from_socket(
70131
read_or_write_future = future::select(read_future, write_future);
71132
// Send the read data to the MPSC sender
72133
if let Some(Ok(line)) = read_future_result {
73-
let result = read_sender.send(line.as_bytes().to_vec()).await;
74-
if let Err(err) = result {
75-
println!("Error queing data from the socket to the module: {}", err);
76-
}
134+
let juno_impl = juno_impl.clone();
135+
task::spawn(async move {
136+
juno_impl.on_data(line.as_bytes().to_vec()).await;
137+
});
77138
}
78139
}
79140
Either::Right((write_future_result, next_read_future)) => {
@@ -94,76 +155,6 @@ async fn read_data_from_socket(
94155
// Either a read, nor a write event has happened.
95156
// This means the socket close event happened. Shutdown the socket and close any mpsc channels
96157
drop(lines);
97-
let result = read_sender.close().await;
98-
if let Err(err) = result {
99-
println!("Error closing the MPSC sender to queue data: {}", err);
100-
}
101158
write_receiver.close();
102159
close_receiver.close();
103160
}
104-
105-
#[async_trait]
106-
impl BaseConnection for InetSocketConnection {
107-
async fn setup_connection(&mut self) -> Result<(), Error> {
108-
if self.connection_setup {
109-
panic!("Cannot call setup_connection() more than once!");
110-
}
111-
let (read_data_sender, read_data_receiver) = unbounded::<Vec<u8>>();
112-
let (write_data_sender, write_data_receiver) = unbounded::<Vec<u8>>();
113-
let (close_sender, close_receiver) = unbounded::<()>();
114-
let (init_sender, init_receiver) = channel::<Result<(), Error>>();
115-
116-
self.read_data_receiver = Some(read_data_receiver);
117-
self.write_data_sender = Some(write_data_sender);
118-
self.close_sender = Some(close_sender);
119-
let socket_path = self.socket_path.clone();
120-
121-
task::spawn(async {
122-
read_data_from_socket(
123-
socket_path,
124-
init_sender,
125-
read_data_sender,
126-
write_data_receiver,
127-
close_receiver,
128-
)
129-
.await;
130-
});
131-
132-
self.connection_setup = true;
133-
init_receiver.await.unwrap()
134-
}
135-
136-
async fn close_connection(&mut self) {
137-
if !self.connection_setup || self.close_sender.is_none() {
138-
panic!("Cannot close a connection that hasn't been established yet. Did you forget to call setup_connection()?");
139-
}
140-
let mut sender = &self.close_sender.as_ref().unwrap().clone();
141-
if let Err(err) = sender.send(()).await {
142-
println!("Error attempting to close connection: {}", err);
143-
}
144-
}
145-
146-
async fn send(&mut self, buffer: Buffer) {
147-
if !self.connection_setup || self.write_data_sender.is_none() {
148-
panic!("Cannot send data to a connection that hasn't been established yet. Did you forget to await the call to setup_connection()?");
149-
}
150-
let mut sender = &self.write_data_sender.as_ref().unwrap().clone();
151-
if let Err(err) = sender.send(buffer).await {
152-
println!("Error attempting to send data to connection: {}", err);
153-
}
154-
}
155-
156-
fn get_data_receiver(&mut self) -> UnboundedReceiver<Buffer> {
157-
if !self.connection_setup || self.read_data_receiver.is_none() {
158-
panic!("Cannot get read sender to a connection that hasn't been established yet. Did you forget to await the call to setup_connection()?");
159-
}
160-
self.read_data_receiver.take().unwrap()
161-
}
162-
163-
fn clone_write_sender(&self) -> UnboundedSender<Buffer> {
164-
if !self.connection_setup || self.write_data_sender.is_none() {
165-
panic!("Cannot get write sender of a connection that hasn't been established yet. Did you forget to await the call to setup_connection()?");
166-
}
167-
self.write_data_sender.as_ref().unwrap().clone()
168-
}
169-
}

0 commit comments

Comments
 (0)