Skip to content

Commit 3768e10

Browse files
Fix: reduce async runtime blocking (#1336)
This PR aims to optimize request latency by reducing async runtime blocking * all functions of graphql objects are blocking, and many trigger file i/o. However, graphql requests were being executed via an async function. Use sync version of graphql execution and use tokio::spawn_blocking * use tokio::spawn_blocking for operations involving file i/o * reduce sync mutex lock regions where possible ### Performance * std::sync::Mutex should be very efficient when there's no contention, and none is expected in graphql routines * I've benchmarked the overhead of tokio::spawn_blocking, since it's used more now: on my computer (core i5 macbook) I can spawn and .await around 90k tasks per second, which is at least two orders of magnitude away from the number of requests Josh can currently handle per second. commit-id:bd8098e1
1 parent 7e755c0 commit 3768e10

File tree

1 file changed

+162
-87
lines changed

1 file changed

+162
-87
lines changed

josh-proxy/src/bin/josh-proxy.rs

Lines changed: 162 additions & 87 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,6 @@ use josh::{josh_error, JoshError, JoshResult};
2222
use josh_rpc::calls::RequestedCommand;
2323
use serde::Serialize;
2424
use std::collections::HashMap;
25-
use std::ffi::OsStr;
2625
use std::io;
2726
use std::net::IpAddr;
2827
use std::path::{Path, PathBuf};
@@ -103,17 +102,22 @@ impl std::fmt::Debug for JoshProxyService {
103102
}
104103
}
105104

106-
fn fetch_needed(
105+
async fn fetch_needed(
107106
service: Arc<JoshProxyService>,
108-
remote_url: &String,
109-
upstream_repo: &String,
107+
remote_url: &str,
108+
upstream_repo: &str,
110109
force: bool,
111110
head_ref: Option<&str>,
112111
head_ref_resolved: Option<&str>,
113112
) -> Result<bool, FetchError> {
114113
let fetch_timer_ok = {
115-
if let Some(last) = service.fetch_timers.read()?.get(remote_url) {
116-
let since = std::time::Instant::now().duration_since(*last);
114+
let last = {
115+
let fetch_timers = service.fetch_timers.read()?;
116+
fetch_timers.get(remote_url).cloned()
117+
};
118+
119+
if let Some(last) = last {
120+
let since = std::time::Instant::now().duration_since(last);
117121
let max = std::time::Duration::from_secs(ARGS.cache_duration);
118122

119123
tracing::trace!("last: {:?}, since: {:?}, max: {:?}", last, since, max);
@@ -123,34 +127,46 @@ fn fetch_needed(
123127
}
124128
};
125129

126-
let resolve_cache_ref = |cache_ref: &str| -> JoshResult<Option<git2::Oid>> {
127-
let transaction = josh::cache::Transaction::open(
128-
&service.repo_path.join("mirror"),
129-
Some(&format!(
130-
"refs/josh/upstream/{}/",
131-
&josh::to_ns(&upstream_repo),
132-
)),
133-
)?;
130+
let resolve_cache_ref = |cache_ref: &str| {
131+
let cache_ref = cache_ref.to_string();
132+
let upstream_repo = upstream_repo.to_string();
134133

135-
match transaction
136-
.repo()
137-
.refname_to_id(&transaction.refname(cache_ref))
138-
{
139-
Ok(oid) => Ok(Some(oid)),
140-
Err(_) => Ok(None),
141-
}
134+
tokio::task::spawn_blocking(move || {
135+
let transaction = josh::cache::Transaction::open(
136+
&service.repo_path.join("mirror"),
137+
Some(&format!(
138+
"refs/josh/upstream/{}/",
139+
&josh::to_ns(&upstream_repo),
140+
)),
141+
)?;
142+
143+
match transaction
144+
.repo()
145+
.refname_to_id(&transaction.refname(&cache_ref))
146+
{
147+
Ok(oid) => Ok(Some(oid)),
148+
Err(_) => Ok(None),
149+
}
150+
})
142151
};
143152

144153
match (force, fetch_timer_ok, head_ref, head_ref_resolved) {
145154
(false, true, None, _) => return Ok(false),
146155
(false, true, Some(head_ref), _) => {
147-
if (resolve_cache_ref(head_ref).map_err(FetchError::from_josh_error)?).is_some() {
156+
if (resolve_cache_ref(head_ref)
157+
.await?
158+
.map_err(FetchError::from_josh_error)?)
159+
.is_some()
160+
{
148161
trace!("cache ref resolved");
149162
return Ok(false);
150163
}
151164
}
152165
(false, false, Some(head_ref), Some(head_ref_resolved)) => {
153-
if let Some(oid) = resolve_cache_ref(head_ref).map_err(FetchError::from_josh_error)? {
166+
if let Some(oid) = resolve_cache_ref(head_ref)
167+
.await?
168+
.map_err(FetchError::from_josh_error)?
169+
{
154170
if oid.to_string() == head_ref_resolved {
155171
trace!("cache ref resolved and matches");
156172
return Ok(false);
@@ -199,15 +215,16 @@ async fn fetch_upstream(
199215
force,
200216
head_ref,
201217
head_ref_resolved,
202-
)? {
218+
)
219+
.await?
220+
{
203221
return Ok(());
204222
}
205223

206-
let us = upstream_repo.clone();
207224
let semaphore = service
208225
.fetch_permits
209226
.lock()?
210-
.entry(us.clone())
227+
.entry(upstream_repo.clone())
211228
.or_insert(Arc::new(tokio::sync::Semaphore::new(1)))
212229
.clone();
213230
let permit = semaphore.acquire().await;
@@ -222,36 +239,52 @@ async fn fetch_upstream(
222239
force,
223240
head_ref,
224241
head_ref_resolved,
225-
)? {
242+
)
243+
.await?
244+
{
226245
return Ok(());
227246
}
228247

248+
let fetch_result = {
249+
let span = tracing::span!(tracing::Level::INFO, "fetch_refs_from_url");
250+
251+
let mirror_path = service.repo_path.join("mirror");
252+
let upstream_repo = upstream_repo.clone();
253+
let remote_url = remote_url.clone();
254+
let remote_auth = remote_auth.clone();
255+
256+
tokio::task::spawn_blocking(move || {
257+
let _span_guard = span.enter();
258+
josh_proxy::fetch_refs_from_url(
259+
&mirror_path,
260+
&upstream_repo,
261+
&remote_url,
262+
&refs_to_fetch,
263+
&remote_auth,
264+
)
265+
})
266+
.await?
267+
};
268+
269+
let hres = {
270+
let span = tracing::span!(tracing::Level::INFO, "get_head");
271+
272+
let mirror_path = service.repo_path.join("mirror");
273+
let remote_url = remote_url.clone();
274+
let remote_auth = remote_auth.clone();
275+
276+
tokio::task::spawn_blocking(move || {
277+
let _span_guard = span.enter();
278+
josh_proxy::get_head(&mirror_path, &remote_url, &remote_auth)
279+
})
280+
.await?
281+
};
282+
229283
let fetch_timers = service.fetch_timers.clone();
230284
let heads_map = service.heads_map.clone();
231-
let br_path = service.repo_path.join("mirror");
232-
233-
let span = tracing::span!(tracing::Level::INFO, "fetch_refs_from_url");
234-
let ru = remote_url.clone();
235-
let task_remote_auth = remote_auth.clone();
236-
let fetch_result = tokio::task::spawn_blocking(move || {
237-
let _span_guard = span.enter();
238-
josh_proxy::fetch_refs_from_url(&br_path, &us, &ru, &refs_to_fetch, &task_remote_auth)
239-
})
240-
.await?;
241-
242-
let us = upstream_repo.clone();
243-
let s = tracing::span!(tracing::Level::INFO, "get_head");
244-
let br_path = service.repo_path.join("mirror");
245-
let ru = remote_url.clone();
246-
let task_remote_auth = remote_auth.clone();
247-
let hres = tokio::task::spawn_blocking(move || {
248-
let _e = s.enter();
249-
josh_proxy::get_head(&br_path, &ru, &task_remote_auth)
250-
})
251-
.await?;
252285

253286
if let Ok(hres) = hres {
254-
heads_map.write()?.insert(us, hres);
287+
heads_map.write()?.insert(upstream_repo.clone(), hres);
255288
}
256289

257290
std::mem::drop(permit);
@@ -1742,6 +1775,7 @@ fn update_hook(refname: &str, old: &str, new: &str) -> josh::JoshResult<i32> {
17421775
}
17431776
}
17441777

1778+
#[tracing::instrument(skip_all)]
17451779
async fn serve_graphql(
17461780
serv: Arc<JoshProxyService>,
17471781
req: Request<hyper::Body>,
@@ -1750,46 +1784,91 @@ async fn serve_graphql(
17501784
auth: josh_proxy::auth::Handle,
17511785
) -> josh::JoshResult<Response<hyper::Body>> {
17521786
let remote_url = upstream.clone() + upstream_repo.as_str();
1753-
let parsed = match josh_proxy::juniper_hyper::parse_req(req).await {
1754-
Ok(r) => r,
1787+
let parsed_request = match josh_proxy::juniper_hyper::parse_req(req).await {
1788+
Ok(parsed_request) => {
1789+
// Even though there's a mutex, it's just to manage access
1790+
// between sync and async code, so no contention is expected
1791+
Arc::new(std::sync::Mutex::new(parsed_request))
1792+
}
17551793
Err(resp) => return Ok(resp),
17561794
};
17571795

1758-
let transaction_mirror = josh::cache::Transaction::open(
1759-
&serv.repo_path.join("mirror"),
1760-
Some(&format!(
1761-
"refs/josh/upstream/{}/",
1762-
&josh::to_ns(&upstream_repo),
1763-
)),
1764-
)?;
1765-
let transaction = josh::cache::Transaction::open(&serv.repo_path.join("overlay"), None)?;
1766-
transaction.repo().odb()?.add_disk_alternate(
1767-
serv.repo_path
1768-
.join("mirror")
1769-
.join("objects")
1770-
.to_str()
1771-
.unwrap(),
1772-
)?;
1773-
let context = std::sync::Arc::new(josh::graphql::context(transaction, transaction_mirror));
1774-
let root_node = std::sync::Arc::new(josh::graphql::repo_schema(
1796+
let context = {
1797+
let upstream_repo = upstream_repo.clone();
1798+
let serv = serv.clone();
1799+
1800+
tokio::task::spawn_blocking(move || -> josh::JoshResult<_> {
1801+
let transaction_mirror = josh::cache::Transaction::open(
1802+
&serv.repo_path.join("mirror"),
1803+
Some(&format!(
1804+
"refs/josh/upstream/{}/",
1805+
&josh::to_ns(&upstream_repo),
1806+
)),
1807+
)?;
1808+
1809+
let transaction =
1810+
josh::cache::Transaction::open(&serv.repo_path.join("overlay"), None)?;
1811+
transaction.repo().odb()?.add_disk_alternate(
1812+
&serv
1813+
.repo_path
1814+
.join("mirror")
1815+
.join("objects")
1816+
.display()
1817+
.to_string(),
1818+
)?;
1819+
1820+
Ok(Arc::new(josh::graphql::context(
1821+
transaction,
1822+
transaction_mirror,
1823+
)))
1824+
})
1825+
.await??
1826+
};
1827+
1828+
let root_node = Arc::new(josh::graphql::repo_schema(
17751829
upstream_repo
17761830
.strip_suffix(".git")
17771831
.unwrap_or(&upstream_repo)
17781832
.to_string(),
17791833
false,
17801834
));
17811835

1836+
let run_request = |span: tracing::Span| {
1837+
let context = context.clone();
1838+
let parsed_request = parsed_request.clone();
1839+
let root_node = root_node.clone();
1840+
1841+
tokio::task::spawn_blocking(move || {
1842+
let _span_guard = span.enter();
1843+
1844+
let parsed_request = parsed_request.lock().unwrap();
1845+
let result = parsed_request.execute_sync(&root_node, &context);
1846+
1847+
let response_code = if result.is_ok() {
1848+
StatusCode::OK
1849+
} else {
1850+
StatusCode::BAD_REQUEST
1851+
};
1852+
1853+
let response_json = serde_json::to_string_pretty(&result)
1854+
.expect("bug: failed to serialize GraphQL response");
1855+
1856+
(response_code, response_json)
1857+
})
1858+
};
1859+
17821860
let remote_auth = RemoteAuth::Http { auth };
1783-
let res = {
1861+
let (response_code, response_json) = {
17841862
// First attempt to serve GraphQL query. If we can serve it
17851863
// that means all requested revisions were specified by SHA and we could find
17861864
// all of them locally, so no need to fetch.
1787-
let res = parsed.execute(&root_node, &context).await;
1865+
let execute_span = tracing::info_span!("execute_1");
1866+
let (response_code, response_json) = run_request(execute_span).await?;
17881867

17891868
// The "allow_refs" flag will be set by the query handler if we need to do a fetch
17901869
// to complete the query.
17911870
if !*context.allow_refs.lock().unwrap() {
1792-
res
1871+
(response_code, response_json)
17931872
} else {
17941873
match fetch_upstream(
17951874
serv.clone(),
@@ -1820,26 +1899,21 @@ async fn serve_graphql(
18201899
}
18211900
};
18221901

1823-
parsed.execute(&root_node, &context).await
1902+
let execute_span = tracing::info_span!("execute_2");
1903+
run_request(execute_span).await?
18241904
}
18251905
};
18261906

1827-
let code = if res.is_ok() {
1828-
hyper::StatusCode::OK
1829-
} else {
1830-
hyper::StatusCode::BAD_REQUEST
1907+
let response = {
1908+
let mut response = Response::new(hyper::Body::from(response_json));
1909+
*response.status_mut() = response_code;
1910+
response.headers_mut().insert(
1911+
hyper::header::CONTENT_TYPE,
1912+
hyper::header::HeaderValue::from_static("application/json"),
1913+
);
1914+
response
18311915
};
18321916

1833-
let body = hyper::Body::from(serde_json::to_string_pretty(&res).unwrap());
1834-
let mut resp = Response::new(hyper::Body::empty());
1835-
*resp.status_mut() = code;
1836-
resp.headers_mut().insert(
1837-
hyper::header::CONTENT_TYPE,
1838-
hyper::header::HeaderValue::from_static("application/json"),
1839-
);
1840-
*resp.body_mut() = body;
1841-
let gql_result = resp;
1842-
18431917
tokio::task::spawn_blocking(move || -> josh::JoshResult<_> {
18441918
let temp_ns = Arc::new(josh_proxy::TmpGitNamespace::new(
18451919
&serv.repo_path.join("overlay"),
@@ -1884,7 +1958,8 @@ async fn serve_graphql(
18841958
})
18851959
.in_current_span()
18861960
.await??;
1887-
Ok(gql_result)
1961+
1962+
Ok(response)
18881963
}
18891964

18901965
async fn shutdown_signal() {

0 commit comments

Comments
 (0)