Skip to content

Commit 007fd70

Browse files
Check fetch condition twice (#1298)
Once before, and once after locking the semaphone Change: check-twice
1 parent af42cf4 commit 007fd70

File tree

1 file changed

+78
-40
lines changed

1 file changed

+78
-40
lines changed

josh-proxy/src/bin/josh-proxy.rs

Lines changed: 78 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -102,46 +102,16 @@ impl std::fmt::Debug for JoshProxyService {
102102
}
103103
}
104104

105-
#[tracing::instrument]
106-
async fn fetch_upstream(
105+
fn fetch_needed(
107106
service: Arc<JoshProxyService>,
108-
upstream_repo: String,
109-
remote_auth: &RemoteAuth,
110-
remote_url: String,
107+
remote_url: &String,
108+
upstream_repo: &String,
109+
force: bool,
111110
head_ref: Option<&str>,
112111
head_ref_resolved: Option<&str>,
113-
force: bool,
114-
) -> Result<(), FetchError> {
115-
let key = remote_url.clone();
116-
117-
let refs_to_fetch = match head_ref {
118-
Some(head_ref) if head_ref != "HEAD" && !head_ref.starts_with("refs/heads/") => {
119-
vec![
120-
"HEAD*",
121-
"refs/josh/*",
122-
"refs/heads/*",
123-
"refs/tags/*",
124-
head_ref,
125-
]
126-
}
127-
_ => {
128-
vec!["HEAD*", "refs/josh/*", "refs/heads/*", "refs/tags/*"]
129-
}
130-
};
131-
132-
let refs_to_fetch: Vec<_> = refs_to_fetch.iter().map(|x| x.to_string()).collect();
133-
134-
let us = upstream_repo.clone();
135-
let semaphore = service
136-
.fetch_permits
137-
.lock()?
138-
.entry(us.clone())
139-
.or_insert(Arc::new(tokio::sync::Semaphore::new(1)))
140-
.clone();
141-
let permit = semaphore.acquire().await;
142-
112+
) -> Result<bool, FetchError> {
143113
let fetch_timer_ok = {
144-
if let Some(last) = service.fetch_timers.read()?.get(&key) {
114+
if let Some(last) = service.fetch_timers.read()?.get(remote_url) {
145115
let since = std::time::Instant::now().duration_since(*last);
146116
let max = std::time::Duration::from_secs(ARGS.cache_duration);
147117

@@ -171,24 +141,90 @@ async fn fetch_upstream(
171141
};
172142

173143
match (force, fetch_timer_ok, head_ref, head_ref_resolved) {
174-
(false, true, None, _) => return Ok(()),
144+
(false, true, None, _) => return Ok(false),
175145
(false, true, Some(head_ref), _) => {
176146
if (resolve_cache_ref(head_ref).map_err(FetchError::from_josh_error)?).is_some() {
177147
trace!("cache ref resolved");
178-
return Ok(());
148+
return Ok(false);
179149
}
180150
}
181151
(false, false, Some(head_ref), Some(head_ref_resolved)) => {
182152
if let Some(oid) = resolve_cache_ref(head_ref).map_err(FetchError::from_josh_error)? {
183153
if oid.to_string() == head_ref_resolved {
184154
trace!("cache ref resolved and matches");
185-
return Ok(());
155+
return Ok(false);
186156
}
187157
}
188158
}
189159
_ => (),
190160
};
191161

162+
return Ok(true);
163+
}
164+
165+
#[tracing::instrument]
166+
async fn fetch_upstream(
167+
service: Arc<JoshProxyService>,
168+
upstream_repo: String,
169+
remote_auth: &RemoteAuth,
170+
remote_url: String,
171+
head_ref: Option<&str>,
172+
head_ref_resolved: Option<&str>,
173+
force: bool,
174+
) -> Result<(), FetchError> {
175+
let refs_to_fetch = match head_ref {
176+
Some(head_ref) if head_ref != "HEAD" && !head_ref.starts_with("refs/heads/") => {
177+
vec![
178+
"HEAD*",
179+
"refs/josh/*",
180+
"refs/heads/*",
181+
"refs/tags/*",
182+
head_ref,
183+
]
184+
}
185+
_ => {
186+
vec!["HEAD*", "refs/josh/*", "refs/heads/*", "refs/tags/*"]
187+
}
188+
};
189+
190+
let refs_to_fetch: Vec<_> = refs_to_fetch.iter().map(|x| x.to_string()).collect();
191+
192+
// Check if we really need to fetch before locking the semaphore. This avoids
193+
// A "no fetch" case waiting for some already running fetch just to do nothing.
194+
if !fetch_needed(
195+
service.clone(),
196+
&remote_url,
197+
&upstream_repo,
198+
force,
199+
head_ref,
200+
head_ref_resolved,
201+
)? {
202+
return Ok(());
203+
}
204+
205+
let us = upstream_repo.clone();
206+
let semaphore = service
207+
.fetch_permits
208+
.lock()?
209+
.entry(us.clone())
210+
.or_insert(Arc::new(tokio::sync::Semaphore::new(1)))
211+
.clone();
212+
let permit = semaphore.acquire().await;
213+
214+
// Check the fetch condition once again after locking the semaphore, as an unknown
215+
// amount of time might have passed and the outcome of this check might have changed
216+
// while waiting.
217+
if !fetch_needed(
218+
service.clone(),
219+
&remote_url,
220+
&upstream_repo,
221+
force,
222+
head_ref,
223+
head_ref_resolved,
224+
)? {
225+
return Ok(());
226+
}
227+
192228
let fetch_timers = service.fetch_timers.clone();
193229
let heads_map = service.heads_map.clone();
194230
let br_path = service.repo_path.join("mirror");
@@ -220,7 +256,9 @@ async fn fetch_upstream(
220256
std::mem::drop(permit);
221257

222258
if fetch_result.is_ok() {
223-
fetch_timers.write()?.insert(key, std::time::Instant::now());
259+
fetch_timers
260+
.write()?
261+
.insert(remote_url.clone(), std::time::Instant::now());
224262
}
225263

226264
match (fetch_result, remote_auth) {

0 commit comments

Comments
 (0)