Skip to content

Commit f27149c

Browse files
authored
Internal refactors (#63)
* run/stop network thread. non-mutable network * ResourceRegister to Registry module * Late removing. 1 test failing. * Poll waker support. All tests passing. Network shared by multithread. Minor integration tests improvements. Thread renamed. * Removed generic event from poll. Added PollWaker * Allow removing from the callback * Fixed an issue where the register was not removed * Simplified register destruction * Fixed brust udp test * Disabled unused warnings * Removed API break (warnings) of Network mutability
1 parent 9003b4a commit f27149c

File tree

11 files changed

+416
-246
lines changed

11 files changed

+416
-246
lines changed

examples/multicast/main.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ fn main() {
44
let args: Vec<String> = std::env::args().collect();
55
let my_name = match args.get(1) {
66
Some(name) => name,
7-
None => return println!("Please choose a name"),
7+
None => return println!("Please, choose a name"),
88
};
99

1010
let (mut network, mut events) = Network::split();

src/adapters/framed_tcp.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -32,8 +32,9 @@ pub struct RemoteResource {
3232
decoder: RefCell<Decoder>,
3333
}
3434

35-
/// That RefCell<Decoder> can be used with Sync because it is only used in the read_event.
36-
/// This way, we save the cost of a Mutex.
35+
// SAFETY:
36+
// That RefCell<Decoder> can be used with Sync because the decoder is only used in the read_event.
37+
// This way, we save the cost of a Mutex.
3738
unsafe impl Sync for RemoteResource {}
3839

3940
impl From<TcpStream> for RemoteResource {

src/driver.rs

Lines changed: 79 additions & 117 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,12 @@
11
use crate::endpoint::{Endpoint};
22
use crate::resource_id::{ResourceId, ResourceType};
3-
use crate::poll::{Poll, PollRegister};
3+
use crate::poll::{Poll};
4+
use crate::registry::{ResourceRegistry};
45
use crate::remote_addr::{RemoteAddr};
5-
use crate::adapter::{Resource, Adapter, Remote, Local, SendStatus, AcceptedType, ReadStatus};
6-
use crate::util::{OTHER_THREAD_ERR};
6+
use crate::adapter::{Adapter, Remote, Local, SendStatus, AcceptedType, ReadStatus};
77

8-
use std::collections::{HashMap};
98
use std::net::{SocketAddr};
10-
use std::sync::{Arc, RwLock};
9+
use std::sync::{Arc};
1110
use std::io::{self};
1211

1312
/// Struct used to identify and event that an adapter has been produced.
@@ -26,53 +25,20 @@ pub enum AdapterEvent<'a> {
2625
Removed(Endpoint),
2726
}
2827

29-
pub struct ResourceRegister<S> {
30-
// We store the most significant addr of the resource because if the resource disconnects,
31-
// it can not be retrieved.
32-
// If the resource is a remote resource, the addr will be the peer addr.
33-
// If the resource is a local resource, the addr will be the local addr.
34-
resources: RwLock<HashMap<ResourceId, (S, SocketAddr)>>,
35-
poll_register: PollRegister,
28+
pub trait ActionController: Send + Sync {
29+
fn connect(&self, addr: RemoteAddr) -> io::Result<(Endpoint, SocketAddr)>;
30+
fn listen(&self, addr: SocketAddr) -> io::Result<(ResourceId, SocketAddr)>;
31+
fn send(&self, endpoint: Endpoint, data: &[u8]) -> SendStatus;
32+
fn remove(&self, id: ResourceId) -> bool;
3633
}
3734

38-
impl<S: Resource> ResourceRegister<S> {
39-
pub fn new(poll_register: PollRegister) -> ResourceRegister<S> {
40-
ResourceRegister { resources: RwLock::new(HashMap::new()), poll_register }
41-
}
42-
43-
pub fn add(&self, mut resource: S, addr: SocketAddr) -> ResourceId {
44-
let id = self.poll_register.add(resource.source());
45-
self.resources.write().expect(OTHER_THREAD_ERR).insert(id, (resource, addr));
46-
id
47-
}
48-
49-
pub fn remove(&self, id: ResourceId) -> Option<(S, SocketAddr)> {
50-
let poll_register = &self.poll_register;
51-
self.resources.write().expect(OTHER_THREAD_ERR).remove(&id).map(|(mut resource, addr)| {
52-
poll_register.remove(resource.source());
53-
(resource, addr)
54-
})
55-
}
56-
57-
pub fn resources(&self) -> &RwLock<HashMap<ResourceId, (S, SocketAddr)>> {
58-
&self.resources
59-
}
60-
}
61-
62-
pub trait ActionController {
63-
fn connect(&mut self, addr: RemoteAddr) -> io::Result<(Endpoint, SocketAddr)>;
64-
fn listen(&mut self, addr: SocketAddr) -> io::Result<(ResourceId, SocketAddr)>;
65-
fn send(&mut self, endpoint: Endpoint, data: &[u8]) -> SendStatus;
66-
fn remove(&mut self, id: ResourceId) -> bool;
67-
}
68-
69-
pub trait EventProcessor {
70-
fn try_process(&mut self, id: ResourceId, event_callback: &dyn Fn(AdapterEvent<'_>));
35+
pub trait EventProcessor: Send + Sync {
36+
fn process(&self, resource_id: ResourceId, event_callback: &dyn Fn(AdapterEvent<'_>));
7137
}
7238

7339
pub struct Driver<R: Remote, L: Local> {
74-
remote_register: Arc<ResourceRegister<R>>,
75-
local_register: Arc<ResourceRegister<L>>,
40+
remote_registry: Arc<ResourceRegistry<R>>,
41+
local_registry: Arc<ResourceRegistry<L>>,
7642
}
7743

7844
impl<R: Remote, L: Local> Driver<R, L> {
@@ -81,47 +47,48 @@ impl<R: Remote, L: Local> Driver<R, L> {
8147
adapter_id: u8,
8248
poll: &mut Poll,
8349
) -> Driver<R, L> {
84-
let remote_poll_register = poll.create_register(adapter_id, ResourceType::Remote);
85-
let local_poll_register = poll.create_register(adapter_id, ResourceType::Local);
50+
let remote_poll_registry = poll.create_registry(adapter_id, ResourceType::Remote);
51+
let local_poll_registry = poll.create_registry(adapter_id, ResourceType::Local);
8652

8753
Driver {
88-
remote_register: Arc::new(ResourceRegister::<R>::new(remote_poll_register)),
89-
local_register: Arc::new(ResourceRegister::<L>::new(local_poll_register)),
54+
remote_registry: Arc::new(ResourceRegistry::<R>::new(remote_poll_registry)),
55+
local_registry: Arc::new(ResourceRegistry::<L>::new(local_poll_registry)),
9056
}
9157
}
9258
}
9359

9460
impl<R: Remote, L: Local> Clone for Driver<R, L> {
9561
fn clone(&self) -> Driver<R, L> {
9662
Driver {
97-
remote_register: self.remote_register.clone(),
98-
local_register: self.local_register.clone(),
63+
remote_registry: self.remote_registry.clone(),
64+
local_registry: self.local_registry.clone(),
9965
}
10066
}
10167
}
10268

10369
impl<R: Remote, L: Local> ActionController for Driver<R, L> {
104-
fn connect(&mut self, addr: RemoteAddr) -> io::Result<(Endpoint, SocketAddr)> {
105-
let remotes = &mut self.remote_register;
70+
fn connect(&self, addr: RemoteAddr) -> io::Result<(Endpoint, SocketAddr)> {
10671
R::connect(addr).map(|info| {
10772
(
108-
Endpoint::new(remotes.add(info.remote, info.peer_addr), info.peer_addr),
73+
Endpoint::new(
74+
self.remote_registry.add(info.remote, info.peer_addr),
75+
info.peer_addr,
76+
),
10977
info.local_addr,
11078
)
11179
})
11280
}
11381

114-
fn listen(&mut self, addr: SocketAddr) -> io::Result<(ResourceId, SocketAddr)> {
115-
let locals = &mut self.local_register;
116-
L::listen(addr).map(|info| (locals.add(info.local, info.local_addr), info.local_addr))
82+
fn listen(&self, addr: SocketAddr) -> io::Result<(ResourceId, SocketAddr)> {
83+
L::listen(addr)
84+
.map(|info| (self.local_registry.add(info.local, info.local_addr), info.local_addr))
11785
}
11886

119-
fn send(&mut self, endpoint: Endpoint, data: &[u8]) -> SendStatus {
87+
fn send(&self, endpoint: Endpoint, data: &[u8]) -> SendStatus {
12088
match endpoint.resource_id().resource_type() {
12189
ResourceType::Remote => {
122-
let remotes = self.remote_register.resources().read().expect(OTHER_THREAD_ERR);
123-
match remotes.get(&endpoint.resource_id()) {
124-
Some((resource, _)) => resource.send(data),
90+
match self.remote_registry.get(endpoint.resource_id()) {
91+
Some(remote) => remote.resource.send(data),
12592

12693
// TODO: currently there is not a safe way to know if this is
12794
// reached because of a user API error (send over already resource removed)
@@ -131,76 +98,71 @@ impl<R: Remote, L: Local> ActionController for Driver<R, L> {
13198
None => SendStatus::ResourceNotFound,
13299
}
133100
}
134-
ResourceType::Local => {
135-
let locals = self.local_register.resources().read().expect(OTHER_THREAD_ERR);
136-
match locals.get(&endpoint.resource_id()) {
137-
Some((resource, _)) => resource.send_to(endpoint.addr(), data),
138-
None => {
139-
panic!(
140-
"Error: You are trying to send by a local resource \
141-
that does not exists"
142-
)
143-
}
101+
ResourceType::Local => match self.local_registry.get(endpoint.resource_id()) {
102+
Some(remote) => remote.resource.send_to(endpoint.addr(), data),
103+
None => {
104+
panic!(
105+
"Error: You are trying to send by a local resource \
106+
that does not exists"
107+
)
144108
}
145-
}
109+
},
146110
}
147111
}
148112

149-
fn remove(&mut self, id: ResourceId) -> bool {
113+
fn remove(&self, id: ResourceId) -> bool {
150114
match id.resource_type() {
151-
ResourceType::Remote => self.remote_register.remove(id).map(|_| ()).is_some(),
152-
ResourceType::Local => self.local_register.remove(id).map(|_| ()).is_some(),
115+
ResourceType::Remote => self.remote_registry.remove(id),
116+
ResourceType::Local => self.local_registry.remove(id),
153117
}
154118
}
155119
}
156120

157121
impl<R: Remote, L: Local<Remote = R>> EventProcessor for Driver<R, L> {
158-
fn try_process(&mut self, id: ResourceId, event_callback: &dyn Fn(AdapterEvent<'_>)) {
122+
fn process(&self, id: ResourceId, event_callback: &dyn Fn(AdapterEvent<'_>)) {
159123
match id.resource_type() {
160-
ResourceType::Remote => {
161-
let remotes = self.remote_register.resources().read().expect(OTHER_THREAD_ERR);
162-
let mut to_remove: Option<Endpoint> = None;
163-
if let Some((remote, addr)) = remotes.get(&id) {
164-
let endpoint = Endpoint::new(id, *addr);
165-
let status = remote.receive(&|data| {
166-
log::trace!("Read {} bytes from {}", data.len(), id);
167-
event_callback(AdapterEvent::Data(endpoint, data));
168-
});
169-
log::trace!("Processed receive {}, for {}", status, endpoint);
170-
if let ReadStatus::Disconnected = status {
171-
to_remove = Some(endpoint);
172-
}
173-
}
174-
175-
drop(remotes);
124+
ResourceType::Remote => self.process_remote(id, event_callback),
125+
ResourceType::Local => self.process_local(id, event_callback),
126+
}
127+
}
128+
}
176129

177-
if let Some(endpoint) = to_remove {
178-
self.remote_register.remove(id);
130+
impl<R: Remote, L: Local<Remote = R>> Driver<R, L> {
131+
fn process_remote(&self, id: ResourceId, event_callback: &dyn Fn(AdapterEvent<'_>)) {
132+
if let Some(remote) = self.remote_registry.get(id) {
133+
let endpoint = Endpoint::new(id, remote.addr);
134+
log::trace!("Processed remote for {}", endpoint);
135+
let status = remote.resource.receive(&|data| {
136+
event_callback(AdapterEvent::Data(endpoint, data));
137+
});
138+
log::trace!("Processed remote receive status {}", status);
139+
140+
if let ReadStatus::Disconnected = status {
141+
// Checked becasue, the user in the callback could have removed the same resource.
142+
if self.remote_registry.remove(id) {
179143
event_callback(AdapterEvent::Removed(endpoint));
180144
}
181145
}
182-
ResourceType::Local => {
183-
let locals = self.local_register.resources().read().expect(OTHER_THREAD_ERR);
184-
185-
let remotes = &mut self.remote_register;
186-
187-
if let Some((local, _)) = locals.get(&id) {
188-
local.accept(&|accepted| {
189-
log::trace!("Processed accept {} for {}", accepted, id);
190-
match accepted {
191-
AcceptedType::Remote(addr, remote) => {
192-
let remote_id = remotes.add(remote, addr);
193-
let endpoint = Endpoint::new(remote_id, addr);
194-
event_callback(AdapterEvent::Added(endpoint, id));
195-
}
196-
AcceptedType::Data(addr, data) => {
197-
let endpoint = Endpoint::new(id, addr);
198-
event_callback(AdapterEvent::Data(endpoint, data));
199-
}
200-
}
201-
});
146+
}
147+
}
148+
149+
fn process_local(&self, id: ResourceId, event_callback: &dyn Fn(AdapterEvent<'_>)) {
150+
if let Some(local) = self.local_registry.get(id) {
151+
log::trace!("Processed local for {}", id);
152+
local.resource.accept(&|accepted| {
153+
log::trace!("Processed local accepted type {}", accepted);
154+
match accepted {
155+
AcceptedType::Remote(addr, remote) => {
156+
let remote_id = self.remote_registry.add(remote, addr);
157+
let endpoint = Endpoint::new(remote_id, addr);
158+
event_callback(AdapterEvent::Added(endpoint, id));
159+
}
160+
AcceptedType::Data(addr, data) => {
161+
let endpoint = Endpoint::new(id, addr);
162+
event_callback(AdapterEvent::Data(endpoint, data));
163+
}
202164
}
203-
}
165+
});
204166
}
205167
}
206168
}

src/endpoint.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,6 @@ impl Endpoint {
3131

3232
impl std::fmt::Display for Endpoint {
3333
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
34-
write!(f, "{}:{}", self.resource_id, self.addr)
34+
write!(f, "{} {}", self.resource_id, self.addr)
3535
}
3636
}

0 commit comments

Comments
 (0)