Skip to content
This repository was archived by the owner on Apr 11, 2025. It is now read-only.

Commit 579d3ce

Browse files
iraizox86pup
authored andcommitted
replace tokio channels with loole (#256)
* rewrite admin handler to use loole channels * apply correct formatting * move all other services to loole channels * fix ci
1 parent c82c548 commit 579d3ce

File tree

5 files changed

+110
-84
lines changed

5 files changed

+110
-84
lines changed

Cargo.lock

Lines changed: 7 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,9 @@ cyborgtime = "2.1.1"
7070
bytes = "1.6.0"
7171
http = "0.2.12"
7272

73+
# used to replace the channels of the tokio runtime
74+
loole = "0.3.0"
75+
7376
# standard date and time tools
7477
[dependencies.chrono]
7578
version = "0.4.37"

src/service/admin/mod.rs

Lines changed: 66 additions & 57 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ use ruma::{
2323
EventId, MxcUri, OwnedRoomAliasId, OwnedRoomId, RoomAliasId, RoomId, RoomVersionId, ServerName, UserId,
2424
};
2525
use serde_json::value::to_raw_value;
26-
use tokio::sync::{mpsc, Mutex};
26+
use tokio::sync::Mutex;
2727
use tracing::{error, warn};
2828

2929
use super::pdu::PduBuilder;
@@ -91,13 +91,13 @@ pub enum AdminRoomEvent {
9191
}
9292

9393
pub struct Service {
94-
pub sender: mpsc::UnboundedSender<AdminRoomEvent>,
95-
receiver: Mutex<mpsc::UnboundedReceiver<AdminRoomEvent>>,
94+
pub sender: loole::Sender<AdminRoomEvent>,
95+
receiver: Mutex<loole::Receiver<AdminRoomEvent>>,
9696
}
9797

9898
impl Service {
9999
pub fn build() -> Arc<Self> {
100-
let (sender, receiver) = mpsc::unbounded_channel();
100+
let (sender, receiver) = loole::unbounded();
101101
Arc::new(Self {
102102
sender,
103103
receiver: Mutex::new(receiver),
@@ -115,7 +115,7 @@ impl Service {
115115
}
116116

117117
async fn handler(&self) -> Result<()> {
118-
let mut receiver = self.receiver.lock().await;
118+
let receiver = self.receiver.lock().await;
119119
// TODO: Use futures when we have long admin commands
120120
//let mut futures = FuturesUnordered::new();
121121

@@ -125,63 +125,72 @@ impl Service {
125125
if let Ok(Some(conduit_room)) = Self::get_admin_room() {
126126
loop {
127127
tokio::select! {
128-
Some(event) = receiver.recv() => {
129-
let (mut message_content, reply) = match event {
130-
AdminRoomEvent::SendMessage(content) => (content, None),
131-
AdminRoomEvent::ProcessMessage(room_message, reply_id) => {
132-
(self.process_admin_message(room_message).await, Some(reply_id))
128+
event = receiver.recv_async() => {
129+
match event {
130+
Ok(event) => {
131+
let (mut message_content, reply) = match event {
132+
AdminRoomEvent::SendMessage(content) => (content, None),
133+
AdminRoomEvent::ProcessMessage(room_message, reply_id) => {
134+
(self.process_admin_message(room_message).await, Some(reply_id))
135+
}
136+
};
137+
138+
let mutex_state = Arc::clone(
139+
services().globals
140+
.roomid_mutex_state
141+
.write()
142+
.await
143+
.entry(conduit_room.clone())
144+
.or_default(),
145+
);
146+
147+
let state_lock = mutex_state.lock().await;
148+
149+
if let Some(reply) = reply {
150+
message_content.relates_to = Some(Reply { in_reply_to: InReplyTo { event_id: reply.into() } });
151+
}
152+
153+
if let Err(e) = services().rooms.timeline.build_and_append_pdu(
154+
PduBuilder {
155+
event_type: TimelineEventType::RoomMessage,
156+
content: to_raw_value(&message_content)
157+
.expect("event is valid, we just created it"),
158+
unsigned: None,
159+
state_key: None,
160+
redacts: None,
161+
},
162+
&conduit_user,
163+
&conduit_room,
164+
&state_lock)
165+
.await {
166+
error!("Failed to build and append admin room response PDU: \"{e}\"");
167+
168+
let error_room_message = RoomMessageEventContent::text_plain(format!("Failed to build and append admin room PDU: \"{e}\"\n\nThe original admin command may have finished successfully, but we could not return the output."));
169+
170+
services().rooms.timeline.build_and_append_pdu(
171+
PduBuilder {
172+
event_type: TimelineEventType::RoomMessage,
173+
content: to_raw_value(&error_room_message)
174+
.expect("event is valid, we just created it"),
175+
unsigned: None,
176+
state_key: None,
177+
redacts: None,
178+
},
179+
&conduit_user,
180+
&conduit_room,
181+
&state_lock)
182+
.await?;
183+
}
184+
drop(state_lock);
133185
}
134-
};
186+
Err(_) => {
187+
// TODO: Handle error, Im too unfamiliar with the codebase to know what to do here
135188

136-
let mutex_state = Arc::clone(
137-
services().globals
138-
.roomid_mutex_state
139-
.write()
140-
.await
141-
.entry(conduit_room.clone())
142-
.or_default(),
143-
);
144189

145-
let state_lock = mutex_state.lock().await;
190+
// recv_async returns an error if all senders have been dropped. If the channel is empty, the returned future will yield to the async runtime.
146191

147-
if let Some(reply) = reply {
148-
message_content.relates_to = Some(Reply { in_reply_to: InReplyTo { event_id: reply.into() } });
192+
}
149193
}
150-
151-
if let Err(e) = services().rooms.timeline.build_and_append_pdu(
152-
PduBuilder {
153-
event_type: TimelineEventType::RoomMessage,
154-
content: to_raw_value(&message_content)
155-
.expect("event is valid, we just created it"),
156-
unsigned: None,
157-
state_key: None,
158-
redacts: None,
159-
},
160-
&conduit_user,
161-
&conduit_room,
162-
&state_lock)
163-
.await {
164-
error!("Failed to build and append admin room response PDU: \"{e}\"");
165-
166-
let error_room_message = RoomMessageEventContent::text_plain(format!("Failed to build and append admin room PDU: \"{e}\"\n\nThe original admin command may have finished successfully, but we could not return the output."));
167-
168-
services().rooms.timeline.build_and_append_pdu(
169-
PduBuilder {
170-
event_type: TimelineEventType::RoomMessage,
171-
content: to_raw_value(&error_room_message)
172-
.expect("event is valid, we just created it"),
173-
unsigned: None,
174-
state_key: None,
175-
redacts: None,
176-
},
177-
&conduit_user,
178-
&conduit_room,
179-
&state_lock)
180-
.await?;
181-
}
182-
183-
184-
drop(state_lock);
185194
}
186195
}
187196
}

src/service/presence/mod.rs

Lines changed: 17 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -10,10 +10,7 @@ use ruma::{
1010
OwnedUserId, UInt, UserId,
1111
};
1212
use serde::{Deserialize, Serialize};
13-
use tokio::{
14-
sync::{mpsc, Mutex},
15-
time::sleep,
16-
};
13+
use tokio::{sync::Mutex, time::sleep};
1714
use tracing::{debug, error};
1815

1916
use crate::{services, utils, Config, Error, Result};
@@ -71,14 +68,14 @@ impl Presence {
7168

7269
pub struct Service {
7370
pub db: &'static dyn Data,
74-
pub timer_sender: mpsc::UnboundedSender<(OwnedUserId, Duration)>,
75-
timer_receiver: Mutex<mpsc::UnboundedReceiver<(OwnedUserId, Duration)>>,
71+
pub timer_sender: loole::Sender<(OwnedUserId, Duration)>,
72+
timer_receiver: Mutex<loole::Receiver<(OwnedUserId, Duration)>>,
7673
timeout_remote_users: bool,
7774
}
7875

7976
impl Service {
8077
pub fn build(db: &'static dyn Data, config: &Config) -> Arc<Self> {
81-
let (timer_sender, timer_receiver) = mpsc::unbounded_channel();
78+
let (timer_sender, timer_receiver) = loole::unbounded();
8279

8380
Arc::new(Self {
8481
db,
@@ -173,12 +170,21 @@ impl Service {
173170

174171
async fn handler(&self) -> Result<()> {
175172
let mut presence_timers = FuturesUnordered::new();
176-
let mut receiver = self.timer_receiver.lock().await;
173+
let receiver = self.timer_receiver.lock().await;
177174
loop {
178175
tokio::select! {
179-
Some((user_id, timeout)) = receiver.recv() => {
180-
debug!("Adding timer {}: {user_id} timeout:{timeout:?}", presence_timers.len());
181-
presence_timers.push(presence_timer(user_id, timeout));
176+
event = receiver.recv_async() => {
177+
178+
match event {
179+
Ok((user_id, timeout)) => {
180+
debug!("Adding timer {}: {user_id} timeout:{timeout:?}", presence_timers.len());
181+
presence_timers.push(presence_timer(user_id, timeout));
182+
}
183+
Err(e) => {
184+
// TODO: Handle error better? I have no idea what to do here.
185+
error!("Failed to receive presence timer: {}", e);
186+
}
187+
}
182188
}
183189

184190
Some(user_id) = presence_timers.next() => {

src/service/sending/mod.rs

Lines changed: 17 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -25,10 +25,7 @@ use ruma::{
2525
events::{push_rules::PushRulesEvent, receipt::ReceiptType, AnySyncEphemeralRoomEvent, GlobalAccountDataEventType},
2626
push, uint, MilliSecondsSinceUnixEpoch, OwnedServerName, OwnedUserId, RoomId, ServerName, UInt, UserId,
2727
};
28-
use tokio::{
29-
select,
30-
sync::{mpsc, Mutex, Semaphore},
31-
};
28+
use tokio::sync::{Mutex, Semaphore};
3229
use tracing::{error, warn};
3330

3431
use crate::{services, utils::calculate_hash, Config, Error, PduEvent, Result};
@@ -43,8 +40,8 @@ pub struct Service {
4340

4441
/// The state for a given state hash.
4542
pub(super) maximum_requests: Arc<Semaphore>,
46-
pub sender: mpsc::UnboundedSender<(OutgoingKind, SendingEventType, Vec<u8>)>,
47-
receiver: Mutex<mpsc::UnboundedReceiver<(OutgoingKind, SendingEventType, Vec<u8>)>>,
43+
pub sender: loole::Sender<(OutgoingKind, SendingEventType, Vec<u8>)>,
44+
receiver: Mutex<loole::Receiver<(OutgoingKind, SendingEventType, Vec<u8>)>>,
4845
startup_netburst: bool,
4946
startup_netburst_keep: i64,
5047
timeout: u64,
@@ -73,7 +70,7 @@ enum TransactionStatus {
7370

7471
impl Service {
7572
pub fn build(db: &'static dyn Data, config: &Config) -> Arc<Self> {
76-
let (sender, receiver) = mpsc::unbounded_channel();
73+
let (sender, receiver) = loole::unbounded();
7774
Arc::new(Self {
7875
db,
7976
sender,
@@ -275,7 +272,7 @@ impl Service {
275272

276273
#[tracing::instrument(skip(self), name = "sender")]
277274
async fn handler(&self) -> Result<()> {
278-
let mut receiver = self.receiver.lock().await;
275+
let receiver = self.receiver.lock().await;
279276

280277
let mut futures = FuturesUnordered::new();
281278
let mut current_transaction_status = HashMap::<OutgoingKind, TransactionStatus>::new();
@@ -306,7 +303,7 @@ impl Service {
306303
}
307304

308305
loop {
309-
select! {
306+
tokio::select! {
310307
Some(response) = futures.next() => {
311308
match response {
312309
Ok(outgoing_kind) => {
@@ -343,13 +340,17 @@ impl Service {
343340
}
344341
};
345342
},
346-
Some((outgoing_kind, event, key)) = receiver.recv() => {
347-
if let Ok(Some(events)) = self.select_events(
348-
&outgoing_kind,
349-
vec![(event, key)],
350-
&mut current_transaction_status,
351-
) {
352-
futures.push(handle_events(outgoing_kind, events));
343+
344+
event = receiver.recv_async() => {
345+
// TODO: Error handling for this
346+
if let Ok((outgoing_kind, event, key)) = event {
347+
if let Ok(Some(events)) = self.select_events(
348+
&outgoing_kind,
349+
vec![(event, key)],
350+
&mut current_transaction_status,
351+
) {
352+
futures.push(handle_events(outgoing_kind, events));
353+
}
353354
}
354355
}
355356
}

0 commit comments

Comments
 (0)