Skip to content

Commit c45353d

Browse files
committed
feat: Allow the http module to be used without a tcp listener
1 parent c56510d commit c45353d

File tree

2 files changed

+165
-100
lines changed

2 files changed

+165
-100
lines changed

scripts/sync_publish.sh

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
#!/bin/bash
2+
3+
# Usage: sync_publish /path/to/crate -f
4+
#
5+
# Publish a crate and wait for it to become available.
6+
#
7+
# https://gist.github.com/Riateche/a1c500fe760a2b9190beb0a7134db82d
8+
9+
set -e
10+
set -o pipefail
11+
12+
TMP_DIR=/tmp/test1
13+
14+
DIR="$1"
15+
FORCE="$2"
16+
17+
NAME=$(grep '^name' "$DIR/Cargo.toml" | sed 's/name = "\([^"]*\)"/\1/')
18+
cd "$DIR"
19+
20+
VERSION=$(cargo metadata --format-version 1 2>/dev/null | jq -r '.packages[] | select(.name=="'$NAME'").version')
21+
22+
rm -rf "$TMP_DIR"
23+
cargo new "$TMP_DIR" > /dev/null 2>&1
24+
cd "$TMP_DIR"
25+
cargo add "$NAME" --vers "=$VERSION" > /dev/null 2>&1
26+
if cargo generate-lockfile > /dev/null 2>&1; then
27+
echo "$NAME=$VERSION already exists, skipping."
28+
exit 0
29+
fi
30+
31+
echo "Publishing $NAME=$VERSION"
32+
if [ "$FORCE" != "-f" ]; then
33+
echo "This is a dry run. Run with -f to publish."
34+
exit 0
35+
fi
36+
37+
cd "$DIR"
38+
cargo publish
39+
40+
cd "$TMP_DIR"
41+
while ! cargo generate-lockfile > /dev/null 2>&1; do
42+
echo "Waiting for crate to be published..."
43+
sleep 1
44+
done

src/std_lib/http.rs

Lines changed: 121 additions & 100 deletions
Original file line numberDiff line numberDiff line change
@@ -23,14 +23,17 @@ use {
2323

2424
use crate::base::types::{ArcType, Type};
2525

26-
use crate::vm::{
27-
self,
28-
api::{
29-
generic, Collect, Eff, Function, Getable, OpaqueValue, PushAsRef, Pushable, VmType, WithVM,
30-
IO,
26+
use crate::{
27+
vm::{
28+
self,
29+
api::{
30+
generic, Collect, Eff, Function, Getable, OpaqueValue, PushAsRef, Pushable, VmType,
31+
WithVM, IO,
32+
},
33+
thread::{ActiveThread, RootedThread, Thread},
34+
ExternModule, Variants,
3135
},
32-
thread::{ActiveThread, RootedThread, Thread},
33-
ExternModule, Variants,
36+
Error,
3437
};
3538

3639
macro_rules! try_future {
@@ -45,7 +48,7 @@ macro_rules! try_future {
4548
};
4649
}
4750

48-
struct HttpEffect;
51+
pub struct HttpEffect;
4952
impl VmType for HttpEffect {
5053
type Type = Self;
5154
fn make_type(vm: &Thread) -> ArcType {
@@ -59,9 +62,9 @@ impl VmType for HttpEffect {
5962
}
6063
}
6164

62-
type Handler<T> = Eff<HttpEffect, T>;
65+
pub type EffectHandler<T> = Eff<HttpEffect, T>;
6366

64-
struct Headers(HeaderMap);
67+
pub struct Headers(HeaderMap);
6568

6669
impl VmType for Headers {
6770
type Type = Vec<(String, Vec<u8>)>;
@@ -207,7 +210,7 @@ type Request = record_type! {
207210
body => Body
208211
};
209212

210-
type Response = record_type! {
213+
pub type Response = record_type! {
211214
status => u16,
212215
headers => Headers
213216
};
@@ -226,7 +229,7 @@ struct Settings {
226229

227230
fn listen(
228231
settings: Settings,
229-
WithVM { vm, value }: WithVM<OpaqueValue<RootedThread, Handler<Response>>>,
232+
WithVM { vm, value }: WithVM<OpaqueValue<RootedThread, EffectHandler<Response>>>,
230233
) -> impl Future<Output = IO<()>> + Send + 'static {
231234
let vm = vm.root_thread();
232235
listen_(settings, vm, value).map(IO::from)
@@ -235,103 +238,32 @@ fn listen(
235238
async fn listen_(
236239
settings: Settings,
237240
thread: RootedThread,
238-
handler: OpaqueValue<RootedThread, Handler<Response>>,
241+
handler: OpaqueValue<RootedThread, EffectHandler<Response>>,
239242
) -> vm::Result<()> {
240243
let thread = match thread.new_thread() {
241244
Ok(thread) => thread,
242245
Err(err) => return Err(err),
243246
};
244247

245-
// Retrieve the `handle` function from the http module which we use to evaluate values of type
246-
// `Handler Response`
247-
type ListenFn = fn(OpaqueValue<RootedThread, Handler<Response>>, HttpState) -> IO<Response>;
248-
let handle: Function<RootedThread, ListenFn> = thread
249-
.get_global("std.http.handle")
250-
.unwrap_or_else(|err| panic!("{}", err));
251-
252-
struct Listen {
253-
handle: Function<RootedThread, ListenFn>,
254-
handler: OpaqueValue<RootedThread, Handler<Response>>,
255-
}
256-
257-
impl tower_service::Service<hyper::Request<hyper::Body>> for Listen {
248+
impl tower_service::Service<hyper::Request<hyper::Body>> for Handler {
258249
type Response = hyper::Response<hyper::Body>;
259-
type Error = vm::Error;
260-
type Future = BoxFuture<'static, Result<http::Response<hyper::Body>, vm::Error>>;
250+
type Error = Error;
251+
type Future = BoxFuture<'static, Result<http::Response<hyper::Body>, Error>>;
261252

262253
fn poll_ready(&mut self, _cx: &mut task::Context<'_>) -> Poll<Result<(), Self::Error>> {
263254
Ok(()).into()
264255
}
265256

266257
fn call(&mut self, request: hyper::Request<hyper::Body>) -> Self::Future {
267258
let (parts, body) = request.into_parts();
268-
let gluon_request = record_no_decl! {
269-
method => parts.method.as_str().to_owned(),
270-
uri => Uri(parts.uri),
271-
// Since `Body` implements `Userdata` it can be directly pushed to gluon
272-
body => Body(Arc::new(Mutex::new(Box::pin(
273-
body
274-
.map_err(|err| vm::Error::Message(format!("{}", err)))
275-
// `PushAsRef` makes the `body` parameter act as a `&[u8]` which means it is
276-
// marshalled to `Array Byte` in gluon
277-
.map_ok(PushAsRef::<_, [u8]>::new)
278-
))))
279-
};
280-
let (response_sender, response_body) = hyper::Body::channel();
281-
let response_sender = Arc::new(Mutex::new(Some(response_sender)));
282-
let http_state = record_no_decl! {
283-
request => gluon_request,
284-
response => ResponseBody(response_sender.clone())
285-
};
286-
287-
let child_thread = try_future!(self.handle.vm().new_thread());
288-
let mut handle = try_future!(self.handle.re_root(child_thread));
289-
290-
let handler = self.handler.clone();
291-
Box::pin(async move {
292-
handle
293-
.call_async(handler, http_state)
294-
.map(move |result| match result {
295-
Ok(value) => {
296-
match value {
297-
IO::Value(record_p! { status, headers }) => {
298-
// Drop the sender to so that it the receiver stops waiting for
299-
// more chunks
300-
*response_sender.lock().unwrap() = None;
301-
302-
let status = StatusCode::from_u16(status)
303-
.unwrap_or(StatusCode::INTERNAL_SERVER_ERROR);
304-
let mut response = http::Response::builder()
305-
.status(status)
306-
.body(response_body)
307-
.unwrap();
308-
*response.headers_mut() = headers.0;
309-
Ok(response)
310-
}
311-
IO::Exception(err) => {
312-
info!("{}", err);
313-
Ok(http::Response::builder()
314-
.status(StatusCode::INTERNAL_SERVER_ERROR)
315-
.body("".into())
316-
.unwrap())
317-
}
318-
}
319-
}
320-
Err(err) => {
321-
info!("{}", err);
322-
Ok(http::Response::builder()
323-
.status(StatusCode::INTERNAL_SERVER_ERROR)
324-
.body("".into())
325-
.unwrap())
326-
}
327-
})
328-
.await
329-
})
259+
self.handle(parts.method, parts.uri, body)
330260
}
331261
}
332262

333263
let addr = format!("0.0.0.0:{}", settings.port).parse().unwrap();
334264

265+
let listener = Handler::new(&thread, handler);
266+
335267
if let Some(cert_path) = &settings.tls_cert {
336268
let identity = fs::read(cert_path).map_err(|err| {
337269
vm::Error::Message(format!(
@@ -350,10 +282,10 @@ async fn listen_(
350282

351283
let http = hyper::server::conn::Http::new();
352284

353-
let mut listener = tokio::net::TcpListener::bind(&addr)
285+
let mut tcp_listener = tokio::net::TcpListener::bind(&addr)
354286
.map_err(|err| vm::Error::Message(err.to_string()))
355287
.await?;
356-
let incoming = listener.incoming().err_into().and_then(|stream| {
288+
let incoming = tcp_listener.incoming().err_into().and_then(|stream| {
357289
acceptor.accept(stream).map_err(|err| {
358290
info!("Unable to accept TLS connection: {}", err);
359291
Box::new(err) as Box<dyn ::std::error::Error + Send + Sync>
@@ -382,27 +314,116 @@ async fn listen_(
382314

383315
return hyper::server::Builder::new(Acceptor { incoming }, http)
384316
.serve(hyper::service::make_service_fn(move |_| {
385-
future::ready(Ok::<_, hyper::Error>(Listen {
386-
handle: handle.clone(),
387-
handler: handler.clone(),
388-
}))
317+
future::ready(Ok::<_, hyper::Error>(listener.clone()))
389318
}))
390319
.map_err(|err| vm::Error::from(format!("Server error: {}", err)))
391320
.await;
392321
}
393322

394323
Server::bind(&addr)
395324
.serve(hyper::service::make_service_fn(move |_| {
396-
future::ready(Ok::<_, hyper::Error>(Listen {
397-
handle: handle.clone(),
398-
handler: handler.clone(),
399-
}))
325+
future::ready(Ok::<_, hyper::Error>(listener.clone()))
400326
}))
401327
.map_err(|err| vm::Error::from(format!("Server error: {}", err)))
402328
.map_ok(|_| ())
403329
.await
404330
}
405331

332+
type ListenFn = fn(OpaqueValue<RootedThread, EffectHandler<Response>>, HttpState) -> IO<Response>;
333+
334+
#[derive(Clone)]
335+
pub struct Handler {
336+
handle: Function<RootedThread, ListenFn>,
337+
handler: OpaqueValue<RootedThread, EffectHandler<Response>>,
338+
}
339+
340+
impl Handler {
341+
pub fn new(
342+
thread: &Thread,
343+
handler: OpaqueValue<RootedThread, EffectHandler<Response>>,
344+
) -> Self {
345+
// Retrieve the `handle` function from the http module which we use to evaluate values of type
346+
// `EffectHandler Response`
347+
let handle: Function<RootedThread, ListenFn> = thread
348+
.get_global("std.http.handle")
349+
.unwrap_or_else(|err| panic!("{}", err));
350+
Self { handle, handler }
351+
}
352+
353+
pub fn handle<E>(
354+
&mut self,
355+
method: http::Method,
356+
uri: http::Uri,
357+
body: impl Stream<Item = Result<Bytes, E>> + Send + 'static,
358+
) -> BoxFuture<'static, crate::Result<hyper::Response<hyper::Body>>>
359+
where
360+
E: fmt::Display + Send + 'static,
361+
{
362+
let child_thread = try_future!(self.handle.vm().new_thread());
363+
let mut handle = try_future!(self.handle.re_root(child_thread));
364+
365+
let gluon_request = record_no_decl! {
366+
method => method.as_str().to_owned(),
367+
uri => Uri(uri),
368+
// Since `Body` implements `Userdata` it can be directly pushed to gluon
369+
body => Body(Arc::new(Mutex::new(Box::pin(
370+
body
371+
.map_err(|err| vm::Error::Message(format!("{}", err)))
372+
// `PushAsRef` makes the `body` parameter act as a `&[u8]` which means it is
373+
// marshalled to `Array Byte` in gluon
374+
.map_ok(PushAsRef::<_, [u8]>::new)
375+
))))
376+
};
377+
let (response_sender, response_body) = hyper::Body::channel();
378+
let response_sender = Arc::new(Mutex::new(Some(response_sender)));
379+
let http_state = record_no_decl! {
380+
request => gluon_request,
381+
response => ResponseBody(response_sender.clone())
382+
};
383+
384+
let handler = self.handler.clone();
385+
Box::pin(async move {
386+
handle
387+
.call_async(handler, http_state)
388+
.map(move |result| match result {
389+
Ok(value) => {
390+
match value {
391+
IO::Value(record_p! { status, headers }) => {
392+
// Drop the sender to so that it the receiver stops waiting for
393+
// more chunks
394+
*response_sender.lock().unwrap() = None;
395+
396+
let status = StatusCode::from_u16(status)
397+
.unwrap_or(StatusCode::INTERNAL_SERVER_ERROR);
398+
let mut response = http::Response::builder()
399+
.status(status)
400+
.body(response_body)
401+
.unwrap();
402+
*response.headers_mut() = headers.0;
403+
Ok(response)
404+
}
405+
IO::Exception(err) => {
406+
info!("{}", err);
407+
Ok(http::Response::builder()
408+
.status(StatusCode::INTERNAL_SERVER_ERROR)
409+
.body("".into())
410+
.unwrap())
411+
}
412+
}
413+
}
414+
Err(err) => {
415+
info!("{}", err);
416+
Ok(http::Response::builder()
417+
.status(StatusCode::INTERNAL_SERVER_ERROR)
418+
.body("".into())
419+
.unwrap())
420+
}
421+
})
422+
.await
423+
})
424+
}
425+
}
426+
406427
// To let the `http_types` module refer to `Body` and `ResponseBody` we register these types in a
407428
// separate function which is called before loading `http_types`
408429
pub fn load_types(vm: &Thread) -> vm::Result<ExternModule> {

0 commit comments

Comments
 (0)