Skip to content

Commit 74e5788

Browse files
committed
Added inet sockets support
1 parent 4fdf10d commit 74e5788

File tree

4 files changed

+196
-3
lines changed

4 files changed

+196
-3
lines changed

Cargo.toml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,8 +5,8 @@ name = "gotham"
55
version = "0.1.0"
66
license = "MIT"
77
description = "A helper rust library for the gotham microservices framework"
8-
homepage = "github.com/bytesonus/gotham-rust"
9-
repository = "github.com/bytesonus/gotham-rust"
8+
homepage = "https://github.com/bytesonus/gotham-rust"
9+
repository = "https://github.com/bytesonus/gotham-rust"
1010

1111
[dependencies]
1212
async-std = "1.4.0"
Lines changed: 169 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,169 @@
1+
use crate::{
2+
connection::{BaseConnection, Buffer},
3+
utils::Error,
4+
};
5+
6+
use async_std::{io::BufReader, net::TcpStream, prelude::*, task};
7+
use async_trait::async_trait;
8+
9+
use futures::{
10+
channel::{
11+
mpsc::{unbounded, UnboundedReceiver, UnboundedSender},
12+
oneshot::{channel, Sender},
13+
},
14+
future::{self, Either},
15+
};
16+
use futures_util::SinkExt;
17+
18+
pub struct InetSocketConnection {
19+
connection_setup: bool,
20+
read_data_receiver: Option<UnboundedReceiver<Vec<u8>>>,
21+
write_data_sender: Option<UnboundedSender<Vec<u8>>>,
22+
close_sender: Option<UnboundedSender<()>>,
23+
socket_path: String,
24+
}
25+
26+
impl InetSocketConnection {
27+
pub fn new(socket_path: String) -> Self {
28+
InetSocketConnection {
29+
connection_setup: false,
30+
read_data_receiver: None,
31+
write_data_sender: None,
32+
close_sender: None,
33+
socket_path,
34+
}
35+
}
36+
}
37+
38+
async fn read_data_from_socket(
39+
port: String,
40+
init_sender: Sender<Result<(), Error>>,
41+
mut read_sender: UnboundedSender<Vec<u8>>,
42+
mut write_receiver: UnboundedReceiver<Vec<u8>>,
43+
mut close_receiver: UnboundedReceiver<()>,
44+
) {
45+
let result = TcpStream::connect(port).await;
46+
if let Err(err) = result {
47+
init_sender
48+
.send(Err(Error::Internal(format!("{}", err))))
49+
.unwrap_or(());
50+
return;
51+
}
52+
let client = result.unwrap();
53+
init_sender.send(Ok(())).unwrap_or(());
54+
let reader = BufReader::new(&client);
55+
let mut lines = reader.lines();
56+
let mut read_future = lines.next();
57+
let mut write_future = write_receiver.next();
58+
let mut close_future = close_receiver.next();
59+
let mut read_or_write_future = future::select(read_future, write_future);
60+
while let Either::Left((read_write_future, next_close_future)) =
61+
future::select(read_or_write_future, close_future).await
62+
{
63+
// Either a read or a write event has happened
64+
close_future = next_close_future;
65+
match read_write_future {
66+
Either::Left((read_future_result, next_write_future)) => {
67+
// Read event has happened
68+
read_future = lines.next();
69+
write_future = next_write_future;
70+
read_or_write_future = future::select(read_future, write_future);
71+
// Send the read data to the MPSC sender
72+
if let Some(Ok(line)) = read_future_result {
73+
let result = read_sender.send(line.as_bytes().to_vec()).await;
74+
if let Err(err) = result {
75+
println!("Error queing data from the socket to the module: {}", err);
76+
}
77+
}
78+
}
79+
Either::Right((write_future_result, next_read_future)) => {
80+
// Write event has happened
81+
read_future = next_read_future;
82+
write_future = write_receiver.next();
83+
read_or_write_future = future::select(read_future, write_future);
84+
// Write the recieved bytes to the socket
85+
if let Some(bytes) = write_future_result {
86+
let mut socket = &client;
87+
if let Err(err) = socket.write_all(&bytes).await {
88+
println!("Error while sending data to socket: {}", err);
89+
}
90+
}
91+
}
92+
}
93+
}
94+
// Either a read, nor a write event has happened.
95+
// This means the socket close event happened. Shutdown the socket and close any mpsc channels
96+
drop(lines);
97+
let result = read_sender.close().await;
98+
if let Err(err) = result {
99+
println!("Error closing the MPSC sender to queue data: {}", err);
100+
}
101+
write_receiver.close();
102+
close_receiver.close();
103+
}
104+
105+
#[async_trait]
106+
impl BaseConnection for InetSocketConnection {
107+
async fn setup_connection(&mut self) -> Result<(), Error> {
108+
if self.connection_setup {
109+
panic!("Cannot call setup_connection() more than once!");
110+
}
111+
let (read_data_sender, read_data_receiver) = unbounded::<Vec<u8>>();
112+
let (write_data_sender, write_data_receiver) = unbounded::<Vec<u8>>();
113+
let (close_sender, close_receiver) = unbounded::<()>();
114+
let (init_sender, init_receiver) = channel::<Result<(), Error>>();
115+
116+
self.read_data_receiver = Some(read_data_receiver);
117+
self.write_data_sender = Some(write_data_sender);
118+
self.close_sender = Some(close_sender);
119+
let socket_path = self.socket_path.clone();
120+
121+
task::spawn(async {
122+
read_data_from_socket(
123+
socket_path,
124+
init_sender,
125+
read_data_sender,
126+
write_data_receiver,
127+
close_receiver,
128+
)
129+
.await;
130+
});
131+
132+
self.connection_setup = true;
133+
init_receiver.await.unwrap()
134+
}
135+
136+
async fn close_connection(&mut self) {
137+
if !self.connection_setup || self.close_sender.is_none() {
138+
panic!("Cannot close a connection that hasn't been established yet. Did you forget to call setup_connection()?");
139+
}
140+
let mut sender = &self.close_sender.as_ref().unwrap().clone();
141+
if let Err(err) = sender.send(()).await {
142+
println!("Error attempting to close connection: {}", err);
143+
}
144+
}
145+
146+
async fn send(&mut self, buffer: Buffer) {
147+
if !self.connection_setup || self.write_data_sender.is_none() {
148+
panic!("Cannot send data to a connection that hasn't been established yet. Did you forget to await the call to setup_connection()?");
149+
}
150+
let mut sender = &self.write_data_sender.as_ref().unwrap().clone();
151+
if let Err(err) = sender.send(buffer).await {
152+
println!("Error attempting to send data to connection: {}", err);
153+
}
154+
}
155+
156+
fn get_data_receiver(&mut self) -> UnboundedReceiver<Buffer> {
157+
if !self.connection_setup || self.read_data_receiver.is_none() {
158+
panic!("Cannot get read sender to a connection that hasn't been established yet. Did you forget to await the call to setup_connection()?");
159+
}
160+
self.read_data_receiver.take().unwrap()
161+
}
162+
163+
fn clone_write_sender(&self) -> UnboundedSender<Buffer> {
164+
if !self.connection_setup || self.write_data_sender.is_none() {
165+
panic!("Cannot get write sender of a connection that hasn't been established yet. Did you forget to await the call to setup_connection()?");
166+
}
167+
self.write_data_sender.as_ref().unwrap().clone()
168+
}
169+
}

src/connection/mod.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,11 @@
11
mod base_connection;
2+
#[cfg(target_family = "unix")]
23
mod unix_socket_connection;
4+
mod inet_socket_connection;
35

46
pub use base_connection::BaseConnection;
7+
#[cfg(target_family = "unix")]
58
pub use unix_socket_connection::UnixSocketConnection;
9+
pub use inet_socket_connection::InetSocketConnection;
610

711
pub type Buffer = Vec<u8>;

src/gotham_module.rs

Lines changed: 21 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,15 @@
11
use crate::{
2-
connection::{BaseConnection, Buffer, UnixSocketConnection},
2+
connection::{BaseConnection, Buffer},
33
models::{BaseMessage, Value},
44
protocol::BaseProtocol,
55
utils::{self, Error, Result},
66
};
7+
8+
#[cfg(target_family = "unix")]
9+
use crate::connection::UnixSocketConnection;
10+
#[cfg(target_family = "windows")]
11+
use crate::connection::InetSocketConnection;
12+
713
use std::collections::HashMap;
814
use async_std::{
915
prelude::*,
@@ -31,6 +37,7 @@ pub struct GothamModule {
3137
}
3238

3339
impl GothamModule {
40+
#[cfg(target_family = "unix")]
3441
pub fn default(socket_path: String) -> Self {
3542
GothamModule {
3643
protocol: BaseProtocol::default(),
@@ -43,6 +50,19 @@ impl GothamModule {
4350
}
4451
}
4552

53+
#[cfg(target_family = "windows")]
54+
pub fn default(port: String) -> Self {
55+
GothamModule {
56+
protocol: BaseProtocol::default(),
57+
connection: Box::new(InetSocketConnection::new(port)),
58+
requests: Arc::new(Mutex::new(HashMap::new())),
59+
functions: Arc::new(Mutex::new(HashMap::new())),
60+
hook_listeners: Arc::new(Mutex::new(HashMap::new())),
61+
message_buffer: vec![],
62+
registered: false,
63+
}
64+
}
65+
4666
pub fn new(protocol: BaseProtocol, connection: Box<dyn BaseConnection>) -> Self {
4767
GothamModule {
4868
protocol,

0 commit comments

Comments
 (0)