Skip to content

Commit 300bf98

Browse files
committed
Added delay to allow protocol locks to be used
1 parent 4229113 commit 300bf98

File tree

4 files changed

+14
-14
lines changed

4 files changed

+14
-14
lines changed

src/connection/inet_socket_connection.rs

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,10 @@ use crate::{
22
connection::{BaseConnection, Buffer},
33
utils::{Error, READ_BUFFER_SIZE},
44
};
5+
use std::time::Duration;
56

67
use async_std::{
8+
io,
79
net::{Shutdown, TcpStream},
810
prelude::*,
911
};
@@ -70,11 +72,12 @@ impl BaseConnection for InetSocketConnection {
7072
let client = self.client.as_mut().unwrap();
7173
let mut buffer = Vec::new();
7274
let mut read_size = READ_BUFFER_SIZE;
75+
7376
while read_size > 0 {
7477
let mut buf = [0u8; READ_BUFFER_SIZE];
75-
let result = client.read(&mut buf).await;
78+
let result = io::timeout(Duration::from_millis(10), client.read(&mut buf)).await;
7679
if result.is_err() {
77-
return None;
80+
return Some(buffer);
7881
}
7982
read_size = result.unwrap();
8083
buffer.extend(buf[..read_size].into_iter());

src/connection/unix_socket_connection.rs

Lines changed: 6 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,9 @@ use crate::{
22
connection::{BaseConnection, Buffer},
33
utils::{Error, READ_BUFFER_SIZE},
44
};
5+
use std::time::Duration;
56

6-
use std::{net::Shutdown, os::unix::net::UnixStream, io::{Write, Read}};
7+
use async_std::{io, net::Shutdown, os::unix::net::UnixStream, prelude::*};
78
use async_trait::async_trait;
89

910
pub struct UnixSocketConnection {
@@ -28,12 +29,11 @@ impl BaseConnection for UnixSocketConnection {
2829
if self.connection_setup {
2930
panic!("Cannot call setup_connection() more than once!");
3031
}
31-
let result = UnixStream::connect(&self.socket_path);
32+
let result = UnixStream::connect(&self.socket_path).await;
3233
if let Err(err) = result {
3334
return Err(Error::Internal(format!("{}", err)));
3435
}
3536
let client = result.unwrap();
36-
client.set_nonblocking(true).unwrap();
3737
self.client = Some(client);
3838

3939
self.connection_setup = true;
@@ -55,7 +55,7 @@ impl BaseConnection for UnixSocketConnection {
5555
if !self.connection_setup || self.client.is_none() {
5656
panic!("Cannot send data to a connection that hasn't been established yet. Did you forget to await the call to setup_connection()?");
5757
}
58-
let result = self.client.as_mut().unwrap().write_all(&buffer);
58+
let result = self.client.as_mut().unwrap().write_all(&buffer).await;
5959
if let Err(err) = result {
6060
return Err(Error::Internal(format!("{}", err)));
6161
}
@@ -72,10 +72,9 @@ impl BaseConnection for UnixSocketConnection {
7272

7373
while read_size > 0 {
7474
let mut buf = [0u8; READ_BUFFER_SIZE];
75-
let result = client.read(&mut buf);
75+
let result = io::timeout(Duration::from_millis(10), client.read(&mut buf)).await;
7676
if result.is_err() {
77-
println!("Error: {}", result.unwrap_err());
78-
return None;
77+
return Some(buffer);
7978
}
8079
read_size = result.unwrap();
8180
buffer.extend(buf[..read_size].into_iter());

src/juno_module.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -232,6 +232,7 @@ impl JunoModule {
232232
}
233233
}
234234
drop(protocol);
235+
task::sleep(std::time::Duration::from_millis(10)).await;
235236
} else {
236237
break;
237238
}

src/protocol/json_protocol.rs

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -147,12 +147,9 @@ pub fn encode(protocol: &BaseProtocol, req: BaseMessage) -> Buffer {
147147
pub fn get_next_message(protocol: &mut BaseProtocol) -> Option<BaseMessage> {
148148
match protocol {
149149
BaseProtocol::JsonProtocol { ref mut buffer, .. } => {
150-
let index = buffer.iter().position(|item| item == &b'\n');
151-
if index.is_none() {
152-
return None;
153-
}
150+
let index = buffer.iter().position(|item| *item == b'\n')?;
154151

155-
let new_buffer = buffer.split_off(index.unwrap());
152+
let new_buffer = buffer.split_off(index + 1);
156153
let message = decode_internal(&buffer);
157154
*buffer = new_buffer;
158155
if message.is_none() {

0 commit comments

Comments
 (0)