Skip to content

Commit 39fb65b

Browse files
author
Stephan Dilly
authored
Async fetch (#552)
* async fetch * reuse remote progress for fetch * prvent push/fetch popup from closing too soon
1 parent c96feb0 commit 39fb65b

File tree

15 files changed

+660
-220
lines changed

15 files changed

+660
-220
lines changed

.clippy.toml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,2 +1,3 @@
11
msrv = "1.50.0"
2-
cognitive-complexity-threshold = 18
2+
cognitive-complexity-threshold = 18
3+
too-many-lines-threshold = 105

asyncgit/src/fetch.rs

Lines changed: 158 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,158 @@
1+
use crate::{
2+
error::{Error, Result},
3+
sync::{
4+
cred::BasicAuthCredential,
5+
remotes::{fetch_origin, push::ProgressNotification},
6+
},
7+
AsyncNotification, RemoteProgress, CWD,
8+
};
9+
use crossbeam_channel::{unbounded, Sender};
10+
use std::{
11+
sync::{Arc, Mutex},
12+
thread,
13+
};
14+
15+
///
16+
#[derive(Default, Clone, Debug)]
17+
pub struct FetchRequest {
18+
///
19+
pub remote: String,
20+
///
21+
pub branch: String,
22+
///
23+
pub basic_credential: Option<BasicAuthCredential>,
24+
}
25+
26+
#[derive(Default, Clone, Debug)]
27+
struct FetchState {
28+
request: FetchRequest,
29+
}
30+
31+
///
32+
pub struct AsyncFetch {
33+
state: Arc<Mutex<Option<FetchState>>>,
34+
last_result: Arc<Mutex<Option<(usize, String)>>>,
35+
progress: Arc<Mutex<Option<ProgressNotification>>>,
36+
sender: Sender<AsyncNotification>,
37+
}
38+
39+
impl AsyncFetch {
40+
///
41+
pub fn new(sender: &Sender<AsyncNotification>) -> Self {
42+
Self {
43+
state: Arc::new(Mutex::new(None)),
44+
last_result: Arc::new(Mutex::new(None)),
45+
progress: Arc::new(Mutex::new(None)),
46+
sender: sender.clone(),
47+
}
48+
}
49+
50+
///
51+
pub fn is_pending(&self) -> Result<bool> {
52+
let state = self.state.lock()?;
53+
Ok(state.is_some())
54+
}
55+
56+
///
57+
pub fn last_result(&self) -> Result<Option<(usize, String)>> {
58+
let res = self.last_result.lock()?;
59+
Ok(res.clone())
60+
}
61+
62+
///
63+
pub fn progress(&self) -> Result<Option<RemoteProgress>> {
64+
let res = self.progress.lock()?;
65+
Ok(res.as_ref().map(|progress| progress.clone().into()))
66+
}
67+
68+
///
69+
pub fn request(&mut self, params: FetchRequest) -> Result<()> {
70+
log::trace!("request");
71+
72+
if self.is_pending()? {
73+
return Ok(());
74+
}
75+
76+
self.set_request(&params)?;
77+
RemoteProgress::set_progress(self.progress.clone(), None)?;
78+
79+
let arc_state = Arc::clone(&self.state);
80+
let arc_res = Arc::clone(&self.last_result);
81+
let arc_progress = Arc::clone(&self.progress);
82+
let sender = self.sender.clone();
83+
84+
thread::spawn(move || {
85+
let (progress_sender, receiver) = unbounded();
86+
87+
let handle = RemoteProgress::spawn_receiver_thread(
88+
sender.clone(),
89+
receiver,
90+
arc_progress,
91+
);
92+
93+
let res = fetch_origin(
94+
CWD,
95+
&params.branch,
96+
params.basic_credential,
97+
Some(progress_sender.clone()),
98+
);
99+
100+
progress_sender
101+
.send(ProgressNotification::Done)
102+
.expect("closing send failed");
103+
104+
handle.join().expect("joining thread failed");
105+
106+
Self::set_result(arc_res, res).expect("result error");
107+
108+
Self::clear_request(arc_state).expect("clear error");
109+
110+
sender
111+
.send(AsyncNotification::Fetch)
112+
.expect("AsyncNotification error");
113+
});
114+
115+
Ok(())
116+
}
117+
118+
fn set_request(&self, params: &FetchRequest) -> Result<()> {
119+
let mut state = self.state.lock()?;
120+
121+
if state.is_some() {
122+
return Err(Error::Generic("pending request".into()));
123+
}
124+
125+
*state = Some(FetchState {
126+
request: params.clone(),
127+
});
128+
129+
Ok(())
130+
}
131+
132+
fn clear_request(
133+
state: Arc<Mutex<Option<FetchState>>>,
134+
) -> Result<()> {
135+
let mut state = state.lock()?;
136+
137+
*state = None;
138+
139+
Ok(())
140+
}
141+
142+
fn set_result(
143+
arc_result: Arc<Mutex<Option<(usize, String)>>>,
144+
res: Result<usize>,
145+
) -> Result<()> {
146+
let mut last_res = arc_result.lock()?;
147+
148+
*last_res = match res {
149+
Ok(bytes) => Some((bytes, String::new())),
150+
Err(e) => {
151+
log::error!("fetch error: {}", e);
152+
Some((0, e.to_string()))
153+
}
154+
};
155+
156+
Ok(())
157+
}
158+
}

asyncgit/src/lib.rs

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,9 @@ pub mod cached;
1414
mod commit_files;
1515
mod diff;
1616
mod error;
17+
mod fetch;
1718
mod push;
19+
pub mod remote_progress;
1820
mod revlog;
1921
mod status;
2022
pub mod sync;
@@ -23,7 +25,9 @@ mod tags;
2325
pub use crate::{
2426
commit_files::AsyncCommitFiles,
2527
diff::{AsyncDiff, DiffParams, DiffType},
26-
push::{AsyncPush, PushProgress, PushProgressState, PushRequest},
28+
fetch::{AsyncFetch, FetchRequest},
29+
push::{AsyncPush, PushRequest},
30+
remote_progress::{RemoteProgress, RemoteProgressState},
2731
revlog::{AsyncLog, FetchStatus},
2832
status::{AsyncStatus, StatusParams},
2933
sync::{
@@ -54,6 +58,8 @@ pub enum AsyncNotification {
5458
Tags,
5559
///
5660
Push,
61+
///
62+
Fetch,
5763
}
5864

5965
/// current working director `./`

0 commit comments

Comments
 (0)