Skip to content

Commit 305c2ad

Browse files
committed
update explorer task to bounded channel
1 parent 63b4be3 commit 305c2ad

File tree

6 files changed

+61
-67
lines changed

6 files changed

+61
-67
lines changed

src/app/mod.rs

Lines changed: 11 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ use console::style;
33
use crossterm::event::{KeyCode, KeyModifiers};
44
use ratatui::prelude::Rect;
55
use serde::{Deserialize, Serialize};
6-
use tokio::sync::mpsc::unbounded_channel;
6+
use tokio::sync::mpsc;
77

88
use crate::{
99
app::{actions::Action, config::AppConfig},
@@ -125,10 +125,7 @@ impl App {
125125
pub async fn run(&mut self) -> Result<()> {
126126
// We are going introduce a new mpsc::unbounded_channel to communicate between the different app components.
127127
// The advantage of this is that we can programmatically trigger updates to the state of the app by sending Actions on the channel.
128-
let (component_tx, mut component_rx) = unbounded_channel::<Action>();
129-
130-
// Use a separate channel to communicate with the Explorer background task
131-
let (explorer_tx, explorer_rx) = unbounded_channel::<Action>();
128+
let (component_tx, mut component_rx) = mpsc::unbounded_channel::<Action>();
132129

133130
// build the TUI
134131
let mut tui = tui::Tui::new()?
@@ -153,19 +150,19 @@ impl App {
153150
component.register_component_action_sender(component_tx.clone())?;
154151
}
155152

156-
// Register the Explorer-Action-Sender for each app component that needs it
157-
for component in self.components.iter_mut() {
158-
component.register_explorer_action_sender(explorer_tx.clone())?;
159-
}
160-
161153
// Register the config handler for each component
162154
for component in self.components.iter_mut() {
163155
component.register_config_handler(self.config.clone())?;
164156
}
165157

166158
// Init and run the explorer background task
167-
let mut explorer_task = ExplorerTask::new(component_tx.clone());
168-
explorer_task.run(explorer_rx);
159+
let mut explorer_task = ExplorerTask::new();
160+
let explorer_sender = explorer_task.run(component_tx.clone());
161+
162+
// Register the Explorer-Action-Sender for each app component that needs it
163+
for component in self.components.iter_mut() {
164+
component.register_explorer_action_sender(explorer_sender.clone())?;
165+
}
169166

170167
// This is the Application main loop
171168
loop {
@@ -190,10 +187,6 @@ impl App {
190187
KeyCode::Char('q') if key_event.modifiers == KeyModifiers::CONTROL => {
191188
component_tx.send(Action::Quit)?
192189
}
193-
// KeyCode::Char('p') => panic!("Testing the panic handler"),
194-
// KeyCode::Char('e') => component_tx.send(Action::Error(
195-
// "Testing application error".to_string(),
196-
// ))?,
197190
_ => component_tx.send(Action::None)?,
198191
},
199192
tui::Event::Resize(w, h) => component_tx.send(Action::Resize(w, h))?,
@@ -221,7 +214,7 @@ impl App {
221214
let _ = component.render(f, f.area());
222215
}
223216
})
224-
.with_context(|| "Failed to draw UI on screen while resizing")?;
217+
.with_context(|| "Failed to render UI on screen while resizing")?;
225218
}
226219
Action::Error(err) => {
227220
return Err(anyhow::anyhow!(format!(
@@ -241,8 +234,8 @@ impl App {
241234
}
242235

243236
if self.should_quit {
244-
explorer_task.stop();
245237
tui.stop();
238+
explorer_task.stop();
246239
break;
247240
}
248241
}

src/component.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ pub trait Component {
3535
#[allow(unused_variables)]
3636
fn register_explorer_action_sender(
3737
&mut self,
38-
tx: tokio::sync::mpsc::UnboundedSender<Action>,
38+
tx: tokio::sync::mpsc::Sender<Action>,
3939
) -> Result<()> {
4040
Ok(())
4141
}

src/file_handling/mod.rs

Lines changed: 16 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -2,10 +2,7 @@ use anyhow::Result;
22
use serde::{Deserialize, Serialize};
33
use std::path::PathBuf;
44

5-
use tokio::{
6-
sync::mpsc::{UnboundedReceiver, UnboundedSender},
7-
task::JoinHandle,
8-
};
5+
use tokio::{sync::mpsc, task::JoinHandle};
96
use tokio_util::sync::CancellationToken;
107
use walkdir::WalkDir;
118

@@ -80,62 +77,57 @@ impl DiskEntry {
8077
pub struct ExplorerTask {
8178
task: JoinHandle<()>,
8279
cancellation_token: CancellationToken,
83-
/// This sender is used to send actions back to the main thread
84-
action_sender: UnboundedSender<Action>,
8580
is_forced_shutdown: bool,
8681
}
8782

8883
impl ExplorerTask {
8984
/// Constructs a new instance of [`ExplorerTask`].
90-
pub fn new(tx: UnboundedSender<Action>) -> Self {
85+
pub fn new() -> Self {
9186
let cancellation_token = CancellationToken::new();
9287
let task = tokio::spawn(async {
9388
std::future::pending::<()>().await;
9489
});
90+
9591
Self {
9692
task,
9793
cancellation_token,
98-
action_sender: tx,
9994
is_forced_shutdown: false,
10095
}
10196
}
10297

10398
/// Runs the explorer task
104-
pub fn run(&mut self, rx: UnboundedReceiver<Action>) {
105-
let tx = self.action_sender.clone();
106-
let mut rx = rx;
107-
99+
pub fn run(&mut self, action_sender: mpsc::UnboundedSender<Action>) -> mpsc::Sender<Action> {
100+
let (explorer_sender, mut explorer_receiver) = mpsc::channel::<Action>(10);
108101
self.cancel();
109102
self.cancellation_token = CancellationToken::new();
110103
let _cancellation_token = self.cancellation_token.clone();
111-
112104
self.task = tokio::task::spawn(async move {
113105
loop {
114106
tokio::select! {
115107
_ = _cancellation_token.cancelled() => {
116108
break;
117109
}
118-
Some(action) = rx.recv() => {
110+
Some(action) = explorer_receiver.recv() => {
119111
match action {
120112
Action::LoadDir(p, follow_sym_links) => {
121-
tx.send(Action::UpdateAppState(AppState::Working("Loading directory...".into())))
113+
action_sender.send(Action::UpdateAppState(AppState::Working("Loading directory...".into())))
122114
.expect("Explorer: Unable to send 'Action::UpdateExplorerState'");
123115
let explorer = Explorer::load_directory(p, follow_sym_links);
124-
tx.send(Action::LoadDirDone(explorer)).expect("Explorer: Unable to send 'Action::LoadDirDone'");
116+
action_sender.send(Action::LoadDirDone(explorer)).expect("Explorer: Unable to send 'Action::LoadDirDone'");
125117
}
126118
Action::LoadDirMetadata(dir_name, path, follow_sym_links) => {
127119
// handle result, if it was not possible to send a Action over the channel, we don't want to panic
128120
// in this case, instead we log the error
129-
match Explorer::get_dir_metadata(tx.clone(), dir_name, path, follow_sym_links) {
130-
Ok(dir_metadata) => tx.send(Action::LoadDirMetadataDone(dir_metadata)).expect("Explorer: Unable to send 'Action::LoadDirMetadataDone'"),
121+
match Explorer::get_dir_metadata(action_sender.clone(), dir_name, path, follow_sym_links) {
122+
Ok(dir_metadata) => action_sender.send(Action::LoadDirMetadataDone(dir_metadata)).expect("Explorer: Unable to send 'Action::LoadDirMetadataDone'"),
131123
Err(_) => {
132124
log::error!("Explorer: Unable to send 'Action::UpdateExplorerState' while processing directory metadata. The channel may have been dropped or closed before the sending completed.");
133125
},
134126
}
135127
}
136128
Action::StartSearch(cwd, search_query, depth, follow_sym_links) => {
137-
match Explorer::find_entries_by_name(tx.clone(), cwd, search_query, depth, follow_sym_links) {
138-
Ok(search_result) => tx.send(Action::SearchDone(search_result)).expect("Explorer: Unable to send 'Action::SearchDone'"),
129+
match Explorer::find_entries_by_name(action_sender.clone(), cwd, search_query, depth, follow_sym_links) {
130+
Ok(search_result) => action_sender.send(Action::SearchDone(search_result)).expect("Explorer: Unable to send 'Action::SearchDone'"),
139131
Err(_) => {
140132
log::error!("Explorer: Unable to send 'Action::UpdateExplorerState' while searching for files/folders. The channel may have been dropped or closed before the sending completed.");
141133
},
@@ -147,6 +139,8 @@ impl ExplorerTask {
147139
}
148140
}
149141
});
142+
143+
explorer_sender
150144
}
151145

152146
pub fn cancel(&self) {
@@ -567,7 +561,7 @@ impl Explorer {
567561
}
568562

569563
fn get_dir_metadata(
570-
tx: UnboundedSender<Action>,
564+
tx: mpsc::UnboundedSender<Action>,
571565
dir_name: String,
572566
p: PathBuf,
573567
follow_sym_links: bool,
@@ -651,7 +645,7 @@ impl Explorer {
651645
}
652646

653647
pub fn find_entries_by_name(
654-
tx: UnboundedSender<Action>,
648+
tx: mpsc::UnboundedSender<Action>,
655649
cwd: PathBuf,
656650
search_query: String,
657651
depth: usize,

src/ui/explorer_widget.rs

Lines changed: 15 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ pub struct ExplorerWidget {
2929
/// Action sender that can send actions to all other components
3030
action_sender: Option<tokio::sync::mpsc::UnboundedSender<Action>>,
3131
/// Associated Explorer operation sender, that can send actions to the [`Explorer`]
32-
explorer_action_sender: Option<tokio::sync::mpsc::UnboundedSender<Action>>,
32+
explorer_action_sender: Option<tokio::sync::mpsc::Sender<Action>>,
3333
/// Terminal height used to control the number of items to display on the screen
3434
terminal_height: u16,
3535
/// Page height used to control the PageUp and PageDown operations
@@ -65,10 +65,10 @@ impl ExplorerWidget {
6565
}
6666
/// Helper function to send a [`Action`] to the [`Explorer`]
6767
/// Set the `is_working` flag to true
68-
fn send_explorer_action(&mut self, action: Action) -> Result<()> {
69-
if let Some(handler) = &self.explorer_action_sender {
68+
async fn send_explorer_action(&mut self, action: Action) -> Result<()> {
69+
if let Some(sender) = &self.explorer_action_sender {
7070
self.is_working = true;
71-
handler.send(action)?
71+
sender.send(action).await?;
7272
}
7373
Ok(())
7474
}
@@ -137,7 +137,7 @@ impl Component for ExplorerWidget {
137137

138138
fn register_explorer_action_sender(
139139
&mut self,
140-
tx: tokio::sync::mpsc::UnboundedSender<Action>,
140+
tx: tokio::sync::mpsc::Sender<Action>,
141141
) -> Result<()> {
142142
self.explorer_action_sender = Some(tx);
143143
Ok(())
@@ -232,7 +232,8 @@ impl Component for ExplorerWidget {
232232
self.send_explorer_action(Action::LoadDir(
233233
self.explorer.cwd().clone(),
234234
self.follow_sym_links,
235-
))?;
235+
))
236+
.await?;
236237
} else {
237238
return Ok(Action::UpdateAppState(AppState::Failure(
238239
"The current directory no longer exists".to_string(),
@@ -250,7 +251,8 @@ impl Component for ExplorerWidget {
250251
let new_dir = selected_entry.path.clone();
251252

252253
// send the explorer operation to change the directory
253-
self.send_explorer_action(Action::LoadDir(new_dir, self.follow_sym_links))?;
254+
self.send_explorer_action(Action::LoadDir(new_dir, self.follow_sym_links))
255+
.await?;
254256
} else {
255257
return Ok(Action::UpdateAppState(AppState::Failure(
256258
"The selected directory no longer exists".to_string(),
@@ -269,7 +271,8 @@ impl Component for ExplorerWidget {
269271
self.send_explorer_action(Action::LoadDir(
270272
parent_dir.to_path_buf(),
271273
self.follow_sym_links,
272-
))?;
274+
))
275+
.await?;
273276
}
274277
None => {
275278
self.send_app_action(Action::UpdateAppState(AppState::Failure(
@@ -331,7 +334,8 @@ impl Component for ExplorerWidget {
331334
self.send_explorer_action(Action::LoadDir(
332335
home_dir,
333336
self.follow_sym_links,
334-
))?;
337+
))
338+
.await?;
335339
} else {
336340
self.send_app_action(Action::UpdateAppState(AppState::Done(
337341
"Already in home directory".to_string(),
@@ -382,7 +386,8 @@ impl Component for ExplorerWidget {
382386
selected_entry.name.clone(),
383387
selected_entry.path.clone(),
384388
self.follow_sym_links,
385-
))?;
389+
))
390+
.await?;
386391
}
387392
}
388393

src/ui/result_widget.rs

Lines changed: 9 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -173,7 +173,7 @@ pub struct ResultWidget {
173173
/// Action sender that can send actions to all other components
174174
action_sender: Option<tokio::sync::mpsc::UnboundedSender<Action>>,
175175
/// Associated Explorer operation sender, that can send actions to the [`Explorer`]
176-
explorer_action_sender: Option<tokio::sync::mpsc::UnboundedSender<Action>>,
176+
explorer_action_sender: Option<tokio::sync::mpsc::Sender<Action>>,
177177
/// Flag to control the available draw area for the [`SearchWidget`]
178178
/// If the [`crate::ui::info_widget::SystemOverview`] is not visible, than use the whole draw area
179179
use_whole_draw_area: bool,
@@ -224,10 +224,10 @@ impl Default for ResultWidget {
224224
impl ResultWidget {
225225
/// Helper function to send a [`Action`] to the [`crate::file_handling::Explorer`]
226226
/// Set the `is_working` flag to true
227-
fn send_explorer_action(&mut self, action: Action) -> Result<()> {
228-
if let Some(handler) = &self.explorer_action_sender {
227+
async fn send_explorer_action(&mut self, action: Action) -> Result<()> {
228+
if let Some(sender) = &self.explorer_action_sender {
229229
self.is_working = true;
230-
handler.send(action)?
230+
sender.send(action).await?;
231231
}
232232
Ok(())
233233
}
@@ -272,7 +272,7 @@ impl Component for ResultWidget {
272272

273273
fn register_explorer_action_sender(
274274
&mut self,
275-
tx: tokio::sync::mpsc::UnboundedSender<Action>,
275+
tx: tokio::sync::mpsc::Sender<Action>,
276276
) -> Result<()> {
277277
self.explorer_action_sender = Some(tx);
278278
Ok(())
@@ -401,7 +401,8 @@ impl Component for ResultWidget {
401401
selected_entry.name.clone(),
402402
selected_entry.path.clone(),
403403
self.follow_sym_links,
404-
))?;
404+
))
405+
.await?;
405406
}
406407
}
407408
// Ctrl + c -> Copy absolute path to clipboard
@@ -472,8 +473,8 @@ impl Component for ResultWidget {
472473
for entry in items {
473474
let json_value = entry.build_as_json();
474475
// Send to writer
475-
if tx_clone.send(json_value).await.is_err() {
476-
println!("Writer task dropped, stopping producer");
476+
if let Err(err) = tx_clone.send(json_value).await {
477+
log::error!("Failed to send JSON value to writer task - Details {:?}", err);
477478
break;
478479
}
479480
}

src/ui/search_widget.rs

Lines changed: 9 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@ pub struct SearchWidget {
4747
/// Action sender that can send actions to all other components
4848
action_sender: Option<tokio::sync::mpsc::UnboundedSender<Action>>,
4949
/// Associated Explorer operation sender, that can send actions to the [`Explorer`]
50-
explorer_action_sender: Option<tokio::sync::mpsc::UnboundedSender<Action>>,
50+
explorer_action_sender: Option<tokio::sync::mpsc::Sender<Action>>,
5151
/// Flag to control the available draw area for the [`SearchWidget`]
5252
/// If the [`crate::ui::info_widget::SystemOverview`] is not visible, than use the whole draw area
5353
use_whole_draw_area: bool,
@@ -179,7 +179,7 @@ impl SearchWidget {
179179
self.search_query.clear();
180180
}
181181

182-
fn submit_search(&mut self) -> Result<()> {
182+
async fn submit_search(&mut self) -> Result<()> {
183183
// only if the search query does not yet exist, add it to the history
184184
if !self.history.contains(&self.search_query) {
185185
self.history.push(self.search_query.clone());
@@ -190,16 +190,17 @@ impl SearchWidget {
190190
self.search_query.clone(),
191191
self.mode.depth(),
192192
self.follow_sym_links,
193-
))?;
193+
))
194+
.await?;
194195
Ok(())
195196
}
196197

197198
/// Helper function to send a [`Action`] to the [`crate::file_handling::Explorer`]
198199
/// Set the `is_working` flag to true
199-
fn send_explorer_action(&mut self, action: Action) -> Result<()> {
200-
if let Some(handler) = &self.explorer_action_sender {
200+
async fn send_explorer_action(&mut self, action: Action) -> Result<()> {
201+
if let Some(sender) = &self.explorer_action_sender {
201202
self.is_working = true;
202-
handler.send(action)?
203+
sender.send(action).await?;
203204
}
204205
Ok(())
205206
}
@@ -232,7 +233,7 @@ impl Component for SearchWidget {
232233

233234
fn register_explorer_action_sender(
234235
&mut self,
235-
tx: tokio::sync::mpsc::UnboundedSender<Action>,
236+
tx: tokio::sync::mpsc::Sender<Action>,
236237
) -> Result<()> {
237238
self.explorer_action_sender = Some(tx);
238239
Ok(())
@@ -273,7 +274,7 @@ impl Component for SearchWidget {
273274
// Submit search
274275
crossterm::event::KeyCode::Enter => {
275276
if !self.search_query.trim().is_empty() {
276-
self.submit_search()?;
277+
self.submit_search().await?;
277278
} else {
278279
return Ok(Action::UpdateAppState(AppState::Failure(
279280
"Search query must not be empty".to_string(),

0 commit comments

Comments
 (0)