Skip to content

Commit dda89c3

Browse files
committed
Merge branch 'feature/arc-module-impl' into feature/non-mpsc-connection
2 parents 90fddbf + 115c49f commit dda89c3

File tree

5 files changed

+33
-11
lines changed

5 files changed

+33
-11
lines changed

src/connection/inet_socket_connection.rs

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -2,9 +2,11 @@ use crate::{
22
connection::{BaseConnection, Buffer},
33
utils::{Error, READ_BUFFER_SIZE},
44
};
5-
use std::time::Duration;
65

7-
use async_std::{io, net::Shutdown, net::TcpStream, prelude::*};
6+
use async_std::{
7+
net::{Shutdown, TcpStream},
8+
prelude::*,
9+
};
810
use async_trait::async_trait;
911

1012
pub struct InetSocketConnection {
@@ -33,8 +35,7 @@ impl BaseConnection for InetSocketConnection {
3335
if let Err(err) = result {
3436
return Err(Error::Internal(format!("{}", err)));
3537
}
36-
let client = result.unwrap();
37-
self.client = Some(client);
38+
self.client = Some(result.unwrap());
3839

3940
self.connection_setup = true;
4041
Ok(())
@@ -69,15 +70,14 @@ impl BaseConnection for InetSocketConnection {
6970
let client = self.client.as_mut().unwrap();
7071
let mut buffer = Vec::new();
7172
let mut read_size = READ_BUFFER_SIZE;
72-
7373
while read_size > 0 {
7474
let mut buf = [0u8; READ_BUFFER_SIZE];
75-
let result = io::timeout(Duration::from_millis(10), client.read(&mut buf)).await;
75+
let result = client.read(&mut buf).await;
7676
if result.is_err() {
77-
return Some(buffer);
77+
return None;
7878
}
7979
read_size = result.unwrap();
80-
buffer.extend(buf[..read_size].into_iter());
80+
buffer.extend(buf[..read_size].iter());
8181
}
8282
Some(buffer)
8383
}

src/connection/unix_socket_connection.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -77,7 +77,7 @@ impl BaseConnection for UnixSocketConnection {
7777
return Some(buffer);
7878
}
7979
read_size = result.unwrap();
80-
buffer.extend(buf[..read_size].into_iter());
80+
buffer.extend(buf[..read_size].iter());
8181
}
8282
Some(buffer)
8383
}

src/juno_module_impl.rs

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,14 +9,17 @@ use async_std::sync::{Mutex, RwLock};
99
use futures::channel::oneshot::Sender;
1010
use std::collections::HashMap;
1111

12+
type HookListeners = RwLock<HashMap<String, Vec<fn(Value)>>>;
13+
type Functions = RwLock<HashMap<String, fn(HashMap<String, Value>) -> Value>>;
14+
1215
// Create separate rwlocks for each individual element
1316
// Such that each one of them can be individually read or written independent of the other
1417
pub(crate) struct JunoModuleImpl {
1518
pub(crate) protocol: RwLock<BaseProtocol>,
1619
pub(crate) connection: RwLock<Box<dyn BaseConnection + Send + Sync>>,
1720
pub(crate) requests: RwLock<HashMap<String, Sender<Result<Value>>>>,
18-
pub(crate) functions: RwLock<HashMap<String, fn(HashMap<String, Value>) -> Value>>,
19-
pub(crate) hook_listeners: RwLock<HashMap<String, Vec<fn(Value)>>>,
21+
pub(crate) functions: Functions,
22+
pub(crate) hook_listeners: HookListeners,
2023
pub(crate) message_buffer: Mutex<Buffer>,
2124
pub(crate) registered: RwLock<bool>,
2225
}

src/protocol/base_protocol.rs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -126,4 +126,11 @@ impl BaseProtocol {
126126
_ => panic!("Currently, only JsonProtocol is supported"),
127127
}
128128
}
129+
130+
pub fn decode(&self, data: &[u8]) -> BaseMessage {
131+
match self {
132+
BaseProtocol::JsonProtocol { .. } => json_protocol::decode(&self, data),
133+
_ => panic!("Currently, only JsonProtocol is supported"),
134+
}
135+
}
129136
}

src/protocol/json_protocol.rs

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -164,6 +164,18 @@ pub fn get_next_message(protocol: &mut BaseProtocol) -> Option<BaseMessage> {
164164
}
165165
}
166166

167+
pub fn decode(protocol: &BaseProtocol, data: &[u8]) -> BaseMessage {
168+
match protocol {
169+
BaseProtocol::JsonProtocol { .. } => match decode_internal(data) {
170+
Some(message) => message,
171+
None => BaseMessage::Unknown {
172+
request_id: String::default(),
173+
},
174+
},
175+
_ => panic!("BaseProtocol tried to decode a non-JsonProtocol as a JsonProtocol"),
176+
}
177+
}
178+
167179
fn decode_internal(data: &[u8]) -> Option<BaseMessage> {
168180
let result: Result<Value> = from_slice(data);
169181
if result.is_err() {

0 commit comments

Comments
 (0)