Skip to content

Commit 8d70a6f

Browse files
committed
server: Async-ify the indexing status server
1 parent 42b903f commit 8d70a6f

File tree

2 files changed

+115
-132
lines changed

2 files changed

+115
-132
lines changed

server/http/tests/server.rs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -153,8 +153,7 @@ mod test {
153153
client.request(request)
154154
})
155155
.map_ok(|response| {
156-
let errors =
157-
test_utils::assert_error_response(response, StatusCode::OK, true);
156+
let errors = test_utils::assert_error_response(response, StatusCode::OK, true);
158157

159158
let message = errors[0]
160159
.as_object()

server/index-node/src/service.rs

Lines changed: 114 additions & 130 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,8 @@ impl<Q, S> Clone for IndexNodeService<Q, S> {
3535
}
3636
}
3737

38+
impl<Q, S> CheapClone for IndexNodeService<Q, S> {}
39+
3840
impl<Q, S> IndexNodeService<Q, S>
3941
where
4042
Q: GraphQlRunner,
@@ -50,126 +52,104 @@ where
5052
}
5153
}
5254

53-
fn graphiql_html(&self) -> String {
54-
include_str!("../assets/index.html").into()
55+
fn graphiql_html() -> &'static str {
56+
include_str!("../assets/index.html")
5557
}
5658

5759
/// Serves a static file.
58-
fn serve_file(&self, contents: &'static str) -> IndexNodeServiceResponse {
59-
async move {
60-
Ok(Response::builder()
61-
.status(200)
62-
.body(Body::from(contents))
63-
.unwrap())
64-
}
65-
.boxed()
60+
fn serve_file(contents: &'static str) -> Response<Body> {
61+
Response::builder()
62+
.status(200)
63+
.body(Body::from(contents))
64+
.unwrap()
6665
}
6766

68-
/// Serves a dynamically created file.
69-
fn serve_dynamic_file(&self, contents: String) -> IndexNodeServiceResponse {
70-
async {
71-
Ok(Response::builder()
72-
.status(200)
73-
.body(Body::from(contents))
74-
.unwrap())
75-
}
76-
.boxed()
67+
fn index() -> Response<Body> {
68+
Response::builder()
69+
.status(200)
70+
.body(Body::from("OK"))
71+
.unwrap()
7772
}
7873

79-
fn index(&self) -> IndexNodeServiceResponse {
80-
async {
81-
Ok(Response::builder()
82-
.status(200)
83-
.body(Body::from("OK"))
84-
.unwrap())
85-
}
86-
.boxed()
74+
fn handle_graphiql() -> Response<Body> {
75+
Self::serve_file(Self::graphiql_html())
8776
}
8877

89-
fn handle_graphiql(&self) -> IndexNodeServiceResponse {
90-
self.serve_dynamic_file(self.graphiql_html())
91-
}
92-
93-
fn handle_graphql_query(&self, request_body: Body) -> IndexNodeServiceResponse {
78+
async fn handle_graphql_query(
79+
&self,
80+
request_body: Body,
81+
) -> Result<Response<Body>, GraphQLServerError> {
9482
let logger = self.logger.clone();
9583
let store = self.store.clone();
9684
let graphql_runner = self.graphql_runner.clone();
9785

9886
// Obtain the schema for the index node GraphQL API
9987
let schema = SCHEMA.clone();
10088

101-
hyper::body::to_bytes(request_body)
89+
let body = hyper::body::to_bytes(request_body)
10290
.map_err(|_| GraphQLServerError::InternalError("Failed to read request body".into()))
103-
.and_then(move |body| IndexNodeRequest::new(body, schema).compat())
104-
.and_then(move |query| {
105-
let logger = logger.clone();
106-
let graphql_runner = graphql_runner.clone();
107-
let load_manager = graphql_runner.load_manager();
108-
109-
// Run the query using the index node resolver
110-
tokio::task::block_in_place(|| {
111-
let options = QueryExecutionOptions {
112-
logger: logger.clone(),
113-
resolver: IndexNodeResolver::new(&logger, graphql_runner, store),
114-
deadline: None,
115-
max_first: std::u32::MAX,
116-
load_manager,
117-
};
118-
let result = PreparedQuery::new(query, None, 100).map(|query| {
119-
// Index status queries are not cacheable, so we may unwrap this.
120-
Arc::try_unwrap(execute_query(query, None, None, options)).unwrap()
121-
});
122-
123-
futures03::future::ok(QueryResult::from(result))
124-
})
125-
})
126-
.map_ok(|result| {
127-
result.as_http_response()
128-
})
129-
.boxed()
91+
.await?;
92+
93+
let query = IndexNodeRequest::new(body, schema).compat().await?;
94+
95+
let logger = logger.clone();
96+
let graphql_runner = graphql_runner.clone();
97+
let load_manager = graphql_runner.load_manager();
98+
99+
// Run the query using the index node resolver
100+
let result = tokio::task::spawn_blocking(move || {
101+
let options = QueryExecutionOptions {
102+
logger: logger.clone(),
103+
resolver: IndexNodeResolver::new(&logger, graphql_runner, store),
104+
deadline: None,
105+
max_first: std::u32::MAX,
106+
load_manager,
107+
};
108+
QueryResult::from(PreparedQuery::new(query, None, 100).map(|query| {
109+
// Index status queries are not cacheable, so we may unwrap this.
110+
Arc::try_unwrap(execute_query(query, None, None, options)).unwrap()
111+
}))
112+
})
113+
.await
114+
.unwrap();
115+
Ok(result.as_http_response())
130116
}
131117

132118
// Handles OPTIONS requests
133-
fn handle_graphql_options(&self, _request: Request<Body>) -> IndexNodeServiceResponse {
134-
Box::pin(async {
135-
Ok(Response::builder()
136-
.status(200)
137-
.header("Access-Control-Allow-Origin", "*")
138-
.header("Access-Control-Allow-Headers", "Content-Type, User-Agent")
139-
.header("Access-Control-Allow-Methods", "GET, OPTIONS, POST")
140-
.body(Body::from(""))
141-
.unwrap())
142-
})
119+
fn handle_graphql_options(_request: Request<Body>) -> Response<Body> {
120+
Response::builder()
121+
.status(200)
122+
.header("Access-Control-Allow-Origin", "*")
123+
.header("Access-Control-Allow-Headers", "Content-Type, User-Agent")
124+
.header("Access-Control-Allow-Methods", "GET, OPTIONS, POST")
125+
.body(Body::from(""))
126+
.unwrap()
143127
}
144128

145129
/// Handles 302 redirects
146-
fn handle_temp_redirect(&self, destination: &str) -> IndexNodeServiceResponse {
147-
Box::pin(futures03::future::ready(
148-
header::HeaderValue::from_str(destination)
149-
.map_err(|_| {
150-
GraphQLServerError::ClientError("invalid characters in redirect URL".into())
151-
})
152-
.map(|loc_header_val| {
153-
Response::builder()
154-
.status(StatusCode::FOUND)
155-
.header(header::LOCATION, loc_header_val)
156-
.body(Body::from("Redirecting..."))
157-
.unwrap()
158-
}),
159-
))
130+
fn handle_temp_redirect(destination: &str) -> Result<Response<Body>, GraphQLServerError> {
131+
header::HeaderValue::from_str(destination)
132+
.map_err(|_| {
133+
GraphQLServerError::ClientError("invalid characters in redirect URL".into())
134+
})
135+
.map(|loc_header_val| {
136+
Response::builder()
137+
.status(StatusCode::FOUND)
138+
.header(header::LOCATION, loc_header_val)
139+
.body(Body::from("Redirecting..."))
140+
.unwrap()
141+
})
160142
}
161143

162144
/// Handles 404s.
163-
fn handle_not_found(&self) -> IndexNodeServiceResponse {
164-
Box::pin(futures03::future::ok(
165-
Response::builder()
166-
.status(StatusCode::NOT_FOUND)
167-
.body(Body::from("Not found"))
168-
.unwrap(),
169-
))
145+
fn handle_not_found() -> Response<Body> {
146+
Response::builder()
147+
.status(StatusCode::NOT_FOUND)
148+
.body(Body::from("Not found"))
149+
.unwrap()
170150
}
171151

172-
fn handle_call(&mut self, req: Request<Body>) -> IndexNodeServiceResponse {
152+
async fn handle_call(self, req: Request<Body>) -> Result<Response<Body>, GraphQLServerError> {
173153
let method = req.method().clone();
174154

175155
let path = req.uri().path().to_owned();
@@ -183,24 +163,24 @@ where
183163
};
184164

185165
match (method, path_segments.as_slice()) {
186-
(Method::GET, [""]) => self.index(),
166+
(Method::GET, [""]) => Ok(Self::index()),
187167
(Method::GET, ["graphiql.css"]) => {
188-
self.serve_file(include_str!("../assets/graphiql.css"))
168+
Ok(Self::serve_file(include_str!("../assets/graphiql.css")))
189169
}
190170
(Method::GET, ["graphiql.min.js"]) => {
191-
self.serve_file(include_str!("../assets/graphiql.min.js"))
171+
Ok(Self::serve_file(include_str!("../assets/graphiql.min.js")))
192172
}
193173

194174
(Method::GET, path @ ["graphql"]) => {
195175
let dest = format!("/{}/playground", path.join("/"));
196-
self.handle_temp_redirect(&dest)
176+
Self::handle_temp_redirect(&dest)
197177
}
198-
(Method::GET, ["graphql", "playground"]) => self.handle_graphiql(),
178+
(Method::GET, ["graphql", "playground"]) => Ok(Self::handle_graphiql()),
199179

200-
(Method::POST, ["graphql"]) => self.handle_graphql_query(req.into_body()),
201-
(Method::OPTIONS, ["graphql"]) => self.handle_graphql_options(req),
180+
(Method::POST, ["graphql"]) => self.handle_graphql_query(req.into_body()).await,
181+
(Method::OPTIONS, ["graphql"]) => Ok(Self::handle_graphql_options(req)),
202182

203-
_ => self.handle_not_found(),
183+
_ => Ok(Self::handle_not_found()),
204184
}
205185
}
206186
}
@@ -223,35 +203,39 @@ where
223203

224204
// Returning Err here will prevent the client from receiving any response.
225205
// Instead, we generate a Response with an error code and return Ok
226-
Box::pin(self.handle_call(req).map(move |result| match result {
227-
Ok(response) => Ok(response),
228-
Err(err @ GraphQLServerError::ClientError(_)) => {
229-
debug!(logger, "IndexNodeService call failed: {}", err);
230-
231-
Ok(Response::builder()
232-
.status(400)
233-
.header("Content-Type", "text/plain")
234-
.body(Body::from(format!("Invalid request: {}", err)))
235-
.unwrap())
236-
}
237-
Err(err @ GraphQLServerError::QueryError(_)) => {
238-
error!(logger, "IndexNodeService call failed: {}", err);
239-
240-
Ok(Response::builder()
241-
.status(400)
242-
.header("Content-Type", "text/plain")
243-
.body(Body::from(format!("Query error: {}", err)))
244-
.unwrap())
245-
}
246-
Err(err @ GraphQLServerError::InternalError(_)) => {
247-
error!(logger, "IndexNodeService call failed: {}", err);
248-
249-
Ok(Response::builder()
250-
.status(500)
251-
.header("Content-Type", "text/plain")
252-
.body(Body::from(format!("Internal server error: {}", err)))
253-
.unwrap())
254-
}
255-
}))
206+
Box::pin(
207+
self.cheap_clone()
208+
.handle_call(req)
209+
.map(move |result| match result {
210+
Ok(response) => Ok(response),
211+
Err(err @ GraphQLServerError::ClientError(_)) => {
212+
debug!(logger, "IndexNodeService call failed: {}", err);
213+
214+
Ok(Response::builder()
215+
.status(400)
216+
.header("Content-Type", "text/plain")
217+
.body(Body::from(format!("Invalid request: {}", err)))
218+
.unwrap())
219+
}
220+
Err(err @ GraphQLServerError::QueryError(_)) => {
221+
error!(logger, "IndexNodeService call failed: {}", err);
222+
223+
Ok(Response::builder()
224+
.status(400)
225+
.header("Content-Type", "text/plain")
226+
.body(Body::from(format!("Query error: {}", err)))
227+
.unwrap())
228+
}
229+
Err(err @ GraphQLServerError::InternalError(_)) => {
230+
error!(logger, "IndexNodeService call failed: {}", err);
231+
232+
Ok(Response::builder()
233+
.status(500)
234+
.header("Content-Type", "text/plain")
235+
.body(Body::from(format!("Internal server error: {}", err)))
236+
.unwrap())
237+
}
238+
}),
239+
)
256240
}
257241
}

0 commit comments

Comments
 (0)